store: ensure updates update message sexp too

And turn all "add" into "replace" so old messages get removed.
Update tests too.
This commit is contained in:
Dirk-Jan C. Binnema
2022-05-05 01:27:08 +03:00
parent c0ae7e6860
commit 6126d7ee62
8 changed files with 329 additions and 170 deletions

View File

@ -66,16 +66,12 @@ struct Store::Private {
enum struct XapianOpts { ReadOnly, Open, CreateOverwrite };
Private(const std::string& path, bool readonly)
: read_only_{readonly}, db_{make_xapian_db(path,
read_only_ ? XapianOpts::ReadOnly
: XapianOpts::Open)},
properties_{make_properties(path)},
contacts_cache_{db().get_metadata(ContactsKey),
: read_only_{readonly}, db_{make_xapian_db(path,
read_only_ ? XapianOpts::ReadOnly
: XapianOpts::Open)},
properties_{make_properties(path)},
contacts_cache_{db().get_metadata(ContactsKey),
properties_.personal_addresses} {
/* we do our own flushing, set Xapian's internal one as the backstop*/
g_setenv("XAPIAN_FLUSH_THRESHOLD",
format("%zu", properties_.batch_size + 100).c_str(), 1);
}
Private(const std::string& path,
@ -85,9 +81,20 @@ struct Store::Private {
: read_only_{false}, db_{make_xapian_db(path, XapianOpts::CreateOverwrite)},
properties_{init_metadata(conf, path, root_maildir, personal_addresses)},
contacts_cache_{"", properties_.personal_addresses} {
/* we do our own flushing, set Xapian's internal one as the backstop*/
g_setenv("XAPIAN_FLUSH_THRESHOLD",
format("%zu", properties_.batch_size + 100).c_str(), 1);
/* add synonym */
for (auto&& info: AllMessageFlagInfos) {
constexpr auto field{field_from_id(Field::Id::Flags)};
const auto s1{field.xapian_term(info.name)};
const auto s2{field.xapian_term(info.shortcut)};
writable_db().add_synonym(s1, s2);
}
for (auto&& prio : AllMessagePriorities) {
constexpr auto field{field_from_id(Field::Id::Priority)};
const auto s1{field.xapian_term(to_string(prio))};
const auto s2{field.xapian_term(to_char(prio))};
writable_db().add_synonym(s1, s2);
}
}
~Private()
@ -102,15 +109,23 @@ struct Store::Private {
std::unique_ptr<Xapian::Database> make_xapian_db(const std::string db_path, XapianOpts opts)
try {
/* we do our own flushing, set Xapian's internal one as the
* backstop*/
g_setenv("XAPIAN_FLUSH_THRESHOLD",
format("%zu", properties_.batch_size + 100).c_str(), 1);
switch (opts) {
case XapianOpts::ReadOnly: return std::make_unique<Xapian::Database>(db_path);
case XapianOpts::ReadOnly:
return std::make_unique<Xapian::Database>(db_path);
case XapianOpts::Open:
return std::make_unique<Xapian::WritableDatabase>(db_path, Xapian::DB_OPEN);
case XapianOpts::CreateOverwrite:
return std::make_unique<Xapian::WritableDatabase>(
db_path,
db_path,
Xapian::DB_CREATE_OR_OVERWRITE);
default: throw std::logic_error("invalid xapian options");
default:
throw std::logic_error("invalid xapian options");
}
} catch (const Xapian::DatabaseError& xde) {
@ -145,8 +160,7 @@ struct Store::Private {
// Opportunistically commit a transaction if the transaction size
// filled up a batch, or with force.
void transaction_maybe_commit(bool force = false) noexcept
{
void transaction_maybe_commit(bool force = false) noexcept {
if (force || transaction_size_ >= properties_.batch_size) {
if (contacts_cache_.dirty()) {
xapian_try([&] {
@ -168,29 +182,7 @@ struct Store::Private {
}
}
void add_synonyms()
{
for (auto&& info: AllMessageFlagInfos) {
constexpr auto field{field_from_id(Field::Id::Flags)};
const auto s1{field.xapian_term(info.name)};
const auto s2{field.xapian_term(info.shortcut)};
writable_db().clear_synonyms(s1);
writable_db().clear_synonyms(s2);
writable_db().add_synonym(s1, s2);
}
for (auto&& prio : AllMessagePriorities) {
constexpr auto field{field_from_id(Field::Id::Priority)};
const auto s1{field.xapian_term(to_string(prio))};
const auto s2{field.xapian_term(to_char(prio))};
writable_db().clear_synonyms(s1);
writable_db().clear_synonyms(s2);
writable_db().add_synonym(s1, s2);
}
}
time_t metadata_time_t(const std::string& key) const
{
time_t metadata_time_t(const std::string& key) const {
const auto ts = db().get_metadata(key);
return (time_t)atoll(db().get_metadata(key).c_str());
}
@ -214,8 +206,8 @@ struct Store::Private {
Store::Properties init_metadata(const Store::Config& conf,
const std::string& path,
const std::string& root_maildir,
const StringVec& personal_addresses)
{
const StringVec& personal_addresses) {
writable_db().set_metadata(SchemaVersionKey, ExpectedSchemaVersion);
writable_db().set_metadata(CreatedKey, Mu::format("%" PRId64, (int64_t)::time({})));
@ -240,6 +232,10 @@ struct Store::Private {
return make_properties(path);
}
Option<Message> find_message_unlocked(Store::Id docid) const;
Result<Store::Id> update_message_unlocked(Message& msg, Store::Id docid);
Result<Store::Id> update_message_unlocked(Message& msg, const std::string& old_path);
/* metadata to write as part of a transaction commit */
std::unordered_map<std::string, std::string> metadata_cache_;
@ -254,6 +250,47 @@ struct Store::Private {
std::mutex lock_;
};
Result<Store::Id>
Store::Private::update_message_unlocked(Message& msg, Store::Id docid)
{
msg.update_cached_sexp();
return xapian_try_result([&]{
writable_db().replace_document(docid, msg.document().xapian_document());
g_debug("updated message @ %s; docid = %u", msg.path().c_str(), docid);
return Ok(std::move(docid));
});
}
Result<Store::Id>
Store::Private::update_message_unlocked(Message& msg, const std::string& path_to_replace)
{
msg.update_cached_sexp();
return xapian_try_result([&]{
auto id = writable_db().replace_document(
field_from_id(Field::Id::Path).xapian_term(path_to_replace),
msg.document().xapian_document());
return Ok(std::move(id));
});
}
Option<Message>
Store::Private::find_message_unlocked(Store::Id docid) const
{
return xapian_try([&]()->Option<Message> {
auto res = Message::make_from_document(db().get_document(docid));
if (res)
return Some(std::move(res.value()));
else
return Nothing;
}, Nothing);
}
Store::Store(const std::string& path, bool readonly)
: priv_{std::make_unique<Private>(path, readonly)}
{
@ -293,12 +330,6 @@ Store::database() const
return priv_->db();
}
Xapian::WritableDatabase&
Store::writable_database()
{
return priv_->writable_db();
}
Indexer&
Store::indexer()
{
@ -337,6 +368,8 @@ Store::add_message(const std::string& path, bool use_transaction)
Result<Store::Id>
Store::add_message(Message& msg, bool use_transaction)
{
std::lock_guard guard{priv_->lock_};
const auto mdir{mu_maildir_from_path(msg.path(),
properties().root_maildir)};
if (!mdir)
@ -356,51 +389,28 @@ Store::add_message(Message& msg, bool use_transaction)
if (is_personal)
msg.set_flags(msg.flags() | Flags::Personal);
/* now, we're done with all the fields; generate the sexp string for this
* message */
msg.update_cached_sexp();
if (use_transaction)
priv_->transaction_inc();
std::lock_guard guard{priv_->lock_};
auto res = priv_->update_message_unlocked(msg, msg.path());
if (!res)
return Err(res.error());
const auto docid = xapian_try([&]{
if (use_transaction) /* commit if batch is full */
priv_->transaction_maybe_commit();
if (use_transaction)
priv_->transaction_inc();
g_debug("added message @ %s; docid = %u", msg.path().c_str(), *res);
const auto docid = priv_->writable_db().add_document(
msg.document().xapian_document());
if (use_transaction) /* commit if batch is full */
priv_->transaction_maybe_commit();
return docid;
}, InvalidId);
if (G_UNLIKELY(docid == InvalidId))
return Err(Error::Code::Message, "failed to add message");
g_debug("added message @ %s; docid = %u", msg.path().c_str(), docid);
g_debug("%s", msg.document().xapian_document().get_description().c_str());
return Ok(static_cast<Store::Id>(docid));
return res;
}
bool
Store::update_message(Message& msg, unsigned docid)
Result<Store::Id>
Store::update_message(Message& msg, Store::Id docid)
{
msg.update_cached_sexp();
std::lock_guard guard{priv_->lock_};
return xapian_try(
[&]{
priv_->writable_db().replace_document(
docid, msg.document().xapian_document());
g_debug("updated message @ %s; docid = %u",
msg.path().c_str(), docid);
return true;
}, false);
return priv_->update_message_unlocked(msg, docid);
}
bool
@ -435,48 +445,52 @@ Store::remove_messages(const std::vector<Store::Id>& ids)
}
Option<Message>
Store::find_message(Store::Id docid) const
{
std::lock_guard guard{priv_->lock_};
return priv_->find_message_unlocked(docid);
}
Result<Message>
Store::move_message(Store::Id id,
Option<const std::string&> target_mdir,
Option<Flags> new_flags, bool change_name)
{
auto msg = find_message(id);
std::lock_guard guard{priv_->lock_};
auto msg = priv_->find_message_unlocked(id);
if (!msg)
return Err(Error::Code::Store, "cannot find message <%u>", id);
const auto old_path = msg->path();
const auto target_flags = new_flags.value_or(msg->flags());
const auto target_maildir = target_mdir.value_or(msg->maildir());
/* 1. first determine the file system path of the target */
const auto target_path =
mu_maildir_determine_target(msg->path(),
properties().root_maildir,
target_maildir,
target_flags,
change_name);
mu_maildir_determine_target(msg->path(), properties().root_maildir,
target_maildir,target_flags, change_name);
if (!target_path)
return Err(target_path.error());
/* 2. let's move it */
const auto move_res =
mu_maildir_move_message(msg->path(),
target_path.value(),
true/*ignore dups*/);
if (!move_res)
return Err(move_res.error());
if (const auto res = mu_maildir_move_message(
msg->path(), target_path.value(), true/*ignore dups*/); !res)
return Err(res.error());
/* 3. file move worked, now update the message with the new info.*/
const auto update_res = msg->update_after_move(target_path.value(),
target_maildir,
target_flags);
if (!update_res)
return Err(update_res.error());
if (auto&& res = msg->update_after_move(
target_path.value(), target_maildir, target_flags); !res)
return Err(res.error());
/* 4. update message worked; re-store it */
if (!update_message(*msg, id))
return Err(Error::Code::Store, "failed to update message <%u>", id);
if (auto&& res = priv_->update_message_unlocked(*msg, old_path); !res)
return Err(res.error());
/* 5. Profit! */
/* 6. Profit! */
return Ok(std::move(msg.value()));
}
@ -529,21 +543,6 @@ Store::set_dirstamp(const std::string& path, time_t tstamp)
set_metadata(path, std::string{data.data(), len});
}
Option<Message>
Store::find_message(Store::Id docid) const
{
return xapian_try(
[&]()->Option<Message> {
std::lock_guard guard{priv_->lock_};
auto res = Message::make_from_document(priv_->db().get_document(docid));
if (res)
return Some(std::move(res.value()));
else
return Nothing;
},
Nothing);
}
bool
Store::contains_message(const std::string& path) const
{
@ -599,6 +598,7 @@ Store::for_each_term(Field::Id field_id, Store::ForEachTermFunc func) const
const auto prefix{field_from_id(field_id).xapian_term()};
for (auto it = priv_->db().allterms_begin(prefix);
it != priv_->db().allterms_end(prefix); ++it) {
++n;
if (!func(*it))
break;
}