server: pass sexp-commmands through store worker

To ensure all Xapian rw commands happen in the same thread.
This commit is contained in:
Dirk-Jan C. Binnema
2024-05-27 23:02:42 +03:00
parent f2f01595a5
commit 697d6b6b4f
2 changed files with 60 additions and 15 deletions

View File

@ -1,5 +1,5 @@
/* /*
** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ** Copyright (C) 2020-2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
** **
** This program is free software; you can redistribute it and/or modify it ** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the ** under the terms of the GNU General Public License as published by the
@ -29,9 +29,12 @@
#include <atomic> #include <atomic>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable>
#include <variant> #include <variant>
#include <functional> #include <functional>
#include <cstring> #include <cstring>
#include <glib.h> #include <glib.h>
#include <glib/gprintf.h> #include <glib/gprintf.h>
@ -116,7 +119,7 @@ struct OutputStream {
} }
private: private:
std::string fname_; std::string fname_;
using OutType = std::variant<std::ofstream, std::ostringstream>; using OutType = std::variant<std::ofstream, std::ostringstream>;
OutType out_; OutType out_;
}; };
@ -126,11 +129,21 @@ private:
/// @brief object to manage the server-context for all commands. /// @brief object to manage the server-context for all commands.
struct Server::Private { struct Server::Private {
Private(Store& store, const Server::Options& opts, Output output) Private(Store& store, const Server::Options& opts, Output output)
: store_{store}, options_{opts}, output_{output}, : store_{store},
store_worker_{store.store_worker()},
options_{opts}, output_{output},
command_handler_{make_command_map()}, command_handler_{make_command_map()},
keep_going_{true}, keep_going_{true},
tmp_dir_{unwrap(make_temp_dir())} tmp_dir_{unwrap(make_temp_dir())} {
{}
// tell the store-worker that we (this class) can handle
// sexp strings.
store_worker_.install_sexp_handler(
[this](const std::string& sexp) {
this->invoke(sexp);
});
}
~Private() { ~Private() {
indexer().stop(); indexer().stop();
@ -148,13 +161,10 @@ struct Server::Private {
// acccessors // acccessors
Store& store() { return store_; } Store& store() { return store_; }
const Store& store() const { return store_; } const Store& store() const { return store_; }
StoreWorker& store_worker() { return store_worker_; }
Indexer& indexer() { return store().indexer(); } Indexer& indexer() { return store().indexer(); }
//CommandMap& command_map() const { return command_map_; } //CommandMap& command_map() const { return command_map_; }
//
// invoke
//
bool invoke(const std::string& expr) noexcept;
// //
// output // output
@ -186,7 +196,19 @@ struct Server::Private {
void remove_handler(const Command& cmd); void remove_handler(const Command& cmd);
void view_handler(const Command& cmd); void view_handler(const Command& cmd);
bool keep_going() const { return keep_going_; }
void set_keep_going(bool going) { keep_going_ = going; }
// make main thread wait until done with the command.
std::mutex done_lock_;
std::condition_variable done_cond_;
private: private:
//
// invoke
//
bool invoke(const std::string& expr) noexcept;
void move_docid(Store::Id docid, Option<std::string> flagstr, void move_docid(Store::Id docid, Option<std::string> flagstr,
bool new_name, bool no_view); bool new_name, bool no_view);
@ -209,6 +231,7 @@ private:
std::ofstream make_temp_file_stream(std::string& fname) const; std::ofstream make_temp_file_stream(std::string& fname) const;
Store& store_; Store& store_;
StoreWorker& store_worker_;
Server::Options options_; Server::Options options_;
Server::Output output_; Server::Output output_;
const CommandHandler command_handler_; const CommandHandler command_handler_;
@ -480,6 +503,10 @@ Server::Private::invoke(const std::string& expr) noexcept
keep_going_ = false; keep_going_ = false;
} }
// tell main thread we're done with the command.
std::lock_guard l{done_lock_};
done_cond_.notify_one();
return keep_going_; return keep_going_;
} }
@ -690,9 +717,6 @@ Server::Private::find_handler(const Command& cmd)
StopWatch sw{mu_format("{} (indexing: {})", __func__, StopWatch sw{mu_format("{} (indexing: {})", __func__,
indexer().is_running() ? "yes" : "no")}; indexer().is_running() ? "yes" : "no")};
// we need to _lock_ the store while querying (which likely consists of
// multiple actual queries) + grabbing the results.
std::lock_guard l{store_.lock()};
auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)}; auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)};
if (!qres) if (!qres)
throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what()); throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what());
@ -1081,7 +1105,27 @@ Server::~Server() = default;
bool bool
Server::invoke(const std::string& expr) noexcept Server::invoke(const std::string& expr) noexcept
{ {
return priv_->invoke(expr); /* a _little_ hacky; handle _quit_ directly to properly
* shut down the server */
if (expr == "(quit)") {
mu_debug("quitting");
priv_->set_keep_going(false);
return false;
}
/*
* feed the command to the queue; it'll get executed in the
* store-worker thread; however, sync on its completion
* so we get its keep_going() result
*
* as an added bonus, this ensures mu server shell doesn't require an
* extra user RET to get back the prompt
*/
std::unique_lock done_lock{priv_->done_lock_};
priv_->store_worker().push(StoreWorker::SexpCommand{expr});
priv_->done_cond_.wait(done_lock);
return priv_->keep_going();
} }
/* LCOV_EXCL_STOP */ /* LCOV_EXCL_STOP */

View File

@ -79,11 +79,12 @@ cookie(size_t n)
::printf(COOKIE_PRE "%x" COOKIE_POST, num); ::printf(COOKIE_PRE "%x" COOKIE_POST, num);
} }
static void static void
output_stdout(const std::string& str, Server::OutputFlags flags) output_stdout(const std::string& str, Server::OutputFlags flags)
{ {
// Note: with the StoreWorker, we _always_ need to flush
flags |= Server::OutputFlags::Flush;
cookie(str.size() + 1); cookie(str.size() + 1);
if (G_UNLIKELY(::puts(str.c_str()) < 0)) { if (G_UNLIKELY(::puts(str.c_str()) < 0)) {
mu_critical("failed to write output '{}'", str); mu_critical("failed to write output '{}'", str);