index: save/commit metadata after messages

Ensure the metadata (dirstamps) for messages are only written / committed _after_
the accompanying message have been written / committed.

This avoids missing updates when indexing gets terminated unexpectedly.
This commit is contained in:
Dirk-Jan C. Binnema
2022-02-03 22:51:02 +02:00
parent 12658a3dc6
commit 05393ba797
5 changed files with 174 additions and 113 deletions

View File

@ -22,6 +22,7 @@
#include <config.h>
#include <atomic>
#include <algorithm>
#include <mutex>
#include <vector>
#include <thread>
@ -39,29 +40,41 @@ using namespace std::chrono_literals;
using namespace Mu;
struct IndexState {
enum State { Idle, Scanning, Cleaning };
enum State { Idle,
Scanning,
Cleaning };
static const char* name(State s)
{
switch (s) {
case Idle: return "idle";
case Scanning: return "scanning";
case Cleaning: return "cleaning";
default: return "<error>";
case Idle:
return "idle";
case Scanning:
return "scanning";
case Cleaning:
return "cleaning";
default:
return "<error>";
}
}
bool operator==(State rhs) const { return state_ == rhs; }
bool operator==(State rhs) const
{
return state_ == rhs;
}
bool operator!=(State rhs) const
{
return state_ != rhs;
}
void change_to(State new_state)
{
g_debug("changing indexer state %s->%s",
name((State)state_),
g_debug("changing indexer state %s->%s", name((State)state_),
name((State)new_state));
state_ = new_state;
}
private:
State state_{Idle};
std::atomic<State> state_{Idle};
};
struct Indexer::Private {
@ -74,17 +87,20 @@ struct Indexer::Private {
{
g_message("created indexer for %s -> %s (batch-size: %zu)",
store.metadata().root_maildir.c_str(),
store.metadata().database_path.c_str(),
store.metadata().batch_size);
store.metadata().database_path.c_str(), store.metadata().batch_size);
}
~Private() { stop(); }
~Private()
{
stop();
}
bool dir_predicate(const std::string& path, const struct dirent* dirent) const;
bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype);
void maybe_start_worker();
void worker();
void item_worker();
void scan_worker();
bool cleanup();
@ -101,17 +117,24 @@ struct Indexer::Private {
std::vector<std::thread> workers_;
std::thread scanner_worker_;
AsyncQueue<std::string> fq_;
struct WorkItem {
std::string full_path;
enum Type {
Dir,
File
};
Type type;
};
AsyncQueue<WorkItem> todos_;
Progress progress_;
IndexState state_;
std::mutex lock_, wlock_;
std::mutex lock_, w_lock_;
};
bool
Indexer::Private::handler(const std::string& fullpath,
struct stat* statbuf,
Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
Scanner::HandleType htype)
{
switch (htype) {
@ -124,8 +147,7 @@ Indexer::Private::handler(const std::string& fullpath,
dirstamp_ = store_.dirstamp(fullpath);
if (conf_.lazy_check && dirstamp_ >= statbuf->st_mtime &&
htype == Scanner::HandleType::EnterNewCur) {
g_debug("skip %s (seems up-to-date: %s >= %s)",
fullpath.c_str(),
g_debug("skip %s (seems up-to-date: %s >= %s)", fullpath.c_str(),
time_to_string("%FT%T", dirstamp_).c_str(),
time_to_string("%FT%T", statbuf->st_mtime).c_str());
return false;
@ -152,7 +174,7 @@ Indexer::Private::handler(const std::string& fullpath,
return true;
}
case Scanner::HandleType::LeaveDir: {
store_.set_dirstamp(fullpath, ::time(NULL));
todos_.push({fullpath, WorkItem::Type::Dir});
return true;
}
@ -160,8 +182,7 @@ Indexer::Private::handler(const std::string& fullpath,
++progress_.checked;
if ((size_t)statbuf->st_size > max_message_size_) {
g_debug("skip %s (too big: %" G_GINT64_FORMAT " bytes)",
fullpath.c_str(),
g_debug("skip %s (too big: %" G_GINT64_FORMAT " bytes)", fullpath.c_str(),
(gint64)statbuf->st_size);
return false;
}
@ -175,44 +196,59 @@ Indexer::Private::handler(const std::string& fullpath,
// push the remaining messages to our "todo" queue for
// (re)parsing and adding/updating to the database.
fq_.push(std::string{fullpath});
todos_.push({fullpath, WorkItem::Type::File});
return true;
}
default: g_return_val_if_reached(false); return false;
default:
g_return_val_if_reached(false);
return false;
}
}
void
Indexer::Private::maybe_start_worker()
{
std::lock_guard<std::mutex> wlock{wlock_};
std::lock_guard lock{w_lock_};
if (fq_.size() > workers_.size() && workers_.size() < max_workers_)
workers_.emplace_back(std::thread([this] { worker(); }));
if (todos_.size() > workers_.size() && workers_.size() < max_workers_) {
workers_.emplace_back(std::thread([this] { item_worker(); }));
g_debug("added worker %zu", workers_.size());
}
}
void
Indexer::Private::worker()
Indexer::Private::item_worker()
{
std::string item;
WorkItem item;
g_debug("started worker");
while (state_ == IndexState::Scanning) {
if (!fq_.pop(item, 250ms))
if (!todos_.pop(item, 250ms))
continue;
try {
std::unique_lock lock{lock_};
store_.add_message(item, true /*use-transaction*/);
std::lock_guard lock{w_lock_};
switch (item.type) {
case WorkItem::Type::File:
store_.add_message(item.full_path,
true /*use-transaction*/);
++progress_.updated;
break;
case WorkItem::Type::Dir:
store_.set_dirstamp(item.full_path, ::time(NULL),
true /*use-transaction*/);
break;
default:
g_warn_if_reached();
break;
}
} catch (const Mu::Error& er) {
g_warning("error adding message @ %s: %s", item.c_str(), er.what());
g_warning("error adding message @ %s: %s",
item.full_path.c_str(), er.what());
}
maybe_start_worker();
std::this_thread::yield();
}
}
@ -227,8 +263,7 @@ Indexer::Private::cleanup()
++n;
if (::access(path.c_str(), R_OK) != 0) {
g_debug("cannot read %s (id=%u); queueing for removal from store",
path.c_str(),
id);
path.c_str(), id);
orphans.emplace_back(id);
}
@ -246,29 +281,9 @@ Indexer::Private::cleanup()
return true;
}
bool
Indexer::Private::start(const Indexer::Config& conf)
void
Indexer::Private::scan_worker()
{
stop();
conf_ = conf;
if (conf_.max_threads == 0)
max_workers_ = std::thread::hardware_concurrency();
else
max_workers_ = conf.max_threads;
g_debug("starting indexer with <= %zu worker thread(s)", max_workers_);
g_debug("indexing: %s; clean-up: %s",
conf_.scan ? "yes" : "no",
conf_.cleanup ? "yes" : "no");
state_.change_to(IndexState::Scanning);
{
/* kick off the first worker, which will spawn more if needed. */
std::lock_guard<std::mutex> wlock{wlock_};
workers_.emplace_back(std::thread([this] { worker(); }));
}
scanner_worker_ = std::thread([this] {
progress_ = {};
if (conf_.scan) {
@ -277,13 +292,16 @@ Indexer::Private::start(const Indexer::Config& conf)
g_warning("failed to start scanner");
goto leave;
}
g_debug("scanner finished with %zu file(s) in queue", fq_.size());
g_debug("scanner finished with %zu file(s) in queue", todos_.size());
}
// now there may still be messages in the work queue...
// finish those; this is a bit ugly; perhaps we should
// handle SIGTERM etc.
while (!fq_.empty())
if (!todos_.empty())
g_debug("process %zu remaining message(s) with %zu worker(s)", todos_.size(),
workers_.size());
while (!todos_.empty())
std::this_thread::sleep_for(100ms);
store_.commit();
@ -296,9 +314,34 @@ Indexer::Private::start(const Indexer::Config& conf)
}
leave:
state_.change_to(IndexState::Idle);
});
}
bool
Indexer::Private::start(const Indexer::Config& conf)
{
stop();
conf_ = conf;
if (conf_.max_threads == 0) {
/* we're blocked mostly by a) filesystem and b) database;
* so it's not very useful to start many threads */
max_workers_ = std::max(
4U, std::thread::hardware_concurrency());
} else
max_workers_ = conf.max_threads;
g_debug("starting indexer with <= %zu worker thread(s)", max_workers_);
g_debug("indexing: %s; clean-up: %s", conf_.scan ? "yes" : "no",
conf_.cleanup ? "yes" : "no");
state_.change_to(IndexState::Scanning);
/* kick off the first worker, which will spawn more if needed. */
workers_.emplace_back(std::thread([this] { item_worker(); }));
/* kick the disk-scanner thread */
scanner_worker_ = std::thread([this] { scan_worker(); });
g_debug("started indexer");
return true;
}
@ -306,26 +349,24 @@ bool
Indexer::Private::stop()
{
scanner_.stop();
state_.change_to(IndexState::Idle);
const auto w_n = workers_.size();
fq_.clear();
todos_.clear();
if (scanner_worker_.joinable())
scanner_worker_.join();
state_.change_to(IndexState::Idle);
for (auto&& w : workers_)
if (w.joinable())
w.join();
workers_.clear();
if (w_n > 0)
g_debug("stopped indexer (joined %zu worker(s))", w_n);
return true;
}
Indexer::Indexer(Store& store) : priv_{std::make_unique<Private>(store)} {}
Indexer::Indexer(Store& store)
: priv_{std::make_unique<Private>(store)}
{
}
Indexer::~Indexer() = default;
@ -338,7 +379,7 @@ Indexer::start(const Indexer::Config& conf)
return false;
}
std::lock_guard<std::mutex> l(priv_->lock_);
std::lock_guard lock(priv_->lock_);
if (is_running())
return true;
@ -348,7 +389,7 @@ Indexer::start(const Indexer::Config& conf)
bool
Indexer::stop()
{
std::lock_guard<std::mutex> l(priv_->lock_);
std::lock_guard lock{priv_->lock_};
if (!is_running())
return true;
@ -360,7 +401,7 @@ Indexer::stop()
bool
Indexer::is_running() const
{
return !(priv_->state_ == IndexState::Idle) || !priv_->fq_.empty();
return priv_->state_ != IndexState::Idle;
}
Indexer::Progress

View File

@ -58,7 +58,9 @@ public:
};
/**
* Start indexing. If already underway, do nothing.
* Start indexing. If already underway, do nothing. This returns
* immediately after starting, with the work being done in the
* background.
*
* @param conf a configuration object
*

View File

@ -44,7 +44,10 @@ struct Scanner::Private {
if (!handler_)
throw Mu::Error{Error::Code::Internal, "missing handler"};
}
~Private() { stop(); }
~Private()
{
stop();
}
bool start();
bool stop();
@ -65,7 +68,8 @@ is_special_dir(const char* d_name)
}
bool
Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry, bool is_maildir)
Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry,
bool is_maildir)
{
const auto d_name{dentry->d_name};
@ -83,8 +87,8 @@ Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry,
if (S_ISDIR(statbuf.st_mode)) {
const auto new_cur =
std::strcmp(d_name, "cur") == 0 || std::strcmp(d_name, "new") == 0;
const auto htype = new_cur ? Scanner::HandleType::EnterNewCur
: Scanner::HandleType::EnterDir;
const auto htype =
new_cur ? Scanner::HandleType::EnterNewCur : Scanner::HandleType::EnterDir;
const auto res = handler_(fullpath, &statbuf, htype);
if (!res)
return true; // skip
@ -97,12 +101,16 @@ Scanner::Private::process_dentry(const std::string& path, struct dirent* dentry,
return handler_(fullpath, &statbuf, Scanner::HandleType::File);
g_debug("skip %s (neither maildir-file nor directory)", fullpath.c_str());
return true;
}
bool
Scanner::Private::process_dir(const std::string& path, bool is_maildir)
{
if (!running_)
return true; /* we're done */
const auto dir = opendir(path.c_str());
if (G_UNLIKELY(!dir)) {
g_warning("failed to scan dir %s: %s", path.c_str(), g_strerror(errno));
@ -171,8 +179,7 @@ Scanner::Private::start()
const auto start{std::chrono::steady_clock::now()};
process_dir(root_dir_, is_maildir);
const auto elapsed = std::chrono::steady_clock::now() - start;
g_debug("finished scan of %s in %" G_GINT64_FORMAT " ms",
root_dir_.c_str(),
g_debug("finished scan of %s in %" G_GINT64_FORMAT " ms", root_dir_.c_str(),
to_ms(elapsed));
running_ = false;
@ -201,15 +208,10 @@ Scanner::~Scanner() = default;
bool
Scanner::start()
{
{
std::lock_guard<std::mutex> l(priv_->lock_);
if (priv_->running_)
return true; // nothing to do
priv_->running_ = true;
}
const auto res = priv_->start();
const auto res = priv_->start(); /* blocks */
priv_->running_ = false;
return res;
@ -218,7 +220,7 @@ Scanner::start()
bool
Scanner::stop()
{
std::lock_guard<std::mutex> l(priv_->lock_);
std::lock_guard l(priv_->lock_);
return priv_->stop();
}

View File

@ -31,6 +31,7 @@
#include <iostream>
#include <cstring>
#include <vector>
#include <xapian.h>
#include "mu-store.hh"
@ -206,9 +207,12 @@ struct Store::Private {
contacts_.serialize());
});
}
g_debug("committing transaction (n=%zu)", transaction_size_);
g_debug("committing transaction (n=%zu,%zu)",
transaction_size_, metadatas_.size());
xapian_try([this] {
writable_db().commit_transaction();
for (auto&& mdata : metadatas_)
writable_db().set_metadata(mdata.first, mdata.second);
transaction_size_ = 0;
});
}
@ -278,6 +282,10 @@ struct Store::Private {
Xapian::docid add_or_update_msg(Xapian::docid docid, MuMsg* msg);
Xapian::Document new_doc_from_message(MuMsg* msg);
/* metadata to write as part of a transaction commit */
using StringPair = std::pair<std::string, std::string>;
std::vector<StringPair> metadatas_;
const bool read_only_{};
std::unique_ptr<Xapian::Database> db_;
@ -501,7 +509,7 @@ Store::dirstamp(const std::string& path) const
}
void
Store::set_dirstamp(const std::string& path, time_t tstamp)
Store::set_dirstamp(const std::string& path, time_t tstamp, bool use_transaction)
{
std::lock_guard guard{priv_->lock_};
@ -510,9 +518,15 @@ Store::set_dirstamp(const std::string& path, time_t tstamp)
const auto len = static_cast<size_t>(
g_snprintf(data.data(), data.size(), "%zx", tstamp));
xapian_try([&] {
priv_->writable_db().set_metadata(path, std::string{data.data(), len});
});
/* set_metadata is not otherwise part of a "transaction" but we want it
* to be so, so a dirstamp _only_ gets updated when the messages in the
* dir are commited */
Private::StringPair item{path, std::string{data.data(), len}};
if (use_transaction)
priv_->metadatas_.emplace_back(std::move(item));
else
xapian_try([&] { priv_->writable_db().set_metadata(item.first, item.second); });
}
MuMsg*

View File

@ -303,8 +303,10 @@ public:
*
* @param path a filesystem path
* @param tstamp the timestamp for that path
* @param whether to do this as part of a transaction
*/
void set_dirstamp(const std::string& path, time_t tstamp);
void set_dirstamp(const std::string& path, time_t tstamp,
bool use_transaction = false);
/**
* Get the number of documents in the document database