From 146b80113f2fb12e0ea04be7f0bde9dcc510a478 Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Wed, 13 Dec 2023 21:45:04 +0200 Subject: [PATCH] lib: move transaction handling to mu-xapian Instead of handling transactions in the store, handle them in xapian-db. Make the code a bit more natural / cleaner. Handle transactions automatically (with a batch-size) and add an RAII Transaction object, which makes all database interaction transactional for the duration. So, no more need for explicit parameters to add_message while indexing. --- lib/mu-indexer.cc | 8 +- lib/mu-server.cc | 4 +- lib/mu-store.cc | 63 ++------------ lib/mu-store.hh | 16 +--- lib/mu-xapian-db.cc | 10 +-- lib/mu-xapian-db.hh | 140 ++++++++++++++++++++++++++----- lib/tests/test-mu-store-query.cc | 2 - lib/tests/test-mu-store.cc | 5 +- 8 files changed, 141 insertions(+), 107 deletions(-) diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index 36f3e155..74808878 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -252,9 +252,7 @@ Indexer::Private::add_message(const std::string& path) // if the store was empty, we know that the message is completely new // and can use the fast path (Xapians 'add_document' rather tahn // 'replace_document) - auto res = store_.add_message(msg.value(), - true /*use-transaction*/, - was_empty_); + auto res = store_.add_message(msg.value(), was_empty_); if (!res) { mu_warning("failed to add message @ {}: {}", path, res.error().what()); return false; @@ -327,8 +325,9 @@ Indexer::Private::cleanup() void Indexer::Private::scan_worker() { - progress_.reset(); + XapianDb::Transaction tx{store_.xapian_db()}; // RAII + progress_.reset(); if (conf_.scan) { mu_debug("starting scanner"); if (!scanner_.start()) { // blocks. 
@@ -367,6 +366,7 @@ Indexer::Private::scan_worker() } completed_ = ::time({}); + store_.config().set(completed_); state_.change_to(IndexState::Idle); } diff --git a/lib/mu-server.cc b/lib/mu-server.cc index 606ae3d2..46626c22 100644 --- a/lib/mu-server.cc +++ b/lib/mu-server.cc @@ -783,6 +783,8 @@ Server::Private::find_handler(const Command& cmd) StopWatch sw{mu_format("{} (indexing: {})", __func__, indexer().is_running() ? "yes" : "no")}; + // we need to _lock_ the store while querying (which likely consists of + // multiple actual queries) + grabbing the results. std::lock_guard l{store_.lock()}; auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)}; if (!qres) @@ -843,7 +845,6 @@ static Sexp get_stats(const Indexer::Progress& stats, const std::string& state) { Sexp sexp; - sexp.put_props( ":info", "index"_sym, ":status", Sexp::Symbol(state), @@ -878,7 +879,6 @@ Server::Private::index_handler(const Command& cmd) } output_sexp(get_stats(indexer().progress(), "complete"), Server::OutputFlags::Flush); - store().commit(); /* ensure on-disk database is updated, too */ }); } diff --git a/lib/mu-store.cc b/lib/mu-store.cc index c1afbfd2..1f2f43f7 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -80,42 +80,12 @@ struct Store::Private { ~Private() try { mu_debug("closing store @ {}", xapian_db_.path()); - if (!xapian_db_.read_only()) { - transaction_maybe_commit(true /*force*/); - } + if (!xapian_db_.read_only()) + contacts_cache_.serialize(); } catch (...) { mu_critical("caught exception in store dtor"); } - // If not started yet, start a transaction. Otherwise, just update the transaction size. - void transaction_inc() noexcept { - if (transaction_size_ == 0) { - mu_debug("starting transaction"); - xapian_db_.begin_transaction(); - } - ++transaction_size_; - } - - // Opportunistically commit a transaction if the transaction size - // filled up a batch, or with force. 
- void transaction_maybe_commit(bool force = false) noexcept { - static auto batch_size = config_.get(); - if (force || transaction_size_ >= batch_size) { - contacts_cache_.serialize(); - - if (indexer_) // save last index time. - if (auto&& t{indexer_->completed()}; t != 0) - config_.set(::time({})); - - if (transaction_size_ == 0) - return; // nothing more to do here. - - mu_debug("committing transaction (n={})", transaction_size_); - xapian_db_.commit_transaction(); - transaction_size_ = 0; - } - } - Config make_config(XapianDb& xapian_db, const std::string& root_maildir, Option conf) { @@ -159,7 +129,6 @@ struct Store::Private { const std::string root_maildir_; const Message::Options message_opts_; - size_t transaction_size_{}; std::mutex lock_; }; @@ -341,7 +310,7 @@ Store::indexer() } Result -Store::add_message(Message& msg, bool use_transaction, bool is_new) +Store::add_message(Message& msg, bool is_new) { const auto mdir{maildir_from_path(msg.path(), root_maildir())}; if (!mdir) @@ -367,31 +336,25 @@ Store::add_message(Message& msg, bool use_transaction, bool is_new) msg.set_flags(msg.flags() | Flags::Personal); std::lock_guard guard{priv_->lock_}; - if (use_transaction) - priv_->transaction_inc(); - auto&& res = is_new ? priv_->add_message_unlocked(msg) : priv_->update_message_unlocked(msg, msg.path()); if (!res) return Err(res.error()); - if (use_transaction) /* commit if batch is full */ - priv_->transaction_maybe_commit(); - - mu_debug("added {}message @ {}; docid = {}", - is_personal ? "personal " : "", msg.path(), *res); + mu_debug("added {}{}message @ {}; docid = {}", + is_new ? "new " : "", is_personal ? 
"personal " : "", msg.path(), *res); return res; } Result -Store::add_message(const std::string& path, bool use_transaction, bool is_new) +Store::add_message(const std::string& path, bool is_new) { if (auto msg{Message::make_from_path(path, priv_->message_opts_)}; !msg) return Err(msg.error()); else - return add_message(msg.value(), use_transaction, is_new); + return add_message(msg.value(), is_new); } @@ -412,12 +375,10 @@ Store::remove_messages(const std::vector& ids) { std::lock_guard guard{priv_->lock_}; - priv_->transaction_inc(); + XapianDb::Transaction tx (xapian_db()); // RAII for (auto&& id : ids) xapian_db().delete_document(id); - - priv_->transaction_maybe_commit(true /*force*/); } @@ -641,14 +602,6 @@ Store::for_each_message_path(Store::ForEachMessageFunc msg_func) const return n; } -void -Store::commit() -{ - std::lock_guard guard{priv_->lock_}; - priv_->transaction_maybe_commit(true /*force*/); -} - - std::size_t Store::for_each_term(Field::Id field_id, Store::ForEachTermFunc func) const { diff --git a/lib/mu-store.hh b/lib/mu-store.hh index a699a0ec..e143b228 100644 --- a/lib/mu-store.hh +++ b/lib/mu-store.hh @@ -193,7 +193,7 @@ public: /** * Add or update a message to the store. When planning to write many * messages, it's much faster to do so in a transaction. If so, set - * @in_transaction to true. When done with adding messages, call + * @param in_transaction to true. When done with adding messages, call * commit(). * * Optimization: If you are sure the message (i.e., a message with the @@ -202,16 +202,12 @@ public: * have to check for the existing message. * * @param msg a message - * @param use_transaction whether to bundle up to batch_size - * changes in a transaction * @param is_new whether this is a completely new message * * @return the doc id of the added message or an error. 
*/ - Result add_message(Message& msg, bool use_transaction = false, - bool is_new = false); - Result add_message(const std::string& path, bool use_transaction = false, - bool is_new = false); + Result add_message(Message& msg, bool is_new = false); + Result add_message(const std::string& path, bool is_new = false); /** * Remove a message from the store. It will _not_ remove the message @@ -393,12 +389,6 @@ public: */ void set_dirstamp(const std::string& path, time_t tstamp); - /** - * Commit the current batch of modifications to disk, opportunistically. - * If no transaction is underway, do nothing. - */ - void commit(); - /* * * Some convenience diff --git a/lib/mu-xapian-db.cc b/lib/mu-xapian-db.cc index 0b54150f..cfa1629f 100644 --- a/lib/mu-xapian-db.cc +++ b/lib/mu-xapian-db.cc @@ -21,6 +21,7 @@ #include "mu-xapian-db.hh" #include "utils/mu-utils.hh" #include +#include #include @@ -99,17 +100,16 @@ make_db(const std::string& db_path, Flavor flavor) XapianDb::XapianDb(const std::string& db_path, Flavor flavor) : path_(make_path(db_path, flavor)), - db_(make_db(path_,flavor)) { - + db_(make_db(path_, flavor)), + batch_size_{Config(*this).get()} +{ if (flavor == Flavor::CreateOverwrite) set_timestamp(MetadataIface::created_key); - mu_debug("created {} / {}", flavor, *this); + mu_debug("created {} / {} (batch-size: {})", flavor, *this, batch_size_); } - - #ifdef BUILD_TESTS /* * Tests. 
diff --git a/lib/mu-xapian-db.hh b/lib/mu-xapian-db.hh index 6fc682be..66b7bf8d 100644 --- a/lib/mu-xapian-db.hh +++ b/lib/mu-xapian-db.hh @@ -185,8 +185,27 @@ public: CreateOverwrite, /**< Create new or overwrite existing */ }; + /** + * XapianDb CTOR + * + * @param db_path path to the database + * @param flavor kind of database + */ XapianDb(const std::string& db_path, Flavor flavor); + /** + * DTOR + */ + ~XapianDb() { + if (tx_level_ > 0) + mu_warning("inconsistent transaction level ({})", tx_level_); + if (tx_level_ > 0) { + mu_debug("closing db after committing {} change(s)", changes_); + xapian_try([this]{ DB_LOCKED; wdb().commit_transaction(); }); + } else + mu_debug("closing db"); + } + /** * Is the database read-only? * @@ -195,7 +214,7 @@ public: bool read_only() const override; /** - * Path to the database; empty for in-memory database + * Path to the database; empty for in-memory databases * * @return path to database */ @@ -210,7 +229,6 @@ public: return db().get_description(); } - /** * Get the number of documents (messages) in the database * @@ -268,7 +286,8 @@ public: * @param val new value for key */ void set_metadata(const std::string& key, const std::string& val) override { - xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val); }); + xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val); + maybe_commit(); }); } /** @@ -287,7 +306,6 @@ public: }); } - /** * Does the given term exist in the database? 
* @@ -312,6 +330,7 @@ public: DB_LOCKED; auto&& id{wdb().add_document(doc)}; set_timestamp(MetadataIface::last_change_key); + maybe_commit(); return Ok(std::move(id)); }); } @@ -331,6 +350,7 @@ public: DB_LOCKED; auto&& id{wdb().replace_document(term, doc)}; set_timestamp(MetadataIface::last_change_key); + maybe_commit(); return Ok(std::move(id)); }); } @@ -340,6 +360,7 @@ public: DB_LOCKED; wdb().replace_document(id, doc); set_timestamp(MetadataIface::last_change_key); + maybe_commit(); return Ok(std::move(id)); }); } @@ -356,6 +377,7 @@ public: DB_LOCKED; wdb().delete_document(term); set_timestamp(MetadataIface::last_change_key); + maybe_commit(); return Ok(); }); } @@ -364,6 +386,7 @@ public: DB_LOCKED; wdb().delete_document(id); set_timestamp(MetadataIface::last_change_key); + maybe_commit(); return Ok(); }); } @@ -381,38 +404,106 @@ public: } /* - * transactions + * If the "transaction level" > 0 (see inc_transaction_level()), we run + * in "transaction mode". That means that subsequent Xapian mutations + * are part of a transaction, which is committed when the number of + * changes reaches the batch size, _or_ the transaction level is + * decreased to 0 (see dec_transaction_level()). * */ /** - * Start a transaction - * - * @param flushed - * - * @return Ok or Error + * Increase the transaction level; needs to be balanced by dec_transaction_level() */ - Result begin_transaction(bool flushed=true) { - return xapian_try_result([&]{ + void inc_transaction_level() { + xapian_try([this]{ DB_LOCKED; - wdb().begin_transaction(flushed); - return Ok(); + if (tx_level_ == 0) {// need to start the Xapian transaction? + mu_debug("begin transaction"); + wdb().begin_transaction(); + } + ++tx_level_; }); } + /** - * Commit a transaction + * Decrease the transaction level (to balance inc_transaction_level()) * - * @return Ok or Error*/ - Result commit_transaction() { - return xapian_try_result([&]{ + * If the level reaches 0, commit the Xapian transaction. 
+ */ + void dec_transaction_level() { + xapian_try([this]{ DB_LOCKED; - wdb().commit_transaction(); - return Ok(); + if (tx_level_ == 0) { + mu_critical("cannot dec transaction-level)"); + throw std::runtime_error("cannot dec transactions"); + } + if (tx_level_ == 1) {// need to commit the Xapian transaction? + mu_debug("committing {} changes", changes_); + wdb().commit_transaction(); + changes_ = 0; + } + --tx_level_; + }); + } + + /** + * Are we inside a transaction? + * + * @return true or false + */ + bool in_transaction() const { DB_LOCKED; return tx_level_ > 0; } + + + /** + * RAII Transaction object + * + */ + struct Transaction { + Transaction(XapianDb& db): db_{db} { + db_.inc_transaction_level(); + } + ~Transaction() { + db_.dec_transaction_level(); + } + private: + XapianDb& db_; + }; + + + /** + * Manually request the Xapian DB to be committed to disk. This won't + * do anything while in a transaction. + */ + void commit() { + xapian_try([this]{ + DB_LOCKED; + if (tx_level_ == 0) { + mu_info("committing xapian-db @ {}", path_); + wdb().commit(); + } else + mu_debug("not committing while in transaction"); }); } using DbType = std::variant; - private: + + /** + * To be called after all changes, with DB_LOCKED held. 
+ */ + void maybe_commit() { + // in transaction-mode with a full batch: commit, then re-open the transaction + if (tx_level_ > 0 && ++changes_ >= batch_size_) { + mu_debug("batch full ({}/{}); committing change", changes_, batch_size_); + wdb().commit_transaction(); + wdb().commit(); + --tx_level_; + changes_ = 0; + wdb().begin_transaction(); + ++tx_level_; // re-opened: restore the level so dec_transaction_level() stays balanced + } + } + void set_timestamp(const std::string_view key); /** @@ -432,7 +523,12 @@ private: mutable std::mutex lock_; std::string path_; - DbType db_; + DbType db_; + bool in_tx_; + + size_t tx_level_{}; + const size_t batch_size_; + size_t changes_{}; }; constexpr std::string_view diff --git a/lib/tests/test-mu-store-query.cc b/lib/tests/test-mu-store-query.cc index 098579ca..57a6a716 100644 --- a/lib/tests/test-mu-store-query.cc +++ b/lib/tests/test-mu-store-query.cc @@ -125,7 +125,6 @@ I said: "Aujourd'hui!" }}; TempDir tdir; auto store{make_test_store(tdir.path(), test_msgs, {})}; - store.commit(); // matches for (auto&& expr: { @@ -846,7 +845,6 @@ https://trac.xapian.org/ticket/719 TempDir tdir; auto store{make_test_store(tdir.path(), test_msgs, conf)}; - store.commit(); /* true: match; false: no match */ const auto cases = std::vector>{{ diff --git a/lib/tests/test-mu-store.cc b/lib/tests/test-mu-store.cc index 949f37a7..da7f1202 100644 --- a/lib/tests/test-mu-store.cc +++ b/lib/tests/test-mu-store.cc @@ -128,14 +128,12 @@ test_store_add_count_remove() const auto msgpath{MuTestMaildir + "/cur/1283599333.1840_11.cthulhu!2,"}; const auto id1 = store->add_message(msgpath); assert_valid_result(id1); - store->commit(); g_assert_cmpuint(store->size(), ==, 1); g_assert_true(store->contains_message(msgpath)); const auto id2 = store->add_message(MuTestMaildir2 + "/bar/cur/mail3"); g_assert_false(!!id2); // wrong maildir. 
- store->commit(); const auto msg3path{MuTestMaildir + "/cur/1252168370_3.14675.cthulhu!2,S"}; const auto id3 = store->add_message(msg3path); @@ -202,7 +200,7 @@ goto * instructions[pOp->opcode]; g_assert_cmpuint(store->size(),==, 1); /* ensure 'update' dtrt, i.e., nothing. */ - const auto docid2 = store->add_message(*message, *docid); + const auto docid2 = store->add_message(*message); assert_valid_result(docid2); g_assert_cmpuint(store->size(),==, 1); g_assert_cmpuint(*docid,==,*docid2); @@ -285,7 +283,6 @@ World! const auto docid = store->add_message(*message); assert_valid_result(docid); - store->commit(); auto msg2{store->find_message(*docid)}; g_assert_true(!!msg2);