Try to avoid multi-threaded operations with Xapian. This removes the thread workers during indexing and avoids the indexing background thread, so mu4e once again has to wait during indexing. We can improve on that, but first we need to know whether this avoids the problem from issue #2756.
683 lines
16 KiB
C++
683 lines
16 KiB
C++
/*
|
|
** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
|
|
**
|
|
** This program is free software; you can redistribute it and/or modify it
|
|
** under the terms of the GNU General Public License as published by the
|
|
** Free Software Foundation; either version 3, or (at your option) any
|
|
** later version.
|
|
**
|
|
** This program is distributed in the hope that it will be useful,
|
|
** but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
** GNU General Public License for more details.
|
|
**
|
|
** You should have received a copy of the GNU General Public License
|
|
** along with this program; if not, write to the Free Software Foundation,
|
|
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
**
|
|
*/
|
|
|
|
#include "mu-indexer.hh"
|
|
|
|
#include <config.h>
|
|
|
|
#include <atomic>
|
|
#include <algorithm>
|
|
#include <mutex>
|
|
#include <vector>
|
|
#include <thread>
|
|
#include <condition_variable>
|
|
#include <iostream>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
using namespace std::chrono_literals;
|
|
|
|
#include "mu-store.hh"
|
|
|
|
#include "mu-scanner.hh"
|
|
#include "utils/mu-async-queue.hh"
|
|
#include "utils/mu-error.hh"
|
|
|
|
#include "utils/mu-utils-file.hh"
|
|
|
|
using namespace Mu;
|
|
|
|
struct IndexState {
|
|
enum State { Idle,
|
|
Scanning,
|
|
Finishing,
|
|
Cleaning
|
|
};
|
|
static const char* name(State s) {
|
|
switch (s) {
|
|
case Idle:
|
|
return "idle";
|
|
case Scanning:
|
|
return "scanning";
|
|
case Finishing:
|
|
return "finishing";
|
|
case Cleaning:
|
|
return "cleaning";
|
|
default:
|
|
return "<error>";
|
|
}
|
|
}
|
|
|
|
bool operator==(State rhs) const {
|
|
return state_.load() == rhs;
|
|
}
|
|
bool operator!=(State rhs) const {
|
|
return state_.load() != rhs;
|
|
}
|
|
void change_to(State new_state) {
|
|
mu_debug("changing indexer state {}->{}", name((State)state_),
|
|
name((State)new_state));
|
|
state_.store(new_state);
|
|
}
|
|
|
|
private:
|
|
std::atomic<State> state_{Idle};
|
|
};
|
|
|
|
/**
 * Private implementation of the Indexer.
 *
 * A Scanner walks the store's root maildir and reports every directory and
 * file to handler(). Depending on XAPIAN_SINGLE_THREADED, the resulting work
 * items are either handled directly or pushed onto todos_, from where
 * item_worker() threads pick them up. scan_worker() drives one complete run.
 */
struct Indexer::Private {
	explicit Private(Mu::Store& store)
		: store_{store}, scanner_{store_.root_maildir(),
					  [this](auto&& path,
						 auto&& statbuf, auto&& info) {
						  return handler(path, statbuf, info);
					  }},
		  max_message_size_{store_.config().get<Mu::Config::Id::MaxMessageSize>()},
		  was_empty_{store.empty()} {

		mu_message("created indexer for {} -> "
			   "{} (batch-size: {}; was-empty: {}; ngrams: {})",
			   store.root_maildir(), store.path(),
			   store.config().get<Mu::Config::Id::BatchSize>(),
			   was_empty_,
			   store.config().get<Mu::Config::Id::SupportNgrams>());
	}

	~Private() {
		// make sure no scanner/worker thread outlives us.
		stop();
	}

	// NOTE(review): declared but not defined in this file; confirm it is
	// unused before relying on it.
	bool dir_predicate(const std::string& path, const struct dirent* dirent) const;

	/// Scanner callback: decides what to do with a directory or file.
	bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype);

	/// Spawn an additional item worker when the queue outgrows the workers.
	void maybe_start_worker();

	/// Worker-thread loop: pop and handle items while scanning.
	void item_worker();
	/// Runs one complete index pass (scan, drain, cleanup, commit).
	void scan_worker();

	/// Parse the message at @p path and hand it to the store.
	bool add_message(const std::string& path);

	/// Remove messages from the store whose files are no longer readable.
	bool cleanup();
	/// Start a run with @p conf; if @p block, wait until it has finished.
	bool start(const Indexer::Config& conf, bool block);
	/// Stop any ongoing run and join all threads.
	bool stop();

	bool is_running() const { return state_ != IndexState::Idle; }

	Indexer::Config conf_;
	Store& store_;
	Scanner scanner_;
	const size_t max_message_size_;

	// stored timestamp of the most recently entered directory
	::time_t dirstamp_{};
	// upper bound for the number of item workers; set by start()
	std::size_t max_workers_{};
	std::vector<std::thread> workers_;
	std::thread scanner_worker_;

	/// A unit of work produced by the scanner.
	struct WorkItem {
		std::string full_path;
		enum Type {
			Dir,
			File
		};
		Type type;
	};

	/// Process a single work item (add a message / update a dirstamp).
	void handle_item(WorkItem&& item);

	AsyncQueue<WorkItem> todos_;

	Progress progress_{};
	IndexState state_{};
	// lock_ serializes the public start/stop; w_lock_ guards workers_.
	std::mutex lock_, w_lock_;
	std::atomic<time_t> completed_{};
	// whether the store had no messages (enables the add-only fast path)
	bool was_empty_{};
};
|
|
|
|
bool
|
|
Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
|
Scanner::HandleType htype)
|
|
{
|
|
switch (htype) {
|
|
case Scanner::HandleType::EnterDir:
|
|
case Scanner::HandleType::EnterNewCur: {
|
|
if (fullpath.length() > MaxTermLength) {
|
|
// currently the db uses the path as a key, and
|
|
// therefore it cannot be too long. We'd get an error
|
|
// later anyway but for now it's useful for surviving
|
|
// circular symlinks
|
|
mu_warning("'{}' is too long; ignore", fullpath);
|
|
return false;
|
|
}
|
|
|
|
// in lazy-mode, we ignore this dir if its dirstamp suggest it
|
|
// is up-to-date (this is _not_ always true; hence we call it
|
|
// lazy-mode); only for actual message dirs, since the dir
|
|
// tstamps may not bubble up.
|
|
dirstamp_ = store_.dirstamp(fullpath);
|
|
if (conf_.lazy_check && dirstamp_ >= statbuf->st_ctime &&
|
|
htype == Scanner::HandleType::EnterNewCur) {
|
|
mu_debug("skip {} (seems up-to-date: {:%FT%T} >= {:%FT%T})",
|
|
fullpath, mu_time(dirstamp_), mu_time(statbuf->st_ctime));
|
|
return false;
|
|
}
|
|
|
|
// don't index dirs with '.noindex'
|
|
auto noindex = ::access((fullpath + "/.noindex").c_str(), F_OK) == 0;
|
|
if (noindex) {
|
|
mu_debug("skip {} (has .noindex)", fullpath);
|
|
return false; // don't descend into this dir.
|
|
}
|
|
|
|
// don't index dirs with '.noupdate', unless we do a full
|
|
// (re)index.
|
|
if (!conf_.ignore_noupdate) {
|
|
auto noupdate = ::access((fullpath + "/.noupdate").c_str(), F_OK) == 0;
|
|
if (noupdate) {
|
|
mu_debug("skip {} (has .noupdate)", fullpath);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
mu_debug("checked {}", fullpath);
|
|
return true;
|
|
}
|
|
case Scanner::HandleType::LeaveDir: {
|
|
#ifdef XAPIAN_SINGLE_THREADED
|
|
handle_item({fullpath, WorkItem::Type::Dir});
|
|
#else
|
|
todos_.push({fullpath, WorkItem::Type::Dir});
|
|
#endif /*XAPIAN_SINGLE_THREADED*/
|
|
return true;
|
|
}
|
|
|
|
case Scanner::HandleType::File: {
|
|
++progress_.checked;
|
|
|
|
if ((size_t)statbuf->st_size > max_message_size_) {
|
|
mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size);
|
|
return false;
|
|
}
|
|
|
|
// if the message is not in the db yet, or not up-to-date, queue
|
|
// it for updating/inserting.
|
|
if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath))
|
|
return false;
|
|
|
|
#ifdef XAPIAN_SINGLE_THREADED
|
|
handle_item({fullpath, WorkItem::Type::File});
|
|
#else
|
|
// push the remaining messages to our "todo" queue for
|
|
// (re)parsing and adding/updating to the database.
|
|
todos_.push({fullpath, WorkItem::Type::File});
|
|
#endif
|
|
return true;
|
|
}
|
|
default:
|
|
g_return_val_if_reached(false);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void
|
|
Indexer::Private::maybe_start_worker()
|
|
{
|
|
std::lock_guard lock{w_lock_};
|
|
|
|
if (todos_.size() > workers_.size() && workers_.size() < max_workers_) {
|
|
workers_.emplace_back(std::thread([this] { item_worker(); }));
|
|
mu_debug("added worker {}", workers_.size());
|
|
}
|
|
}
|
|
|
|
bool
|
|
Indexer::Private::add_message(const std::string& path)
|
|
{
|
|
/*
|
|
* Having the lock here makes things a _lot_ slower.
|
|
*
|
|
* The reason for having the lock is some helgrind warnings;
|
|
* but it believed those are _false alarms_
|
|
* https://gitlab.gnome.org/GNOME/glib/-/issues/2662
|
|
*/
|
|
//std::unique_lock lock{w_lock_};
|
|
auto msg{Message::make_from_path(path, store_.message_options())};
|
|
if (!msg) {
|
|
mu_warning("failed to create message from {}: {}", path, msg.error().what());
|
|
return false;
|
|
}
|
|
// if the store was empty, we know that the message is completely new
|
|
// and can use the fast path (Xapians 'add_document' rather than
|
|
// 'replace_document)
|
|
auto res = store_.consume_message(std::move(msg.value()), was_empty_);
|
|
if (!res) {
|
|
mu_warning("failed to add message @ {}: {}", path, res.error().what());
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
void
|
|
Indexer::Private::handle_item(WorkItem&& item)
|
|
{
|
|
try {
|
|
switch (item.type) {
|
|
case WorkItem::Type::File: {
|
|
if (G_LIKELY(add_message(item.full_path)))
|
|
++progress_.updated;
|
|
} break;
|
|
case WorkItem::Type::Dir:
|
|
store_.set_dirstamp(item.full_path, ::time(NULL));
|
|
break;
|
|
default:
|
|
g_warn_if_reached();
|
|
break;
|
|
}
|
|
} catch (const Mu::Error& er) {
|
|
mu_warning("error adding message @ {}: {}", item.full_path, er.what());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void
|
|
Indexer::Private::item_worker()
|
|
{
|
|
WorkItem item;
|
|
|
|
mu_debug("started worker");
|
|
|
|
while (state_ == IndexState::Scanning) {
|
|
if (!todos_.pop(item, 250ms))
|
|
continue;
|
|
|
|
handle_item(std::move(item));
|
|
|
|
maybe_start_worker();
|
|
std::this_thread::yield();
|
|
}
|
|
}
|
|
|
|
bool
|
|
Indexer::Private::cleanup()
|
|
{
|
|
mu_debug("starting cleanup");
|
|
|
|
size_t n{};
|
|
std::vector<Store::Id> orphans; // store messages without files.
|
|
store_.for_each_message_path([&](Store::Id id, const std::string& path) {
|
|
++n;
|
|
if (::access(path.c_str(), R_OK) != 0) {
|
|
mu_debug("cannot read {} (id={}); queuing for removal from store",
|
|
path, id);
|
|
orphans.emplace_back(id);
|
|
}
|
|
|
|
return state_ == IndexState::Cleaning;
|
|
});
|
|
|
|
if (orphans.empty())
|
|
mu_debug("nothing to clean up");
|
|
else {
|
|
mu_debug("removing {} stale message(s) from store", orphans.size());
|
|
store_.remove_messages(orphans);
|
|
progress_.removed += orphans.size();
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void
|
|
Indexer::Private::scan_worker()
|
|
{
|
|
progress_.reset();
|
|
if (conf_.scan) {
|
|
mu_debug("starting scanner");
|
|
if (!scanner_.start()) { // blocks.
|
|
mu_warning("failed to start scanner");
|
|
state_.change_to(IndexState::Idle);
|
|
return;
|
|
}
|
|
mu_debug("scanner finished with {} file(s) in queue", todos_.size());
|
|
}
|
|
|
|
// now there may still be messages in the work queue...
|
|
// finish those; this is a bit ugly; perhaps we should
|
|
// handle SIGTERM etc.
|
|
|
|
if (!todos_.empty()) {
|
|
const auto workers_size = std::invoke([this] {
|
|
std::lock_guard lock{w_lock_};
|
|
return workers_.size();
|
|
});
|
|
mu_debug("process {} remaining message(s) with {} worker(s)",
|
|
todos_.size(), workers_size);
|
|
while (!todos_.empty())
|
|
std::this_thread::sleep_for(100ms);
|
|
}
|
|
// and let the worker finish their work.
|
|
state_.change_to(IndexState::Finishing);
|
|
for (auto&& w : workers_)
|
|
if (w.joinable())
|
|
w.join();
|
|
|
|
if (conf_.cleanup) {
|
|
mu_debug("starting cleanup");
|
|
state_.change_to(IndexState::Cleaning);
|
|
cleanup();
|
|
mu_debug("cleanup finished");
|
|
}
|
|
|
|
completed_ = ::time({});
|
|
// attempt to commit to disk.
|
|
store_.xapian_db().request_commit(true);
|
|
store_.config().set<Mu::Config::Id::LastIndex>(completed_);
|
|
state_.change_to(IndexState::Idle);
|
|
}
|
|
|
|
bool
|
|
Indexer::Private::start(const Indexer::Config& conf, bool block)
|
|
{
|
|
stop();
|
|
|
|
conf_ = conf;
|
|
if (conf_.max_threads == 0) {
|
|
/* benchmarking suggests that ~4 threads is the fastest (the
|
|
* real bottleneck is the database, so adding more threads just
|
|
* slows things down)
|
|
*/
|
|
max_workers_ = std::min(4U, std::thread::hardware_concurrency());
|
|
} else
|
|
max_workers_ = conf.max_threads;
|
|
|
|
if (store_.empty() && conf_.lazy_check) {
|
|
mu_debug("turn off lazy check since we have an empty store");
|
|
conf_.lazy_check = false;
|
|
}
|
|
|
|
mu_debug("starting indexer with <= {} worker thread(s)", max_workers_);
|
|
mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no",
|
|
conf_.cleanup ? "yes" : "no");
|
|
|
|
state_.change_to(IndexState::Scanning);
|
|
/* kick off the first worker, which will spawn more if needed. */
|
|
workers_.emplace_back(std::thread([this] { item_worker(); }));
|
|
/* kick the disk-scanner thread */
|
|
scanner_worker_ = std::thread([this] { scan_worker(); });
|
|
|
|
mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking");
|
|
if (block) {
|
|
while(is_running()) {
|
|
using namespace std::chrono_literals;
|
|
std::this_thread::sleep_for(100ms);
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
Indexer::Private::stop()
|
|
{
|
|
scanner_.stop();
|
|
|
|
todos_.clear();
|
|
if (scanner_worker_.joinable())
|
|
scanner_worker_.join();
|
|
|
|
state_.change_to(IndexState::Idle);
|
|
for (auto&& w : workers_)
|
|
if (w.joinable())
|
|
w.join();
|
|
workers_.clear();
|
|
|
|
return true;
|
|
}
|
|
|
|
/// Create an indexer for @p store; all state lives in Indexer::Private.
Indexer::Indexer(Store& store) : priv_(std::make_unique<Private>(store)) {}

/// Destroying the Private part stops any ongoing run (see ~Private).
Indexer::~Indexer() = default;
|
|
|
|
bool
|
|
Indexer::start(const Indexer::Config& conf, bool block)
|
|
{
|
|
const auto mdir{priv_->store_.root_maildir()};
|
|
if (G_UNLIKELY(access(mdir.c_str(), R_OK) != 0)) {
|
|
mu_critical("'{}' is not readable: {}", mdir, g_strerror(errno));
|
|
return false;
|
|
}
|
|
|
|
std::lock_guard lock(priv_->lock_);
|
|
if (is_running())
|
|
return true;
|
|
|
|
return priv_->start(conf, block);
|
|
}
|
|
|
|
bool
|
|
Indexer::stop()
|
|
{
|
|
std::lock_guard lock{priv_->lock_};
|
|
|
|
if (!is_running())
|
|
return true;
|
|
|
|
mu_debug("stopping indexer");
|
|
return priv_->stop();
|
|
}
|
|
|
|
bool
|
|
Indexer::is_running() const
|
|
{
|
|
return priv_->is_running();
|
|
}
|
|
|
|
/**
 * Get the progress of the current (or last) run.
 *
 * Refreshes the 'running' flag from the current state, then returns a
 * reference to the live progress object, so the caller sees later updates.
 */
const Indexer::Progress&
Indexer::progress() const
{
	auto& prog{priv_->progress_};
	prog.running = priv_->state_ != IndexState::Idle;
	return prog;
}
|
|
|
|
::time_t
|
|
Indexer::completed() const
|
|
{
|
|
return priv_->completed_;
|
|
}
|
|
|
|
|
|
#if BUILD_TESTS
|
|
#include "mu-test-utils.hh"
|
|
|
|
static void
|
|
test_index_basic()
|
|
{
|
|
allow_warnings();
|
|
|
|
TempDir tdir;
|
|
auto store = Store::make_new(tdir.path(), MU_TESTMAILDIR2);
|
|
assert_valid_result(store);
|
|
g_assert_true(store->empty());
|
|
|
|
Indexer& idx{store->indexer()};
|
|
|
|
g_assert_false(idx.is_running());
|
|
g_assert_true(idx.stop());
|
|
g_assert_cmpuint(idx.completed(),==, 0);
|
|
|
|
const auto& prog{idx.progress()};
|
|
g_assert_false(prog.running);
|
|
g_assert_cmpuint(prog.checked,==, 0);
|
|
g_assert_cmpuint(prog.updated,==, 0);
|
|
g_assert_cmpuint(prog.removed,==, 0);
|
|
|
|
Indexer::Config conf{};
|
|
conf.ignore_noupdate = true;
|
|
|
|
{
|
|
const auto start{time({})};
|
|
g_assert_true(idx.start(conf));
|
|
while (idx.is_running())
|
|
g_usleep(10000);
|
|
|
|
g_assert_false(idx.is_running());
|
|
g_assert_true(idx.stop());
|
|
|
|
g_assert_cmpuint(idx.completed() - start, <, 5);
|
|
|
|
g_assert_false(prog.running);
|
|
g_assert_cmpuint(prog.checked,==, 14);
|
|
g_assert_cmpuint(prog.updated,==, 14);
|
|
g_assert_cmpuint(prog.removed,==, 0);
|
|
|
|
g_assert_cmpuint(store->size(),==,14);
|
|
}
|
|
|
|
conf.lazy_check = true;
|
|
conf.max_threads = 1;
|
|
conf.ignore_noupdate = false;
|
|
|
|
{
|
|
const auto start{time({})};
|
|
g_assert_true(idx.start(conf));
|
|
while (idx.is_running())
|
|
g_usleep(10000);
|
|
|
|
g_assert_false(idx.is_running());
|
|
g_assert_true(idx.stop());
|
|
|
|
g_assert_cmpuint(idx.completed() - start, <, 3);
|
|
|
|
g_assert_false(prog.running);
|
|
g_assert_cmpuint(prog.checked,==, 0);
|
|
g_assert_cmpuint(prog.updated,==, 0);
|
|
g_assert_cmpuint(prog.removed,==, 0);
|
|
|
|
g_assert_cmpuint(store->size(),==, 14);
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
test_index_lazy()
|
|
{
|
|
allow_warnings();
|
|
|
|
TempDir tdir;
|
|
auto store = Store::make_new(tdir.path(), MU_TESTMAILDIR2);
|
|
assert_valid_result(store);
|
|
g_assert_true(store->empty());
|
|
Indexer& idx{store->indexer()};
|
|
|
|
Indexer::Config conf{};
|
|
conf.lazy_check = true;
|
|
conf.ignore_noupdate = false;
|
|
|
|
const auto start{time({})};
|
|
g_assert_true(idx.start(conf));
|
|
while (idx.is_running())
|
|
g_usleep(10000);
|
|
|
|
g_assert_false(idx.is_running());
|
|
g_assert_true(idx.stop());
|
|
|
|
g_assert_cmpuint(idx.completed() - start, <, 3);
|
|
|
|
const auto& prog{idx.progress()};
|
|
g_assert_false(prog.running);
|
|
g_assert_cmpuint(prog.checked,==, 6);
|
|
g_assert_cmpuint(prog.updated,==, 6);
|
|
g_assert_cmpuint(prog.removed,==, 0);
|
|
|
|
g_assert_cmpuint(store->size(),==, 6);
|
|
}
|
|
|
|
static void
|
|
test_index_cleanup()
|
|
{
|
|
allow_warnings();
|
|
|
|
TempDir tdir;
|
|
auto mdir = join_paths(tdir.path(), "Test");
|
|
{
|
|
auto res = run_command({"cp", "-r", MU_TESTMAILDIR2, mdir});
|
|
assert_valid_result(res);
|
|
g_assert_cmpuint(res->exit_code,==, 0);
|
|
}
|
|
|
|
auto store = Store::make_new(tdir.path(), mdir);
|
|
assert_valid_result(store);
|
|
g_assert_true(store->empty());
|
|
Indexer& idx{store->indexer()};
|
|
|
|
Indexer::Config conf{};
|
|
conf.ignore_noupdate = true;
|
|
|
|
g_assert_true(idx.start(conf));
|
|
while (idx.is_running())
|
|
g_usleep(10000);
|
|
|
|
g_assert_false(idx.is_running());
|
|
g_assert_true(idx.stop());
|
|
g_assert_cmpuint(store->size(),==, 14);
|
|
|
|
// remove a message
|
|
{
|
|
auto mpath = join_paths(mdir, "bar", "cur", "mail6");
|
|
auto res = run_command({"rm", mpath});
|
|
assert_valid_result(res);
|
|
g_assert_cmpuint(res->exit_code,==, 0);
|
|
}
|
|
|
|
// no cleanup, # stays the same
|
|
conf.cleanup = false;
|
|
g_assert_true(idx.start(conf));
|
|
while (idx.is_running())
|
|
g_usleep(10000);
|
|
g_assert_false(idx.is_running());
|
|
g_assert_true(idx.stop());
|
|
g_assert_cmpuint(store->size(),==, 14);
|
|
|
|
// cleanup, message is gone from store.
|
|
conf.cleanup = true;
|
|
g_assert_true(idx.start(conf));
|
|
while (idx.is_running())
|
|
g_usleep(10000);
|
|
g_assert_false(idx.is_running());
|
|
g_assert_true(idx.stop());
|
|
g_assert_cmpuint(store->size(),==, 13);
|
|
}
|
|
|
|
|
|
int
|
|
main(int argc, char* argv[])
|
|
{
|
|
mu_test_init(&argc, &argv);
|
|
|
|
g_test_add_func("/index/basic", test_index_basic);
|
|
g_test_add_func("/index/lazy", test_index_lazy);
|
|
g_test_add_func("/index/cleanup", test_index_cleanup);
|
|
|
|
return g_test_run();
|
|
|
|
}
|
|
#endif /*BUILD_TESTS*/
|