Merge commit 'store-worker'
This implements some rework of the way mu writes to the database; the update ensures that everything goes through the "store-worker" which maintains a queue, and does all the writes in a single (worker) thread; this attempt to fix come cases of database corruption we say; or at least make those a lot less likely
This commit is contained in:
@ -42,6 +42,7 @@ lib_mu=static_library(
|
|||||||
# misc
|
# misc
|
||||||
'mu-maildir.cc',
|
'mu-maildir.cc',
|
||||||
'mu-script.cc',
|
'mu-script.cc',
|
||||||
|
'mu-store-worker.cc'
|
||||||
],
|
],
|
||||||
dependencies: [
|
dependencies: [
|
||||||
glib_dep,
|
glib_dep,
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
** Copyright (C) 2020-2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||||
**
|
**
|
||||||
** This program is free software; you can redistribute it and/or modify it
|
** This program is free software; you can redistribute it and/or modify it
|
||||||
** under the terms of the GNU General Public License as published by the
|
** under the terms of the GNU General Public License as published by the
|
||||||
@ -29,6 +29,7 @@
|
|||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <string_view>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
@ -42,49 +43,28 @@ using namespace std::chrono_literals;
|
|||||||
|
|
||||||
using namespace Mu;
|
using namespace Mu;
|
||||||
|
|
||||||
struct IndexState {
|
// states
|
||||||
enum State { Idle,
|
|
||||||
Scanning,
|
|
||||||
Finishing,
|
|
||||||
Cleaning
|
|
||||||
};
|
|
||||||
static const char* name(State s) {
|
|
||||||
switch (s) {
|
|
||||||
case Idle:
|
|
||||||
return "idle";
|
|
||||||
case Scanning:
|
|
||||||
return "scanning";
|
|
||||||
case Finishing:
|
|
||||||
return "finishing";
|
|
||||||
case Cleaning:
|
|
||||||
return "cleaning";
|
|
||||||
default:
|
|
||||||
return "<error>";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool operator==(State rhs) const {
|
enum struct State { Idle, Scanning, Draining };
|
||||||
return state_.load() == rhs;
|
constexpr std::string_view
|
||||||
|
format_as(State s)
|
||||||
|
{
|
||||||
|
switch (s) {
|
||||||
|
case State::Idle: return "idle";
|
||||||
|
case State::Scanning: return "scanning";
|
||||||
|
case State::Draining: return "draining";
|
||||||
|
default: return "<error>";
|
||||||
}
|
}
|
||||||
bool operator!=(State rhs) const {
|
}
|
||||||
return state_.load() != rhs;
|
|
||||||
}
|
|
||||||
void change_to(State new_state) {
|
|
||||||
mu_debug("changing indexer state {}->{}", name((State)state_),
|
|
||||||
name((State)new_state));
|
|
||||||
state_.store(new_state);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::atomic<State> state_{Idle};
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Indexer::Private {
|
struct Indexer::Private {
|
||||||
Private(Mu::Store& store)
|
Private(Mu::Store& store)
|
||||||
: store_{store}, scanner_{store_.root_maildir(),
|
: store_{store},
|
||||||
|
store_worker_{store.store_worker()},
|
||||||
|
scanner_{store_.root_maildir(),
|
||||||
[this](auto&& path,
|
[this](auto&& path,
|
||||||
auto&& statbuf, auto&& info) {
|
auto&& statbuf, auto&& info) {
|
||||||
return handler(path, statbuf, info);
|
return scan_handler(path, statbuf, info);
|
||||||
}},
|
}},
|
||||||
max_message_size_{store_.config().get<Mu::Config::Id::MaxMessageSize>()},
|
max_message_size_{store_.config().get<Mu::Config::Id::MaxMessageSize>()},
|
||||||
was_empty_{store.empty()} {
|
was_empty_{store.empty()} {
|
||||||
@ -97,27 +77,55 @@ struct Indexer::Private {
|
|||||||
store.config().get<Mu::Config::Id::SupportNgrams>());
|
store.config().get<Mu::Config::Id::SupportNgrams>());
|
||||||
}
|
}
|
||||||
|
|
||||||
~Private() {
|
~Private() { force_cleanup(); }
|
||||||
stop();
|
|
||||||
|
void force_cleanup() {
|
||||||
|
switch_state(State::Idle);
|
||||||
|
scanner_.stop();
|
||||||
|
if (scanner_worker_.joinable())
|
||||||
|
scanner_worker_.join();
|
||||||
|
msg_paths_.clear();
|
||||||
|
for (auto&& w : workers_)
|
||||||
|
if (w.joinable())
|
||||||
|
w.join();
|
||||||
|
store_worker_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool dir_predicate(const std::string& path, const struct dirent* dirent) const;
|
bool dir_predicate(const std::string& path, const struct dirent* dirent) const;
|
||||||
bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype);
|
bool scan_handler(const std::string& fullpath, struct stat* statbuf,
|
||||||
|
Scanner::HandleType htype);
|
||||||
|
|
||||||
void maybe_start_worker();
|
void maybe_start_worker();
|
||||||
void item_worker();
|
void item_worker();
|
||||||
void scan_worker();
|
void scan_worker();
|
||||||
|
|
||||||
bool add_message(const std::string& path);
|
|
||||||
|
|
||||||
bool cleanup();
|
|
||||||
bool start(const Indexer::Config& conf, bool block);
|
bool start(const Indexer::Config& conf, bool block);
|
||||||
bool stop();
|
bool stop();
|
||||||
|
|
||||||
bool is_running() const { return state_ != IndexState::Idle; }
|
bool is_running() const { return state_ != State::Idle; }
|
||||||
|
void switch_state(State new_state) {
|
||||||
|
mu_debug("changing indexer state {}->{}", state_, new_state);
|
||||||
|
state_ = new_state;
|
||||||
|
}
|
||||||
|
|
||||||
|
// pace a bit so scan_items queue doesn't get too big.
|
||||||
|
void pace_scan_worker() {
|
||||||
|
while (msg_paths_.size() > 8 * max_workers_) {
|
||||||
|
std::this_thread::sleep_for(25ms);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// pace a bit so store-worker queue doesn't get too big.
|
||||||
|
void pace_store_worker() {
|
||||||
|
while (store_worker_.size() > 8) {
|
||||||
|
std::this_thread::sleep_for(25ms);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Indexer::Config conf_;
|
Indexer::Config conf_;
|
||||||
Store& store_;
|
const Store& store_;
|
||||||
|
StoreWorker& store_worker_;
|
||||||
Scanner scanner_;
|
Scanner scanner_;
|
||||||
const size_t max_message_size_;
|
const size_t max_message_size_;
|
||||||
|
|
||||||
@ -126,27 +134,18 @@ struct Indexer::Private {
|
|||||||
std::vector<std::thread> workers_;
|
std::vector<std::thread> workers_;
|
||||||
std::thread scanner_worker_;
|
std::thread scanner_worker_;
|
||||||
|
|
||||||
struct WorkItem {
|
AsyncQueue<std::string> msg_paths_;
|
||||||
std::string full_path;
|
|
||||||
enum Type {
|
|
||||||
Dir,
|
|
||||||
File
|
|
||||||
};
|
|
||||||
Type type;
|
|
||||||
};
|
|
||||||
|
|
||||||
AsyncQueue<WorkItem> todos_;
|
|
||||||
|
|
||||||
Progress progress_{};
|
Progress progress_{};
|
||||||
IndexState state_{};
|
std::atomic<State> state_{State::Idle};
|
||||||
std::mutex lock_, w_lock_;
|
std::mutex lock_, w_lock_;
|
||||||
std::atomic<time_t> completed_{};
|
std::atomic<time_t> completed_{};
|
||||||
bool was_empty_{};
|
bool was_empty_{};
|
||||||
};
|
};
|
||||||
|
|
||||||
bool
|
bool
|
||||||
Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf,
|
||||||
Scanner::HandleType htype)
|
Scanner::HandleType htype)
|
||||||
{
|
{
|
||||||
switch (htype) {
|
switch (htype) {
|
||||||
case Scanner::HandleType::EnterDir:
|
case Scanner::HandleType::EnterDir:
|
||||||
@ -160,7 +159,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// in lazy-mode, we ignore this dir if its dirstamp suggest it
|
// in lazy-mode, we ignore this dir if its dirstamp suggests it
|
||||||
// is up-to-date (this is _not_ always true; hence we call it
|
// is up-to-date (this is _not_ always true; hence we call it
|
||||||
// lazy-mode); only for actual message dirs, since the dir
|
// lazy-mode); only for actual message dirs, since the dir
|
||||||
// tstamps may not bubble up.U
|
// tstamps may not bubble up.U
|
||||||
@ -193,14 +192,16 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
case Scanner::HandleType::LeaveDir: {
|
case Scanner::HandleType::LeaveDir: {
|
||||||
todos_.push({fullpath, WorkItem::Type::Dir});
|
// directly push to store worker, bypass scan-items queue
|
||||||
|
pace_store_worker();
|
||||||
|
store_worker_.push(StoreWorker::SetDirStamp{fullpath, ::time({})});
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
case Scanner::HandleType::File: {
|
case Scanner::HandleType::File: {
|
||||||
++progress_.checked;
|
++progress_.checked;
|
||||||
|
|
||||||
if ((size_t)statbuf->st_size > max_message_size_) {
|
if (static_cast<size_t>(statbuf->st_size) > max_message_size_) {
|
||||||
mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size);
|
mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -210,9 +211,10 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
|||||||
if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath))
|
if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
// push the remaining messages to our "todo" queue for
|
// push the remaining messages to our "scan-items" queue for
|
||||||
// (re)parsing and adding/updating to the database.
|
// (re)parsing and adding/updating to the database.
|
||||||
todos_.push({fullpath, WorkItem::Type::File});
|
pace_scan_worker();
|
||||||
|
msg_paths_.push(std::string{fullpath}); // move?
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -226,160 +228,94 @@ Indexer::Private::maybe_start_worker()
|
|||||||
{
|
{
|
||||||
std::lock_guard lock{w_lock_};
|
std::lock_guard lock{w_lock_};
|
||||||
|
|
||||||
if (todos_.size() > workers_.size() && workers_.size() < max_workers_) {
|
if (msg_paths_.size() > workers_.size() && workers_.size() < max_workers_) {
|
||||||
workers_.emplace_back(std::thread([this] { item_worker(); }));
|
workers_.emplace_back(std::thread([this] { item_worker(); }));
|
||||||
mu_debug("added worker {}", workers_.size());
|
mu_debug("added worker {}", workers_.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
|
||||||
Indexer::Private::add_message(const std::string& path)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Having the lock here makes things a _lot_ slower.
|
|
||||||
*
|
|
||||||
* The reason for having the lock is some helgrind warnings;
|
|
||||||
* but it believed those are _false alarms_
|
|
||||||
* https://gitlab.gnome.org/GNOME/glib/-/issues/2662
|
|
||||||
*/
|
|
||||||
//std::unique_lock lock{w_lock_};
|
|
||||||
auto msg{Message::make_from_path(path, store_.message_options())};
|
|
||||||
if (!msg) {
|
|
||||||
mu_warning("failed to create message from {}: {}", path, msg.error().what());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// if the store was empty, we know that the message is completely new
|
|
||||||
// and can use the fast path (Xapians 'add_document' rather than
|
|
||||||
// 'replace_document)
|
|
||||||
auto res = store_.consume_message(std::move(msg.value()), was_empty_);
|
|
||||||
if (!res) {
|
|
||||||
mu_warning("failed to add message @ {}: {}", path, res.error().what());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
Indexer::Private::item_worker()
|
Indexer::Private::item_worker()
|
||||||
{
|
{
|
||||||
WorkItem item;
|
|
||||||
|
|
||||||
mu_debug("started worker");
|
mu_debug("started worker");
|
||||||
|
|
||||||
while (state_ == IndexState::Scanning) {
|
while (state_ == State::Scanning ||
|
||||||
if (!todos_.pop(item, 250ms))
|
(state_ == State::Draining && !msg_paths_.empty())) {
|
||||||
|
|
||||||
|
std::string msgpath;
|
||||||
|
if (!msg_paths_.pop(msgpath, 250ms))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto msg{Message::make_from_path(msgpath, store_.message_options())};
|
||||||
|
if (!msg) {
|
||||||
|
mu_warning("failed to create message from {}: {}",
|
||||||
|
msgpath, msg.error().what());
|
||||||
continue;
|
continue;
|
||||||
try {
|
|
||||||
switch (item.type) {
|
|
||||||
case WorkItem::Type::File: {
|
|
||||||
if (G_LIKELY(add_message(item.full_path)))
|
|
||||||
++progress_.updated;
|
|
||||||
} break;
|
|
||||||
case WorkItem::Type::Dir:
|
|
||||||
store_.set_dirstamp(item.full_path, ::time(NULL));
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
g_warn_if_reached();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (const Mu::Error& er) {
|
|
||||||
mu_warning("error adding message @ {}: {}", item.full_path, er.what());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pace_store_worker(); /* slow down if store-worker q gets too big */
|
||||||
|
|
||||||
|
// if the store was empty, we know that the message is
|
||||||
|
// completely new and can use the fast path (Xapians
|
||||||
|
// 'add_document' rather than 'replace_document)
|
||||||
|
if (was_empty_)
|
||||||
|
store_worker_.push(StoreWorker::AddMessage{std::move(*msg)});
|
||||||
|
else
|
||||||
|
store_worker_.push(StoreWorker::UpdateMessage{std::move(*msg)});
|
||||||
|
++progress_.updated;
|
||||||
|
|
||||||
maybe_start_worker();
|
maybe_start_worker();
|
||||||
std::this_thread::yield();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
|
||||||
Indexer::Private::cleanup()
|
|
||||||
{
|
|
||||||
mu_debug("starting cleanup");
|
|
||||||
|
|
||||||
size_t n{};
|
|
||||||
std::vector<Store::Id> orphans; // store messages without files.
|
|
||||||
store_.for_each_message_path([&](Store::Id id, const std::string& path) {
|
|
||||||
++n;
|
|
||||||
if (::access(path.c_str(), R_OK) != 0) {
|
|
||||||
mu_debug("cannot read {} (id={}); queuing for removal from store",
|
|
||||||
path, id);
|
|
||||||
orphans.emplace_back(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
return state_ == IndexState::Cleaning;
|
|
||||||
});
|
|
||||||
|
|
||||||
if (orphans.empty())
|
|
||||||
mu_debug("nothing to clean up");
|
|
||||||
else {
|
|
||||||
mu_debug("removing {} stale message(s) from store", orphans.size());
|
|
||||||
store_.remove_messages(orphans);
|
|
||||||
progress_.removed += orphans.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
Indexer::Private::scan_worker()
|
Indexer::Private::scan_worker()
|
||||||
{
|
{
|
||||||
XapianDb::Transaction tx{store_.xapian_db()}; // RAII
|
|
||||||
|
|
||||||
progress_.reset();
|
|
||||||
if (conf_.scan) {
|
if (conf_.scan) {
|
||||||
mu_debug("starting scanner");
|
mu_debug("starting scanner");
|
||||||
if (!scanner_.start()) { // blocks.
|
if (!scanner_.start()) { // blocks.
|
||||||
mu_warning("failed to start scanner");
|
mu_warning("failed to start scanner");
|
||||||
state_.change_to(IndexState::Idle);
|
switch_state(State::Idle);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
mu_debug("scanner finished with {} file(s) in queue", todos_.size());
|
mu_debug("scanner finished with {} file(s) in queue", msg_paths_.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
// now there may still be messages in the work queue...
|
|
||||||
// finish those; this is a bit ugly; perhaps we should
|
|
||||||
// handle SIGTERM etc.
|
|
||||||
|
|
||||||
if (!todos_.empty()) {
|
|
||||||
const auto workers_size = std::invoke([this] {
|
|
||||||
std::lock_guard lock{w_lock_};
|
|
||||||
return workers_.size();
|
|
||||||
});
|
|
||||||
mu_debug("process {} remaining message(s) with {} worker(s)",
|
|
||||||
todos_.size(), workers_size);
|
|
||||||
while (!todos_.empty())
|
|
||||||
std::this_thread::sleep_for(100ms);
|
|
||||||
}
|
|
||||||
// and let the worker finish their work.
|
|
||||||
state_.change_to(IndexState::Finishing);
|
|
||||||
for (auto&& w : workers_)
|
|
||||||
if (w.joinable())
|
|
||||||
w.join();
|
|
||||||
|
|
||||||
if (conf_.cleanup) {
|
if (conf_.cleanup) {
|
||||||
mu_debug("starting cleanup");
|
mu_debug("starting cleanup with work-item(s) left: {}",
|
||||||
state_.change_to(IndexState::Cleaning);
|
store_worker_.size());
|
||||||
cleanup();
|
|
||||||
mu_debug("cleanup finished");
|
std::vector<Store::Id> orphans; // store messages without files.
|
||||||
|
store_.for_each_message_path([&](Store::Id id, const std::string& path) {
|
||||||
|
if (::access(path.c_str(), R_OK) != 0) {
|
||||||
|
mu_debug("orphan: cannot read {} (id={})", path, id);
|
||||||
|
orphans.emplace_back(id);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
progress_.removed = orphans.size();
|
||||||
|
if (!orphans.empty()) {
|
||||||
|
mu_info("removing {} orphan message(s)", orphans.size());
|
||||||
|
store_worker_.push(StoreWorker::RemoveMessages{std::move(orphans)});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
completed_ = ::time({});
|
completed_ = ::time({});
|
||||||
store_.config().set<Mu::Config::Id::LastIndex>(completed_);
|
store_worker_.push(StoreWorker::SetLastIndex{completed_});
|
||||||
state_.change_to(IndexState::Idle);
|
|
||||||
|
stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
Indexer::Private::start(const Indexer::Config& conf, bool block)
|
Indexer::Private::start(const Indexer::Config& conf, bool block)
|
||||||
{
|
{
|
||||||
stop();
|
force_cleanup();
|
||||||
|
|
||||||
conf_ = conf;
|
conf_ = conf;
|
||||||
if (conf_.max_threads == 0) {
|
if (conf_.max_threads == 0) {
|
||||||
/* benchmarking suggests that ~4 threads is the fastest (the
|
/* benchmarking suggests that ~4 threads is the fastest (the
|
||||||
* real bottleneck is the database, so adding more threads just
|
* real bottleneck is the database, so adding more threads just
|
||||||
* slows things down)
|
* slows things down) */
|
||||||
*/
|
|
||||||
max_workers_ = std::min(4U, std::thread::hardware_concurrency());
|
max_workers_ = std::min(4U, std::thread::hardware_concurrency());
|
||||||
} else
|
} else
|
||||||
max_workers_ = conf.max_threads;
|
max_workers_ = conf.max_threads;
|
||||||
@ -393,11 +329,14 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
|
|||||||
mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no",
|
mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no",
|
||||||
conf_.cleanup ? "yes" : "no");
|
conf_.cleanup ? "yes" : "no");
|
||||||
|
|
||||||
state_.change_to(IndexState::Scanning);
|
progress_.reset();
|
||||||
|
switch_state(State::Scanning);
|
||||||
/* kick off the first worker, which will spawn more if needed. */
|
/* kick off the first worker, which will spawn more if needed. */
|
||||||
workers_.emplace_back(std::thread([this] { item_worker(); }));
|
workers_.emplace_back(std::thread([this]{item_worker();}));
|
||||||
/* kick the disk-scanner thread */
|
/* kick the file-system-scanner thread */
|
||||||
scanner_worker_ = std::thread([this] { scan_worker(); });
|
if (scanner_worker_.joinable())
|
||||||
|
scanner_worker_.join(); // kill old one
|
||||||
|
scanner_worker_ = std::thread([this]{scan_worker();});
|
||||||
|
|
||||||
mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking");
|
mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking");
|
||||||
if (block) {
|
if (block) {
|
||||||
@ -413,18 +352,33 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
|
|||||||
bool
|
bool
|
||||||
Indexer::Private::stop()
|
Indexer::Private::stop()
|
||||||
{
|
{
|
||||||
|
switch_state(State::Draining);
|
||||||
|
|
||||||
scanner_.stop();
|
scanner_.stop();
|
||||||
|
// cannot join scanner_worker_ here since it may be our
|
||||||
|
// current thread.
|
||||||
|
|
||||||
todos_.clear();
|
// wait for completion.
|
||||||
if (scanner_worker_.joinable())
|
while (!msg_paths_.empty()) {
|
||||||
scanner_worker_.join();
|
mu_debug("scan-items left: {}", msg_paths_.size());
|
||||||
|
std::this_thread::sleep_for(250ms);
|
||||||
|
}
|
||||||
|
|
||||||
state_.change_to(IndexState::Idle);
|
|
||||||
for (auto&& w : workers_)
|
for (auto&& w : workers_)
|
||||||
if (w.joinable())
|
if (w.joinable())
|
||||||
w.join();
|
w.join();
|
||||||
workers_.clear();
|
workers_.clear();
|
||||||
|
|
||||||
|
store_worker_.push(StoreWorker::EndTransaction());
|
||||||
|
|
||||||
|
// wait for completion.
|
||||||
|
while (!store_worker_.empty()) {
|
||||||
|
mu_debug("work-items left: {}", store_worker_.size());
|
||||||
|
std::this_thread::sleep_for(250ms);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch_state(State::Idle);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -471,7 +425,7 @@ Indexer::is_running() const
|
|||||||
const Indexer::Progress&
|
const Indexer::Progress&
|
||||||
Indexer::progress() const
|
Indexer::progress() const
|
||||||
{
|
{
|
||||||
priv_->progress_.running = priv_->state_ == IndexState::Idle ? false : true;
|
priv_->progress_.running = priv_->state_ == State::Idle ? false : true;
|
||||||
|
|
||||||
return priv_->progress_;
|
return priv_->progress_;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
** Copyright (C) 2020-2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||||
**
|
**
|
||||||
** This program is free software; you can redistribute it and/or modify it
|
** This program is free software; you can redistribute it and/or modify it
|
||||||
** under the terms of the GNU General Public License as published by the
|
** under the terms of the GNU General Public License as published by the
|
||||||
@ -29,9 +29,12 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
#include <variant>
|
#include <variant>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <glib.h>
|
#include <glib.h>
|
||||||
#include <glib/gprintf.h>
|
#include <glib/gprintf.h>
|
||||||
@ -116,7 +119,7 @@ struct OutputStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string fname_;
|
std::string fname_;
|
||||||
using OutType = std::variant<std::ofstream, std::ostringstream>;
|
using OutType = std::variant<std::ofstream, std::ostringstream>;
|
||||||
OutType out_;
|
OutType out_;
|
||||||
};
|
};
|
||||||
@ -126,11 +129,21 @@ private:
|
|||||||
/// @brief object to manage the server-context for all commands.
|
/// @brief object to manage the server-context for all commands.
|
||||||
struct Server::Private {
|
struct Server::Private {
|
||||||
Private(Store& store, const Server::Options& opts, Output output)
|
Private(Store& store, const Server::Options& opts, Output output)
|
||||||
: store_{store}, options_{opts}, output_{output},
|
: store_{store},
|
||||||
|
store_worker_{store.store_worker()},
|
||||||
|
options_{opts}, output_{output},
|
||||||
command_handler_{make_command_map()},
|
command_handler_{make_command_map()},
|
||||||
keep_going_{true},
|
keep_going_{true},
|
||||||
tmp_dir_{unwrap(make_temp_dir())}
|
tmp_dir_{unwrap(make_temp_dir())} {
|
||||||
{}
|
|
||||||
|
// tell the store-worker that we (this class) can handle
|
||||||
|
// sexp strings.
|
||||||
|
store_worker_.install_sexp_handler(
|
||||||
|
[this](const std::string& sexp) {
|
||||||
|
this->invoke(sexp);
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
~Private() {
|
~Private() {
|
||||||
indexer().stop();
|
indexer().stop();
|
||||||
@ -148,13 +161,10 @@ struct Server::Private {
|
|||||||
// acccessors
|
// acccessors
|
||||||
Store& store() { return store_; }
|
Store& store() { return store_; }
|
||||||
const Store& store() const { return store_; }
|
const Store& store() const { return store_; }
|
||||||
|
StoreWorker& store_worker() { return store_worker_; }
|
||||||
Indexer& indexer() { return store().indexer(); }
|
Indexer& indexer() { return store().indexer(); }
|
||||||
//CommandMap& command_map() const { return command_map_; }
|
//CommandMap& command_map() const { return command_map_; }
|
||||||
|
|
||||||
//
|
|
||||||
// invoke
|
|
||||||
//
|
|
||||||
bool invoke(const std::string& expr) noexcept;
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// output
|
// output
|
||||||
@ -186,7 +196,19 @@ struct Server::Private {
|
|||||||
void remove_handler(const Command& cmd);
|
void remove_handler(const Command& cmd);
|
||||||
void view_handler(const Command& cmd);
|
void view_handler(const Command& cmd);
|
||||||
|
|
||||||
|
bool keep_going() const { return keep_going_; }
|
||||||
|
void set_keep_going(bool going) { keep_going_ = going; }
|
||||||
|
|
||||||
|
// make main thread wait until done with the command.
|
||||||
|
std::mutex done_lock_;
|
||||||
|
std::condition_variable done_cond_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
//
|
||||||
|
// invoke
|
||||||
|
//
|
||||||
|
bool invoke(const std::string& expr) noexcept;
|
||||||
|
|
||||||
void move_docid(Store::Id docid, Option<std::string> flagstr,
|
void move_docid(Store::Id docid, Option<std::string> flagstr,
|
||||||
bool new_name, bool no_view);
|
bool new_name, bool no_view);
|
||||||
|
|
||||||
@ -209,6 +231,7 @@ private:
|
|||||||
std::ofstream make_temp_file_stream(std::string& fname) const;
|
std::ofstream make_temp_file_stream(std::string& fname) const;
|
||||||
|
|
||||||
Store& store_;
|
Store& store_;
|
||||||
|
StoreWorker& store_worker_;
|
||||||
Server::Options options_;
|
Server::Options options_;
|
||||||
Server::Output output_;
|
Server::Output output_;
|
||||||
const CommandHandler command_handler_;
|
const CommandHandler command_handler_;
|
||||||
@ -480,6 +503,10 @@ Server::Private::invoke(const std::string& expr) noexcept
|
|||||||
keep_going_ = false;
|
keep_going_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tell main thread we're done with the command.
|
||||||
|
std::lock_guard l{done_lock_};
|
||||||
|
done_cond_.notify_one();
|
||||||
|
|
||||||
return keep_going_;
|
return keep_going_;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -559,7 +586,9 @@ Server::Private::data_handler(const Command& cmd)
|
|||||||
{
|
{
|
||||||
const auto request_type{unwrap(cmd.symbol_arg(":kind"))};
|
const auto request_type{unwrap(cmd.symbol_arg(":kind"))};
|
||||||
|
|
||||||
if (request_type == "maildirs") {
|
if (request_type == "doccount") {
|
||||||
|
output(mu_format("(:doccount {})", store().size()));
|
||||||
|
} else if (request_type == "maildirs") {
|
||||||
auto&& out{make_output_stream()};
|
auto&& out{make_output_stream()};
|
||||||
mu_print(out, "(");
|
mu_print(out, "(");
|
||||||
for (auto&& mdir: store().maildirs())
|
for (auto&& mdir: store().maildirs())
|
||||||
@ -690,9 +719,6 @@ Server::Private::find_handler(const Command& cmd)
|
|||||||
StopWatch sw{mu_format("{} (indexing: {})", __func__,
|
StopWatch sw{mu_format("{} (indexing: {})", __func__,
|
||||||
indexer().is_running() ? "yes" : "no")};
|
indexer().is_running() ? "yes" : "no")};
|
||||||
|
|
||||||
// we need to _lock_ the store while querying (which likely consists of
|
|
||||||
// multiple actual queries) + grabbing the results.
|
|
||||||
std::lock_guard l{store_.lock()};
|
|
||||||
auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)};
|
auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)};
|
||||||
if (!qres)
|
if (!qres)
|
||||||
throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what());
|
throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what());
|
||||||
@ -1081,7 +1107,27 @@ Server::~Server() = default;
|
|||||||
bool
|
bool
|
||||||
Server::invoke(const std::string& expr) noexcept
|
Server::invoke(const std::string& expr) noexcept
|
||||||
{
|
{
|
||||||
return priv_->invoke(expr);
|
/* a _little_ hacky; handle _quit_ directly to properly
|
||||||
|
* shut down the server */
|
||||||
|
if (expr == "(quit)") {
|
||||||
|
mu_debug("quitting");
|
||||||
|
priv_->set_keep_going(false);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* feed the command to the queue; it'll get executed in the
|
||||||
|
* store-worker thread; however, sync on its completion
|
||||||
|
* so we get its keep_going() result
|
||||||
|
*
|
||||||
|
* as an added bonus, this ensures mu server shell doesn't require an
|
||||||
|
* extra user RET to get back the prompt
|
||||||
|
*/
|
||||||
|
std::unique_lock done_lock{priv_->done_lock_};
|
||||||
|
priv_->store_worker().push(StoreWorker::SexpCommand{expr});
|
||||||
|
priv_->done_cond_.wait(done_lock);
|
||||||
|
|
||||||
|
return priv_->keep_going();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* LCOV_EXCL_STOP */
|
/* LCOV_EXCL_STOP */
|
||||||
|
|||||||
68
lib/mu-store-worker.cc
Normal file
68
lib/mu-store-worker.cc
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
/*
|
||||||
|
** Copyright (C) 2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||||
|
**
|
||||||
|
** This program is free software; you can redistribute it and/or modify it
|
||||||
|
** under the terms of the GNU General Public License as published by the
|
||||||
|
** Free Software Foundation; either version 3, or (at your option) any
|
||||||
|
** later version.
|
||||||
|
**
|
||||||
|
** This program is distributed in the hope that it will be useful,
|
||||||
|
** but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
** GNU General Public License for more details.
|
||||||
|
**
|
||||||
|
** You should have received a copy of the GNU General Public License
|
||||||
|
** along with this program; if not, write to the Free Software Foundation,
|
||||||
|
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
**
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "mu-store-worker.hh"
|
||||||
|
#include "mu-store.hh"
|
||||||
|
#include "utils/mu-utils.hh"
|
||||||
|
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
using namespace Mu;
|
||||||
|
|
||||||
|
// helper constant for the visitor
|
||||||
|
template<class> inline constexpr bool always_false_v = false;
|
||||||
|
|
||||||
|
void
|
||||||
|
StoreWorker::run() {
|
||||||
|
|
||||||
|
running_ = true;
|
||||||
|
|
||||||
|
while (running_) {
|
||||||
|
WorkItem workitem;
|
||||||
|
|
||||||
|
if (!q_.pop(workitem))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
std::visit([&](auto&& item) {
|
||||||
|
using T = std::decay_t<decltype(item)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, SexpCommand>) {
|
||||||
|
if (!sexp_handler_)
|
||||||
|
mu_critical("no handler for sexp '{}'", item);
|
||||||
|
else
|
||||||
|
sexp_handler_(item);
|
||||||
|
} else if constexpr (std::is_same_v<T, SetDirStamp>) {
|
||||||
|
store_.set_dirstamp(item.path, item.tstamp);
|
||||||
|
} else if constexpr (std::is_same_v<T, SetLastIndex>) {
|
||||||
|
store_.config().set<Mu::Config::Id::LastIndex>(item.tstamp);
|
||||||
|
} else if constexpr (std::is_same_v<T, StartTransaction>) {
|
||||||
|
store_.xapian_db().request_transaction();
|
||||||
|
} else if constexpr (std::is_same_v<T, EndTransaction>) {
|
||||||
|
store_.xapian_db().request_commit(true);
|
||||||
|
} else if constexpr (std::is_same_v<T, RemoveMessages>) {
|
||||||
|
store_.remove_messages(item);
|
||||||
|
} else if constexpr (std::is_same_v<T, AddMessage>) {
|
||||||
|
store_.consume_message(std::move(item.msg), true/*new*/);
|
||||||
|
} else if constexpr (std::is_same_v<T, UpdateMessage>) {
|
||||||
|
store_.consume_message(std::move(item.msg), false/*maybe not new*/);
|
||||||
|
} else
|
||||||
|
static_assert(always_false_v<T>, "non-exhaustive visitor");
|
||||||
|
}, workitem);
|
||||||
|
}
|
||||||
|
}
|
||||||
169
lib/mu-store-worker.hh
Normal file
169
lib/mu-store-worker.hh
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
/*
|
||||||
|
** Copyright (C) 2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||||
|
**
|
||||||
|
** This program is free software; you can redistribute it and/or modify it
|
||||||
|
** under the terms of the GNU General Public License as published by the
|
||||||
|
** Free Software Foundation; either version 3, or (at your option) any
|
||||||
|
** later version.
|
||||||
|
**
|
||||||
|
** This program is distributed in the hope that it will be useful,
|
||||||
|
** but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
** GNU General Public License for more details.
|
||||||
|
**
|
||||||
|
** You should have received a copy of the GNU General Public License
|
||||||
|
** along with this program; if not, write to the Free Software Foundation,
|
||||||
|
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
**
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The store worker maintains a worker thread and an async queue to which
|
||||||
|
* commands can be added from any thread; the worker thread that is the sole
|
||||||
|
* thread to talk to the store / Xapian (at least for writing).
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef MU_STORE_WORKER_HH
|
||||||
|
#define MU_STORE_WORKER_HH
|
||||||
|
|
||||||
|
#include <variant>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <atomic>
|
||||||
|
#include <vector>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
|
#include <message/mu-message.hh>
|
||||||
|
#include <utils/mu-async-queue.hh>
|
||||||
|
|
||||||
|
namespace Mu {
|
||||||
|
|
||||||
|
/**< Sum type for all commands */
|
||||||
|
|
||||||
|
class Store; /// fwd declaration
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker for sending requests to the Store
|
||||||
|
*
|
||||||
|
* I.e. to execute database commands in a single thread.
|
||||||
|
*/
|
||||||
|
class StoreWorker {
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* CTOR. This will create the store worker and start the worker thread.
|
||||||
|
*
|
||||||
|
* @param store a store
|
||||||
|
*/
|
||||||
|
StoreWorker(Store& store):
|
||||||
|
store_{store},
|
||||||
|
runner_ {std::thread([this]{run();})}
|
||||||
|
{}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DTOR. Destroy the store worker after joining the worker thread
|
||||||
|
*/
|
||||||
|
~StoreWorker() {
|
||||||
|
running_ = false;
|
||||||
|
if (runner_.joinable())
|
||||||
|
runner_.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The following types of work-item can be added to the queue:
|
||||||
|
*/
|
||||||
|
struct SetDirStamp {
|
||||||
|
std::string path; /**< full path to directory */
|
||||||
|
::time_t tstamp; /**< Timestamp for directory */
|
||||||
|
}; /**< Write a directory timestamp to the store */
|
||||||
|
|
||||||
|
struct SetLastIndex {
|
||||||
|
::time_t tstamp; /**< Timestamp */
|
||||||
|
}; /**< Write last indexing timestamp to the store */
|
||||||
|
|
||||||
|
struct StartTransaction{}; /**< Request transaction start
|
||||||
|
* (opportunistically) */
|
||||||
|
struct EndTransaction{}; /**< Request transaction end/commit
|
||||||
|
* (opportunistically) */
|
||||||
|
struct AddMessage {
|
||||||
|
Message msg; /**< Add a new message */
|
||||||
|
}; /**< Add a new message; this is faster version of UpdateMessage
|
||||||
|
* if we know the message does not exist yet. */
|
||||||
|
struct UpdateMessage {
|
||||||
|
Message msg; /**< Add or update a message */
|
||||||
|
}; /**< Add message or update if it already exists */
|
||||||
|
|
||||||
|
using RemoveMessages = std::vector<unsigned>;
|
||||||
|
/**< Remove all message with the given ids */
|
||||||
|
using SexpCommand = std::string; /**< A sexp-command (i.e., from mu4e);
|
||||||
|
* requires install_sexp_handler() */
|
||||||
|
|
||||||
|
using WorkItem = std::variant<SetDirStamp, SetLastIndex,
|
||||||
|
AddMessage, UpdateMessage,
|
||||||
|
StartTransaction, EndTransaction,
|
||||||
|
RemoveMessages, SexpCommand>;
|
||||||
|
/// Sumtype with all types of work-item
|
||||||
|
|
||||||
|
using QueueType = AsyncQueue<WorkItem>;
|
||||||
|
const QueueType& queue() const { return q_; }
|
||||||
|
QueueType& queue() { return q_; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push a work item to the que
|
||||||
|
*
|
||||||
|
* @param item
|
||||||
|
*/
|
||||||
|
void push(WorkItem&& item) {
|
||||||
|
q_.push(std::move(item));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current size of the work item queue
|
||||||
|
*
|
||||||
|
* @return the size
|
||||||
|
*/
|
||||||
|
size_t size() const {
|
||||||
|
return q_.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is the work item queue empty?
|
||||||
|
*
|
||||||
|
* @return true or false
|
||||||
|
*/
|
||||||
|
bool empty() const {
|
||||||
|
return q_.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear the queue of any items
|
||||||
|
*/
|
||||||
|
void clear() {
|
||||||
|
q_.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
using SexpCommandHandler = std::function<void(const std::string& sexp)>;
|
||||||
|
/**< Prototype for a SexpCommand handler function */
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Install a handler for Sexp commands
|
||||||
|
*
|
||||||
|
* @param handler
|
||||||
|
*/
|
||||||
|
void install_sexp_handler(SexpCommandHandler&& handler) {
|
||||||
|
sexp_handler_ = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void run();
|
||||||
|
size_t cleanup_orphans();
|
||||||
|
|
||||||
|
QueueType q_;
|
||||||
|
Store& store_;;
|
||||||
|
std::thread runner_;
|
||||||
|
std::atomic<bool> running_{};
|
||||||
|
SexpCommandHandler sexp_handler_{};
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace Mu
|
||||||
|
#endif /*MU_STORE_WORKER_HH*/
|
||||||
@ -131,6 +131,7 @@ struct Store::Private {
|
|||||||
XapianDb xapian_db_;
|
XapianDb xapian_db_;
|
||||||
Config config_;
|
Config config_;
|
||||||
ContactsCache contacts_cache_;
|
ContactsCache contacts_cache_;
|
||||||
|
std::unique_ptr<StoreWorker> store_worker_;
|
||||||
std::unique_ptr<Indexer> indexer_;
|
std::unique_ptr<Indexer> indexer_;
|
||||||
|
|
||||||
const std::string root_maildir_;
|
const std::string root_maildir_;
|
||||||
@ -252,6 +253,7 @@ Store::Store(Store&& other)
|
|||||||
{
|
{
|
||||||
priv_ = std::move(other.priv_);
|
priv_ = std::move(other.priv_);
|
||||||
priv_->indexer_.reset();
|
priv_->indexer_.reset();
|
||||||
|
priv_->store_worker_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
Store::~Store() = default;
|
Store::~Store() = default;
|
||||||
@ -316,6 +318,15 @@ Store::indexer()
|
|||||||
return *priv_->indexer_.get();
|
return *priv_->indexer_.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StoreWorker&
|
||||||
|
Store::store_worker()
|
||||||
|
{
|
||||||
|
if (!priv_->store_worker_)
|
||||||
|
priv_->store_worker_ = std::make_unique<StoreWorker>(*this);
|
||||||
|
|
||||||
|
return *priv_->store_worker_;
|
||||||
|
}
|
||||||
|
|
||||||
Result<Store::Id>
|
Result<Store::Id>
|
||||||
Store::add_message(Message& msg, bool is_new)
|
Store::add_message(Message& msg, bool is_new)
|
||||||
{
|
{
|
||||||
@ -382,10 +393,12 @@ Store::remove_messages(const std::vector<Store::Id>& ids)
|
|||||||
{
|
{
|
||||||
std::lock_guard guard{priv_->lock_};
|
std::lock_guard guard{priv_->lock_};
|
||||||
|
|
||||||
XapianDb::Transaction tx (xapian_db()); // RAII
|
xapian_db().request_transaction();
|
||||||
|
|
||||||
for (auto&& id : ids)
|
for (auto&& id : ids)
|
||||||
xapian_db().delete_document(id);
|
xapian_db().delete_document(id);
|
||||||
|
|
||||||
|
xapian_db().request_commit(true/*force*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
** Copyright (C) 2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
** Copyright (C) 2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||||
**
|
**
|
||||||
** This program is free software; you can redistribute it and/or modify it
|
** This program is free software; you can redistribute it and/or modify it
|
||||||
** under the terms of the GNU General Public License as published by the
|
** under the terms of the GNU General Public License as published by the
|
||||||
@ -31,6 +31,7 @@
|
|||||||
#include "mu-config.hh"
|
#include "mu-config.hh"
|
||||||
#include "mu-indexer.hh"
|
#include "mu-indexer.hh"
|
||||||
#include "mu-query-results.hh"
|
#include "mu-query-results.hh"
|
||||||
|
#include "mu-store-worker.hh"
|
||||||
|
|
||||||
#include <utils/mu-utils.hh>
|
#include <utils/mu-utils.hh>
|
||||||
#include <utils/mu-utils.hh>
|
#include <utils/mu-utils.hh>
|
||||||
@ -147,6 +148,13 @@ public:
|
|||||||
*/
|
*/
|
||||||
Indexer& indexer();
|
Indexer& indexer();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the store-worker instance
|
||||||
|
*
|
||||||
|
* @return the store-worker
|
||||||
|
*/
|
||||||
|
StoreWorker& store_worker();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run a query; see the `mu-query` man page for the syntax.
|
* Run a query; see the `mu-query` man page for the syntax.
|
||||||
*
|
*
|
||||||
|
|||||||
@ -41,6 +41,7 @@ XapianDb::wdb()
|
|||||||
{
|
{
|
||||||
if (read_only())
|
if (read_only())
|
||||||
throw std::runtime_error("database is read-only");
|
throw std::runtime_error("database is read-only");
|
||||||
|
|
||||||
return std::get<Xapian::WritableDatabase>(db_);
|
return std::get<Xapian::WritableDatabase>(db_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,7 +107,7 @@ XapianDb::XapianDb(const std::string& db_path, Flavor flavor):
|
|||||||
if (flavor == Flavor::CreateOverwrite)
|
if (flavor == Flavor::CreateOverwrite)
|
||||||
set_timestamp(MetadataIface::created_key);
|
set_timestamp(MetadataIface::created_key);
|
||||||
|
|
||||||
mu_debug("created {} / {} (batch-size: {})", flavor, *this, batch_size_);
|
mu_debug("created {}", *this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|||||||
@ -24,6 +24,7 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <thread>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
@ -186,10 +187,8 @@ private:
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Fairly thin wrapper around Xapian::Database and Xapian::WritableDatabase
|
* Fairly thin wrapper around Xapian::Database and Xapian::WritableDatabase
|
||||||
* with just the things we need + locking + exception handling
|
|
||||||
*/
|
*/
|
||||||
class XapianDb: public MetadataIface {
|
class XapianDb: public MetadataIface {
|
||||||
#define DB_LOCKED std::unique_lock lock__{lock_};
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Type of database to create.
|
* Type of database to create.
|
||||||
@ -202,7 +201,7 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* XapianDb CTOR. This may throw some Xapian exception.
|
* XapianDb CTOR. This may throw.
|
||||||
*
|
*
|
||||||
* @param db_path path to the database
|
* @param db_path path to the database
|
||||||
* @param flavor kind of database
|
* @param flavor kind of database
|
||||||
@ -213,13 +212,9 @@ public:
|
|||||||
* DTOR
|
* DTOR
|
||||||
*/
|
*/
|
||||||
~XapianDb() {
|
~XapianDb() {
|
||||||
if (tx_level_ > 0)
|
if (!read_only())
|
||||||
mu_warning("inconsistent transaction level ({})", tx_level_);
|
request_commit(true/*force*/);
|
||||||
if (tx_level_ > 0) {
|
mu_debug("closing db");
|
||||||
mu_debug("closing db after committing {} change(s)", changes_);
|
|
||||||
xapian_try([this]{ DB_LOCKED; wdb().commit_transaction(); });
|
|
||||||
} else
|
|
||||||
mu_debug("closing db");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -260,7 +255,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
size_t size() const noexcept {
|
size_t size() const noexcept {
|
||||||
return xapian_try([this]{
|
return xapian_try([this]{
|
||||||
DB_LOCKED; return db().get_doccount(); }, 0);
|
return db().get_doccount(); }, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -276,7 +271,7 @@ public:
|
|||||||
* @return an enquire object
|
* @return an enquire object
|
||||||
*/
|
*/
|
||||||
Xapian::Enquire enquire() const {
|
Xapian::Enquire enquire() const {
|
||||||
DB_LOCKED; return Xapian::Enquire(db());
|
return Xapian::Enquire(db());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -288,7 +283,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
Result<Xapian::Document> document(Xapian::docid id) const {
|
Result<Xapian::Document> document(Xapian::docid id) const {
|
||||||
return xapian_try_result([&]{
|
return xapian_try_result([&]{
|
||||||
DB_LOCKED; return Ok(db().get_document(id)); });
|
return Ok(db().get_document(id)); });
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -300,7 +295,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
std::string metadata(const std::string& key) const override {
|
std::string metadata(const std::string& key) const override {
|
||||||
return xapian_try([&]{
|
return xapian_try([&]{
|
||||||
DB_LOCKED; return db().get_metadata(key);}, "");
|
return db().get_metadata(key);}, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -310,8 +305,8 @@ public:
|
|||||||
* @param val new value for key
|
* @param val new value for key
|
||||||
*/
|
*/
|
||||||
void set_metadata(const std::string& key, const std::string& val) override {
|
void set_metadata(const std::string& key, const std::string& val) override {
|
||||||
xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val);
|
xapian_try([&] { wdb().set_metadata(key, val);
|
||||||
maybe_commit(); });
|
maybe_commit();});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -323,7 +318,6 @@ public:
|
|||||||
//using each_func = MetadataIface::each_func;
|
//using each_func = MetadataIface::each_func;
|
||||||
void for_each(MetadataIface::each_func&& func) const override {
|
void for_each(MetadataIface::each_func&& func) const override {
|
||||||
xapian_try([&]{
|
xapian_try([&]{
|
||||||
DB_LOCKED;
|
|
||||||
for (auto&& it = db().metadata_keys_begin();
|
for (auto&& it = db().metadata_keys_begin();
|
||||||
it != db().metadata_keys_end(); ++it)
|
it != db().metadata_keys_end(); ++it)
|
||||||
func(*it, db().get_metadata(*it));
|
func(*it, db().get_metadata(*it));
|
||||||
@ -339,7 +333,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
bool term_exists(const std::string& term) const {
|
bool term_exists(const std::string& term) const {
|
||||||
return xapian_try([&]{
|
return xapian_try([&]{
|
||||||
DB_LOCKED; return db().term_exists(term);}, false);
|
return db().term_exists(term);}, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -351,7 +345,6 @@ public:
|
|||||||
*/
|
*/
|
||||||
Result<Xapian::docid> add_document(const Xapian::Document& doc) {
|
Result<Xapian::docid> add_document(const Xapian::Document& doc) {
|
||||||
return xapian_try_result([&]{
|
return xapian_try_result([&]{
|
||||||
DB_LOCKED;
|
|
||||||
auto&& id{wdb().add_document(doc)};
|
auto&& id{wdb().add_document(doc)};
|
||||||
set_timestamp(MetadataIface::last_change_key);
|
set_timestamp(MetadataIface::last_change_key);
|
||||||
maybe_commit();
|
maybe_commit();
|
||||||
@ -369,9 +362,9 @@ public:
|
|||||||
* @return new docid or an error
|
* @return new docid or an error
|
||||||
*/
|
*/
|
||||||
Result<Xapian::docid>
|
Result<Xapian::docid>
|
||||||
replace_document(const std::string& term, const Xapian::Document& doc) {
|
replace_document(const std::string& term,
|
||||||
|
const Xapian::Document& doc) {
|
||||||
return xapian_try_result([&]{
|
return xapian_try_result([&]{
|
||||||
DB_LOCKED;
|
|
||||||
auto&& id{wdb().replace_document(term, doc)};
|
auto&& id{wdb().replace_document(term, doc)};
|
||||||
set_timestamp(MetadataIface::last_change_key);
|
set_timestamp(MetadataIface::last_change_key);
|
||||||
maybe_commit();
|
maybe_commit();
|
||||||
@ -379,9 +372,9 @@ public:
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
Result<Xapian::docid>
|
Result<Xapian::docid>
|
||||||
replace_document(Xapian::docid id, const Xapian::Document& doc) {
|
replace_document(Xapian::docid id,
|
||||||
|
const Xapian::Document& doc) {
|
||||||
return xapian_try_result([&]{
|
return xapian_try_result([&]{
|
||||||
DB_LOCKED;
|
|
||||||
wdb().replace_document(id, doc);
|
wdb().replace_document(id, doc);
|
||||||
set_timestamp(MetadataIface::last_change_key);
|
set_timestamp(MetadataIface::last_change_key);
|
||||||
maybe_commit();
|
maybe_commit();
|
||||||
@ -398,7 +391,6 @@ public:
|
|||||||
*/
|
*/
|
||||||
Result<void> delete_document(const std::string& term) {
|
Result<void> delete_document(const std::string& term) {
|
||||||
return xapian_try_result([&]{
|
return xapian_try_result([&]{
|
||||||
DB_LOCKED;
|
|
||||||
wdb().delete_document(term);
|
wdb().delete_document(term);
|
||||||
set_timestamp(MetadataIface::last_change_key);
|
set_timestamp(MetadataIface::last_change_key);
|
||||||
maybe_commit();
|
maybe_commit();
|
||||||
@ -407,7 +399,6 @@ public:
|
|||||||
}
|
}
|
||||||
Result<void> delete_document(Xapian::docid id) {
|
Result<void> delete_document(Xapian::docid id) {
|
||||||
return xapian_try_result([&]{
|
return xapian_try_result([&]{
|
||||||
DB_LOCKED;
|
|
||||||
wdb().delete_document(id);
|
wdb().delete_document(id);
|
||||||
set_timestamp(MetadataIface::last_change_key);
|
set_timestamp(MetadataIface::last_change_key);
|
||||||
maybe_commit();
|
maybe_commit();
|
||||||
@ -417,7 +408,6 @@ public:
|
|||||||
|
|
||||||
template<typename Func>
|
template<typename Func>
|
||||||
size_t all_terms(const std::string& prefix, Func&& func) const {
|
size_t all_terms(const std::string& prefix, Func&& func) const {
|
||||||
DB_LOCKED;
|
|
||||||
size_t n{};
|
size_t n{};
|
||||||
for (auto it = db().allterms_begin(prefix); it != db().allterms_end(prefix); ++it) {
|
for (auto it = db().allterms_begin(prefix); it != db().allterms_end(prefix); ++it) {
|
||||||
if (!func(*it))
|
if (!func(*it))
|
||||||
@ -427,110 +417,69 @@ public:
|
|||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* If the "transaction ref count" > 0 (with inc_transactions());, we run
|
|
||||||
* in "transaction mode". That means that the subsequent Xapian mutation
|
|
||||||
* are part of a transactions, which is flushed when the number of
|
|
||||||
* changes reaches the batch size, _or_ the transaction ref count is
|
|
||||||
* decreased to 0 (dec_transactions()). *
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increase the transaction level; needs to be balance by dec_transactions()
|
* Requests a transaction to be started; this is only
|
||||||
*/
|
* a request, which may not be granted.
|
||||||
void inc_transaction_level() {
|
|
||||||
xapian_try([this]{
|
|
||||||
DB_LOCKED;
|
|
||||||
if (tx_level_ == 0) {// need to start the Xapian transaction?
|
|
||||||
mu_debug("begin transaction");
|
|
||||||
wdb().begin_transaction();
|
|
||||||
}
|
|
||||||
++tx_level_;
|
|
||||||
mu_debug("ind'd tx level to {}", tx_level_);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decrease the transaction level (to balance inc_transactions())
|
|
||||||
*
|
*
|
||||||
* If the level reach 0, perform a Xapian commit.
|
* If you're already in a transaction but that transaction
|
||||||
|
* was started in another thread, that transaction will be
|
||||||
|
* committed before starting a new one.
|
||||||
|
*
|
||||||
|
* Otherwise, start a transaction if you're not already in one.
|
||||||
|
*
|
||||||
|
* @return A result; either true if a transaction was started; false
|
||||||
|
* otherwise, or an error.
|
||||||
*/
|
*/
|
||||||
void dec_transaction_level() {
|
Result<bool> request_transaction() {
|
||||||
xapian_try([this]{
|
return xapian_try_result([this]() {
|
||||||
DB_LOCKED;
|
auto& db = wdb();
|
||||||
if (tx_level_ == 0) {
|
if (in_transaction())
|
||||||
mu_critical("cannot dec transaction-level)");
|
return Ok(false); // nothing to
|
||||||
throw std::runtime_error("cannot dec transactions");
|
|
||||||
}
|
|
||||||
|
|
||||||
--tx_level_;
|
db.begin_transaction();
|
||||||
if (tx_level_ == 0) {// need to commit the Xapian transaction?
|
mu_debug("begin transaction");
|
||||||
mu_debug("committing {} changes", changes_);
|
in_transaction_ = true;
|
||||||
changes_ = 0;
|
return Ok(true);
|
||||||
wdb().commit_transaction();
|
|
||||||
}
|
|
||||||
|
|
||||||
mu_debug("dec'd tx level to {}", tx_level_);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Explicitly request the Xapian DB to be committed to disk. This won't
|
||||||
|
* do anything when not in a transaction.
|
||||||
|
*
|
||||||
|
* @param force whether to force-commit
|
||||||
|
*/
|
||||||
|
void request_commit(bool force = false) { request_commit(wdb(), force); }
|
||||||
|
void maybe_commit() { request_commit(false); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Are we inside a transaction?
|
* Are we inside a transaction?
|
||||||
*
|
*
|
||||||
* @return true or false
|
* @return true or false
|
||||||
*/
|
*/
|
||||||
bool in_transaction() const { DB_LOCKED; return tx_level_ > 0; }
|
bool in_transaction() const { return in_transaction_; }
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* RAII Transaction object
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
struct Transaction {
|
|
||||||
Transaction(XapianDb& db): db_{db} {
|
|
||||||
db_.inc_transaction_level();
|
|
||||||
}
|
|
||||||
~Transaction() {
|
|
||||||
db_.dec_transaction_level();
|
|
||||||
}
|
|
||||||
private:
|
|
||||||
XapianDb& db_;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manually request the Xapian DB to be committed to disk. This won't
|
|
||||||
* do anything while in a transaction.
|
|
||||||
*/
|
|
||||||
void commit() {
|
|
||||||
xapian_try([this]{
|
|
||||||
DB_LOCKED;
|
|
||||||
if (tx_level_ == 0) {
|
|
||||||
mu_info("committing xapian-db @ {}", path_);
|
|
||||||
wdb().commit();
|
|
||||||
} else
|
|
||||||
mu_debug("not committing while in transaction");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
using DbType = std::variant<Xapian::Database, Xapian::WritableDatabase>;
|
using DbType = std::variant<Xapian::Database, Xapian::WritableDatabase>;
|
||||||
private:
|
|
||||||
|
|
||||||
|
private:
|
||||||
/**
|
/**
|
||||||
* To be called after all changes, with DB_LOCKED held.
|
* To be called after all changes, with DB_LOCKED held.
|
||||||
*/
|
*/
|
||||||
void maybe_commit() {
|
void request_commit(Xapian::WritableDatabase& db, bool force) {
|
||||||
// in transaction-mode and enough changes, commit them
|
// in transaction-mode and enough changes, commit them
|
||||||
// and start a new transaction
|
if (!in_transaction())
|
||||||
if (tx_level_ > 0 && ++changes_ >= batch_size_) {
|
return;
|
||||||
mu_debug("batch full ({}/{}); committing change", changes_, batch_size_);
|
if ((++changes_ < batch_size_) && !force)
|
||||||
wdb().commit_transaction();
|
return;
|
||||||
wdb().commit();
|
xapian_try([&]{
|
||||||
--tx_level_;
|
mu_debug("committing transaction with {} changes; "
|
||||||
|
"forced={}", changes_, force ? "yes" : "no");
|
||||||
|
db.commit_transaction();
|
||||||
|
db.commit();
|
||||||
changes_ = 0;
|
changes_ = 0;
|
||||||
wdb().begin_transaction();
|
in_transaction_ = {};
|
||||||
++tx_level_;
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void set_timestamp(const std::string_view key);
|
void set_timestamp(const std::string_view key);
|
||||||
@ -549,12 +498,11 @@ private:
|
|||||||
*/
|
*/
|
||||||
Xapian::WritableDatabase& wdb();
|
Xapian::WritableDatabase& wdb();
|
||||||
|
|
||||||
mutable std::mutex lock_;
|
|
||||||
std::string path_;
|
std::string path_;
|
||||||
DbType db_;
|
DbType db_;
|
||||||
size_t tx_level_{};
|
|
||||||
size_t batch_size_;
|
|
||||||
size_t changes_{};
|
size_t changes_{};
|
||||||
|
bool in_transaction_{};
|
||||||
|
size_t batch_size_;
|
||||||
};
|
};
|
||||||
|
|
||||||
constexpr std::string_view
|
constexpr std::string_view
|
||||||
|
|||||||
@ -79,11 +79,12 @@ cookie(size_t n)
|
|||||||
::printf(COOKIE_PRE "%x" COOKIE_POST, num);
|
::printf(COOKIE_PRE "%x" COOKIE_POST, num);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
output_stdout(const std::string& str, Server::OutputFlags flags)
|
output_stdout(const std::string& str, Server::OutputFlags flags)
|
||||||
{
|
{
|
||||||
|
// Note: with the StoreWorker, we _always_ need to flush
|
||||||
|
flags |= Server::OutputFlags::Flush;
|
||||||
|
|
||||||
cookie(str.size() + 1);
|
cookie(str.size() + 1);
|
||||||
if (G_UNLIKELY(::puts(str.c_str()) < 0)) {
|
if (G_UNLIKELY(::puts(str.c_str()) < 0)) {
|
||||||
mu_critical("failed to write output '{}'", str);
|
mu_critical("failed to write output '{}'", str);
|
||||||
|
|||||||
@ -385,7 +385,10 @@ The server output is as follows:
|
|||||||
((plist-get sexp :info)
|
((plist-get sexp :info)
|
||||||
(funcall mu4e-info-func sexp))
|
(funcall mu4e-info-func sexp))
|
||||||
|
|
||||||
;; get some data
|
;; get some data XXX generalize
|
||||||
|
((plist-get sexp :doccount)
|
||||||
|
(plist-put mu4e--server-props :doccount
|
||||||
|
(mu4e--server-plist-get sexp :doccount)))
|
||||||
((plist-get sexp :maildirs)
|
((plist-get sexp :maildirs)
|
||||||
(setq mu4e-maildir-list (mu4e--server-plist-get sexp :maildirs)))
|
(setq mu4e-maildir-list (mu4e--server-plist-get sexp :maildirs)))
|
||||||
|
|
||||||
@ -556,9 +559,16 @@ get at most MAX contacts."
|
|||||||
|
|
||||||
(defun mu4e--server-data (kind)
|
(defun mu4e--server-data (kind)
|
||||||
"Request data of some KIND.
|
"Request data of some KIND.
|
||||||
KIND is a symbol. Currently supported kinds: maildirs."
|
KIND is a symbol or a list of symbols. Currently supported kinds:
|
||||||
(mu4e--server-call-mu
|
`maildirs', `doccount'."
|
||||||
`(data :kind ,kind)))
|
(pcase kind
|
||||||
|
((pred (lambda (k) (memq k '(maildirs doccount))))
|
||||||
|
(mu4e--server-call-mu `(data :kind ,kind)))
|
||||||
|
((pred listp)
|
||||||
|
(when kind
|
||||||
|
(mu4e--server-data (car kind))
|
||||||
|
(mu4e--server-data (cdr kind))))
|
||||||
|
(_ (mu4e-error "Unexpected kind %s" kind))))
|
||||||
|
|
||||||
(defun mu4e--server-find (query threads sortfield sortdir maxnum skip-dups
|
(defun mu4e--server-find (query threads sortfield sortdir maxnum skip-dups
|
||||||
include-related)
|
include-related)
|
||||||
|
|||||||
@ -130,9 +130,9 @@ changed")
|
|||||||
If non-nil, this is a plist of the form:
|
If non-nil, this is a plist of the form:
|
||||||
\(
|
\(
|
||||||
:checked <number of messages processed> (checked whether up-to-date)
|
:checked <number of messages processed> (checked whether up-to-date)
|
||||||
:updated <number of messages updated/added
|
:updated <number of messages updated/added>
|
||||||
:cleaned-up <number of stale messages removed from store
|
:cleaned-up <number of stale messages removed from store>
|
||||||
:stamp <emacs (current-time) timestamp for the status)")
|
:stamp <emacs (current-time) timestamp for the status>")
|
||||||
|
|
||||||
(defconst mu4e-last-update-buffer "*mu4e-last-update*"
|
(defconst mu4e-last-update-buffer "*mu4e-last-update*"
|
||||||
"Name of buffer with cloned from the last update buffer.
|
"Name of buffer with cloned from the last update buffer.
|
||||||
|
|||||||
65
mu4e/mu4e.el
65
mu4e/mu4e.el
@ -1,6 +1,6 @@
|
|||||||
;;; mu4e.el --- Mu4e, the mu mail user agent -*- lexical-binding: t -*-
|
;;; mu4e.el --- Mu4e, the mu mail user agent -*- lexical-binding: t -*-
|
||||||
|
|
||||||
;; Copyright (C) 2011-2023 Dirk-Jan C. Binnema
|
;; Copyright (C) 2011-2024 Dirk-Jan C. Binnema
|
||||||
|
|
||||||
;; Author: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
;; Author: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||||
;; Maintainer: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
;; Maintainer: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
||||||
@ -203,42 +203,47 @@ Otherwise, check requirements, then start mu4e. When successful, invoke
|
|||||||
(_ (mu4e-error "Error %d: %s" errcode errmsg))))
|
(_ (mu4e-error "Error %d: %s" errcode errmsg))))
|
||||||
|
|
||||||
(defun mu4e--update-status (info)
|
(defun mu4e--update-status (info)
|
||||||
"Update the status message with INFO."
|
"Update `mu4e-index-update-status' with INFO.
|
||||||
|
Return the former. As an additional side-effect, updates
|
||||||
|
doccount in server-properties."
|
||||||
(setq mu4e-index-update-status
|
(setq mu4e-index-update-status
|
||||||
`(:tstamp ,(current-time)
|
`(:tstamp ,(current-time)
|
||||||
:checked ,(plist-get info :checked)
|
:checked ,(or (plist-get info :checked) 0)
|
||||||
:updated ,(plist-get info :updated)
|
:updated ,(or (plist-get info :updated) 0)
|
||||||
:cleaned-up ,(plist-get info :cleaned-up))))
|
:cleaned-up ,(or (plist-get info :cleaned-up) 0)))
|
||||||
|
mu4e-index-update-status)
|
||||||
|
|
||||||
(defun mu4e--info-handler (info)
|
(defun mu4e--info-handler (info)
|
||||||
"Handler function for (:INFO ...) sexps received from server."
|
"Handler function for (:INFO ...) sexps received from server."
|
||||||
(let* ((type (plist-get info :info))
|
(let* ((type (plist-get info :info)))
|
||||||
(checked (plist-get info :checked))
|
|
||||||
(updated (plist-get info :updated))
|
|
||||||
(cleaned-up (plist-get info :cleaned-up)))
|
|
||||||
(cond
|
(cond
|
||||||
((eq type 'add) t) ;; do nothing
|
((eq type 'add) t) ;; do nothing
|
||||||
((eq type 'index)
|
((eq type 'index)
|
||||||
(if (eq (plist-get info :status) 'running)
|
(let* ((info (mu4e--update-status info))
|
||||||
(mu4e-index-message
|
(checked (plist-get info :checked))
|
||||||
"Indexing... checked %d, updated %d" checked updated)
|
(updated (plist-get info :updated))
|
||||||
(progn ;; i.e. 'complete
|
(cleaned-up (plist-get info :cleaned-up)))
|
||||||
(mu4e--update-status info)
|
(if (eq (plist-get info :status) 'running)
|
||||||
(mu4e-index-message
|
(mu4e-index-message
|
||||||
"%s completed; checked %d, updated %d, cleaned-up %d"
|
"Indexing... checked %d, updated %d"
|
||||||
(if mu4e-index-lazy-check "Lazy indexing" "Indexing")
|
checked updated)
|
||||||
checked updated cleaned-up)
|
(progn ;; i.e. 'complete
|
||||||
;; index done; grab updated queries
|
(mu4e-index-message
|
||||||
(mu4e--query-items-refresh)
|
"%s completed; checked %d, updated %d, cleaned-up %d"
|
||||||
(run-hooks 'mu4e-index-updated-hook)
|
(if mu4e-index-lazy-check "Lazy indexing" "Indexing")
|
||||||
;; backward compatibility...
|
checked updated cleaned-up)
|
||||||
(unless (zerop (+ updated cleaned-up))
|
;; index done; grab updated queries
|
||||||
mu4e-message-changed-hook)
|
(mu4e--query-items-refresh)
|
||||||
(unless (and (not (string= mu4e--contacts-tstamp "0"))
|
(run-hooks 'mu4e-index-updated-hook)
|
||||||
(zerop (plist-get info :updated)))
|
;; backward compatibility...
|
||||||
(mu4e--request-contacts-maybe)
|
(unless (zerop (+ updated cleaned-up))
|
||||||
(mu4e--server-data 'maildirs)) ;; update maildir list
|
mu4e-message-changed-hook)
|
||||||
(mu4e--main-redraw))))
|
(unless (and (not (string= mu4e--contacts-tstamp "0"))
|
||||||
|
(zerop (plist-get info :updated)))
|
||||||
|
(mu4e--request-contacts-maybe)
|
||||||
|
(mu4e--server-data '(maildirs doccount)))
|
||||||
|
;; update maildir list & doccount
|
||||||
|
(mu4e--main-redraw)))))
|
||||||
((plist-get info :message)
|
((plist-get info :message)
|
||||||
(mu4e-index-message "%s" (plist-get info :message))))))
|
(mu4e-index-message "%s" (plist-get info :message))))))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user