lib: move transaction handling to mu-xapian
Instead of handling transactions in the store, handle it in xapian-db. Make the code a bit more natural / cleaner-out Handle transaction automatically (with a batch-size) and add some RAII Transaction object, which makes all database interaction transactable for the duration. So, no more need for explicit parameters to add_message while indexing.
This commit is contained in:
@ -185,8 +185,27 @@ public:
|
||||
CreateOverwrite, /**< Create new or overwrite existing */
|
||||
};
|
||||
|
||||
/**
|
||||
* XapianDb CTOR
|
||||
*
|
||||
* @param db_path path to the database
|
||||
* @param flavor kind of database
|
||||
*/
|
||||
XapianDb(const std::string& db_path, Flavor flavor);
|
||||
|
||||
/**
|
||||
* DTOR
|
||||
*/
|
||||
~XapianDb() {
|
||||
if (tx_level_ > 0)
|
||||
mu_warning("inconsistent transaction level ({})", tx_level_);
|
||||
if (tx_level_ > 0) {
|
||||
mu_debug("closing db after committing {} change(s)", changes_);
|
||||
xapian_try([this]{ DB_LOCKED; wdb().commit_transaction(); });
|
||||
} else
|
||||
mu_debug("closing db");
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the database read-only?
|
||||
*
|
||||
@ -195,7 +214,7 @@ public:
|
||||
bool read_only() const override;
|
||||
|
||||
/**
|
||||
* Path to the database; empty for in-memory database
|
||||
* Path to the database; empty for in-memory databases
|
||||
*
|
||||
* @return path to database
|
||||
*/
|
||||
@ -210,7 +229,6 @@ public:
|
||||
return db().get_description();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the number of documents (messages) in the database
|
||||
*
|
||||
@ -268,7 +286,8 @@ public:
|
||||
* @param val new value for key
|
||||
*/
|
||||
void set_metadata(const std::string& key, const std::string& val) override {
|
||||
xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val); });
|
||||
xapian_try([&] { DB_LOCKED; wdb().set_metadata(key, val);
|
||||
maybe_commit(); });
|
||||
}
|
||||
|
||||
/**
|
||||
@ -287,7 +306,6 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Does the given term exist in the database?
|
||||
*
|
||||
@ -312,6 +330,7 @@ public:
|
||||
DB_LOCKED;
|
||||
auto&& id{wdb().add_document(doc)};
|
||||
set_timestamp(MetadataIface::last_change_key);
|
||||
maybe_commit();
|
||||
return Ok(std::move(id));
|
||||
});
|
||||
}
|
||||
@ -331,6 +350,7 @@ public:
|
||||
DB_LOCKED;
|
||||
auto&& id{wdb().replace_document(term, doc)};
|
||||
set_timestamp(MetadataIface::last_change_key);
|
||||
maybe_commit();
|
||||
return Ok(std::move(id));
|
||||
});
|
||||
}
|
||||
@ -340,6 +360,7 @@ public:
|
||||
DB_LOCKED;
|
||||
wdb().replace_document(id, doc);
|
||||
set_timestamp(MetadataIface::last_change_key);
|
||||
maybe_commit();
|
||||
return Ok(std::move(id));
|
||||
});
|
||||
}
|
||||
@ -356,6 +377,7 @@ public:
|
||||
DB_LOCKED;
|
||||
wdb().delete_document(term);
|
||||
set_timestamp(MetadataIface::last_change_key);
|
||||
maybe_commit();
|
||||
return Ok();
|
||||
});
|
||||
}
|
||||
@ -364,6 +386,7 @@ public:
|
||||
DB_LOCKED;
|
||||
wdb().delete_document(id);
|
||||
set_timestamp(MetadataIface::last_change_key);
|
||||
maybe_commit();
|
||||
return Ok();
|
||||
});
|
||||
}
|
||||
@ -381,38 +404,106 @@ public:
|
||||
}
|
||||
|
||||
/*
|
||||
* transactions
|
||||
* If the "transaction ref count" > 0 (with inc_transactions());, we run
|
||||
* in "transaction mode". That means that the subsequent Xapian mutation
|
||||
* are part of a transactions, which is flushed when the number of
|
||||
* changes reaches the batch size, _or_ the transaction ref count is
|
||||
* decreased to 0 (dec_transactions()). *
|
||||
*/
|
||||
|
||||
/**
|
||||
* Start a transaction
|
||||
*
|
||||
* @param flushed
|
||||
*
|
||||
* @return Ok or Error
|
||||
* Increase the transaction level; needs to be balance by dec_transactions()
|
||||
*/
|
||||
Result<void> begin_transaction(bool flushed=true) {
|
||||
return xapian_try_result([&]{
|
||||
void inc_transaction_level() {
|
||||
xapian_try([this]{
|
||||
DB_LOCKED;
|
||||
wdb().begin_transaction(flushed);
|
||||
return Ok();
|
||||
if (tx_level_ == 0) {// need to start the Xapian transaction?
|
||||
mu_debug("begin transaction");
|
||||
wdb().begin_transaction();
|
||||
}
|
||||
++tx_level_;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit a transaction
|
||||
* Decrease the transaction level (to balance inc_transactions())
|
||||
*
|
||||
* @return Ok or Error*/
|
||||
Result<void> commit_transaction() {
|
||||
return xapian_try_result([&]{
|
||||
* If the level reach 0, perform a Xapian commit.
|
||||
*/
|
||||
void dec_transaction_level() {
|
||||
xapian_try([this]{
|
||||
DB_LOCKED;
|
||||
wdb().commit_transaction();
|
||||
return Ok();
|
||||
if (tx_level_ == 0) {
|
||||
mu_critical("cannot dec transaction-level)");
|
||||
throw std::runtime_error("cannot dec transactions");
|
||||
}
|
||||
if (tx_level_ == 1) {// need to commit the Xapian transaction?
|
||||
mu_debug("committing {} changes", changes_);
|
||||
wdb().commit_transaction();
|
||||
changes_ = 0;
|
||||
}
|
||||
--tx_level_;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Are we inside a transaction?
|
||||
*
|
||||
* @return true or false
|
||||
*/
|
||||
bool in_transaction() const { DB_LOCKED; return tx_level_ > 0; }
|
||||
|
||||
|
||||
/**
|
||||
* RAII Transaction object
|
||||
*
|
||||
*/
|
||||
struct Transaction {
|
||||
Transaction(XapianDb& db): db_{db} {
|
||||
db_.inc_transaction_level();
|
||||
}
|
||||
~Transaction() {
|
||||
db_.dec_transaction_level();
|
||||
}
|
||||
private:
|
||||
XapianDb& db_;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Manually request the Xapian DB to be committed to disk. This won't
|
||||
* do anything while in a transaction.
|
||||
*/
|
||||
void commit() {
|
||||
xapian_try([this]{
|
||||
DB_LOCKED;
|
||||
if (tx_level_ == 0) {
|
||||
mu_info("committing xapian-db @ {}", path_);
|
||||
wdb().commit();
|
||||
} else
|
||||
mu_debug("not committing while in transaction");
|
||||
});
|
||||
}
|
||||
|
||||
using DbType = std::variant<Xapian::Database, Xapian::WritableDatabase>;
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* To be called after all changes, with DB_LOCKED held.
|
||||
*/
|
||||
void maybe_commit() {
|
||||
// in transaction-mode and enough changes, commit them
|
||||
// and start a new transaction
|
||||
if (tx_level_ > 0 && ++changes_ >= batch_size_) {
|
||||
mu_debug("batch full ({}/{}); committing change", changes_, batch_size_);
|
||||
wdb().commit_transaction();
|
||||
wdb().commit();
|
||||
--tx_level_;
|
||||
changes_ = 0;
|
||||
wdb().begin_transaction();
|
||||
}
|
||||
}
|
||||
|
||||
void set_timestamp(const std::string_view key);
|
||||
|
||||
/**
|
||||
@ -432,7 +523,12 @@ private:
|
||||
mutable std::mutex lock_;
|
||||
std::string path_;
|
||||
|
||||
DbType db_;
|
||||
DbType db_;
|
||||
bool in_tx_;
|
||||
|
||||
size_t tx_level_{};
|
||||
const size_t batch_size_;
|
||||
size_t changes_{};
|
||||
};
|
||||
|
||||
constexpr std::string_view
|
||||
|
||||
Reference in New Issue
Block a user