store-worker: temporarily revert

Of course, after merging some problems come up.
Let's fix those first.

This reverts commit f2f01595a5.
This commit is contained in:
Dirk-Jan C. Binnema
2024-06-05 12:21:24 +03:00
parent 853fa32ace
commit 5bd439271d
6 changed files with 235 additions and 253 deletions

View File

@ -1,5 +1,5 @@
/* /*
** Copyright (C) 2020-2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
** **
** This program is free software; you can redistribute it and/or modify it ** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the ** under the terms of the GNU General Public License as published by the
@ -29,7 +29,6 @@
#include <condition_variable> #include <condition_variable>
#include <iostream> #include <iostream>
#include <atomic> #include <atomic>
#include <string_view>
#include <chrono> #include <chrono>
using namespace std::chrono_literals; using namespace std::chrono_literals;
@ -43,28 +42,49 @@ using namespace std::chrono_literals;
using namespace Mu; using namespace Mu;
// states struct IndexState {
enum State { Idle,
enum struct State { Idle, Scanning, Draining }; Scanning,
constexpr std::string_view Finishing,
format_as(State s) Cleaning
{ };
static const char* name(State s) {
switch (s) { switch (s) {
case State::Idle: return "idle"; case Idle:
case State::Scanning: return "scanning"; return "idle";
case State::Draining: return "draining"; case Scanning:
default: return "<error>"; return "scanning";
case Finishing:
return "finishing";
case Cleaning:
return "cleaning";
default:
return "<error>";
} }
} }
bool operator==(State rhs) const {
return state_.load() == rhs;
}
bool operator!=(State rhs) const {
return state_.load() != rhs;
}
void change_to(State new_state) {
mu_debug("changing indexer state {}->{}", name((State)state_),
name((State)new_state));
state_.store(new_state);
}
private:
std::atomic<State> state_{Idle};
};
struct Indexer::Private { struct Indexer::Private {
Private(Mu::Store& store) Private(Mu::Store& store)
: store_{store}, : store_{store}, scanner_{store_.root_maildir(),
store_worker_{store.store_worker()},
scanner_{store_.root_maildir(),
[this](auto&& path, [this](auto&& path,
auto&& statbuf, auto&& info) { auto&& statbuf, auto&& info) {
return scan_handler(path, statbuf, info); return handler(path, statbuf, info);
}}, }},
max_message_size_{store_.config().get<Mu::Config::Id::MaxMessageSize>()}, max_message_size_{store_.config().get<Mu::Config::Id::MaxMessageSize>()},
was_empty_{store.empty()} { was_empty_{store.empty()} {
@ -77,55 +97,27 @@ struct Indexer::Private {
store.config().get<Mu::Config::Id::SupportNgrams>()); store.config().get<Mu::Config::Id::SupportNgrams>());
} }
~Private() { force_cleanup(); } ~Private() {
stop();
void force_cleanup() {
switch_state(State::Idle);
scanner_.stop();
if (scanner_worker_.joinable())
scanner_worker_.join();
msg_paths_.clear();
for (auto&& w : workers_)
if (w.joinable())
w.join();
store_worker_.clear();
} }
bool dir_predicate(const std::string& path, const struct dirent* dirent) const; bool dir_predicate(const std::string& path, const struct dirent* dirent) const;
bool scan_handler(const std::string& fullpath, struct stat* statbuf, bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype);
Scanner::HandleType htype);
void maybe_start_worker(); void maybe_start_worker();
void item_worker(); void item_worker();
void scan_worker(); void scan_worker();
bool add_message(const std::string& path);
bool cleanup();
bool start(const Indexer::Config& conf, bool block); bool start(const Indexer::Config& conf, bool block);
bool stop(); bool stop();
bool is_running() const { return state_ != State::Idle; } bool is_running() const { return state_ != IndexState::Idle; }
void switch_state(State new_state) {
mu_debug("changing indexer state {}->{}", state_, new_state);
state_ = new_state;
}
// pace a bit so scan_items queue doesn't get too big.
void pace_scan_worker() {
while (msg_paths_.size() > 8 * max_workers_) {
std::this_thread::sleep_for(25ms);
continue;
}
}
// pace a bit so store-worker queue doesn't get too big.
void pace_store_worker() {
while (store_worker_.size() > 8) {
std::this_thread::sleep_for(25ms);
continue;
}
}
Indexer::Config conf_; Indexer::Config conf_;
const Store& store_; Store& store_;
StoreWorker& store_worker_;
Scanner scanner_; Scanner scanner_;
const size_t max_message_size_; const size_t max_message_size_;
@ -134,17 +126,26 @@ struct Indexer::Private {
std::vector<std::thread> workers_; std::vector<std::thread> workers_;
std::thread scanner_worker_; std::thread scanner_worker_;
AsyncQueue<std::string> msg_paths_; struct WorkItem {
std::string full_path;
enum Type {
Dir,
File
};
Type type;
};
AsyncQueue<WorkItem> todos_;
Progress progress_{}; Progress progress_{};
std::atomic<State> state_{State::Idle}; IndexState state_{};
std::mutex lock_, w_lock_; std::mutex lock_, w_lock_;
std::atomic<time_t> completed_{}; std::atomic<time_t> completed_{};
bool was_empty_{}; bool was_empty_{};
}; };
bool bool
Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf, Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
Scanner::HandleType htype) Scanner::HandleType htype)
{ {
switch (htype) { switch (htype) {
@ -159,7 +160,7 @@ Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf
return false; return false;
} }
// in lazy-mode, we ignore this dir if its dirstamp suggests it // in lazy-mode, we ignore this dir if its dirstamp suggest it
// is up-to-date (this is _not_ always true; hence we call it // is up-to-date (this is _not_ always true; hence we call it
// lazy-mode); only for actual message dirs, since the dir // lazy-mode); only for actual message dirs, since the dir
// tstamps may not bubble up.U // tstamps may not bubble up.U
@ -192,16 +193,14 @@ Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf
return true; return true;
} }
case Scanner::HandleType::LeaveDir: { case Scanner::HandleType::LeaveDir: {
// directly push to store worker, bypass scan-items queue todos_.push({fullpath, WorkItem::Type::Dir});
pace_store_worker();
store_worker_.push(StoreWorker::SetDirStamp{fullpath, ::time({})});
return true; return true;
} }
case Scanner::HandleType::File: { case Scanner::HandleType::File: {
++progress_.checked; ++progress_.checked;
if (static_cast<size_t>(statbuf->st_size) > max_message_size_) { if ((size_t)statbuf->st_size > max_message_size_) {
mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size); mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size);
return false; return false;
} }
@ -211,10 +210,9 @@ Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf
if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath)) if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath))
return false; return false;
// push the remaining messages to our "scan-items" queue for // push the remaining messages to our "todo" queue for
// (re)parsing and adding/updating to the database. // (re)parsing and adding/updating to the database.
pace_scan_worker(); todos_.push({fullpath, WorkItem::Type::File});
msg_paths_.push(std::string{fullpath}); // move?
return true; return true;
} }
default: default:
@ -228,94 +226,158 @@ Indexer::Private::maybe_start_worker()
{ {
std::lock_guard lock{w_lock_}; std::lock_guard lock{w_lock_};
if (msg_paths_.size() > workers_.size() && workers_.size() < max_workers_) { if (todos_.size() > workers_.size() && workers_.size() < max_workers_) {
workers_.emplace_back(std::thread([this] { item_worker(); })); workers_.emplace_back(std::thread([this] { item_worker(); }));
mu_debug("added worker {}", workers_.size()); mu_debug("added worker {}", workers_.size());
} }
} }
bool
Indexer::Private::add_message(const std::string& path)
{
/*
* Having the lock here makes things a _lot_ slower.
*
* The reason for having the lock is some helgrind warnings;
* but it believed those are _false alarms_
* https://gitlab.gnome.org/GNOME/glib/-/issues/2662
*/
//std::unique_lock lock{w_lock_};
auto msg{Message::make_from_path(path, store_.message_options())};
if (!msg) {
mu_warning("failed to create message from {}: {}", path, msg.error().what());
return false;
}
// if the store was empty, we know that the message is completely new
// and can use the fast path (Xapians 'add_document' rather than
// 'replace_document)
auto res = store_.consume_message(std::move(msg.value()), was_empty_);
if (!res) {
mu_warning("failed to add message @ {}: {}", path, res.error().what());
return false;
}
return true;
}
void void
Indexer::Private::item_worker() Indexer::Private::item_worker()
{ {
WorkItem item;
mu_debug("started worker"); mu_debug("started worker");
while (state_ == State::Scanning || while (state_ == IndexState::Scanning) {
(state_ == State::Draining && !msg_paths_.empty())) { if (!todos_.pop(item, 250ms))
std::string msgpath;
if (!msg_paths_.pop(msgpath, 250ms))
continue; continue;
try {
auto msg{Message::make_from_path(msgpath, store_.message_options())}; switch (item.type) {
if (!msg) { case WorkItem::Type::File: {
mu_warning("failed to create message from {}: {}", if (G_LIKELY(add_message(item.full_path)))
msgpath, msg.error().what());
continue;
}
pace_store_worker(); /* slow down if store-worker q gets too big */
// if the store was empty, we know that the message is
// completely new and can use the fast path (Xapians
// 'add_document' rather than 'replace_document)
if (was_empty_)
store_worker_.push(StoreWorker::AddMessage{std::move(*msg)});
else
store_worker_.push(StoreWorker::UpdateMessage{std::move(*msg)});
++progress_.updated; ++progress_.updated;
} break;
case WorkItem::Type::Dir:
store_.set_dirstamp(item.full_path, ::time(NULL));
break;
default:
g_warn_if_reached();
break;
}
} catch (const Mu::Error& er) {
mu_warning("error adding message @ {}: {}", item.full_path, er.what());
}
maybe_start_worker(); maybe_start_worker();
std::this_thread::yield();
} }
} }
bool
Indexer::Private::cleanup()
{
mu_debug("starting cleanup");
size_t n{};
std::vector<Store::Id> orphans; // store messages without files.
store_.for_each_message_path([&](Store::Id id, const std::string& path) {
++n;
if (::access(path.c_str(), R_OK) != 0) {
mu_debug("cannot read {} (id={}); queuing for removal from store",
path, id);
orphans.emplace_back(id);
}
return state_ == IndexState::Cleaning;
});
if (orphans.empty())
mu_debug("nothing to clean up");
else {
mu_debug("removing {} stale message(s) from store", orphans.size());
store_.remove_messages(orphans);
progress_.removed += orphans.size();
}
return true;
}
void void
Indexer::Private::scan_worker() Indexer::Private::scan_worker()
{ {
progress_.reset();
if (conf_.scan) { if (conf_.scan) {
mu_debug("starting scanner"); mu_debug("starting scanner");
if (!scanner_.start()) { // blocks. if (!scanner_.start()) { // blocks.
mu_warning("failed to start scanner"); mu_warning("failed to start scanner");
switch_state(State::Idle); state_.change_to(IndexState::Idle);
return; return;
} }
mu_debug("scanner finished with {} file(s) in queue", msg_paths_.size()); mu_debug("scanner finished with {} file(s) in queue", todos_.size());
} }
// now there may still be messages in the work queue...
// finish those; this is a bit ugly; perhaps we should
// handle SIGTERM etc.
if (!todos_.empty()) {
const auto workers_size = std::invoke([this] {
std::lock_guard lock{w_lock_};
return workers_.size();
});
mu_debug("process {} remaining message(s) with {} worker(s)",
todos_.size(), workers_size);
while (!todos_.empty())
std::this_thread::sleep_for(100ms);
}
// and let the worker finish their work.
state_.change_to(IndexState::Finishing);
for (auto&& w : workers_)
if (w.joinable())
w.join();
if (conf_.cleanup) { if (conf_.cleanup) {
mu_debug("starting cleanup with work-item(s) left: {}", mu_debug("starting cleanup");
store_worker_.size()); state_.change_to(IndexState::Cleaning);
cleanup();
std::vector<Store::Id> orphans; // store messages without files. mu_debug("cleanup finished");
store_.for_each_message_path([&](Store::Id id, const std::string& path) {
if (::access(path.c_str(), R_OK) != 0) {
mu_debug("orphan: cannot read {} (id={})", path, id);
orphans.emplace_back(id);
}
return true;
});
progress_.removed = orphans.size();
if (!orphans.empty()) {
mu_info("removing {} orphan message(s)", orphans.size());
store_worker_.push(StoreWorker::RemoveMessages{std::move(orphans)});
}
} }
completed_ = ::time({}); completed_ = ::time({});
store_worker_.push(StoreWorker::SetLastIndex{completed_}); store_.config().set<Mu::Config::Id::LastIndex>(completed_);
state_.change_to(IndexState::Idle);
stop();
} }
bool bool
Indexer::Private::start(const Indexer::Config& conf, bool block) Indexer::Private::start(const Indexer::Config& conf, bool block)
{ {
force_cleanup(); stop();
conf_ = conf; conf_ = conf;
if (conf_.max_threads == 0) { if (conf_.max_threads == 0) {
/* benchmarking suggests that ~4 threads is the fastest (the /* benchmarking suggests that ~4 threads is the fastest (the
* real bottleneck is the database, so adding more threads just * real bottleneck is the database, so adding more threads just
* slows things down) */ * slows things down)
*/
max_workers_ = std::min(4U, std::thread::hardware_concurrency()); max_workers_ = std::min(4U, std::thread::hardware_concurrency());
} else } else
max_workers_ = conf.max_threads; max_workers_ = conf.max_threads;
@ -329,14 +391,11 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no", mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no",
conf_.cleanup ? "yes" : "no"); conf_.cleanup ? "yes" : "no");
progress_.reset(); state_.change_to(IndexState::Scanning);
switch_state(State::Scanning);
/* kick off the first worker, which will spawn more if needed. */ /* kick off the first worker, which will spawn more if needed. */
workers_.emplace_back(std::thread([this]{item_worker();})); workers_.emplace_back(std::thread([this] { item_worker(); }));
/* kick the file-system-scanner thread */ /* kick the disk-scanner thread */
if (scanner_worker_.joinable()) scanner_worker_ = std::thread([this] { scan_worker(); });
scanner_worker_.join(); // kill old one
scanner_worker_ = std::thread([this]{scan_worker();});
mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking"); mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking");
if (block) { if (block) {
@ -352,33 +411,18 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
bool bool
Indexer::Private::stop() Indexer::Private::stop()
{ {
switch_state(State::Draining);
scanner_.stop(); scanner_.stop();
// cannot join scanner_worker_ here since it may be our
// current thread.
// wait for completion. todos_.clear();
while (!msg_paths_.empty()) { if (scanner_worker_.joinable())
mu_debug("scan-items left: {}", msg_paths_.size()); scanner_worker_.join();
std::this_thread::sleep_for(250ms);
}
state_.change_to(IndexState::Idle);
for (auto&& w : workers_) for (auto&& w : workers_)
if (w.joinable()) if (w.joinable())
w.join(); w.join();
workers_.clear(); workers_.clear();
store_worker_.push(StoreWorker::EndTransaction());
// wait for completion.
while (!store_worker_.empty()) {
mu_debug("work-items left: {}", store_worker_.size());
std::this_thread::sleep_for(250ms);
}
switch_state(State::Idle);
return true; return true;
} }
@ -425,7 +469,7 @@ Indexer::is_running() const
const Indexer::Progress& const Indexer::Progress&
Indexer::progress() const Indexer::progress() const
{ {
priv_->progress_.running = priv_->state_ == State::Idle ? false : true; priv_->progress_.running = priv_->state_ == IndexState::Idle ? false : true;
return priv_->progress_; return priv_->progress_;
} }

View File

@ -1,5 +1,5 @@
/* /*
** Copyright (C) 2020-2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
** **
** This program is free software; you can redistribute it and/or modify it ** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the ** under the terms of the GNU General Public License as published by the
@ -29,12 +29,9 @@
#include <atomic> #include <atomic>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable>
#include <variant> #include <variant>
#include <functional> #include <functional>
#include <cstring> #include <cstring>
#include <glib.h> #include <glib.h>
#include <glib/gprintf.h> #include <glib/gprintf.h>
@ -129,21 +126,11 @@ private:
/// @brief object to manage the server-context for all commands. /// @brief object to manage the server-context for all commands.
struct Server::Private { struct Server::Private {
Private(Store& store, const Server::Options& opts, Output output) Private(Store& store, const Server::Options& opts, Output output)
: store_{store}, : store_{store}, options_{opts}, output_{output},
store_worker_{store.store_worker()},
options_{opts}, output_{output},
command_handler_{make_command_map()}, command_handler_{make_command_map()},
keep_going_{true}, keep_going_{true},
tmp_dir_{unwrap(make_temp_dir())} { tmp_dir_{unwrap(make_temp_dir())}
{}
// tell the store-worker that we (this class) can handle
// sexp strings.
store_worker_.install_sexp_handler(
[this](const std::string& sexp) {
this->invoke(sexp);
});
}
~Private() { ~Private() {
indexer().stop(); indexer().stop();
@ -161,10 +148,13 @@ struct Server::Private {
// acccessors // acccessors
Store& store() { return store_; } Store& store() { return store_; }
const Store& store() const { return store_; } const Store& store() const { return store_; }
StoreWorker& store_worker() { return store_worker_; }
Indexer& indexer() { return store().indexer(); } Indexer& indexer() { return store().indexer(); }
//CommandMap& command_map() const { return command_map_; } //CommandMap& command_map() const { return command_map_; }
//
// invoke
//
bool invoke(const std::string& expr) noexcept;
// //
// output // output
@ -196,19 +186,7 @@ struct Server::Private {
void remove_handler(const Command& cmd); void remove_handler(const Command& cmd);
void view_handler(const Command& cmd); void view_handler(const Command& cmd);
bool keep_going() const { return keep_going_; }
void set_keep_going(bool going) { keep_going_ = going; }
// make main thread wait until done with the command.
std::mutex done_lock_;
std::condition_variable done_cond_;
private: private:
//
// invoke
//
bool invoke(const std::string& expr) noexcept;
void move_docid(Store::Id docid, Option<std::string> flagstr, void move_docid(Store::Id docid, Option<std::string> flagstr,
bool new_name, bool no_view); bool new_name, bool no_view);
@ -231,7 +209,6 @@ private:
std::ofstream make_temp_file_stream(std::string& fname) const; std::ofstream make_temp_file_stream(std::string& fname) const;
Store& store_; Store& store_;
StoreWorker& store_worker_;
Server::Options options_; Server::Options options_;
Server::Output output_; Server::Output output_;
const CommandHandler command_handler_; const CommandHandler command_handler_;
@ -503,10 +480,6 @@ Server::Private::invoke(const std::string& expr) noexcept
keep_going_ = false; keep_going_ = false;
} }
// tell main thread we're done with the command.
std::lock_guard l{done_lock_};
done_cond_.notify_one();
return keep_going_; return keep_going_;
} }
@ -586,9 +559,7 @@ Server::Private::data_handler(const Command& cmd)
{ {
const auto request_type{unwrap(cmd.symbol_arg(":kind"))}; const auto request_type{unwrap(cmd.symbol_arg(":kind"))};
if (request_type == "doccount") { if (request_type == "maildirs") {
output(mu_format("(:doccount {})", store().size()));
} else if (request_type == "maildirs") {
auto&& out{make_output_stream()}; auto&& out{make_output_stream()};
mu_print(out, "("); mu_print(out, "(");
for (auto&& mdir: store().maildirs()) for (auto&& mdir: store().maildirs())
@ -719,6 +690,9 @@ Server::Private::find_handler(const Command& cmd)
StopWatch sw{mu_format("{} (indexing: {})", __func__, StopWatch sw{mu_format("{} (indexing: {})", __func__,
indexer().is_running() ? "yes" : "no")}; indexer().is_running() ? "yes" : "no")};
// we need to _lock_ the store while querying (which likely consists of
// multiple actual queries) + grabbing the results.
std::lock_guard l{store_.lock()};
auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)}; auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)};
if (!qres) if (!qres)
throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what()); throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what());
@ -1107,27 +1081,7 @@ Server::~Server() = default;
bool bool
Server::invoke(const std::string& expr) noexcept Server::invoke(const std::string& expr) noexcept
{ {
/* a _little_ hacky; handle _quit_ directly to properly return priv_->invoke(expr);
* shut down the server */
if (expr == "(quit)") {
mu_debug("quitting");
priv_->set_keep_going(false);
return false;
}
/*
* feed the command to the queue; it'll get executed in the
* store-worker thread; however, sync on its completion
* so we get its keep_going() result
*
* as an added bonus, this ensures mu server shell doesn't require an
* extra user RET to get back the prompt
*/
std::unique_lock done_lock{priv_->done_lock_};
priv_->store_worker().push(StoreWorker::SexpCommand{expr});
priv_->done_cond_.wait(done_lock);
return priv_->keep_going();
} }
/* LCOV_EXCL_STOP */ /* LCOV_EXCL_STOP */

View File

@ -79,12 +79,11 @@ cookie(size_t n)
::printf(COOKIE_PRE "%x" COOKIE_POST, num); ::printf(COOKIE_PRE "%x" COOKIE_POST, num);
} }
static void static void
output_stdout(const std::string& str, Server::OutputFlags flags) output_stdout(const std::string& str, Server::OutputFlags flags)
{ {
// Note: with the StoreWorker, we _always_ need to flush
flags |= Server::OutputFlags::Flush;
cookie(str.size() + 1); cookie(str.size() + 1);
if (G_UNLIKELY(::puts(str.c_str()) < 0)) { if (G_UNLIKELY(::puts(str.c_str()) < 0)) {
mu_critical("failed to write output '{}'", str); mu_critical("failed to write output '{}'", str);

View File

@ -385,10 +385,7 @@ The server output is as follows:
((plist-get sexp :info) ((plist-get sexp :info)
(funcall mu4e-info-func sexp)) (funcall mu4e-info-func sexp))
;; get some data XXX generalize ;; get some data
((plist-get sexp :doccount)
(plist-put mu4e--server-props :doccount
(mu4e--server-plist-get sexp :doccount)))
((plist-get sexp :maildirs) ((plist-get sexp :maildirs)
(setq mu4e-maildir-list (mu4e--server-plist-get sexp :maildirs))) (setq mu4e-maildir-list (mu4e--server-plist-get sexp :maildirs)))
@ -559,16 +556,9 @@ get at most MAX contacts."
(defun mu4e--server-data (kind) (defun mu4e--server-data (kind)
"Request data of some KIND. "Request data of some KIND.
KIND is a symbol or a list of symbols. Currently supported kinds: KIND is a symbol. Currently supported kinds: maildirs."
`maildirs', `doccount'." (mu4e--server-call-mu
(pcase kind `(data :kind ,kind)))
((pred (lambda (k) (memq k '(maildirs doccount))))
(mu4e--server-call-mu `(data :kind ,kind)))
((pred listp)
(when kind
(mu4e--server-data (car kind))
(mu4e--server-data (cdr kind))))
(_ (mu4e-error "Unexpected kind %s" kind))))
(defun mu4e--server-find (query threads sortfield sortdir maxnum skip-dups (defun mu4e--server-find (query threads sortfield sortdir maxnum skip-dups
include-related) include-related)

View File

@ -130,9 +130,9 @@ changed")
If non-nil, this is a plist of the form: If non-nil, this is a plist of the form:
\( \(
:checked <number of messages processed> (checked whether up-to-date) :checked <number of messages processed> (checked whether up-to-date)
:updated <number of messages updated/added> :updated <number of messages updated/added
:cleaned-up <number of stale messages removed from store> :cleaned-up <number of stale messages removed from store
:stamp <emacs (current-time) timestamp for the status>") :stamp <emacs (current-time) timestamp for the status)")
(defconst mu4e-last-update-buffer "*mu4e-last-update*" (defconst mu4e-last-update-buffer "*mu4e-last-update*"
"Name of buffer with cloned from the last update buffer. "Name of buffer with cloned from the last update buffer.

View File

@ -1,6 +1,6 @@
;;; mu4e.el --- Mu4e, the mu mail user agent -*- lexical-binding: t -*- ;;; mu4e.el --- Mu4e, the mu mail user agent -*- lexical-binding: t -*-
;; Copyright (C) 2011-2024 Dirk-Jan C. Binnema ;; Copyright (C) 2011-2023 Dirk-Jan C. Binnema
;; Author: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ;; Author: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
;; Maintainer: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ;; Maintainer: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
@ -203,31 +203,27 @@ Otherwise, check requirements, then start mu4e. When successful, invoke
(_ (mu4e-error "Error %d: %s" errcode errmsg)))) (_ (mu4e-error "Error %d: %s" errcode errmsg))))
(defun mu4e--update-status (info) (defun mu4e--update-status (info)
"Update `mu4e-index-update-status' with INFO. "Update the status message with INFO."
Return the former. As an additional side-effect, updates
doccount in server-properties."
(setq mu4e-index-update-status (setq mu4e-index-update-status
`(:tstamp ,(current-time) `(:tstamp ,(current-time)
:checked ,(or (plist-get info :checked) 0) :checked ,(plist-get info :checked)
:updated ,(or (plist-get info :updated) 0) :updated ,(plist-get info :updated)
:cleaned-up ,(or (plist-get info :cleaned-up) 0))) :cleaned-up ,(plist-get info :cleaned-up))))
mu4e-index-update-status)
(defun mu4e--info-handler (info) (defun mu4e--info-handler (info)
"Handler function for (:INFO ...) sexps received from server." "Handler function for (:INFO ...) sexps received from server."
(let* ((type (plist-get info :info))) (let* ((type (plist-get info :info))
(cond
((eq type 'add) t) ;; do nothing
((eq type 'index)
(let* ((info (mu4e--update-status info))
(checked (plist-get info :checked)) (checked (plist-get info :checked))
(updated (plist-get info :updated)) (updated (plist-get info :updated))
(cleaned-up (plist-get info :cleaned-up))) (cleaned-up (plist-get info :cleaned-up)))
(cond
((eq type 'add) t) ;; do nothing
((eq type 'index)
(if (eq (plist-get info :status) 'running) (if (eq (plist-get info :status) 'running)
(mu4e-index-message (mu4e-index-message
"Indexing... checked %d, updated %d" "Indexing... checked %d, updated %d" checked updated)
checked updated)
(progn ;; i.e. 'complete (progn ;; i.e. 'complete
(mu4e--update-status info)
(mu4e-index-message (mu4e-index-message
"%s completed; checked %d, updated %d, cleaned-up %d" "%s completed; checked %d, updated %d, cleaned-up %d"
(if mu4e-index-lazy-check "Lazy indexing" "Indexing") (if mu4e-index-lazy-check "Lazy indexing" "Indexing")
@ -241,9 +237,8 @@ doccount in server-properties."
(unless (and (not (string= mu4e--contacts-tstamp "0")) (unless (and (not (string= mu4e--contacts-tstamp "0"))
(zerop (plist-get info :updated))) (zerop (plist-get info :updated)))
(mu4e--request-contacts-maybe) (mu4e--request-contacts-maybe)
(mu4e--server-data '(maildirs doccount))) (mu4e--server-data 'maildirs)) ;; update maildir list
;; update maildir list & doccount (mu4e--main-redraw))))
(mu4e--main-redraw)))))
((plist-get info :message) ((plist-get info :message)
(mu4e-index-message "%s" (plist-get info :message)))))) (mu4e-index-message "%s" (plist-get info :message))))))