diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index 283a11b8..bfeac487 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -1,5 +1,5 @@ /* -** Copyright (C) 2020-2023 Dirk-Jan C. Binnema +** Copyright (C) 2020-2024 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -29,6 +29,7 @@ #include #include #include +#include #include using namespace std::chrono_literals; @@ -42,49 +43,28 @@ using namespace std::chrono_literals; using namespace Mu; -struct IndexState { - enum State { Idle, - Scanning, - Finishing, - Cleaning - }; - static const char* name(State s) { - switch (s) { - case Idle: - return "idle"; - case Scanning: - return "scanning"; - case Finishing: - return "finishing"; - case Cleaning: - return "cleaning"; - default: - return ""; - } - } +// states - bool operator==(State rhs) const { - return state_.load() == rhs; +enum struct State { Idle, Scanning, Draining }; +constexpr std::string_view +format_as(State s) +{ + switch (s) { + case State::Idle: return "idle"; + case State::Scanning: return "scanning"; + case State::Draining: return "draining"; + default: return ""; } - bool operator!=(State rhs) const { - return state_.load() != rhs; - } - void change_to(State new_state) { - mu_debug("changing indexer state {}->{}", name((State)state_), - name((State)new_state)); - state_.store(new_state); - } - -private: - std::atomic state_{Idle}; -}; +} struct Indexer::Private { Private(Mu::Store& store) - : store_{store}, scanner_{store_.root_maildir(), + : store_{store}, + store_worker_{store.store_worker()}, + scanner_{store_.root_maildir(), [this](auto&& path, auto&& statbuf, auto&& info) { - return handler(path, statbuf, info); + return scan_handler(path, statbuf, info); }}, max_message_size_{store_.config().get()}, was_empty_{store.empty()} { @@ -97,27 +77,55 @@ struct Indexer::Private { store.config().get()); } - ~Private() { - stop(); + ~Private() { force_cleanup(); } + + void force_cleanup() { + switch_state(State::Idle); + scanner_.stop(); + if (scanner_worker_.joinable()) + scanner_worker_.join(); + msg_paths_.clear(); + for (auto&& w : workers_) + if (w.joinable()) + w.join(); + store_worker_.clear(); } bool dir_predicate(const std::string& path, const struct dirent* dirent) const; - bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype); + bool scan_handler(const std::string& fullpath, struct stat* statbuf, + Scanner::HandleType htype); void maybe_start_worker(); void item_worker(); void scan_worker(); - bool add_message(const std::string& path); - - bool cleanup(); bool start(const Indexer::Config& conf, bool block); bool stop(); - bool is_running() const { return state_ != IndexState::Idle; } + bool is_running() const { return state_ != State::Idle; } + void switch_state(State new_state) { + mu_debug("changing indexer state {}->{}", state_, new_state); + state_ = new_state; + } + + // pace a bit so scan_items queue doesn't get too big. + void pace_scan_worker() { + while (msg_paths_.size() > 8 * max_workers_) { + std::this_thread::sleep_for(25ms); + continue; + } + } + // pace a bit so store-worker queue doesn't get too big. + void pace_store_worker() { + while (store_worker_.size() > 8) { + std::this_thread::sleep_for(25ms); + continue; + } + } Indexer::Config conf_; - Store& store_; + const Store& store_; + StoreWorker& store_worker_; Scanner scanner_; const size_t max_message_size_; @@ -126,27 +134,18 @@ struct Indexer::Private { std::vector workers_; std::thread scanner_worker_; - struct WorkItem { - std::string full_path; - enum Type { - Dir, - File - }; - Type type; - }; - - AsyncQueue todos_; + AsyncQueue msg_paths_; Progress progress_{}; - IndexState state_{}; + std::atomic state_{State::Idle}; std::mutex lock_, w_lock_; std::atomic completed_{}; bool was_empty_{}; }; bool -Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, - Scanner::HandleType htype) +Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf, + Scanner::HandleType htype) { switch (htype) { case Scanner::HandleType::EnterDir: @@ -160,7 +159,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, return false; } - // in lazy-mode, we ignore this dir if its dirstamp suggest it + // in lazy-mode, we ignore this dir if its dirstamp suggests it // is up-to-date (this is _not_ always true; hence we call it // lazy-mode); only for actual message dirs, since the dir // tstamps may not bubble up.U @@ -193,14 +192,16 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, return true; } case Scanner::HandleType::LeaveDir: { - todos_.push({fullpath, WorkItem::Type::Dir}); + // directly push to store worker, bypass scan-items queue + pace_store_worker(); + store_worker_.push(StoreWorker::SetDirStamp{fullpath, ::time({})}); return true; } case Scanner::HandleType::File: { ++progress_.checked; - if ((size_t)statbuf->st_size > max_message_size_) { + if (static_cast(statbuf->st_size) > max_message_size_) { mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size); return false; } @@ -210,9 +211,10 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath)) return false; - // push the remaining messages to our "todo" queue for + // push the remaining messages to our "scan-items" queue for // (re)parsing and adding/updating to the database. - todos_.push({fullpath, WorkItem::Type::File}); + pace_scan_worker(); + msg_paths_.push(std::string{fullpath}); // move? return true; } default: @@ -226,158 +228,94 @@ Indexer::Private::maybe_start_worker() { std::lock_guard lock{w_lock_}; - if (todos_.size() > workers_.size() && workers_.size() < max_workers_) { + if (msg_paths_.size() > workers_.size() && workers_.size() < max_workers_) { workers_.emplace_back(std::thread([this] { item_worker(); })); mu_debug("added worker {}", workers_.size()); } } -bool -Indexer::Private::add_message(const std::string& path) -{ - /* - * Having the lock here makes things a _lot_ slower. - * - * The reason for having the lock is some helgrind warnings; - * but it believed those are _false alarms_ - * https://gitlab.gnome.org/GNOME/glib/-/issues/2662 - */ - //std::unique_lock lock{w_lock_}; - auto msg{Message::make_from_path(path, store_.message_options())}; - if (!msg) { - mu_warning("failed to create message from {}: {}", path, msg.error().what()); - return false; - } - // if the store was empty, we know that the message is completely new - // and can use the fast path (Xapians 'add_document' rather than - // 'replace_document) - auto res = store_.consume_message(std::move(msg.value()), was_empty_); - if (!res) { - mu_warning("failed to add message @ {}: {}", path, res.error().what()); - return false; - } - - return true; -} - void Indexer::Private::item_worker() { - WorkItem item; - mu_debug("started worker"); - while (state_ == IndexState::Scanning) { - if (!todos_.pop(item, 250ms)) + while (state_ == State::Scanning || + (state_ == State::Draining && !msg_paths_.empty())) { + + std::string msgpath; + if (!msg_paths_.pop(msgpath, 250ms)) + continue; + + auto msg{Message::make_from_path(msgpath, store_.message_options())}; + if (!msg) { + mu_warning("failed to create message from {}: {}", + msgpath, msg.error().what()); continue; - try { - switch (item.type) { - case WorkItem::Type::File: { - if (G_LIKELY(add_message(item.full_path))) - ++progress_.updated; - } break; - case WorkItem::Type::Dir: - store_.set_dirstamp(item.full_path, ::time(NULL)); - break; - default: - g_warn_if_reached(); - break; - } - } catch (const Mu::Error& er) { - mu_warning("error adding message @ {}: {}", item.full_path, er.what()); } + pace_store_worker(); /* slow down if store-worker q gets too big */ + + // if the store was empty, we know that the message is + // completely new and can use the fast path (Xapians + // 'add_document' rather than 'replace_document) + if (was_empty_) + store_worker_.push(StoreWorker::AddMessage{std::move(*msg)}); + else + store_worker_.push(StoreWorker::UpdateMessage{std::move(*msg)}); + ++progress_.updated; + maybe_start_worker(); - std::this_thread::yield(); } } -bool -Indexer::Private::cleanup() -{ - mu_debug("starting cleanup"); - - size_t n{}; - std::vector orphans; // store messages without files. - store_.for_each_message_path([&](Store::Id id, const std::string& path) { - ++n; - if (::access(path.c_str(), R_OK) != 0) { - mu_debug("cannot read {} (id={}); queuing for removal from store", - path, id); - orphans.emplace_back(id); - } - - return state_ == IndexState::Cleaning; - }); - - if (orphans.empty()) - mu_debug("nothing to clean up"); - else { - mu_debug("removing {} stale message(s) from store", orphans.size()); - store_.remove_messages(orphans); - progress_.removed += orphans.size(); - } - - return true; -} - void Indexer::Private::scan_worker() { - progress_.reset(); if (conf_.scan) { mu_debug("starting scanner"); if (!scanner_.start()) { // blocks. mu_warning("failed to start scanner"); - state_.change_to(IndexState::Idle); + switch_state(State::Idle); return; } - mu_debug("scanner finished with {} file(s) in queue", todos_.size()); + mu_debug("scanner finished with {} file(s) in queue", msg_paths_.size()); } - // now there may still be messages in the work queue... - // finish those; this is a bit ugly; perhaps we should - // handle SIGTERM etc. - - if (!todos_.empty()) { - const auto workers_size = std::invoke([this] { - std::lock_guard lock{w_lock_}; - return workers_.size(); - }); - mu_debug("process {} remaining message(s) with {} worker(s)", - todos_.size(), workers_size); - while (!todos_.empty()) - std::this_thread::sleep_for(100ms); - } - // and let the worker finish their work. - state_.change_to(IndexState::Finishing); - for (auto&& w : workers_) - if (w.joinable()) - w.join(); - if (conf_.cleanup) { - mu_debug("starting cleanup"); - state_.change_to(IndexState::Cleaning); - cleanup(); - mu_debug("cleanup finished"); + mu_debug("starting cleanup with work-item(s) left: {}", + store_worker_.size()); + + std::vector orphans; // store messages without files. + store_.for_each_message_path([&](Store::Id id, const std::string& path) { + if (::access(path.c_str(), R_OK) != 0) { + mu_debug("orphan: cannot read {} (id={})", path, id); + orphans.emplace_back(id); + } + return true; + }); + progress_.removed = orphans.size(); + if (!orphans.empty()) { + mu_info("removing {} orphan message(s)", orphans.size()); + store_worker_.push(StoreWorker::RemoveMessages{std::move(orphans)}); + } } completed_ = ::time({}); - store_.config().set(completed_); - state_.change_to(IndexState::Idle); + store_worker_.push(StoreWorker::SetLastIndex{completed_}); + + stop(); } bool Indexer::Private::start(const Indexer::Config& conf, bool block) { - stop(); + force_cleanup(); conf_ = conf; if (conf_.max_threads == 0) { /* benchmarking suggests that ~4 threads is the fastest (the * real bottleneck is the database, so adding more threads just - * slows things down) - */ + * slows things down) */ max_workers_ = std::min(4U, std::thread::hardware_concurrency()); } else max_workers_ = conf.max_threads; @@ -391,11 +329,14 @@ Indexer::Private::start(const Indexer::Config& conf, bool block) mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no", conf_.cleanup ? "yes" : "no"); - state_.change_to(IndexState::Scanning); + progress_.reset(); + switch_state(State::Scanning); /* kick off the first worker, which will spawn more if needed. */ - workers_.emplace_back(std::thread([this] { item_worker(); })); - /* kick the disk-scanner thread */ - scanner_worker_ = std::thread([this] { scan_worker(); }); + workers_.emplace_back(std::thread([this]{item_worker();})); + /* kick the file-system-scanner thread */ + if (scanner_worker_.joinable()) + scanner_worker_.join(); // kill old one + scanner_worker_ = std::thread([this]{scan_worker();}); mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking"); if (block) { @@ -411,18 +352,33 @@ Indexer::Private::start(const Indexer::Config& conf, bool block) bool Indexer::Private::stop() { + switch_state(State::Draining); + scanner_.stop(); + // cannot join scanner_worker_ here since it may be our + // current thread. - todos_.clear(); - if (scanner_worker_.joinable()) - scanner_worker_.join(); + // wait for completion. + while (!msg_paths_.empty()) { + mu_debug("scan-items left: {}", msg_paths_.size()); + std::this_thread::sleep_for(250ms); + } - state_.change_to(IndexState::Idle); for (auto&& w : workers_) if (w.joinable()) w.join(); workers_.clear(); + store_worker_.push(StoreWorker::EndTransaction()); + + // wait for completion. + while (!store_worker_.empty()) { + mu_debug("work-items left: {}", store_worker_.size()); + std::this_thread::sleep_for(250ms); + } + + switch_state(State::Idle); + return true; } @@ -469,7 +425,7 @@ Indexer::is_running() const const Indexer::Progress& Indexer::progress() const { - priv_->progress_.running = priv_->state_ == IndexState::Idle ? false : true; + priv_->progress_.running = priv_->state_ == State::Idle ? false : true; return priv_->progress_; }