query: Rework querying/threading machinery

Rewrite the query machinery in c++:
- use an MSet decorator instead of the mu-msg-iter stuff
- use mu-query-decider to mark duplicates/unreadable/related messages
- use mu-query-threader to replace the older container/thread code

Algorithm did not substantially change, but the implementation details
did.
This commit is contained in:
Dirk-Jan C. Binnema
2020-11-28 10:11:07 +02:00
parent 86e1515c71
commit 95dffb98a6
18 changed files with 2008 additions and 2464 deletions

View File

@ -23,19 +23,16 @@
#include <cctype>
#include <cstring>
#include <sstream>
#include <cmath>
#include <stdlib.h>
#include <xapian.h>
#include <glib/gstdio.h>
#include "mu-msg-fields.h"
#include "mu-msg-iter.h"
#include "utils/mu-str.h"
#include "utils/mu-date.h"
#include <utils/mu-utils.hh>
#include "mu-query-results.hh"
#include "mu-query-match-deciders.hh"
#include "mu-query-threads.hh"
#include <mu-xapian.hh>
using namespace Mu;
@ -43,186 +40,29 @@ using namespace Mu;
struct Query::Private {
Private(const Store& store): store_{store},
parser_{store_} {}
// New
//bool calculate_threads (Xapian::Enquire& enq, size maxnum);
Xapian::Query make_query (const std::string& expr, GError **err) const;
Xapian::Enquire make_enquire (const std::string& expr, MuMsgFieldId sortfieldid,
bool descending, GError **err) const;
GHashTable* find_thread_ids (MuMsgIter *iter, GHashTable **orig_set) const;
Xapian::Enquire make_enquire (const std::string& expr,
MuMsgFieldId sortfieldid, QueryFlags qflags) const;
Xapian::Enquire make_related_enquire (const Xapian::Query& first_q,
const StringSet& thread_ids,
MuMsgFieldId sortfieldid, QueryFlags qflags) const;
Xapian::Query make_related_query (MuMsgIter *iter, GHashTable **orig_set) const;
void find_related_messages (MuMsgIter **iter, int maxnum,
MuMsgFieldId sortfieldid, Query::Flags flags,
Xapian::Query orig_query) const;
Option<QueryResults> run_threaded (QueryResults &qres, Xapian::Enquire& enq,
MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const;
Option<QueryResults> run_singular (const std::string& expr, MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const;
Option<QueryResults> run_related (const std::string& expr, MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const;
Option<QueryResults> run (const std::string& expr, MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const;
const Store& store_;
const Parser parser_;
};
static constexpr MuMsgIterFlags
msg_iter_flags (Query::Flags flags)
{
MuMsgIterFlags iflags{MU_MSG_ITER_FLAG_NONE};
if (any_of(flags & Query::Flags::Descending))
iflags |= MU_MSG_ITER_FLAG_DESCENDING;
if (any_of(flags & Query::Flags::SkipUnreadable))
iflags |= MU_MSG_ITER_FLAG_SKIP_UNREADABLE;
if (any_of(flags & Query::Flags::SkipDups))
iflags |= MU_MSG_ITER_FLAG_SKIP_DUPS;
if (any_of(flags & Query::Flags::Threading))
iflags |= MU_MSG_ITER_FLAG_THREADS;
return iflags;
}
Xapian::Query
Query::Private::make_query (const std::string& expr, GError **err) const try {
Mu::WarningVec warns;
const auto tree{parser_.parse(expr, warns)};
for (auto&& w: warns)
g_warning ("query warning: %s", to_string(w).c_str());
return Mu::xapian_query (tree);
} catch (...) {
mu_util_g_set_error (err, MU_ERROR_XAPIAN_QUERY,
"parse error in query");
throw;
}
Xapian::Enquire
Query::Private::make_enquire (const std::string& expr, MuMsgFieldId sortfieldid,
bool descending, GError **err) const
{
Xapian::Enquire enq{store_.database()};
try {
if (!expr.empty() && expr != R"("")")
enq.set_query(make_query (expr, err));
else/* empty or "" means "matchall" */
enq.set_query(Xapian::Query::MatchAll);
} catch (...) {
mu_util_g_set_error (err, MU_ERROR_XAPIAN_QUERY, "parse error in query");
throw;
}
enq.set_cutoff(0,0);
return enq;
}
/*
* record all thread-ids for the messages; also 'orig_set' receives all
* original matches (a map msgid-->docid), so we can make sure the
* originals are not seen as 'duplicates' later (when skipping
* duplicates). We want to favor the originals over the related
* messages, when skipping duplicates.
*/
GHashTable*
Query::Private::find_thread_ids (MuMsgIter *iter, GHashTable **orig_set) const
{
GHashTable *ids;
ids = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify)g_free, NULL);
*orig_set = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify)g_free, NULL);
while (!mu_msg_iter_is_done (iter)) {
char *thread_id, *msgid;
unsigned docid;
/* record the thread id for the message */
if ((thread_id = mu_msg_iter_get_thread_id (iter)))
g_hash_table_insert (ids, thread_id,
GSIZE_TO_POINTER(TRUE));
/* record the original set */
docid = mu_msg_iter_get_docid(iter);
if (docid != 0 && (msgid = mu_msg_iter_get_msgid (iter)))
g_hash_table_insert (*orig_set, msgid,
GSIZE_TO_POINTER(docid));
if (!mu_msg_iter_next (iter))
break;
}
return ids;
}
Xapian::Query
Query::Private::make_related_query (MuMsgIter *iter, GHashTable **orig_set) const
{
GHashTable *hash;
GList *id_list, *cur;
std::vector<Xapian::Query> qvec;
static std::string pfx (1, mu_msg_field_xapian_prefix
(MU_MSG_FIELD_ID_THREAD_ID));
/* orig_set receives the hash msgid->docid of the set of
* original matches */
hash = find_thread_ids (iter, orig_set);
/* id_list now gets a list of all thread-ids seen in the query
* results; either in the Message-Id field or in
* References. */
id_list = g_hash_table_get_keys (hash);
// now, we create a vector with queries for each of the
// thread-ids, which we combine below. This is /much/ faster
// than creating the query as 'query = Query (OR, query)'...
for (cur = id_list; cur; cur = g_list_next(cur))
qvec.push_back (Xapian::Query((std::string
(pfx + (char*)cur->data))));
g_hash_table_destroy (hash);
g_list_free (id_list);
return Xapian::Query (Xapian::Query::OP_OR, qvec.begin(), qvec.end());
}
void
Query::Private::find_related_messages (MuMsgIter **iter, int maxnum,
MuMsgFieldId sortfieldid, Query::Flags flags,
Xapian::Query orig_query) const
{
GHashTable *orig_set;
Xapian::Enquire enq{store_.database()};
MuMsgIter *rel_iter;
const bool inc_related{any_of(flags & Query::Flags::IncludeRelated)};
orig_set = NULL;
Xapian::Query new_query{make_related_query (*iter, &orig_set)};
/* If related message are not desired, filter out messages which would not
have matched the original query.
*/
if (!inc_related)
new_query = Xapian::Query (Xapian::Query::OP_AND, orig_query, new_query);
enq.set_query(new_query);
enq.set_cutoff(0,0);
rel_iter= mu_msg_iter_new (
reinterpret_cast<XapianEnquire*>(&enq),
maxnum,
sortfieldid,
msg_iter_flags (flags),
NULL);
mu_msg_iter_destroy (*iter);
// set the preferred set for the iterator (ie., the set of
// messages not considered to be duplicates) to be the
// original matches -- the matches without considering
// 'related'
mu_msg_iter_set_preferred (rel_iter, orig_set);
g_hash_table_destroy (orig_set);
*iter = rel_iter;
}
Query::Query(const Store& store):
priv_{std::make_unique<Private>(store)}
{}
@ -232,66 +72,170 @@ Query::Query(Query&& other) = default;
Query::~Query() = default;
MuMsgIter*
Query::run (const std::string& expr, MuMsgFieldId sortfieldid, Query::Flags flags,
size_t maxnum, GError **err) const
static Xapian::Enquire&
maybe_sort (Xapian::Enquire& enq, MuMsgFieldId sortfieldid, QueryFlags qflags)
{
g_return_val_if_fail (mu_msg_field_id_is_valid (sortfieldid) ||
sortfieldid == MU_MSG_FIELD_ID_NONE,
NULL);
try {
MuMsgIter *iter;
const bool threads = any_of(flags & Flags::Threading);
const bool inc_related = any_of(flags & Flags::IncludeRelated);
const bool descending = any_of(flags & Flags::Descending);
Xapian::Enquire enq (priv_->make_enquire(expr, sortfieldid, descending, err));
if (sortfieldid != MU_MSG_FIELD_ID_NONE)
enq.set_sort_by_value(static_cast<Xapian::valueno>(sortfieldid),
any_of(qflags & QueryFlags::Descending));
return enq;
}
/* when we're doing a 'include-related query', wea're actually
* doing /two/ queries; one to get the initial matches, and
* based on that one to get all messages in threads in those
* matches.
*/
Xapian::Enquire
Query::Private::make_enquire (const std::string& expr,
MuMsgFieldId sortfieldid, QueryFlags qflags) const
{
Xapian::Enquire enq{store_.database()};
/* get the 'real' maxnum if it was specified as < 0 */
maxnum = maxnum == 0 ? priv_->store_.size(): maxnum;
/* Calculating threads involves two queries, so do the calculation only in
* the second query instead of in both.
*/
Query::Flags first_flags{};
if (threads)
first_flags = flags & ~Flags::Threading;
else
first_flags = flags;
/* Perform the initial query, returning up to max num results.
*/
iter = mu_msg_iter_new (
reinterpret_cast<XapianEnquire*>(&enq),
maxnum,
sortfieldid,
msg_iter_flags (first_flags),
err);
/* If we want threads or related messages, find related messages using a
* second query based on the message ids / refs of the first query's result.
* Do this even if we don't want to include related messages in the final
* result so we can apply the threading algorithm to the related message set
* of a maxnum-sized result instead of the unbounded result of the first
* query. If threads are desired but related message are not, we will remove
* the undesired related messages later.
*/
if(threads||inc_related)
priv_->find_related_messages (&iter, maxnum, sortfieldid, flags,
enq.get_query());
if (expr.empty() || expr == R"("")")
enq.set_query(Xapian::Query::MatchAll);
else {
WarningVec warns;
const auto tree{parser_.parse(expr, warns)};
for (auto&& w: warns)
g_warning ("query warning: %s", to_string(w).c_str());
enq.set_query(xapian_query(tree));
}
return iter;
return maybe_sort (enq, sortfieldid, qflags);
}
} MU_XAPIAN_CATCH_BLOCK_G_ERROR_RETURN (err, MU_ERROR_XAPIAN, 0);
Xapian::Enquire
Query::Private::make_related_enquire (const Xapian::Query& first_q,
const StringSet& thread_ids,
MuMsgFieldId sortfieldid, QueryFlags qflags) const
{
Xapian::Enquire enq{store_.database()};
static std::string pfx (1, mu_msg_field_xapian_prefix(MU_MSG_FIELD_ID_THREAD_ID));
std::vector<Xapian::Query> qvec{first_q};
for (auto&& t: thread_ids)
qvec.emplace_back(pfx + t);
Xapian::Query qr{Xapian::Query::OP_OR, qvec.begin(), qvec.end()};
enq.set_query(qr);
return maybe_sort (enq, sortfieldid, qflags);
}
struct ThreadKeyMaker: public Xapian::KeyMaker {
ThreadKeyMaker (const QueryMatches& matches):
match_info_(matches)
{}
std::string operator()(const Xapian::Document &doc) const override {
const auto it{match_info_.find(doc.get_docid())};
if (G_UNLIKELY(it == match_info_.end())) {
g_warning("can't find document %u", doc.get_docid());
return "";
}
return it->second.thread_path;
}
const QueryMatches& match_info_;
};
Option<QueryResults>
Query::Private::run_threaded (QueryResults &qres, Xapian::Enquire& enq,
MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const
{
const auto descending{any_of(qflags & QueryFlags::Descending)};
calculate_threads(qres, sortfieldid, descending);
ThreadKeyMaker key_maker{qres.query_matches()};
enq.set_sort_by_key(&key_maker, descending);
DeciderInfo minfo;
minfo.matches = qres.query_matches();
auto mset{enq.get_mset(0, maxnum, {}, make_final_decider(qflags, minfo).get())};
return QueryResults{mset, std::move(qres.query_matches())};
}
Option<QueryResults>
Query::Private::run_singular (const std::string& expr, MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const
{
const auto singular_qflags{qflags | QueryFlags::Leader};
const auto threading{any_of(qflags & QueryFlags::Threading)};
DeciderInfo minfo{};
auto enq{make_enquire(expr, threading ? MU_MSG_FIELD_ID_NONE : sortfieldid, qflags)};
auto mset{enq.get_mset(0, maxnum, {}, make_leader_decider(singular_qflags, minfo).get())};
auto qres{QueryResults{mset, std::move(minfo.matches)}};
if (none_of(qflags & QueryFlags::Threading))
return qres;
else
return run_threaded(qres, enq, sortfieldid, qflags, maxnum);
}
Option<QueryResults>
Query::Private::run_related (const std::string& expr, MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const
{
const auto leader_qflags{qflags | QueryFlags::Leader | QueryFlags::GatherThreadIds};
const auto threading{any_of(qflags & QueryFlags::Threading)};
// Run our first, "leader" query;
DeciderInfo minfo{};
auto enq{make_enquire(expr, MU_MSG_FIELD_ID_NONE, qflags)};
const auto mset{enq.get_mset(0, maxnum, {},
make_leader_decider(leader_qflags, minfo).get())};
// Now, determine the "related query"
auto r_enq{make_related_enquire(enq.get_query(), minfo.thread_ids,
threading ? MU_MSG_FIELD_ID_NONE :sortfieldid, qflags)};
const auto r_mset{r_enq.get_mset(0, maxnum, {}, make_related_decider(qflags, minfo).get())};
auto qres{QueryResults{r_mset, std::move(minfo.matches)}};
if (none_of(qflags & QueryFlags::Threading))
return qres;
else
return run_threaded(qres, r_enq, sortfieldid, qflags, maxnum);
}
Option<QueryResults>
Query::Private::run (const std::string& expr, MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const
{
const auto eff_maxnum{maxnum == 0 ? store_.size() : maxnum};
if (any_of(qflags & QueryFlags::IncludeRelated))
return run_related (expr, sortfieldid, qflags, eff_maxnum);
else
return run_singular(expr, sortfieldid, qflags, eff_maxnum);
}
Option<QueryResults>
Query::run (const std::string& expr, MuMsgFieldId sortfieldid,
QueryFlags qflags, size_t maxnum) const try
{
// some flags are for internal use only.
g_return_val_if_fail (none_of(qflags & QueryFlags::Leader), Nothing);
g_return_val_if_fail (none_of(qflags & QueryFlags::GatherThreadIds), Nothing);
StopWatch sw{format("query '%s'; related: %s; threads: %s; max-size: %zu",
expr.c_str(),
any_of(qflags & QueryFlags::IncludeRelated) ? "yes" : "no",
any_of(qflags & QueryFlags::Threading) ? "yes" : "no",
maxnum)};
return priv_->run(expr, sortfieldid, qflags, maxnum);
} catch (...) {
return Nothing;
}
size_t
Query::count (const std::string& expr) const try
{
const auto enq{priv_->make_enquire(expr, MU_MSG_FIELD_ID_NONE, false, nullptr)};
const auto enq{priv_->make_enquire(expr, MU_MSG_FIELD_ID_NONE, {})};
auto mset{enq.get_mset(0, priv_->store_.size())};
mset.fetch();
@ -302,24 +246,15 @@ Query::count (const std::string& expr) const try
std::string
Query::parse(const std::string& expr, bool xapian) const try
Query::parse (const std::string& expr, bool xapian) const
{
if (xapian) {
GError *err{};
const auto descr{priv_->make_query(expr, &err).get_description()};
if (err) {
g_warning ("query error: %s", err->message);
g_clear_error(&err);
}
return descr;
} else {
Mu::WarningVec warns;
const auto tree = priv_->parser_.parse (expr, warns);
for (auto&& w: warns)
g_warning ("query error: %s", to_string(w).c_str());
WarningVec warns;
const auto tree{priv_->parser_.parse(expr, warns)};
for (auto&& w: warns)
g_warning ("query warning: %s", to_string(w).c_str());
if (xapian)
return xapian_query(tree).get_description();
else
return to_string(tree);
}
} MU_XAPIAN_CATCH_BLOCK_RETURN("");
}