From c05b28e761ad0e0f3dc9cc14d84ac0543fd7ff53 Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Sun, 26 May 2024 11:42:50 +0300 Subject: [PATCH 1/6] xapian-db: remove locks, transaction levels Simplify xapian-db: locks should go elsewhere; transaction level add too much complication. --- lib/mu-indexer.cc | 2 - lib/mu-store.cc | 4 +- lib/mu-xapian-db.cc | 3 +- lib/mu-xapian-db.hh | 174 ++++++++++++++++---------------------------- 4 files changed, 66 insertions(+), 117 deletions(-) diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index 72756d1b..283a11b8 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -324,8 +324,6 @@ Indexer::Private::cleanup() void Indexer::Private::scan_worker() { - XapianDb::Transaction tx{store_.xapian_db()}; // RAII - progress_.reset(); if (conf_.scan) { mu_debug("starting scanner"); diff --git a/lib/mu-store.cc b/lib/mu-store.cc index 927b38cb..0db198ac 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -382,10 +382,12 @@ Store::remove_messages(const std::vector& ids) { std::lock_guard guard{priv_->lock_}; - XapianDb::Transaction tx (xapian_db()); // RAII + xapian_db().request_transaction(); for (auto&& id : ids) xapian_db().delete_document(id); + + xapian_db().request_commit(true/*force*/); } diff --git a/lib/mu-xapian-db.cc b/lib/mu-xapian-db.cc index bce9bf79..ddc2c115 100644 --- a/lib/mu-xapian-db.cc +++ b/lib/mu-xapian-db.cc @@ -41,6 +41,7 @@ XapianDb::wdb() { if (read_only()) throw std::runtime_error("database is read-only"); + return std::get(db_); } @@ -106,7 +107,7 @@ XapianDb::XapianDb(const std::string& db_path, Flavor flavor): if (flavor == Flavor::CreateOverwrite) set_timestamp(MetadataIface::created_key); - mu_debug("created {} / {} (batch-size: {})", flavor, *this, batch_size_); + mu_debug("created {}", *this); } void diff --git a/lib/mu-xapian-db.hh b/lib/mu-xapian-db.hh index f0a977ff..66739e58 100644 --- a/lib/mu-xapian-db.hh +++ b/lib/mu-xapian-db.hh @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -186,10 +187,8 @@ private: /** * Fairly thin wrapper around Xapian::Database and Xapian::WritableDatabase - * with just the things we need + locking + exception handling */ class XapianDb: public MetadataIface { -#define DB_LOCKED std::unique_lock lock__{lock_}; public: /** * Type of database to create. @@ -202,7 +201,7 @@ public: }; /** - * XapianDb CTOR. This may throw some Xapian exception. + * XapianDb CTOR. This may throw. * * @param db_path path to the database * @param flavor kind of database @@ -213,13 +212,9 @@ public: * DTOR */ ~XapianDb() { - if (tx_level_ > 0) - mu_warning("inconsistent transaction level ({})", tx_level_); - if (tx_level_ > 0) { - mu_debug("closing db after committing {} change(s)", changes_); - xapian_try([this]{ DB_LOCKED; wdb().commit_transaction(); }); - } else - mu_debug("closing db"); + if (!read_only()) + request_commit(true/*force*/); + mu_debug("closing db"); } /** @@ -260,7 +255,7 @@ public: */ size_t size() const noexcept { return xapian_try([this]{ - DB_LOCKED; return db().get_doccount(); }, 0); + return db().get_doccount(); }, 0); } /** @@ -276,7 +271,7 @@ public: * @return an enquire object */ Xapian::Enquire enquire() const { - DB_LOCKED; return Xapian::Enquire(db()); + return Xapian::Enquire(db()); } /** @@ -288,7 +283,7 @@ public: */ Result document(Xapian::docid id) const { return xapian_try_result([&]{ - DB_LOCKED; return Ok(db().get_document(id)); }); + return Ok(db().get_document(id)); }); } /** @@ -300,7 +295,7 @@ public: */ std::string metadata(const std::string& key) const override { return xapian_try([&]{ - DB_LOCKED; return db().get_metadata(key);}, ""); + return db().get_metadata(key);}, ""); } /** @@ -310,8 +305,8 @@ public: * @param val new value for key */ void set_metadata(const std::string& key, const std::string& val) override { - xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val); - maybe_commit(); }); + xapian_try([&] { wdb().set_metadata(key, val); + maybe_commit();}); } /** @@ -323,7 +318,6 @@ public: //using each_func = MetadataIface::each_func; void for_each(MetadataIface::each_func&& func) const override { xapian_try([&]{ - DB_LOCKED; for (auto&& it = db().metadata_keys_begin(); it != db().metadata_keys_end(); ++it) func(*it, db().get_metadata(*it)); @@ -339,7 +333,7 @@ public: */ bool term_exists(const std::string& term) const { return xapian_try([&]{ - DB_LOCKED; return db().term_exists(term);}, false); + return db().term_exists(term);}, false); } /** @@ -351,7 +345,6 @@ public: */ Result add_document(const Xapian::Document& doc) { return xapian_try_result([&]{ - DB_LOCKED; auto&& id{wdb().add_document(doc)}; set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -369,9 +362,9 @@ public: * @return new docid or an error */ Result - replace_document(const std::string& term, const Xapian::Document& doc) { + replace_document(const std::string& term, + const Xapian::Document& doc) { return xapian_try_result([&]{ - DB_LOCKED; auto&& id{wdb().replace_document(term, doc)}; set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -379,9 +372,9 @@ public: }); } Result - replace_document(Xapian::docid id, const Xapian::Document& doc) { + replace_document(Xapian::docid id, + const Xapian::Document& doc) { return xapian_try_result([&]{ - DB_LOCKED; wdb().replace_document(id, doc); set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -398,7 +391,6 @@ public: */ Result delete_document(const std::string& term) { return xapian_try_result([&]{ - DB_LOCKED; wdb().delete_document(term); set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -407,7 +399,6 @@ public: } Result delete_document(Xapian::docid id) { return xapian_try_result([&]{ - DB_LOCKED; wdb().delete_document(id); set_timestamp(MetadataIface::last_change_key); maybe_commit(); @@ -417,7 +408,6 @@ public: template size_t all_terms(const std::string& prefix, Func&& func) const { - DB_LOCKED; size_t n{}; for (auto it = db().allterms_begin(prefix); it != db().allterms_end(prefix); ++it) { if (!func(*it)) @@ -427,110 +417,69 @@ public: return n; } - /* - * If the "transaction ref count" > 0 (with inc_transactions());, we run - * in "transaction mode". That means that the subsequent Xapian mutation - * are part of a transactions, which is flushed when the number of - * changes reaches the batch size, _or_ the transaction ref count is - * decreased to 0 (dec_transactions()). * - */ - /** - * Increase the transaction level; needs to be balance by dec_transactions() - */ - void inc_transaction_level() { - xapian_try([this]{ - DB_LOCKED; - if (tx_level_ == 0) {// need to start the Xapian transaction? - mu_debug("begin transaction"); - wdb().begin_transaction(); - } - ++tx_level_; - mu_debug("ind'd tx level to {}", tx_level_); - }); - } - - /** - * Decrease the transaction level (to balance inc_transactions()) + * Requests a transaction to be started; this is only + * a request, which may not be granted. * - * If the level reach 0, perform a Xapian commit. + * If you're already in a transaction but that transaction + * was started in another thread, that transaction will be + * committed before starting a new one. + * + * Otherwise, start a transaction if you're not already in one. + * + * @return A result; either true if a transaction was started; false + * otherwise, or an error. */ - void dec_transaction_level() { - xapian_try([this]{ - DB_LOCKED; - if (tx_level_ == 0) { - mu_critical("cannot dec transaction-level)"); - throw std::runtime_error("cannot dec transactions"); - } + Result request_transaction() { + return xapian_try_result([this]() { + auto& db = wdb(); + if (in_transaction()) + return Ok(false); // nothing to - --tx_level_; - if (tx_level_ == 0) {// need to commit the Xapian transaction? - mu_debug("committing {} changes", changes_); - changes_ = 0; - wdb().commit_transaction(); - } - - mu_debug("dec'd tx level to {}", tx_level_); + db.begin_transaction(); + mu_debug("begin transaction"); + in_transaction_ = true; + return Ok(true); }); } + + /** + * Explicitly request the Xapian DB to be committed to disk. This won't + * do anything when not in a transaction. + * + * @param force whether to force-commit + */ + void request_commit(bool force = false) { request_commit(wdb(), force); } + void maybe_commit() { request_commit(false); } + /** * Are we inside a transaction? * * @return true or false */ - bool in_transaction() const { DB_LOCKED; return tx_level_ > 0; } - - - /** - * RAII Transaction object - * - */ - struct Transaction { - Transaction(XapianDb& db): db_{db} { - db_.inc_transaction_level(); - } - ~Transaction() { - db_.dec_transaction_level(); - } - private: - XapianDb& db_; - }; - - - /** - * Manually request the Xapian DB to be committed to disk. This won't - * do anything while in a transaction. - */ - void commit() { - xapian_try([this]{ - DB_LOCKED; - if (tx_level_ == 0) { - mu_info("committing xapian-db @ {}", path_); - wdb().commit(); - } else - mu_debug("not committing while in transaction"); - }); - } + bool in_transaction() const { return in_transaction_; } using DbType = std::variant; -private: +private: /** * To be called after all changes, with DB_LOCKED held. */ - void maybe_commit() { + void request_commit(Xapian::WritableDatabase& db, bool force) { // in transaction-mode and enough changes, commit them - // and start a new transaction - if (tx_level_ > 0 && ++changes_ >= batch_size_) { - mu_debug("batch full ({}/{}); committing change", changes_, batch_size_); - wdb().commit_transaction(); - wdb().commit(); - --tx_level_; + if (!in_transaction()) + return; + if ((++changes_ < batch_size_) && !force) + return; + xapian_try([&]{ + mu_debug("committing transaction with {} changes; " + "forced={}", changes_, force ? "yes" : "no"); + db.commit_transaction(); + db.commit(); changes_ = 0; - wdb().begin_transaction(); - ++tx_level_; - } + in_transaction_ = {}; + }); } void set_timestamp(const std::string_view key); @@ -549,12 +498,11 @@ private: */ Xapian::WritableDatabase& wdb(); - mutable std::mutex lock_; std::string path_; DbType db_; - size_t tx_level_{}; - size_t batch_size_; size_t changes_{}; + bool in_transaction_{}; + size_t batch_size_; }; constexpr std::string_view From cf9c09867fa553c7edb60651553cb9cf8b7e15ae Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Sun, 26 May 2024 14:20:47 +0300 Subject: [PATCH 2/6] store-worker: implement store-worker is a thread + async queue to throttle requests to a single thread. --- lib/meson.build | 1 + lib/mu-store-worker.cc | 68 +++++++++++++++++ lib/mu-store-worker.hh | 169 +++++++++++++++++++++++++++++++++++++++++ lib/mu-store.cc | 11 +++ lib/mu-store.hh | 10 ++- 5 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 lib/mu-store-worker.cc create mode 100644 lib/mu-store-worker.hh diff --git a/lib/meson.build b/lib/meson.build index b3b519d5..91acaf28 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -42,6 +42,7 @@ lib_mu=static_library( # misc 'mu-maildir.cc', 'mu-script.cc', + 'mu-store-worker.cc' ], dependencies: [ glib_dep, diff --git a/lib/mu-store-worker.cc b/lib/mu-store-worker.cc new file mode 100644 index 00000000..d4bcbbfc --- /dev/null +++ b/lib/mu-store-worker.cc @@ -0,0 +1,68 @@ +/* +** Copyright (C) 2024 Dirk-Jan C. Binnema +** +** This program is free software; you can redistribute it and/or modify it +** under the terms of the GNU General Public License as published by the +** Free Software Foundation; either version 3, or (at your option) any +** later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software Foundation, +** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +** +*/ + +#include "mu-store-worker.hh" +#include "mu-store.hh" +#include "utils/mu-utils.hh" + +#include + +using namespace Mu; + +// helper constant for the visitor +template inline constexpr bool always_false_v = false; + +void +StoreWorker::run() { + + running_ = true; + + while (running_) { + WorkItem workitem; + + if (!q_.pop(workitem)) + continue; + + std::visit([&](auto&& item) { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + if (!sexp_handler_) + mu_critical("no handler for sexp '{}'", item); + else + sexp_handler_(item); + } else if constexpr (std::is_same_v) { + store_.set_dirstamp(item.path, item.tstamp); + } else if constexpr (std::is_same_v) { + store_.config().set(item.tstamp); + } else if constexpr (std::is_same_v) { + store_.xapian_db().request_transaction(); + } else if constexpr (std::is_same_v) { + store_.xapian_db().request_commit(true); + } else if constexpr (std::is_same_v) { + store_.remove_messages(item); + } else if constexpr (std::is_same_v) { + store_.consume_message(std::move(item.msg), true/*new*/); + } else if constexpr (std::is_same_v) { + store_.consume_message(std::move(item.msg), false/*maybe not new*/); + } else + static_assert(always_false_v, "non-exhaustive visitor"); + }, workitem); + } +} diff --git a/lib/mu-store-worker.hh b/lib/mu-store-worker.hh new file mode 100644 index 00000000..6c6ce83b --- /dev/null +++ b/lib/mu-store-worker.hh @@ -0,0 +1,169 @@ +/* +** Copyright (C) 2024 Dirk-Jan C. Binnema +** +** This program is free software; you can redistribute it and/or modify it +** under the terms of the GNU General Public License as published by the +** Free Software Foundation; either version 3, or (at your option) any +** later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software Foundation, +** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +** +*/ + + +/** + * The store worker maintains a worker thread and an async queue to which + * commands can be added from any thread; the worker thread that is the sole + * thread to talk to the store / Xapian (at least for writing). + */ + +#ifndef MU_STORE_WORKER_HH +#define MU_STORE_WORKER_HH + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace Mu { + +/**< Sum type for all commands */ + +class Store; /// fwd declaration + +/** + * Worker for sending requests to the Store + * + * I.e. to execute database commands in a single thread. + */ +class StoreWorker { +public: + /** + * CTOR. This will create the store worker and start the worker thread. + * + * @param store a store + */ + StoreWorker(Store& store): + store_{store}, + runner_ {std::thread([this]{run();})} + {} + + /** + * DTOR. Destroy the store worker after joining the worker thread + */ + ~StoreWorker() { + running_ = false; + if (runner_.joinable()) + runner_.join(); + } + + /* + * The following types of work-item can be added to the queue: + */ + struct SetDirStamp { + std::string path; /**< full path to directory */ + ::time_t tstamp; /**< Timestamp for directory */ + }; /**< Write a directory timestamp to the store */ + + struct SetLastIndex { + ::time_t tstamp; /**< Timestamp */ + }; /**< Write last indexing timestamp to the store */ + + struct StartTransaction{}; /**< Request transaction start + * (opportunistically) */ + struct EndTransaction{}; /**< Request transaction end/commit + * (opportunistically) */ + struct AddMessage { + Message msg; /**< Add a new message */ + }; /**< Add a new message; this is faster version of UpdateMessage + * if we know the message does not exist yet. */ + struct UpdateMessage { + Message msg; /**< Add or update a message */ + }; /**< Add message or update if it already exists */ + + using RemoveMessages = std::vector; + /**< Remove all message with the given ids */ + using SexpCommand = std::string; /**< A sexp-command (i.e., from mu4e); + * requires install_sexp_handler() */ + + using WorkItem = std::variant; + /// Sumtype with all types of work-item + + using QueueType = AsyncQueue; + const QueueType& queue() const { return q_; } + QueueType& queue() { return q_; } + + /** + * Push a work item to the que + * + * @param item + */ + void push(WorkItem&& item) { + q_.push(std::move(item)); + } + + /** + * Get the current size of the work item queue + * + * @return the size + */ + size_t size() const { + return q_.size(); + } + + /** + * Is the work item queue empty? + * + * @return true or false + */ + bool empty() const { + return q_.empty(); + } + + /** + * Clear the queue of any items + */ + void clear() { + q_.clear(); + } + + using SexpCommandHandler = std::function; + /**< Prototype for a SexpCommand handler function */ + + /** + * Install a handler for Sexp commands + * + * @param handler + */ + void install_sexp_handler(SexpCommandHandler&& handler) { + sexp_handler_ = handler; + } + +private: + void run(); + size_t cleanup_orphans(); + + QueueType q_; + Store& store_;; + std::thread runner_; + std::atomic running_{}; + SexpCommandHandler sexp_handler_{}; +}; + +} // namespace Mu +#endif /*MU_STORE_WORKER_HH*/ diff --git a/lib/mu-store.cc b/lib/mu-store.cc index 0db198ac..5aa36709 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -131,6 +131,7 @@ struct Store::Private { XapianDb xapian_db_; Config config_; ContactsCache contacts_cache_; + std::unique_ptr store_worker_; std::unique_ptr indexer_; const std::string root_maildir_; @@ -252,6 +253,7 @@ Store::Store(Store&& other) { priv_ = std::move(other.priv_); priv_->indexer_.reset(); + priv_->store_worker_.reset(); } Store::~Store() = default; @@ -316,6 +318,15 @@ Store::indexer() return *priv_->indexer_.get(); } +StoreWorker& +Store::store_worker() +{ + if (!priv_->store_worker_) + priv_->store_worker_ = std::make_unique(*this); + + return *priv_->store_worker_; +} + Result Store::add_message(Message& msg, bool is_new) { diff --git a/lib/mu-store.hh b/lib/mu-store.hh index 07d8a03c..643bb31c 100644 --- a/lib/mu-store.hh +++ b/lib/mu-store.hh @@ -1,5 +1,5 @@ /* -** Copyright (C) 2023 Dirk-Jan C. Binnema +** Copyright (C) 2024 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -31,6 +31,7 @@ #include "mu-config.hh" #include "mu-indexer.hh" #include "mu-query-results.hh" +#include "mu-store-worker.hh" #include #include @@ -147,6 +148,13 @@ public: */ Indexer& indexer(); + /** + * Get the store-worker instance + * + * @return the store-worker + */ + StoreWorker& store_worker(); + /** * Run a query; see the `mu-query` man page for the syntax. * From f2f01595a51380ae38aafb4cd11a0d3c17a33a10 Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Sun, 26 May 2024 15:59:58 +0300 Subject: [PATCH 3/6] indexer: use store-worker Use the store worker (-thread) to do all database modification. Currently, the "removed" field of Progress is always 0. --- lib/mu-indexer.cc | 320 ++++++++++++++++++++-------------------------- 1 file changed, 138 insertions(+), 182 deletions(-) diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index 283a11b8..bfeac487 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -1,5 +1,5 @@ /* -** Copyright (C) 2020-2023 Dirk-Jan C. Binnema +** Copyright (C) 2020-2024 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -29,6 +29,7 @@ #include #include #include +#include #include using namespace std::chrono_literals; @@ -42,49 +43,28 @@ using namespace std::chrono_literals; using namespace Mu; -struct IndexState { - enum State { Idle, - Scanning, - Finishing, - Cleaning - }; - static const char* name(State s) { - switch (s) { - case Idle: - return "idle"; - case Scanning: - return "scanning"; - case Finishing: - return "finishing"; - case Cleaning: - return "cleaning"; - default: - return ""; - } - } +// states - bool operator==(State rhs) const { - return state_.load() == rhs; +enum struct State { Idle, Scanning, Draining }; +constexpr std::string_view +format_as(State s) +{ + switch (s) { + case State::Idle: return "idle"; + case State::Scanning: return "scanning"; + case State::Draining: return "draining"; + default: return ""; } - bool operator!=(State rhs) const { - return state_.load() != rhs; - } - void change_to(State new_state) { - mu_debug("changing indexer state {}->{}", name((State)state_), - name((State)new_state)); - state_.store(new_state); - } - -private: - std::atomic state_{Idle}; -}; +} struct Indexer::Private { Private(Mu::Store& store) - : store_{store}, scanner_{store_.root_maildir(), + : store_{store}, + store_worker_{store.store_worker()}, + scanner_{store_.root_maildir(), [this](auto&& path, auto&& statbuf, auto&& info) { - return handler(path, statbuf, info); + return scan_handler(path, statbuf, info); }}, max_message_size_{store_.config().get()}, was_empty_{store.empty()} { @@ -97,27 +77,55 @@ struct Indexer::Private { store.config().get()); } - ~Private() { - stop(); + ~Private() { force_cleanup(); } + + void force_cleanup() { + switch_state(State::Idle); + scanner_.stop(); + if (scanner_worker_.joinable()) + scanner_worker_.join(); + msg_paths_.clear(); + for (auto&& w : workers_) + if (w.joinable()) + w.join(); + store_worker_.clear(); } bool dir_predicate(const std::string& path, const struct dirent* dirent) const; - bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype); + bool scan_handler(const std::string& fullpath, struct stat* statbuf, + Scanner::HandleType htype); void maybe_start_worker(); void item_worker(); void scan_worker(); - bool add_message(const std::string& path); - - bool cleanup(); bool start(const Indexer::Config& conf, bool block); bool stop(); - bool is_running() const { return state_ != IndexState::Idle; } + bool is_running() const { return state_ != State::Idle; } + void switch_state(State new_state) { + mu_debug("changing indexer state {}->{}", state_, new_state); + state_ = new_state; + } + + // pace a bit so scan_items queue doesn't get too big. + void pace_scan_worker() { + while (msg_paths_.size() > 8 * max_workers_) { + std::this_thread::sleep_for(25ms); + continue; + } + } + // pace a bit so store-worker queue doesn't get too big. + void pace_store_worker() { + while (store_worker_.size() > 8) { + std::this_thread::sleep_for(25ms); + continue; + } + } Indexer::Config conf_; - Store& store_; + const Store& store_; + StoreWorker& store_worker_; Scanner scanner_; const size_t max_message_size_; @@ -126,27 +134,18 @@ struct Indexer::Private { std::vector workers_; std::thread scanner_worker_; - struct WorkItem { - std::string full_path; - enum Type { - Dir, - File - }; - Type type; - }; - - AsyncQueue todos_; + AsyncQueue msg_paths_; Progress progress_{}; - IndexState state_{}; + std::atomic state_{State::Idle}; std::mutex lock_, w_lock_; std::atomic completed_{}; bool was_empty_{}; }; bool -Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, - Scanner::HandleType htype) +Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf, + Scanner::HandleType htype) { switch (htype) { case Scanner::HandleType::EnterDir: @@ -160,7 +159,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, return false; } - // in lazy-mode, we ignore this dir if its dirstamp suggest it + // in lazy-mode, we ignore this dir if its dirstamp suggests it // is up-to-date (this is _not_ always true; hence we call it // lazy-mode); only for actual message dirs, since the dir // tstamps may not bubble up.U @@ -193,14 +192,16 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, return true; } case Scanner::HandleType::LeaveDir: { - todos_.push({fullpath, WorkItem::Type::Dir}); + // directly push to store worker, bypass scan-items queue + pace_store_worker(); + store_worker_.push(StoreWorker::SetDirStamp{fullpath, ::time({})}); return true; } case Scanner::HandleType::File: { ++progress_.checked; - if ((size_t)statbuf->st_size > max_message_size_) { + if (static_cast(statbuf->st_size) > max_message_size_) { mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size); return false; } @@ -210,9 +211,10 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath)) return false; - // push the remaining messages to our "todo" queue for + // push the remaining messages to our "scan-items" queue for // (re)parsing and adding/updating to the database. - todos_.push({fullpath, WorkItem::Type::File}); + pace_scan_worker(); + msg_paths_.push(std::string{fullpath}); // move? return true; } default: @@ -226,158 +228,94 @@ Indexer::Private::maybe_start_worker() { std::lock_guard lock{w_lock_}; - if (todos_.size() > workers_.size() && workers_.size() < max_workers_) { + if (msg_paths_.size() > workers_.size() && workers_.size() < max_workers_) { workers_.emplace_back(std::thread([this] { item_worker(); })); mu_debug("added worker {}", workers_.size()); } } -bool -Indexer::Private::add_message(const std::string& path) -{ - /* - * Having the lock here makes things a _lot_ slower. - * - * The reason for having the lock is some helgrind warnings; - * but it believed those are _false alarms_ - * https://gitlab.gnome.org/GNOME/glib/-/issues/2662 - */ - //std::unique_lock lock{w_lock_}; - auto msg{Message::make_from_path(path, store_.message_options())}; - if (!msg) { - mu_warning("failed to create message from {}: {}", path, msg.error().what()); - return false; - } - // if the store was empty, we know that the message is completely new - // and can use the fast path (Xapians 'add_document' rather than - // 'replace_document) - auto res = store_.consume_message(std::move(msg.value()), was_empty_); - if (!res) { - mu_warning("failed to add message @ {}: {}", path, res.error().what()); - return false; - } - - return true; -} - void Indexer::Private::item_worker() { - WorkItem item; - mu_debug("started worker"); - while (state_ == IndexState::Scanning) { - if (!todos_.pop(item, 250ms)) + while (state_ == State::Scanning || + (state_ == State::Draining && !msg_paths_.empty())) { + + std::string msgpath; + if (!msg_paths_.pop(msgpath, 250ms)) + continue; + + auto msg{Message::make_from_path(msgpath, store_.message_options())}; + if (!msg) { + mu_warning("failed to create message from {}: {}", + msgpath, msg.error().what()); continue; - try { - switch (item.type) { - case WorkItem::Type::File: { - if (G_LIKELY(add_message(item.full_path))) - ++progress_.updated; - } break; - case WorkItem::Type::Dir: - store_.set_dirstamp(item.full_path, ::time(NULL)); - break; - default: - g_warn_if_reached(); - break; - } - } catch (const Mu::Error& er) { - mu_warning("error adding message @ {}: {}", item.full_path, er.what()); } + pace_store_worker(); /* slow down if store-worker q gets too big */ + + // if the store was empty, we know that the message is + // completely new and can use the fast path (Xapians + // 'add_document' rather than 'replace_document) + if (was_empty_) + store_worker_.push(StoreWorker::AddMessage{std::move(*msg)}); + else + store_worker_.push(StoreWorker::UpdateMessage{std::move(*msg)}); + ++progress_.updated; + maybe_start_worker(); - std::this_thread::yield(); } } -bool -Indexer::Private::cleanup() -{ - mu_debug("starting cleanup"); - - size_t n{}; - std::vector orphans; // store messages without files. - store_.for_each_message_path([&](Store::Id id, const std::string& path) { - ++n; - if (::access(path.c_str(), R_OK) != 0) { - mu_debug("cannot read {} (id={}); queuing for removal from store", - path, id); - orphans.emplace_back(id); - } - - return state_ == IndexState::Cleaning; - }); - - if (orphans.empty()) - mu_debug("nothing to clean up"); - else { - mu_debug("removing {} stale message(s) from store", orphans.size()); - store_.remove_messages(orphans); - progress_.removed += orphans.size(); - } - - return true; -} - void Indexer::Private::scan_worker() { - progress_.reset(); if (conf_.scan) { mu_debug("starting scanner"); if (!scanner_.start()) { // blocks. mu_warning("failed to start scanner"); - state_.change_to(IndexState::Idle); + switch_state(State::Idle); return; } - mu_debug("scanner finished with {} file(s) in queue", todos_.size()); + mu_debug("scanner finished with {} file(s) in queue", msg_paths_.size()); } - // now there may still be messages in the work queue... - // finish those; this is a bit ugly; perhaps we should - // handle SIGTERM etc. - - if (!todos_.empty()) { - const auto workers_size = std::invoke([this] { - std::lock_guard lock{w_lock_}; - return workers_.size(); - }); - mu_debug("process {} remaining message(s) with {} worker(s)", - todos_.size(), workers_size); - while (!todos_.empty()) - std::this_thread::sleep_for(100ms); - } - // and let the worker finish their work. - state_.change_to(IndexState::Finishing); - for (auto&& w : workers_) - if (w.joinable()) - w.join(); - if (conf_.cleanup) { - mu_debug("starting cleanup"); - state_.change_to(IndexState::Cleaning); - cleanup(); - mu_debug("cleanup finished"); + mu_debug("starting cleanup with work-item(s) left: {}", + store_worker_.size()); + + std::vector orphans; // store messages without files. + store_.for_each_message_path([&](Store::Id id, const std::string& path) { + if (::access(path.c_str(), R_OK) != 0) { + mu_debug("orphan: cannot read {} (id={})", path, id); + orphans.emplace_back(id); + } + return true; + }); + progress_.removed = orphans.size(); + if (!orphans.empty()) { + mu_info("removing {} orphan message(s)", orphans.size()); + store_worker_.push(StoreWorker::RemoveMessages{std::move(orphans)}); + } } completed_ = ::time({}); - store_.config().set(completed_); - state_.change_to(IndexState::Idle); + store_worker_.push(StoreWorker::SetLastIndex{completed_}); + + stop(); } bool Indexer::Private::start(const Indexer::Config& conf, bool block) { - stop(); + force_cleanup(); conf_ = conf; if (conf_.max_threads == 0) { /* benchmarking suggests that ~4 threads is the fastest (the * real bottleneck is the database, so adding more threads just - * slows things down) - */ + * slows things down) */ max_workers_ = std::min(4U, std::thread::hardware_concurrency()); } else max_workers_ = conf.max_threads; @@ -391,11 +329,14 @@ Indexer::Private::start(const Indexer::Config& conf, bool block) mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no", conf_.cleanup ? "yes" : "no"); - state_.change_to(IndexState::Scanning); + progress_.reset(); + switch_state(State::Scanning); /* kick off the first worker, which will spawn more if needed. */ - workers_.emplace_back(std::thread([this] { item_worker(); })); - /* kick the disk-scanner thread */ - scanner_worker_ = std::thread([this] { scan_worker(); }); + workers_.emplace_back(std::thread([this]{item_worker();})); + /* kick the file-system-scanner thread */ + if (scanner_worker_.joinable()) + scanner_worker_.join(); // kill old one + scanner_worker_ = std::thread([this]{scan_worker();}); mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking"); if (block) { @@ -411,18 +352,33 @@ Indexer::Private::start(const Indexer::Config& conf, bool block) bool Indexer::Private::stop() { + switch_state(State::Draining); + scanner_.stop(); + // cannot join scanner_worker_ here since it may be our + // current thread. - todos_.clear(); - if (scanner_worker_.joinable()) - scanner_worker_.join(); + // wait for completion. + while (!msg_paths_.empty()) { + mu_debug("scan-items left: {}", msg_paths_.size()); + std::this_thread::sleep_for(250ms); + } - state_.change_to(IndexState::Idle); for (auto&& w : workers_) if (w.joinable()) w.join(); workers_.clear(); + store_worker_.push(StoreWorker::EndTransaction()); + + // wait for completion. + while (!store_worker_.empty()) { + mu_debug("work-items left: {}", store_worker_.size()); + std::this_thread::sleep_for(250ms); + } + + switch_state(State::Idle); + return true; } @@ -469,7 +425,7 @@ Indexer::is_running() const const Indexer::Progress& Indexer::progress() const { - priv_->progress_.running = priv_->state_ == IndexState::Idle ? false : true; + priv_->progress_.running = priv_->state_ == State::Idle ? false : true; return priv_->progress_; } From 697d6b6b4fe922af17a7535f25497798d925b68f Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Mon, 27 May 2024 23:02:42 +0300 Subject: [PATCH 4/6] server: pass sexp-commmands through store worker To ensure all Xapian rw commands happen in the same thread. --- lib/mu-server.cc | 70 ++++++++++++++++++++++++++++++++++++--------- mu/mu-cmd-server.cc | 5 ++-- 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/lib/mu-server.cc b/lib/mu-server.cc index 62c9ca0c..edc63982 100644 --- a/lib/mu-server.cc +++ b/lib/mu-server.cc @@ -1,5 +1,5 @@ /* -** Copyright (C) 2020-2023 Dirk-Jan C. Binnema +** Copyright (C) 2020-2024 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -29,9 +29,12 @@ #include #include #include +#include #include #include + + #include #include #include @@ -116,7 +119,7 @@ struct OutputStream { } private: - std::string fname_; + std::string fname_; using OutType = std::variant; OutType out_; }; @@ -126,11 +129,21 @@ private: /// @brief object to manage the server-context for all commands. struct Server::Private { Private(Store& store, const Server::Options& opts, Output output) - : store_{store}, options_{opts}, output_{output}, + : store_{store}, + store_worker_{store.store_worker()}, + options_{opts}, output_{output}, command_handler_{make_command_map()}, keep_going_{true}, - tmp_dir_{unwrap(make_temp_dir())} - {} + tmp_dir_{unwrap(make_temp_dir())} { + + // tell the store-worker that we (this class) can handle + // sexp strings. + store_worker_.install_sexp_handler( + [this](const std::string& sexp) { + this->invoke(sexp); + }); + + } ~Private() { indexer().stop(); @@ -148,13 +161,10 @@ struct Server::Private { // acccessors Store& store() { return store_; } const Store& store() const { return store_; } + StoreWorker& store_worker() { return store_worker_; } Indexer& indexer() { return store().indexer(); } //CommandMap& command_map() const { return command_map_; } - // - // invoke - // - bool invoke(const std::string& expr) noexcept; // // output @@ -186,7 +196,19 @@ struct Server::Private { void remove_handler(const Command& cmd); void view_handler(const Command& cmd); + bool keep_going() const { return keep_going_; } + void set_keep_going(bool going) { keep_going_ = going; } + + // make main thread wait until done with the command. + std::mutex done_lock_; + std::condition_variable done_cond_; + private: + // + // invoke + // + bool invoke(const std::string& expr) noexcept; + void move_docid(Store::Id docid, Option flagstr, bool new_name, bool no_view); @@ -209,6 +231,7 @@ private: std::ofstream make_temp_file_stream(std::string& fname) const; Store& store_; + StoreWorker& store_worker_; Server::Options options_; Server::Output output_; const CommandHandler command_handler_; @@ -480,6 +503,10 @@ Server::Private::invoke(const std::string& expr) noexcept keep_going_ = false; } + // tell main thread we're done with the command. + std::lock_guard l{done_lock_}; + done_cond_.notify_one(); + return keep_going_; } @@ -690,9 +717,6 @@ Server::Private::find_handler(const Command& cmd) StopWatch sw{mu_format("{} (indexing: {})", __func__, indexer().is_running() ? "yes" : "no")}; - // we need to _lock_ the store while querying (which likely consists of - // multiple actual queries) + grabbing the results. - std::lock_guard l{store_.lock()}; auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)}; if (!qres) throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what()); @@ -1081,7 +1105,27 @@ Server::~Server() = default; bool Server::invoke(const std::string& expr) noexcept { - return priv_->invoke(expr); + /* a _little_ hacky; handle _quit_ directly to properly + * shut down the server */ + if (expr == "(quit)") { + mu_debug("quitting"); + priv_->set_keep_going(false); + return false; + } + + /* + * feed the command to the queue; it'll get executed in the + * store-worker thread; however, sync on its completion + * so we get its keep_going() result + * + * as an added bonus, this ensures mu server shell doesn't require an + * extra user RET to get back the prompt + */ + std::unique_lock done_lock{priv_->done_lock_}; + priv_->store_worker().push(StoreWorker::SexpCommand{expr}); + priv_->done_cond_.wait(done_lock); + + return priv_->keep_going(); } /* LCOV_EXCL_STOP */ diff --git a/mu/mu-cmd-server.cc b/mu/mu-cmd-server.cc index 3a694566..50c7c010 100644 --- a/mu/mu-cmd-server.cc +++ b/mu/mu-cmd-server.cc @@ -79,11 +79,12 @@ cookie(size_t n) ::printf(COOKIE_PRE "%x" COOKIE_POST, num); } - - static void output_stdout(const std::string& str, Server::OutputFlags flags) { + // Note: with the StoreWorker, we _always_ need to flush + flags |= Server::OutputFlags::Flush; + cookie(str.size() + 1); if (G_UNLIKELY(::puts(str.c_str()) < 0)) { mu_critical("failed to write output '{}'", str); From b4ff8d62af8c3a0224ab581ba139a9508ed7ee2b Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Sun, 2 Jun 2024 10:37:39 +0300 Subject: [PATCH 5/6] server: support doccount data request So we can update doccount in mu4e after indexing --- lib/mu-server.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/mu-server.cc b/lib/mu-server.cc index edc63982..e18ae390 100644 --- a/lib/mu-server.cc +++ b/lib/mu-server.cc @@ -586,7 +586,9 @@ Server::Private::data_handler(const Command& cmd) { const auto request_type{unwrap(cmd.symbol_arg(":kind"))}; - if (request_type == "maildirs") { + if (request_type == "doccount") { + output(mu_format("(:doccount {})", store().size())); + } else if (request_type == "maildirs") { auto&& out{make_output_stream()}; mu_print(out, "("); for (auto&& mdir: store().maildirs()) From 525d110f7cdf2a24f558de17ca834067f9fb1633 Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Sun, 2 Jun 2024 10:38:37 +0300 Subject: [PATCH 6/6] mu4e: improve update data Update doccount after indexing. --- mu4e/mu4e-server.el | 18 ++++++++++--- mu4e/mu4e-update.el | 6 ++--- mu4e/mu4e.el | 65 ++++++++++++++++++++++++--------------------- 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/mu4e/mu4e-server.el b/mu4e/mu4e-server.el index fae83514..0790501d 100644 --- a/mu4e/mu4e-server.el +++ b/mu4e/mu4e-server.el @@ -385,7 +385,10 @@ The server output is as follows: ((plist-get sexp :info) (funcall mu4e-info-func sexp)) - ;; get some data + ;; get some data XXX generalize + ((plist-get sexp :doccount) + (plist-put mu4e--server-props :doccount + (mu4e--server-plist-get sexp :doccount))) ((plist-get sexp :maildirs) (setq mu4e-maildir-list (mu4e--server-plist-get sexp :maildirs))) @@ -556,9 +559,16 @@ get at most MAX contacts." (defun mu4e--server-data (kind) "Request data of some KIND. -KIND is a symbol. Currently supported kinds: maildirs." - (mu4e--server-call-mu - `(data :kind ,kind))) +KIND is a symbol or a list of symbols. Currently supported kinds: + `maildirs', `doccount'." + (pcase kind + ((pred (lambda (k) (memq k '(maildirs doccount)))) + (mu4e--server-call-mu `(data :kind ,kind))) + ((pred listp) + (when kind + (mu4e--server-data (car kind)) + (mu4e--server-data (cdr kind)))) + (_ (mu4e-error "Unexpected kind %s" kind)))) (defun mu4e--server-find (query threads sortfield sortdir maxnum skip-dups include-related) diff --git a/mu4e/mu4e-update.el b/mu4e/mu4e-update.el index 5bb9e1da..11228fac 100644 --- a/mu4e/mu4e-update.el +++ b/mu4e/mu4e-update.el @@ -130,9 +130,9 @@ changed") If non-nil, this is a plist of the form: \( :checked (checked whether up-to-date) -:updated +:cleaned-up +:stamp ") (defconst mu4e-last-update-buffer "*mu4e-last-update*" "Name of buffer with cloned from the last update buffer. diff --git a/mu4e/mu4e.el b/mu4e/mu4e.el index 8efc7b68..31d4bf60 100644 --- a/mu4e/mu4e.el +++ b/mu4e/mu4e.el @@ -1,6 +1,6 @@ ;;; mu4e.el --- Mu4e, the mu mail user agent -*- lexical-binding: t -*- -;; Copyright (C) 2011-2023 Dirk-Jan C. Binnema +;; Copyright (C) 2011-2024 Dirk-Jan C. Binnema ;; Author: Dirk-Jan C. Binnema ;; Maintainer: Dirk-Jan C. Binnema @@ -203,42 +203,47 @@ Otherwise, check requirements, then start mu4e. When successful, invoke (_ (mu4e-error "Error %d: %s" errcode errmsg)))) (defun mu4e--update-status (info) - "Update the status message with INFO." + "Update `mu4e-index-update-status' with INFO. +Return the former. As an additional side-effect, updates +doccount in server-properties." (setq mu4e-index-update-status - `(:tstamp ,(current-time) - :checked ,(plist-get info :checked) - :updated ,(plist-get info :updated) - :cleaned-up ,(plist-get info :cleaned-up)))) + `(:tstamp ,(current-time) + :checked ,(or (plist-get info :checked) 0) + :updated ,(or (plist-get info :updated) 0) + :cleaned-up ,(or (plist-get info :cleaned-up) 0))) + mu4e-index-update-status) (defun mu4e--info-handler (info) "Handler function for (:INFO ...) sexps received from server." - (let* ((type (plist-get info :info)) - (checked (plist-get info :checked)) - (updated (plist-get info :updated)) - (cleaned-up (plist-get info :cleaned-up))) + (let* ((type (plist-get info :info))) (cond ((eq type 'add) t) ;; do nothing ((eq type 'index) - (if (eq (plist-get info :status) 'running) - (mu4e-index-message - "Indexing... checked %d, updated %d" checked updated) - (progn ;; i.e. 'complete - (mu4e--update-status info) - (mu4e-index-message - "%s completed; checked %d, updated %d, cleaned-up %d" - (if mu4e-index-lazy-check "Lazy indexing" "Indexing") - checked updated cleaned-up) - ;; index done; grab updated queries - (mu4e--query-items-refresh) - (run-hooks 'mu4e-index-updated-hook) - ;; backward compatibility... - (unless (zerop (+ updated cleaned-up)) - mu4e-message-changed-hook) - (unless (and (not (string= mu4e--contacts-tstamp "0")) - (zerop (plist-get info :updated))) - (mu4e--request-contacts-maybe) - (mu4e--server-data 'maildirs)) ;; update maildir list - (mu4e--main-redraw)))) + (let* ((info (mu4e--update-status info)) + (checked (plist-get info :checked)) + (updated (plist-get info :updated)) + (cleaned-up (plist-get info :cleaned-up))) + (if (eq (plist-get info :status) 'running) + (mu4e-index-message + "Indexing... checked %d, updated %d" + checked updated) + (progn ;; i.e. 'complete + (mu4e-index-message + "%s completed; checked %d, updated %d, cleaned-up %d" + (if mu4e-index-lazy-check "Lazy indexing" "Indexing") + checked updated cleaned-up) + ;; index done; grab updated queries + (mu4e--query-items-refresh) + (run-hooks 'mu4e-index-updated-hook) + ;; backward compatibility... + (unless (zerop (+ updated cleaned-up)) + mu4e-message-changed-hook) + (unless (and (not (string= mu4e--contacts-tstamp "0")) + (zerop (plist-get info :updated))) + (mu4e--request-contacts-maybe) + (mu4e--server-data '(maildirs doccount))) + ;; update maildir list & doccount + (mu4e--main-redraw))))) ((plist-get info :message) (mu4e-index-message "%s" (plist-get info :message))))))