mu-indexer: clean up state management

This commit is contained in:
Dirk-Jan C. Binnema
2020-11-15 15:32:09 +02:00
parent 558add3843
commit 3f4e0cff62
2 changed files with 57 additions and 55 deletions

View File

@ -40,6 +40,29 @@ using namespace std::chrono_literals;
using namespace Mu; using namespace Mu;
struct IndexState {
enum State { Idle, Scanning, Cleaning };
static const char* name(State s) {
switch(s) {
case Idle: return "idle";
case Scanning: return "scanning";
case Cleaning: return "cleaning";
default: return "<error>";
}
}
bool operator==(State rhs) const { return state_ == rhs; }
void change_to(State new_state) {
g_debug ("changing indexer state %s->%s",
name((State)state_), name((State)new_state));
state_ = new_state;
}
private:
State state_{Idle};
};
struct Indexer::Private { struct Indexer::Private {
Private (Mu::Store& store): Private (Mu::Store& store):
store_{store}, store_{store},
@ -74,23 +97,14 @@ struct Indexer::Private {
const size_t max_message_size_; const size_t max_message_size_;
time_t dirstamp_{}; time_t dirstamp_{};
std::atomic<bool> scan_done_{true}, clean_done_{true};
std::size_t max_workers_; std::size_t max_workers_;
std::vector<std::thread> workers_; std::vector<std::thread> workers_;
std::thread scanner_worker_; std::thread scanner_worker_;
AsyncQueue<std::string> fq_; AsyncQueue<std::string> fq_;
struct Progress {
void reset() {
processed = updated = removed = 0;
}
std::atomic<size_t> processed{}; /**< Number of messages processed */
std::atomic<size_t> updated{}; /**< Number of messages added/updated to store */
std::atomic<size_t> removed{}; /**< Number of message removed from store */
};
Progress progress_; Progress progress_;
IndexState state_;
std::mutex lock_, wlock_; std::mutex lock_, wlock_;
}; };
@ -178,7 +192,7 @@ Indexer::Private::worker()
g_debug ("started worker"); g_debug ("started worker");
while (!scan_done_ || !fq_.empty()) { while (state_ == IndexState::Scanning || !fq_.empty()) {
if (!fq_.pop (item, 250ms)) if (!fq_.pop (item, 250ms))
continue; continue;
@ -204,28 +218,23 @@ Indexer::Private::cleanup()
{ {
g_debug ("starting cleanup"); g_debug ("starting cleanup");
std::vector<Store::Id> orphans_; // store messages without files. size_t n{};
std::vector<Store::Id> orphans; // store messages without files.
store_.for_each_message_path([&](Store::Id id, const std::string &path) { store_.for_each_message_path([&](Store::Id id, const std::string &path) {
if (clean_done_) ++n;
return false; if (::access(path.c_str(), R_OK) != 0) {
g_debug ("cannot read %s (id=%u); queueing for removal from store",
if (::access(path.c_str(), F_OK) != 0) {
g_debug ("%s not found; queueing id=%u for removal",
path.c_str(), id); path.c_str(), id);
orphans_.emplace_back(id); orphans.emplace_back(id);
} }
return !clean_done_; return state_ == IndexState::Cleaning;
}); });
if (orphans_.empty()) { g_debug("remove %zu message(s) from store", orphans.size());
g_debug("nothing to clean up");
return true;
}
store_.remove_messages (orphans_); store_.remove_messages (orphans);
g_debug ("removed %zu orphan messages from store", orphans_.size());
return true; return true;
} }
@ -242,40 +251,36 @@ Indexer::Private::start(const Indexer::Config& conf)
else else
max_workers_ = conf.max_threads; max_workers_ = conf.max_threads;
g_debug ("starting indexer with up to %zu worker threads", max_workers_); g_debug ("starting indexer with <= %zu worker thread(s)", max_workers_);
g_debug ("indexing: %s; clean-up: %s",
conf_.scan ? "yes" : "no",
conf_.cleanup ? "yes" : "no");
scan_done_ = false;
workers_.emplace_back(std::thread([this]{worker();})); workers_.emplace_back(std::thread([this]{worker();}));
scan_done_ = clean_done_ = false; state_.change_to(IndexState::Scanning);
scanner_worker_ = std::thread([this]{ scanner_worker_ = std::thread([this]{
progress_ = {};
progress_.reset();
if (conf_.scan) { if (conf_.scan) {
g_debug("starting scanner"); g_debug("starting scanner");
if (!scanner_.start()) { // blocks.
const auto started{scanner_.start()};
clean_done_ = scan_done_ = true; // so listeners will stop.
if (!started) {
g_warning ("failed to start scanner"); g_warning ("failed to start scanner");
return; goto leave;
} else }
g_debug ("scanner finished"); g_debug ("scanner finished");
} }
if (conf_.cleanup) { if (conf_.cleanup) {
g_debug ("starting cleanup"); g_debug ("starting cleanup");
state_.change_to(IndexState::Cleaning);
cleanup(); cleanup();
clean_done_ = true;
g_debug ("cleanup finished"); g_debug ("cleanup finished");
} else {
clean_done_ = true;
g_debug ("cleanup skipped");
} }
store_.commit(); store_.commit();
leave:
state_.change_to(IndexState::Idle);
}); });
g_debug ("started indexer"); g_debug ("started indexer");
@ -287,7 +292,7 @@ bool
Indexer::Private::stop() Indexer::Private::stop()
{ {
scanner_.stop(); scanner_.stop();
scan_done_ = clean_done_ = true; state_.change_to(IndexState::Idle);
const auto w_n = workers_.size(); const auto w_n = workers_.size();
@ -343,17 +348,14 @@ Indexer::stop()
bool bool
Indexer::is_running() const Indexer::is_running() const
{ {
return !priv_->scan_done_ || !priv_->clean_done_ || return !(priv_->state_ == IndexState::Idle) || !priv_->fq_.empty();
!priv_->fq_.empty();
} }
Indexer::Progress Indexer::Progress
Indexer::progress() const Indexer::progress() const
{ {
return Progress{ priv_->progress_.running =
is_running(), priv_->state_ == IndexState::Idle ? false : true;
priv_->progress_.processed,
priv_->progress_.updated, return priv_->progress_;
priv_->progress_.removed
};
} }