xapian-db: remove locks, transaction levels

Simplify xapian-db: locks should go elsewhere; transaction level add
too much complication.
This commit is contained in:
Dirk-Jan C. Binnema
2024-05-26 11:42:50 +03:00
parent e978a58400
commit c05b28e761
4 changed files with 66 additions and 117 deletions

View File

@ -324,8 +324,6 @@ Indexer::Private::cleanup()
void void
Indexer::Private::scan_worker() Indexer::Private::scan_worker()
{ {
XapianDb::Transaction tx{store_.xapian_db()}; // RAII
progress_.reset(); progress_.reset();
if (conf_.scan) { if (conf_.scan) {
mu_debug("starting scanner"); mu_debug("starting scanner");

View File

@ -382,10 +382,12 @@ Store::remove_messages(const std::vector<Store::Id>& ids)
{ {
std::lock_guard guard{priv_->lock_}; std::lock_guard guard{priv_->lock_};
XapianDb::Transaction tx (xapian_db()); // RAII xapian_db().request_transaction();
for (auto&& id : ids) for (auto&& id : ids)
xapian_db().delete_document(id); xapian_db().delete_document(id);
xapian_db().request_commit(true/*force*/);
} }

View File

@ -41,6 +41,7 @@ XapianDb::wdb()
{ {
if (read_only()) if (read_only())
throw std::runtime_error("database is read-only"); throw std::runtime_error("database is read-only");
return std::get<Xapian::WritableDatabase>(db_); return std::get<Xapian::WritableDatabase>(db_);
} }
@ -106,7 +107,7 @@ XapianDb::XapianDb(const std::string& db_path, Flavor flavor):
if (flavor == Flavor::CreateOverwrite) if (flavor == Flavor::CreateOverwrite)
set_timestamp(MetadataIface::created_key); set_timestamp(MetadataIface::created_key);
mu_debug("created {} / {} (batch-size: {})", flavor, *this, batch_size_); mu_debug("created {}", *this);
} }
void void

View File

@ -24,6 +24,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <mutex> #include <mutex>
#include <thread>
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
@ -186,10 +187,8 @@ private:
/** /**
* Fairly thin wrapper around Xapian::Database and Xapian::WritableDatabase * Fairly thin wrapper around Xapian::Database and Xapian::WritableDatabase
* with just the things we need + locking + exception handling
*/ */
class XapianDb: public MetadataIface { class XapianDb: public MetadataIface {
#define DB_LOCKED std::unique_lock lock__{lock_};
public: public:
/** /**
* Type of database to create. * Type of database to create.
@ -202,7 +201,7 @@ public:
}; };
/** /**
* XapianDb CTOR. This may throw some Xapian exception. * XapianDb CTOR. This may throw.
* *
* @param db_path path to the database * @param db_path path to the database
* @param flavor kind of database * @param flavor kind of database
@ -213,12 +212,8 @@ public:
* DTOR * DTOR
*/ */
~XapianDb() { ~XapianDb() {
if (tx_level_ > 0) if (!read_only())
mu_warning("inconsistent transaction level ({})", tx_level_); request_commit(true/*force*/);
if (tx_level_ > 0) {
mu_debug("closing db after committing {} change(s)", changes_);
xapian_try([this]{ DB_LOCKED; wdb().commit_transaction(); });
} else
mu_debug("closing db"); mu_debug("closing db");
} }
@ -260,7 +255,7 @@ public:
*/ */
size_t size() const noexcept { size_t size() const noexcept {
return xapian_try([this]{ return xapian_try([this]{
DB_LOCKED; return db().get_doccount(); }, 0); return db().get_doccount(); }, 0);
} }
/** /**
@ -276,7 +271,7 @@ public:
* @return an enquire object * @return an enquire object
*/ */
Xapian::Enquire enquire() const { Xapian::Enquire enquire() const {
DB_LOCKED; return Xapian::Enquire(db()); return Xapian::Enquire(db());
} }
/** /**
@ -288,7 +283,7 @@ public:
*/ */
Result<Xapian::Document> document(Xapian::docid id) const { Result<Xapian::Document> document(Xapian::docid id) const {
return xapian_try_result([&]{ return xapian_try_result([&]{
DB_LOCKED; return Ok(db().get_document(id)); }); return Ok(db().get_document(id)); });
} }
/** /**
@ -300,7 +295,7 @@ public:
*/ */
std::string metadata(const std::string& key) const override { std::string metadata(const std::string& key) const override {
return xapian_try([&]{ return xapian_try([&]{
DB_LOCKED; return db().get_metadata(key);}, ""); return db().get_metadata(key);}, "");
} }
/** /**
@ -310,8 +305,8 @@ public:
* @param val new value for key * @param val new value for key
*/ */
void set_metadata(const std::string& key, const std::string& val) override { void set_metadata(const std::string& key, const std::string& val) override {
xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val); xapian_try([&] { wdb().set_metadata(key, val);
maybe_commit(); }); maybe_commit();});
} }
/** /**
@ -323,7 +318,6 @@ public:
//using each_func = MetadataIface::each_func; //using each_func = MetadataIface::each_func;
void for_each(MetadataIface::each_func&& func) const override { void for_each(MetadataIface::each_func&& func) const override {
xapian_try([&]{ xapian_try([&]{
DB_LOCKED;
for (auto&& it = db().metadata_keys_begin(); for (auto&& it = db().metadata_keys_begin();
it != db().metadata_keys_end(); ++it) it != db().metadata_keys_end(); ++it)
func(*it, db().get_metadata(*it)); func(*it, db().get_metadata(*it));
@ -339,7 +333,7 @@ public:
*/ */
bool term_exists(const std::string& term) const { bool term_exists(const std::string& term) const {
return xapian_try([&]{ return xapian_try([&]{
DB_LOCKED; return db().term_exists(term);}, false); return db().term_exists(term);}, false);
} }
/** /**
@ -351,7 +345,6 @@ public:
*/ */
Result<Xapian::docid> add_document(const Xapian::Document& doc) { Result<Xapian::docid> add_document(const Xapian::Document& doc) {
return xapian_try_result([&]{ return xapian_try_result([&]{
DB_LOCKED;
auto&& id{wdb().add_document(doc)}; auto&& id{wdb().add_document(doc)};
set_timestamp(MetadataIface::last_change_key); set_timestamp(MetadataIface::last_change_key);
maybe_commit(); maybe_commit();
@ -369,9 +362,9 @@ public:
* @return new docid or an error * @return new docid or an error
*/ */
Result<Xapian::docid> Result<Xapian::docid>
replace_document(const std::string& term, const Xapian::Document& doc) { replace_document(const std::string& term,
const Xapian::Document& doc) {
return xapian_try_result([&]{ return xapian_try_result([&]{
DB_LOCKED;
auto&& id{wdb().replace_document(term, doc)}; auto&& id{wdb().replace_document(term, doc)};
set_timestamp(MetadataIface::last_change_key); set_timestamp(MetadataIface::last_change_key);
maybe_commit(); maybe_commit();
@ -379,9 +372,9 @@ public:
}); });
} }
Result<Xapian::docid> Result<Xapian::docid>
replace_document(Xapian::docid id, const Xapian::Document& doc) { replace_document(Xapian::docid id,
const Xapian::Document& doc) {
return xapian_try_result([&]{ return xapian_try_result([&]{
DB_LOCKED;
wdb().replace_document(id, doc); wdb().replace_document(id, doc);
set_timestamp(MetadataIface::last_change_key); set_timestamp(MetadataIface::last_change_key);
maybe_commit(); maybe_commit();
@ -398,7 +391,6 @@ public:
*/ */
Result<void> delete_document(const std::string& term) { Result<void> delete_document(const std::string& term) {
return xapian_try_result([&]{ return xapian_try_result([&]{
DB_LOCKED;
wdb().delete_document(term); wdb().delete_document(term);
set_timestamp(MetadataIface::last_change_key); set_timestamp(MetadataIface::last_change_key);
maybe_commit(); maybe_commit();
@ -407,7 +399,6 @@ public:
} }
Result<void> delete_document(Xapian::docid id) { Result<void> delete_document(Xapian::docid id) {
return xapian_try_result([&]{ return xapian_try_result([&]{
DB_LOCKED;
wdb().delete_document(id); wdb().delete_document(id);
set_timestamp(MetadataIface::last_change_key); set_timestamp(MetadataIface::last_change_key);
maybe_commit(); maybe_commit();
@ -417,7 +408,6 @@ public:
template<typename Func> template<typename Func>
size_t all_terms(const std::string& prefix, Func&& func) const { size_t all_terms(const std::string& prefix, Func&& func) const {
DB_LOCKED;
size_t n{}; size_t n{};
for (auto it = db().allterms_begin(prefix); it != db().allterms_end(prefix); ++it) { for (auto it = db().allterms_begin(prefix); it != db().allterms_end(prefix); ++it) {
if (!func(*it)) if (!func(*it))
@ -427,110 +417,69 @@ public:
return n; return n;
} }
/*
* If the "transaction ref count" > 0 (with inc_transactions());, we run
* in "transaction mode". That means that the subsequent Xapian mutation
* are part of a transactions, which is flushed when the number of
* changes reaches the batch size, _or_ the transaction ref count is
* decreased to 0 (dec_transactions()). *
*/
/** /**
* Increase the transaction level; needs to be balance by dec_transactions() * Requests a transaction to be started; this is only
*/ * a request, which may not be granted.
void inc_transaction_level() {
xapian_try([this]{
DB_LOCKED;
if (tx_level_ == 0) {// need to start the Xapian transaction?
mu_debug("begin transaction");
wdb().begin_transaction();
}
++tx_level_;
mu_debug("ind'd tx level to {}", tx_level_);
});
}
/**
* Decrease the transaction level (to balance inc_transactions())
* *
* If the level reach 0, perform a Xapian commit. * If you're already in a transaction but that transaction
* was started in another thread, that transaction will be
* committed before starting a new one.
*
* Otherwise, start a transaction if you're not already in one.
*
* @return A result; either true if a transaction was started; false
* otherwise, or an error.
*/ */
void dec_transaction_level() { Result<bool> request_transaction() {
xapian_try([this]{ return xapian_try_result([this]() {
DB_LOCKED; auto& db = wdb();
if (tx_level_ == 0) { if (in_transaction())
mu_critical("cannot dec transaction-level)"); return Ok(false); // nothing to
throw std::runtime_error("cannot dec transactions");
}
--tx_level_; db.begin_transaction();
if (tx_level_ == 0) {// need to commit the Xapian transaction? mu_debug("begin transaction");
mu_debug("committing {} changes", changes_); in_transaction_ = true;
changes_ = 0; return Ok(true);
wdb().commit_transaction();
}
mu_debug("dec'd tx level to {}", tx_level_);
}); });
} }
/**
* Explicitly request the Xapian DB to be committed to disk. This won't
* do anything when not in a transaction.
*
* @param force whether to force-commit
*/
void request_commit(bool force = false) { request_commit(wdb(), force); }
void maybe_commit() { request_commit(false); }
/** /**
* Are we inside a transaction? * Are we inside a transaction?
* *
* @return true or false * @return true or false
*/ */
bool in_transaction() const { DB_LOCKED; return tx_level_ > 0; } bool in_transaction() const { return in_transaction_; }
/**
* RAII Transaction object
*
*/
struct Transaction {
Transaction(XapianDb& db): db_{db} {
db_.inc_transaction_level();
}
~Transaction() {
db_.dec_transaction_level();
}
private:
XapianDb& db_;
};
/**
* Manually request the Xapian DB to be committed to disk. This won't
* do anything while in a transaction.
*/
void commit() {
xapian_try([this]{
DB_LOCKED;
if (tx_level_ == 0) {
mu_info("committing xapian-db @ {}", path_);
wdb().commit();
} else
mu_debug("not committing while in transaction");
});
}
using DbType = std::variant<Xapian::Database, Xapian::WritableDatabase>; using DbType = std::variant<Xapian::Database, Xapian::WritableDatabase>;
private:
private:
/** /**
* To be called after all changes, with DB_LOCKED held. * To be called after all changes, with DB_LOCKED held.
*/ */
void maybe_commit() { void request_commit(Xapian::WritableDatabase& db, bool force) {
// in transaction-mode and enough changes, commit them // in transaction-mode and enough changes, commit them
// and start a new transaction if (!in_transaction())
if (tx_level_ > 0 && ++changes_ >= batch_size_) { return;
mu_debug("batch full ({}/{}); committing change", changes_, batch_size_); if ((++changes_ < batch_size_) && !force)
wdb().commit_transaction(); return;
wdb().commit(); xapian_try([&]{
--tx_level_; mu_debug("committing transaction with {} changes; "
"forced={}", changes_, force ? "yes" : "no");
db.commit_transaction();
db.commit();
changes_ = 0; changes_ = 0;
wdb().begin_transaction(); in_transaction_ = {};
++tx_level_; });
}
} }
void set_timestamp(const std::string_view key); void set_timestamp(const std::string_view key);
@ -549,12 +498,11 @@ private:
*/ */
Xapian::WritableDatabase& wdb(); Xapian::WritableDatabase& wdb();
mutable std::mutex lock_;
std::string path_; std::string path_;
DbType db_; DbType db_;
size_t tx_level_{};
size_t batch_size_;
size_t changes_{}; size_t changes_{};
bool in_transaction_{};
size_t batch_size_;
}; };
constexpr std::string_view constexpr std::string_view