From 697d6b6b4fe922af17a7535f25497798d925b68f Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Mon, 27 May 2024 23:02:42 +0300 Subject: [PATCH] server: pass sexp-commmands through store worker To ensure all Xapian rw commands happen in the same thread. --- lib/mu-server.cc | 70 ++++++++++++++++++++++++++++++++++++--------- mu/mu-cmd-server.cc | 5 ++-- 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/lib/mu-server.cc b/lib/mu-server.cc index 62c9ca0c..edc63982 100644 --- a/lib/mu-server.cc +++ b/lib/mu-server.cc @@ -1,5 +1,5 @@ /* -** Copyright (C) 2020-2023 Dirk-Jan C. Binnema +** Copyright (C) 2020-2024 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -29,9 +29,12 @@ #include #include #include +#include #include #include + + #include #include #include @@ -116,7 +119,7 @@ struct OutputStream { } private: - std::string fname_; + std::string fname_; using OutType = std::variant; OutType out_; }; @@ -126,11 +129,21 @@ private: /// @brief object to manage the server-context for all commands. struct Server::Private { Private(Store& store, const Server::Options& opts, Output output) - : store_{store}, options_{opts}, output_{output}, + : store_{store}, + store_worker_{store.store_worker()}, + options_{opts}, output_{output}, command_handler_{make_command_map()}, keep_going_{true}, - tmp_dir_{unwrap(make_temp_dir())} - {} + tmp_dir_{unwrap(make_temp_dir())} { + + // tell the store-worker that we (this class) can handle + // sexp strings. + store_worker_.install_sexp_handler( + [this](const std::string& sexp) { + this->invoke(sexp); + }); + + } ~Private() { indexer().stop(); @@ -148,13 +161,10 @@ struct Server::Private { // acccessors Store& store() { return store_; } const Store& store() const { return store_; } + StoreWorker& store_worker() { return store_worker_; } Indexer& indexer() { return store().indexer(); } //CommandMap& command_map() const { return command_map_; } - // - // invoke - // - bool invoke(const std::string& expr) noexcept; // // output @@ -186,7 +196,19 @@ struct Server::Private { void remove_handler(const Command& cmd); void view_handler(const Command& cmd); + bool keep_going() const { return keep_going_; } + void set_keep_going(bool going) { keep_going_ = going; } + + // make main thread wait until done with the command. + std::mutex done_lock_; + std::condition_variable done_cond_; + private: + // + // invoke + // + bool invoke(const std::string& expr) noexcept; + void move_docid(Store::Id docid, Option flagstr, bool new_name, bool no_view); @@ -209,6 +231,7 @@ private: std::ofstream make_temp_file_stream(std::string& fname) const; Store& store_; + StoreWorker& store_worker_; Server::Options options_; Server::Output output_; const CommandHandler command_handler_; @@ -480,6 +503,10 @@ Server::Private::invoke(const std::string& expr) noexcept keep_going_ = false; } + // tell main thread we're done with the command. + std::lock_guard l{done_lock_}; + done_cond_.notify_one(); + return keep_going_; } @@ -690,9 +717,6 @@ Server::Private::find_handler(const Command& cmd) StopWatch sw{mu_format("{} (indexing: {})", __func__, indexer().is_running() ? "yes" : "no")}; - // we need to _lock_ the store while querying (which likely consists of - // multiple actual queries) + grabbing the results. - std::lock_guard l{store_.lock()}; auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)}; if (!qres) throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what()); @@ -1081,7 +1105,27 @@ Server::~Server() = default; bool Server::invoke(const std::string& expr) noexcept { - return priv_->invoke(expr); + /* a _little_ hacky; handle _quit_ directly to properly + * shut down the server */ + if (expr == "(quit)") { + mu_debug("quitting"); + priv_->set_keep_going(false); + return false; + } + + /* + * feed the command to the queue; it'll get executed in the + * store-worker thread; however, sync on its completion + * so we get its keep_going() result + * + * as an added bonus, this ensures mu server shell doesn't require an + * extra user RET to get back the prompt + */ + std::unique_lock done_lock{priv_->done_lock_}; + priv_->store_worker().push(StoreWorker::SexpCommand{expr}); + priv_->done_cond_.wait(done_lock); + + return priv_->keep_going(); } /* LCOV_EXCL_STOP */ diff --git a/mu/mu-cmd-server.cc b/mu/mu-cmd-server.cc index 3a694566..50c7c010 100644 --- a/mu/mu-cmd-server.cc +++ b/mu/mu-cmd-server.cc @@ -79,11 +79,12 @@ cookie(size_t n) ::printf(COOKIE_PRE "%x" COOKIE_POST, num); } - - static void output_stdout(const std::string& str, Server::OutputFlags flags) { + // Note: with the StoreWorker, we _always_ need to flush + flags |= Server::OutputFlags::Flush; + cookie(str.size() + 1); if (G_UNLIKELY(::puts(str.c_str()) < 0)) { mu_critical("failed to write output '{}'", str);