Merge branch 'wip/djcb/xapian-single-thread'

This makes mu (/mu4e) use only single-threaded access to Xapian(*),
to avoid the problems with #2601 that some people are seeing.

In the mu4e UI, you'll see an '-st' suffix to the version, and
occasionally (hopefully not too often!) you get a warning from mu4e when
trying to talk to mu while indexing is underway:

  "Cannot handle command while indexing, please retry later."

which means just what it says.

(*) unless you pass `-Dxapian-single-threaded=false` to meson.
This commit is contained in:
Dirk-Jan C. Binnema
2024-10-08 22:14:02 +03:00
8 changed files with 161 additions and 59 deletions

View File

@ -51,6 +51,9 @@ future.
this support, so it becomes more widely useful. this support, so it becomes more widely useful.
https://github.com/djcb/mu/issues/1982 https://github.com/djcb/mu/issues/1982
- Display the messages from old-to-new (still get the newest though)
https://github.com/djcb/mu/issues/2759
* Done * Done
- Support mu4e-mark-handle-when also for when leaving emacs - Support mu4e-mark-handle-when also for when leaving emacs

View File

@ -105,6 +105,7 @@ struct Indexer::Private {
bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype); bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype);
void maybe_start_worker(); void maybe_start_worker();
void item_worker(); void item_worker();
void scan_worker(); void scan_worker();
@ -135,6 +136,8 @@ struct Indexer::Private {
Type type; Type type;
}; };
void handle_item(WorkItem&& item);
AsyncQueue<WorkItem> todos_; AsyncQueue<WorkItem> todos_;
Progress progress_{}; Progress progress_{};
@ -193,7 +196,11 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
return true; return true;
} }
case Scanner::HandleType::LeaveDir: { case Scanner::HandleType::LeaveDir: {
#ifdef XAPIAN_SINGLE_THREADED
handle_item({fullpath, WorkItem::Type::Dir});
#else
todos_.push({fullpath, WorkItem::Type::Dir}); todos_.push({fullpath, WorkItem::Type::Dir});
#endif /*XAPIAN_SINGLE_THREADED*/
return true; return true;
} }
@ -210,9 +217,13 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath)) if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath))
return false; return false;
#ifdef XAPIAN_SINGLE_THREADED
handle_item({fullpath, WorkItem::Type::File});
#else
// push the remaining messages to our "todo" queue for // push the remaining messages to our "todo" queue for
// (re)parsing and adding/updating to the database. // (re)parsing and adding/updating to the database.
todos_.push({fullpath, WorkItem::Type::File}); todos_.push({fullpath, WorkItem::Type::File});
#endif
return true; return true;
} }
default: default:
@ -260,6 +271,30 @@ Indexer::Private::add_message(const std::string& path)
return true; return true;
} }
// Process one work item.
//
// - File items are passed to add_message(); when that succeeds the
//   'updated' progress counter is bumped.
// - Dir items get their dirstamp in the store set to the current time.
// - Any other type triggers a g_warn_if_reached().
//
// A Mu::Error raised while handling the item is logged as a warning and
// not propagated; other exception types are not caught here.
void
Indexer::Private::handle_item(WorkItem&& item)
{
	try {
		const auto kind = item.type;

		if (kind == WorkItem::Type::File) {
			const bool added = add_message(item.full_path);
			if (G_LIKELY(added))
				++progress_.updated;
		} else if (kind == WorkItem::Type::Dir) {
			store_.set_dirstamp(item.full_path, ::time(nullptr));
		} else {
			g_warn_if_reached();
		}
	} catch (const Mu::Error& er) {
		mu_warning("error adding message @ {}: {}", item.full_path, er.what());
	}
}
void void
Indexer::Private::item_worker() Indexer::Private::item_worker()
{ {
@ -270,22 +305,8 @@ Indexer::Private::item_worker()
while (state_ == IndexState::Scanning) { while (state_ == IndexState::Scanning) {
if (!todos_.pop(item, 250ms)) if (!todos_.pop(item, 250ms))
continue; continue;
try {
switch (item.type) { handle_item(std::move(item));
case WorkItem::Type::File: {
if (G_LIKELY(add_message(item.full_path)))
++progress_.updated;
} break;
case WorkItem::Type::Dir:
store_.set_dirstamp(item.full_path, ::time(NULL));
break;
default:
g_warn_if_reached();
break;
}
} catch (const Mu::Error& er) {
mu_warning("error adding message @ {}: {}", item.full_path, er.what());
}
maybe_start_worker(); maybe_start_worker();
std::this_thread::yield(); std::this_thread::yield();

View File

@ -149,6 +149,7 @@ struct Server::Private {
Store& store() { return store_; } Store& store() { return store_; }
const Store& store() const { return store_; } const Store& store() const { return store_; }
Indexer& indexer() { return store().indexer(); } Indexer& indexer() { return store().indexer(); }
void do_index(const Indexer::Config& conf);
//CommandMap& command_map() const { return command_map_; } //CommandMap& command_map() const { return command_map_; }
// //
@ -761,6 +762,20 @@ get_stats(const Indexer::Progress& stats, const std::string& state)
return sexp; return sexp;
} }
// Start the indexer with CONF and block the calling thread until the
// indexer is no longer running.
//
// While it runs, a "running" progress sexp is emitted (flushed) after
// every polling interval; once it has stopped, a final "complete" sexp
// is emitted. The StopWatch lives for the duration of the call.
void
Server::Private::do_index(const Indexer::Config& conf)
{
	StopWatch sw{"indexing"};

	// how long to wait between two progress reports
	constexpr auto report_interval = std::chrono::milliseconds(2000);

	// emit the current indexer progress, tagged with the given state
	const auto report_progress = [this](const std::string& state) {
		output_sexp(get_stats(indexer().progress(), state),
			    Server::OutputFlags::Flush);
	};

	indexer().start(conf);

	while (indexer().is_running()) {
		std::this_thread::sleep_for(report_interval);
		report_progress("running");
	}

	report_progress("complete");
}
void void
Server::Private::index_handler(const Command& cmd) Server::Private::index_handler(const Command& cmd)
{ {
@ -770,22 +785,23 @@ Server::Private::index_handler(const Command& cmd)
// ignore .noupdate with an empty store. // ignore .noupdate with an empty store.
conf.ignore_noupdate = store().empty(); conf.ignore_noupdate = store().empty();
#ifdef XAPIAN_SINGLE_THREADED
// nothing to do
if (indexer().is_running()) {
throw Error{Error::Code::Xapian, "indexer is already running"};
}
do_index(conf);
#else
indexer().stop(); indexer().stop();
if (index_thread_.joinable()) if (index_thread_.joinable())
index_thread_.join(); index_thread_.join();
// start a background track. // start a background track.
index_thread_ = std::thread([this, conf = std::move(conf)] { index_thread_ = std::thread([this, conf = std::move(conf)] {
StopWatch sw{"indexing"}; do_index(conf);
indexer().start(conf);
while (indexer().is_running()) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
output_sexp(get_stats(indexer().progress(), "running"),
Server::OutputFlags::Flush);
}
output_sexp(get_stats(indexer().progress(), "complete"),
Server::OutputFlags::Flush);
}); });
#endif /*XAPIAN_SINGLE_THREADED */
} }
void void
@ -959,6 +975,9 @@ Server::Private::ping_handler(const Command& cmd)
":personal-addresses", std::move(addrs), ":personal-addresses", std::move(addrs),
":database-path", store().path(), ":database-path", store().path(),
":root-maildir", store().root_maildir(), ":root-maildir", store().root_maildir(),
#ifdef XAPIAN_SINGLE_THREADED
":xapian-single-threaded", Sexp::t_sym,
#endif /*XAPIAN_SINGLE_THREADED*/
":doccount", storecount))); ":doccount", storecount)));
} }

View File

@ -133,7 +133,13 @@ add_project_arguments(['-DHAVE_CONFIG_H'], language: 'cpp')
config_h_dep=declare_dependency( config_h_dep=declare_dependency(
include_directories: include_directories(['.'])) include_directories: include_directories(['.']))
#
# single-threaded Xapian access?
#
if get_option('xapian-single-threaded')
config_h_data.set('XAPIAN_SINGLE_THREADED', true)
message('use Xapian only in a single thread')
endif
# #
# d_type, d_ino are not available universally, so let's check # d_type, d_ino are not available universally, so let's check
# (we use them for optimizations in mu-scanner # (we use them for optimizations in mu-scanner
@ -322,6 +328,8 @@ if gmime_dep.version() == '3.2.13'
warning('See: https://github.com/jstedfast/gmime/issues/133') warning('See: https://github.com/jstedfast/gmime/issues/133')
endif endif
# Local Variables: # Local Variables:
# indent-tabs-mode: nil # indent-tabs-mode: nil
# End: # End:

View File

@ -14,36 +14,56 @@
## along with this program; if not, write to the Free Software Foundation, ## along with this program; if not, write to the Free Software Foundation,
## Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ## Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
option('tests',
option('cld2',
type : 'feature', type : 'feature',
value: 'auto', value: 'auto',
description: 'build unit tests') description: 'Add support for language-detection through cld2')
#
# emacs
#
option('emacs',
type: 'string',
value: 'emacs',
description: 'name/path of the emacs executable (for byte-compilation)')
option('lispdir',
type: 'string',
description: 'path under which to install emacs-lisp files')
#
# guile
#
option('guile', option('guile',
type : 'feature', type : 'feature',
value: 'auto', value: 'auto',
description: 'build the guile scripting support (requires guile-3.x)') description: 'build the guile scripting support (requires guile-3.x)')
option('cld2',
type : 'feature',
value: 'auto',
description: 'Compact Language Detector2')
# by default, this uses guile_dep.get_variable(pkgconfig: 'extensiondir') # by default, this uses guile_dep.get_variable(pkgconfig: 'extensiondir')
option('guile-extension-dir', option('guile-extension-dir',
type: 'string', type: 'string',
description: 'custom install path for the guile extension module') description: 'custom install path for the guile extension module')
#
# misc
#
option('tests',
type : 'feature',
value: 'auto',
description: 'build unit tests')
option('xapian-single-threaded',
type : 'boolean',
value: true,
description: 'only use Xapian from a single thread')
option('readline', option('readline',
type: 'feature', type: 'feature',
value: 'auto', value: 'auto',
description: 'enable readline support for the mu4e repl') description: 'enable readline support for the mu4e repl')
option('emacs',
type: 'string',
value: 'emacs',
description: 'name/path of the emacs executable')
option('lispdir',
type: 'string',
description: 'path under which to install emacs-lisp files')

View File

@ -16,14 +16,20 @@
# generate some build data for use in mu4e # generate some build data for use in mu4e
version_extra=''
if get_option('xapian-single-threaded')
version_extra='-st'
endif
mu4e_meta = configure_file( mu4e_meta = configure_file(
input: 'mu4e-config.el.in', input: 'mu4e-config.el.in',
output: 'mu4e-config.el', output: 'mu4e-config.el',
install: true, install: true,
install_dir: mu4e_lispdir, install_dir: mu4e_lispdir,
configuration: { configuration: {
'VERSION' : meson.project_version(), 'VERSION' : meson.project_version(),
'MU_DOC_DIR' : join_paths(datadir, 'doc', 'mu'), 'MU_VERSION_EXTRA' : version_extra,
'MU_DOC_DIR' : join_paths(datadir, 'doc', 'mu'),
}) })
mu4e_pkg_desc = configure_file( mu4e_pkg_desc = configure_file(

View File

@ -1,6 +1,6 @@
;;; mu4e-main.el --- The Main interface for mu4e -*- lexical-binding: t -*- ;;; mu4e-main.el --- The Main interface for mu4e -*- lexical-binding: t -*-
;; Copyright (C) 2011-2023 Dirk-Jan C. Binnema ;; Copyright (C) 2011-2024 Dirk-Jan C. Binnema
;; Author: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ;; Author: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
;; Maintainer: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> ;; Maintainer: Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
@ -299,7 +299,9 @@ Otherwise, do nothing."
"* " "* "
(propertize "mu4e" 'face 'mu4e-header-key-face) (propertize "mu4e" 'face 'mu4e-header-key-face)
(propertize " - mu for emacs version " 'face 'mu4e-title-face) (propertize " - mu for emacs version " 'face 'mu4e-title-face)
(propertize mu4e-mu-version 'face 'mu4e-header-key-face) (propertize (concat mu4e-mu-version
(if (mu4e--server-xapian-single-threaded-p) "-st" ""))
'face 'mu4e-header-key-face)
"\n\n" "\n\n"
(propertize " Basics\n\n" 'face 'mu4e-title-face) (propertize " Basics\n\n" 'face 'mu4e-title-face)
(mu4e--main-action (mu4e--main-action

View File

@ -29,7 +29,7 @@
;;; Configuration ;;; Configuration
(defcustom mu4e-mu-home nil (defcustom mu4e-mu-home nil
"Location of an alternate mu home dir. "Location of an alternate mu home directory.
If not set, use the defaults, based on the XDG Base Directory If not set, use the defaults, based on the XDG Base Directory
Specification. Specification.
@ -188,6 +188,11 @@ for bookmarks and maildirs.")
"Get the latest server query items." "Get the latest server query items."
mu4e--server-query-items) mu4e--server-query-items)
;; temporary
(defun mu4e--server-xapian-single-threaded-p ()
  "Return non-nil if the mu server uses Xapian in single-threaded mode.
This is the value of the `:xapian-single-threaded' property in
`mu4e--server-props'."
  (let ((props mu4e--server-props))
    (plist-get props :xapian-single-threaded)))
;;; Handling raw server data ;;; Handling raw server data
@ -210,6 +215,9 @@ for bookmarks and maildirs.")
mu4e--server-cookie-post) mu4e--server-cookie-post)
"Regular expression matching the length cookie. "Regular expression matching the length cookie.
Match 1 will be the length (in hex).") Match 1 will be the length (in hex).")
(defvar mu4e--server-indexing nil
  "Non-nil while an index request to the mu server is outstanding.
Set after sending the index command; reset to nil when the server
reports that indexing is complete, or when the server is shut down.")
(defun mu4e-running-p () (defun mu4e-running-p ()
"Whether mu4e is running. "Whether mu4e is running.
@ -249,15 +257,18 @@ removed."
(defun mu4e--server-plist-get (plist key) (defun mu4e--server-plist-get (plist key)
"Like `plist-get' but load data from file if it is a string. "Like `plist-get' but load data from file if it is a string.
I.e. (mu4e--server-plist-get (:foo bar) :foo) PLIST is a property-list, and KEY is the the key to search for.
E.g., (mu4e--server-plist-get (:foo bar) :foo)
=> bar => bar
but but
(mu4e--server-plist-get (:foo \"/tmp/data.eld\") :foo) (mu4e--server-plist-get (:foo \"/tmp/data.eld\") :foo)
=> evaluates the contents of /tmp/data.eld => evaluates the contents of /tmp/data.eld
(and deletes the file afterward). (and deletes the file afterward).
This for the few sexps we get from the mu server that support this This for the few sexps we get from the mu server that support
(headers, contacts, maildirs)." this -- headers, contacts, maildirs."
;; XXX: perhaps re-use the same buffer? ;; XXX: perhaps re-use the same buffer?
(let ((val (plist-get plist key))) (let ((val (plist-get plist key)))
(if (stringp val) (if (stringp val)
@ -383,6 +394,11 @@ The server output is as follows:
;; get some info ;; get some info
((plist-get sexp :info) ((plist-get sexp :info)
;; when indexing is finished, remove the block
(when (and (eq (plist-get sexp :info) 'index)
(eq (plist-get sexp :status) 'complete))
(setq mu4e--server-indexing nil))
(funcall mu4e-info-func sexp)) (funcall mu4e-info-func sexp))
;; get some data ;; get some data
@ -423,6 +439,7 @@ As per issue #2198."
,(when mu4e-mu-home (format "--muhome=%s" mu4e-mu-home))))) ,(when mu4e-mu-home (format "--muhome=%s" mu4e-mu-home)))))
(defun mu4e--version-check () (defun mu4e--version-check ()
"Verify that the versions for mu4e and mu are the same."
;; sanity-check 1 ;; sanity-check 1
(let ((default-directory temporary-file-directory)) ;;ensure it's local. (let ((default-directory temporary-file-directory)) ;;ensure it's local.
(unless (and mu4e-mu-binary (file-executable-p mu4e-mu-binary)) (unless (and mu4e-mu-binary (file-executable-p mu4e-mu-binary))
@ -486,6 +503,7 @@ You cannot run the repl when mu4e is running (or vice-versa)."
(proc (and (buffer-live-p buf) (get-buffer-process buf)))) (proc (and (buffer-live-p buf) (get-buffer-process buf))))
(when proc (when proc
(mu4e-message "shutting down") (mu4e-message "shutting down")
(setq mu4e--server-indexing nil)
(set-process-filter mu4e--server-process nil) (set-process-filter mu4e--server-process nil)
(set-process-sentinel mu4e--server-process nil) (set-process-sentinel mu4e--server-process nil)
(let ((delete-exited-processes t)) (let ((delete-exited-processes t))
@ -517,16 +535,20 @@ You cannot run the repl when mu4e is running (or vice-versa)."
((eq code 0) ((eq code 0)
(message nil)) ;; don't do anything (message nil)) ;; don't do anything
((eq code 11) ((eq code 11)
(error "schema mismatch; please re-init mu from command-line")) (error "Schema mismatch; please re-init mu from command-line"))
((eq code 19) ((eq code 19)
(error "mu database is locked by another process")) (error "Mu database is locked by another process"))
(t (error "mu server process ended with exit code %d" code)))) (t (error "Mu server process ended with exit code %d" code))))
(t (t
(error "something bad happened to the mu server process"))))) (error "Something bad happened to the mu server process")))))
(defun mu4e--server-call-mu (form) (defun mu4e--server-call-mu (form)
"Call the mu server with some command FORM." "Call the mu server with some command FORM."
(unless (mu4e-running-p) (mu4e--server-start)) (unless (mu4e-running-p)
(mu4e--server-start))
;; in single-threaded mode, mu can't accept our command right now.
(when (and (mu4e--server-xapian-single-threaded-p) mu4e--server-indexing)
(mu4e-warn "Cannot handle command while indexing, please retry later."))
(let* ((print-length nil) (print-level nil) (let* ((print-length nil) (print-level nil)
(cmd (format "%S" form))) (cmd (format "%S" form)))
(mu4e-log 'to-server "%s" cmd) (mu4e-log 'to-server "%s" cmd)
@ -591,7 +613,7 @@ or an error."
(defun mu4e--server-index (&optional cleanup lazy-check) (defun mu4e--server-index (&optional cleanup lazy-check)
"Index messages. "Index messages.
If CLEANUP is non-nil, remove messages which are in the database If CLEANUP is non-nil, remove messages which are in the database
but no longer in the filesystem. If LAZY-CHECK is non-nil, only but no longer in the file system. If LAZY-CHECK is non-nil, only
consider messages for which the time stamp (ctime) of the consider messages for which the time stamp (ctime) of the
directory they reside in has not changed since the previous directory they reside in has not changed since the previous
indexing run. This is much faster than the non-lazy check, but indexing run. This is much faster than the non-lazy check, but
@ -600,10 +622,11 @@ added or removed), since merely editing a message does not update
the directory time stamp." the directory time stamp."
(mu4e--server-call-mu (mu4e--server-call-mu
`(index :cleanup ,(and cleanup t) `(index :cleanup ,(and cleanup t)
:lazy-check ,(and lazy-check t)))) :lazy-check ,(and lazy-check t)))
(setq mu4e--server-indexing t)) ;; remember we're indexing.
(defun mu4e--server-mkdir (path &optional update) (defun mu4e--server-mkdir (path &optional update)
"Create a new maildir-directory at filesystem PATH. "Create a new maildir-directory at file system PATH.
When UPDATE is non-nil, send a update when completed. When UPDATE is non-nil, send a update when completed.
PATH must be below the root-maildir." PATH must be below the root-maildir."
;; handle maildir cache ;; handle maildir cache
@ -674,7 +697,7 @@ read/unread status are returned in the pong-response."
(mu4e--server-call-mu `(queries :queries ,queries))) (mu4e--server-call-mu `(queries :queries ,queries)))
(defun mu4e--server-remove (docid-or-path) (defun mu4e--server-remove (docid-or-path)
"Remove message with either DOCID or PATH. "Remove message with either DOCID-OR-PATH.
The results are reported through either (:update ... ) The results are reported through either (:update ... )
or (:error) sexps." or (:error) sexps."
(if (stringp docid-or-path) (if (stringp docid-or-path)