mu-server: try avoiding xapian multi-threaded access

Try to avoid multi-threaded operations with Xapian.

This remove the thread workers during indexing, and avoids the indexing
background thread. So, mu4e has to wait once again during indexing.

We can improve upon that, but first we need to know if it avoids the
problem of issue #2756.
This commit is contained in:
Dirk-Jan C. Binnema
2024-09-16 19:52:43 +03:00
parent 8176663002
commit d2343c6d62
4 changed files with 110 additions and 42 deletions

View File

@ -105,6 +105,7 @@ struct Indexer::Private {
bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype);
void maybe_start_worker();
void item_worker();
void scan_worker();
@ -135,6 +136,8 @@ struct Indexer::Private {
Type type;
};
void handle_item(WorkItem&& item);
AsyncQueue<WorkItem> todos_;
Progress progress_{};
@ -193,7 +196,11 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
return true;
}
case Scanner::HandleType::LeaveDir: {
#ifdef XAPIAN_SINGLE_THREADED
handle_item({fullpath, WorkItem::Type::Dir});
#else
todos_.push({fullpath, WorkItem::Type::Dir});
#endif /*XAPIAN_SINGLE_THREADED*/
return true;
}
@ -210,9 +217,13 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath))
return false;
#ifdef XAPIAN_SINGLE_THREADED
handle_item({fullpath, WorkItem::Type::File});
#else
// push the remaining messages to our "todo" queue for
// (re)parsing and adding/updating to the database.
todos_.push({fullpath, WorkItem::Type::File});
#endif
return true;
}
default:
@ -260,6 +271,30 @@ Indexer::Private::add_message(const std::string& path)
return true;
}
void
Indexer::Private::handle_item(WorkItem&& item)
{
try {
switch (item.type) {
case WorkItem::Type::File: {
if (G_LIKELY(add_message(item.full_path)))
++progress_.updated;
} break;
case WorkItem::Type::Dir:
store_.set_dirstamp(item.full_path, ::time(NULL));
break;
default:
g_warn_if_reached();
break;
}
} catch (const Mu::Error& er) {
mu_warning("error adding message @ {}: {}", item.full_path, er.what());
}
}
void
Indexer::Private::item_worker()
{
@ -270,22 +305,8 @@ Indexer::Private::item_worker()
while (state_ == IndexState::Scanning) {
if (!todos_.pop(item, 250ms))
continue;
try {
switch (item.type) {
case WorkItem::Type::File: {
if (G_LIKELY(add_message(item.full_path)))
++progress_.updated;
} break;
case WorkItem::Type::Dir:
store_.set_dirstamp(item.full_path, ::time(NULL));
break;
default:
g_warn_if_reached();
break;
}
} catch (const Mu::Error& er) {
mu_warning("error adding message @ {}: {}", item.full_path, er.what());
}
handle_item(std::move(item));
maybe_start_worker();
std::this_thread::yield();

View File

@ -149,6 +149,7 @@ struct Server::Private {
Store& store() { return store_; }
const Store& store() const { return store_; }
Indexer& indexer() { return store().indexer(); }
void do_index(const Indexer::Config& conf);
//CommandMap& command_map() const { return command_map_; }
//
@ -761,6 +762,20 @@ get_stats(const Indexer::Progress& stats, const std::string& state)
return sexp;
}
void
Server::Private::do_index(const Indexer::Config& conf)
{
StopWatch sw{"indexing"};
indexer().start(conf);
while (indexer().is_running()) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
output_sexp(get_stats(indexer().progress(), "running"),
Server::OutputFlags::Flush);
}
output_sexp(get_stats(indexer().progress(), "complete"),
Server::OutputFlags::Flush);
}
void
Server::Private::index_handler(const Command& cmd)
{
@ -770,22 +785,23 @@ Server::Private::index_handler(const Command& cmd)
// ignore .noupdate with an empty store.
conf.ignore_noupdate = store().empty();
#ifdef XAPIAN_SINGLE_THREADED
// nothing to do
if (indexer().is_running()) {
throw Error{Error::Code::Xapian, "indexer is already running"};
}
do_index(conf);
#else
indexer().stop();
if (index_thread_.joinable())
index_thread_.join();
// start a background track.
index_thread_ = std::thread([this, conf = std::move(conf)] {
StopWatch sw{"indexing"};
indexer().start(conf);
while (indexer().is_running()) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
output_sexp(get_stats(indexer().progress(), "running"),
Server::OutputFlags::Flush);
}
output_sexp(get_stats(indexer().progress(), "complete"),
Server::OutputFlags::Flush);
do_index(conf);
});
#endif /*XAPIAN_SINGLE_THREADED */
}
void
@ -959,6 +975,9 @@ Server::Private::ping_handler(const Command& cmd)
":personal-addresses", std::move(addrs),
":database-path", store().path(),
":root-maildir", store().root_maildir(),
#ifdef XAPIAN_SINGLE_THREADED
":xapian-single-threaded", Sexp::t_sym,
#endif /*XAPIAN_SINGLE_THREADED*/
":doccount", storecount)));
}

View File

@ -133,7 +133,13 @@ add_project_arguments(['-DHAVE_CONFIG_H'], language: 'cpp')
config_h_dep=declare_dependency(
include_directories: include_directories(['.']))
#
# single-threaded Xapian access?
#
if get_option('xapian-single-threaded')
config_h_data.set('XAPIAN_SINGLE_THREADED', true)
message('use Xapian only in a single thread')
endif
#
# d_type, d_ino are not available universally, so let's check
# (we use them for optimizations in mu-scanner
@ -322,6 +328,8 @@ if gmime_dep.version() == '3.2.13'
warning('See: https://github.com/jstedfast/gmime/issues/133')
endif
# Local Variables:
# indent-tabs-mode: nil
# End:

View File

@ -14,36 +14,56 @@
## along with this program; if not, write to the Free Software Foundation,
## Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
option('tests',
option('cld2',
type : 'feature',
value: 'auto',
description: 'build unit tests')
description: 'Add support for language-detection through cld2')
#
# emacs
#
option('emacs',
type: 'string',
value: 'emacs',
description: 'name/path of the emacs executable (for byte-compilation)')
option('lispdir',
type: 'string',
description: 'path under which to install emacs-lisp files')
#
# guile
#
option('guile',
type : 'feature',
value: 'auto',
description: 'build the guile scripting support (requires guile-3.x)')
option('cld2',
type : 'feature',
value: 'auto',
description: 'Compact Language Detector2')
# by default, this uses guile_dep.get_variable(pkgconfig: 'extensiondir')
option('guile-extension-dir',
type: 'string',
description: 'custom install path for the guile extension module')
#
# misc
#
option('tests',
type : 'feature',
value: 'auto',
description: 'build unit tests')
option('xapian-single-threaded',
type : 'boolean',
value: true,
description: 'only use Xapian from a single thread')
option('readline',
type: 'feature',
value: 'auto',
description: 'enable readline support for the mu4e repl')
option('emacs',
type: 'string',
value: 'emacs',
description: 'name/path of the emacs executable')
option('lispdir',
type: 'string',
description: 'path under which to install emacs-lisp files')