Merge branch 'wip/indexer'

This commit is contained in:
Dirk-Jan C. Binnema
2020-06-27 17:19:12 +03:00
29 changed files with 1744 additions and 2042 deletions

View File

@ -5,18 +5,24 @@
*** mu *** mu
- Where available (and with suitable glib), log to the systemd journal - Where available (and with suitable ~libglib~), log to the systemd journal
instead of a ~~/.cache/mu.log~ instead of a ~~/.cache/mu.log~. The ~--debug~ option to ~mu~ increases the
amount that is logged.
- Follow symlinks in maildirs, and support moving messages between across - Follow symlinks in maildirs, and support moving messages across
multiple filesystems (but note that that is quite a bit slower than the filesystems (but note that that is quite a bit slower than the
single-filesystem case) single-filesystem case)
- Optionally provide readline support for the mu server (when in tty-mode) - Optionally provide readline support for the ~mu~ server (when in tty-mode)
- Reworked the way mu generates s-expressions for mu4e; they are created - Reworked the way mu generates s-expressions for mu4e; they are created
programmatically now instead of through string building. programmatically now instead of through string building.
- The indexer (the part of mu that scans maildirs and updates the message
store) has been rewritten so it can work asynchronously and take advantage
of multiple cores. Note that for now, indexing in ~mu4e~ is still a blocking
operation.
*** mu4e *** mu4e
- Include maildir-shortcuts in the main-view with overall/unread counts, - Include maildir-shortcuts in the main-view with overall/unread counts,
@ -36,8 +42,6 @@
- Add a variable ~mu4e-mu-debug~ which, when set to non-~nil~ makes the ~mu~ - Add a variable ~mu4e-mu-debug~ which, when set to non-~nil~ makes the ~mu~
server log more verbosely (to ~mu.log~ or the journal) server log more verbosely (to ~mu.log~ or the journal)
* 1.4 (released, as of April 18 2020) * 1.4 (released, as of April 18 2020)
*** mu *** mu

View File

@ -15,7 +15,7 @@
## Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ## Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
AC_PREREQ([2.68]) AC_PREREQ([2.68])
AC_INIT([mu],[1.5.3],[https://github.com/djcb/mu/issues],[mu]) AC_INIT([mu],[1.5.4],[https://github.com/djcb/mu/issues],[mu])
AC_COPYRIGHT([Copyright (C) 2008-2020 Dirk-Jan C. Binnema]) AC_COPYRIGHT([Copyright (C) 2008-2020 Dirk-Jan C. Binnema])
AC_CONFIG_HEADERS([config.h]) AC_CONFIG_HEADERS([config.h])
AC_CONFIG_SRCDIR([mu/mu.cc]) AC_CONFIG_SRCDIR([mu/mu.cc])
@ -264,6 +264,7 @@ lib/Makefile
lib/doxyfile lib/doxyfile
lib/utils/Makefile lib/utils/Makefile
lib/query/Makefile lib/query/Makefile
lib/index/Makefile
mu4e/Makefile mu4e/Makefile
mu4e/mu4e-meta.el mu4e/mu4e-meta.el
guile/Makefile guile/Makefile

View File

@ -18,7 +18,7 @@
# before descending into tests/ # before descending into tests/
include $(top_srcdir)/gtest.mk include $(top_srcdir)/gtest.mk
SUBDIRS= utils query SUBDIRS= utils query index
if HAVE_JSON_GLIB if HAVE_JSON_GLIB
json_srcs= \ json_srcs= \
@ -27,6 +27,15 @@ json_srcs= \
json_flag="-DHAVE_JSON_GLIB" json_flag="-DHAVE_JSON_GLIB"
endif endif
TESTDEFS= \
-DMU_TESTMAILDIR=\"${abs_srcdir}/testdir\" \
-DMU_TESTMAILDIR2=\"${abs_srcdir}/testdir2\" \
-DMU_TESTMAILDIR3=\"${abs_srcdir}/testdir3\" \
-DMU_TESTMAILDIR4=\"${abs_srcdir}/testdir4\" \
-DABS_CURDIR=\"${abs_builddir}\" \
-DABS_SRCDIR=\"${abs_srcdir}\"
AM_CFLAGS= \ AM_CFLAGS= \
$(WARN_CFLAGS) \ $(WARN_CFLAGS) \
$(GMIME_CFLAGS) \ $(GMIME_CFLAGS) \
@ -36,12 +45,7 @@ AM_CFLAGS= \
$(ASAN_CFLAGS) \ $(ASAN_CFLAGS) \
$(json_flag) \ $(json_flag) \
$(CODE_COVERAGE_CFLAGS) \ $(CODE_COVERAGE_CFLAGS) \
-DMU_TESTMAILDIR=\"${abs_srcdir}/testdir\" \ $(TESTDEFS) \
-DMU_TESTMAILDIR2=\"${abs_srcdir}/testdir2\" \
-DMU_TESTMAILDIR3=\"${abs_srcdir}/testdir3\" \
-DMU_TESTMAILDIR4=\"${abs_srcdir}/testdir4\" \
-DABS_CURDIR=\"${abs_builddir}\" \
-DABS_SRCDIR=\"${abs_srcdir}\" \
-Wno-format-nonliteral \ -Wno-format-nonliteral \
-Wno-switch-enum \ -Wno-switch-enum \
-Wno-deprecated-declarations \ -Wno-deprecated-declarations \
@ -57,7 +61,7 @@ AM_CXXFLAGS= \
$(XAPIAN_CXXFLAGS) \ $(XAPIAN_CXXFLAGS) \
$(ASAN_CXXFLAGS) \ $(ASAN_CXXFLAGS) \
$(CODE_COVERAGE_CFLAGS) \ $(CODE_COVERAGE_CFLAGS) \
-DMU_TESTMAILDIR=\"${abs_srcdir}/testdir\" $(TESTDEFS)
AM_CPPFLAGS= \ AM_CPPFLAGS= \
$(CODE_COVERAGE_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS)
@ -81,8 +85,6 @@ libmu_la_SOURCES= \
mu-container.h \ mu-container.h \
mu-flags.h \ mu-flags.h \
mu-flags.c \ mu-flags.c \
mu-index.c \
mu-index.h \
mu-maildir.c \ mu-maildir.c \
mu-maildir.h \ mu-maildir.h \
mu-msg-crypto.c \ mu-msg-crypto.c \
@ -123,6 +125,7 @@ libmu_la_LIBADD= \
$(JSON_GLIB_LIBS) \ $(JSON_GLIB_LIBS) \
${builddir}/utils/libmu-utils.la \ ${builddir}/utils/libmu-utils.la \
${builddir}/query/libmu-query.la \ ${builddir}/query/libmu-query.la \
${builddir}/index/libmu-index.la \
$(CODE_COVERAGE_LIBS) $(CODE_COVERAGE_LIBS)
libmu_la_LDFLAGS= \ libmu_la_LDFLAGS= \
@ -150,7 +153,7 @@ test_mu_msg_SOURCES= test-mu-msg.c dummy.cc
test_mu_msg_LDADD= libtestmucommon.la test_mu_msg_LDADD= libtestmucommon.la
TEST_PROGS += test-mu-store TEST_PROGS += test-mu-store
test_mu_store_SOURCES= test-mu-store.c dummy.cc test_mu_store_SOURCES= test-mu-store.cc
test_mu_store_LDADD= libtestmucommon.la test_mu_store_LDADD= libtestmucommon.la
TEST_PROGS += test-mu-flags TEST_PROGS += test-mu-flags

45
lib/index/Makefile.am Normal file
View File

@ -0,0 +1,45 @@
## Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software Foundation,
## Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
include $(top_srcdir)/gtest.mk
AM_CPPFLAGS= \
$(CODE_COVERAGE_CPPFLAGS)
AM_CXXFLAGS= \
$(WARN_CXXFLAGS) \
$(GLIB_CFLAGS) \
$(ASAN_CXXFLAGS) \
$(CODE_COVERAGE_CFLAGS) \
-I${top_srcdir}/lib
AM_LDFLAGS= \
$(ASAN_LDFLAGS)
noinst_LTLIBRARIES= \
libmu-index.la
libmu_index_la_SOURCES= \
mu-indexer.cc \
mu-indexer.hh \
mu-scanner.cc \
mu-scanner.hh
libmu_index_la_LIBADD= \
$(GLIB_LIBS) \
$(CODE_COVERAGE_LIBS)
include $(top_srcdir)/aminclude_static.am

350
lib/index/mu-indexer.cc Normal file
View File

@ -0,0 +1,350 @@
/*
** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#include "mu-indexer.hh"
#include <config.h>
#include <atomic>
#include <mutex>
#include <vector>
#include <thread>
#include <condition_variable>
#include <iostream>
#include <atomic>
#include <chrono>
using namespace std::chrono_literals;
#include <xapian.h>
#include "mu-scanner.hh"
#include "utils/mu-async-queue.hh"
#include "utils/mu-error.hh"
#include "../mu-store.hh"
using namespace Mu;
/// Private implementation of the Indexer: it owns the scanner, the queue of
/// paths waiting to be added to the store, and the threads that do the work.
struct Indexer::Private {
        /// Create the implementation for @p store. The scanner is set up for the
        /// store's root maildir and forwards every scanned item to handler().
        explicit Private (Mu::Store& store):
                store_{store},
                scanner_{store_.metadata().root_maildir,
                         [this](auto&& path, auto&& statbuf, auto&& info){
                                 return handler(path, statbuf, info);
                         }},
                max_message_size_{store_.metadata().max_message_size} {
                g_message ("created indexer for %s -> %s",
                           store.metadata().root_maildir.c_str(),
                           store.metadata().database_path.c_str());
        }

        /// Stop and join all threads before the members are destroyed.
        ~Private() { stop(); }

        // NOTE(review): declared but not defined anywhere in this file.
        bool dir_predicate (const std::string& path, const struct dirent* dirent) const;
        /// Scanner callback; decides per directory / file what to do with it.
        bool handler (const std::string& fullpath, struct stat *statbuf,
                      Scanner::HandleType htype);

        /// Start one more worker thread if the queue is longer than the number
        /// of workers and the limit has not been reached.
        void maybe_start_worker();
        /// Thread function: pop paths off the queue and add them to the store.
        void worker();
        /// Remove messages from the store whose file no longer exists.
        bool cleanup();

        bool start(const Indexer::Config& conf);
        bool stop();

        Indexer::Config conf_;
        Store&          store_;
        Scanner         scanner_;
        const size_t    max_message_size_;

        time_t                   dirstamp_{}; ///< store dirstamp of the dir last entered
        std::atomic<bool>        scan_done_{true}, clean_done_{true};
        std::size_t              max_workers_{}; ///< set by start(); was uninitialized before
        std::vector<std::thread> workers_;
        std::thread              scanner_worker_;

        AsyncQueue<std::string>  fq_; ///< full paths of messages to add/update

        struct Progress {
                void reset() {
                        processed = updated = removed = 0;
                }
                std::atomic<size_t> processed{}; /**< Number of messages processed */
                std::atomic<size_t> updated{};   /**< Number of messages added/updated to store */
                std::atomic<size_t> removed{};   /**< Number of message removed from store */
        };
        Progress progress_;

        std::mutex lock_, wlock_; ///< lock_: start/stop; wlock_: workers_ growth
};
/**
 * Scanner callback: decide what to do with a directory or file found while
 * scanning.
 *
 * @param fullpath full path of the directory or file
 * @param statbuf  stat information for @p fullpath
 * @param htype    whether we are entering/leaving a directory or seeing a file
 *
 * @return for EnterDir: true to descend into the directory, false to skip it;
 * for File: true if the file was queued for indexing, false if it was skipped;
 * for LeaveDir: always true.
 */
bool
Indexer::Private::handler (const std::string& fullpath, struct stat *statbuf,
                           Scanner::HandleType htype)
{
        switch (htype) {
        case Scanner::HandleType::EnterDir: {
                // in lazy-mode, we ignore this dir if its dirstamp suggest it
                // is up-to-date (this is _not_ always true; hence we call it
                // lazy-mode).
                //
                // The dirstamp is the wall-clock time at which we last left
                // the directory (see LeaveDir below), so it is (almost) never
                // _equal_ to the directory's mtime; the directory is
                // up-to-date when it has not been modified since.
                dirstamp_ = store_.dirstamp(fullpath);
                if (conf_.lazy_check && statbuf->st_mtime <= dirstamp_) {
                        g_debug("skip %s (seems up-to-date)", fullpath.c_str());
                        return false;
                }

                // don't index dirs with '.noindex'
                const auto noindex = ::access((fullpath + "/.noindex").c_str(), F_OK) == 0;
                if (noindex) {
                        g_debug ("skip %s (has .noindex)", fullpath.c_str());
                        return false; // don't descend into this dir.
                }

                // don't index dirs with '.noupdate', unless we do a full
                // (re)index.
                if (!conf_.ignore_noupdate) {
                        const auto noupdate = ::access((fullpath + "/.noupdate").c_str(), F_OK) == 0;
                        if (noupdate) {
                                g_debug ("skip %s (has .noupdate)", fullpath.c_str());
                                return false;
                        }
                }

                g_debug ("process %s", fullpath.c_str());
                return true;
        }
        case Scanner::HandleType::LeaveDir: {
                store_.set_dirstamp(fullpath, ::time(nullptr));
                return true;
        }
        case Scanner::HandleType::File: {
                // st_size is an off_t; convert once so both the comparison and
                // the %zu conversion see a size_t.
                const auto fsize = static_cast<size_t>(statbuf->st_size);
                if (fsize > max_message_size_) {
                        g_debug ("skip %s (too big: %zu bytes)",
                                 fullpath.c_str(), fsize);
                        return false;
                }

                // if the message is not in the db yet, or not up-to-date, queue
                // it for updating/inserting.
                if (statbuf->st_mtime <= dirstamp_ &&
                    store_.contains_message (fullpath))
                        return false; // already up-to-date

                fq_.push(std::string{fullpath});
                return true;
        }
        default:
                g_return_val_if_reached (false);
                return false;
        }
}
void
Indexer::Private::maybe_start_worker()
{
std::lock_guard<std::mutex> wlock{wlock_};
if (fq_.size() > workers_.size() && workers_.size() < max_workers_)
workers_.emplace_back(std::thread([this]{worker();}));
}
/// Worker-thread body: take paths from the queue and add them to the store,
/// until the scan is done and the queue has been drained.
void
Indexer::Private::worker()
{
        std::string path;
        g_debug ("started worker");

        for (;;) {
                if (scan_done_ && fq_.empty())
                        break; // nothing left, and nothing more will come

                // wait a bit for work; on time-out, re-check the exit condition
                const bool got_item = fq_.pop (path, 250ms);
                if (got_item) {
                        ++progress_.processed;
                        try {
                                store_.add_message(path);
                                ++progress_.updated;
                        } catch (const Mu::Error& er) {
                                g_warning ("error adding message @ %s: %s",
                                           path.c_str(), er.what());
                        }
                        maybe_start_worker();
                }
        }
}
bool
Indexer::Private::cleanup()
{
g_debug ("starting cleanup");
std::vector<Store::Id> orphans_; // store messages without files.
store_.for_each([&](Store::Id id, const std::string &path) {
if (clean_done_)
return false;
if (::access(path.c_str(), F_OK) != 0) {
g_debug ("%s not found; queing id=%u for removal",
path.c_str(), id);
orphans_.emplace_back(id);
}
return !clean_done_;
});
if (orphans_.empty()) {
g_debug("nothing to clean up");
return true;
}
store_.remove_messages (orphans_);
g_debug ("removed %zu orphan messages from store", orphans_.size());
return true;
}
/**
 * Start an indexing job with the given configuration; any job still running
 * is stopped first. The work is done in background threads, so this returns
 * right away.
 *
 * @param conf the configuration for this job
 *
 * @return true
 */
bool
Indexer::Private::start(const Indexer::Config& conf)
{
        stop();

        conf_ = conf;
        if (conf_.max_threads != 0)
                max_workers_ = conf_.max_threads;
        else {
                // hardware_concurrency() may return 0 when it cannot tell.
                const auto hw = std::thread::hardware_concurrency();
                max_workers_ = hw > 0 ? hw : 1;
        }

        g_debug ("starting indexer with up to %zu threads", max_workers_);

        scan_done_ = clean_done_ = false;
        workers_.emplace_back(std::thread([this]{worker();}));

        scanner_worker_ = std::thread([this]{

                progress_.reset();

                if (conf_.scan) {
                        g_debug("starting scanner");
                        if (!scanner_.start()) {
                                g_warning ("failed to start scanner");
                                // nothing more will happen; release the
                                // workers and make is_running() false.
                                scan_done_ = clean_done_ = true;
                                return;
                        }
                        g_debug ("scanner finished");
                }
                // flag the scan phase as finished even when scanning is
                // disabled; otherwise the workers would never exit.
                scan_done_ = true;

                if (conf_.cleanup) {
                        g_debug ("starting cleanup");
                        cleanup();
                        g_debug ("cleanup finished");
                }
                // likewise for the cleanup phase.
                clean_done_ = true;

                store_.commit();
        });

        g_debug ("started indexer");

        return true;
}
/// Stop the scanner, discard the pending work and join all threads.
/// @return true
bool
Indexer::Private::stop()
{
        scanner_.stop();

        // tell the workers and the cleanup phase to wind down
        clean_done_ = true;
        scan_done_  = true;

        const auto joined = workers_.size();

        fq_.clear();
        if (scanner_worker_.joinable())
                scanner_worker_.join();

        for (auto& thr: workers_) {
                if (!thr.joinable())
                        continue;
                thr.join();
        }
        workers_.clear();

        if (joined > 0)
                g_debug ("stopped indexer (joined %zu worker(s))", joined);

        return true;
}
// Construct the indexer; all state lives in the Private implementation
// object, which is created here for the given store.
Indexer::Indexer (Store& store):
priv_{std::make_unique<Private>(store)}
{}
// Defaulted in this file, where Private is a complete type, so priv_ can be
// destroyed; ~Private() calls stop().
Indexer::~Indexer() = default;
/// Start indexing with @p conf unless a job is already underway.
bool
Indexer::start(const Indexer::Config& conf)
{
        std::lock_guard<std::mutex> guard(priv_->lock_);

        // already running counts as success; otherwise kick off a new job.
        return is_running() || priv_->start(conf);
}
/// Stop the current indexing job, if there is one.
bool
Indexer::stop()
{
        std::lock_guard<std::mutex> guard(priv_->lock_);

        if (is_running()) {
                g_debug ("stopping indexer");
                return priv_->stop();
        }

        return true; // nothing to stop
}
/// An indexing job counts as running until the scan and cleanup phases are
/// done and the work queue is empty.
bool
Indexer::is_running() const
{
        const bool idle = priv_->scan_done_ && priv_->clean_done_ &&
                priv_->fq_.empty();
        return !idle;
}
/// Take a snapshot of the progress counters of the most recent job.
Indexer::Progress
Indexer::progress() const
{
        Progress snapshot;

        snapshot.running   = is_running();
        snapshot.processed = priv_->progress_.processed;
        snapshot.updated   = priv_->progress_.updated;
        snapshot.removed   = priv_->progress_.removed;

        return snapshot;
}

114
lib/index/mu-indexer.hh Normal file
View File

@ -0,0 +1,114 @@
/*
** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#ifndef MU_INDEXER_HH__
#define MU_INDEXER_HH__
#include <memory>
#include <chrono>
namespace Mu {
struct Store;
/// An object abstracting the index process: scanning the store's root maildir
/// for messages to add or update, and removing messages from the store whose
/// files are gone.
class Indexer {
public:
/**
* Construct an indexer object
*
* @param store the message store to use
*/
Indexer (Store& store);
/**
* DTOR; stops any indexing that is still in progress.
*/
~Indexer();
/// A configuration object for the indexer
struct Config {
bool scan{true};
/**< scan for new messages */
bool cleanup{true};
/**< clean messages no longer in the file system */
size_t max_threads{};
/**< maximum # of threads to use; with 0 (the default) the number is
* derived from the hardware concurrency */
bool ignore_noupdate{};
/**< ignore .noupdate files */
bool lazy_check{};
/**< whether to skip directories that don't have a changed
* mtime */
};
/**
* Start indexing. If already underway, do nothing. The work is done in
* background threads; this function does not wait for it to finish.
*
* @param conf a configuration object
*
* @return true if starting worked or an indexing process was already
* underway; false otherwise.
*
*/
bool start(const Config& conf);
/**
* Stop indexing. If not indexing, do nothing.
*
*
* @return true if we stopped indexing, or indexing was not underway.
* False otherwise.
*/
bool stop();
/**
* Is an indexing process running?
*
* @return true or false.
*/
bool is_running() const;
// Object describing current progress
struct Progress {
bool running{}; /**< Is an index operation in progress? */
size_t processed{}; /**< Number of messages processed */
size_t updated{}; /**< Number of messages added/updated to store */
size_t removed{}; /**< Number of message removed from store */
};
/**
* Get an object describing the current progress. The progress object
* describes the most recent indexing job, and is reset upon a fresh
* start().
*
* @return a progress object.
*/
Progress progress() const;
private:
struct Private;
std::unique_ptr<Private> priv_;
};
} // namespace Mu
#endif /* MU_INDEXER_HH__ */

242
lib/index/mu-scanner.cc Normal file
View File

@ -0,0 +1,242 @@
/*
** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#include "mu-scanner.hh"
#include "config.h"
#include <chrono>
#include <mutex>
#include <atomic>
#include <thread>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <glib.h>
#include "utils/mu-utils.hh"
#include "utils/mu-error.hh"
using namespace Mu;
/// Private implementation of the Scanner: the directory to scan, the handler
/// to call, and the flag that keeps the scan going.
struct Scanner::Private {
        /// @throws Mu::Error when @p handler is empty
        Private (const std::string& root_dir,
                 Scanner::Handler handler):
                root_dir_{root_dir}, handler_{handler} {
                if (handler_)
                        return;
                throw Mu::Error{Error::Code::Internal, "missing handler"};
        }
        ~Private() {
                stop();
        }

        bool start();
        bool stop();

        /// Handle a single directory entry found in @p path.
        bool process_dentry (const std::string& path, struct dirent *dentry, bool is_maildir);
        /// Handle all entries of directory @p path.
        bool process_dir (const std::string& path, bool is_maildir);

        const std::string      root_dir_;
        const Scanner::Handler handler_;
        std::atomic<bool>      running_{}; ///< cleared to make a running scan stop
        std::mutex             lock_;
};
/**
 * Is this directory entry one the scanner should always ignore, i.e. one with
 * an empty name, "." or ".."?
 *
 * The characters are checked strictly left to right, so nothing beyond the
 * name's terminating NUL is ever read.
 *
 * @param dentry a directory entry
 *
 * @return true if the entry is special, false otherwise
 */
static bool
is_special_dir (const struct dirent *dentry)
{
        const char *name = dentry->d_name;

        if (name[0] == '\0')
                return true;  // empty name
        if (name[0] != '.')
                return false;
        if (name[1] == '\0')
                return true;  // "."

        return name[1] == '.' && name[2] == '\0'; // ".."
}
/// Is @p dirname exactly "cur" or "new", i.e. the name of a maildir leaf
/// directory that holds messages?
static bool
is_new_cur (const char *dirname)
{
        return ::strcmp (dirname, "cur") == 0 ||
               ::strcmp (dirname, "new") == 0;
}
/**
 * Process one directory entry: descend into directories (when the handler
 * agrees), and pass regular files in a maildir leaf to the handler.
 *
 * @param path       the directory that contains the entry
 * @param dentry     the directory entry
 * @param is_maildir whether @p path is a 'cur' or 'new' directory
 *
 * @return false if the entry could not be stat'ed; the handler's result for
 * files and for directories that were entered; true otherwise.
 */
bool
Scanner::Private::process_dentry (const std::string& path, struct dirent *dentry,
                                  bool is_maildir)
{
        if (is_special_dir (dentry))
                return true; // ignore.

        const std::string entry_path{path + "/" + dentry->d_name};

        struct stat st;
        if (::stat(entry_path.c_str(), &st) != 0) {
                g_warning ("failed to stat %s: %s", entry_path.c_str(), ::strerror(errno));
                return false;
        }

        if (!S_ISDIR(st.st_mode)) {
                if (S_ISREG(st.st_mode) && is_maildir)
                        return handler_(entry_path, &st, Scanner::HandleType::File);

                g_debug ("skip %s (neither maildir-file nor directory)", entry_path.c_str());
                return true;
        }

        // a directory: ask the handler whether to descend into it.
        const bool descend = handler_(entry_path, &st, Scanner::HandleType::EnterDir);
        if (!descend)
                return true; // skip

        process_dir (entry_path, is_new_cur(dentry->d_name));

        return handler_(entry_path, &st, Scanner::HandleType::LeaveDir);
}
/**
 * Process all entries of a directory, for as long as the scan is running.
 *
 * @param path       the directory to process
 * @param is_maildir whether @p path is a 'cur' or 'new' directory
 *
 * @return false if the directory could not be opened or reading it failed;
 * true otherwise.
 */
bool
Scanner::Private::process_dir (const std::string& path, bool is_maildir)
{
        const auto dir = opendir (path.c_str());
        if (G_UNLIKELY(!dir)) {
                g_warning("failed to scan dir %s: %s", path.c_str(), strerror(errno));
                return false;
        }

        // TODO: sort dentries by inode order, which makes things faster for extfs.
        // see mu-maildir.c

        bool ok{true};
        while (running_) {
                errno = 0; // lets us tell end-of-directory from an error below
                const auto dentry{readdir(dir)};

                if (G_LIKELY(dentry)) {
                        process_dentry (path, dentry, is_maildir);
                        continue;
                }

                if (errno != 0) {
                        // give up on this directory; retrying after an error
                        // could make us spin here forever.
                        g_warning("failed to read %s: %s", path.c_str(), strerror(errno));
                        ok = false;
                }

                break; // end of directory, or error
        }

        closedir (dir);

        return ok;
}
bool
Scanner::Private::start()
{
const auto& path{root_dir_};
if (G_UNLIKELY(path.length() > PATH_MAX)) {
g_warning("path too long");
return false;
}
const auto mode{F_OK | R_OK};
if (G_UNLIKELY(access (path.c_str(), mode) != 0)) {
g_warning("'%s' is not readable: %s", path.c_str(), strerror (errno));
return false;
}
struct stat statbuf{};
if (G_UNLIKELY(stat (path.c_str(), &statbuf) != 0)) {
g_warning("'%s' is not stat'able: %s", path.c_str(), strerror (errno));
return false;
}
if (G_UNLIKELY(!S_ISDIR (statbuf.st_mode))) {
g_warning("'%s' is not a directory", path.c_str());
return false;
}
running_ = true;
g_debug ("starting scan @ %s", root_dir_.c_str());
auto basename{g_path_get_basename(root_dir_.c_str())};
const auto is_maildir = (g_strcmp0(basename, "cur") == 0 ||
g_strcmp0(basename,"new") == 0);
g_free(basename);
const auto start{std::chrono::steady_clock::now()};
process_dir(root_dir_, is_maildir);
const auto elapsed = std::chrono::steady_clock::now() - start;
g_debug ("finished scan of %s in %" G_GINT64_FORMAT " ms", root_dir_.c_str(),
to_ms(elapsed));
running_ = false;
return true;
}
/// Ask a running scan to stop by clearing the running flag.
/// @return true
bool
Scanner::Private::stop()
{
        if (running_) {
                g_debug ("stopping scan");
                running_ = false;
        }

        return true; // also when there was nothing to do
}
// Construct the scanner; all state lives in the Private implementation
// object, whose constructor throws a Mu::Error when handler is empty.
Scanner::Scanner (const std::string& root_dir,
Scanner::Handler handler):
priv_{std::make_unique<Private>(root_dir, handler)}
{}
// Defaulted in this file, where Private is a complete type, so priv_ can be
// destroyed; ~Private() calls stop().
Scanner::~Scanner() = default;
/// Run the scan (blocking), unless one is already running.
bool
Scanner::start()
{
        std::unique_lock<std::mutex> guard(priv_->lock_);
        if (priv_->running_)
                return true; //nothing to do
        priv_->running_ = true;
        guard.unlock(); // don't hold the lock during the scan, so stop() can get in

        const bool ok = priv_->start();
        priv_->running_ = false;

        return ok;
}
/// Ask a running scan to stop.
bool
Scanner::stop()
{
        std::lock_guard<std::mutex> guard(priv_->lock_);

        const bool stopped = priv_->stop();
        return stopped;
}
/// Report whether a scan is in progress.
bool
Scanner::is_running() const
{
        return priv_->running_.load();
}

96
lib/index/mu-scanner.hh Normal file
View File

@ -0,0 +1,96 @@
/*
** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#ifndef MU_SCANNER_HH__
#define MU_SCANNER_HH__
#include <functional>
#include <memory>
#include <dirent.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
namespace Mu {
/// @brief Maildir scanner
///
/// Scans maildir (trees) recursively, and calls the Handler callback for
/// directories & files.
///
/// It filters out (i.e., does not call the handler for):
/// - the directory entries '.' and '..'
/// - regular files that do not live directly in a cur / new directory
/// - entries that are neither regular files nor directories
///
class Scanner {
public:
enum struct HandleType { File, EnterDir, LeaveDir };
/// Prototype for a handler function. For HandleType::EnterDir, returning
/// false makes the scanner skip that directory (and LeaveDir is not
/// reported for it).
using Handler = std::function<bool(const std::string& fullpath,
struct stat* statbuf,
HandleType htype)>;
/**
* Construct a scanner object for scanning a directory, recursively.
*
* @param root_dir root dir to start scanning
* @param handler handler function for some direntry; it must not be empty
*/
Scanner (const std::string& root_dir, Handler handler);
/**
* DTOR
*/
~Scanner();
/**
* Start the scan; this is a blocking call that runs until the scan has
* finished or (from another thread) stop() is called.
*
* @return true if starting worked; false otherwise
*/
bool start();
/**
* Stop the scan
*
* @return true if stopping worked; false otherwise
*/
bool stop();
/**
* Is a scan currently running?
*
* @return true or false
*/
bool is_running() const;
private:
struct Private;
std::unique_ptr<Private> priv_;
};
} // namespace Mu
#endif /* MU_SCANNER_HH__ */

68
lib/index/test-scanner.cc Normal file
View File

@ -0,0 +1,68 @@
/*
** Copyright (C) 2017 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This library is free software; you can redistribute it and/or
** modify it under the terms of the GNU Lesser General Public License
** as published by the Free Software Foundation; either version 2.1
** of the License, or (at your option) any later version.
**
** This library is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
** Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public
** License along with this library; if not, write to the Free
** Software Foundation, 51 Franklin Street, Fifth Floor, Boston, MA
** 02110-1301, USA.
*/
#include <vector>
#include <glib.h>
#include <iostream>
#include <sstream>
#include "mu-scanner.hh"
#include "mu-utils.hh"
using namespace Mu;
/// Scan a maildir and print every directory and file the scanner reports.
static void
test_scan_maildir ()
{
        allow_warnings();

        // prefer the test maildir from the build system, when it is defined;
        // NOTE(review): the fallback is a developer-specific path.
#ifdef MU_TESTMAILDIR
        const std::string maildir{MU_TESTMAILDIR};
#else
        const std::string maildir{"/home/djcb/Maildir"};
#endif
        // Scanner takes a root directory and a single handler; there is no
        // separate per-dirent predicate.
        Scanner scanner{maildir,
                        [](const std::string& fullpath, const struct stat* statbuf,
                           auto&& info)->bool {
                                // st_size is an off_t; %zu needs a size_t
                                g_print ("%s %zu\n", fullpath.c_str(),
                                         static_cast<size_t>(statbuf->st_size));
                                return true;
                        }
        };

        g_assert_true (scanner.start());

        while (scanner.is_running()) {
                sleep(1);
        }
}
/// Test entry point: register the scanner test and run it; a
/// std::runtime_error is reported on stderr and turns into exit code 1.
int
main (int argc, char *argv[])
{
        try {
                g_test_init (&argc, &argv, NULL);
                g_test_add_func ("/utils/scanner/scan-maildir", test_scan_maildir);

                return g_test_run ();

        } catch (const std::runtime_error& re) {
                std::cerr << re.what() << "\n";
                return 1;
        }
}

View File

@ -1,476 +0,0 @@
/*
** Copyright (C) 2008-2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 3 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#include "config.h"
#include "mu-index.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <glib.h>
#include <glib/gstdio.h>
#include <errno.h>
#include "mu-maildir.h"
#define MU_LAST_USED_MAILDIR_KEY "last_used_maildir"
#define MU_INDEX_MAX_FILE_SIZE (500*1000*1000) /* 500 Mb */
/* apparently, people are getting really big mails, so let us index those (by
* default)*/
/* State of an index object, see mu_index_new() */
struct _MuIndex {
MuStore *_store; /* the store to index into; we hold a reference */
gboolean _needs_reindex; /* TRUE if the store was empty at creation */
guint _max_filesize; /* bigger messages are ignored (bytes) */
};
/**
 * Create a new index object for a (writable) store.
 *
 * @param store the store to index into; a reference is taken
 * @param err   receives the error when counting the store's messages fails
 *
 * @return a new MuIndex (free with mu_index_destroy()), or NULL in case of
 * error.
 */
MuIndex*
mu_index_new (MuStore *store, GError **err)
{
        MuIndex *index;
        unsigned count;

        g_return_val_if_fail (store, NULL);
        g_return_val_if_fail (!mu_store_is_read_only(store), NULL);

        /* ask the store first: when this fails there is nothing to release
         * yet (allocating first leaked the index and its store reference) */
        count = mu_store_count (store, err);
        if (count == (unsigned)-1)
                return NULL;

        index         = g_new0 (MuIndex, 1);
        index->_store = mu_store_ref (store);

        /* set the default max file size */
        index->_max_filesize = MU_INDEX_MAX_FILE_SIZE;

        /* an empty store needs a full index */
        if (count == 0)
                index->_needs_reindex = TRUE;

        return index;
}
/* Release the index object and its reference to the store; NULL is
 * accepted and ignored. */
void
mu_index_destroy (MuIndex *index)
{
        if (index) {
                mu_store_unref (index->_store);
                g_free (index);
        }
}
/* Data passed to the maildir-walk callbacks during an index run; filled in
 * by init_cb_data() */
struct _MuIndexCallbackData {
MuIndexMsgCallback _idx_msg_cb; /* user callback, called for each message */
MuIndexDirCallback _idx_dir_cb; /* user callback for entering/leaving dirs */
MuStore* _store; /* the store being updated */
void* _user_data; /* passed to the user callbacks */
MuIndexStats* _stats; /* statistics to update, may be NULL */
gboolean _reindex; /* index messages unconditionally */
gboolean _lazy_check; /* skip leaf dirs that seem up-to-date */
time_t _dirstamp; /* store timestamp of the dir last entered */
guint _max_filesize; /* bigger messages are ignored (bytes) */
};
typedef struct _MuIndexCallbackData MuIndexCallbackData;
/* Decide whether a message needs to be (re)indexed. Looking at time stamps
 * alone does not suffice: a message may have been moved in from another
 * directory (e.g. from 'new' to 'cur') without its time stamps changing, so
 * we also ask the store whether it knows the path. */
static inline gboolean
needs_index (MuIndexCallbackData *data, const char *fullpath,
             time_t filestamp)
{
        const gboolean needed =
                /* unconditionally reindex */
                data->_reindex ||
                /* it's not in the database yet */
                !mu_store_contains_message (data->_store, fullpath) ||
                /* it's there, but it's not up to date */
                (unsigned)filestamp >= (unsigned)data->_dirstamp;

        return needed ? TRUE : FALSE;
}
/* Add the message at fullpath to the store, or update it there, if
 * needs_index() says so. *updated tells the caller whether the store was
 * changed. A message that cannot be parsed is only warned about (MU_OK); a
 * failure to store it gives MU_ERROR. */
static MuError
insert_or_update_maybe (const char *fullpath, const char *mdir,
                        time_t filestamp, MuIndexCallbackData *data,
                        gboolean *updated)
{
        GError   *gerr = NULL;
        MuMsg    *message;
        gboolean  stored;

        *updated = FALSE;
        if (!needs_index (data, fullpath, filestamp))
                return MU_OK; /* nothing to do for this one */

        message = mu_msg_new_from_file (fullpath, mdir, &gerr);
        if (!message) {
                if (gerr) {
                        g_warning ("%s", gerr->message);
                        g_clear_error (&gerr);
                } else
                        g_warning ("error creating message object: %s",
                                   fullpath);
                /* warn, then simply continue */
                return MU_OK;
        }

        /* we got a valid message; put it in the store */
        stored = mu_store_add_msg (data->_store, message, &gerr);
        mu_msg_unref (message);

        if (stored) {
                *updated = TRUE;
                return MU_OK;
        }

        g_warning ("error storing message object: %s",
                   gerr ? gerr->message : "cause unknown");
        g_clear_error (&gerr);

        return MU_ERROR;
}
/* Call the user's message callback, if there is one, and return its result;
 * without a callback the result is MU_OK. */
static MuError
run_msg_callback_maybe (MuIndexCallbackData *data)
{
        MuError rv;

        if (!data || !data->_idx_msg_cb)
                return MU_OK;

        rv = data->_idx_msg_cb (data->_stats, data->_user_data);
        if (G_UNLIKELY(!(rv == MU_OK || rv == MU_STOP)))
                g_warning ("error in callback");

        return rv;
}
/* Maildir-walk callback for a message file: skip messages that are too big,
 * give the user callback a chance to stop the run, add/update the message in
 * the store when needed and keep the statistics up to date. */
static MuError
on_run_maildir_msg (const char *fullpath, const char *mdir,
                    struct stat *statbuf, MuIndexCallbackData *data)
{
        MuError  rv;
        gboolean changed;

        /* protect against too big messages */
        if (G_UNLIKELY(statbuf->st_size > data->_max_filesize)) {
                g_warning ("ignoring because bigger than %u bytes: %s",
                           data->_max_filesize, fullpath);
                return MU_OK; /* not an error */
        }

        rv = run_msg_callback_maybe (data);
        if (rv != MU_OK)
                return rv;

        /* see if we need to update/insert anything...
         * use the ctime, so any status change will be visible (perms,
         * filename etc.)*/
        rv = insert_or_update_maybe (fullpath, mdir, statbuf->st_ctime,
                                     data, &changed);
        if (rv != MU_OK || !data || !data->_stats)
                return rv;

        /* update statistics */
        ++data->_stats->_processed;
        if (changed)
                ++data->_stats->_updated;
        else
                ++data->_stats->_uptodate;

        return rv;
}
/* Get the ctime of the directory at path, or 0 (after a warning) when it
 * cannot be stat'ed. */
static time_t
get_dir_timestamp (const char *path)
{
        struct stat st;

        if (stat (path, &st) == 0)
                return st.st_ctime;

        g_warning ("failed to stat %s: %s",
                   path, strerror(errno));
        return 0;
}
/**
 * Maildir-walk callback for entering or leaving a directory.
 *
 * When entering, the directory's timestamp is fetched from the store; when
 * leaving, it is set to the current time. A store error is reported and
 * released right away, so it can no longer leak on the early returns below.
 *
 * @param fullpath the directory
 * @param enter    TRUE when entering the directory, FALSE when leaving it
 * @param data     callback data for the index run
 *
 * @return MU_IGNORE if the directory can be skipped (lazy check), otherwise
 * the result of the user's directory callback, or MU_OK when there is none.
 */
static MuError
on_run_maildir_dir (const char* fullpath, gboolean enter,
                    MuIndexCallbackData *data)
{
        GError *err;

        err = NULL;

        /* xapian stores a per-dir timestamp; we use this timestamp to determine
         * whether a message is up-to-date
         */
        if (enter)
                data->_dirstamp =
                        mu_store_get_dirstamp (data->_store, fullpath, &err);
        else
                mu_store_set_dirstamp (data->_store, fullpath,
                                       time(NULL), &err);

        if (err) {
                g_warning("%s: error handling %s: %s", __func__,
                          fullpath, err->message);
                g_clear_error(&err);
        }

        if (enter) {
                /* in 'lazy' mode, we only check the dir timestamp, and if it's
                 * up to date, we don't bother with this dir. This fails to
                 * account for messages below this dir that have merely
                 * _changed_ though */
                if (data->_lazy_check && mu_maildir_is_leaf_dir(fullpath)) {
                        time_t dirstamp;
                        dirstamp = get_dir_timestamp (fullpath);
                        if (dirstamp <= data->_dirstamp) {
                                g_debug ("ignore %s (up-to-date)", fullpath);
                                return MU_IGNORE;
                        }
                }
                g_debug ("entering %s", fullpath);
        } else
                g_debug ("leaving %s", fullpath);

        if (data->_idx_dir_cb)
                return data->_idx_dir_cb (fullpath, enter,
                                          data->_user_data);

        return MU_OK;
}
/*
 * Check that @path can be used as the root of a maildir walk: it must be
 * non-NULL, absolute, and readable (access(2) with R_OK). A warning is logged
 * for the first check that fails.
 *
 * @return TRUE when all checks pass, FALSE otherwise
 */
static gboolean
check_path (const char *path)
{
	gboolean usable;

	g_return_val_if_fail (path, FALSE);

	usable = FALSE;
	if (!g_path_is_absolute (path)) {
		g_warning ("%s: not an absolute path: '%s'", __func__, path);
	} else if (access (path, R_OK) != 0) {
		g_warning ("%s: cannot open '%s': %s",
			   __func__, path, strerror (errno));
	} else {
		usable = TRUE;
	}

	return usable;
}
/*
 * Fill in all fields of @cb_data for an indexing run. The directory timestamp
 * starts at 0, and the caller's @stats structure (if any) is zeroed.
 */
static void
init_cb_data (MuIndexCallbackData *cb_data, MuStore *xapian,
	      gboolean reindex, gboolean lazycheck,
	      guint max_filesize, MuIndexStats *stats,
	      MuIndexMsgCallback msg_cb, MuIndexDirCallback dir_cb,
	      void *user_data)
{
	/* start every run with fresh statistics */
	if (stats)
		memset (stats, 0, sizeof(MuIndexStats));

	/* the store and the indexing options */
	cb_data->_store        = xapian;
	cb_data->_reindex      = reindex;
	cb_data->_lazy_check   = lazycheck;
	cb_data->_max_filesize = max_filesize;
	cb_data->_dirstamp     = 0;
	cb_data->_stats        = stats;

	/* the user's callbacks and their data */
	cb_data->_idx_msg_cb = msg_cb;
	cb_data->_idx_dir_cb = dir_cb;
	cb_data->_user_data  = user_data;
}
/*
 * Set the maximum message size for @index; a @max_size of 0 selects the
 * default, MU_INDEX_MAX_FILE_SIZE.
 */
void
mu_index_set_max_msg_size (MuIndex *index, guint max_size)
{
	g_return_if_fail (index);

	/* 0 means: go back to the built-in default */
	index->_max_filesize =
		(max_size != 0) ? max_size : MU_INDEX_MAX_FILE_SIZE;
}
/*
 * Walk the store's root maildir and insert/update messages in the store.
 *
 * A full re-index is done when @reindex is set or when the index itself is
 * flagged as needing one. @stats (if non-NULL) is zeroed before the walk, and
 * the store is flushed afterwards, whatever the walk returned.
 *
 * @return MU_ERROR when a precondition fails or the root maildir is not
 * usable; otherwise the result of the maildir walk
 */
MuError
mu_index_run (MuIndex *index, gboolean reindex, gboolean lazycheck,
	      MuIndexStats *stats,
	      MuIndexMsgCallback msg_cb, MuIndexDirCallback dir_cb,
	      void *user_data)
{
	MuIndexCallbackData cbdata;
	const char *root;
	MuError walk_result;

	g_return_val_if_fail (index && index->_store, MU_ERROR);
	g_return_val_if_fail (msg_cb, MU_ERROR);

	root = mu_store_root_maildir (index->_store);
	if (!check_path (root))
		return MU_ERROR;

	/* the index can demand a full re-index, overriding the caller */
	reindex = index->_needs_reindex ? TRUE : reindex;

	init_cb_data (&cbdata, index->_store, reindex, lazycheck,
		      index->_max_filesize, stats,
		      msg_cb, dir_cb, user_data);

	walk_result = mu_maildir_walk (
		root,
		(MuMaildirWalkMsgCallback)on_run_maildir_msg,
		(MuMaildirWalkDirCallback)on_run_maildir_dir,
		reindex, /* re-index, ie. do a full update */
		&cbdata);

	mu_store_flush (index->_store);

	return walk_result;
}
/*
 * Per-message callback for the statistics walk (mu_index_stats).
 *
 * Invokes the user's message callback (if any); when that returns MU_OK (or
 * there is no callback), the message is counted in the _processed statistic.
 * The message itself is not inspected: @fullpath, @mdir and @statbuf are
 * unused.
 *
 * @cb_data is NULL-checked consistently, for both the callback and the
 * statistics update.
 *
 * @return MU_OK to continue, or the non-MU_OK result of the user's callback
 */
static MuError
on_stats_maildir_file (const char *fullpath, const char *mdir,
		       struct stat *statbuf,
		       MuIndexCallbackData *cb_data)
{
	MuError result;

	result = MU_OK;
	if (cb_data && cb_data->_idx_msg_cb)
		result = cb_data->_idx_msg_cb (cb_data->_stats,
					       cb_data->_user_data);

	/* hand anything but MU_OK (MU_STOP, an error) back to the caller */
	if (result != MU_OK)
		return result;

	if (cb_data && cb_data->_stats)
		++cb_data->_stats->_processed;

	return MU_OK;
}
/*
 * Gather statistics about the store's root maildir by walking it and counting
 * messages, without touching the store contents.
 *
 * @stats (if non-NULL) is zeroed before the walk. The callback data is fully
 * zero-initialized before the used fields are set, so no indeterminate field
 * is ever handed to the walker.
 *
 * NOTE(review): @cb_dir is stored in the callback data, but no directory
 * callback is passed to mu_maildir_walk here, so it does not appear to be
 * invoked -- confirm whether that is intended.
 *
 * @return MU_ERROR when a precondition fails or the root maildir is not
 * usable; otherwise the result of the maildir walk
 */
MuError
mu_index_stats (MuIndex *index,
		MuIndexStats *stats, MuIndexMsgCallback cb_msg,
		MuIndexDirCallback cb_dir, void *user_data)
{
	const char *path;
	MuIndexCallbackData cb_data;

	/* same precondition as mu_index_run: the store is dereferenced below */
	g_return_val_if_fail (index && index->_store, MU_ERROR);
	g_return_val_if_fail (cb_msg, MU_ERROR);

	path = mu_store_root_maildir (index->_store);
	if (!check_path (path))
		return MU_ERROR;

	if (stats)
		memset (stats, 0, sizeof(MuIndexStats));

	/* zero everything first; only some of the fields are set below */
	memset (&cb_data, 0, sizeof(cb_data));
	cb_data._idx_msg_cb = cb_msg;
	cb_data._idx_dir_cb = cb_dir;
	cb_data._stats = stats;
	cb_data._user_data = user_data;
	cb_data._dirstamp = 0;

	return mu_maildir_walk (path,
				(MuMaildirWalkMsgCallback)on_stats_maildir_file,
				NULL, FALSE, &cb_data);
}
/* context handed to foreach_doc_cb for every path visited by mu_index_cleanup */
struct _CleanupData {
MuStore *_store; /* store that stale paths are removed from */
MuIndexStats *_stats; /* statistics to update, or NULL */
MuIndexCleanupDeleteCallback _cb; /* user callback, invoked per path, or NULL */
void *_user_data; /* passed through to _cb */
};
typedef struct _CleanupData CleanupData;
/*
 * Per-path callback for mu_index_cleanup.
 *
 * When @path is not readable (access(2) with R_OK fails), it is removed from
 * the store and counted as cleaned up. Every visited path is counted as
 * processed, after which the user's callback (if any) decides whether to
 * continue.
 *
 * NOTE(review): an EACCES failure only suppresses the debug message; the path
 * is still removed from the store -- confirm that this is intended.
 *
 * @return MU_ERROR when removing the path from the store fails; otherwise the
 * result of the user's callback, or MU_OK when there is none
 */
static MuError
foreach_doc_cb (const char* path, CleanupData *cudata)
{
	MuIndexStats *stats = cudata->_stats;

	if (access (path, R_OK) != 0) {
		if (errno != EACCES)
			g_debug ("cannot access %s: %s", path, strerror(errno));

		/* something went wrong... bail out */
		if (!mu_store_remove_path (cudata->_store, path))
			return MU_ERROR;

		if (stats)
			++stats->_cleaned_up;
	}

	if (stats)
		++stats->_processed;

	return cudata->_cb ?
		cudata->_cb (stats, cudata->_user_data) : MU_OK;
}
/*
 * Visit every path in the store with foreach_doc_cb, which removes the entries
 * whose file is no longer readable. The store is flushed afterwards, whatever
 * the iteration returned. @stats is not reset here.
 *
 * @return MU_ERROR when @index is NULL; otherwise the result of the iteration
 */
MuError
mu_index_cleanup (MuIndex *index, MuIndexStats *stats,
		  MuIndexCleanupDeleteCallback cb,
		  void *user_data, GError **err)
{
	CleanupData ctx;
	MuStore *store;
	MuError result;

	g_return_val_if_fail (index, MU_ERROR);

	store = index->_store;

	ctx._store     = store;
	ctx._stats     = stats;
	ctx._cb        = cb;
	ctx._user_data = user_data;

	result = mu_store_foreach (store,
				   (MuStoreForeachFunc)foreach_doc_cb,
				   &ctx, err);
	mu_store_flush (store);

	return result;
}
/*
 * Zero all counters in @stats.
 *
 * @return TRUE when @stats was cleared, FALSE when it is NULL
 */
gboolean
mu_index_stats_clear (MuIndexStats *stats)
{
	if (stats) {
		memset (stats, 0, sizeof(MuIndexStats));
		return TRUE;
	}

	return FALSE;
}

View File

@ -1,193 +0,0 @@
/*
** Copyright (C) 2008-2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 3 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#ifndef __MU_INDEX_H__
#define __MU_INDEX_H__
#include <stdlib.h>
#include <glib.h>
#include <utils/mu-util.h>
#include <mu-store.hh>
G_BEGIN_DECLS
/* opaque structure; only declared here, so users of this header can only
 * hold pointers to it */
struct _MuIndex;
typedef struct _MuIndex MuIndex;
/* counters updated by mu_index_run, mu_index_stats and mu_index_cleanup */
struct _MuIndexStats {
unsigned _processed; /* number of msgs processed or counted */
unsigned _updated; /* number of msgs new or updated */
unsigned _cleaned_up; /* number of msgs cleaned up */
unsigned _uptodate; /* number of msgs already up-to-date */
};
typedef struct _MuIndexStats MuIndexStats;
/**
 * create a new MuIndex instance. NOTE: the database does not have
 * to exist yet, but the directory must already exist; NOTE(2): before
 * doing anything with the returned index object, make sure you have
 * called mu_msg_init somewhere in your code.
 *
 * @param store a writable MuStore object
 * @param err to receive error or NULL; there are only errors when this
 * function returns NULL. Possible errors: see mu-error.h
 *
 * @return a new MuIndex instance, or NULL in case of error
 */
MuIndex* mu_index_new (MuStore *store, GError **err)
G_GNUC_MALLOC G_GNUC_WARN_UNUSED_RESULT;
/**
 * destroy the index instance; it must not be used afterwards
 *
 * @param index a MuIndex instance, or NULL
 */
void mu_index_destroy (MuIndex *index);
/**
 * change the maximum file size that mu-index considers from its
 * default (MU_INDEX_MAX_FILE_SIZE). Note that the maximum size is a
 * protection against mu (or the libraries it uses) allocating too
 * much memory, which can lead to problems
 *
 * @param index a mu index object
 * @param max_size the maximum msg size in bytes, or 0 to reset to the default
 */
void mu_index_set_max_msg_size (MuIndex *index, guint max_size);
/**
 * callback function for mu_index_(run|stats), called for each message
 *
 * @param stats pointer to the statistics structure passed to
 * mu_index_(run|stats); this can be NULL
 * @param user_data pointer to user data
 *
 * @return MU_OK to continue, MU_STOP to stop, or MU_ERROR in
 * case of some error.
 */
typedef MuError (*MuIndexMsgCallback) (MuIndexStats* stats, void *user_data);
/**
 * callback function for mu_index_(run|stats), for each dir enter/leave
 *
 * @param path dirpath we just entered / left
 * @param enter did we enter (TRUE) or leave (FALSE) the dir?
 * @param user_data pointer to user data
 *
 * @return MU_OK to continue, MU_STOP to stop, or MU_ERROR in
 * case of some error.
 */
typedef MuError (*MuIndexDirCallback) (const char* path, gboolean enter,
void *user_data);
/**
 * start the indexing process
 *
 * @param index a valid MuIndex instance
 * @param force if != 0, force re-indexing already indexed messages; this is
 * obviously a lot slower than only indexing new/changed messages. A
 * re-index is also done when the index itself is flagged as needing one.
 * @param lazycheck whether to ignore subdirectories that have up-to-date
 * timestamps.
 * @param stats a structure to receive some statistics about the results, or
 * NULL; note that the struct values are reset to zero at the start of
 * each call
 * @param msg_cb a callback function called for every msg indexed; must not
 * be NULL
 * @param dir_cb a callback function called for every dir entered/left or NULL
 * @param user_data a user pointer that will be passed to the callback function
 *
 * @return MU_OK if the indexing was completed successfully,
 * MU_STOP if the user stopped or MU_ERROR in
 * case of some error.
 */
MuError mu_index_run (MuIndex *index, gboolean force,
gboolean lazycheck, MuIndexStats *stats,
MuIndexMsgCallback msg_cb,
MuIndexDirCallback dir_cb, void *user_data);
/**
 * gather some statistics about the Maildir; this is usually much faster than
 * mu_index_run, and can thus be used to provide some information to the user.
 * Note though that the statistics may be different from the reality that
 * mu_index_run sees, when there are updates in the Maildir
 *
 * @param index a valid MuIndex instance
 * @param stats a structure to receive some statistics about the results, or
 * NULL; note that the struct values are reset to zero at the start of
 * each call
 * @param msg_cb a callback function which will be called for every msg; must
 * not be NULL
 * @param dir_cb a callback function which will be called for every dir or NULL
 * (NOTE(review): the implementation does not appear to invoke it -- confirm)
 * @param user_data a user pointer that will be passed to the callback function
 *
 * @return MU_OK if the stats gathering was completed successfully,
 * MU_STOP if the user stopped or MU_ERROR in
 * case of some error.
 */
MuError mu_index_stats (MuIndex *index, MuIndexStats *stats,
MuIndexMsgCallback msg_cb, MuIndexDirCallback dir_cb,
void *user_data);
/**
 * callback function for mu_index_cleanup, called for each message
 *
 * @param stats pointer to the statistics structure passed to
 * mu_index_cleanup; this can be NULL
 * @param user_data pointer to user data
 *
 * @return a MuError; the value is passed back to the store iteration that
 * mu_index_cleanup runs
 */
typedef MuError (*MuIndexCleanupDeleteCallback) (MuIndexStats *stats,
void *user_data);
/**
 * cleanup the database; ie. remove entries for which there is no longer a
 * corresponding (readable) file in the maildir
 *
 * @param index a valid MuIndex instance
 * @param stats a structure with some statistics about the results, or NULL;
 * note that this function does *not* reset the struct values to allow
 * for cumulative stats from multiple calls. If needed, you can use
 * @mu_index_stats_clear before calling this function
 * @param cb a callback function which will be called for every msg, or NULL
 * @param user_data a user pointer that will be passed to the callback function
 * @param err to receive error info or NULL. err->code is MuError value
 *
 * @return MU_OK if the cleanup was completed successfully,
 * MU_STOP if the user stopped or MU_ERROR in
 * case of some error.
 */
MuError mu_index_cleanup (MuIndex *index, MuIndexStats *stats,
MuIndexCleanupDeleteCallback cb,
void *user_data, GError **err);
/**
 * clear the stats structure, ie. set all of its counters to zero
 *
 * @param stats a MuIndexStats object, or NULL
 *
 * @return TRUE if stats != NULL, FALSE otherwise
 */
gboolean mu_index_stats_clear (MuIndexStats *stats);
G_END_DECLS
#endif /*__MU_INDEX_H__*/

View File

@ -105,12 +105,9 @@ mu_msg_new_from_doc (XapianDocument *doc, GError **err)
{ {
MuMsg *self; MuMsg *self;
MuMsgDoc *msgdoc; MuMsgDoc *msgdoc;
gint64 start;
g_return_val_if_fail (doc, NULL); g_return_val_if_fail (doc, NULL);
start = g_get_monotonic_time();
if (G_UNLIKELY(!_gmime_initialized)) { if (G_UNLIKELY(!_gmime_initialized)) {
gmime_init (); gmime_init ();
atexit (gmime_uninit); atexit (gmime_uninit);

View File

@ -19,14 +19,20 @@
#include "config.h" #include "config.h"
#include <chrono>
#include <mutex> #include <mutex>
#include <array> #include <array>
#include <cstdlib> #include <cstdlib>
#include <xapian.h> #include <stdexcept>
#include <unordered_map> #include <unordered_map>
#include <atomic> #include <atomic>
#include <type_traits>
#include <iostream> #include <iostream>
#include <cstring> #include <cstring>
#include <xapian.h>
#include "mu-store.hh" #include "mu-store.hh"
#include "utils/mu-str.h" #include "utils/mu-str.h"
#include "utils/mu-error.hh" #include "utils/mu-error.hh"
@ -36,17 +42,22 @@
using namespace Mu; using namespace Mu;
static_assert(std::is_same<Store::Id, Xapian::docid>::value,
"wrong type for Store::Id");
constexpr auto SchemaVersionKey = "schema-version"; constexpr auto SchemaVersionKey = "schema-version";
constexpr auto RootMaildirKey = "maildir"; // XXX: make this 'root-maildir' constexpr auto RootMaildirKey = "maildir"; // XXX: make this 'root-maildir'
constexpr auto ContactsKey = "contacts"; constexpr auto ContactsKey = "contacts";
constexpr auto PersonalAddressesKey = "personal-addresses"; constexpr auto PersonalAddressesKey = "personal-addresses";
constexpr auto CreatedKey = "created"; constexpr auto CreatedKey = "created";
constexpr auto BatchSize = 150'000; constexpr auto BatchSizeKey = "batch-size";
constexpr auto DefaultBatchSize = 250'000U;
constexpr auto MaxMessageSizeKey = "max-message-size";
constexpr auto DefaultMaxMessageSize = 100'000'000U;
constexpr auto ExpectedSchemaVersion = MU_STORE_SCHEMA_VERSION; constexpr auto ExpectedSchemaVersion = MU_STORE_SCHEMA_VERSION;
extern "C" { extern "C" {
static unsigned add_or_update_msg (MuStore *store, unsigned docid, MuMsg *msg, GError **err); static unsigned add_or_update_msg (MuStore *store, unsigned docid, MuMsg *msg, GError **err);
} }
@ -98,52 +109,58 @@ struct Store::Private {
#define LOCKED std::lock_guard<std::mutex> l(lock_); #define LOCKED std::lock_guard<std::mutex> l(lock_);
enum struct XapianOpts {ReadOnly, Open, CreateOverwrite };
Private (const std::string& path, bool readonly): Private (const std::string& path, bool readonly):
db_path_{path}, db_{make_xapian(path, readonly ? XapianOpts::ReadOnly : XapianOpts::Open)},
db_{readonly? mdata_{make_metadata(path)},
std::make_shared<Xapian::Database>(db_path_) :
std::make_shared<Xapian::WritableDatabase>(db_path_, Xapian::DB_OPEN)},
root_maildir_{db()->get_metadata(RootMaildirKey)},
created_{atoll(db()->get_metadata(CreatedKey).c_str())},
schema_version_{db()->get_metadata(SchemaVersionKey)},
personal_addresses_{Mu::split(db()->get_metadata(PersonalAddressesKey),",")},
contacts_{db()->get_metadata(ContactsKey)} { contacts_{db()->get_metadata(ContactsKey)} {
if (!readonly)
wdb()->begin_transaction();
} }
Private (const std::string& path, const std::string& root_maildir, Private (const std::string& path, const std::string& root_maildir,
const StringVec& personal_addresses): const StringVec& personal_addresses, const Store::Config& conf):
db_path_{path}, db_{make_xapian(path, XapianOpts::CreateOverwrite)},
db_{std::make_shared<Xapian::WritableDatabase>( mdata_{init_metadata(conf, path, root_maildir, personal_addresses)} {
db_path_, Xapian::DB_CREATE_OR_OVERWRITE)},
root_maildir_{root_maildir},
created_{time({})},
schema_version_{MU_STORE_SCHEMA_VERSION},
personal_addresses_{personal_addresses} {
writable_db()->set_metadata(SchemaVersionKey, schema_version_);
writable_db()->set_metadata(RootMaildirKey, root_maildir_);
writable_db()->set_metadata(CreatedKey, Mu::format("%" PRId64, (int64_t)created_));
std::string addrs;
for (const auto& addr : personal_addresses_) { // _very_ minimal check.
if (addr.find(",") != std::string::npos)
throw Mu::Error(Error::Code::InvalidArgument,
"e-mail address '%s' contains comma", addr.c_str());
addrs += (addrs.empty() ? "": ",") + addr;
}
writable_db()->set_metadata (PersonalAddressesKey, addrs);
wdb()->begin_transaction();
} }
~Private() try { ~Private() try {
LOCKED; LOCKED;
g_debug("closing store @ %s", mdata_.database_path.c_str());
if (wdb()) { if (wdb()) {
wdb()->set_metadata (ContactsKey, contacts_.serialize()); wdb()->set_metadata (ContactsKey, contacts_.serialize());
if (in_transaction_) // auto-commit. commit();
wdb()->commit_transaction();
} }
} MU_XAPIAN_CATCH_BLOCK; } MU_XAPIAN_CATCH_BLOCK;
std::shared_ptr<Xapian::Database> make_xapian (const std::string db_path,
XapianOpts opts) try {
switch (opts) {
case XapianOpts::ReadOnly:
return std::make_shared<Xapian::Database>(db_path);
case XapianOpts::Open:
return std::make_shared<Xapian::WritableDatabase>(
db_path, Xapian::DB_OPEN);
case XapianOpts::CreateOverwrite:
return std::make_shared<Xapian::WritableDatabase>(
db_path, Xapian::DB_CREATE_OR_OVERWRITE);
default:
throw std::logic_error ("invalid xapian options");
}
} catch (const Xapian::DatabaseError& xde) {
throw Mu::Error(Error::Code::Store, "failed to open store @ %s: %s",
db_path.c_str(), xde.get_msg().c_str());
} catch (...) {
throw Mu::Error(Error::Code::Internal,
"something went wrong when opening store @ %s",
db_path.c_str());
}
std::shared_ptr<Xapian::Database> db() const { std::shared_ptr<Xapian::Database> db() const {
if (!db_) if (!db_)
throw Mu::Error(Error::Code::NotFound, "no database found"); throw Mu::Error(Error::Code::NotFound, "no database found");
@ -162,16 +179,16 @@ struct Store::Private {
return w_db; return w_db;
} }
void begin_transaction () try { void dirty () try {
wdb()->begin_transaction(); if (++dirtiness_ > mdata_.batch_size)
in_transaction_ = true; commit();
dirtiness_ = 0;
} MU_XAPIAN_CATCH_BLOCK; } MU_XAPIAN_CATCH_BLOCK;
void commit_transaction () try { void commit () try {
in_transaction_ = false; g_debug("committing %zu modification(s)", dirtiness_);
dirtiness_ = 0; dirtiness_ = 0;
wdb()->commit_transaction(); wdb()->commit_transaction();
wdb()->begin_transaction();
} MU_XAPIAN_CATCH_BLOCK; } MU_XAPIAN_CATCH_BLOCK;
void add_synonyms () { void add_synonyms () {
@ -186,19 +203,58 @@ struct Store::Private {
return (time_t)atoll(db()->get_metadata(key).c_str()); return (time_t)atoll(db()->get_metadata(key).c_str());
} }
Store::Metadata make_metadata(const std::string& db_path) {
Store::Metadata mdata;
mdata.database_path = db_path;
mdata.schema_version = db()->get_metadata(SchemaVersionKey);
mdata.created = ::atoll(db()->get_metadata(CreatedKey).c_str());
mdata.read_only = !wdb();
mdata.batch_size = ::atoll(db()->get_metadata(BatchSizeKey).c_str());
mdata.max_message_size = ::atoll(db()->get_metadata(MaxMessageSizeKey).c_str());
mdata.root_maildir = db()->get_metadata(RootMaildirKey);
mdata.personal_addresses = Mu::split(db()->get_metadata(PersonalAddressesKey),",");
return mdata;
}
Store::Metadata init_metadata(const Store::Config& conf,
const std::string& path, const std::string& root_maildir,
const StringVec& personal_addresses) {
wdb()->set_metadata(SchemaVersionKey, ExpectedSchemaVersion);
wdb()->set_metadata(CreatedKey, Mu::format("%" PRId64, (int64_t)::time({})));
const size_t batch_size = conf.batch_size ? conf.batch_size : DefaultBatchSize;
wdb()->set_metadata(BatchSizeKey, Mu::format("%zu", batch_size));
const size_t max_msg_size = conf.max_message_size ?
conf.max_message_size : DefaultMaxMessageSize;
wdb()->set_metadata(MaxMessageSizeKey, Mu::format("%zu", max_msg_size));
wdb()->set_metadata(RootMaildirKey, root_maildir);
std::string addrs;
for (const auto& addr : personal_addresses) { // _very_ minimal check.
if (addr.find(",") != std::string::npos)
throw Mu::Error(Error::Code::InvalidArgument,
"e-mail address '%s' contains comma", addr.c_str());
addrs += (addrs.empty() ? "": ",") + addr;
}
wdb()->set_metadata (PersonalAddressesKey, addrs);
return make_metadata(path);
}
const std::string db_path_;
std::shared_ptr<Xapian::Database> db_; std::shared_ptr<Xapian::Database> db_;
const std::string root_maildir_; const Store::Metadata mdata_;
const time_t created_{};
const std::string schema_version_;
const StringVec personal_addresses_;
Contacts contacts_; Contacts contacts_;
std::unique_ptr<Indexer> indexer_;
std::atomic<bool> in_transaction_{}; std::atomic<bool> in_transaction_{};
std::mutex lock_; std::mutex lock_;
size_t dirtiness_{}; size_t dirtiness_{};
mutable std::atomic<std::size_t> ref_count_{1}; mutable std::atomic<std::size_t> ref_count_{1};
@ -222,48 +278,30 @@ get_uid_term (const char* path)
return std::string{uid_term, sizeof(uid_term)}; return std::string{uid_term, sizeof(uid_term)};
} }
#undef LOCKED #undef LOCKED
#define LOCKED std::lock_guard<std::mutex> l(priv_->lock_); #define LOCKED std::lock_guard<std::mutex> l__(priv_->lock_)
Store::Store (const std::string& path, bool readonly): Store::Store (const std::string& path, bool readonly):
priv_{std::make_unique<Private>(path, readonly)} priv_{std::make_unique<Private>(path, readonly)}
{ {
if (ExpectedSchemaVersion != schema_version()) if (metadata().schema_version != ExpectedSchemaVersion)
throw Mu::Error(Error::Code::SchemaMismatch, throw Mu::Error(Error::Code::SchemaMismatch,
"expected schema-version %s, but got %s", "expected schema-version %s, but got %s",
ExpectedSchemaVersion, schema_version().c_str()); ExpectedSchemaVersion,
metadata().schema_version.c_str());
} }
Store::Store (const std::string& path, const std::string& maildir, Store::Store (const std::string& path, const std::string& maildir,
const StringVec& personal_addresses): const StringVec& personal_addresses, const Store::Config& conf):
priv_{std::make_unique<Private>(path, maildir, personal_addresses)} priv_{std::make_unique<Private>(path, maildir, personal_addresses, conf)}
{} {}
Store::~Store() = default; Store::~Store() = default;
bool const Store::Metadata&
Store::read_only() const Store::metadata() const
{ {
return !priv_->wdb(); return priv_->mdata_;
}
const std::string&
Store::root_maildir () const
{
return priv_->root_maildir_;
}
const StringVec&
Store::personal_addresses(void) const
{
return priv_->personal_addresses_;
}
const std::string&
Store::database_path() const
{
return priv_->db_path_;
} }
const Contacts& const Contacts&
@ -273,9 +311,23 @@ Store::contacts() const
return priv_->contacts_; return priv_->contacts_;
} }
Indexer&
Store::indexer()
{
LOCKED;
if (metadata().read_only)
throw Error{Error::Code::Store, "no indexer for read-only store"};
else if (!priv_->indexer_)
priv_->indexer_ = std::make_unique<Indexer>(*this);
return *priv_->indexer_.get();
}
std::size_t std::size_t
Store::size() const Store::size() const
{ {
LOCKED;
return priv_->db()->get_doccount(); return priv_->db()->get_doccount();
} }
@ -285,19 +337,6 @@ Store::empty() const
return size() == 0; return size() == 0;
} }
const std::string&
Store::schema_version() const
{
return priv_->schema_version_;
}
time_t
Store::created() const
{
return priv_->created_;
}
static std::string static std::string
maildir_from_path (const std::string& root, const std::string& path) maildir_from_path (const std::string& root, const std::string& path)
{ {
@ -333,7 +372,7 @@ Store::add_message (const std::string& path)
LOCKED; LOCKED;
GError *gerr{}; GError *gerr{};
const auto maildir{maildir_from_path(root_maildir(), path)}; const auto maildir{maildir_from_path(metadata().root_maildir, path)};
auto msg{mu_msg_new_from_file (path.c_str(), maildir.c_str(), &gerr)}; auto msg{mu_msg_new_from_file (path.c_str(), maildir.c_str(), &gerr)};
if (G_UNLIKELY(!msg)) if (G_UNLIKELY(!msg))
throw Error{Error::Code::Message, "failed to create message: %s", throw Error{Error::Code::Message, "failed to create message: %s",
@ -347,6 +386,7 @@ Store::add_message (const std::string& path)
gerr ? gerr->message : "something went wrong"}; gerr ? gerr->message : "something went wrong"};
g_debug ("added message @ %s; docid = %u", path.c_str(), docid); g_debug ("added message @ %s; docid = %u", path.c_str(), docid);
priv_->dirty();
return docid; return docid;
} }
@ -366,6 +406,7 @@ Store::update_message (MuMsg *msg, unsigned docid)
g_debug ("updated message @ %s; docid = %u", g_debug ("updated message @ %s; docid = %u",
mu_msg_get_path(msg), docid); mu_msg_get_path(msg), docid);
priv_->dirty();
return true; return true;
} }
@ -384,13 +425,26 @@ Store::remove_message (const std::string& path)
} MU_XAPIAN_CATCH_BLOCK_RETURN (false); } MU_XAPIAN_CATCH_BLOCK_RETURN (false);
g_debug ("deleted message @ %s from database", path.c_str()); g_debug ("deleted message @ %s from store", path.c_str());
priv_->dirty();
return true; return true;
} }
void
Store::remove_messages (const std::vector<Store::Id>& ids)
{
LOCKED;
try {
for (auto&& id: ids) {
priv()->wdb()->delete_document(id);
priv_->dirty();
}
} MU_XAPIAN_CATCH_BLOCK;
}
time_t time_t
Store::dirstamp (const std::string& path) const Store::dirstamp (const std::string& path) const
@ -413,6 +467,7 @@ Store::set_dirstamp (const std::string& path, time_t tstamp)
const std::size_t len = g_snprintf (data.data(), data.size(), "%zx", tstamp); const std::size_t len = g_snprintf (data.data(), data.size(), "%zx", tstamp);
priv_->writable_db()->set_metadata(path, std::string{data.data(), len}); priv_->writable_db()->set_metadata(path, std::string{data.data(), len});
priv_->dirty();
} }
@ -449,29 +504,38 @@ Store::contains_message (const std::string& path) const
} MU_XAPIAN_CATCH_BLOCK_RETURN(false); } MU_XAPIAN_CATCH_BLOCK_RETURN(false);
} }
void
Store::begin_transaction () try std::size_t
Store::for_each (Store::ForEachFunc func)
{ {
LOCKED; LOCKED;
priv_->begin_transaction();
} MU_XAPIAN_CATCH_BLOCK; size_t n{};
void try {
Store::commit_transaction () try Xapian::Enquire enq (*priv_->db().get());
{
LOCKED;
priv_->commit_transaction();
} MU_XAPIAN_CATCH_BLOCK; enq.set_query (Xapian::Query::MatchAll);
enq.set_cutoff (0,0);
bool Xapian::MSet matches(enq.get_mset (0, priv_->db()->get_doccount()));
Store::in_transaction () const
{ for (auto&& it = matches.begin(); it != matches.end(); ++it, ++n)
return priv_->in_transaction_; if (!func (*it, it.get_document().get_value(MU_MSG_FIELD_ID_PATH)))
break;
} MU_XAPIAN_CATCH_BLOCK;
return n;
} }
void
Store::commit () try
{
LOCKED;
priv_->commit();
} MU_XAPIAN_CATCH_BLOCK;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// C compat // C compat
@ -501,7 +565,7 @@ mutable_self (MuStore *store)
} }
auto s = reinterpret_cast<Mu::Store*>(store); auto s = reinterpret_cast<Mu::Store*>(store);
if (s->read_only()) { if (s->metadata().read_only) {
g_error ("store is read-only"); // terminates g_error ("store is read-only"); // terminates
return {}; return {};
} }
@ -533,66 +597,6 @@ mu_store_new_readable (const char* xpath, GError **err)
return NULL; return NULL;
} }
MuStore*
mu_store_new_writable (const char* xpath, GError **err)
{
g_return_val_if_fail (xpath, NULL);
g_debug ("opening database at %s (writable)", xpath);
try {
return reinterpret_cast<MuStore*>(new Store (xpath, false/*!readonly*/));
} catch (const Mu::Error& me) {
if (me.code() == Mu::Error::Code::SchemaMismatch) {
g_set_error (err, MU_ERROR_DOMAIN, MU_ERROR_XAPIAN_SCHEMA_MISMATCH,
"%s", me.what());
return NULL;
}
} catch (const Xapian::DatabaseLockError& dle) {
g_set_error (err, MU_ERROR_DOMAIN, MU_ERROR_XAPIAN_CANNOT_GET_WRITELOCK,
"database @ %s is write-locked", xpath);
return NULL;
} catch (const Xapian::Error& dbe) {
g_warning ("failed to open database @ %s: %s", xpath,
dbe.get_error_string() ? dbe.get_error_string() : "something went wrong");
}
g_set_error (err, MU_ERROR_DOMAIN, MU_ERROR_XAPIAN_CANNOT_OPEN,
"failed to open database @ %s", xpath);
return NULL;
}
MuStore*
mu_store_new_create (const char* xpath, const char *root_maildir,
const char **personal_addresses, GError **err)
{
g_return_val_if_fail (xpath, NULL);
g_return_val_if_fail (root_maildir, NULL);
g_debug ("create database at %s (root-maildir=%s)", xpath, root_maildir);
try {
StringVec addrs;
for (auto i = 0; personal_addresses && personal_addresses[i]; ++i)
addrs.emplace_back(personal_addresses[i]);
return reinterpret_cast<MuStore*>(
new Store (xpath, std::string{root_maildir}, addrs));
} catch (const Xapian::DatabaseLockError& dle) {
g_set_error (err, MU_ERROR_DOMAIN, MU_ERROR_XAPIAN_CANNOT_GET_WRITELOCK,
"database @ %s is write-locked already", xpath);
} catch (...) {
g_set_error (err, MU_ERROR_DOMAIN, MU_ERROR_XAPIAN,
"error opening database @ %s", xpath);
}
return NULL;
}
MuStore* MuStore*
mu_store_ref (MuStore* store) mu_store_ref (MuStore* store)
@ -619,30 +623,6 @@ mu_store_unref (MuStore* store)
return NULL; return NULL;
} }
gboolean
mu_store_is_read_only (const MuStore *store)
{
g_return_val_if_fail (store, FALSE);
try {
return self(store)->read_only() ? TRUE : FALSE;
} MU_XAPIAN_CATCH_BLOCK_RETURN(FALSE);
}
const MuContacts*
mu_store_contacts (MuStore *store)
{
g_return_val_if_fail (store, FALSE);
try {
return self(store)->contacts().mu_contacts();
} MU_XAPIAN_CATCH_BLOCK_RETURN(FALSE);
}
unsigned unsigned
mu_store_count (const MuStore *store, GError **err) mu_store_count (const MuStore *store, GError **err)
{ {
@ -660,151 +640,16 @@ mu_store_schema_version (const MuStore *store)
{ {
g_return_val_if_fail (store, NULL); g_return_val_if_fail (store, NULL);
return self(store)->schema_version().c_str(); return self(store)->metadata().schema_version.c_str();
} }
XapianDatabase* XapianDatabase*
mu_store_get_read_only_database (MuStore *store) mu_store_get_read_only_database (MuStore *store)
{ {
g_return_val_if_fail (store, NULL); g_return_val_if_fail (store, NULL);
return (XapianWritableDatabase*)self(store)->priv()->db().get(); return (XapianDatabase*)self(store)->priv()->db().get();
} }
gboolean
mu_store_contains_message (const MuStore *store, const char* path)
{
g_return_val_if_fail (store, FALSE);
g_return_val_if_fail (path, FALSE);
try {
return self(store)->contains_message(path) ? TRUE : FALSE;
} MU_XAPIAN_CATCH_BLOCK_RETURN(FALSE);
}
unsigned
mu_store_get_docid_for_path (const MuStore *store, const char* path, GError **err)
{
g_return_val_if_fail (store, FALSE);
g_return_val_if_fail (path, FALSE);
try {
const std::string term (get_uid_term(path));
Xapian::Query query (term);
Xapian::Enquire enq (*self(store)->priv()->db().get());
enq.set_query (query);
Xapian::MSet mset (enq.get_mset (0,1));
if (mset.empty())
throw Mu::Error(Error::Code::NotFound,
"message @ %s not found in store", path);
return *mset.begin();
} MU_XAPIAN_CATCH_BLOCK_G_ERROR_RETURN(err, MU_ERROR_XAPIAN,
MU_STORE_INVALID_DOCID);
}
MuError
mu_store_foreach (MuStore *store,
MuStoreForeachFunc func, void *user_data, GError **err)
{
g_return_val_if_fail (store, MU_ERROR);
g_return_val_if_fail (func, MU_ERROR);
try {
Xapian::Enquire enq (*self(store)->priv()->db().get());
enq.set_query (Xapian::Query::MatchAll);
enq.set_cutoff (0,0);
Xapian::MSet matches(enq.get_mset (0, self(store)->size()));
if (matches.empty())
return MU_OK; /* database is empty */
for (Xapian::MSet::iterator iter = matches.begin();
iter != matches.end(); ++iter) {
Xapian::Document doc (iter.get_document());
const std::string path(doc.get_value(MU_MSG_FIELD_ID_PATH));
MuError res = func (path.c_str(), user_data);
if (res != MU_OK)
return res;
}
} MU_XAPIAN_CATCH_BLOCK_G_ERROR_RETURN(err, MU_ERROR_XAPIAN,
MU_ERROR_XAPIAN);
return MU_OK;
}
MuMsg*
mu_store_get_msg (const MuStore *store, unsigned docid, GError **err)
{
g_return_val_if_fail (store, NULL);
g_return_val_if_fail (docid != 0, NULL);
return self(store)->find_message(docid);
}
const char*
mu_store_database_path (const MuStore *store)
{
g_return_val_if_fail (store, NULL);
return self(store)->database_path().c_str();
}
const char*
mu_store_root_maildir (const MuStore *store)
{
g_return_val_if_fail (store, NULL);
return self(store)->root_maildir().c_str();
}
time_t
mu_store_created (const MuStore *store)
{
g_return_val_if_fail (store, (time_t)0);
return self(store)->created();
}
char**
mu_store_personal_addresses (const MuStore *store)
{
g_return_val_if_fail (store, NULL);
const auto size = self(store)->personal_addresses().size();
auto addrs = g_new0 (char*, 1 + size);
for (size_t i = 0; i != size; ++i)
addrs[i] = g_strdup(self(store)->personal_addresses()[i].c_str());
return addrs;
}
void
mu_store_flush (MuStore *store) try {
g_return_if_fail (store);
if (self(store)->priv()->in_transaction_)
mutable_self(store)->commit_transaction ();
mutable_self(store)->priv()->wdb()->set_metadata(
ContactsKey, self(store)->priv()->contacts_.serialize());
} MU_XAPIAN_CATCH_BLOCK;
static void static void
add_terms_values_date (Xapian::Document& doc, MuMsg *msg, MuMsgFieldId mfid) add_terms_values_date (Xapian::Document& doc, MuMsg *msg, MuMsgFieldId mfid)
{ {
@ -1237,7 +1082,7 @@ new_doc_from_message (MuStore *store, MuMsg *msg)
/* determine whether this is 'personal' email, ie. one of my /* determine whether this is 'personal' email, ie. one of my
* e-mail addresses is explicitly mentioned -- it's not a * e-mail addresses is explicitly mentioned -- it's not a
* mailing list message. Callback will update docinfo->_personal */ * mailing list message. Callback will update docinfo->_personal */
const auto& personal_addresses = self(store)->personal_addresses(); const auto& personal_addresses = self(store)->metadata().personal_addresses;
if (personal_addresses.size()) { if (personal_addresses.size()) {
docinfo._my_addresses = &personal_addresses; docinfo._my_addresses = &personal_addresses;
mu_msg_contact_foreach mu_msg_contact_foreach
@ -1293,9 +1138,6 @@ add_or_update_msg (MuStore *store, unsigned docid, MuMsg *msg, GError **err)
auto self = mutable_self(store); auto self = mutable_self(store);
auto wdb = self->priv()->wdb(); auto wdb = self->priv()->wdb();
if (!self->in_transaction())
self->priv()->begin_transaction();
add_term (doc, term); add_term (doc, term);
// update the threading info if this message has a message id // update the threading info if this message has a message id
@ -1309,9 +1151,6 @@ add_or_update_msg (MuStore *store, unsigned docid, MuMsg *msg, GError **err)
id = docid; id = docid;
} }
if (++self->priv()->dirtiness_ >= BatchSize)
self->priv()->commit_transaction();
return id; return id;
} MU_XAPIAN_CATCH_BLOCK_G_ERROR (err, MU_ERROR_XAPIAN_STORE_FAILED); } MU_XAPIAN_CATCH_BLOCK_G_ERROR (err, MU_ERROR_XAPIAN_STORE_FAILED);
@ -1319,101 +1158,4 @@ add_or_update_msg (MuStore *store, unsigned docid, MuMsg *msg, GError **err)
return MU_STORE_INVALID_DOCID; return MU_STORE_INVALID_DOCID;
} }
unsigned } // extern C
mu_store_add_msg (MuStore *store, MuMsg *msg, GError **err)
{
/* Store msg through add_or_update_msg, passing 0 as the docid (presumably
 * "no existing document to replace" -- confirm in add_or_update_msg).
 * Yields MU_STORE_INVALID_DOCID when store or msg is NULL; otherwise
 * whatever add_or_update_msg returns, with err forwarded to it. */
g_return_val_if_fail (store, MU_STORE_INVALID_DOCID);
g_return_val_if_fail (msg, MU_STORE_INVALID_DOCID);
return add_or_update_msg (store, 0, msg, err);
}
/**
 * Update the message stored under an existing document id.
 *
 * @param store a store; must not be NULL
 * @param docid the document id to update; must not be 0
 * @param msg the message; must not be NULL
 * @param err forwarded to add_or_update_msg
 *
 * @return the result of add_or_update_msg, or MU_STORE_INVALID_DOCID when
 * one of the preconditions does not hold
 */
unsigned
mu_store_update_msg (MuStore *store, unsigned docid, MuMsg *msg, GError **err)
{
	g_return_val_if_fail (store, MU_STORE_INVALID_DOCID);
	g_return_val_if_fail (msg, MU_STORE_INVALID_DOCID);
	g_return_val_if_fail (docid != 0, MU_STORE_INVALID_DOCID);

	const unsigned updated_id = add_or_update_msg (store, docid, msg, err);

	return updated_id;
}
/**
 * Create a message from the file at path and store it.
 *
 * The maildir handed to mu_msg_new_from_file is derived from the store's
 * root maildir and path (see maildir_from_path).
 *
 * @param store a store; must not be NULL
 * @param path path to the message file; must not be NULL
 * @param err receives error information from mu_msg_new_from_file or
 * add_or_update_msg, or from an exception caught here
 *
 * @return the result of add_or_update_msg, or MU_STORE_INVALID_DOCID when
 * a precondition fails, the message cannot be created, or an exception was
 * caught
 */
unsigned
mu_store_add_path (MuStore *store, const char *path, GError **err) try {

	/* this function yields a docid, not a gboolean; use the same invalid
	 * value as mu_store_add_msg / mu_store_update_msg */
	g_return_val_if_fail (store, MU_STORE_INVALID_DOCID);
	g_return_val_if_fail (path, MU_STORE_INVALID_DOCID);

	const auto maildir{maildir_from_path(self(store)->root_maildir(), path)};

	MuMsg *msg = mu_msg_new_from_file (path, maildir.c_str(), err);
	if (!msg)
		return MU_STORE_INVALID_DOCID;

	const unsigned docid = add_or_update_msg (store, 0, msg, err);
	mu_msg_unref (msg); /* drop our reference to the message */

	return docid;

} catch (const Mu::Error& me) {
	g_set_error (err, MU_ERROR_DOMAIN, MU_ERROR_XAPIAN,
		     "%s", me.what());
	return MU_STORE_INVALID_DOCID;
} catch (...) {
	g_set_error (err, MU_ERROR_DOMAIN, MU_ERROR_INTERNAL,
		     "caught exception");
	return MU_STORE_INVALID_DOCID;
}
/**
 * Get the writable database object behind a store, as an opaque pointer
 * for C callers.
 *
 * @param store a store; must not be NULL
 *
 * @return the raw pointer held by the store's wdb() handle, cast to
 * XapianWritableDatabase*, or NULL when store is NULL
 */
XapianWritableDatabase*
mu_store_get_writable_database (MuStore *store)
{
	g_return_val_if_fail (store, NULL);

	auto wdb = mutable_self(store)->priv()->wdb();

	return (XapianWritableDatabase*)wdb.get();
}
/**
 * Remove the document for a message path from the database.
 *
 * @param store a store; must not be NULL
 * @param msgpath path identifying the message; must not be NULL
 *
 * @return TRUE after delete_document returned normally; FALSE when a
 * precondition fails or MU_XAPIAN_CATCH_BLOCK_RETURN handles an exception
 */
gboolean
mu_store_remove_path (MuStore *store, const char *msgpath)
{
	g_return_val_if_fail (store, FALSE);
	g_return_val_if_fail (msgpath, FALSE);

	try {
		/* the document is identified by the uid term for its path */
		const std::string uid_term (get_uid_term(msgpath));

		auto wdb = mutable_self(store)->priv()->wdb();
		wdb->delete_document (uid_term);

		return TRUE;

	} MU_XAPIAN_CATCH_BLOCK_RETURN (FALSE);
}
/**
 * Store a timestamp for a directory; C wrapper around Store::set_dirstamp.
 *
 * @param store a store; must not be NULL
 * @param dirpath path of the directory; must not be NULL
 * @param stamp the timestamp to store
 * @param err not used by this function
 *
 * @return TRUE once set_dirstamp was called; FALSE when a precondition
 * fails
 */
gboolean
mu_store_set_dirstamp (MuStore *store, const char* dirpath,
		       time_t stamp, GError **err)
{
	g_return_val_if_fail (store, FALSE);
	g_return_val_if_fail (dirpath, FALSE);

	auto writable = mutable_self(store);
	writable->set_dirstamp(dirpath, stamp);

	return TRUE;
}
/**
 * Get the timestamp stored for a directory; C wrapper around
 * Store::dirstamp.
 *
 * @param store a store; must not be NULL
 * @param dirpath path of the directory; must not be NULL
 * @param err not used by this function
 *
 * @return the result of Store::dirstamp, or 0 when a precondition fails
 */
time_t
mu_store_get_dirstamp (const MuStore *store, const char *dirpath, GError **err)
{
	g_return_val_if_fail (store, 0);
	g_return_val_if_fail (dirpath, 0);

	const time_t stamp = self(store)->dirstamp(dirpath);

	return stamp;
}
}

View File

@ -24,20 +24,26 @@
#ifdef __cplusplus #ifdef __cplusplus
#include "mu-contacts.hh"
#include <xapian.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include <mutex>
#include <ctime> #include <ctime>
#include "mu-contacts.hh"
#include <xapian.h>
#include <utils/mu-utils.hh> #include <utils/mu-utils.hh>
#include <index/mu-indexer.hh>
namespace Mu { namespace Mu {
class Store { class Store {
public: public:
using Id = unsigned; /**< Id for a message in the store (internally,
* corresponds to a Xapian document-id) */
static constexpr Id InvalidId = 0; /**< Invalid store id */
/** /**
* Construct a store for an existing document database * Construct a store for an existing document database
* *
@ -46,65 +52,51 @@ public:
*/ */
Store (const std::string& path, bool readonly=true); Store (const std::string& path, bool readonly=true);
/**
 * Settings for a store; for both fields, 0 selects the default.
 */
struct Config {
	/** maximum size (in bytes) for a message, or 0 for default */
	size_t max_message_size = 0;
	/** size of batches before committing, or 0 for default */
	size_t batch_size = 0;
};
/** /**
* Construct a store for a not-yet-existing document database * Construct a store for a not-yet-existing document database
* *
* @param path path to the database * @param path path to the database
* @param maildir maildir to use for this store * @param maildir maildir to use for this store
* @param personal_addressesaddresses that should be recognized as * @param personal_addresses addresses that should be recognized as
* 'personal' for identifying personal messages. * 'personal' for identifying personal messages.
*/ */
Store (const std::string& path, const std::string& maildir, Store (const std::string& path, const std::string& maildir,
const StringVec& personal_addresses); const StringVec& personal_addresses, const Config& conf);
/** /**
* DTOR * DTOR
*/ */
~Store(); ~Store();
/** struct Metadata {
* Is the store read-only? std::string database_path; /**< Full path to the Xapian database */
* std::string schema_version; /**< Database schema version */
* @return true or false std::time_t created; /**< database creation time */
*/
bool read_only() const; bool read_only; /**< Is the database opened read-only? */
size_t batch_size; /**< Maximum database transaction batch size */
std::string root_maildir; /**< Absolute path to the top-level maildir */
StringVec personal_addresses; /**< Personal e-mail addresses */
size_t max_message_size; /**< Maximum allowed message size */
};
/** /**
* Path to the database; this is some subdirectory of the path * Get metadata about this store.
* passed to the constructor.
* *
* @return the database path * @return the metadata
*/ */
const std::string& database_path() const; const Metadata& metadata() const;
/**
* Path to the top-level Maildir
*
* @return the maildir
*/
const std::string& root_maildir() const;
/**
* Version of the database-schema
*
* @return the maildir
*/
const std::string& schema_version() const;
/**
* Time of creation of the store
*
* @return creation time
*/
std::time_t created() const;
/**
* Get a vec with the personal addresses
*
* @return personal addresses
*/
const StringVec& personal_addresses() const;
/** /**
* Get the Contacts object for this store * Get the Contacts object for this store
@ -113,6 +105,15 @@ public:
*/ */
const Contacts& contacts() const; const Contacts& contacts() const;
/**
* Get the Indexer associated with this store. It is an error
* to call this on a read-only store.
*
* @return the indexer.
*/
Indexer& indexer();
/** /**
* Add a message to the store. * Add a message to the store.
* *
@ -120,20 +121,21 @@ public:
* *
* @return the doc id of the added message * @return the doc id of the added message
*/ */
unsigned add_message (const std::string& path); Id add_message (const std::string& path);
/** /**
* Update a message in the store. * Update a message in the store.
* *
* @param msg a message * @param msg a message
* @param docid a docid * @param id the id for this message
* *
* @return false in case of failure; true ottherwise. * @return false in case of failure; true ottherwise.
*/ */
bool update_message (MuMsg *msg, unsigned docid); bool update_message (MuMsg *msg, Id id);
/** /**
* Add a message to the store. * Remove a message from the store. It will _not_ remove the message
* from the file system.
* *
* @param path the message path. * @param path the message path.
* *
@ -142,13 +144,29 @@ public:
bool remove_message (const std::string& path); bool remove_message (const std::string& path);
/** /**
* Fina message in the store. * Remove a number if messages from the store. It will _not_ remove the
* message from the file system.
* *
* @param docid doc id for the message to find * @param ids vector with store ids for the message
*/
void remove_messages (const std::vector<Id>& ids);
/**
* Remove a message from the store. It will _not_ remove the message
* from the file system. Convenience overload that forwards to
* remove_messages() with a one-element vector.
*
* @param id the store id for the message
*/
void remove_message (Id id) { remove_messages({id}); }
/**
* Find message in the store.
*
* @param id doc id for the message to find
* *
* @return a message (owned by caller), or nullptr * @return a message (owned by caller), or nullptr
*/ */
MuMsg* find_message (unsigned docid) const; MuMsg* find_message (Id id) const;
/** /**
* does a certain message exist in the store already? * does a certain message exist in the store already?
@ -159,6 +177,36 @@ public:
*/ */
bool contains_message (const std::string& path) const; bool contains_message (const std::string& path) const;
/**
* Prototype for the ForEachFunc
*
* @param id the store Id for the message
* @param path: the absolute path to the message
*
* @return true if for_each should continue; false to quit
*/
using ForEachFunc = std::function<bool(Id, const std::string&)>;
/**
* Call @param func for each document in the store. This takes a lock on
* the store, so the func should _not_ call any other Store:: methods.
*
* @param func a function to call for each document
*
* @return the number of times func was invoked
*/
size_t for_each (ForEachFunc func);
/**
* Get the timestamp for some message, or 0 if not found
*
* @param path the path
*
* @return the timestamp, or 0 if not found
*/
time_t message_tstamp (const std::string& path) const;
/** /**
* Get the timestamp for some directory * Get the timestamp for some directory
* *
@ -191,23 +239,11 @@ public:
bool empty() const; bool empty() const;
/** /**
* Begin a database transaction * Commit the current group of modifcations (i.e., transaction) to disk;
* This rarely needs to be called explicitly, as Store will take care of
* it.
*/ */
void begin_transaction(); void commit();
/**
* Commit a database transaction
*
*/
void commit_transaction();
/**
* Are we in a transaction?
*
* @return true or false
*/
bool in_transaction() const;
/** /**
* Get a reference to the private data. For internal use. * Get a reference to the private data. For internal use.
@ -240,7 +276,6 @@ typedef struct MuStore_ MuStore;
/* http://article.gmane.org/gmane.comp.search.xapian.general/3656 */ /* http://article.gmane.org/gmane.comp.search.xapian.general/3656 */
#define MU_STORE_MAX_TERM_LENGTH (240) #define MU_STORE_MAX_TERM_LENGTH (240)
/** /**
* create a new read-only Xapian store, for querying documents * create a new read-only Xapian store, for querying documents
* *
@ -252,34 +287,6 @@ typedef struct MuStore_ MuStore;
*/ */
MuStore* mu_store_new_readable (const char* xpath, GError **err) MuStore* mu_store_new_readable (const char* xpath, GError **err)
G_GNUC_MALLOC G_GNUC_WARN_UNUSED_RESULT; G_GNUC_MALLOC G_GNUC_WARN_UNUSED_RESULT;
/**
* create a new writable Xapian store, a place to store documents
*
* @param path the path to the database
* @param err to receive error info or NULL. err->code is MuError value
*
* @return a new MuStore object with ref count == 1, or NULL in case
* of error; free with mu_store_unref
*/
MuStore* mu_store_new_writable (const char *xpath, GError **err)
G_GNUC_MALLOC G_GNUC_WARN_UNUSED_RESULT;
/**
* create a new writable Xapian store, a place to store documents, and
* create/overwrite the existing database.
*
* @param path the path to the database
* @param path to the maildir
* @param personal_addressesaddresses that should be recognized as
* 'personal' for identifying personal messages.
* @param err to receive error info or NULL. err->code is MuError value
*
* @return a new MuStore object with ref count == 1, or NULL in case
* of error; free with mu_store_unref
*/
MuStore* mu_store_new_create (const char *xpath, const char *maildir,
const char **personal_addresses, GError **err)
G_GNUC_MALLOC G_GNUC_WARN_UNUSED_RESULT;
/** /**
* increase the reference count for this store with 1 * increase the reference count for this store with 1
@ -304,22 +311,8 @@ MuStore* mu_store_unref (MuStore *store);
/** /**
* we need this when using Xapian::(Writable)Database* from C * we need this when using Xapian::(Writable)Database* from C
*/ */
typedef gpointer XapianWritableDatabase;
typedef gpointer XapianDatabase; typedef gpointer XapianDatabase;
/**
* get the underlying writable database object for this store; not
* that this pointer becomes in valid after mu_store_destroy
*
* @param store a valid store
*
* @return a Xapian::WritableDatabase (you'll need to cast in C++), or
* NULL in case of error.
*/
XapianWritableDatabase* mu_store_get_writable_database (MuStore *store);
/** /**
* get the underlying read-only database object for this store; not that this * get the underlying read-only database object for this store; not that this
* pointer becomes in valid after mu_store_destroy * pointer becomes in valid after mu_store_destroy
@ -344,56 +337,6 @@ XapianDatabase* mu_store_get_read_only_database (MuStore *store);
const char* mu_store_schema_version (const MuStore* store); const char* mu_store_schema_version (const MuStore* store);
/**
* Get the database-path for this message store
*
* @param store the store to inspetc
*
* @return the database-path
*/
const char *mu_store_database_path (const MuStore *store);
/**
* Get the root-maildir for this message store.
*
* @param store the store
*
* @return the maildir.
*/
const char *mu_store_root_maildir(const MuStore *store);
/**
* Get the time this database was created
*
* @param store the store
*
* @return the maildir.
*/
time_t mu_store_created(const MuStore *store);
/**
* Get the list of personal addresses from the store
*
* @param store the message store
*
* @return the list of personal addresses, or NULL in case of error.
*
* Free with g_strfreev().
*/
char** mu_store_personal_addresses (const MuStore *store);
/**
* Get the a MuContacts* ptr for this store.
*
* @param store a store
*
* @return the contacts ptr
*/
const MuContacts* mu_store_contacts (MuStore *store);
/** /**
* get the numbers of documents in the database * get the numbers of documents in the database
* *
@ -405,174 +348,8 @@ const MuContacts* mu_store_contacts (MuStore *store);
*/ */
unsigned mu_store_count (const MuStore *store, GError **err); unsigned mu_store_count (const MuStore *store, GError **err);
/**
* try to flush/commit all outstanding work to the database and the contacts
* cache.
*
* @param store a valid xapian store
*/
void mu_store_flush (MuStore *store);
#define MU_STORE_INVALID_DOCID 0 #define MU_STORE_INVALID_DOCID 0
/**
* store an email message in the XapianStore
*
* @param store a valid store
* @param msg a valid message
* @param err receives error information, if any, or NULL
*
* @return the docid of the stored message, or 0
* (MU_STORE_INVALID_DOCID) in case of error
*/
unsigned mu_store_add_msg (MuStore *store, MuMsg *msg, GError **err);
/**
* update an email message in the XapianStore
*
* @param store a valid store
* @param the docid for the message
* @param msg a valid message
* @param err receives error information, if any, or NULL
*
* @return the docid of the stored message, or 0
* (MU_STORE_INVALID_DOCID) in case of error
*/
unsigned mu_store_update_msg (MuStore *store, unsigned docid, MuMsg *msg,
GError **err);
/**
* store an email message in the XapianStore; similar to
* mu_store_store, but instead takes a path as parameter instead of a
* MuMsg*
*
* @param store a valid store
* @param path full filesystem path to a valid message
* @param err receives error information, if any, or NULL
*
* @return the docid of the stored message, or 0
* (MU_STORE_INVALID_DOCID) in case of error
*/
unsigned mu_store_add_path (MuStore *store, const char *path, GError **err);
/**
* remove a message from the database based on its path
*
* @param store a valid store
* @param msgpath path of the message (note, this is only used to
* *identify* the message; a common use of this function is to remove
* a message from the database, for which there is no message anymore
* in the filesystem.
*
* @return TRUE if it succeeded, FALSE otherwise
*/
gboolean mu_store_remove_path (MuStore *store, const char* msgpath);
/**
* does a certain message exist in the database already?
*
* @param store a store
* @param path the message path
*
* @return TRUE if the message exists, FALSE otherwise
*/
gboolean mu_store_contains_message (const MuStore *store, const char* path);
/**
* get the docid for message at path
*
* @param store a store
* @param path the message path
* @param err to receive error info or NULL. err->code is MuError value
*
* @return the docid if the message was found, MU_STORE_INVALID_DOCID (0) otherwise
* */
unsigned mu_store_get_docid_for_path (const MuStore *store, const char* path,
GError **err);
/**
* store a timestamp for a directory
*
* @param store a valid store
* @param dirpath path to some directory
* @param stamp a timestamp
* @param err to receive error info or NULL. err->code is MuError value
*
* @return TRUE if setting the timestamp succeeded, FALSE otherwise
*/
gboolean mu_store_set_dirstamp (MuStore *store, const char* dirpath,
time_t stamp, GError **err);
/**
* get the timestamp for a directory
*
* @param store a valid store
* @param msgpath path to some directory
* @param err to receive error info or NULL. err->code is MuError value
*
* @return the timestamp, or 0 in case of error
*/
time_t mu_store_get_dirstamp (const MuStore *store, const char* dirpath,
GError **err);
/**
* check whether this store is read-only
*
* @param store a store
*
* @return TRUE if the store is read-only, FALSE otherwise (and in
* case of error)
*/
gboolean mu_store_is_read_only (const MuStore *store);
/**
* call a function for each document in the database
*
* @param self a valid store
* @param func a callback function to to call for each document
* @param user_data a user pointer passed to the callback function
* @param err to receive error info or NULL. err->code is MuError value
*
* @return MU_OK if all went well, MU_STOP if the foreach was interrupted,
* MU_ERROR in case of error
*/
typedef MuError (*MuStoreForeachFunc) (const char* path, gpointer user_data);
MuError mu_store_foreach (MuStore *self, MuStoreForeachFunc func,
void *user_data, GError **err);
/**
* check if the database is locked for writing
*
* @param xpath path to a xapian database
*
* @return TRUE if it is locked, FALSE otherwise (or in case of error)
*/
gboolean mu_store_database_is_locked (const gchar *xpath);
/**
* get a specific message, based on its Xapian docid
*
* @param self a valid MuQuery instance
* @param docid the Xapian docid for the wanted message
* @param err receives error information, or NULL
*
* @return a MuMsg instance (use mu_msg_unref when done with it), or
* NULL in case of error
*/
MuMsg* mu_store_get_msg (const MuStore *self, unsigned docid, GError **err)
G_GNUC_WARN_UNUSED_RESULT;
/**
* Print some information about the store
*
* @param store a store
* @param nocolor whether to _not_ show color
*/
void mu_store_print_info (const MuStore *store, gboolean nocolor);
G_END_DECLS G_END_DECLS
#endif /* __MU_STORE_HH__ */ #endif /* __MU_STORE_HH__ */

View File

@ -1,206 +0,0 @@
/* -*-mode: c; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*-*/
/*
** Copyright (C) 2008-2013 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#if HAVE_CONFIG_H
#include "config.h"
#endif /*HAVE_CONFIG_H*/
#include <glib.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <locale.h>
#include "test-mu-common.h"
#include "mu-store.hh"
static void
test_mu_store_new_destroy (void)
{
MuStore *store;
gchar* tmpdir;
GError *err;
tmpdir = test_mu_common_get_random_tmpdir();
g_assert (tmpdir);
err = NULL;
store = mu_store_new_create (tmpdir, "/tmp", NULL, &err);
g_assert_no_error (err);
g_assert (store);
g_assert_cmpuint (0,==,mu_store_count (store, NULL));
mu_store_flush (store);
mu_store_unref (store);
g_free (tmpdir);
}
/* A store that was created and then re-opened read-only is empty and
 * reports MU_STORE_SCHEMA_VERSION as its schema version. */
static void
test_mu_store_version (void)
{
	gchar *tmpdir = test_mu_common_get_random_tmpdir();
	g_assert (tmpdir);

	/* create the database, then let go of it again */
	GError *err = NULL;
	MuStore *store = mu_store_new_create (tmpdir, "/tmp", NULL, &err);
	g_assert (store);
	mu_store_unref (store);

	/* re-open the same database for reading */
	store = mu_store_new_readable (tmpdir, &err);
	g_assert (store);
	g_assert (err == NULL);

	g_assert_cmpuint (0,==,mu_store_count (store, NULL));
	g_assert_cmpstr (MU_STORE_SCHEMA_VERSION,==,
			 mu_store_schema_version(store));

	mu_store_unref (store);
	g_free (tmpdir);
}
/* Adding messages raises the message count; adding a message that is
 * already in the store leaves the count unchanged. */
G_GNUC_UNUSED static void
test_mu_store_store_msg_and_count (void)
{
	gchar *tmpdir = test_mu_common_get_random_tmpdir();
	g_assert (tmpdir);

	MuStore *store = mu_store_new_create (tmpdir, MU_TESTMAILDIR, NULL, NULL);
	g_assert (store);
	g_free (tmpdir);

	g_assert_cmpuint (0,==,mu_store_count (store, NULL));

	/* first message */
	/* XXX this passes, but not make-dist; investigate */
	MuMsg *msg = mu_msg_new_from_file (
		MU_TESTMAILDIR "/cur/1283599333.1840_11.cthulhu!2,",
		NULL, NULL);
	g_assert (msg);
	g_assert_cmpuint (mu_store_add_msg (store, msg, NULL),
			  !=, MU_STORE_INVALID_DOCID);
	g_assert_cmpuint (1,==,mu_store_count (store, NULL));
	g_assert_cmpuint (TRUE,==,mu_store_contains_message
			  (store,
			   MU_TESTMAILDIR "/cur/1283599333.1840_11.cthulhu!2,"));
	mu_msg_unref (msg);

	/* second message, from the other test maildir */
	msg = mu_msg_new_from_file (MU_TESTMAILDIR2
				    "/bar/cur/mail3", NULL, NULL);
	g_assert (msg);
	g_assert_cmpuint (mu_store_add_msg (store, msg, NULL),
			  !=, MU_STORE_INVALID_DOCID);
	g_assert_cmpuint (2,==,mu_store_count (store, NULL));
	g_assert_cmpuint (TRUE,==,
			  mu_store_contains_message (store, MU_TESTMAILDIR2
						     "/bar/cur/mail3"));
	mu_msg_unref (msg);

	/* the first message once more; the count must stay at 2 */
	msg = mu_msg_new_from_file
		(MU_TESTMAILDIR "/cur/1283599333.1840_11.cthulhu!2,",
		 NULL, NULL);
	g_assert (msg);
	g_assert_cmpuint (mu_store_add_msg (store, msg, NULL),
			  !=, MU_STORE_INVALID_DOCID);
	g_assert_cmpuint (2,==,mu_store_count (store, NULL));
	mu_msg_unref (msg);

	mu_store_unref (store);
}
/* A message that was added and then removed by path is no longer counted
 * and no longer reported as contained in the store. */
G_GNUC_UNUSED static void
test_mu_store_store_msg_remove_and_count (void)
{
	gchar *tmpdir = test_mu_common_get_random_tmpdir();
	g_assert (tmpdir);

	MuStore *store = mu_store_new_create (tmpdir, MU_TESTMAILDIR, NULL, NULL);
	g_assert (store);

	g_assert_cmpuint (0,==,mu_store_count (store, NULL));

	/* add a message */
	GError *err = NULL;
	MuMsg *msg = mu_msg_new_from_file (
		MU_TESTMAILDIR "/cur/1283599333.1840_11.cthulhu!2,",
		NULL, &err);
	g_assert (msg);
	g_assert_cmpuint (mu_store_add_msg (store, msg, NULL),
			  !=, MU_STORE_INVALID_DOCID);
	g_assert_cmpuint (1,==,mu_store_count (store, NULL));
	mu_msg_unref (msg);

	/* and remove it again */
	mu_store_remove_path (store,
			      MU_TESTMAILDIR "/cur/1283599333.1840_11.cthulhu!2,");
	g_assert_cmpuint (0,==,mu_store_count (store, NULL));
	g_assert_cmpuint (FALSE,==,mu_store_contains_message
			  (store,
			   MU_TESTMAILDIR "/cur/1283599333.1840_11.cthulhu!2,"));

	g_free (tmpdir);
	mu_store_unref (store);
}
/* Test driver: registers the store tests with the GLib test framework and
 * runs them; the return value of g_test_run() is the exit status. */
int
main (int argc, char *argv[])
{
g_test_init (&argc, &argv, NULL);
/* mu_runtime_init/uninit */
g_test_add_func ("/mu-store/mu-store-new-destroy",
test_mu_store_new_destroy);
g_test_add_func ("/mu-store/mu-store-version",
test_mu_store_version);
/* the two add/remove tests below are compiled out */
#if 0
/* XXX this passes, but not make-dist; investigate */
g_test_add_func ("/mu-store/mu-store-store-and-count",
test_mu_store_store_msg_and_count);
g_test_add_func ("/mu-store/mu-store-store-remove-and-count",
test_mu_store_store_msg_remove_and_count);
#endif
/* unless running verbosely, route log messages to black_hole (declared
 * elsewhere; presumably it discards them -- confirm) */
if (!g_test_verbose())
g_log_set_handler (NULL,
G_LOG_LEVEL_MASK | G_LOG_FLAG_FATAL| G_LOG_FLAG_RECURSION,
(GLogFunc)black_hole, NULL);
return g_test_run ();
}

98
lib/test-mu-store.cc Normal file
View File

@ -0,0 +1,98 @@
/*
** Copyright (C) 2008-2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#include "config.h"
#include <glib.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <locale.h>
#include "test-mu-common.h"
#include "mu-store.hh"
/* The two test maildirs, passed through Mu::canonicalize_filename with "/"
 * as the relative_to argument; computed once during static
 * initialization. */
static std::string MuTestMaildir = Mu::canonicalize_filename(MU_TESTMAILDIR, "/");
static std::string MuTestMaildir2 = Mu::canonicalize_filename(MU_TESTMAILDIR2, "/");
/* A newly constructed store is empty and reports the current schema
 * version in its metadata. */
static void
test_store_ctor_dtor ()
{
	char *tmpdir = test_mu_common_get_random_tmpdir();
	g_assert (tmpdir);

	Mu::Store store{tmpdir, "/tmp", {}, {}};
	g_free (tmpdir);

	g_assert_true(store.empty());
	g_assert_cmpuint (0,==,store.size());

	const auto& metadata = store.metadata();
	g_assert_cmpstr (MU_STORE_SCHEMA_VERSION,==,
			 metadata.schema_version.c_str());
}
/* Add two messages, then remove them again (one by id, one by path),
 * checking size and containment after every step. */
static void
test_store_add_count_remove ()
{
	char *tmpdir = test_mu_common_get_random_tmpdir();
	g_assert (tmpdir);

	Mu::Store store{tmpdir, MuTestMaildir, {}, {}};
	g_free (tmpdir);

	const std::string msgpath1 = MuTestMaildir + "/cur/1283599333.1840_11.cthulhu!2,";
	const std::string msgpath2 = MuTestMaildir2 + "/bar/cur/mail3";

	/* add the first message */
	const auto id1 = store.add_message(msgpath1);
	g_assert_cmpuint(id1, !=, Mu::Store::InvalidId);
	g_assert_cmpuint(store.size(), ==, 1);
	g_assert_true(store.contains_message(msgpath1));

	/* add the second message */
	g_assert_cmpuint(store.add_message(msgpath2),
			 !=, Mu::Store::InvalidId);
	g_assert_cmpuint(store.size(), ==, 2);
	g_assert_true(store.contains_message(msgpath2));

	/* remove the first message through its id */
	store.remove_message(id1);
	g_assert_cmpuint(store.size(), ==, 1);
	g_assert_false(store.contains_message(msgpath1));

	/* remove the second message through its path */
	store.remove_message (msgpath2);
	g_assert_true(store.empty());
	g_assert_false(store.contains_message(msgpath2));
}
/* Test driver: registers the Mu::Store tests with the GLib test framework
 * and runs them; the return value of g_test_run() is the exit status. */
int
main (int argc, char *argv[])
{
g_test_init (&argc, &argv, NULL);
/* mu_runtime_init/uninit */
g_test_add_func ("/mu-store/ctor-dtor", test_store_ctor_dtor);
g_test_add_func ("/mu-store/add-count-remove", test_store_add_count_remove);
/* the log handler below is commented out, so no handler is installed
 * here */
// if (!g_test_verbose())
// g_log_set_handler (NULL,
// G_LOG_LEVEL_MASK | G_LOG_FLAG_FATAL| G_LOG_FLAG_RECURSION,
// (GLogFunc)black_hole, NULL);
return g_test_run ();
}

191
lib/utils/mu-async-queue.hh Normal file
View File

@ -0,0 +1,191 @@
/*
** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/
#ifndef __MU_ASYNC_QUEUE_HH__
#define __MU_ASYNC_QUEUE_HH__
#include <deque>
#include <mutex>
#include <chrono>
#include <condition_variable>
namespace Mu {

/** MaxSize value that denotes a queue without an upper bound on its size. */
constexpr std::size_t UnlimitedAsyncQueueSize{0};

/**
 * A FIFO queue in which every access to the underlying container is made
 * while holding a mutex, so items can be pushed and popped from different
 * threads. Optionally the queue has a maximum size (MaxSize); pushing to a
 * full queue and popping from an empty one can wait, for a limited time,
 * for the situation to change.
 */
template <typename ItemType, /**< the type of item to queue */
	  std::size_t MaxSize = UnlimitedAsyncQueueSize, /**< maximum size for the queue */
	  typename Allocator = std::allocator<ItemType>> /**< allocator for the items */
class AsyncQueue {
public:
	using value_type      = ItemType;
	using allocator_type  = Allocator;
	using size_type       = std::size_t;
	using reference       = value_type&;
	using const_reference = const value_type&;
	using pointer         = typename std::allocator_traits<allocator_type>::pointer;
	using const_pointer   = typename std::allocator_traits<allocator_type>::const_pointer;

	using Timeout = std::chrono::steady_clock::duration;

	/**
	 * Push a copy of an item to the end of the queue
	 *
	 * @param item the item to copy to the end of the queue
	 * @param timeout optional time to wait for room to become available
	 * when the queue is full
	 *
	 * @return true if the item was pushed; false otherwise.
	 */
	bool push (const value_type& item, Timeout timeout = {}) {
		/* the copy is a temporary and so selects the moving overload;
		 * the timeout must be passed along, or a push to a full queue
		 * would never wait */
		return push(value_type(item), timeout);
	}

	/**
	 * Push an item to the end of the queue by moving it
	 *
	 * @param item the item to move to the end of the queue
	 * @param timeout optional time to wait for room to become available
	 * when the queue is full
	 *
	 * @return true if the item was pushed; false otherwise.
	 */
	bool push (value_type&& item, Timeout timeout = {}) {

		std::unique_lock<std::mutex> lock{m_};

		if (!unlimited()) {
			/* wait_for yields the final value of the predicate,
			 * i.e., whether there is room now */
			const bool has_room = cv_full_.wait_for(
				lock, timeout,
				[this]{ return !full_unlocked(); });
			if (!has_room)
				return false;
		}

		q_.emplace_back(std::move(item));

		/* unlock before notifying, so the woken thread does not
		 * immediately block on the mutex */
		lock.unlock();
		cv_empty_.notify_one();

		return true;
	}

	/**
	 * Pop an item from the queue
	 *
	 * @param val receives the value if the function returns true
	 * @param timeout optional time to wait for an item to become available
	 *
	 * @return true if an item was popped (into val), false otherwise.
	 */
	bool pop (value_type& val, Timeout timeout = {}) {

		std::unique_lock<std::mutex> lock{m_};

		if (timeout != Timeout{}) {
			const bool has_item = cv_empty_.wait_for(
				lock, timeout,
				[this]{ return !q_.empty(); });
			if (!has_item)
				return false;
		} else if (q_.empty())
			return false; /* no timeout: do not wait at all */

		val = std::move(q_.front());
		q_.pop_front();

		lock.unlock();
		cv_full_.notify_one();

		return true;
	}

	/**
	 * Clear the queue
	 *
	 */
	void clear() {
		std::unique_lock<std::mutex> lock{m_};
		q_.clear();
		lock.unlock();
		/* the whole queue is free now, so every thread waiting for
		 * room may be able to continue, not just one of them */
		cv_full_.notify_all();
	}

	/**
	 * Size of the queue
	 *
	 * @return the size
	 */
	size_type size() const {
		std::unique_lock<std::mutex> lock{m_};
		return q_.size();
	}

	/**
	 * Maximum size of the queue if specified through the template
	 * parameter; otherwise the (theoretical) max_size of the inner
	 * container.
	 *
	 * @return the maximum size
	 */
	size_type max_size() const {
		if (unlimited())
			return q_.max_size();
		else
			return MaxSize;
	}

	/**
	 * Is the queue empty?
	 *
	 * @return true or false
	 */
	bool empty() const {
		std::unique_lock<std::mutex> lock{m_};
		return q_.empty();
	}

	/**
	 * Is the queue full? Returns false unless a maximum size was specified
	 * (as a template argument)
	 *
	 * @return true or false.
	 */
	bool full() const {
		if (unlimited())
			return false;

		std::unique_lock<std::mutex> lock{m_};
		return full_unlocked();
	}

	/**
	 * Is this queue (theoretically) unlimited in size?
	 *
	 * @return true or false
	 */
	constexpr static bool unlimited() {
		return MaxSize == UnlimitedAsyncQueueSize;
	}

private:
	/* as full(), but the caller must already hold the mutex */
	bool full_unlocked() const {
		return q_.size() >= max_size();
	}

	std::deque<ItemType, Allocator> q_;
	mutable std::mutex m_; /* mutable: const methods lock it, too */
	std::condition_variable cv_full_, cv_empty_;
};

} // namespace mu
#endif /* __MU_ASYNC_QUEUE_HH__ */

View File

@ -33,7 +33,7 @@
#include <glib/gprintf.h> #include <glib/gprintf.h>
#include "mu-utils.hh" #include "mu-utils.hh"
#include "mu-util.h"
using namespace Mu; using namespace Mu;
@ -447,6 +447,21 @@ Mu::size_to_string (const std::string& val, bool is_first)
return str; return str;
} }
/*
 * Get the canonical form of @path; a thin std::string wrapper around
 * mu_canonicalize_filename.
 *
 * @param path the path to canonicalize
 * @param relative_to base directory for relative paths; when empty, NULL is
 * passed to the underlying function.
 *
 * @return the canonicalized path, or an empty string if the underlying
 * function did not yield a result.
 */
std::string
Mu::canonicalize_filename(const std::string& path, const std::string& relative_to)
{
        char *fname = mu_canonicalize_filename (
                path.c_str(),
                relative_to.empty() ? NULL : relative_to.c_str());
        /* NOTE(review): g_canonicalize_filename returns NULL when its
         * precondition check fails (e.g. a non-absolute relative_to);
         * presumably mu_canonicalize_filename does the same -- confirm.
         * Constructing a std::string from NULL is undefined behavior, so
         * guard against it. */
        if (!fname)
                return {};

        std::string rv{fname};
        g_free (fname);

        return rv;
}
void void
Mu::assert_equal(const std::string& s1, const std::string& s2) Mu::assert_equal(const std::string& s1, const std::string& s2)
{ {

View File

@ -1,5 +1,5 @@
/* /*
** Copyright (C) 2017 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
** **
** This library is free software; you can redistribute it and/or ** This library is free software; you can redistribute it and/or
** modify it under the terms of the GNU Lesser General Public License ** modify it under the terms of the GNU Lesser General Public License
@ -119,15 +119,28 @@ std::string date_to_time_t_string (const std::string& date, bool first);
*/ */
std::string date_to_time_t_string (int64_t t); std::string date_to_time_t_string (int64_t t);
using SteadyClock = std::chrono::steady_clock; using Clock = std::chrono::steady_clock;
using Duration = Clock::duration;
static inline int64_t to_ms (SteadyClock::duration dur) { template <typename Unit> constexpr int64_t to_unit (Duration d) {
return std::chrono::duration_cast<std::chrono::milliseconds>(dur).count(); using namespace std::chrono;
} return duration_cast<Unit>(d).count();
static inline int64_t to_us (SteadyClock::duration dur) {
return std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
} }
constexpr int64_t to_s (Duration d) { return to_unit<std::chrono::seconds>(d); }
constexpr int64_t to_ms (Duration d) { return to_unit<std::chrono::milliseconds>(d); }
constexpr int64_t to_us (Duration d) { return to_unit<std::chrono::microseconds>(d); }
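/**
* Get the canonical form of a path; see g_canonicalize_filename
*
* @param path the path to canonicalize
* @param relative_to base directory for relative paths; when empty, NULL is
* passed on (see g_canonicalize_filename)
*
* @return the canonicalized path
*/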
std::string canonicalize_filename(const std::string& path, const std::string& relative_to);
/** /**
* Convert a size string to a size in bytes * Convert a size string to a size in bytes
* *
@ -202,8 +215,6 @@ private:
const bool color_; const bool color_;
}; };
/** /**
* *
* don't repeat these catch blocks everywhere... * don't repeat these catch blocks everywhere...

View File

@ -28,7 +28,6 @@
#include "mu-msg.h" #include "mu-msg.h"
#include "mu-maildir.h" #include "mu-maildir.h"
#include "mu-index.h"
#include "mu-query.h" #include "mu-query.h"
#include "mu-msg-iter.h" #include "mu-msg-iter.h"
#include "mu-bookmarks.h" #include "mu-bookmarks.h"

View File

@ -20,6 +20,10 @@
#include "config.h" #include "config.h"
#include "mu-cmd.hh" #include "mu-cmd.hh"
#include <chrono>
#include <thread>
#include <atomic>
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
@ -27,23 +31,15 @@
#include <unistd.h> #include <unistd.h>
#include "mu-msg.h" #include "mu-msg.h"
#include "mu-index.h" #include "index/mu-indexer.hh"
#include "mu-store.hh" #include "mu-store.hh"
#include "mu-runtime.h" #include "mu-runtime.h"
#include "utils/mu-util.h" #include "utils/mu-util.h"
static gboolean MU_CAUGHT_SIGNAL; using namespace Mu;
static void static std::atomic<bool> CaughtSignal{};
sig_handler (int sig)
{
if (!MU_CAUGHT_SIGNAL && sig == SIGINT) /* Ctrl-C */
g_print ("\nshutting down gracefully, "
"press again to kill immediately");
MU_CAUGHT_SIGNAL = TRUE;
}
static void static void
install_sig_handler (void) install_sig_handler (void)
@ -51,11 +47,14 @@ install_sig_handler (void)
struct sigaction action; struct sigaction action;
int i, sigs[] = { SIGINT, SIGHUP, SIGTERM }; int i, sigs[] = { SIGINT, SIGHUP, SIGTERM };
MU_CAUGHT_SIGNAL = FALSE;
action.sa_handler = sig_handler;
sigemptyset(&action.sa_mask); sigemptyset(&action.sa_mask);
action.sa_flags = SA_RESETHAND; action.sa_flags = SA_RESETHAND;
action.sa_handler = [](int sig) {
if (!CaughtSignal && sig == SIGINT) /* Ctrl-C */
g_print ("\nshutting down gracefully, "
"press again to kill immediately");
CaughtSignal = true;
};
for (i = 0; i != G_N_ELEMENTS(sigs); ++i) for (i = 0; i != G_N_ELEMENTS(sigs); ++i)
if (sigaction (sigs[i], &action, NULL) != 0) if (sigaction (sigs[i], &action, NULL) != 0)
@ -64,235 +63,82 @@ install_sig_handler (void)
} }
static gboolean
check_params (const MuConfig *opts, GError **err)
{
/* param[0] == 'index' there should be no param[1] */
if (opts->params[1]) {
mu_util_g_set_error (err, MU_ERROR_IN_PARAMETERS,
"unexpected parameter");
return FALSE;
}
if (opts->max_msg_size < 0) {
mu_util_g_set_error (err, MU_ERROR_IN_PARAMETERS,
"the maximum message size must >= 0");
return FALSE;
}
return TRUE;
}
static MuError
index_msg_silent_cb (MuIndexStats* stats, void *user_data)
{
return MU_CAUGHT_SIGNAL ? MU_STOP: MU_OK;
}
static void static void
print_stats (MuIndexStats* stats, gboolean clear, gboolean color) print_stats (const Indexer::Progress& stats, bool color)
{ {
const char *kars="-\\|/"; const char *kars = "-\\|/";
char output[120]; static auto i = 0U;
static unsigned i = 0; MaybeAnsi col{color};
using Color = MaybeAnsi::Color;
if (clear) std::cout << col.fg(Color::Yellow) << kars[++i % 4] << col.reset()
fputs ("\r", stdout); << " indexing messages; "
<< "processed: " << col.fg(Color::Green) << stats.processed << col.reset()
if (color) << "; updated/new: " << col.fg(Color::Green) << stats.updated << col.reset()
g_snprintf << "; cleaned-up: " << col.fg(Color::Green) << stats.removed << col.reset();
(output, sizeof(output),
MU_COLOR_YELLOW "%c " MU_COLOR_DEFAULT
"processing mail; "
"processed: " MU_COLOR_GREEN "%u; " MU_COLOR_DEFAULT
"updated/new: " MU_COLOR_GREEN "%u" MU_COLOR_DEFAULT
", cleaned-up: " MU_COLOR_GREEN "%u" MU_COLOR_DEFAULT,
(unsigned)kars[++i % 4],
(unsigned)stats->_processed,
(unsigned)stats->_updated,
(unsigned)stats->_cleaned_up);
else
g_snprintf
(output, sizeof(output),
"%c processing mail; processed: %u; "
"updated/new: %u, cleaned-up: %u",
(unsigned)kars[++i % 4],
(unsigned)stats->_processed,
(unsigned)stats->_updated,
(unsigned)stats->_cleaned_up);
fputs (output, stdout);
fflush (stdout);
} }
struct _IndexData {
gboolean color;
};
typedef struct _IndexData IndexData;
static MuError
index_msg_cb (MuIndexStats* stats, IndexData *idata)
{
if (stats->_processed % 75)
return MU_OK;
print_stats (stats, TRUE, idata->color);
return MU_CAUGHT_SIGNAL ? MU_STOP: MU_OK;
}
static void
show_time (unsigned t, unsigned processed, gboolean color)
{
if (color) {
if (t)
g_print ("elapsed: "
MU_COLOR_GREEN "%u" MU_COLOR_DEFAULT
" second(s), ~ "
MU_COLOR_GREEN "%u" MU_COLOR_DEFAULT
" msg/s",
t, processed/t);
else
g_print ("elapsed: "
MU_COLOR_GREEN "%u" MU_COLOR_DEFAULT
" second(s)", t);
} else {
if (t)
g_print ("elapsed: %u second(s), ~ %u msg/s",
t, processed/t);
else
g_print ("elapsed: %u second(s)", t);
}
g_print ("\n");
}
static MuError
cleanup_missing (MuIndex *midx, const MuConfig *opts, MuIndexStats *stats,
GError **err)
{
MuError rv;
time_t t;
IndexData idata;
gboolean show_progress;
if (!opts->quiet)
g_print ("cleaning up messages [%s]\n",
mu_runtime_path (MU_RUNTIME_PATH_XAPIANDB));
show_progress = !opts->quiet && isatty(fileno(stdout));
mu_index_stats_clear (stats);
t = time (NULL);
idata.color = !opts->nocolor;
rv = mu_index_cleanup
(midx, stats,
show_progress ?
(MuIndexCleanupDeleteCallback)index_msg_cb :
(MuIndexCleanupDeleteCallback)index_msg_silent_cb,
&idata, err);
if (!opts->quiet) {
print_stats (stats, TRUE, !opts->nocolor);
g_print ("\n");
show_time ((unsigned)(time(NULL)-t),stats->_processed,
!opts->nocolor);
}
return (rv == MU_OK || rv == MU_STOP) ? MU_OK: MU_G_ERROR_CODE(err);
}
static MuError
cmd_index (MuIndex *midx, const MuConfig *opts, MuIndexStats *stats, GError **err)
{
IndexData idata;
MuError rv;
gboolean show_progress;
show_progress = !opts->quiet && isatty(fileno(stdout));
idata.color = !opts->nocolor;
rv = mu_index_run (midx,
opts->rebuild,
opts->lazycheck, stats,
show_progress ?
(MuIndexMsgCallback)index_msg_cb :
(MuIndexMsgCallback)index_msg_silent_cb,
NULL, &idata);
if (rv == MU_OK || rv == MU_STOP) {
g_message ("index: processed: %u; updated/new: %u",
stats->_processed, stats->_updated);
} else
mu_util_g_set_error (err, rv, "error while indexing");
return rv;
}
static MuIndex*
init_mu_index (MuStore *store, const MuConfig *opts, GError **err)
{
MuIndex *midx;
if (!check_params (opts, err))
return NULL;
midx = mu_index_new (store, err);
if (!midx)
return NULL;
mu_index_set_max_msg_size (midx, opts->max_msg_size);
return midx;
}
MuError MuError
mu_cmd_index (Mu::Store& store, const MuConfig *opts, GError **err) mu_cmd_index (Mu::Store& store, const MuConfig *opts, GError **err)
{ {
MuIndex *midx;
MuIndexStats stats;
gboolean rv;
time_t t;
g_return_val_if_fail (opts, MU_ERROR); g_return_val_if_fail (opts, MU_ERROR);
g_return_val_if_fail (opts->cmd == MU_CONFIG_CMD_INDEX, MU_ERROR); g_return_val_if_fail (opts->cmd == MU_CONFIG_CMD_INDEX, MU_ERROR);
/* create, and do error handling if needed */ /* param[0] == 'index' there should be no param[1] */
midx = init_mu_index (reinterpret_cast<MuStore*>(&store), // ugh. if (opts->params[1]) {
opts, err); mu_util_g_set_error (err, MU_ERROR_IN_PARAMETERS,
if (!midx) "unexpected parameter");
throw Mu::Error(Mu::Error::Code::Internal, err/*consumes*/, return MU_ERROR;
"error in index"); }
if (opts->max_msg_size < 0) {
mu_util_g_set_error (err, MU_ERROR_IN_PARAMETERS,
"the maximum message size must be >= 0");
return MU_ERROR;
}
MaybeAnsi col{!opts->nocolor};
using Color = MaybeAnsi::Color;
if (!opts->quiet) {
if (opts->lazycheck)
std::cout << "lazily ";
std::cout << "indexing maildir "
<< col.fg(Color::Green) << store.metadata().root_maildir
<< col.reset()
<< " -> store "
<< col.fg(Color::Green) << store.metadata().database_path
<< col.reset()
<< std::endl;
}
Mu::Indexer::Config conf{};
conf.cleanup = !opts->nocleanup;
conf.lazy_check = opts->lazycheck;
mu_index_stats_clear (&stats);
install_sig_handler (); install_sig_handler ();
t = time (NULL); store.indexer().start(conf);
rv = cmd_index (midx, opts, &stats, err); while (!CaughtSignal && store.indexer().is_running()) {
if (rv == MU_OK && !opts->nocleanup) {
if (!opts->quiet) if (!opts->quiet)
g_print ("\n"); print_stats (store.indexer().progress(), !opts->nocolor);
rv = cleanup_missing (midx, opts, &stats, err);
} std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (!opts->quiet) { if (!opts->quiet) {
print_stats (&stats, TRUE, !opts->nocolor); std::cout << "\r";
g_print ("\n"); std::cout.flush();
show_time ((unsigned)(time(NULL)-t), }
stats._processed, !opts->nocolor);
} }
mu_index_destroy (midx); store.indexer().stop();
if (rv != MU_OK) if (!opts->quiet) {
throw Mu::Error(Mu::Error::Code::Internal, err/*consumes*/, print_stats (store.indexer().progress(), !opts->nocolor);
"error in index"); std::cout << std::endl;
}
return rv ? MU_OK : MU_ERROR; return MU_OK;
} }

View File

@ -24,6 +24,9 @@
#include <string> #include <string>
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <thread>
#include <mutex>
#include <cstring> #include <cstring>
#include <glib.h> #include <glib.h>
#include <glib/gprintf.h> #include <glib/gprintf.h>
@ -33,7 +36,7 @@
#include "mu-cmd.hh" #include "mu-cmd.hh"
#include "mu-maildir.h" #include "mu-maildir.h"
#include "mu-query.h" #include "mu-query.h"
#include "mu-index.h" #include "index/mu-indexer.hh"
#include "mu-store.hh" #include "mu-store.hh"
#include "mu-msg-part.h" #include "mu-msg-part.h"
#include "mu-contacts.hh" #include "mu-contacts.hh"
@ -49,14 +52,9 @@ using namespace Sexp;
using DocId = unsigned; using DocId = unsigned;
static std::mutex OutputLock;
static std::atomic<bool> MuTerminate{false}; static std::atomic<bool> MuTerminate{false};
static void
sig_handler (int sig)
{
MuTerminate = true;
}
static void static void
install_sig_handler (void) install_sig_handler (void)
{ {
@ -65,7 +63,7 @@ install_sig_handler (void)
MuTerminate = false; MuTerminate = false;
action.sa_handler = sig_handler; action.sa_handler = [](int sig){ MuTerminate = true; };
sigemptyset(&action.sa_mask); sigemptyset(&action.sa_mask);
action.sa_flags = SA_RESETHAND; action.sa_flags = SA_RESETHAND;
@ -86,6 +84,8 @@ install_sig_handler (void)
static void G_GNUC_PRINTF(1, 2) static void G_GNUC_PRINTF(1, 2)
print_expr (const char* frm, ...) print_expr (const char* frm, ...)
{ {
std::lock_guard<std::mutex> l {OutputLock};
char *expr, *expr_orig; char *expr, *expr_orig;
va_list ap; va_list ap;
ssize_t rv; ssize_t rv;
@ -208,6 +208,7 @@ print_sexps (MuMsgIter *iter, unsigned maxnum)
} }
/// @brief object to manage the server-context for all commands.
struct Context { struct Context {
Context(){} Context(){}
Context (const MuConfig *opts): Context (const MuConfig *opts):
@ -219,8 +220,8 @@ struct Context {
throw Error(Error::Code::Store, &gerr/*consumes*/, "failed to create query"); throw Error(Error::Code::Store, &gerr/*consumes*/, "failed to create query");
g_message ("opened store @ %s; maildir @ %s; debug-mode %s", g_message ("opened store @ %s; maildir @ %s; debug-mode %s",
store_->database_path().c_str(), store_->metadata().database_path.c_str(),
store_->root_maildir().c_str(), store_->metadata().root_maildir.c_str(),
opts->debug ? "yes" : "no"); opts->debug ? "yes" : "no");
} }
@ -236,9 +237,10 @@ struct Context {
throw Mu::Error (Error::Code::Internal, "no store"); throw Mu::Error (Error::Code::Internal, "no store");
return *store_.get(); return *store_.get();
} }
Indexer& indexer() { return store().indexer(); }
std::unique_ptr<Mu::Store> store_; std::unique_ptr<Mu::Store> store_;
MuQuery *query{}; MuQuery *query{};
bool do_quit{}; bool do_quit{};
@ -343,7 +345,6 @@ compose_handler (Context& context, const Parameters& params)
Node::Seq compose_seq; Node::Seq compose_seq;
compose_seq.add_prop(":compose", Node::make_symbol(std::string(ctype))); compose_seq.add_prop(":compose", Node::make_symbol(std::string(ctype)));
// message optioss below checks extract-images / extract-encrypted
if (ctype == "reply" || ctype == "forward" || ctype == "edit" || ctype == "resend") { if (ctype == "reply" || ctype == "forward" || ctype == "edit" || ctype == "resend") {
GError *gerr{}; GError *gerr{};
@ -410,7 +411,6 @@ contacts_handler (Context& context, const Parameters& params)
seq.add_prop(":contacts", std::move(contacts)); seq.add_prop(":contacts", std::move(contacts));
seq.add_prop(":tstamp", Node::make_string(format("%" G_GINT64_FORMAT, seq.add_prop(":tstamp", Node::make_string(format("%" G_GINT64_FORMAT,
g_get_monotonic_time()))); g_get_monotonic_time())));
/* dump the contacts cache as a giant sexp */ /* dump the contacts cache as a giant sexp */
print_expr(std::move(seq)); print_expr(std::move(seq));
} }
@ -708,79 +708,35 @@ help_handler (Context& context, const Parameters& params)
} }
} }
static MuError static void
index_msg_cb (MuIndexStats *stats, void *user_data) print_stats (const Indexer::Progress& stats, const std::string& state)
{ {
if (MuTerminate)
return MU_STOP;
if (stats->_processed % 1000)
return MU_OK;
Node::Seq seq; Node::Seq seq;
seq.add_prop(":info", Node::make_symbol("index")); seq.add_prop(":info", Node::make_symbol("index"));
seq.add_prop(":status", Node::make_symbol("running")); seq.add_prop(":status", Node::make_symbol(std::string{state}));
seq.add_prop(":processed", stats->_processed); seq.add_prop(":processed", stats.processed);
seq.add_prop(":updated", stats->_updated); seq.add_prop(":updated", stats.updated);
seq.add_prop(":cleaned-up", stats.removed);
print_expr(std::move(seq)); print_expr(std::move(seq));
return MU_OK;
} }
static MuError
index_and_maybe_cleanup (MuIndex *index, bool cleanup, bool lazy_check)
{
MuIndexStats stats{}, stats2{};
mu_index_stats_clear (&stats);
auto rv = mu_index_run (index, FALSE, lazy_check, &stats,
index_msg_cb, NULL, NULL);
if (rv != MU_OK && rv != MU_STOP)
throw Error{Error::Code::Store, "indexing failed"};
mu_index_stats_clear (&stats2);
if (cleanup) {
GError *gerr{};
rv = mu_index_cleanup (index, &stats2, NULL, NULL, &gerr);
if (rv != MU_OK && rv != MU_STOP)
throw Error{Error::Code::Store, &gerr, "cleanup failed"};
}
Node::Seq seq;
seq.add_prop(":info", Node::make_symbol("index"));
seq.add_prop(":status", Node::make_symbol("complete"));
seq.add_prop(":processed", stats._processed);
seq.add_prop(":updated", stats._updated);
seq.add_prop(":cleaned-up", stats2._cleaned_up);
print_expr(std::move(seq));
return MU_OK;
}
static void static void
index_handler (Context& context, const Parameters& params) index_handler (Context& context, const Parameters& params)
{ {
GError *gerr{}; Mu::Indexer::Config conf{};
const auto cleanup{get_bool_or(params, ":cleanup")}; conf.cleanup = get_bool_or(params, ":cleanup");
const auto lazy_check{get_bool_or(params, ":lazy-check")}; conf.lazy_check = get_bool_or(params, ":lazy-check");
auto store_ptr = reinterpret_cast<MuStore*>(&context.store()); context.indexer().stop();
auto index{mu_index_new (store_ptr, &gerr)}; context.indexer().start(conf);
if (!index) while (context.indexer().is_running()) {
throw Error(Error::Code::Index, &gerr, "failed to create index object"); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
print_stats (context.indexer().progress(), "running");
try {
index_and_maybe_cleanup (index, cleanup, lazy_check);
} catch (...) {
mu_index_destroy(index);
throw;
} }
mu_index_destroy(index); print_stats (context.indexer().progress(), "complete");
mu_store_flush(store_ptr);
} }
static void static void
@ -953,6 +909,7 @@ ping_handler (Context& context, const Parameters& params)
const auto queries = get_string_vec (params, ":queries"); const auto queries = get_string_vec (params, ":queries");
Node::Seq qresults; Node::Seq qresults;
for (auto&& q: queries) { for (auto&& q: queries) {
const auto count{mu_query_count_run (context.query, q.c_str())}; const auto count{mu_query_count_run (context.query, q.c_str())};
const auto unreadq{format("flag:unread AND (%s)", q.c_str())}; const auto unreadq{format("flag:unread AND (%s)", q.c_str())};
const auto unread{mu_query_count_run (context.query, unreadq.c_str())}; const auto unread{mu_query_count_run (context.query, unreadq.c_str())};
@ -966,7 +923,7 @@ ping_handler (Context& context, const Parameters& params)
} }
Node::Seq addrs; Node::Seq addrs;
for (auto&& addr: context.store().personal_addresses()) for (auto&& addr: context.store().metadata().personal_addresses)
addrs.add(std::string(addr)); addrs.add(std::string(addr));
Node::Seq seq; Node::Seq seq;
@ -975,8 +932,8 @@ ping_handler (Context& context, const Parameters& params)
Node::Seq propseq; Node::Seq propseq;
propseq.add_prop(":version", VERSION); propseq.add_prop(":version", VERSION);
propseq.add_prop(":personal-addresses", std::move(addrs)); propseq.add_prop(":personal-addresses", std::move(addrs));
propseq.add_prop(":database-path", context.store().database_path()); propseq.add_prop(":database-path", context.store().metadata().database_path);
propseq.add_prop(":root-maildir", context.store().root_maildir()); propseq.add_prop(":root-maildir", context.store().metadata().root_maildir);
propseq.add_prop(":doccount", storecount); propseq.add_prop(":doccount", storecount);
propseq.add_prop(":queries", std::move(qresults)); propseq.add_prop(":queries", std::move(qresults));
seq.add_prop(":props", std::move(propseq)); seq.add_prop(":props", std::move(propseq));

View File

@ -20,6 +20,7 @@
#include "config.h" #include "config.h"
#include <iostream> #include <iostream>
#include <iomanip>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
@ -27,6 +28,7 @@
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include "mu-msg.h" #include "mu-msg.h"
#include "mu-msg-part.h" #include "mu-msg-part.h"
#include "mu-cmd.hh" #include "mu-cmd.hh"
@ -547,20 +549,33 @@ cmd_verify (const MuConfig *opts, GError **err)
MU_OK : MU_ERROR; MU_OK : MU_ERROR;
} }
/*
 * Print a single "key : value" line to stdout, with the key left-aligned
 * and padded to 18 columns; colors are applied through @col.
 */
template <typename T>
static void key_val(const Mu::MaybeAnsi& col, const std::string& key, T val)
{
        using Color = Mu::MaybeAnsi::Color;

        /* the key column */
        std::cout << col.fg(Color::BrightBlue);
        std::cout << std::left << std::setw(18) << key;
        std::cout << col.reset() << ": ";

        /* the value */
        std::cout << col.fg(Color::Green) << val;
        std::cout << col.reset() << "\n";
}
static MuError static MuError
cmd_info (const Mu::Store& store, const MuConfig *opts, GError **err) cmd_info (const Mu::Store& store, const MuConfig *opts, GError **err)
{ {
const auto green{opts->nocolor ? "" : MU_COLOR_GREEN}; Mu::MaybeAnsi col{!opts->nocolor};
const auto def{opts->nocolor ? "" : MU_COLOR_DEFAULT};
std::cout << "database-path : " key_val(col, "maildir", store.metadata().root_maildir);
<< green << store.database_path() << def << "\n" key_val(col, "database-path", store.metadata().database_path);
<< "messages in store : " key_val(col, "schema-version", store.metadata().schema_version);
<< green << store.size() << def << "\n" key_val(col, "max-message-size", store.metadata().max_message_size);
<< "schema-version : " key_val(col, "batch-size", store.metadata().batch_size);
<< green << store.schema_version() << def << "\n"; key_val(col, "messages in store", store.size());
const auto created{store.created()}; const auto created{store.metadata().created};
const auto tstamp{::localtime (&created)}; const auto tstamp{::localtime (&created)};
#pragma GCC diagnostic push #pragma GCC diagnostic push
@ -569,23 +584,14 @@ cmd_info (const Mu::Store& store, const MuConfig *opts, GError **err)
strftime (tbuf, sizeof(tbuf), "%c", tstamp); strftime (tbuf, sizeof(tbuf), "%c", tstamp);
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
std::cout << "created : " << green << tbuf << def << "\n" key_val(col, "created", tbuf);
<< "maildir : "
<< green << store.root_maildir() << def << "\n";
std::cout << ("personal-addresses : "); const auto addrs{store.metadata().personal_addresses};
const auto addrs{store.personal_addresses()};
if (addrs.empty()) if (addrs.empty())
std::cout << green << "<none>" << def << "\n"; key_val(col, "personal-address", "<none>");
else { else
bool first{true}; for (auto&& c: addrs)
for (auto&& c: addrs) { key_val(col, "personal-address", c);
std::cout << (!first ? " " : "")
<< green << c << def << "\n";
first = false;
}
}
return MU_OK; return MU_OK;
} }
@ -601,6 +607,20 @@ cmd_init (const MuConfig *opts, GError **err)
return MU_ERROR_IN_PARAMETERS; return MU_ERROR_IN_PARAMETERS;
} }
if (opts->max_msg_size < 0) {
mu_util_g_set_error (err, MU_ERROR_IN_PARAMETERS,
"invalid value for max-message-size");
return MU_ERROR_IN_PARAMETERS;
} else if (opts->batch_size < 0) {
mu_util_g_set_error (err, MU_ERROR_IN_PARAMETERS,
"invalid value for batch-size");
return MU_ERROR_IN_PARAMETERS;
}
Mu::Store::Config conf{};
conf.max_message_size = opts->max_msg_size;
conf.batch_size = opts->batch_size;
Mu::StringVec my_addrs; Mu::StringVec my_addrs;
auto addrs = opts->my_addresses; auto addrs = opts->my_addresses;
while (addrs && *addrs) { while (addrs && *addrs) {
@ -608,29 +628,16 @@ cmd_init (const MuConfig *opts, GError **err)
++addrs; ++addrs;
} }
Mu::Store store(mu_runtime_path(MU_RUNTIME_PATH_XAPIANDB), opts->maildir, my_addrs); Mu::Store store(mu_runtime_path(MU_RUNTIME_PATH_XAPIANDB),
opts->maildir, my_addrs, conf);
if (!opts->quiet) { if (!opts->quiet) {
cmd_info (store, opts, NULL); cmd_info (store, opts, NULL);
g_print ("\nstore created.\n" std::cout << "\nstore created; use the 'index' command to fill/update it.\n";
"use 'mu index' to fill the database "
"with your messages.\n"
"see mu-index(1) for details\n");
} }
return MU_OK; return MU_OK;
} }
static MuError
cmd_index (Mu::Store& store, const MuConfig *opts, GError **err)
{
const auto res = mu_cmd_index(store, opts, err);
if (res == MU_OK && !opts->quiet)
cmd_info(store, opts, err);
return res;
}
static MuError static MuError
cmd_find (const MuConfig *opts, GError **err) cmd_find (const MuConfig *opts, GError **err)
{ {
@ -722,7 +729,7 @@ mu_cmd_execute (const MuConfig *opts, GError **err) try
case MU_CONFIG_CMD_TICKLE: case MU_CONFIG_CMD_TICKLE:
merr = with_writable_store (cmd_tickle, opts, err); break; merr = with_writable_store (cmd_tickle, opts, err); break;
case MU_CONFIG_CMD_INDEX: case MU_CONFIG_CMD_INDEX:
merr = with_writable_store (cmd_index, opts, err); break; merr = with_writable_store (mu_cmd_index, opts, err); break;
/* commands instantiate store themselves */ /* commands instantiate store themselves */
case MU_CONFIG_CMD_INIT: case MU_CONFIG_CMD_INIT:

View File

@ -144,10 +144,17 @@ config_options_group_init (void)
{"my-address", 0, 0, G_OPTION_ARG_STRING_ARRAY, {"my-address", 0, 0, G_OPTION_ARG_STRING_ARRAY,
&MU_CONFIG.my_addresses, "my e-mail address; can be used multiple times", &MU_CONFIG.my_addresses, "my e-mail address; can be used multiple times",
"<address>"}, "<address>"},
{"max-message-size", 0, 0, G_OPTION_ARG_INT,
&MU_CONFIG.max_msg_size, "Maximum allowed size for messages",
"<size-in-bytes>"},
/* --batch-size must be stored in its own field; pointing it at
 * max_msg_size would make it overwrite --max-message-size and leave
 * batch_size unset. */
{"batch-size", 0, 0, G_OPTION_ARG_INT,
&MU_CONFIG.batch_size,
"Number of changes in a database transaction batch",
"<number>"},
{NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL} {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}
}; };
og = g_option_group_new("init", "Options for the 'index' command", og = g_option_group_new("init", "Options for the 'init' command",
"", NULL, NULL); "", NULL, NULL);
g_option_group_add_entries(og, entries); g_option_group_add_entries(og, entries);

View File

@ -109,16 +109,18 @@ struct _MuConfig {
/* options for init */ /* options for init */
gchar *maildir; /* where the mails are */ gchar *maildir; /* where the mails are */
char** my_addresses; /* 'my e-mail address', for mu char** my_addresses; /* 'my e-mail address', for mu cfind;
* cfind; can be use multiple * can be use multiple times */
* times */ int max_msg_size; /* maximum size for message files */
int batch_size; /* database transaction batch size */
/* options for indexing */ /* options for indexing */
gboolean nocleanup; /* don't cleanup del'd mails from db */ gboolean nocleanup; /* don't cleanup del'd mails from db */
gboolean rebuild; /* empty the database before indexing */ gboolean rebuild; /* empty the database before indexing */
gboolean lazycheck; /* don't check dirs with up-to-date gboolean lazycheck; /* don't check dirs with up-to-date
* timestamps */ * timestamps */
int max_msg_size; /* maximum size for message files */
/* options for querying 'find' (and view-> 'summary') */ /* options for querying 'find' (and view-> 'summary') */
gchar *fields; /* fields to show in output */ gchar *fields; /* fields to show in output */

View File

@ -26,7 +26,9 @@
#include "mu-config.hh" #include "mu-config.hh"
#include "mu-cmd.hh" #include "mu-cmd.hh"
#include "mu-runtime.h" #include "mu-runtime.h"
#include "utils/mu-utils.hh"
using namespace Mu;
static void static void
show_version (void) show_version (void)
@ -50,39 +52,48 @@ handle_error (MuConfig *conf, MuError merr, GError **err)
if (!(err && *err)) if (!(err && *err))
return; return;
using Color = MaybeAnsi::Color;
MaybeAnsi col{conf ? !conf->nocolor : false};
if (*err) if (*err)
g_printerr ("error: %s (%u)\n", std::cerr << col.fg(Color::Red) << "error" << col.reset() << ": "
(*err)->message, << col.fg(Color::BrightYellow)
(*err)->code); << ((*err) ? (*err)->message : "something when wrong")
<< "\n";
std::cerr << col.fg(Color::Green);
switch ((*err)->code) { switch ((*err)->code) {
case MU_ERROR_XAPIAN_CANNOT_GET_WRITELOCK: case MU_ERROR_XAPIAN_CANNOT_GET_WRITELOCK:
g_printerr ("maybe mu is already running?\n"); std::cerr << "Maybe mu is already running?\n";
break; break;
case MU_ERROR_XAPIAN_NEEDS_REINDEX: case MU_ERROR_XAPIAN_NEEDS_REINDEX:
g_printerr ("database needs (re)indexing.\n" std::cerr << "Database needs (re)indexing.\n"
"try 'mu index' " << "try 'mu index' "
"(see mu-index(1) for details)\n"); << "(see mu-index(1) for details)\n";
return; return;
case MU_ERROR_IN_PARAMETERS: case MU_ERROR_IN_PARAMETERS:
if (conf && mu_config_cmd_is_valid(conf->cmd)) if (conf && mu_config_cmd_is_valid(conf->cmd))
mu_config_show_help (conf->cmd); mu_config_show_help (conf->cmd);
break; break;
case MU_ERROR_SCRIPT_NOT_FOUND: case MU_ERROR_SCRIPT_NOT_FOUND:
g_printerr ("see the mu manpage for commands, or " std::cerr << "See the mu manpage for commands, or "
"'mu script' for the scripts\n"); << "'mu script' for the scripts\n";
break; break;
case MU_ERROR_XAPIAN_CANNOT_OPEN: case MU_ERROR_XAPIAN_CANNOT_OPEN:
g_printerr("Please (re)initialize mu with 'mu init' " std::cerr << "Please (re)initialize mu with 'mu init' "
"see mu-init(1) for details\n"); << "see mu-init(1) for details\n";
return; return;
case MU_ERROR_XAPIAN_SCHEMA_MISMATCH: case MU_ERROR_XAPIAN_SCHEMA_MISMATCH:
g_printerr("Please (re)initialize mu with 'mu init' " std::cerr << "Please (re)initialize mu with 'mu init' "
"see mu-init(1) for details\n"); << "see mu-init(1) for details\n";
return; return;
default: default:
break; /* nothing to do */ break; /* nothing to do */
} }
std::cerr << col.reset();
} }

View File

@ -314,9 +314,6 @@ test_mu_cfind_csv (void)
output = erroutput = NULL; output = erroutput = NULL;
g_assert (g_spawn_command_line_sync (cmdline, &output, &erroutput, g_assert (g_spawn_command_line_sync (cmdline, &output, &erroutput,
NULL, NULL)); NULL, NULL));
g_print ("\n\n%s\n\n", output);
g_assert (output); g_assert (output);
if (output[1] == 'H') if (output[1] == 'H')
g_assert_cmpstr (output, g_assert_cmpstr (output,

View File

@ -1,5 +1,5 @@
/* /*
** Copyright (C) 2010-2017 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ** Copyright (C) 2010-2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
** **
** This program is free software; you can redistribute it and/or modify it ** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the ** under the terms of the GNU General Public License as published by the
@ -17,9 +17,7 @@
** **
*/ */
#if HAVE_CONFIG_H
#include "config.h" #include "config.h"
#endif /*HAVE_CONFIG*/
#include <gtk/gtk.h> #include <gtk/gtk.h>
#include <gdk/gdkkeysyms.h> #include <gdk/gdkkeysyms.h>
@ -28,7 +26,6 @@
#include <utils/mu-util.h> #include <utils/mu-util.h>
#include <mu-store.hh> #include <mu-store.hh>
#include <mu-runtime.h> #include <mu-runtime.h>
#include <mu-index.h>
#include "mug-msg-list-view.h" #include "mug-msg-list-view.h"
#include "mug-query-bar.h" #include "mug-query-bar.h"