diff --git a/lib/meson.build b/lib/meson.build index b3b519d5..91acaf28 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -42,6 +42,7 @@ lib_mu=static_library( # misc 'mu-maildir.cc', 'mu-script.cc', + 'mu-store-worker.cc' ], dependencies: [ glib_dep, diff --git a/lib/mu-store-worker.cc b/lib/mu-store-worker.cc new file mode 100644 index 00000000..d4bcbbfc --- /dev/null +++ b/lib/mu-store-worker.cc @@ -0,0 +1,68 @@ +/* +** Copyright (C) 2024 Dirk-Jan C. Binnema +** +** This program is free software; you can redistribute it and/or modify it +** under the terms of the GNU General Public License as published by the +** Free Software Foundation; either version 3, or (at your option) any +** later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software Foundation, +** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +** +*/ + +#include "mu-store-worker.hh" +#include "mu-store.hh" +#include "utils/mu-utils.hh" + +#include + +using namespace Mu; + +// helper constant for the visitor +template inline constexpr bool always_false_v = false; + +void +StoreWorker::run() { + + running_ = true; + + while (running_) { + WorkItem workitem; + + if (!q_.pop(workitem)) + continue; + + std::visit([&](auto&& item) { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + if (!sexp_handler_) + mu_critical("no handler for sexp '{}'", item); + else + sexp_handler_(item); + } else if constexpr (std::is_same_v) { + store_.set_dirstamp(item.path, item.tstamp); + } else if constexpr (std::is_same_v) { + store_.config().set(item.tstamp); + } else if constexpr (std::is_same_v) { + store_.xapian_db().request_transaction(); + } else if constexpr (std::is_same_v) { + store_.xapian_db().request_commit(true); + } else if constexpr (std::is_same_v) { + store_.remove_messages(item); + } else if constexpr (std::is_same_v) { + store_.consume_message(std::move(item.msg), true/*new*/); + } else if constexpr (std::is_same_v) { + store_.consume_message(std::move(item.msg), false/*maybe not new*/); + } else + static_assert(always_false_v, "non-exhaustive visitor"); + }, workitem); + } +} diff --git a/lib/mu-store-worker.hh b/lib/mu-store-worker.hh new file mode 100644 index 00000000..6c6ce83b --- /dev/null +++ b/lib/mu-store-worker.hh @@ -0,0 +1,169 @@ +/* +** Copyright (C) 2024 Dirk-Jan C. Binnema +** +** This program is free software; you can redistribute it and/or modify it +** under the terms of the GNU General Public License as published by the +** Free Software Foundation; either version 3, or (at your option) any +** later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software Foundation, +** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +** +*/ + + +/** + * The store worker maintains a worker thread and an async queue to which + * commands can be added from any thread; the worker thread that is the sole + * thread to talk to the store / Xapian (at least for writing). + */ + +#ifndef MU_STORE_WORKER_HH +#define MU_STORE_WORKER_HH + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace Mu { + +/**< Sum type for all commands */ + +class Store; /// fwd declaration + +/** + * Worker for sending requests to the Store + * + * I.e. to execute database commands in a single thread. + */ +class StoreWorker { +public: + /** + * CTOR. This will create the store worker and start the worker thread. + * + * @param store a store + */ + StoreWorker(Store& store): + store_{store}, + runner_ {std::thread([this]{run();})} + {} + + /** + * DTOR. Destroy the store worker after joining the worker thread + */ + ~StoreWorker() { + running_ = false; + if (runner_.joinable()) + runner_.join(); + } + + /* + * The following types of work-item can be added to the queue: + */ + struct SetDirStamp { + std::string path; /**< full path to directory */ + ::time_t tstamp; /**< Timestamp for directory */ + }; /**< Write a directory timestamp to the store */ + + struct SetLastIndex { + ::time_t tstamp; /**< Timestamp */ + }; /**< Write last indexing timestamp to the store */ + + struct StartTransaction{}; /**< Request transaction start + * (opportunistically) */ + struct EndTransaction{}; /**< Request transaction end/commit + * (opportunistically) */ + struct AddMessage { + Message msg; /**< Add a new message */ + }; /**< Add a new message; this is faster version of UpdateMessage + * if we know the message does not exist yet. */ + struct UpdateMessage { + Message msg; /**< Add or update a message */ + }; /**< Add message or update if it already exists */ + + using RemoveMessages = std::vector; + /**< Remove all message with the given ids */ + using SexpCommand = std::string; /**< A sexp-command (i.e., from mu4e); + * requires install_sexp_handler() */ + + using WorkItem = std::variant; + /// Sumtype with all types of work-item + + using QueueType = AsyncQueue; + const QueueType& queue() const { return q_; } + QueueType& queue() { return q_; } + + /** + * Push a work item to the que + * + * @param item + */ + void push(WorkItem&& item) { + q_.push(std::move(item)); + } + + /** + * Get the current size of the work item queue + * + * @return the size + */ + size_t size() const { + return q_.size(); + } + + /** + * Is the work item queue empty? + * + * @return true or false + */ + bool empty() const { + return q_.empty(); + } + + /** + * Clear the queue of any items + */ + void clear() { + q_.clear(); + } + + using SexpCommandHandler = std::function; + /**< Prototype for a SexpCommand handler function */ + + /** + * Install a handler for Sexp commands + * + * @param handler + */ + void install_sexp_handler(SexpCommandHandler&& handler) { + sexp_handler_ = handler; + } + +private: + void run(); + size_t cleanup_orphans(); + + QueueType q_; + Store& store_;; + std::thread runner_; + std::atomic running_{}; + SexpCommandHandler sexp_handler_{}; +}; + +} // namespace Mu +#endif /*MU_STORE_WORKER_HH*/ diff --git a/lib/mu-store.cc b/lib/mu-store.cc index 0db198ac..5aa36709 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -131,6 +131,7 @@ struct Store::Private { XapianDb xapian_db_; Config config_; ContactsCache contacts_cache_; + std::unique_ptr store_worker_; std::unique_ptr indexer_; const std::string root_maildir_; @@ -252,6 +253,7 @@ Store::Store(Store&& other) { priv_ = std::move(other.priv_); priv_->indexer_.reset(); + priv_->store_worker_.reset(); } Store::~Store() = default; @@ -316,6 +318,15 @@ Store::indexer() return *priv_->indexer_.get(); } +StoreWorker& +Store::store_worker() +{ + if (!priv_->store_worker_) + priv_->store_worker_ = std::make_unique(*this); + + return *priv_->store_worker_; +} + Result Store::add_message(Message& msg, bool is_new) { diff --git a/lib/mu-store.hh b/lib/mu-store.hh index 07d8a03c..643bb31c 100644 --- a/lib/mu-store.hh +++ b/lib/mu-store.hh @@ -1,5 +1,5 @@ /* -** Copyright (C) 2023 Dirk-Jan C. Binnema +** Copyright (C) 2024 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -31,6 +31,7 @@ #include "mu-config.hh" #include "mu-indexer.hh" #include "mu-query-results.hh" +#include "mu-store-worker.hh" #include #include @@ -147,6 +148,13 @@ public: */ Indexer& indexer(); + /** + * Get the store-worker instance + * + * @return the store-worker + */ + StoreWorker& store_worker(); + /** * Run a query; see the `mu-query` man page for the syntax. *