index/store: simplify transaction handling

only have transactions for indexing, and make it opportunistic. All other ops do
not use transactions.
This commit is contained in:
Dirk-Jan C. Binnema
2021-10-20 17:18:51 +03:00
parent 3dd721d5a3
commit 89014ecd06
4 changed files with 69 additions and 42 deletions

View File

@ -72,11 +72,13 @@ struct Indexer::Private {
[this](auto&& path, auto&& statbuf, auto&& info) { [this](auto&& path, auto&& statbuf, auto&& info) {
return handler(path, statbuf, info); return handler(path, statbuf, info);
}}, }},
max_message_size_{store_.metadata().max_message_size} max_message_size_{store_.metadata().max_message_size},
batch_size_{store_.metadata().batch_size}
{ {
g_message("created indexer for %s -> %s", g_message("created indexer for %s -> %s (batch-size: %zu)",
store.metadata().root_maildir.c_str(), store.metadata().root_maildir.c_str(),
store.metadata().database_path.c_str()); store.metadata().database_path.c_str(),
batch_size_);
} }
~Private() { stop(); } ~Private() { stop(); }
@ -106,6 +108,8 @@ struct Indexer::Private {
Progress progress_; Progress progress_;
IndexState state_; IndexState state_;
const size_t batch_size_; /**< Max number of messages added before
* committing */
std::mutex lock_, wlock_; std::mutex lock_, wlock_;
}; };
@ -192,6 +196,11 @@ Indexer::Private::worker()
g_debug("started worker"); g_debug("started worker");
/* note that transaction starting/committing is opportunistic,
* and are NOPs if we are already/not in a transaction */
store_.begin_transaction();
while (state_ == IndexState::Scanning || !fq_.empty()) { while (state_ == IndexState::Scanning || !fq_.empty()) {
if (!fq_.pop(item, 250ms)) if (!fq_.pop(item, 250ms))
continue; continue;
@ -200,15 +209,24 @@ Indexer::Private::worker()
++progress_.processed; ++progress_.processed;
try { try {
std::unique_lock lock{lock_};
store_.add_message(item); store_.add_message(item);
++progress_.updated; ++progress_.updated;
if (progress_.updated % batch_size_ == 0) {
store_.commit_transaction();
store_.begin_transaction();
}
} catch (const Mu::Error& er) { } catch (const Mu::Error& er) {
g_warning("error adding message @ %s: %s", item.c_str(), er.what()); g_warning("error adding message @ %s: %s", item.c_str(), er.what());
} }
maybe_start_worker(); maybe_start_worker();
} }
store_.commit_transaction();
} }
bool bool
@ -230,9 +248,14 @@ Indexer::Private::cleanup()
return state_ == IndexState::Cleaning; return state_ == IndexState::Cleaning;
}); });
g_debug("remove %zu message(s) from store", orphans.size()); // No need for transactions here, remove_messages does that for us.
if (orphans.empty())
g_debug("nothing to clean up");
else {
g_debug("removing up %zu stale message(s) from store", orphans.size());
store_.remove_messages(orphans); store_.remove_messages(orphans);
progress_.removed += orphans.size(); progress_.removed += orphans.size();
}
return true; return true;
} }
@ -280,14 +303,11 @@ Indexer::Private::start(const Indexer::Config& conf)
cleanup(); cleanup();
g_debug("cleanup finished"); g_debug("cleanup finished");
} }
store_.commit();
leave: leave:
state_.change_to(IndexState::Idle); state_.change_to(IndexState::Idle);
}); });
g_debug("started indexer"); g_debug("started indexer");
return true; return true;
} }

View File

@ -1,5 +1,5 @@
/* /*
** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ** Copyright (C) 2021 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
** **
** This program is free software; you can redistribute it and/or modify it ** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the ** under the terms of the GNU General Public License as published by the

View File

@ -110,8 +110,6 @@ struct Store::Private {
mdata_{make_metadata(path)}, contacts_{db().get_metadata(ContactsKey), mdata_{make_metadata(path)}, contacts_{db().get_metadata(ContactsKey),
mdata_.personal_addresses} mdata_.personal_addresses}
{ {
if (!readonly)
begin_transaction();
} }
Private(const std::string& path, Private(const std::string& path,
@ -122,7 +120,6 @@ struct Store::Private {
mdata_{init_metadata(conf, path, root_maildir, personal_addresses)}, mdata_{init_metadata(conf, path, root_maildir, personal_addresses)},
contacts_{"", mdata_.personal_addresses} contacts_{"", mdata_.personal_addresses}
{ {
begin_transaction();
} }
Private(const std::string& root_maildir, Private(const std::string& root_maildir,
@ -140,15 +137,14 @@ struct Store::Private {
if (!read_only_) { if (!read_only_) {
xapian_try([&] { xapian_try([&] {
writable_db().set_metadata(ContactsKey, contacts_.serialize()); writable_db().set_metadata(ContactsKey, contacts_.serialize());
commit();
}); });
if (in_transaction_)
commit_transaction();
} }
} }
std::unique_ptr<Xapian::Database> make_xapian_db(const std::string db_path, XapianOpts opts) std::unique_ptr<Xapian::Database> make_xapian_db(const std::string db_path, XapianOpts opts)
try { try {
in_transaction_ = false;
switch (opts) { switch (opts) {
case XapianOpts::ReadOnly: return std::make_unique<Xapian::Database>(db_path); case XapianOpts::ReadOnly: return std::make_unique<Xapian::Database>(db_path);
case XapianOpts::Open: case XapianOpts::Open:
@ -184,32 +180,30 @@ struct Store::Private {
return dynamic_cast<Xapian::WritableDatabase&>(*db_.get()); return dynamic_cast<Xapian::WritableDatabase&>(*db_.get());
} }
void dirty()
{
if (++dirtiness_ > mdata_.batch_size)
xapian_try([this] { commit(); });
}
void begin_transaction() noexcept void begin_transaction() noexcept
{ {
if (mdata_.in_memory)
return; // not supported in the in-memory backend.
g_return_if_fail(!in_transaction_); g_return_if_fail(!in_transaction_);
g_debug("starting transaction");
xapian_try([this] { xapian_try([this] {
writable_db().begin_transaction(); writable_db().begin_transaction();
in_transaction_ = true; in_transaction_ = true;
}); });
} }
void commit() noexcept void commit_transaction() noexcept
{ {
g_debug("committing %zu modification(s)", dirtiness_);
dirtiness_ = 0;
if (mdata_.in_memory) if (mdata_.in_memory)
return; // not supported in the in-memory backend. return; // not supported in the in-memory backend.
g_return_if_fail(in_transaction_);
g_debug("committing modification(s)");
xapian_try([this] { xapian_try([this] {
if (in_transaction_) if (in_transaction_)
writable_db().commit_transaction(); writable_db().commit_transaction();
in_transaction_ = false; in_transaction_ = false;
begin_transaction();
}); });
} }
@ -286,9 +280,6 @@ struct Store::Private {
std::atomic<bool> in_transaction_{}; std::atomic<bool> in_transaction_{};
std::mutex lock_; std::mutex lock_;
size_t dirtiness_{};
mutable std::atomic<std::size_t> ref_count_{1};
}; };
static void static void
@ -433,7 +424,6 @@ Store::add_message(const std::string& path)
throw Error{Error::Code::Message, "failed to add message"}; throw Error{Error::Code::Message, "failed to add message"};
g_debug("added message @ %s; docid = %u", path.c_str(), docid); g_debug("added message @ %s; docid = %u", path.c_str(), docid);
priv_->dirty();
return docid; return docid;
} }
@ -447,7 +437,6 @@ Store::update_message(MuMsg* msg, unsigned docid)
throw Error{Error::Code::Internal, "failed to update message"}; throw Error{Error::Code::Internal, "failed to update message"};
g_debug("updated message @ %s; docid = %u", mu_msg_get_path(msg), docid); g_debug("updated message @ %s; docid = %u", mu_msg_get_path(msg), docid);
priv_->dirty();
return true; return true;
} }
@ -462,7 +451,6 @@ Store::remove_message(const std::string& path)
priv_->writable_db().delete_document(term); priv_->writable_db().delete_document(term);
g_debug("deleted message @ %s from store", path.c_str()); g_debug("deleted message @ %s from store", path.c_str());
priv_->dirty();
return true; return true;
}, },
@ -472,13 +460,16 @@ Store::remove_message(const std::string& path)
void void
Store::remove_messages(const std::vector<Store::Id>& ids) Store::remove_messages(const std::vector<Store::Id>& ids)
{ {
begin_transaction();
xapian_try([&] { xapian_try([&] {
LOCKED; LOCKED;
for (auto&& id : ids) { for (auto&& id : ids) {
priv_->writable_db().delete_document(id); priv_->writable_db().delete_document(id);
priv_->dirty();
} }
}); });
commit_transaction();
} }
time_t time_t
@ -502,7 +493,6 @@ Store::set_dirstamp(const std::string& path, time_t tstamp)
const std::size_t len = g_snprintf(data.data(), data.size(), "%zx", (size_t)tstamp); const std::size_t len = g_snprintf(data.data(), data.size(), "%zx", (size_t)tstamp);
priv_->writable_db().set_metadata(path, std::string{data.data(), len}); priv_->writable_db().set_metadata(path, std::string{data.data(), len});
priv_->dirty();
} }
MuMsg* MuMsg*
@ -595,11 +585,22 @@ Store::for_each_term(const std::string& field, Store::ForEachTermFunc func) cons
} }
void void
Store::commit() Store::begin_transaction()
{ {
xapian_try([&] { xapian_try([&] {
LOCKED; LOCKED;
priv_->commit(); if (!priv_->in_transaction_)
priv_->begin_transaction();
});
}
void
Store::commit_transaction()
{
xapian_try([&] {
LOCKED;
if (priv_->in_transaction_)
priv_->commit_transaction();
}); });
} }

View File

@ -279,11 +279,17 @@ class Store {
bool empty() const; bool empty() const;
/** /**
* Commit the current group of modifications (i.e., transaction) to disk; * Start a Xapian transaction, opportunistically. If a transaction
* This rarely needs to be called explicitly, as Store will take care of * is already underway, do nothing.
* it.
*/ */
void commit(); void begin_transaction();
/**
* Commit the current group of modifications (i.e., transaction) to
* disk, opportunistically. If no transaction is underway, do
* nothing.
*/
void commit_transaction();
/** /**
* Get a reference to the private data. For internal use. * Get a reference to the private data. For internal use.