store/indexer: move transaction handling to store

Move the transaction handling code inside Store, simplifying the indexer.
This commit is contained in:
Dirk-Jan C. Binnema
2021-11-09 22:43:11 +02:00
parent 48d3f9cfab
commit 4c0d8572d8
3 changed files with 56 additions and 78 deletions

View File

@ -70,13 +70,12 @@ struct Indexer::Private {
[this](auto&& path, auto&& statbuf, auto&& info) { [this](auto&& path, auto&& statbuf, auto&& info) {
return handler(path, statbuf, info); return handler(path, statbuf, info);
}}, }},
max_message_size_{store_.metadata().max_message_size}, max_message_size_{store_.metadata().max_message_size}
batch_size_{store_.metadata().batch_size}
{ {
g_message("created indexer for %s -> %s (batch-size: %zu)", g_message("created indexer for %s -> %s (batch-size: %zu)",
store.metadata().root_maildir.c_str(), store.metadata().root_maildir.c_str(),
store.metadata().database_path.c_str(), store.metadata().database_path.c_str(),
batch_size_); store.metadata().batch_size);
} }
~Private() { stop(); } ~Private() { stop(); }
@ -106,8 +105,6 @@ struct Indexer::Private {
Progress progress_; Progress progress_;
IndexState state_; IndexState state_;
const size_t batch_size_; /**< Max number of messages added before
* committing */
std::mutex lock_, wlock_; std::mutex lock_, wlock_;
}; };
@ -194,11 +191,6 @@ Indexer::Private::worker()
g_debug("started worker"); g_debug("started worker");
/* note that transaction starting/committing is opportunistic,
* and are NOPs if we are already/not in a transaction */
store_.begin_transaction();
while (state_ == IndexState::Scanning || !fq_.empty()) { while (state_ == IndexState::Scanning || !fq_.empty()) {
if (!fq_.pop(item, 250ms)) if (!fq_.pop(item, 250ms))
continue; continue;
@ -209,22 +201,15 @@ Indexer::Private::worker()
try { try {
std::unique_lock lock{lock_}; std::unique_lock lock{lock_};
store_.add_message(item); store_.add_message(item, true /*use-transaction*/);
++progress_.updated; ++progress_.updated;
if (progress_.updated % batch_size_ == 0) {
store_.commit_transaction();
store_.begin_transaction();
}
} catch (const Mu::Error& er) { } catch (const Mu::Error& er) {
g_warning("error adding message @ %s: %s", item.c_str(), er.what()); g_warning("error adding message @ %s: %s", item.c_str(), er.what());
} }
maybe_start_worker(); maybe_start_worker();
} }
store_.commit_transaction();
} }
bool bool
@ -246,7 +231,6 @@ Indexer::Private::cleanup()
return state_ == IndexState::Cleaning; return state_ == IndexState::Cleaning;
}); });
// No need for transactions here, remove_messages does that for us.
if (orphans.empty()) if (orphans.empty())
g_debug("nothing to clean up"); g_debug("nothing to clean up");
else { else {
@ -295,6 +279,8 @@ Indexer::Private::start(const Indexer::Config& conf)
while (!fq_.empty()) while (!fq_.empty())
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
store_.commit();
if (conf_.cleanup) { if (conf_.cleanup) {
g_debug("starting cleanup"); g_debug("starting cleanup");
state_.change_to(IndexState::Cleaning); state_.change_to(IndexState::Cleaning);

View File

@ -139,8 +139,7 @@ struct Store::Private {
xapian_try([&] { xapian_try([&] {
writable_db().set_metadata(ContactsKey, contacts_.serialize()); writable_db().set_metadata(ContactsKey, contacts_.serialize());
}); });
if (in_transaction_) transaction_maybe_commit(true /*force*/);
commit_transaction();
} }
} }
@ -181,32 +180,34 @@ struct Store::Private {
return dynamic_cast<Xapian::WritableDatabase&>(*db_.get()); return dynamic_cast<Xapian::WritableDatabase&>(*db_.get());
} }
void begin_transaction() noexcept // If not started yet, start a transaction. Otherwise, just update the transaction size.
void transaction_inc() noexcept
{ {
if (mdata_.in_memory) if (mdata_.in_memory)
return; // not supported in the in-memory backend. return; // not supported
g_return_if_fail(!in_transaction_); if (transaction_size_ == 0) {
g_debug("starting transaction"); g_debug("starting transaction");
xapian_try([this] { xapian_try([this] { writable_db().begin_transaction(); });
writable_db().begin_transaction(); }
in_transaction_ = true; ++transaction_size_;
});
} }
void commit_transaction() noexcept // Opportunistically commit a transaction if the transaction size
// filled up a batch, or with force.
void transaction_maybe_commit(bool force = false) noexcept
{ {
if (mdata_.in_memory) if (mdata_.in_memory || transaction_size_ == 0)
return; // not supported in the in-memory backend. return; // not supported or not in transaction
g_return_if_fail(in_transaction_); if (force || transaction_size_ >= mdata_.batch_size) {
g_debug("committing modification(s)"); g_debug("committing transaction (n=%zu)", transaction_size_);
xapian_try([this] { xapian_try([this] {
if (in_transaction_)
writable_db().commit_transaction(); writable_db().commit_transaction();
in_transaction_ = false; transaction_size_ = 0;
}); });
} }
}
void add_synonyms() void add_synonyms()
{ {
@ -279,7 +280,7 @@ struct Store::Private {
Contacts contacts_; Contacts contacts_;
std::unique_ptr<Indexer> indexer_; std::unique_ptr<Indexer> indexer_;
std::atomic<bool> in_transaction_{}; size_t transaction_size_{};
std::mutex lock_; std::mutex lock_;
}; };
@ -406,7 +407,7 @@ maildir_from_path(const std::string& root, const std::string& path)
} }
unsigned unsigned
Store::add_message(const std::string& path) Store::add_message(const std::string& path, bool use_transaction)
{ {
LOCKED; LOCKED;
@ -418,7 +419,14 @@ Store::add_message(const std::string& path)
"failed to create message: %s", "failed to create message: %s",
gerr ? gerr->message : "something went wrong"}; gerr ? gerr->message : "something went wrong"};
if (use_transaction)
priv_->transaction_inc();
const auto docid{priv_->add_or_update_msg(0, msg)}; const auto docid{priv_->add_or_update_msg(0, msg)};
if (use_transaction) /* commit if batch is full */
priv_->transaction_maybe_commit();
mu_msg_unref(msg); mu_msg_unref(msg);
if (G_UNLIKELY(docid == InvalidId)) if (G_UNLIKELY(docid == InvalidId))
@ -461,16 +469,17 @@ Store::remove_message(const std::string& path)
void void
Store::remove_messages(const std::vector<Store::Id>& ids) Store::remove_messages(const std::vector<Store::Id>& ids)
{ {
begin_transaction(); LOCKED;
priv_->transaction_inc();
xapian_try([&] { xapian_try([&] {
LOCKED;
for (auto&& id : ids) { for (auto&& id : ids) {
priv_->writable_db().delete_document(id); priv_->writable_db().delete_document(id);
} }
}); });
commit_transaction(); priv_->transaction_maybe_commit(true /*force*/);
} }
time_t time_t
@ -547,6 +556,13 @@ Store::for_each_message_path(Store::ForEachMessageFunc msg_func) const
return n; return n;
} }
void
Store::commit()
{
LOCKED;
priv_->transaction_maybe_commit(true /*force*/);
}
static MuMsgFieldId static MuMsgFieldId
field_id(const std::string& field) field_id(const std::string& field)
{ {
@ -585,26 +601,6 @@ Store::for_each_term(const std::string& field, Store::ForEachTermFunc func) cons
return n; return n;
} }
void
Store::begin_transaction()
{
xapian_try([&] {
LOCKED;
if (!priv_->in_transaction_)
priv_->begin_transaction();
});
}
void
Store::commit_transaction()
{
xapian_try([&] {
LOCKED;
if (priv_->in_transaction_)
priv_->commit_transaction();
});
}
static void static void
add_terms_values_date(Xapian::Document& doc, MuMsg* msg, MuMsgFieldId mfid) add_terms_values_date(Xapian::Document& doc, MuMsg* msg, MuMsgFieldId mfid)
{ {

View File

@ -136,13 +136,16 @@ public:
Indexer& indexer(); Indexer& indexer();
/** /**
* Add a message to the store. * Add a message to the store. When planning to write many messages,
* it's much faster to do so in a transaction. If so, set
* @in_transaction to true. When done with adding messages, call commit().
* *
* @param path the message path. * @param path the message path.
* @param whether to bundle up to batch_size changes in a transaction
* *
* @return the doc id of the added message * @return the doc id of the added message
*/ */
Id add_message(const std::string& path); Id add_message(const std::string& path, bool use_transaction = false);
/** /**
* Update a message in the store. * Update a message in the store.
@ -279,17 +282,10 @@ public:
bool empty() const; bool empty() const;
/** /**
* Start a Xapian transaction, opportunistically. If a transaction * Commit the current batch of modifications to disk, opportunistically.
* is already underway, do nothing. * If no transaction is underway, do nothing.
*/ */
void begin_transaction(); void commit();
/**
* Commit the current group of modifications (i.e., transaction) to
* disk, opportunistically. If no transaction is underway, do
* nothing.
*/
void commit_transaction();
/** /**
* Get a reference to the private data. For internal use. * Get a reference to the private data. For internal use.