diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index 72756d1b..283a11b8 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -324,8 +324,6 @@ Indexer::Private::cleanup() void Indexer::Private::scan_worker() { - XapianDb::Transaction tx{store_.xapian_db()}; // RAII - progress_.reset(); if (conf_.scan) { mu_debug("starting scanner"); diff --git a/lib/mu-store.cc b/lib/mu-store.cc index 927b38cb..0db198ac 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -382,10 +382,12 @@ Store::remove_messages(const std::vector& ids) { std::lock_guard guard{priv_->lock_}; - XapianDb::Transaction tx (xapian_db()); // RAII + xapian_db().request_transaction(); for (auto&& id : ids) xapian_db().delete_document(id); + + xapian_db().request_commit(true/*force*/); } diff --git a/lib/mu-xapian-db.cc b/lib/mu-xapian-db.cc index bce9bf79..ddc2c115 100644 --- a/lib/mu-xapian-db.cc +++ b/lib/mu-xapian-db.cc @@ -41,6 +41,7 @@ XapianDb::wdb() { if (read_only()) throw std::runtime_error("database is read-only"); + return std::get(db_); } @@ -106,7 +107,7 @@ XapianDb::XapianDb(const std::string& db_path, Flavor flavor): if (flavor == Flavor::CreateOverwrite) set_timestamp(MetadataIface::created_key); - mu_debug("created {} / {} (batch-size: {})", flavor, *this, batch_size_); + mu_debug("created {}", *this); } void diff --git a/lib/mu-xapian-db.hh b/lib/mu-xapian-db.hh index f0a977ff..66739e58 100644 --- a/lib/mu-xapian-db.hh +++ b/lib/mu-xapian-db.hh @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -186,10 +187,8 @@ private: /** * Fairly thin wrapper around Xapian::Database and Xapian::WritableDatabase - * with just the things we need + locking + exception handling */ class XapianDb: public MetadataIface { -#define DB_LOCKED std::unique_lock lock__{lock_}; public: /** * Type of database to create. @@ -202,7 +201,7 @@ public: }; /** - * XapianDb CTOR. This may throw some Xapian exception. + * XapianDb CTOR. This may throw. * * @param db_path path to the database * @param flavor kind of database @@ -213,13 +212,9 @@ public: * DTOR */ ~XapianDb() { - if (tx_level_ > 0) - mu_warning("inconsistent transaction level ({})", tx_level_); - if (tx_level_ > 0) { - mu_debug("closing db after committing {} change(s)", changes_); - xapian_try([this]{ DB_LOCKED; wdb().commit_transaction(); }); - } else - mu_debug("closing db"); + if (!read_only()) + request_commit(true/*force*/); + mu_debug("closing db"); } /** @@ -260,7 +255,7 @@ public: */ size_t size() const noexcept { return xapian_try([this]{ - DB_LOCKED; return db().get_doccount(); }, 0); + return db().get_doccount(); }, 0); } /** @@ -276,7 +271,7 @@ public: * @return an enquire object */ Xapian::Enquire enquire() const { - DB_LOCKED; return Xapian::Enquire(db()); + return Xapian::Enquire(db()); } /** @@ -288,7 +283,7 @@ public: */ Result document(Xapian::docid id) const { return xapian_try_result([&]{ - DB_LOCKED; return Ok(db().get_document(id)); }); + return Ok(db().get_document(id)); }); } /** @@ -300,7 +295,7 @@ public: */ std::string metadata(const std::string& key) const override { return xapian_try([&]{ - DB_LOCKED; return db().get_metadata(key);}, ""); + return db().get_metadata(key);}, ""); } /** @@ -310,8 +305,8 @@ public: * @param val new value for key */ void set_metadata(const std::string& key, const std::string& val) override { - xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val); - maybe_commit(); }); + xapian_try([&] { wdb().set_metadata(key, val); + maybe_commit();}); } /** @@ -323,7 +318,6 @@ public: //using each_func = MetadataIface::each_func; void for_each(MetadataIface::each_func&& func) const override { xapian_try([&]{ - DB_LOCKED; for (auto&& it = db().metadata_keys_begin(); it != db().metadata_keys_end(); ++it) func(*it, db().get_metadata(*it)); @@ -339,7 +333,7 @@ public: */ bool term_exists(const std::string& term) const { return xapian_try([&]{ - DB_LOCKED; return db().term_exists(term);}, false); + return db().term_exists(term);}, false); } /** @@ -351,7 +345,6 @@ public: */ Result add_document(const Xapian::Document& doc) { return xapian_try_result([&]{ - DB_LOCKED; auto&& id{wdb().add_document(doc)}; set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -369,9 +362,9 @@ public: * @return new docid or an error */ Result - replace_document(const std::string& term, const Xapian::Document& doc) { + replace_document(const std::string& term, + const Xapian::Document& doc) { return xapian_try_result([&]{ - DB_LOCKED; auto&& id{wdb().replace_document(term, doc)}; set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -379,9 +372,9 @@ public: }); } Result - replace_document(Xapian::docid id, const Xapian::Document& doc) { + replace_document(Xapian::docid id, + const Xapian::Document& doc) { return xapian_try_result([&]{ - DB_LOCKED; wdb().replace_document(id, doc); set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -398,7 +391,6 @@ public: */ Result delete_document(const std::string& term) { return xapian_try_result([&]{ - DB_LOCKED; wdb().delete_document(term); set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -407,7 +399,6 @@ public: } Result delete_document(Xapian::docid id) { return xapian_try_result([&]{ - DB_LOCKED; wdb().delete_document(id); set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -417,7 +408,6 @@ public: template size_t all_terms(const std::string& prefix, Func&& func) const { - DB_LOCKED; size_t n{}; for (auto it = db().allterms_begin(prefix); it != db().allterms_end(prefix); ++it) { if (!func(*it)) @@ -427,110 +417,69 @@ public: return n; } - /* - * If the "transaction ref count" > 0 (with inc_transactions());, we run - * in "transaction mode". That means that the subsequent Xapian mutation - * are part of a transactions, which is flushed when the number of - * changes reaches the batch size, _or_ the transaction ref count is - * decreased to 0 (dec_transactions()). * - */ - /** - * Increase the transaction level; needs to be balance by dec_transactions() - */ - void inc_transaction_level() { - xapian_try([this]{ - DB_LOCKED; - if (tx_level_ == 0) {// need to start the Xapian transaction? - mu_debug("begin transaction"); - wdb().begin_transaction(); - } - ++tx_level_; - mu_debug("ind'd tx level to {}", tx_level_); - }); - } - - /** - * Decrease the transaction level (to balance inc_transactions()) + * Requests a transaction to be started; this is only + * a request, which may not be granted. * - * If the level reach 0, perform a Xapian commit. + * If you're already in a transaction but that transaction + * was started in another thread, that transaction will be + * committed before starting a new one. + * + * Otherwise, start a transaction if you're not already in one. + * + * @return A result; either true if a transaction was started; false + * otherwise, or an error. */ - void dec_transaction_level() { - xapian_try([this]{ - DB_LOCKED; - if (tx_level_ == 0) { - mu_critical("cannot dec transaction-level)"); - throw std::runtime_error("cannot dec transactions"); - } + Result request_transaction() { + return xapian_try_result([this]() { + auto& db = wdb(); + if (in_transaction()) + return Ok(false); // nothing to - --tx_level_; - if (tx_level_ == 0) {// need to commit the Xapian transaction? - mu_debug("committing {} changes", changes_); - changes_ = 0; - wdb().commit_transaction(); - } - - mu_debug("dec'd tx level to {}", tx_level_); + db.begin_transaction(); + mu_debug("begin transaction"); + in_transaction_ = true; + return Ok(true); }); } + + /** + * Explicitly request the Xapian DB to be committed to disk. This won't + * do anything when not in a transaction. + * + * @param force whether to force-commit + */ + void request_commit(bool force = false) { request_commit(wdb(), force); } + void maybe_commit() { request_commit(false); } + /** * Are we inside a transaction? * * @return true or false */ - bool in_transaction() const { DB_LOCKED; return tx_level_ > 0; } - - - /** - * RAII Transaction object - * - */ - struct Transaction { - Transaction(XapianDb& db): db_{db} { - db_.inc_transaction_level(); - } - ~Transaction() { - db_.dec_transaction_level(); - } - private: - XapianDb& db_; - }; - - - /** - * Manually request the Xapian DB to be committed to disk. This won't - * do anything while in a transaction. - */ - void commit() { - xapian_try([this]{ - DB_LOCKED; - if (tx_level_ == 0) { - mu_info("committing xapian-db @ {}", path_); - wdb().commit(); - } else - mu_debug("not committing while in transaction"); - }); - } + bool in_transaction() const { return in_transaction_; } using DbType = std::variant; -private: +private: /** * To be called after all changes, with DB_LOCKED held. */ - void maybe_commit() { + void request_commit(Xapian::WritableDatabase& db, bool force) { // in transaction-mode and enough changes, commit them - // and start a new transaction - if (tx_level_ > 0 && ++changes_ >= batch_size_) { - mu_debug("batch full ({}/{}); committing change", changes_, batch_size_); - wdb().commit_transaction(); - wdb().commit(); - --tx_level_; + if (!in_transaction()) + return; + if ((++changes_ < batch_size_) && !force) + return; + xapian_try([&]{ + mu_debug("committing transaction with {} changes; " + "forced={}", changes_, force ? "yes" : "no"); + db.commit_transaction(); + db.commit(); changes_ = 0; - wdb().begin_transaction(); - ++tx_level_; - } + in_transaction_ = {}; + }); } void set_timestamp(const std::string_view key); @@ -549,12 +498,11 @@ private: */ Xapian::WritableDatabase& wdb(); - mutable std::mutex lock_; std::string path_; DbType db_; - size_t tx_level_{}; - size_t batch_size_; size_t changes_{}; + bool in_transaction_{}; + size_t batch_size_; }; constexpr std::string_view