store: add 'add_document' optimization, use it
*Usually* we need Xapian's replace_document() API, but when we know a document (message) is completely new, we can use the faster add_document(). That is the case with the initial (re)indexing, when start with an empty database. Also a few smaller cleanups.
This commit is contained in:
@ -79,14 +79,18 @@ private:
|
|||||||
struct Indexer::Private {
|
struct Indexer::Private {
|
||||||
Private(Mu::Store& store)
|
Private(Mu::Store& store)
|
||||||
: store_{store}, scanner_{store_.root_maildir(),
|
: store_{store}, scanner_{store_.root_maildir(),
|
||||||
[this](auto&& path,
|
[this](auto&& path,
|
||||||
auto&& statbuf, auto&& info) {
|
auto&& statbuf, auto&& info) {
|
||||||
return handler(path, statbuf, info);
|
return handler(path, statbuf, info);
|
||||||
}},
|
}},
|
||||||
max_message_size_{store_.config().get<Mu::Config::Id::MaxMessageSize>()} {
|
max_message_size_{store_.config().get<Mu::Config::Id::MaxMessageSize>()},
|
||||||
mu_message("created indexer for {} -> {} (batch-size: {})",
|
was_empty_{store.empty()} {
|
||||||
store.root_maildir(), store.path(),
|
|
||||||
store.config().get<Mu::Config::Id::BatchSize>());
|
mu_message("created indexer for {} -> "
|
||||||
|
"{} (batch-size: {}; was-empty: {})",
|
||||||
|
store.root_maildir(), store.path(),
|
||||||
|
store.config().get<Mu::Config::Id::BatchSize>(),
|
||||||
|
was_empty_);
|
||||||
}
|
}
|
||||||
|
|
||||||
~Private() {
|
~Private() {
|
||||||
@ -127,11 +131,11 @@ struct Indexer::Private {
|
|||||||
|
|
||||||
AsyncQueue<WorkItem> todos_;
|
AsyncQueue<WorkItem> todos_;
|
||||||
|
|
||||||
Progress progress_;
|
Progress progress_{};
|
||||||
IndexState state_;
|
IndexState state_{};
|
||||||
std::mutex lock_, w_lock_;
|
std::mutex lock_, w_lock_;
|
||||||
|
std::atomic<time_t> completed_{};
|
||||||
std::atomic<time_t> completed_;
|
bool was_empty_{};
|
||||||
};
|
};
|
||||||
|
|
||||||
bool
|
bool
|
||||||
@ -240,7 +244,12 @@ Indexer::Private::add_message(const std::string& path)
|
|||||||
mu_warning("failed to create message from {}: {}", path, msg.error().what());
|
mu_warning("failed to create message from {}: {}", path, msg.error().what());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
auto res = store_.add_message(msg.value(), true /*use-transaction*/);
|
// if the store was empty, we know that the message is completely new
|
||||||
|
// and can use the fast path (Xapians 'add_document' rather tahn
|
||||||
|
// 'replace_document)
|
||||||
|
auto res = store_.add_message(msg.value(),
|
||||||
|
true /*use-transaction*/,
|
||||||
|
was_empty_);
|
||||||
if (!res) {
|
if (!res) {
|
||||||
mu_warning("failed to add message @ {}: {}", path, res.error().what());
|
mu_warning("failed to add message @ {}: {}", path, res.error().what());
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@ -133,6 +133,7 @@ struct Store::Private {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Option<Message> find_message_unlocked(Store::Id docid) const;
|
Option<Message> find_message_unlocked(Store::Id docid) const;
|
||||||
|
Result<Store::Id> add_message_unlocked(Message& msg);
|
||||||
Result<Store::Id> update_message_unlocked(Message& msg, Store::Id docid);
|
Result<Store::Id> update_message_unlocked(Message& msg, Store::Id docid);
|
||||||
Result<Store::Id> update_message_unlocked(Message& msg, const std::string& old_path);
|
Result<Store::Id> update_message_unlocked(Message& msg, const std::string& old_path);
|
||||||
Result<Message> move_message_unlocked(Message&& msg,
|
Result<Message> move_message_unlocked(Message&& msg,
|
||||||
@ -150,10 +151,22 @@ struct Store::Private {
|
|||||||
std::mutex lock_;
|
std::mutex lock_;
|
||||||
};
|
};
|
||||||
|
|
||||||
Result<Store::Id>Store::Private::update_message_unlocked(Message& msg, Store::Id docid)
|
|
||||||
|
Result<Store::Id>
|
||||||
|
Store::Private::add_message_unlocked(Message& msg)
|
||||||
|
{
|
||||||
|
auto docid{xapian_db_.add_document(msg.document().xapian_document())};
|
||||||
|
mu_debug("added message @ {}; docid = {}", msg.path(), docid);
|
||||||
|
|
||||||
|
return Ok(std::move(docid));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Result<Store::Id>
|
||||||
|
Store::Private::update_message_unlocked(Message& msg, Store::Id docid)
|
||||||
{
|
{
|
||||||
xapian_db_.replace_document(docid, msg.document().xapian_document());
|
xapian_db_.replace_document(docid, msg.document().xapian_document());
|
||||||
g_debug("updated message @ %s; docid = %u", msg.path().c_str(), docid);
|
mu_debug("updated message @ {}; docid = {}", msg.path(), docid);
|
||||||
|
|
||||||
return Ok(std::move(docid));
|
return Ok(std::move(docid));
|
||||||
}
|
}
|
||||||
@ -288,26 +301,15 @@ Store::indexer()
|
|||||||
}
|
}
|
||||||
|
|
||||||
Result<Store::Id>
|
Result<Store::Id>
|
||||||
Store::add_message(const std::string& path, bool use_transaction)
|
Store::add_message(Message& msg, bool use_transaction, bool is_new)
|
||||||
{
|
{
|
||||||
if (auto msg{Message::make_from_path(path)}; !msg)
|
|
||||||
return Err(msg.error());
|
|
||||||
else
|
|
||||||
return add_message(msg.value(), use_transaction);
|
|
||||||
}
|
|
||||||
|
|
||||||
Result<Store::Id>
|
|
||||||
Store::add_message(Message& msg, bool use_transaction)
|
|
||||||
{
|
|
||||||
std::lock_guard guard{priv_->lock_};
|
|
||||||
|
|
||||||
const auto mdir{maildir_from_path(msg.path(),
|
const auto mdir{maildir_from_path(msg.path(),
|
||||||
root_maildir())};
|
root_maildir())};
|
||||||
if (!mdir)
|
if (!mdir)
|
||||||
return Err(mdir.error());
|
return Err(mdir.error());
|
||||||
|
|
||||||
if (auto&& res = msg.set_maildir(mdir.value()); !res)
|
if (auto&& res = msg.set_maildir(mdir.value()); !res)
|
||||||
return Err(res.error());
|
return Err(res.error());
|
||||||
|
|
||||||
/* add contacts from this message to cache; this cache
|
/* add contacts from this message to cache; this cache
|
||||||
* also determines whether those contacts are _personal_, i.e. match
|
* also determines whether those contacts are _personal_, i.e. match
|
||||||
* our personal addresses.
|
* our personal addresses.
|
||||||
@ -320,37 +322,34 @@ Store::add_message(Message& msg, bool use_transaction)
|
|||||||
if (is_personal)
|
if (is_personal)
|
||||||
msg.set_flags(msg.flags() | Flags::Personal);
|
msg.set_flags(msg.flags() | Flags::Personal);
|
||||||
|
|
||||||
|
std::lock_guard guard{priv_->lock_};
|
||||||
if (use_transaction)
|
if (use_transaction)
|
||||||
priv_->transaction_inc();
|
priv_->transaction_inc();
|
||||||
|
|
||||||
auto res = priv_->update_message_unlocked(msg, msg.path());
|
auto&& res = is_new ?
|
||||||
|
priv_->add_message_unlocked(msg) :
|
||||||
|
priv_->update_message_unlocked(msg, msg.path());
|
||||||
if (!res)
|
if (!res)
|
||||||
return Err(res.error());
|
return Err(res.error());
|
||||||
|
|
||||||
if (use_transaction) /* commit if batch is full */
|
if (use_transaction) /* commit if batch is full */
|
||||||
priv_->transaction_maybe_commit();
|
priv_->transaction_maybe_commit();
|
||||||
|
|
||||||
g_debug("added %smessage @ %s; docid = %u",
|
mu_debug("added {}message @ {}; docid = {}",
|
||||||
is_personal ? "personal " : "", msg.path().c_str(), *res);
|
is_personal ? "personal " : "", msg.path(), *res);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
Result<Store::Id>
|
|
||||||
Store::update_message(Message& msg, Store::Id docid)
|
|
||||||
{
|
|
||||||
std::lock_guard guard{priv_->lock_};
|
|
||||||
|
|
||||||
return priv_->update_message_unlocked(msg, docid);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
Store::remove_message(const std::string& path)
|
Store::remove_message(const std::string& path)
|
||||||
{
|
{
|
||||||
std::lock_guard guard{priv_->lock_};
|
|
||||||
const auto term{field_from_id(Field::Id::Path).xapian_term(path)};
|
const auto term{field_from_id(Field::Id::Path).xapian_term(path)};
|
||||||
|
|
||||||
|
std::lock_guard guard{priv_->lock_};
|
||||||
|
|
||||||
xapian_db().delete_document(term);
|
xapian_db().delete_document(term);
|
||||||
g_debug("deleted message @ %s from store", path.c_str());
|
mu_debug("deleted message @ {} from store", path);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -433,14 +432,11 @@ messages_with_msgid(const Store& store, const std::string& msgid, size_t max=100
|
|||||||
} else if (msgid.empty())
|
} else if (msgid.empty())
|
||||||
return {};
|
return {};
|
||||||
|
|
||||||
const auto xprefix{field_from_id(Field::Id::MessageId).shortcut};
|
constexpr auto xprefix{field_from_id(Field::Id::MessageId).shortcut};
|
||||||
/*XXX this is a bit dodgy */
|
auto expr{mu_format("{}:{}", xprefix,
|
||||||
auto tmp{g_ascii_strdown(msgid.c_str(), -1)};
|
to_string_gchar(g_ascii_strdown(msgid.c_str(), -1)))};
|
||||||
auto expr{g_strdup_printf("%c:%s", xprefix, tmp)};
|
|
||||||
g_free(tmp);
|
|
||||||
|
|
||||||
const auto res{store.run_query(expr, {}, QueryFlags::None, max)};
|
const auto res{store.run_query(expr, {}, QueryFlags::None, max)};
|
||||||
g_free(expr);
|
|
||||||
if (!res) {
|
if (!res) {
|
||||||
mu_warning("failed to run message-id-query: {}", res.error().what());
|
mu_warning("failed to run message-id-query: {}", res.error().what());
|
||||||
return {};
|
return {};
|
||||||
|
|||||||
@ -186,30 +186,32 @@ public:
|
|||||||
std::string parse_query(const std::string& expr, bool xapian) const;
|
std::string parse_query(const std::string& expr, bool xapian) const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a message to the store. When planning to write many messages,
|
* Add or update a message to the store. When planning to write many
|
||||||
* it's much faster to do so in a transaction. If so, set
|
* messages, it's much faster to do so in a transaction. If so, set
|
||||||
* @in_transaction to true. When done with adding messages, call
|
* @in_transaction to true. When done with adding messages, call
|
||||||
* commit().
|
* commit().
|
||||||
*
|
*
|
||||||
* @param path the message path.
|
* Optimization: If you are sure the message (i.e., a message with the
|
||||||
* @param whether to bundle up to batch_size changes in a transaction
|
* given file-system path) does not yet exist in the database, ie., when
|
||||||
*
|
* doing the initial indexing, set @p is_new to true since we then don't
|
||||||
* @return the doc id of the added message or an error.
|
* have to check for the existing message.
|
||||||
*/
|
|
||||||
Result<Id> add_message(const std::string& path, bool use_transaction = false);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a message to the store. When planning to write many messages,
|
|
||||||
* it's much faster to do so in a transaction. If so, set
|
|
||||||
* @in_transaction to true. When done with adding messages, call
|
|
||||||
* commit().
|
|
||||||
*
|
*
|
||||||
* @param msg a message
|
* @param msg a message
|
||||||
* @param whether to bundle up to batch_size changes in a transaction
|
* @param use_transaction whether to bundle up to batch_size
|
||||||
|
* changes in a transaction
|
||||||
|
* @param is_new whether this is a completely new message
|
||||||
*
|
*
|
||||||
* @return the doc id of the added message or an error.
|
* @return the doc id of the added message or an error.
|
||||||
*/
|
*/
|
||||||
Result<Id> add_message(Message& msg, bool use_transaction = false);
|
Result<Id> add_message(Message& msg, bool use_transaction = false,
|
||||||
|
bool is_new = false);
|
||||||
|
Result<Id> add_message(const std::string& path, bool use_transaction = false,
|
||||||
|
bool is_new = false) {
|
||||||
|
if (auto msg{Message::make_from_path(path)}; !msg)
|
||||||
|
return Err(msg.error());
|
||||||
|
else
|
||||||
|
return add_message(msg.value(), use_transaction, is_new);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update a message in the store.
|
* Update a message in the store.
|
||||||
@ -219,7 +221,6 @@ public:
|
|||||||
*
|
*
|
||||||
* @return Ok() or an error.
|
* @return Ok() or an error.
|
||||||
*/
|
*/
|
||||||
Result<Store::Id> update_message(Message& msg, Id id);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a message from the store. It will _not_ remove the message
|
* Remove a message from the store. It will _not_ remove the message
|
||||||
@ -414,7 +415,7 @@ public:
|
|||||||
*
|
*
|
||||||
* @return true or false
|
* @return true or false
|
||||||
*/
|
*/
|
||||||
size_t empty() const { return xapian_db().empty(); }
|
bool empty() const { return xapian_db().empty(); }
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* _almost_ private
|
* _almost_ private
|
||||||
|
|||||||
@ -281,6 +281,22 @@ public:
|
|||||||
DB_LOCKED; return db().term_exists(term);}, false);
|
DB_LOCKED; return db().term_exists(term);}, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a new document to the database
|
||||||
|
*
|
||||||
|
* @param doc a document (message)
|
||||||
|
*
|
||||||
|
* @return new docid or 0
|
||||||
|
*/
|
||||||
|
Xapian::docid add_document(const Xapian::Document& doc) {
|
||||||
|
return xapian_try([&]{
|
||||||
|
DB_LOCKED;
|
||||||
|
auto&& id= wdb().add_document(doc);
|
||||||
|
set_timestamp(MetadataIface::last_change_key);
|
||||||
|
return id;
|
||||||
|
}, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Replace document in database
|
* Replace document in database
|
||||||
*
|
*
|
||||||
@ -288,7 +304,7 @@ public:
|
|||||||
* @param id docid
|
* @param id docid
|
||||||
* @param doc replacement document
|
* @param doc replacement document
|
||||||
*
|
*
|
||||||
* @return new docid or nothing.
|
* @return new docid or 0
|
||||||
*/
|
*/
|
||||||
Xapian::docid replace_document(const std::string& term, const Xapian::Document& doc) {
|
Xapian::docid replace_document(const std::string& term, const Xapian::Document& doc) {
|
||||||
return xapian_try([&]{
|
return xapian_try([&]{
|
||||||
|
|||||||
@ -202,7 +202,7 @@ goto * instructions[pOp->opcode];
|
|||||||
g_assert_cmpuint(store->size(),==, 1);
|
g_assert_cmpuint(store->size(),==, 1);
|
||||||
|
|
||||||
/* ensure 'update' dtrt, i.e., nothing. */
|
/* ensure 'update' dtrt, i.e., nothing. */
|
||||||
const auto docid2 = store->update_message(*message, *docid);
|
const auto docid2 = store->add_message(*message, *docid);
|
||||||
assert_valid_result(docid2);
|
assert_valid_result(docid2);
|
||||||
g_assert_cmpuint(store->size(),==, 1);
|
g_assert_cmpuint(store->size(),==, 1);
|
||||||
g_assert_cmpuint(*docid,==,*docid2);
|
g_assert_cmpuint(*docid,==,*docid2);
|
||||||
|
|||||||
Reference in New Issue
Block a user