diff --git a/lib/index/mu-indexer.cc b/lib/index/mu-indexer.cc index d373040c..72a12a0a 100644 --- a/lib/index/mu-indexer.cc +++ b/lib/index/mu-indexer.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -39,29 +40,41 @@ using namespace std::chrono_literals; using namespace Mu; struct IndexState { - enum State { Idle, Scanning, Cleaning }; + enum State { Idle, + Scanning, + Cleaning }; static const char* name(State s) { switch (s) { - case Idle: return "idle"; - case Scanning: return "scanning"; - case Cleaning: return "cleaning"; - default: return ""; + case Idle: + return "idle"; + case Scanning: + return "scanning"; + case Cleaning: + return "cleaning"; + default: + return ""; } } - bool operator==(State rhs) const { return state_ == rhs; } + bool operator==(State rhs) const + { + return state_ == rhs; + } + bool operator!=(State rhs) const + { + return state_ != rhs; + } void change_to(State new_state) { - g_debug("changing indexer state %s->%s", - name((State)state_), + g_debug("changing indexer state %s->%s", name((State)state_), name((State)new_state)); state_ = new_state; } private: - State state_{Idle}; + std::atomic state_{Idle}; }; struct Indexer::Private { @@ -74,17 +87,20 @@ struct Indexer::Private { { g_message("created indexer for %s -> %s (batch-size: %zu)", store.metadata().root_maildir.c_str(), - store.metadata().database_path.c_str(), - store.metadata().batch_size); + store.metadata().database_path.c_str(), store.metadata().batch_size); } - ~Private() { stop(); } + ~Private() + { + stop(); + } bool dir_predicate(const std::string& path, const struct dirent* dirent) const; bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype); void maybe_start_worker(); - void worker(); + void item_worker(); + void scan_worker(); bool cleanup(); @@ -101,17 +117,24 @@ struct Indexer::Private { std::vector workers_; std::thread scanner_worker_; - AsyncQueue fq_; + struct WorkItem { + std::string full_path; + enum Type { + Dir, + File + }; + Type type; + }; + + AsyncQueue todos_; Progress progress_; IndexState state_; - - std::mutex lock_, wlock_; + std::mutex lock_, w_lock_; }; bool -Indexer::Private::handler(const std::string& fullpath, - struct stat* statbuf, +Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype) { switch (htype) { @@ -124,8 +147,7 @@ Indexer::Private::handler(const std::string& fullpath, dirstamp_ = store_.dirstamp(fullpath); if (conf_.lazy_check && dirstamp_ >= statbuf->st_mtime && htype == Scanner::HandleType::EnterNewCur) { - g_debug("skip %s (seems up-to-date: %s >= %s)", - fullpath.c_str(), + g_debug("skip %s (seems up-to-date: %s >= %s)", fullpath.c_str(), time_to_string("%FT%T", dirstamp_).c_str(), time_to_string("%FT%T", statbuf->st_mtime).c_str()); return false; @@ -152,7 +174,7 @@ Indexer::Private::handler(const std::string& fullpath, return true; } case Scanner::HandleType::LeaveDir: { - store_.set_dirstamp(fullpath, ::time(NULL)); + todos_.push({fullpath, WorkItem::Type::Dir}); return true; } @@ -160,8 +182,7 @@ Indexer::Private::handler(const std::string& fullpath, ++progress_.checked; if ((size_t)statbuf->st_size > max_message_size_) { - g_debug("skip %s (too big: %" G_GINT64_FORMAT " bytes)", - fullpath.c_str(), + g_debug("skip %s (too big: %" G_GINT64_FORMAT " bytes)", fullpath.c_str(), (gint64)statbuf->st_size); return false; } @@ -175,44 +196,59 @@ Indexer::Private::handler(const std::string& fullpath, // push the remaining messages to our "todo" queue for // (re)parsing and adding/updating to the database. - fq_.push(std::string{fullpath}); + todos_.push({fullpath, WorkItem::Type::File}); return true; } - default: g_return_val_if_reached(false); return false; + default: + g_return_val_if_reached(false); + return false; } } void Indexer::Private::maybe_start_worker() { - std::lock_guard wlock{wlock_}; + std::lock_guard lock{w_lock_}; - if (fq_.size() > workers_.size() && workers_.size() < max_workers_) - workers_.emplace_back(std::thread([this] { worker(); })); + if (todos_.size() > workers_.size() && workers_.size() < max_workers_) { + workers_.emplace_back(std::thread([this] { item_worker(); })); + g_debug("added worker %zu", workers_.size()); + } } void -Indexer::Private::worker() +Indexer::Private::item_worker() { - std::string item; + WorkItem item; g_debug("started worker"); while (state_ == IndexState::Scanning) { - - if (!fq_.pop(item, 250ms)) + if (!todos_.pop(item, 250ms)) continue; try { - std::unique_lock lock{lock_}; - - store_.add_message(item, true /*use-transaction*/); - ++progress_.updated; - + std::lock_guard lock{w_lock_}; + switch (item.type) { + case WorkItem::Type::File: + store_.add_message(item.full_path, + true /*use-transaction*/); + ++progress_.updated; + break; + case WorkItem::Type::Dir: + store_.set_dirstamp(item.full_path, ::time(NULL), + true /*use-transaction*/); + break; + default: + g_warn_if_reached(); + break; + } } catch (const Mu::Error& er) { - g_warning("error adding message @ %s: %s", item.c_str(), er.what()); + g_warning("error adding message @ %s: %s", + item.full_path.c_str(), er.what()); } maybe_start_worker(); + std::this_thread::yield(); } } @@ -227,8 +263,7 @@ Indexer::Private::cleanup() ++n; if (::access(path.c_str(), R_OK) != 0) { g_debug("cannot read %s (id=%u); queueing for removal from store", - path.c_str(), - id); + path.c_str(), id); orphans.emplace_back(id); } @@ -246,59 +281,67 @@ Indexer::Private::cleanup() return true; } +void +Indexer::Private::scan_worker() +{ + progress_ = {}; + + if (conf_.scan) { + g_debug("starting scanner"); + if (!scanner_.start()) { // blocks. + g_warning("failed to start scanner"); + goto leave; + } + g_debug("scanner finished with %zu file(s) in queue", todos_.size()); + } + + // now there may still be messages in the work queue... + // finish those; this is a bit ugly; perhaps we should + // handle SIGTERM etc. + if (!todos_.empty()) + g_debug("process %zu remaining message(s) with %zu worker(s)", todos_.size(), + workers_.size()); + while (!todos_.empty()) + std::this_thread::sleep_for(100ms); + + store_.commit(); + + if (conf_.cleanup) { + g_debug("starting cleanup"); + state_.change_to(IndexState::Cleaning); + cleanup(); + g_debug("cleanup finished"); + } +leave: + state_.change_to(IndexState::Idle); +} + bool Indexer::Private::start(const Indexer::Config& conf) { stop(); conf_ = conf; - if (conf_.max_threads == 0) - max_workers_ = std::thread::hardware_concurrency(); - else + if (conf_.max_threads == 0) { + /* we're blocked mostly by a) filesystem and b) database; + * so it's not very useful to start many threads */ + max_workers_ = std::max( + 4U, std::thread::hardware_concurrency()); + } else max_workers_ = conf.max_threads; g_debug("starting indexer with <= %zu worker thread(s)", max_workers_); - g_debug("indexing: %s; clean-up: %s", - conf_.scan ? "yes" : "no", + g_debug("indexing: %s; clean-up: %s", conf_.scan ? "yes" : "no", conf_.cleanup ? "yes" : "no"); state_.change_to(IndexState::Scanning); - { - /* kick off the first worker, which will spawn more if needed. */ - std::lock_guard wlock{wlock_}; - workers_.emplace_back(std::thread([this] { worker(); })); - } - scanner_worker_ = std::thread([this] { - progress_ = {}; - - if (conf_.scan) { - g_debug("starting scanner"); - if (!scanner_.start()) { // blocks. - g_warning("failed to start scanner"); - goto leave; - } - g_debug("scanner finished with %zu file(s) in queue", fq_.size()); - } - - // now there may still be messages in the work queue... - // finish those; this is a bit ugly; perhaps we should - // handle SIGTERM etc. - while (!fq_.empty()) - std::this_thread::sleep_for(100ms); - - store_.commit(); - - if (conf_.cleanup) { - g_debug("starting cleanup"); - state_.change_to(IndexState::Cleaning); - cleanup(); - g_debug("cleanup finished"); - } - leave: - state_.change_to(IndexState::Idle); - }); + /* kick off the first worker, which will spawn more if needed. */ + workers_.emplace_back(std::thread([this] { item_worker(); })); + /* kick the disk-scanner thread */ + scanner_worker_ = std::thread([this] { scan_worker(); }); g_debug("started indexer"); + return true; } @@ -306,26 +349,24 @@ bool Indexer::Private::stop() { scanner_.stop(); - state_.change_to(IndexState::Idle); - const auto w_n = workers_.size(); - - fq_.clear(); + todos_.clear(); if (scanner_worker_.joinable()) scanner_worker_.join(); + state_.change_to(IndexState::Idle); for (auto&& w : workers_) if (w.joinable()) w.join(); workers_.clear(); - if (w_n > 0) - g_debug("stopped indexer (joined %zu worker(s))", w_n); - return true; } -Indexer::Indexer(Store& store) : priv_{std::make_unique(store)} {} +Indexer::Indexer(Store& store) + : priv_{std::make_unique(store)} +{ +} Indexer::~Indexer() = default; @@ -338,7 +379,7 @@ Indexer::start(const Indexer::Config& conf) return false; } - std::lock_guard l(priv_->lock_); + std::lock_guard lock(priv_->lock_); if (is_running()) return true; @@ -348,7 +389,7 @@ Indexer::start(const Indexer::Config& conf) bool Indexer::stop() { - std::lock_guard l(priv_->lock_); + std::lock_guard lock{priv_->lock_}; if (!is_running()) return true; @@ -360,7 +401,7 @@ Indexer::stop() bool Indexer::is_running() const { - return !(priv_->state_ == IndexState::Idle) || !priv_->fq_.empty(); + return priv_->state_ != IndexState::Idle; } Indexer::Progress diff --git a/lib/index/mu-indexer.hh b/lib/index/mu-indexer.hh index 78527445..2df7f1c6 100644 --- a/lib/index/mu-indexer.hh +++ b/lib/index/mu-indexer.hh @@ -58,7 +58,9 @@ public: }; /** - * Start indexing. If already underway, do nothing. + * Start indexing. If already underway, do nothing. This returns + * immediately after starting, with the work being done in the + * background. * * @param conf a configuration object * diff --git a/lib/index/mu-scanner.cc b/lib/index/mu-scanner.cc index fbc9317b..dfa6984d 100644 --- a/lib/index/mu-scanner.cc +++ b/lib/index/mu-scanner.cc @@ -44,7 +44,10 @@ struct Scanner::Private { if (!handler_) throw Mu::Error{Error::Code::Internal, "missing handler"}; } - ~Private() { stop(); } + ~Private() + { + stop(); + } bool start(); bool stop(); @@ -65,7 +68,8 @@ is_special_dir(const char* d_name) } bool -Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry, bool is_maildir) +Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry, + bool is_maildir) { const auto d_name{dentry->d_name}; @@ -83,9 +87,9 @@ Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry, if (S_ISDIR(statbuf.st_mode)) { const auto new_cur = std::strcmp(d_name, "cur") == 0 || std::strcmp(d_name, "new") == 0; - const auto htype = new_cur ? Scanner::HandleType::EnterNewCur - : Scanner::HandleType::EnterDir; - const auto res = handler_(fullpath, &statbuf, htype); + const auto htype = + new_cur ? Scanner::HandleType::EnterNewCur : Scanner::HandleType::EnterDir; + const auto res = handler_(fullpath, &statbuf, htype); if (!res) return true; // skip @@ -97,12 +101,16 @@ Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry, return handler_(fullpath, &statbuf, Scanner::HandleType::File); g_debug("skip %s (neither maildir-file nor directory)", fullpath.c_str()); + return true; } bool Scanner::Private::process_dir(const std::string& path, bool is_maildir) { + if (!running_) + return true; /* we're done */ + const auto dir = opendir(path.c_str()); if (G_UNLIKELY(!dir)) { g_warning("failed to scan dir %s: %s", path.c_str(), g_strerror(errno)); @@ -171,8 +179,7 @@ Scanner::Private::start() const auto start{std::chrono::steady_clock::now()}; process_dir(root_dir_, is_maildir); const auto elapsed = std::chrono::steady_clock::now() - start; - g_debug("finished scan of %s in %" G_GINT64_FORMAT " ms", - root_dir_.c_str(), + g_debug("finished scan of %s in %" G_GINT64_FORMAT " ms", root_dir_.c_str(), to_ms(elapsed)); running_ = false; @@ -201,15 +208,10 @@ Scanner::~Scanner() = default; bool Scanner::start() { - { - std::lock_guard l(priv_->lock_); - if (priv_->running_) - return true; // nothing to do + if (priv_->running_) + return true; // nothing to do - priv_->running_ = true; - } - - const auto res = priv_->start(); + const auto res = priv_->start(); /* blocks */ priv_->running_ = false; return res; @@ -218,7 +220,7 @@ Scanner::start() bool Scanner::stop() { - std::lock_guard l(priv_->lock_); + std::lock_guard l(priv_->lock_); return priv_->stop(); } diff --git a/lib/mu-store.cc b/lib/mu-store.cc index bce18875..75542db4 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -31,6 +31,7 @@ #include #include +#include #include #include "mu-store.hh" @@ -206,9 +207,12 @@ struct Store::Private { contacts_.serialize()); }); } - g_debug("committing transaction (n=%zu)", transaction_size_); + g_debug("committing transaction (n=%zu,%zu)", + transaction_size_, metadatas_.size()); xapian_try([this] { writable_db().commit_transaction(); + for (auto&& mdata : metadatas_) + writable_db().set_metadata(mdata.first, mdata.second); transaction_size_ = 0; }); } @@ -278,6 +282,10 @@ struct Store::Private { Xapian::docid add_or_update_msg(Xapian::docid docid, MuMsg* msg); Xapian::Document new_doc_from_message(MuMsg* msg); + /* metadata to write as part of a transaction commit */ + using StringPair = std::pair; + std::vector metadatas_; + const bool read_only_{}; std::unique_ptr db_; @@ -501,7 +509,7 @@ Store::dirstamp(const std::string& path) const } void -Store::set_dirstamp(const std::string& path, time_t tstamp) +Store::set_dirstamp(const std::string& path, time_t tstamp, bool use_transaction) { std::lock_guard guard{priv_->lock_}; @@ -510,9 +518,15 @@ Store::set_dirstamp(const std::string& path, time_t tstamp) const auto len = static_cast( g_snprintf(data.data(), data.size(), "%zx", tstamp)); - xapian_try([&] { - priv_->writable_db().set_metadata(path, std::string{data.data(), len}); - }); + /* set_metadata is not otherwise part of a "transaction" but we want it + * to be so, so a dirstamp _only_ gets updated when the messages in the + * dir are commited */ + + Private::StringPair item{path, std::string{data.data(), len}}; + if (use_transaction) + priv_->metadatas_.emplace_back(std::move(item)); + else + xapian_try([&] { priv_->writable_db().set_metadata(item.first, item.second); }); } MuMsg* diff --git a/lib/mu-store.hh b/lib/mu-store.hh index f79fe930..6d72c35d 100644 --- a/lib/mu-store.hh +++ b/lib/mu-store.hh @@ -303,8 +303,10 @@ public: * * @param path a filesystem path * @param tstamp the timestamp for that path + * @param whether to do this as part of a transaction */ - void set_dirstamp(const std::string& path, time_t tstamp); + void set_dirstamp(const std::string& path, time_t tstamp, + bool use_transaction = false); /** * Get the number of documents in the document database