remove non-single-threaded option
Single-threaded is the build-default, and seems to work well enough for 1.12.7, so remove the option to turn it off. This is because build-options that influence such low-level/core behavior are a pain to maintain.
This commit is contained in:
@ -106,7 +106,6 @@ struct Indexer::Private {
|
||||
|
||||
void maybe_start_worker();
|
||||
|
||||
void item_worker();
|
||||
void scan_worker();
|
||||
|
||||
bool add_message(const std::string& path);
|
||||
@ -123,8 +122,6 @@ struct Indexer::Private {
|
||||
const size_t max_message_size_;
|
||||
|
||||
::time_t dirstamp_{};
|
||||
std::size_t max_workers_;
|
||||
std::vector<std::thread> workers_;
|
||||
std::thread scanner_worker_;
|
||||
|
||||
struct WorkItem {
|
||||
@ -138,8 +135,6 @@ struct Indexer::Private {
|
||||
|
||||
void handle_item(WorkItem&& item);
|
||||
|
||||
AsyncQueue<WorkItem> todos_;
|
||||
|
||||
Progress progress_{};
|
||||
IndexState state_{};
|
||||
std::mutex lock_, w_lock_;
|
||||
@ -198,11 +193,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
||||
return true;
|
||||
}
|
||||
case Scanner::HandleType::LeaveDir: {
|
||||
#ifdef XAPIAN_SINGLE_THREADED
|
||||
handle_item({fullpath, WorkItem::Type::Dir});
|
||||
#else
|
||||
todos_.push({fullpath, WorkItem::Type::Dir});
|
||||
#endif /*XAPIAN_SINGLE_THREADED*/
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -223,13 +214,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
||||
if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath))
|
||||
return false;
|
||||
|
||||
#ifdef XAPIAN_SINGLE_THREADED
|
||||
handle_item({fullpath, WorkItem::Type::File});
|
||||
#else
|
||||
// push the remaining messages to our "todo" queue for
|
||||
// (re)parsing and adding/updating to the database.
|
||||
todos_.push({fullpath, WorkItem::Type::File});
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
default:
|
||||
@ -238,17 +223,6 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Indexer::Private::maybe_start_worker()
|
||||
{
|
||||
std::lock_guard lock{w_lock_};
|
||||
|
||||
if (todos_.size() > workers_.size() && workers_.size() < max_workers_) {
|
||||
workers_.emplace_back(std::thread([this] { item_worker(); }));
|
||||
mu_debug("added worker {}", workers_.size());
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
Indexer::Private::add_message(const std::string& path)
|
||||
{
|
||||
@ -299,26 +273,6 @@ Indexer::Private::handle_item(WorkItem&& item)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
void
|
||||
Indexer::Private::item_worker()
|
||||
{
|
||||
WorkItem item;
|
||||
|
||||
mu_debug("started worker");
|
||||
|
||||
while (state_ == IndexState::Scanning) {
|
||||
if (!todos_.pop(item, 250ms))
|
||||
continue;
|
||||
|
||||
handle_item(std::move(item));
|
||||
|
||||
maybe_start_worker();
|
||||
std::this_thread::yield();
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
Indexer::Private::cleanup()
|
||||
{
|
||||
@ -359,28 +313,11 @@ Indexer::Private::scan_worker()
|
||||
state_.change_to(IndexState::Idle);
|
||||
return;
|
||||
}
|
||||
mu_debug("scanner finished with {} file(s) in queue", todos_.size());
|
||||
mu_debug("scanner finished");
|
||||
}
|
||||
|
||||
// now there may still be messages in the work queue...
|
||||
// finish those; this is a bit ugly; perhaps we should
|
||||
// handle SIGTERM etc.
|
||||
|
||||
if (!todos_.empty()) {
|
||||
const auto workers_size = std::invoke([this] {
|
||||
std::lock_guard lock{w_lock_};
|
||||
return workers_.size();
|
||||
});
|
||||
mu_debug("process {} remaining message(s) with {} worker(s)",
|
||||
todos_.size(), workers_size);
|
||||
while (!todos_.empty())
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
// and let the worker finish their work.
|
||||
state_.change_to(IndexState::Finishing);
|
||||
for (auto&& w : workers_)
|
||||
if (w.joinable())
|
||||
w.join();
|
||||
|
||||
if (conf_.cleanup) {
|
||||
mu_debug("starting cleanup");
|
||||
@ -402,21 +339,13 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
|
||||
stop();
|
||||
|
||||
conf_ = conf;
|
||||
if (conf_.max_threads == 0) {
|
||||
/* benchmarking suggests that ~4 threads is the fastest (the
|
||||
* real bottleneck is the database, so adding more threads just
|
||||
* slows things down)
|
||||
*/
|
||||
max_workers_ = std::min(4U, std::thread::hardware_concurrency());
|
||||
} else
|
||||
max_workers_ = conf.max_threads;
|
||||
|
||||
if (store_.empty() && conf_.lazy_check) {
|
||||
mu_debug("turn off lazy check since we have an empty store");
|
||||
conf_.lazy_check = false;
|
||||
}
|
||||
|
||||
mu_debug("starting indexer with <= {} worker thread(s)", max_workers_);
|
||||
mu_debug("starting indexer");
|
||||
mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no",
|
||||
conf_.cleanup ? "yes" : "no");
|
||||
|
||||
@ -425,8 +354,6 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
|
||||
last_index_ = store_.config().get<Mu::Config::Id::LastIndex>();
|
||||
|
||||
state_.change_to(IndexState::Scanning);
|
||||
/* kick off the first worker, which will spawn more if needed. */
|
||||
workers_.emplace_back(std::thread([this] { item_worker(); }));
|
||||
/* kick the disk-scanner thread */
|
||||
scanner_worker_ = std::thread([this] { scan_worker(); });
|
||||
|
||||
@ -446,15 +373,10 @@ Indexer::Private::stop()
|
||||
{
|
||||
scanner_.stop();
|
||||
|
||||
todos_.clear();
|
||||
if (scanner_worker_.joinable())
|
||||
scanner_worker_.join();
|
||||
|
||||
state_.change_to(IndexState::Idle);
|
||||
for (auto&& w : workers_)
|
||||
if (w.joinable())
|
||||
w.join();
|
||||
workers_.clear();
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -562,7 +484,6 @@ test_index_basic()
|
||||
}
|
||||
|
||||
conf.lazy_check = true;
|
||||
conf.max_threads = 1;
|
||||
conf.ignore_noupdate = false;
|
||||
|
||||
{
|
||||
|
||||
@ -49,8 +49,6 @@ public:
|
||||
/**< scan for new messages */
|
||||
bool cleanup{true};
|
||||
/**< clean messages no longer in the file system */
|
||||
size_t max_threads{};
|
||||
/**< maximum # of threads to use */
|
||||
bool ignore_noupdate{};
|
||||
/**< ignore .noupdate files */
|
||||
bool lazy_check{};
|
||||
|
||||
@ -785,23 +785,11 @@ Server::Private::index_handler(const Command& cmd)
|
||||
// ignore .noupdate with an empty store.
|
||||
conf.ignore_noupdate = store().empty();
|
||||
|
||||
#ifdef XAPIAN_SINGLE_THREADED
|
||||
// nothing to do
|
||||
if (indexer().is_running()) {
|
||||
throw Error{Error::Code::Xapian, "indexer is already running"};
|
||||
}
|
||||
do_index(conf);
|
||||
#else
|
||||
indexer().stop();
|
||||
if (index_thread_.joinable())
|
||||
index_thread_.join();
|
||||
|
||||
// start a background track.
|
||||
index_thread_ = std::thread([this, conf = std::move(conf)] {
|
||||
do_index(conf);
|
||||
});
|
||||
#endif /*XAPIAN_SINGLE_THREADED */
|
||||
|
||||
}
|
||||
|
||||
void
|
||||
@ -975,9 +963,6 @@ Server::Private::ping_handler(const Command& cmd)
|
||||
":personal-addresses", std::move(addrs),
|
||||
":database-path", store().path(),
|
||||
":root-maildir", store().root_maildir(),
|
||||
#ifdef XAPIAN_SINGLE_THREADED
|
||||
":xapian-single-threaded", Sexp::t_sym,
|
||||
#endif /*XAPIAN_SINGLE_THREADED*/
|
||||
":doccount", storecount)));
|
||||
}
|
||||
|
||||
|
||||
@ -467,7 +467,6 @@ benchmark_indexer(gconstpointer testdata)
|
||||
auto store{Store::make_new(BENCH_STORE, BENCH_MAILDIRS)};
|
||||
g_assert_true(!!store);
|
||||
Indexer::Config conf{};
|
||||
conf.max_threads = tdata->num_threads;
|
||||
|
||||
auto res = store->indexer().start(conf);
|
||||
g_assert_true(res);
|
||||
|
||||
Reference in New Issue
Block a user