-rw-r--r--  include/framework/DynamicExtension.h | 73
-rw-r--r--  include/framework/QueryRequirements.h | 6
-rw-r--r--  include/framework/ShardRequirements.h | 6
-rw-r--r--  include/framework/interface/Query.h | 176
-rw-r--r--  include/framework/interface/Scheduler.h | 2
-rw-r--r--  include/framework/interface/Shard.h | 5
-rw-r--r--  include/framework/reconstruction/BSMPolicy.h | 12
-rw-r--r--  include/framework/reconstruction/BackgroundTieringPolicy.h | 38
-rw-r--r--  include/framework/reconstruction/CompactOnFull.h | 37
-rw-r--r--  include/framework/reconstruction/FixedShardCountPolicy.h | 30
-rw-r--r--  include/framework/reconstruction/FloodL0Policy.h | 3
-rw-r--r--  include/framework/reconstruction/LevelingPolicy.h | 10
-rw-r--r--  include/framework/reconstruction/ReconstructionPolicy.h | 19
-rw-r--r--  include/framework/reconstruction/TieringPolicy.h | 26
-rw-r--r--  include/framework/scheduling/Epoch.h | 4
-rw-r--r--  include/framework/scheduling/FIFOScheduler.h | 4
-rw-r--r--  include/framework/scheduling/LockManager.h | 20
-rw-r--r--  include/framework/scheduling/SerialScheduler.h | 3
-rw-r--r--  include/framework/scheduling/Task.h | 11
-rw-r--r--  include/framework/scheduling/Version.h | 20
-rw-r--r--  include/framework/scheduling/statistics.h | 65
-rw-r--r--  include/framework/structure/BufferView.h | 11
-rw-r--r--  include/framework/structure/ExtensionStructure.h | 139
-rw-r--r--  include/framework/structure/InternalLevel.h | 26
-rw-r--r--  include/framework/structure/MutableBuffer.h | 31
-rw-r--r--  include/framework/util/Configuration.h | 53
-rw-r--r--  include/query/irs.h | 10
-rw-r--r--  include/query/pointlookup.h | 14
-rw-r--r--  include/query/rangecount.h | 12
-rw-r--r--  include/query/wss.h | 14
-rw-r--r--  include/shard/Alias.h | 259
-rw-r--r--  include/shard/FSTrie.h | 282
-rw-r--r--  include/shard/ISAMTree.h | 4
-rw-r--r--  include/shard/LoudsPatricia.h | 278
-rw-r--r--  include/shard/PGM.h | 451
-rw-r--r--  include/shard/TrieSpline.h | 503
-rw-r--r--  include/shard/VPTree.h | 605
-rw-r--r--  include/util/bf_config.h | 4
-rw-r--r--  include/util/types.h | 24
39 files changed, 1626 insertions(+), 1664 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6cfe72b..63264a0 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -70,7 +70,8 @@ public:
* for various configuration parameters in the system. See
* include/framework/util/Configuration.h for details.
*/
- DynamicExtension(ConfType &&config, double insertion_rate=1.0) : m_config(std::move(config)) {
+ DynamicExtension(ConfType &&config, double insertion_rate = 1.0)
+ : m_config(std::move(config)) {
m_buffer = std::make_unique<BufferType>(m_config.buffer_flush_trigger,
m_config.buffer_size);
@@ -426,13 +427,10 @@ private:
alignas(64) std::atomic<long> m_scheduled_records;
- enum reconstruction_phase {
- RECON_SCHEDULED = 1,
- RECON_ENDED = -1
- };
+ enum reconstruction_phase { RECON_SCHEDULED = 1, RECON_ENDED = -1 };
-
- void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) {
+ void update_insertion_delay(long runtime, size_t reccnt,
+ reconstruction_phase phase) {
if (!m_config.dynamic_ratelimiting) {
return;
@@ -441,28 +439,19 @@ private:
m_scheduled_reconstruction_time.fetch_add(runtime * phase);
m_scheduled_records.fetch_add(reccnt * phase);
- long time_per_record = (m_scheduled_records.load())
- ? m_scheduled_reconstruction_time.load() /
- m_scheduled_records.load()
- : 0;
+ size_t total_reccnt = get_record_count();
+
+ long time_per_record =
+ (total_reccnt) ? m_scheduled_reconstruction_time.load() / total_reccnt
+ : 0;
long us = time_per_record / 1000;
- long excess_ns = time_per_record - us*1000;
+ long excess_ns = time_per_record - us * 1000;
m_stall_us.store(us);
size_t records_to_1us = (excess_ns) ? 1000 / excess_ns : 0;
- m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us );
-
- fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns);
-
- if (runtime == 0) {
- fprintf(stderr, "\t[W] Predicted runtime change of 0\n");
- }
-
- if (reccnt == 0) {
- fprintf(stderr, "\t[W] Predicted reccnt change of 0\n");
- }
+ m_insertion_rate.store(1.0 - 1.0 / (double)records_to_1us);
}
bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args,
@@ -537,9 +526,9 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- #ifdef DE_PRINT_SHARD_COUNT
- fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
- #endif
+#ifdef DE_PRINT_SHARD_COUNT
+ fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
+#endif
// fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
// fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
@@ -694,9 +683,8 @@ private:
extension->m_lock_mngr.release_lock(level, args->version->get_id());
}
- extension->update_insertion_delay(args->predicted_runtime,
- args->tasks.get_total_reccnt(),
- RECON_ENDED);
+ extension->update_insertion_delay(
+ args->predicted_runtime, args->tasks.get_total_reccnt(), RECON_ENDED);
}
if (args->priority == ReconstructionPriority::FLUSH) {
@@ -708,7 +696,6 @@ private:
// args->version->get_id(), recon_id);
//
-
/* manually delete the argument object */
delete args;
}
@@ -937,23 +924,33 @@ private:
args->initial_version = active_version->get_id();
size_t level_idx = 0;
- for (size_t i=0; i< recon.size(); i++) {
- if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) {
+ for (size_t i = 0; i < recon.size(); i++) {
+ if (recon[i].target < (level_index)active_version->get_structure()
+ ->get_level_vector()
+ .size()) {
level_idx = recon[i].target;
- args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ args->predicted_runtime +=
+ active_version->get_structure()
+ ->get_level_vector()[level_idx]
+ ->predict_reconstruction_time(recon[i].reccnt);
} else {
level_idx = recon[i].target - 1;
- args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ args->predicted_runtime +=
+ m_config.rt_level_scale *
+ active_version->get_structure()
+ ->get_level_vector()[level_idx]
+ ->predict_reconstruction_time(recon[i].reccnt);
}
}
if (args->predicted_runtime == 0) {
- fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt());
+ fprintf(stderr,
+ "Runtime Prediction of 0 for Level %ld with %ld records\n",
+ level_idx, args->tasks.get_total_reccnt());
}
update_insertion_delay(args->predicted_runtime,
- args->tasks.get_total_reccnt(),
- RECON_SCHEDULED);
+ args->tasks.get_total_reccnt(), RECON_SCHEDULED);
m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
args, RECONSTRUCTION);
}
@@ -1002,7 +999,7 @@ private:
return m_buffer->append(rec, ts);
}
-//#ifdef _GNU_SOURCE
+ //#ifdef _GNU_SOURCE
void set_thread_affinity() {
if constexpr (std::same_as<SchedType, SerialScheduler>) {
return;
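
The stall arithmetic in update_insertion_delay() above is easier to see with concrete numbers. A standalone sketch, not part of the commit, with invented inputs and the same formula as the hunk:

#include <cstdio>

int main() {
  long scheduled_recon_time = 1250000; // ns of outstanding reconstruction work (invented)
  long total_reccnt = 1000;            // records currently in the structure (invented)

  long time_per_record =
      (total_reccnt) ? scheduled_recon_time / total_reccnt : 0; // 1250 ns/record
  long us = time_per_record / 1000;              // 1 us stalled on every insert
  long excess_ns = time_per_record - us * 1000;  // 250 ns not covered by the stall
  long records_to_1us = (excess_ns) ? 1000 / excess_ns : 0; // one extra us per 4 inserts
  double insertion_rate = 1.0 - 1.0 / (double)records_to_1us; // 0.75, used to throttle inserts

  std::printf("stall=%ld us  rate=%.2f\n", us, insertion_rate);
  return 0;
}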
diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h
index dcba67e..9f56e4a 100644
--- a/include/framework/QueryRequirements.h
+++ b/include/framework/QueryRequirements.h
@@ -1,7 +1,7 @@
/*
* include/framework/QueryRequirements.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -11,7 +11,7 @@
*/
#pragma once
-#include "framework/structure/BufferView.h"
#include "framework/interface/Record.h"
-#include "framework/interface/Shard.h"
+#include "framework/structure/BufferView.h"
#include "framework/interface/Query.h"
+#include "framework/interface/Shard.h"
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
index d054030..335c916 100644
--- a/include/framework/ShardRequirements.h
+++ b/include/framework/ShardRequirements.h
@@ -1,7 +1,7 @@
/*
* include/framework/ShardRequirements.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -11,7 +11,7 @@
*/
#pragma once
-#include "framework/structure/BufferView.h"
+#include "framework/interface/Query.h"
#include "framework/interface/Record.h"
#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
+#include "framework/structure/BufferView.h"
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 97a973d..4e9a3fb 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -12,7 +12,6 @@
namespace de {
-
/*
*
*
@@ -23,100 +22,99 @@ template <typename QUERY, typename SHARD,
typename PARAMETERS = typename QUERY::Parameters,
typename LOCAL = typename QUERY::LocalQuery,
typename LOCAL_BUFFER = typename QUERY::LocalQueryBuffer>
-concept QueryInterface =
- requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query,
- SHARD *shard, std::vector<LOCAL *> &local_queries,
- std::vector<LOCAL_RESULT> &local_results, RESULT &result,
- BufferView<typename SHARD::RECORD> *bv) {
- /*
- * Given a set of query parameters and a shard, return a local query
- * object for that shard.
- */
- {
- QUERY::local_preproc(shard, parameters)
- } -> std::convertible_to<LOCAL *>;
+concept QueryInterface = requires(PARAMETERS *parameters, LOCAL *local,
+ LOCAL_BUFFER *buffer_query, SHARD *shard,
+ std::vector<LOCAL *> &local_queries,
+ std::vector<LOCAL_RESULT> &local_results,
+ RESULT &result,
+ BufferView<typename SHARD::RECORD> *bv) {
+ /*
+ * Given a set of query parameters and a shard, return a local query
+ * object for that shard.
+ */
+ { QUERY::local_preproc(shard, parameters) } -> std::convertible_to<LOCAL *>;
- /*
- * Given a set of query parameters and a buffer view, return a local
- * query object for the buffer.
- * NOTE: for interface reasons, the pointer to the buffer view MUST be
- * stored inside of the local query object. The future buffer
- * query routine will access the buffer by way of this pointer.
- */
- {
- QUERY::local_preproc_buffer(bv, parameters)
- } -> std::convertible_to<LOCAL_BUFFER *>;
+ /*
+ * Given a set of query parameters and a buffer view, return a local
+ * query object for the buffer.
+ * NOTE: for interface reasons, the pointer to the buffer view MUST be
+ * stored inside of the local query object. The future buffer
+ * query routine will access the buffer by way of this pointer.
+ */
+ {
+ QUERY::local_preproc_buffer(bv, parameters)
+ } -> std::convertible_to<LOCAL_BUFFER *>;
- /*
- * Given a full set of local queries, and the buffer query, make any
- * necessary adjustments to the local queries in-place, to account for
- * global information. If no additional processing is required, this
- * function can be left empty.
- */
- { QUERY::distribute_query(parameters, local_queries, buffer_query) };
+ /*
+ * Given a full set of local queries, and the buffer query, make any
+ * necessary adjustments to the local queries in-place, to account for
+ * global information. If no additional processing is required, this
+ * function can be left empty.
+ */
+ {QUERY::distribute_query(parameters, local_queries, buffer_query)};
- /*
- * Answer the local query, defined by `local` against `shard` and return
- * a vector of LOCAL_RESULT objects defining the query result.
- */
- { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>;
+ /*
+ * Answer the local query, defined by `local` against `shard` and return
+ * a vector of LOCAL_RESULT objects defining the query result.
+ */
+ { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>;
- /*
- * Answer the local query defined by `local` against the buffer (which
- * should be accessed by a pointer inside of `local`) and return a vector
- * of LOCAL_RESULT objects defining the query result.
- */
- {
- QUERY::local_query_buffer(buffer_query)
- } -> std::convertible_to<LOCAL_RESULT>;
+ /*
+ * Answer the local query defined by `local` against the buffer (which
+ * should be accessed by a pointer inside of `local`) and return a vector
+ * of LOCAL_RESULT objects defining the query result.
+ */
+ {
+ QUERY::local_query_buffer(buffer_query)
+ } -> std::convertible_to<LOCAL_RESULT>;
- /*
- * Process the local results from the buffer and all of the shards,
- * stored in `local_results`, and insert the associated ResultType
- * objects into the `result` vector, which represents the final result
- * of the query. Updates to this vector are done in-place.
- */
- { QUERY::combine(local_results, parameters, result) };
+ /*
+ * Process the local results from the buffer and all of the shards,
+ * stored in `local_results`, and insert the associated ResultType
+ * objects into the `result` vector, which represents the final result
+ * of the query. Updates to this vector are done in-place.
+ */
+ {QUERY::combine(local_results, parameters, result)};
- /*
- * Process the post-combine `result` vector of ResultType objects,
- * in the context of the global and local query parameters, to determine
- * if the query should be repeated. If so, make any necessary adjustments
- * to the local query objects and return True. Otherwise, return False.
- *
- * If no repetition is needed for a given problem type, simply return
- * False immediately and the query will end.
- */
- {
- QUERY::repeat(parameters, result, local_queries, buffer_query)
- } -> std::same_as<bool>;
+ /*
+ * Process the post-combine `result` vector of ResultType objects,
+ * in the context of the global and local query parameters, to determine
+ * if the query should be repeated. If so, make any necessary adjustments
+ * to the local query objects and return True. Otherwise, return False.
+ *
+ * If no repetition is needed for a given problem type, simply return
+ * False immediately and the query will end.
+ */
+ {
+ QUERY::repeat(parameters, result, local_queries, buffer_query)
+ } -> std::same_as<bool>;
- /*
- * If this flag is True, then the query will immediately stop and return
- * a result as soon as the first non-deleted LocalRecordType is found.
- * Otherwise, every Shard and the buffer will be queried and the results
- * merged, like normal.
- *
- * This is largely an optimization flag for use with point-lookup, or
- * other single-record result queries
- */
- { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;
+ /*
+ * If this flag is True, then the query will immediately stop and return
+ * a result as soon as the first non-deleted LocalRecordType is found.
+ * Otherwise, every Shard and the buffer will be queried and the results
+ * merged, like normal.
+ *
+ * This is largely an optimization flag for use with point-lookup, or
+ * other single-record result queries
+ */
+ { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;
- /*
- * If false, the built-in delete filtering that the framework can
- * apply to the local results, prior to calling combine, will be skipped.
- * This general filtering can be inefficient, particularly for tombstone
- * -based deletes, and so if a more efficient manual filtering can be
- * performed, it is worth setting this to True and doing that filtering
- * in the combine step.
- *
- * If deletes are not a consideration for your problem, it's also best
- * to turn this off, as it'll avoid the framework making an extra pass
- * over the local results prior to combining them.
- *
- * TODO: Temporarily disabling this, as we've dropped framework-level
- * delete filtering for the time being.
- */
- /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
- };
+ /*
+ * If false, the built-in delete filtering that the framework can
+ * apply to the local results, prior to calling combine, will be skipped.
+ * This general filtering can be inefficient, particularly for tombstone
+ * -based deletes, and so if a more efficient manual filtering can be
+ * performed, it is worth setting this to True and doing that filtering
+ * in the combine step.
+ *
+ * If deletes are not a consideration for your problem, it's also best
+ * to turn this off, as it'll avoid the framework making an extra pass
+ * over the local results prior to combining them.
+ *
+ * TODO: Temporarily disabling this, as we've dropped framework-level
+ * delete filtering for the time being.
+ */
+ /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
+};
} // namespace de
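
The hunk above documents every member the QueryInterface concept checks for. For reference only, a minimal type lining up with those requirements could take the following shape; the shard/buffer types are stand-ins, and the LocalResult/Result typedef names are placeholders since the concept's result-type parameters are not visible in this hunk:

#include <cstddef>
#include <vector>

struct StubShard {};  // stand-in for a ShardInterface type
struct StubBuffer {}; // stand-in for BufferView<R>

struct ExampleQuery {
  struct Parameters { long lower, upper; };
  struct LocalQuery { Parameters global; };
  struct LocalQueryBuffer { StubBuffer *view; Parameters global; }; // must hold the buffer-view pointer

  using LocalResult = std::size_t;
  using Result = std::size_t;

  static constexpr bool EARLY_ABORT = false;

  static LocalQuery *local_preproc(StubShard *, Parameters *p) { return new LocalQuery{*p}; }
  static LocalQueryBuffer *local_preproc_buffer(StubBuffer *bv, Parameters *p) {
    return new LocalQueryBuffer{bv, *p};
  }
  static void distribute_query(Parameters *, std::vector<LocalQuery *> &, LocalQueryBuffer *) {}
  static LocalResult local_query(StubShard *, LocalQuery *) { return 0; }
  static LocalResult local_query_buffer(LocalQueryBuffer *) { return 0; }
  static void combine(std::vector<LocalResult> &locals, Parameters *, Result &out) {
    for (auto r : locals) out += r; // e.g. sum per-shard counts
  }
  static bool repeat(Parameters *, Result &, std::vector<LocalQuery *> &, LocalQueryBuffer *) {
    return false; // single-pass query
  }
};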
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
index d76a6c8..0724bce 100644
--- a/include/framework/interface/Scheduler.h
+++ b/include/framework/interface/Scheduler.h
@@ -14,7 +14,7 @@ template <typename SchedType>
concept SchedulerInterface = requires(SchedType s, size_t i, void *vp,
de::Job j) {
{SchedType(i, i)};
- {s.schedule_job(j, i, vp, i)} -> std::convertible_to<void>;
+ { s.schedule_job(j, i, vp, i) } -> std::convertible_to<void>;
{s.shutdown()};
{s.print_statistics()};
};
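
The SchedulerInterface above is small; a trivial run-inline scheduler satisfying it would look roughly like this (illustrative sketch only; the framework's real schedulers are FIFOScheduler and SerialScheduler below):

#include <cstddef>
#include <functional>

using Job = std::function<void(void *)>; // matches de::Job from scheduling/Task.h

class InlineScheduler {
public:
  InlineScheduler(std::size_t memory_budget, std::size_t thread_cnt) {}
  void schedule_job(Job job, std::size_t size, void *args, std::size_t type) {
    job(args); // no queue: run the job on the caller's thread
  }
  void shutdown() {}
  void print_statistics() {}
};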
diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h
index fb5ce1a..e3a2de4 100644
--- a/include/framework/interface/Shard.h
+++ b/include/framework/interface/Shard.h
@@ -14,8 +14,8 @@ namespace de {
template <typename SHARD>
concept ShardInterface = RecordInterface<typename SHARD::RECORD> &&
- requires(SHARD shard, const std::vector<const SHARD *> &shard_vector, bool b,
- BufferView<typename SHARD::RECORD> bv,
+ requires(SHARD shard, const std::vector<const SHARD *> &shard_vector,
+ bool b, BufferView<typename SHARD::RECORD> bv,
typename SHARD::RECORD rec) {
/* construct a shard from a vector of shards of the same type */
{SHARD(shard_vector)};
@@ -52,7 +52,6 @@ concept ShardInterface = RecordInterface<typename SHARD::RECORD> &&
* use only at the moment
*/
{ shard.get_aux_memory_usage() } -> std::convertible_to<size_t>;
-
};
template <typename SHARD>
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index 61f379e..db4c8d4 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -36,7 +36,8 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -49,7 +50,8 @@ public:
task.target = target_level;
size_t reccnt = 0;
- if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) {
+ if (target_level < (ssize_t)levels.size() &&
+ levels[target_level]->get_record_count() > 0) {
task.sources.push_back({target_level, all_shards_idx});
task.type = ReconstructionType::Merge;
} else {
@@ -71,7 +73,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -85,7 +88,8 @@ private:
}
inline size_t capacity(level_index level, size_t reccnt) const {
- double base = std::ceil(m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+ double base = std::ceil(
+ m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
return m_buffer_size * (base - 1) * pow(base, level + 1);
}
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index 36556a2..1c000b9 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -16,16 +16,20 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class BackgroundTieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class BackgroundTieringPolicy
+ : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size,
+ size_t size_modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(size_modifier) {}
+
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
std::vector<ReconstructionVector> reconstructions;
auto levels = version->get_structure()->get_level_vector();
@@ -34,7 +38,8 @@ public:
return {};
}
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -44,19 +49,21 @@ public:
}
for (level_index i = target_level; i > source_level; i--) {
- if (lock_mngr.take_lock(i-1, version->get_id())) {
+ if (lock_mngr.take_lock(i - 1, version->get_id())) {
ReconstructionVector recon;
size_t total_reccnt = levels[i - 1]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
- shards.push_back({i-1, j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count();
+ j++) {
+ shards.push_back({i - 1, j});
}
- recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ recon.add_reconstruction(shards, i, total_reccnt,
+ ReconstructionType::Compact);
reconstructions.push_back(recon);
}
}
-
+
return reconstructions;
}
@@ -68,7 +75,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -81,7 +89,9 @@ private:
return target_level;
}
- inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+ inline size_t capacity(size_t reccnt) const {
+ return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+ }
size_t m_scale_factor;
size_t m_buffer_size;
diff --git a/include/framework/reconstruction/CompactOnFull.h b/include/framework/reconstruction/CompactOnFull.h
index f5e0400..f0b549d 100644
--- a/include/framework/reconstruction/CompactOnFull.h
+++ b/include/framework/reconstruction/CompactOnFull.h
@@ -21,11 +21,14 @@ class CompactOnFull : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- CompactOnFull(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ CompactOnFull(size_t scale_factor, size_t buffer_size,
+ size_t size_modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(size_modifier) {}
+
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
std::vector<ReconstructionVector> reconstructions;
auto levels = version->get_structure()->get_level_vector();
@@ -34,23 +37,24 @@ public:
return {};
}
- for (level_index i=0; i < (ssize_t) levels.size(); i++) {
- if (levels[i]->get_shard_count() >= m_scale_factor && lock_mngr.take_lock(i, version->get_id())) {
+ for (level_index i = 0; i < (ssize_t)levels.size(); i++) {
+ if (levels[i]->get_shard_count() >= m_scale_factor &&
+ lock_mngr.take_lock(i, version->get_id())) {
ReconstructionVector recon;
size_t total_reccnt = levels[i]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t) levels[i]->get_shard_count(); j++) {
- shards.push_back({i,j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i]->get_shard_count(); j++) {
+ shards.push_back({i, j});
}
- recon.add_reconstruction(shards, i+1, total_reccnt, ReconstructionType::Compact);
+ recon.add_reconstruction(shards, i + 1, total_reccnt,
+ ReconstructionType::Compact);
reconstructions.push_back(recon);
}
}
- return reconstructions;
- }
-
+ return reconstructions;
+ }
ReconstructionVector
get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
@@ -60,7 +64,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -73,7 +78,9 @@ private:
return target_level;
}
- inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+ inline size_t capacity(size_t reccnt) const {
+ return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+ }
size_t m_scale_factor;
size_t m_buffer_size;
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index cc8dce4..52d9f39 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -16,16 +16,20 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class FixedShardCountPolicy
+ : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count)
- : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
+ FixedShardCountPolicy(size_t buffer_size, size_t shard_count,
+ size_t max_record_count)
+ : m_buffer_size(buffer_size), m_shard_count(shard_count),
+ m_max_reccnt(max_record_count) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -37,26 +41,26 @@ public:
/* if this is the very first flush, there won't be an L1 yet */
if (levels.size() > 1 && levels[1]->get_shard_count() > 0) {
- ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)};
- if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
- auto task = ReconstructionTask {
- {{0, 0}, last_shid}, 1, m_buffer_size,ReconstructionType::Merge
- };
+ ShardID last_shid = {1, (shard_index)(levels[1]->get_shard_count() - 1)};
+ if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() +
+ m_buffer_size <=
+ capacity()) {
+ auto task = ReconstructionTask{
+ {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge};
v.add_reconstruction(task);
return v;
}
}
- auto task = ReconstructionTask {
- {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append
- };
+ auto task = ReconstructionTask{
+ {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append};
v.add_reconstruction(task);
return v;
}
private:
inline size_t capacity() const {
- double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count;
+ double bps = (double)m_max_reccnt / (double)m_buffer_size / m_shard_count;
return ceil(bps) * m_buffer_size;
}
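
FixedShardCountPolicy's capacity() is integer bucketing of buffer flushes across the fixed shard count; with invented numbers (not part of the commit):

#include <cmath>
#include <cstdio>

int main() {
  double max_reccnt = 10000000, buffer_size = 8000, shard_count = 100;
  double bps = max_reccnt / buffer_size / shard_count; // 12.5 buffer flushes per shard
  long capacity = std::ceil(bps) * buffer_size;        // 13 * 8000 = 104000 records per shard
  std::printf("per-shard capacity: %ld records\n", capacity);
  return 0;
}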
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index c0d29fe..7e38e02 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,7 +24,8 @@ public:
FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 955bc02..3e0afaa 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -58,8 +58,9 @@ public:
(target_level == 1)
? m_buffer_size + target_reccnt
: levels[target_level - 1]->get_record_count() + target_reccnt;
- auto type = (target_level >= (level_index) levels.size()) ? ReconstructionType::Append
- : ReconstructionType::Merge;
+ auto type = (target_level >= (level_index)levels.size())
+ ? ReconstructionType::Append
+ : ReconstructionType::Merge;
reconstructions.add_reconstruction(target_level - 1, target_level,
total_reccnt, type);
@@ -95,8 +96,9 @@ private:
inline size_t capacity(level_index level, size_t reccnt) const {
return m_buffer_size *
- pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt), m_size_modifier)), level);
-
+ pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt),
+ m_size_modifier)),
+ level);
}
size_t m_scale_factor;
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index 41a2092..3c842b2 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -12,19 +12,22 @@
*/
#pragma once
-#include "util/types.h"
-#include "framework/structure/ExtensionStructure.h"
-#include "framework/scheduling/Version.h"
#include "framework/scheduling/LockManager.h"
+#include "framework/scheduling/Version.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "util/types.h"
namespace de {
-template<ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class ReconstructionPolicy {
typedef ExtensionStructure<ShardType, QueryType> StructureType;
public:
ReconstructionPolicy() {}
- virtual std::vector<ReconstructionVector> get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const = 0;
- virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
- };
-}
+ virtual std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const = 0;
+ virtual ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
+};
+} // namespace de
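
A policy only has to implement the two virtuals above. The smallest useful sketch, modeled on the append path visible in FixedShardCountPolicy below, would be roughly the following; this class does not exist in the tree and is illustrative only:

#include "framework/reconstruction/ReconstructionPolicy.h"

namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class AppendOnFlushPolicy : public ReconstructionPolicy<ShardType, QueryType> {
public:
  AppendOnFlushPolicy(size_t buffer_size) : m_buffer_size(buffer_size) {}

  std::vector<ReconstructionVector>
  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
                           LockManager &lock_mngr) const override {
    return {}; /* never schedule background maintenance */
  }

  ReconstructionVector
  get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
    ReconstructionVector v;
    /* append the flushed buffer as a new shard on level 1 */
    v.add_reconstruction(ReconstructionTask{
        {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append});
    return v;
  }

private:
  size_t m_buffer_size;
};
} // namespace de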
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index b1fcb49..c16c427 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -21,11 +21,13 @@ class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(modifier) {}
+ TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(modifier) {}
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -34,7 +36,8 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -47,12 +50,13 @@ public:
size_t total_reccnt = levels[i - 1]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
- shards.push_back({i-1, j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); j++) {
+ shards.push_back({i - 1, j});
}
if (total_reccnt > 0 || shards.size() > 0) {
- reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ reconstructions.add_reconstruction(shards, i, total_reccnt,
+ ReconstructionType::Compact);
}
}
@@ -60,7 +64,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -74,7 +79,8 @@ private:
}
inline size_t capacity(size_t reccnt) const {
- return std::ceil((double) m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+ return std::ceil((double)m_scale_factor *
+ std::pow<double>(std::log10(reccnt), m_size_modifier));
}
size_t m_scale_factor;
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 7583727..a642b31 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -60,7 +60,9 @@ public:
Structure *get_mutable_structure() { return m_structure; }
- BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
+ BufView get_buffer() const {
+ return m_buffer->get_buffer_view(m_buffer_head);
+ }
/*
* Returns a new Epoch object that is a copy of this one. The new object
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 8fbe07c..ef6dde5 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -61,7 +61,8 @@ public:
std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
- m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
+ m_task_queue.push(
+ Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
m_cv.notify_all();
}
@@ -81,7 +82,6 @@ private:
[[maybe_unused]] size_t m_memory_budget;
size_t m_thrd_cnt;
-
std::atomic<size_t> m_counter;
std::mutex m_cv_lock;
std::condition_variable m_cv;
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
index a40cf7a..820b975 100644
--- a/include/framework/scheduling/LockManager.h
+++ b/include/framework/scheduling/LockManager.h
@@ -3,15 +3,15 @@
*/
#pragma once
-#include <deque>
#include <atomic>
#include <cassert>
+#include <deque>
namespace de {
class LockManager {
public:
- LockManager(size_t levels=1) {
- for (size_t i=0; i < levels; i++) {
+ LockManager(size_t levels = 1) {
+ for (size_t i = 0; i < levels; i++) {
m_lks.emplace_back(false);
}
@@ -20,9 +20,7 @@ public:
~LockManager() = default;
- void add_lock() {
- m_lks.emplace_back(false);
- }
+ void add_lock() { m_lks.emplace_back(false); }
void release_lock(size_t idx, size_t version) {
if (idx < m_lks.size()) {
@@ -72,17 +70,13 @@ public:
return false;
}
- bool is_buffer_locked() {
- return m_buffer_lk.load();
- }
+ bool is_buffer_locked() { return m_buffer_lk.load(); }
- void release_buffer_lock() {
- m_buffer_lk.store(false);
- }
+ void release_buffer_lock() { m_buffer_lk.store(false); }
private:
std::deque<std::atomic<bool>> m_lks;
std::atomic<bool> m_buffer_lk;
std::atomic<size_t> m_last_unlocked_version;
};
-}
+} // namespace de
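
Usage-wise, the per-level locks gate which levels a maintenance pass may touch, following the pattern in BackgroundTieringPolicy and the reconstruction worker in DynamicExtension.h above. A sketch with the surrounding plumbing elided:

#include "framework/scheduling/LockManager.h"
#include <cstddef>

void maintenance_level(de::LockManager &lock_mngr, std::size_t level,
                       std::size_t version_id) {
  /* skip levels whose lock another in-flight version already holds */
  if (!lock_mngr.take_lock(level, version_id)) {
    return;
  }
  /* ... perform the reconstruction sourcing from this level ... */
  lock_mngr.release_lock(level, version_id);
}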
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index b6ebe53..6a8ed58 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -37,7 +37,8 @@ public:
t(0);
}
- void shutdown() { /* intentionally left blank */ }
+ void shutdown() { /* intentionally left blank */
+ }
void print_statistics() { m_stats.print_statistics(); }
void print_query_time_data() { m_stats.print_query_time_data(); }
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 4529b2e..00ddbbb 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -14,9 +14,9 @@
#pragma once
#include <chrono>
+#include <condition_variable>
#include <functional>
#include <future>
-#include <condition_variable>
#include "framework/scheduling/Version.h"
#include "framework/scheduling/statistics.h"
@@ -24,11 +24,7 @@
namespace de {
-enum class ReconstructionPriority {
- FLUSH = 0,
- CMPCT = 1,
- MAINT = 2
-};
+enum class ReconstructionPriority { FLUSH = 0, CMPCT = 1, MAINT = 2 };
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
@@ -51,7 +47,8 @@ typedef std::function<void(void *)> Job;
struct Task {
Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
- SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr)
+ SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr,
+ std::condition_variable *cv = nullptr)
: m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
m_stats(stats), m_lk(lk), m_cv(cv) {}
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index be54c84..bbcbe25 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -25,18 +25,17 @@ private:
typedef BufferView<RecordType> BufferViewType;
public:
-
Version(size_t vid = 0)
: m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {}
- Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
- size_t head)
+ Version(size_t number, std::unique_ptr<StructureType> structure,
+ BufferType *buff, size_t head)
: m_buffer(buff), m_structure(std::move(structure)), m_id(number),
m_buffer_head(head) {
- if (m_buffer) {
- m_buffer->take_head_reference(m_buffer_head);
- }
- }
+ if (m_buffer) {
+ m_buffer->take_head_reference(m_buffer_head);
+ }
+ }
~Version() {
if (m_buffer) {
@@ -55,7 +54,7 @@ public:
size_t get_id() const { return m_id; }
- void set_id(size_t id) { m_id = id;}
+ void set_id(size_t id) { m_id = id; }
const StructureType *get_structure() const { return m_structure.get(); }
@@ -107,10 +106,7 @@ public:
m_structure->update_shard_version(version);
}
- size_t get_head() {
- return m_buffer_head;
- }
-
+ size_t get_head() { return m_buffer_head; }
void set_buffer(BufferType *buffer, size_t head) {
assert(m_buffer == nullptr);
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index 6d9f9f0..8cb6dbd 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -16,12 +16,12 @@
#include <atomic>
#include <cassert>
#include <chrono>
+#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <mutex>
#include <unordered_map>
#include <vector>
-#include <cmath>
namespace de {
@@ -68,7 +68,6 @@ private:
queue_time)
.count();
}
-
};
public:
@@ -117,10 +116,10 @@ public:
size_t worst_query = 0;
size_t first_query = UINT64_MAX;
-
for (auto &job : m_jobs) {
if (job.second.type != 1) {
- fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, job.second.runtime(), job.second.runtime() / (job.second.size));
+ fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size,
+ job.second.runtime(), job.second.runtime() / (job.second.size));
}
if (job.first < first_query) {
@@ -149,7 +148,6 @@ public:
query_cnt++;
}
-
int64_t average_queue_time = (query_cnt) ? total_queue_time / query_cnt : 0;
int64_t average_runtime = (query_cnt) ? total_runtime / query_cnt : 0;
@@ -162,28 +160,37 @@ public:
continue;
}
- queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2);
- runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2);
-
+ queue_deviation_sum +=
+ std::pow(job.second.time_in_queue() - average_queue_time, 2);
+ runtime_deviation_sum +=
+ std::pow(job.second.runtime() - average_runtime, 2);
}
- int64_t queue_stddev = (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
- int64_t runtime_stddev = (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
-
-
- fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", query_cnt, worst_query, first_query);
- fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev);
- fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev);
+ int64_t queue_stddev =
+ (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
+ int64_t runtime_stddev =
+ (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
+
+ fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n",
+ query_cnt, worst_query, first_query);
+ fprintf(stdout,
+ "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t "
+ "Max Scheduling Delay: %ld\tStandard Deviation: %ld\n",
+ average_queue_time, min_queue_time, max_queue_time, queue_stddev);
+ fprintf(stdout,
+ "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query "
+ "Latency: %ld\tStandard Deviation: %ld\n",
+ average_runtime, min_runtime, max_runtime, runtime_stddev);
}
-
void print_query_time_data() {
std::unique_lock<std::mutex> lk(m_mutex);
update_job_data_from_log();
for (auto &job : m_jobs) {
if (job.second.type == 1) {
- fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time());
+ fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(),
+ job.second.runtime(), job.second.end_to_end_time());
}
}
}
@@ -211,18 +218,18 @@ private:
auto &job = m_jobs.at(event.id);
switch (event.type) {
- case EventType::QUEUED:
- job.queue_time = event.time;
- break;
- case EventType::FINISHED:
- job.stop_time = event.time;
- break;
- case EventType::SCHEDULED:
- job.scheduled_time = event.time;
- break;
- case EventType::STARTED:
- job.start_time = event.time;
- break;
+ case EventType::QUEUED:
+ job.queue_time = event.time;
+ break;
+ case EventType::FINISHED:
+ job.stop_time = event.time;
+ break;
+ case EventType::SCHEDULED:
+ job.scheduled_time = event.time;
+ break;
+ case EventType::STARTED:
+ job.start_time = event.time;
+ break;
}
}
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index a9fb12d..afe5dbb 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -46,10 +46,9 @@ public:
BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail,
size_t tombstone_cnt, psudb::BloomFilter<R> *filter)
- : m_data(buffer), m_head(head), m_tail(tail),
- m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap),
- m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter),
- m_active(true) {}
+ : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap),
+ m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt),
+ m_tombstone_filter(filter), m_active(true) {}
~BufferView() = default;
@@ -104,9 +103,7 @@ public:
*/
size_t get_tombstone_count() { return m_approx_ts_cnt; }
- Wrapped<R> *get(size_t i) {
- return m_data + to_idx(i);
- }
+ Wrapped<R> *get(size_t i) { return m_data + to_idx(i); }
void copy_to_buffer(psudb::byte *buffer) {
/* check if the region to be copied circles back to start. If so, do it in
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index bb8a480..e03c7ad 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,12 +27,14 @@ class ExtensionStructure {
typedef BufferView<RecordType> BuffView;
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
+
public:
- ExtensionStructure(bool default_level=true) {
+ ExtensionStructure(bool default_level = true) {
if (default_level)
- m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ m_levels.emplace_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(0));
}
-
+
~ExtensionStructure() = default;
/*
@@ -162,34 +164,37 @@ public:
return cnt;
}
-
/*
* Perform the reconstruction described by task. If the resulting
* reconstruction grows the structure (i.e., adds a level), returns
* true. Otherwise, returns false.
*/
- inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const {
+ inline reconstruction_results<ShardType>
+ perform_reconstruction(ReconstructionTask task) const {
reconstruction_results<ShardType> result;
result.target_level = task.target;
/* if there is only one source, then we don't need to actually rebuild */
if (task.sources.size() == 1) {
auto shid = task.sources[0];
- if (shid.shard_idx == all_shards_idx && m_levels[shid.level_idx]->get_shard_count() > 1) {
+ if (shid.shard_idx == all_shards_idx &&
+ m_levels[shid.level_idx]->get_shard_count() > 1) {
/* there's more than one shard, so we need to do the reconstruction */
} else {
- auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
+ auto raw_shard_ptr =
+ m_levels[shid.level_idx]->get_shard(shid.shard_idx);
assert(raw_shard_ptr);
result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
- result.new_shard = m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
+ result.new_shard =
+ m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
return result;
}
}
-
- std::vector<const ShardType*> shards;
+
+ std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < (level_index) m_levels.size());
+ assert(shid.level_idx < (level_index)m_levels.size());
assert(shid.shard_idx >= -1);
auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
@@ -197,12 +202,13 @@ public:
result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
}
-
auto start = std::chrono::high_resolution_clock::now();
result.new_shard = std::make_shared<ShardType>(shards);
auto stop = std::chrono::high_resolution_clock::now();
- result.runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop- start).count();
+ result.runtime =
+ std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
+ .count();
result.reccnt = result.new_shard->get_record_count();
return result;
@@ -221,11 +227,10 @@ public:
return queries;
}
- size_t l0_size() const {
- return m_levels[0]->get_shard_count();
- }
+ size_t l0_size() const { return m_levels[0]->get_shard_count(); }
- bool apply_reconstruction(reconstruction_results<ShardType> &recon, size_t version) {
+ bool apply_reconstruction(reconstruction_results<ShardType> &recon,
+ size_t version) {
bool res = append_shard(recon.new_shard, version, recon.target_level);
m_levels[recon.target_level]->update_reconstruction_model(recon);
delete_shards(recon.source_shards);
@@ -233,13 +238,15 @@ public:
return res;
}
- bool append_shard(std::shared_ptr<ShardType> shard, size_t version, size_t level) {
+ bool append_shard(std::shared_ptr<ShardType> shard, size_t version,
+ size_t level) {
assert(level <= m_levels.size());
auto rc = false;
if (level == m_levels.size()) {
/* grow the structure */
- m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(level));
+ m_levels.push_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(level));
rc = true;
}
@@ -248,12 +255,15 @@ public:
return rc;
}
- void delete_shards(std::vector<std::pair<level_index, const ShardType*>> shards) {
- for (size_t i=0; i<shards.size(); i++) {
- assert(shards[i].first < (level_index) m_levels.size());
+ void
+ delete_shards(std::vector<std::pair<level_index, const ShardType *>> shards) {
+ for (size_t i = 0; i < shards.size(); i++) {
+ assert(shards[i].first < (level_index)m_levels.size());
ssize_t shard_idx = -1;
- for (size_t j=0; j<m_levels[shards[i].first]->get_shard_count(); j++) {
- if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) {
+ for (size_t j = 0; j < m_levels[shards[i].first]->get_shard_count();
+ j++) {
+ if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() ==
+ shards[i].second) {
shard_idx = j;
break;
}
@@ -262,7 +272,8 @@ public:
if (shard_idx != -1) {
m_levels[shards[i].first]->delete_shard(shard_idx);
} else {
- fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", shards[i].first, shards[i].second);
+ fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n",
+ shards[i].first, shards[i].second);
exit(EXIT_FAILURE);
}
}
@@ -270,51 +281,55 @@ public:
LevelVector const &get_level_vector() const { return m_levels; }
-
- /*
- * Validate that no level in the structure exceeds its maximum tombstone
- * capacity. This is used to trigger preemptive compactions at the end of
- * the reconstruction process.
- */
- bool validate_tombstone_proportion(double max_delete_prop) const {
- long double ts_prop;
- for (size_t i = 0; i < m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double)m_levels[i]->get_tombstone_count() /
- (long double)m_levels[i]->get_record_count();
- if (ts_prop > (long double)max_delete_prop) {
- return false;
- }
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion(double max_delete_prop) const {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)m_levels[i]->get_record_count();
+ if (ts_prop > (long double)max_delete_prop) {
+ return false;
}
}
-
- return true;
}
- bool validate_tombstone_proportion(level_index level, double max_delete_prop) const {
- long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count();
- return ts_prop <= (long double) max_delete_prop;
- }
+ return true;
+ }
- void print_structure(bool debug=false) const {
- for (size_t i=0; i<m_levels.size(); i++) {
- if (debug) {
- fprintf(stdout, "[D] [%ld]:\t", i);
- } else {
- fprintf(stdout, "[%ld]:\t", i);
- }
+ bool validate_tombstone_proportion(level_index level,
+ double max_delete_prop) const {
+ long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
+ (long double)m_levels[level]->get_record_count();
+ return ts_prop <= (long double)max_delete_prop;
+ }
- if (m_levels[i]) {
- for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
- fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count());
- }
- } else {
- fprintf(stdout, "[Empty]");
- }
+ void print_structure(bool debug = false) const {
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (debug) {
+ fprintf(stdout, "[D] [%ld]:\t", i);
+ } else {
+ fprintf(stdout, "[%ld]:\t", i);
+ }
- fprintf(stdout, "\n");
+ if (m_levels[i]) {
+ for (size_t j = 0; j < m_levels[i]->get_shard_count(); j++) {
+ fprintf(stdout, "(%ld, %ld, %p: %ld) ", j,
+ m_levels[i]->get_shard_ptr(j).second,
+ m_levels[i]->get_shard_ptr(j).first.get(),
+ m_levels[i]->get_shard(j)->get_record_count());
+ }
+ } else {
+ fprintf(stdout, "[Empty]");
}
- }
+
+ fprintf(stdout, "\n");
+ }
+ }
private:
LevelVector m_levels;
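
For orientation, the two calls above form the whole reconstruction round-trip a worker drives against this structure; roughly, as a fragment that assumes a task from the policy and the worker's version id:

/* build the replacement shard from the task's sources (timed internally) */
auto result = structure->perform_reconstruction(task);

/* install the new shard and drop the sources; true means a level was added */
bool grew = structure->apply_reconstruction(result, version_id);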
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 205dbf9..cf2b16a 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,11 +15,11 @@
*/
#pragma once
+#include <algorithm>
+#include <deque>
#include <future>
#include <memory>
#include <vector>
-#include <deque>
-#include <algorithm>
#include "framework/interface/Query.h"
#include "framework/interface/Shard.h"
@@ -34,7 +34,7 @@ class InternalLevel {
typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
public:
- InternalLevel(ssize_t level_no) : m_level_no(level_no) { }
+ InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
~InternalLevel() = default;
@@ -109,7 +109,7 @@ public:
idx = 0;
}
- if (idx >= (ssize_t) m_shards.size()) {
+ if (idx >= (ssize_t)m_shards.size()) {
return nullptr;
}
@@ -183,7 +183,7 @@ public:
}
size_t get_nonempty_shard_count() const {
- size_t cnt = 0;
+ size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
cnt += 1;
@@ -199,7 +199,7 @@ public:
new_level->append(m_shards[i].first, m_shards[i].second);
}
- for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
+ for (auto itr = m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
new_level->m_rt_window.push_front(*itr);
}
@@ -208,23 +208,21 @@ public:
void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); }
- void delete_shard(shard_index shard, bool log_delete=true) {
+ void delete_shard(shard_index shard, bool log_delete = true) {
size_t before = m_shards.size();
m_shards.erase(m_shards.begin() + shard);
size_t after = m_shards.size();
- assert( before > after);
+ assert(before > after);
}
- void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+ void append(std::shared_ptr<ShardType> shard, size_t version = 0) {
m_shards.push_back({shard, version});
}
- void append(shard_ptr shard) {
- m_shards.push_back(shard);
- }
+ void append(shard_ptr shard) { m_shards.push_back(shard); }
const shard_ptr get_shard_ptr(ssize_t idx) const {
- if (idx >= 0 && idx < (ssize_t) m_shards.size()) {
+ if (idx >= 0 && idx < (ssize_t)m_shards.size()) {
return m_shards[idx];
} else if (idx == all_shards_idx && m_shards.size() == 1) {
return m_shards[0];
@@ -247,7 +245,7 @@ public:
size_t total = 0;
for (auto rt : m_rt_window) {
total += rt;
- }
+ }
return total / m_rt_window.size();
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7357915..156b718 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -97,7 +97,9 @@ public:
return true;
}
- size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; }
+ size_t get_record_count() const {
+ return m_tail.load() - m_head.load().head_idx;
+ }
size_t get_capacity() const { return m_cap; }
@@ -139,14 +141,16 @@ public:
m_active_head_advance.store(true);
if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n");
+ // fprintf(stderr, "[W]: Refusing to advance head due to remaining
+ // reference counts [2]\n");
m_active_head_advance.store(false);
return false;
}
- // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head);
- // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt);
-
+ // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n",
+ // m_old_head.load().head_idx, m_head.load().head_idx, new_head);
+ // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt,
+ // m_head.load().refcnt);
buffer_head new_hd = {new_head, 1};
buffer_head cur_hd;
@@ -163,7 +167,6 @@ public:
return true;
}
-
void set_low_watermark(size_t lwm) {
assert(lwm < m_hwm);
m_lwm = lwm;
@@ -197,13 +200,9 @@ public:
return m_cap - (m_tail.load() - m_old_head.load().head_idx);
}
- size_t debug_get_old_head() const {
- return m_old_head.load().head_idx;
- }
+ size_t debug_get_old_head() const { return m_old_head.load().head_idx; }
- size_t debug_get_head() const {
- return m_head.load().head_idx;
- }
+ size_t debug_get_head() const { return m_head.load().head_idx; }
bool take_head_reference(size_t target_head) {
buffer_head cur_hd, new_hd;
@@ -225,7 +224,7 @@ public:
return head_acquired;
}
-
+
bool release_head_reference(size_t head) {
buffer_head cur_hd, new_hd;
bool head_released = false;
@@ -261,8 +260,8 @@ private:
buffer_head cur_hd, new_hd;
bool head_acquired = false;
-
- //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
+ // fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head,
+ // m_old_head.load().head_idx, m_head.load().head_idx);
do {
if (m_old_head.load().head_idx == target_head) {
cur_hd = m_old_head.load();
@@ -279,7 +278,7 @@ private:
return new_hd.head_idx;
}
-
+
ssize_t try_advance_tail() {
size_t old_value = m_tail.load();
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 3ae3492..e2484d5 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -8,48 +8,49 @@
*/
#pragma once
+#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
#include "util/types.h"
-#include "framework/interface/Scheduler.h"
#include <cstdlib>
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
-DeletePolicy D, SchedulerInterface SchedType>
+ DeletePolicy D, SchedulerInterface SchedType>
class DEConfiguration {
- public:
- DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
- : recon_policy(std::move(recon_policy)) {}
+public:
+ DEConfiguration(
+ std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
+ : recon_policy(std::move(recon_policy)) {}
- std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;
+ std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;
- /* buffer parameters */
- size_t buffer_count = 1;
- size_t buffer_size = 8000;
- size_t buffer_flush_trigger = buffer_size / 2;
+ /* buffer parameters */
+ size_t buffer_count = 1;
+ size_t buffer_size = 8000;
+ size_t buffer_flush_trigger = buffer_size / 2;
- /* reconstruction triggers */
- bool recon_enable_seek_trigger = false;
- bool recon_enable_maint_on_flush = false;
- bool recon_enable_delete_cmpct = false;
- bool recon_maint_disabled = true;
+ /* reconstruction triggers */
+ bool recon_enable_seek_trigger = false;
+ bool recon_enable_maint_on_flush = false;
+ bool recon_enable_delete_cmpct = false;
+ bool recon_maint_disabled = true;
- size_t recon_l0_capacity = 0; /* 0 for unbounded */
- double maximum_delete_proportion = 1;
+ size_t recon_l0_capacity = 0; /* 0 for unbounded */
+ double maximum_delete_proportion = 1;
- /* resource management */
- size_t maximum_threads = 16;
- size_t minimum_recon_threads = 1;
- size_t minimum_query_threads = 4;
- size_t maximum_memory_usage = 0; /* o for unbounded */
+ /* resource management */
+ size_t maximum_threads = 16;
+ size_t minimum_recon_threads = 1;
+ size_t minimum_query_threads = 4;
+ size_t maximum_memory_usage = 0; /* 0 for unbounded */
- size_t physical_core_count = 6;
+ size_t physical_core_count = 6;
- size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
+ size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
- bool dynamic_ratelimiting = false;
- size_t rt_level_scale = 1;
+ bool dynamic_ratelimiting = false;
+ size_t rt_level_scale = 1;
};
} // namespace de
diff --git a/include/query/irs.h b/include/query/irs.h
index ec6fa29..eb584ae 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -64,7 +64,7 @@ public:
if (query->lower_idx == shard->get_record_count()) {
query->total_weight = 0;
} else {
- query->total_weight = query->upper_idx - query->lower_idx;
+ query->total_weight = query->upper_idx - query->lower_idx;
}
query->sample_size = 0;
@@ -192,8 +192,7 @@ public:
return result_set;
}
- static LocalResultType
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
LocalResultType result;
result.reserve(query->sample_size);
@@ -221,9 +220,8 @@ public:
return result;
}
- static void
- combine(std::vector<LocalResultType> const &local_results,
- Parameters *parms, ResultType &output) {
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
output.emplace_back(local_results[i][j]);
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h
index b42ec85..ab3d93f 100644
--- a/include/query/pointlookup.h
+++ b/include/query/pointlookup.h
@@ -40,7 +40,7 @@ public:
typedef std::vector<Wrapped<R>> LocalResultType;
typedef std::vector<R> ResultType;
-
+
constexpr static bool EARLY_ABORT = false;
constexpr static bool SKIP_DELETE_FILTER = true;
@@ -58,7 +58,7 @@ public:
return query;
}
-
+
static void distribute_query(Parameters *parms,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
@@ -76,7 +76,7 @@ public:
return result;
}
-
+
static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
LocalResultType result;
@@ -91,11 +91,9 @@ public:
return result;
}
-
- static void
- combine(std::vector<LocalResultType> const &local_results,
- Parameters *parms, ResultType &output) {
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (auto r : local_results) {
if (r.size() > 0) {
if (r[0].is_deleted() || r[0].is_tombstone()) {
@@ -107,7 +105,7 @@ public:
}
}
}
-
+
static bool repeat(Parameters *parms, ResultType &output,
std::vector<LocalQuery *> const &local_queries,
LocalQueryBuffer *buffer_query) {
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 0743ee6..89774a8 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -40,8 +40,8 @@ public:
size_t record_count;
size_t tombstone_count;
- bool is_deleted() {return false;}
- bool is_tombstone() {return false;}
+ bool is_deleted() { return false; }
+ bool is_tombstone() { return false; }
};
typedef size_t ResultType;
@@ -116,8 +116,7 @@ public:
return result;
}
- static LocalResultType
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
LocalResultType result = {0, 0};
for (size_t i = 0; i < query->buffer->get_record_count(); i++) {
@@ -137,9 +136,8 @@ public:
return result;
}
- static void
- combine(std::vector<LocalResultType> const &local_results,
- Parameters *parms, ResultType &output) {
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
size_t reccnt = 0;
size_t tscnt = 0;
diff --git a/include/query/wss.h b/include/query/wss.h
index d4e75f3..311de78 100644
--- a/include/query/wss.h
+++ b/include/query/wss.h
@@ -139,7 +139,8 @@ public:
for (size_t i = 0; i < query->sample_size; i++) {
size_t idx = shard->get_weighted_sample(query->global_parms.rng);
- if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) {
+ if (!shard->get_record_at(idx)->is_deleted() &&
+ !shard->get_record_at(idx)->is_tombstone()) {
result.emplace_back(shard->get_record_at(idx)->rec);
}
}
@@ -147,8 +148,7 @@ public:
return result;
}
- static LocalResultType
- local_query_buffer(LocalQueryBuffer *query) {
+ static LocalResultType local_query_buffer(LocalQueryBuffer *query) {
LocalResultType result;
for (size_t i = 0; i < query->sample_size; i++) {
@@ -156,7 +156,8 @@ public:
auto rec = query->buffer->get(idx);
auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight;
- if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) {
+ if (test <= rec->rec.weight && !rec->is_deleted() &&
+ !rec->is_tombstone()) {
result.emplace_back(rec->rec);
}
}
@@ -164,9 +165,8 @@ public:
return result;
}
- static void
- combine(std::vector<LocalResultType> const &local_results,
- Parameters *parms, ResultType &output) {
+ static void combine(std::vector<LocalResultType> const &local_results,
+ Parameters *parms, ResultType &output) {
for (size_t i = 0; i < local_results.size(); i++) {
for (size_t j = 0; j < local_results[i].size(); j++) {
output.emplace_back(local_results[i][j]);
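The buffer-side sampling loop in wss.h above accepts a uniformly drawn record with probability weight / max_weight. The stand-alone sketch below illustrates that rejection step in isolation; it substitutes std::mt19937 for the gsl_rng used by the framework purely to keep the example dependency-free, and the weight values are hypothetical.

#include <cstdio>
#include <random>
#include <vector>

int main() {
  // Hypothetical per-record weights and their maximum, mirroring
  // rec.weight and query->max_weight in the buffer sampling loop.
  std::vector<double> weights = {1.0, 4.0, 2.0, 8.0};
  double max_weight = 8.0;

  std::mt19937 rng(42);
  std::uniform_int_distribution<size_t> pick(0, weights.size() - 1);
  std::uniform_real_distribution<double> unif(0.0, 1.0);

  std::vector<size_t> counts(weights.size(), 0);
  for (size_t accepted = 0; accepted < 100000;) {
    size_t idx = pick(rng);               // uniform candidate
    double test = unif(rng) * max_weight; // threshold draw
    if (test <= weights[idx]) {           // keep with probability w / max_weight
      counts[idx]++;
      accepted++;
    }
  }

  // Acceptance counts come out proportional to the weights (1 : 4 : 2 : 8).
  for (size_t i = 0; i < counts.size(); i++) {
    std::printf("index %zu: %zu samples\n", i, counts[i]);
  }
  return 0;
}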
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index 15b0884..c176fa2 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -20,186 +20,163 @@
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
#include "util/SortedMerge.h"
+#include "util/bf_config.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
using psudb::byte;
+using psudb::CACHELINE_SIZE;
namespace de {
-template <WeightedRecordInterface R>
-class Alias {
+template <WeightedRecordInterface R> class Alias {
public:
- typedef R RECORD;
-private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- typedef decltype(R::weight) W;
+ typedef R RECORD;
+private:
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ typedef decltype(R::weight) W;
public:
- Alias(BufferView<R> buffer)
- : m_data(nullptr)
- , m_alias(nullptr)
- , m_total_weight(0)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) {
-
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
-
- auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
-
- if (m_reccnt > 0) {
- std::vector<W> weights;
- for (size_t i=0; i<m_reccnt; i++) {
- weights.emplace_back(m_data[i].rec.weight);
- m_total_weight += m_data[i].rec.weight;
- }
-
- build_alias_structure(weights);
- }
+ Alias(BufferView<R> buffer)
+ : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0),
+ m_tombstone_cnt(0), m_alloc_size(0),
+ m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(),
+ BF_HASH_FUNCS)) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
+
+ if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
+ }
+
+ build_alias_structure(weights);
}
+ }
- Alias(std::vector<const Alias*> const &shards)
- : m_data(nullptr)
- , m_alias(nullptr)
- , m_total_weight(0)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_bf(nullptr) {
-
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count);
-
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
-
- auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
-
- if (m_reccnt > 0) {
- std::vector<W> weights;
- for (size_t i=0; i<m_reccnt; i++) {
- weights.emplace_back(m_data[i].rec.weight);
- m_total_weight += m_data[i].rec.weight;
- }
-
- build_alias_structure(weights);
- }
- }
-
- ~Alias() {
- free(m_data);
- delete m_alias;
- delete m_bf;
- }
+ Alias(std::vector<const Alias *> const &shards)
+ : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0),
+ m_tombstone_cnt(0), m_alloc_size(0), m_bf(nullptr) {
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if (filter && !m_bf->lookup(rec)) {
- return nullptr;
- }
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors =
+ build_cursor_vec<R, Alias>(shards, &attemp_reccnt, &tombstone_count);
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
+ m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
- while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx;
+ auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
+ m_reccnt = res.record_count;
+ m_tombstone_cnt = res.tombstone_count;
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
+ if (m_reccnt > 0) {
+ std::vector<W> weights;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ weights.emplace_back(m_data[i].rec.weight);
+ m_total_weight += m_data[i].rec.weight;
+ }
- return nullptr;
+ build_alias_structure(weights);
}
+ }
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ ~Alias() {
+ free(m_data);
+ delete m_alias;
+ delete m_bf;
+ }
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
+ if (filter && !m_bf->lookup(rec)) {
+ return nullptr;
}
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
+ size_t idx = get_lower_bound(rec.key);
+ if (idx >= m_reccnt) {
+ return nullptr;
}
+ while (idx < (m_reccnt - 1) && m_data[idx].rec < rec)
+ ++idx;
- size_t get_memory_usage() const {
- return 0;
+ if (m_data[idx].rec == rec) {
+ return m_data + idx;
}
- size_t get_aux_memory_usage() const {
- return (m_bf) ? m_bf->memory_usage() : 0;
- }
+ return nullptr;
+ }
- W get_total_weight() const {
- return m_total_weight;
- }
+ Wrapped<R> *get_data() const { return m_data; }
- size_t get_weighted_sample(gsl_rng *rng) const {
- return m_alias->get(rng);
- }
+ size_t get_record_count() const { return m_reccnt; }
- size_t get_lower_bound(const K& key) const {
- size_t min = 0;
- size_t max = m_reccnt - 1;
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- while (min < max) {
- size_t mid = (min + max) / 2;
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- if (key > m_data[mid].rec.key) {
- min = mid + 1;
- } else {
- max = mid;
- }
- }
+ size_t get_memory_usage() const { return 0; }
- return min;
- }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
-private:
+ W get_total_weight() const { return m_total_weight; }
+
+ size_t get_weighted_sample(gsl_rng *rng) const { return m_alias->get(rng); }
- void build_alias_structure(std::vector<W> &weights) {
+ size_t get_lower_bound(const K &key) const {
+ size_t min = 0;
+ size_t max = m_reccnt - 1;
- // normalize the weights vector
- std::vector<double> norm_weights(weights.size());
+ while (min < max) {
+ size_t mid = (min + max) / 2;
+
+ if (key > m_data[mid].rec.key) {
+ min = mid + 1;
+ } else {
+ max = mid;
+ }
+ }
+
+ return min;
+ }
+
+private:
+ void build_alias_structure(std::vector<W> &weights) {
- for (size_t i=0; i<weights.size(); i++) {
- norm_weights[i] = (double) weights[i] / (double) m_total_weight;
- }
+ // normalize the weights vector
+ std::vector<double> norm_weights(weights.size());
- // build the alias structure
- m_alias = new psudb::Alias(norm_weights);
+ for (size_t i = 0; i < weights.size(); i++) {
+ norm_weights[i] = (double)weights[i] / (double)m_total_weight;
}
- Wrapped<R>* m_data;
- psudb::Alias *m_alias;
- W m_total_weight;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- BloomFilter<R> *m_bf;
+ // build the alias structure
+ m_alias = new psudb::Alias(norm_weights);
+ }
+
+ Wrapped<R> *m_data;
+ psudb::Alias *m_alias;
+ W m_total_weight;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_alloc_size;
+ BloomFilter<R> *m_bf;
};
-}
+} // namespace de
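build_alias_structure() above normalizes the per-record weights and hands them to psudb::Alias, which answers get_weighted_sample() with a single table lookup. The sketch below is a self-contained version of the classic Walker alias construction for reference; it is an assumption that psudb::Alias uses this scheme internally, and AliasTable and its members are hypothetical names rather than library types.

#include <cstdio>
#include <random>
#include <vector>

// Hypothetical stand-in for psudb::Alias: build once in O(n), sample in O(1).
struct AliasTable {
  std::vector<double> prob;
  std::vector<size_t> alias;

  explicit AliasTable(const std::vector<double> &weights) {
    size_t n = weights.size();
    double total = 0.0;
    for (double w : weights) total += w;

    prob.assign(n, 1.0);
    alias.assign(n, 0);

    // Scale each weight so that the average slot holds probability exactly 1.
    std::vector<double> scaled(n);
    std::vector<size_t> small, large;
    for (size_t i = 0; i < n; i++) {
      scaled[i] = weights[i] * n / total;
      (scaled[i] < 1.0 ? small : large).push_back(i);
    }

    // Pair an under-full slot with an over-full one until both lists drain;
    // any leftover slots keep their default probability of 1.
    while (!small.empty() && !large.empty()) {
      size_t s = small.back(); small.pop_back();
      size_t l = large.back(); large.pop_back();
      prob[s] = scaled[s];
      alias[s] = l;
      scaled[l] -= 1.0 - scaled[s];
      (scaled[l] < 1.0 ? small : large).push_back(l);
    }
  }

  size_t get(std::mt19937 &rng) const {
    std::uniform_int_distribution<size_t> slot(0, prob.size() - 1);
    std::uniform_real_distribution<double> unif(0.0, 1.0);
    size_t i = slot(rng);                       // one uniform slot pick...
    return unif(rng) < prob[i] ? i : alias[i];  // ...and one biased coin flip
  }
};

int main() {
  AliasTable table({1.0, 4.0, 2.0, 8.0});
  std::mt19937 rng(7);
  std::vector<size_t> counts(4, 0);
  for (size_t i = 0; i < 100000; i++) counts[table.get(rng)]++;
  for (size_t i = 0; i < counts.size(); i++)
    std::printf("index %zu: %zu samples\n", i, counts[i]);
  return 0;
}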
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
index 59ff116..31db40b 100644
--- a/include/shard/FSTrie.h
+++ b/include/shard/FSTrie.h
@@ -9,194 +9,182 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
#include "fst.hpp"
#include "util/SortedMerge.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <KVPInterface R>
-class FSTrie {
+template <KVPInterface R> class FSTrie {
public:
- typedef R RECORD;
-private:
+ typedef R RECORD;
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+private:
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ static_assert(std::is_same_v<K, const char *>,
+ "FST requires const char* keys.");
public:
- FSTrie(BufferView<R> buffer)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- m_data = new Wrapped<R>[buffer.get_record_count()]();
- m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
-
- size_t cnt = 0;
- std::vector<std::string> keys;
- keys.reserve(buffer.get_record_count());
-
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- temp_buffer[i] = *(buffer.get(i));
- }
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
+ FSTrie(BufferView<R> buffer) : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key[0] != '\0') {
- continue;
- }
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
- m_data[cnt] = temp_buffer[i];
- m_data[cnt].clear_timestamp();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() ||
+ temp_buffer[i].rec.key[0] != '\0') {
+ continue;
+ }
- keys.push_back(std::string(m_data[cnt].rec.key));
- cnt++;
- }
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
- m_reccnt = cnt;
- if (m_reccnt > 0) {
- m_fst = new fst::Trie(keys, true, 1);
- }
+ keys.push_back(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
- delete[] temp_buffer;
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
}
- FSTrie(std::vector<const FSTrie*> const &shards)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
-
- m_data = new Wrapped<R>[attemp_reccnt]();
- m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
-
- std::vector<std::string> keys;
- keys.reserve(attemp_reccnt);
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
+ delete[] temp_buffer;
+ }
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over.
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
- m_data[m_reccnt] = *cursor.ptr;
- keys.push_back(std::string(m_data[m_reccnt].rec.key));
-
- m_reccnt++;
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ FSTrie(std::vector<const FSTrie *> const &shards)
+ : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors =
+ build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
- if (m_reccnt > 0) {
- m_fst = new fst::Trie(keys, true, 1);
- }
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ std::vector<std::string> keys;
+ keys.reserve(attemp_reccnt);
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
}
- ~FSTrie() {
- delete[] m_data;
- delete m_fst;
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') {
+ m_data[m_reccnt] = *cursor.ptr;
+ keys.push_back(std::string(m_data[m_reccnt].rec.key));
+
+ m_reccnt++;
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
+ }
+ }
- auto idx = m_fst->exactSearch(rec.key);
+ ~FSTrie() {
+ delete[] m_data;
+ delete m_fst;
+ }
- if (idx == fst::kNotFound) {
- return nullptr;
- }
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
- // FIXME: for convenience, I'm treating this Trie as a unique index
- // for now, so no need to scan forward and/or check values. This
- // also makes the point lookup query class a lot easier to make.
- // Ultimately, though, we can support non-unique indexes with some
- // extra work.
+ auto idx = m_fst->exactSearch(rec.key);
- return m_data + idx;
+ if (idx == fst::kNotFound) {
+ return nullptr;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
- size_t get_tombstone_count() const {
- return 0;
- }
+ return m_data + idx;
+ }
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ Wrapped<R> *get_data() const { return m_data; }
+ size_t get_record_count() const { return m_reccnt; }
- size_t get_memory_usage() const {
- return m_fst->getMemoryUsage();
- }
+ size_t get_tombstone_count() const { return 0; }
- size_t get_aux_memory_usage() const {
- return m_alloc_size;
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- size_t get_lower_bound(R &rec) {return 0;}
- size_t get_upper_bound(R &rec) {return 0;}
+ size_t get_memory_usage() const { return m_fst->getMemoryUsage(); }
-private:
+ size_t get_aux_memory_usage() const { return m_alloc_size; }
+
+ size_t get_lower_bound(R &rec) { return 0; }
+ size_t get_upper_bound(R &rec) { return 0; }
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_alloc_size;
- fst::Trie *m_fst;
+private:
+ Wrapped<R> *m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ fst::Trie *m_fst;
};
-}
+} // namespace de
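The shard constructors in FSTrie.h (and in the shards that follow) walk a sorted stream and drop a record together with the tombstone that immediately follows it. The stand-alone sketch below shows that cancellation rule on a single pre-sorted run; Record and cancel_tombstones() are hypothetical illustrations rather than framework types, and the real code applies the same check across multiple runs through a PriorityQueue of cursors.

#include <cstdio>
#include <string>
#include <vector>

struct Record {
  std::string key;
  bool tombstone;
};

// Drop each record that is immediately followed by a matching tombstone;
// everything else (including unmatched tombstones) survives into the output.
std::vector<Record> cancel_tombstones(const std::vector<Record> &sorted_run) {
  std::vector<Record> out;
  for (size_t i = 0; i < sorted_run.size();) {
    bool next_is_matching_ts = (i + 1 < sorted_run.size()) &&
                               !sorted_run[i].tombstone &&
                               sorted_run[i + 1].tombstone &&
                               sorted_run[i].key == sorted_run[i + 1].key;
    if (next_is_matching_ts) {
      i += 2; // the record and its tombstone annihilate each other
    } else {
      out.push_back(sorted_run[i]);
      i += 1;
    }
  }
  return out;
}

int main() {
  std::vector<Record> run = {
      {"apple", false}, {"apple", true}, {"pear", false}, {"plum", true}};
  for (const auto &r : cancel_tombstones(run)) {
    std::printf("%s%s\n", r.key.c_str(), r.tombstone ? " (tombstone)" : "");
  }
  // "apple" cancels against its tombstone; "pear" survives; the unmatched
  // "plum" tombstone is retained so it can cancel a record further down.
  return 0;
}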
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index f6b525f..6722aaf 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -121,7 +121,9 @@ public:
size_t get_memory_usage() const { return m_internal_node_cnt * NODE_SZ; }
- size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
/* SortedShardInterface methods */
size_t get_lower_bound(const K &key) const {
diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h
index fe0c30e..0b7c74c 100644
--- a/include/shard/LoudsPatricia.h
+++ b/include/shard/LoudsPatricia.h
@@ -9,191 +9,179 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
#include "louds-patricia.hpp"
#include "util/SortedMerge.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <KVPInterface R>
-class LoudsPatricia {
+template <KVPInterface R> class LoudsPatricia {
private:
-
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
- static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ static_assert(std::is_same_v<K, const char *>,
+ "FST requires const char* keys.");
public:
- LoudsPatricia(BufferView<R> buffer)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- m_data = new Wrapped<R>[buffer.get_record_count()]();
- m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
-
- m_louds = new louds::Patricia();
-
- size_t cnt = 0;
- std::vector<std::string> keys;
- keys.reserve(buffer.get_record_count());
-
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- temp_buffer[i] = *(buffer.get(i));
- }
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
+ LoudsPatricia(BufferView<R> buffer)
+ : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ m_louds = new louds::Patricia();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key == "") {
- continue;
- }
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
- m_data[cnt] = temp_buffer[i];
- m_data[cnt].clear_timestamp();
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() ||
+ temp_buffer[i].rec.key == "") {
+ continue;
+ }
- m_louds->add(std::string(m_data[cnt].rec.key));
- cnt++;
- }
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
- m_reccnt = cnt;
- if (m_reccnt > 0) {
- m_louds->build();
- }
+ m_louds->add(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
- delete[] temp_buffer;
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_louds->build();
}
- LoudsPatricia(std::vector<const LoudsPatricia*> &shards)
- : m_data(nullptr)
- , m_reccnt(0)
- , m_alloc_size(0)
- {
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, &tombstone_count);
-
- m_data = new Wrapped<R>[attemp_reccnt]();
- m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
-
- m_louds = new louds::Patricia();
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
+ delete[] temp_buffer;
+ }
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over.
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") {
- m_data[m_reccnt] = *cursor.ptr;
- m_louds->add(std::string(m_data[m_reccnt].rec.key));
- m_reccnt++;
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ LoudsPatricia(std::vector<const LoudsPatricia *> &shards)
+ : m_data(nullptr), m_reccnt(0), m_alloc_size(0) {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt,
+ &tombstone_count);
- if (m_reccnt > 0) {
- m_louds->build();
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ m_louds = new louds::Patricia();
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") {
+ m_data[m_reccnt] = *cursor.ptr;
+ m_louds->add(std::string(m_data[m_reccnt].rec.key));
+ m_reccnt++;
}
+ pq.pop();
+
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- ~LoudsPatricia() {
- delete[] m_data;
- delete m_louds;
+ if (m_reccnt > 0) {
+ m_louds->build();
}
+ }
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+ ~LoudsPatricia() {
+ delete[] m_data;
+ delete m_louds;
+ }
- auto idx = m_louds->lookup(std::string(rec.key));
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
- if (idx == -1) {
- return nullptr;
- }
+ auto idx = m_louds->lookup(std::string(rec.key));
- // FIXME: for convenience, I'm treating this Trie as a unique index
- // for now, so no need to scan forward and/or check values. This
- // also makes the point lookup query class a lot easier to make.
- // Ultimately, though, we can support non-unique indexes with some
- // extra work.
- return m_data + idx;
+ if (idx == -1) {
+ return nullptr;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
+ return m_data + idx;
+ }
- size_t get_tombstone_count() const {
- return 0;
- }
+ Wrapped<R> *get_data() const { return m_data; }
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ size_t get_record_count() const { return m_reccnt; }
+ size_t get_tombstone_count() const { return 0; }
- size_t get_memory_usage() const {
- return m_louds->size();
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- size_t get_aux_memory_usage() const {
- return m_alloc_size;
- }
+ size_t get_memory_usage() const { return m_louds->size(); }
- size_t get_lower_bound(R &rec) {return 0;}
- size_t get_upper_bound(R &rec) {return 0;}
+ size_t get_aux_memory_usage() const { return m_alloc_size; }
-private:
+ size_t get_lower_bound(R &rec) { return 0; }
+ size_t get_upper_bound(R &rec) { return 0; }
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_alloc_size;
- louds::Patricia *m_louds;
+private:
+ Wrapped<R> *m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ louds::Patricia *m_louds;
};
-}
+} // namespace de
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 5b39ab4..40c9141 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -13,7 +13,6 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
@@ -23,278 +22,268 @@
#include "util/SortedMerge.h"
#include "util/bf_config.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <RecordInterface R, size_t epsilon=128>
-class PGM {
+template <RecordInterface R, size_t epsilon = 128> class PGM {
public:
- typedef R RECORD;
+ typedef R RECORD;
+
private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
public:
- PGM(BufferView<R> buffer)
- : m_bf(nullptr)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0) {
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
-
- std::vector<K> keys;
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
- buffer.get_record_count(),
- sizeof(Wrapped<R>));
- buffer.copy_to_buffer((byte *) temp_buffer);
-
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- merge_info info = {0, 0};
-
- /*
- * Iterate over the temporary buffer to process the records, copying
- * them into buffer as needed
- */
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- keys.emplace_back(base->rec.key);
- m_data[info.record_count++] = *base;
-
- if (base->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf){
- m_bf->insert(base->rec);
- }
- }
-
- base++;
+ PGM(BufferView<R> buffer)
+ : m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ std::vector<K> keys;
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc(
+ CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *)temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ merge_info info = {0, 0};
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop) &&
+ base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
+
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code-path, so this
+ // ensures that tagged records from the buffer are able to be
+ // dropped, eventually. It should only need to be &= 1
+ base->header &= 3;
+ keys.emplace_back(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(base->rec);
}
+ }
- free(temp_buffer);
-
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
-
- if (m_reccnt > 0) {
- m_pgm = pgm::PGMIndex<K, epsilon>(keys);
- }
+ base++;
}
- PGM(std::vector<const PGM*> const &shards)
- : m_data(nullptr)
- , m_bf(nullptr)
- , m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0) {
-
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
- std::vector<K> keys;
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
+ free(temp_buffer);
- merge_info info = {0, 0};
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over.
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /* skip over records that have been deleted via tagging */
- if (!cursor.ptr->is_deleted()) {
- keys.emplace_back(cursor.ptr->rec.key);
- m_data[info.record_count++] = *cursor.ptr;
-
- /*
- * if the record is a tombstone, increment the ts count and
- * insert it into the bloom filter if one has been
- * provided.
- */
- if (cursor.ptr->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf) {
- m_bf->insert(cursor.ptr->rec);
- }
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
+ if (m_reccnt > 0) {
+ m_pgm = pgm::PGMIndex<K, epsilon>(keys);
+ }
+ }
+
+ PGM(std::vector<const PGM *> const &shards)
+ : m_data(nullptr), m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0),
+ m_alloc_size(0) {
+
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors =
+ build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
+ std::vector<K> keys;
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- if (m_reccnt > 0) {
- m_pgm = pgm::PGMIndex<K, epsilon>(keys);
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ keys.emplace_back(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
+ }
+ }
}
- }
+ pq.pop();
- ~PGM() {
- free(m_data);
- delete m_bf;
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) const {
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
+ if (m_reccnt > 0) {
+ m_pgm = pgm::PGMIndex<K, epsilon>(keys);
+ }
+ }
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
+ ~PGM() {
+ free(m_data);
+ delete m_bf;
+ }
- return nullptr;
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) const {
+ size_t idx = get_lower_bound(rec.key);
+ if (idx >= m_reccnt) {
+ return nullptr;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ while (idx < m_reccnt && m_data[idx].rec < rec)
+ ++idx;
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
+ if (m_data[idx].rec == rec) {
+ return m_data + idx;
}
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ return nullptr;
+ }
+ Wrapped<R> *get_data() const { return m_data; }
- size_t get_memory_usage() const {
- return m_pgm.size_in_bytes();
- }
+ size_t get_record_count() const { return m_reccnt; }
- size_t get_aux_memory_usage() const {
- return (m_bf) ? m_bf->memory_usage() : 0;
- }
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- size_t get_lower_bound(const K& key) const {
- auto bound = m_pgm.search(key);
- size_t idx = bound.lo;
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- if (idx >= m_reccnt) {
- return m_reccnt;
- }
+ size_t get_memory_usage() const { return m_pgm.size_in_bytes(); }
- /*
- * If the region to search is less than some pre-specified
- * amount, perform a linear scan to locate the record.
- */
- if (bound.hi - bound.lo < 256) {
- while (idx < bound.hi && m_data[idx].rec.key < key) {
- idx++;
- }
- } else {
- /* Otherwise, perform a binary search */
- idx = bound.lo;
- size_t max = bound.hi;
-
- while (idx < max) {
- size_t mid = (idx + max) / 2;
- if (key > m_data[mid].rec.key) {
- idx = mid + 1;
- } else {
- max = mid;
- }
- }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
- }
+ size_t get_lower_bound(const K &key) const {
+ auto bound = m_pgm.search(key);
+ size_t idx = bound.lo;
- /*
- * the upper bound returned by PGM is one passed the end of the
- * array. If we are at that point, we should just return "not found"
- */
- if (idx == m_reccnt) {
- return idx;
- }
+ if (idx >= m_reccnt) {
+ return m_reccnt;
+ }
- /*
- * We may have walked one passed the actual lower bound, so check
- * the index before the current one to see if it is the actual bound
- */
- if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
- return idx-1;
+ /*
+ * If the region to search is less than some pre-specified
+ * amount, perform a linear scan to locate the record.
+ */
+ if (bound.hi - bound.lo < 256) {
+ while (idx < bound.hi && m_data[idx].rec.key < key) {
+ idx++;
+ }
+ } else {
+ /* Otherwise, perform a binary search */
+ idx = bound.lo;
+ size_t max = bound.hi;
+
+ while (idx < max) {
+ size_t mid = (idx + max) / 2;
+ if (key > m_data[mid].rec.key) {
+ idx = mid + 1;
+ } else {
+ max = mid;
}
+ }
+ }
+
+ /*
+ * the upper bound returned by PGM is one past the end of the
+ * array. If we are at that point, we should just return "not found"
+ */
+ if (idx == m_reccnt) {
+ return idx;
+ }
- /*
- * Otherwise, check idx. If it is a valid bound, then return it,
- * otherwise return "not found".
- */
- return (m_data[idx].rec.key >= key) ? idx : m_reccnt;
+ /*
+ * We may have walked one past the actual lower bound, so check
+ * the index before the current one to see if it is the actual bound
+ */
+ if (m_data[idx].rec.key > key && idx > 0 &&
+ m_data[idx - 1].rec.key <= key) {
+ return idx - 1;
}
+ /*
+ * Otherwise, check idx. If it is a valid bound, then return it,
+ * otherwise return "not found".
+ */
+ return (m_data[idx].rec.key >= key) ? idx : m_reccnt;
+ }
+
private:
- Wrapped<R>* m_data;
- BloomFilter<R> *m_bf;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- K m_max_key;
- K m_min_key;
- pgm::PGMIndex<K, epsilon> m_pgm;
+ Wrapped<R> *m_data;
+ BloomFilter<R> *m_bf;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_alloc_size;
+ K m_max_key;
+ K m_min_key;
+ pgm::PGMIndex<K, epsilon> m_pgm;
};
-}
+} // namespace de
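get_lower_bound() in PGM.h above refines the approximate position produced by the learned index: it scans linearly when the error window is narrower than 256 slots and bisects otherwise. A minimal self-contained sketch of that two-mode search follows; approx_bounds() is a hypothetical stand-in for m_pgm.search(key) that pessimistically returns the whole array, and the 256 cutoff simply mirrors the constant used above.

#include <cstdint>
#include <cstdio>
#include <vector>

struct Bounds {
  size_t lo;
  size_t hi;
};

// Pretend predictor: in the real shard this window comes from the PGM model
// and is guaranteed to contain the true lower bound.
Bounds approx_bounds(const std::vector<uint64_t> &keys, uint64_t /*key*/) {
  return {0, keys.size()};
}

size_t bounded_lower_bound(const std::vector<uint64_t> &keys, uint64_t key) {
  Bounds b = approx_bounds(keys, key);
  size_t idx = b.lo;
  if (b.hi - b.lo < 256) {
    while (idx < b.hi && keys[idx] < key) idx++; // narrow window: linear scan
  } else {
    size_t max = b.hi;
    while (idx < max) {                          // wide window: binary search
      size_t mid = (idx + max) / 2;
      if (key > keys[mid]) {
        idx = mid + 1;
      } else {
        max = mid;
      }
    }
  }
  return idx; // first position whose key is >= the probe key (or end)
}

int main() {
  std::vector<uint64_t> keys = {2, 4, 4, 7, 9, 15};
  std::printf("lower_bound(7)  = %zu\n", bounded_lower_bound(keys, 7));  // 3
  std::printf("lower_bound(10) = %zu\n", bounded_lower_bound(keys, 10)); // 5
  return 0;
}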
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 1a04afc..2cd514d 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -11,319 +11,306 @@
*/
#pragma once
-
#include <vector>
#include "framework/ShardRequirements.h"
-#include "ts/builder.h"
#include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
+#include "ts/builder.h"
#include "util/SortedMerge.h"
+#include "util/bf_config.h"
-using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
+using psudb::byte;
+using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
using psudb::queue_record;
-using psudb::byte;
namespace de {
-template <KVPInterface R, size_t E=1024>
-class TrieSpline {
+template <KVPInterface R, size_t E = 1024> class TrieSpline {
public:
- typedef R RECORD;
+ typedef R RECORD;
+
private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
public:
- TrieSpline(BufferView<R> buffer)
- : m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_max_key(0)
- , m_min_key(0)
- , m_bf(nullptr)
- {
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
+ TrieSpline(BufferView<R> buffer)
+ : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0),
+ m_min_key(0), m_bf(nullptr) {
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *)psudb::sf_aligned_calloc(
+ CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *)temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ auto tmp_min_key = temp_buffer[0].rec.key;
+ auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ merge_info info = {0, 0};
+
+ m_min_key = tmp_min_key;
+ m_max_key = tmp_max_key;
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop) &&
+ base->rec == (base + 1)->rec && (base + 1)->is_tombstone() &&
+ base->rec.key != m_max_key && base->rec.key != m_min_key) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted() && base->rec.key != m_max_key &&
+ base->rec.key != m_min_key) {
+ base += 1;
+ continue;
+ }
+
+ base->header &= 3;
+ bldr.AddKey(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(base->rec);
+ }
+ }
- /*
- * Copy the contents of the buffer view into a temporary buffer, and
- * sort them. We still need to iterate over these temporary records to
- * apply tombstone/deleted record filtering, as well as any possible
- * per-record processing that is required by the shard being built.
- */
- auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
- buffer.get_record_count(),
- sizeof(Wrapped<R>));
- buffer.copy_to_buffer((byte *) temp_buffer);
+ base++;
+ }
- auto base = temp_buffer;
- auto stop = base + buffer.get_record_count();
- std::sort(base, stop, std::less<Wrapped<R>>());
+ free(temp_buffer);
- auto tmp_min_key = temp_buffer[0].rec.key;
- auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
- auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- merge_info info = {0, 0};
+ if (m_reccnt > 50) {
+ m_ts = bldr.Finalize();
+ }
+ }
+
+ TrieSpline(std::vector<const TrieSpline *> const &shards)
+ : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0),
+ m_min_key(0), m_bf(nullptr) {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt,
+ &tombstone_count);
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i = 0; i < cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- m_min_key = tmp_min_key;
- m_max_key = tmp_max_key;
+ auto tmp_max_key = shards[0]->m_max_key;
+ auto tmp_min_key = shards[0]->m_min_key;
- /*
- * Iterate over the temporary buffer to process the records, copying
- * them into buffer as needed
- */
- while (base < stop) {
- if (!base->is_tombstone() && (base + 1 < stop)
- && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()
- && base->rec.key != m_max_key && base->rec.key != m_min_key) {
- base += 2;
- continue;
- } else if (base->is_deleted() && base->rec.key != m_max_key && base->rec.key != m_min_key) {
- base += 1;
- continue;
- }
+ for (size_t i = 0; i < shards.size(); i++) {
+ if (shards[i]->m_max_key > tmp_max_key) {
+ tmp_max_key = shards[i]->m_max_key;
+ }
- base->header &= 3;
- bldr.AddKey(base->rec.key);
- m_data[info.record_count++] = *base;
+ if (shards[i]->m_min_key < tmp_min_key) {
+ tmp_min_key = shards[i]->m_min_key;
+ }
+ }
- if (base->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf){
- m_bf->insert(base->rec);
- }
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ m_max_key = tmp_max_key;
+ m_min_key = tmp_min_key;
+
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next =
+ pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over. Unless the tombstone would remove the maximum or
+ * minimum valued key, which cannot be removed at this point
+ * without breaking triespline
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone() &&
+ now.data->rec.key != tmp_max_key &&
+ now.data->rec.key != tmp_min_key) {
+
+ pq.pop();
+ pq.pop();
+ auto &cursor1 = cursors[now.version];
+ auto &cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1))
+ pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2))
+ pq.push(cursor2.ptr, next.version);
+ } else {
+ auto &cursor = cursors[now.version];
+ /*
+ * skip over records that have been deleted via tagging,
+ * unless they are the max or min keys, which cannot be
+ * removed without breaking triespline
+ */
+ if (!cursor.ptr->is_deleted() || cursor.ptr->rec.key == tmp_max_key ||
+ cursor.ptr->rec.key == tmp_min_key) {
+ bldr.AddKey(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
}
-
- base++;
+ }
}
+ pq.pop();
- free(temp_buffer);
-
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
-
- if (m_reccnt > 50) {
- m_ts = bldr.Finalize();
- }
+ if (advance_cursor(cursor))
+ pq.push(cursor.ptr, now.version);
+ }
}
- TrieSpline(std::vector<const TrieSpline*> const &shards)
- : m_reccnt(0)
- , m_tombstone_cnt(0)
- , m_alloc_size(0)
- , m_max_key(0)
- , m_min_key(0)
- , m_bf(nullptr)
- {
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
- auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
-
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
-
- // FIXME: For smaller cursor arrays, it may be more efficient to skip
- // the priority queue and just do a scan.
- PriorityQueue<Wrapped<R>> pq(cursors.size());
- for (size_t i=0; i<cursors.size(); i++) {
- pq.push(cursors[i].ptr, i);
- }
-
- auto tmp_max_key = shards[0]->m_max_key;
- auto tmp_min_key = shards[0]->m_min_key;
-
- for (size_t i=0; i<shards.size(); i++) {
- if (shards[i]->m_max_key > tmp_max_key) {
- tmp_max_key = shards[i]->m_max_key;
- }
-
- if (shards[i]->m_min_key < tmp_min_key) {
- tmp_min_key = shards[i]->m_min_key;
- }
- }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
- auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
-
- m_max_key = tmp_max_key;
- m_min_key = tmp_min_key;
-
- merge_info info = {0, 0};
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- /*
- * if the current record is not a tombstone, and the next record is
- * a tombstone that matches the current one, then the current one
- * has been deleted, and both it and its tombstone can be skipped
- * over. Unless the tombstone would remove the maximum or
- * minimum valued key, which cannot be removed at this point
- * without breaking triespline
- */
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()
- && now.data->rec.key != tmp_max_key && now.data->rec.key != tmp_min_key) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- /*
- * skip over records that have been deleted via tagging,
- * unless they are the max or min keys, which cannot be
- * removed without breaking triespline
- */
- if (!cursor.ptr->is_deleted() ||
- cursor.ptr->rec.key == tmp_max_key || cursor.ptr->rec.key == tmp_min_key) {
- bldr.AddKey(cursor.ptr->rec.key);
- m_data[info.record_count++] = *cursor.ptr;
-
- /*
- * if the record is a tombstone, increment the ts count and
- * insert it into the bloom filter if one has been
- * provided.
- */
- if (cursor.ptr->is_tombstone()) {
- info.tombstone_count++;
- if (m_bf) {
- m_bf->insert(cursor.ptr->rec);
- }
- }
- }
- pq.pop();
-
- if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
+ if (m_reccnt > 50) {
+ m_ts = bldr.Finalize();
+ }
+ }
- m_reccnt = info.record_count;
- m_tombstone_cnt = info.tombstone_count;
+ ~TrieSpline() {
+ free(m_data);
+ delete m_bf;
+ }
- if (m_reccnt > 50) {
- m_ts = bldr.Finalize();
- }
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) const {
+ if (filter && m_bf && !m_bf->lookup(rec)) {
+ return nullptr;
}
- ~TrieSpline() {
- free(m_data);
- delete m_bf;
+ size_t idx = get_lower_bound(rec.key);
+ if (idx >= m_reccnt) {
+ return nullptr;
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) const {
- if (filter && m_bf && !m_bf->lookup(rec)) {
- return nullptr;
- }
-
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
-
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
-
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
+ while (idx < m_reccnt && m_data[idx].rec < rec)
+ ++idx;
- return nullptr;
+    if (idx < m_reccnt && m_data[idx].rec == rec) {
+ return m_data + idx;
}
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
+ return nullptr;
+ }
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
- }
+ Wrapped<R> *get_data() const { return m_data; }
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ size_t get_record_count() const { return m_reccnt; }
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- size_t get_memory_usage() const {
- return m_ts.GetSize();
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- size_t get_aux_memory_usage() const {
- return (m_bf) ? m_bf->memory_usage() : 0;
- }
+ size_t get_memory_usage() const { return m_ts.GetSize(); }
- size_t get_lower_bound(const K& key) const {
- if (m_reccnt < 50) {
- size_t bd = m_reccnt;
- for (size_t i=0; i<m_reccnt; i++) {
- if (m_data[i].rec.key >= key) {
- bd = i;
- break;
- }
- }
+ size_t get_aux_memory_usage() const {
+ return (m_bf) ? m_bf->memory_usage() : 0;
+ }
- return bd;
+ size_t get_lower_bound(const K &key) const {
+    if (m_reccnt <= 50) {
+ size_t bd = m_reccnt;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ if (m_data[i].rec.key >= key) {
+ bd = i;
+ break;
}
+ }
- auto bound = m_ts.GetSearchBound(key);
- size_t idx = bound.begin;
+ return bd;
+ }
- if (idx >= m_reccnt) {
- return m_reccnt;
- }
+ auto bound = m_ts.GetSearchBound(key);
+ size_t idx = bound.begin;
- // If the region to search is less than some pre-specified
- // amount, perform a linear scan to locate the record.
- if (bound.end - bound.begin < 256) {
- while (idx < bound.end && m_data[idx].rec.key < key) {
- idx++;
- }
- } else {
- // Otherwise, perform a binary search
- idx = bound.begin;
- size_t max = bound.end;
-
- while (idx < max) {
- size_t mid = (idx + max) / 2;
- if (key > m_data[mid].rec.key) {
- idx = mid + 1;
- } else {
- max = mid;
- }
- }
- }
+ if (idx >= m_reccnt) {
+ return m_reccnt;
+ }
- if (idx == m_reccnt) {
- return m_reccnt;
+ // If the region to search is less than some pre-specified
+ // amount, perform a linear scan to locate the record.
+ if (bound.end - bound.begin < 256) {
+ while (idx < bound.end && m_data[idx].rec.key < key) {
+ idx++;
+ }
+ } else {
+ // Otherwise, perform a binary search
+ idx = bound.begin;
+ size_t max = bound.end;
+
+ while (idx < max) {
+ size_t mid = (idx + max) / 2;
+ if (key > m_data[mid].rec.key) {
+ idx = mid + 1;
+ } else {
+ max = mid;
}
+ }
+ }
- if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
- return idx-1;
- }
+ if (idx == m_reccnt) {
+ return m_reccnt;
+ }
- return idx;
+ if (m_data[idx].rec.key > key && idx > 0 &&
+ m_data[idx - 1].rec.key <= key) {
+ return idx - 1;
}
-private:
+ return idx;
+ }
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- K m_max_key;
- K m_min_key;
- ts::TrieSpline<K> m_ts;
- BloomFilter<R> *m_bf;
+private:
+ Wrapped<R> *m_data;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_alloc_size;
+ K m_max_key;
+ K m_min_key;
+ ts::TrieSpline<K> m_ts;
+ BloomFilter<R> *m_bf;
};
-}
+} // namespace de
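For reference, the reformatted TrieSpline::get_lower_bound() above follows the usual learned-index lookup pattern: the spline returns an approximate [begin, end) range that contains the key, a linear scan is used when that range is narrow, and a binary search is used otherwise. Below is a minimal, self-contained sketch of that pattern over a plain sorted key array; the SearchBound struct and find_lower_bound() name are illustrative stand-ins, not part of the shard interface.

#include <cstddef>

/* approximate range produced by the learned index; assumed to contain the key */
struct SearchBound {
  size_t begin;
  size_t end; /* one past the last candidate position */
};

/*
 * Sketch: find the first position in a sorted key array with keys[idx] >= key,
 * restricted to the predicted bound. Mirrors the linear-scan / binary-search
 * split used by TrieSpline::get_lower_bound().
 */
template <typename K>
size_t find_lower_bound(const K *keys, size_t n, const K &key,
                        SearchBound bound) {
  size_t idx = bound.begin;
  if (idx >= n) {
    return n;
  }

  if (bound.end - bound.begin < 256) {
    /* narrow bound: a linear scan is cache friendly and cheap */
    while (idx < bound.end && keys[idx] < key) {
      idx++;
    }
  } else {
    /* wide bound: binary search within [begin, end) */
    size_t max = bound.end;
    while (idx < max) {
      size_t mid = (idx + max) / 2;
      if (key > keys[mid]) {
        idx = mid + 1;
      } else {
        max = mid;
      }
    }
  }

  return idx;
}

The final adjustment in the real routine (stepping back one slot when the prediction lands just past the key) is omitted here for brevity.
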
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index 7130efe..33ce9b9 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -15,379 +15,376 @@
#include <vector>
-#include <unordered_map>
#include "framework/ShardRequirements.h"
#include "psu-ds/PriorityQueue.h"
+#include <unordered_map>
+using psudb::byte;
using psudb::CACHELINE_SIZE;
using psudb::PriorityQueue;
-using psudb::byte;
namespace de {
-template <NDRecordInterface R, size_t LEAFSZ=100, bool HMAP=false>
+template <NDRecordInterface R, size_t LEAFSZ = 100, bool HMAP = false>
class VPTree {
public:
- typedef R RECORD;
+ typedef R RECORD;
private:
- struct vpnode {
- size_t start;
- size_t stop;
- bool leaf;
+ struct vpnode {
+ size_t start;
+ size_t stop;
+ bool leaf;
+
+ double radius;
+ vpnode *inside;
+ vpnode *outside;
+
+ vpnode()
+ : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr),
+ outside(nullptr) {}
+
+ ~vpnode() {
+ delete inside;
+ delete outside;
+ }
+ };
- double radius;
- vpnode *inside;
- vpnode *outside;
+public:
+ VPTree(BufferView<R> buffer)
+ : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped<R>),
+ (byte **)&m_data);
+
+ m_ptrs = new vp_ptr[buffer.get_record_count()];
+ m_reccnt = 0;
+
+ // FIXME: will eventually need to figure out tombstones
+ // this one will likely require the multi-pass
+ // approach, as otherwise we'll need to sort the
+ // records repeatedly on each reconstruction.
+ for (size_t i = 0; i < buffer.get_record_count(); i++) {
+ auto rec = buffer.get(i);
+
+ if (rec->is_deleted()) {
+ continue;
+ }
+
+ rec->header &= 3;
+ m_data[m_reccnt] = *rec;
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
+ m_reccnt++;
+ }
- vpnode() : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr), outside(nullptr) {}
+ if (m_reccnt > 0) {
+ m_root = build_vptree();
+ build_map();
+ }
+ }
- ~vpnode() {
- delete inside;
- delete outside;
- }
- };
+ VPTree(std::vector<const VPTree *> shards)
+ : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+ size_t attemp_reccnt = 0;
+ for (size_t i = 0; i < shards.size(); i++) {
+ attemp_reccnt += shards[i]->get_record_count();
+ }
+ m_alloc_size = psudb::sf_aligned_alloc(
+ CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped<R>), (byte **)&m_data);
+ m_ptrs = new vp_ptr[attemp_reccnt];
+
+ // FIXME: will eventually need to figure out tombstones
+ // this one will likely require the multi-pass
+ // approach, as otherwise we'll need to sort the
+ // records repeatedly on each reconstruction.
+ for (size_t i = 0; i < shards.size(); i++) {
+ for (size_t j = 0; j < shards[i]->get_record_count(); j++) {
+ if (shards[i]->get_record_at(j)->is_deleted()) {
+ continue;
+ }
-public:
- VPTree(BufferView<R> buffer)
- : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+ m_data[m_reccnt] = *shards[i]->get_record_at(j);
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
+ m_reccnt++;
+ }
+ }
+ if (m_reccnt > 0) {
+ m_root = build_vptree();
+ build_map();
+ }
+ }
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- buffer.get_record_count() *
- sizeof(Wrapped<R>),
- (byte**) &m_data);
+ ~VPTree() {
+ free(m_data);
+ delete m_root;
+ delete[] m_ptrs;
+ }
- m_ptrs = new vp_ptr[buffer.get_record_count()];
- m_reccnt = 0;
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
+ if constexpr (HMAP) {
+ auto idx = m_lookup_map.find(rec);
- // FIXME: will eventually need to figure out tombstones
- // this one will likely require the multi-pass
- // approach, as otherwise we'll need to sort the
- // records repeatedly on each reconstruction.
- for (size_t i=0; i<buffer.get_record_count(); i++) {
- auto rec = buffer.get(i);
+ if (idx == m_lookup_map.end()) {
+ return nullptr;
+ }
- if (rec->is_deleted()) {
- continue;
- }
+ return m_data + idx->second;
+ } else {
+ vpnode *node = m_root;
- rec->header &= 3;
- m_data[m_reccnt] = *rec;
- m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
- m_reccnt++;
+ while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
+ if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
+ node = node->outside;
+ } else {
+ node = node->inside;
}
+ }
- if (m_reccnt > 0) {
- m_root = build_vptree();
- build_map();
+ for (size_t i = node->start; i <= node->stop; i++) {
+ if (m_ptrs[i].ptr->rec == rec) {
+ return m_ptrs[i].ptr;
}
+ }
+
+ return nullptr;
}
+ }
- VPTree(std::vector<const VPTree*> shards)
- : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
+ Wrapped<R> *get_data() const { return m_data; }
- size_t attemp_reccnt = 0;
- for (size_t i=0; i<shards.size(); i++) {
- attemp_reccnt += shards[i]->get_record_count();
- }
+ size_t get_record_count() const { return m_reccnt; }
- m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
- attemp_reccnt * sizeof(Wrapped<R>),
- (byte **) &m_data);
- m_ptrs = new vp_ptr[attemp_reccnt];
-
- // FIXME: will eventually need to figure out tombstones
- // this one will likely require the multi-pass
- // approach, as otherwise we'll need to sort the
- // records repeatedly on each reconstruction.
- for (size_t i=0; i<shards.size(); i++) {
- for (size_t j=0; j<shards[i]->get_record_count(); j++) {
- if (shards[i]->get_record_at(j)->is_deleted()) {
- continue;
- }
-
- m_data[m_reccnt] = *shards[i]->get_record_at(j);
- m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
- m_reccnt++;
- }
- }
+ size_t get_tombstone_count() const { return m_tombstone_cnt; }
- if (m_reccnt > 0) {
- m_root = build_vptree();
- build_map();
- }
- }
+ const Wrapped<R> *get_record_at(size_t idx) const {
+ if (idx >= m_reccnt)
+ return nullptr;
+ return m_data + idx;
+ }
- ~VPTree() {
- free(m_data);
- delete m_root;
- delete[] m_ptrs;
- }
+ size_t get_memory_usage() const {
+ return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R *);
+ }
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if constexpr (HMAP) {
- auto idx = m_lookup_map.find(rec);
+ size_t get_aux_memory_usage() const {
+ // FIXME: need to return the size of the unordered_map
+ return 0;
+ }
- if (idx == m_lookup_map.end()) {
- return nullptr;
- }
+ void search(const R &point, size_t k,
+ PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> &pq) {
+ double farthest = std::numeric_limits<double>::max();
- return m_data + idx->second;
- } else {
- vpnode *node = m_root;
-
- while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
- if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
- node = node->outside;
- } else {
- node = node->inside;
- }
- }
-
- for (size_t i=node->start; i<=node->stop; i++) {
- if (m_ptrs[i].ptr->rec == rec) {
- return m_ptrs[i].ptr;
- }
- }
-
- return nullptr;
- }
- }
+ internal_search(m_root, point, k, pq, &farthest);
+ }
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
+private:
+ struct vp_ptr {
+ Wrapped<R> *ptr;
+ double dist;
+ };
+ Wrapped<R> *m_data;
+ vp_ptr *m_ptrs;
+ std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
+ size_t m_reccnt;
+ size_t m_tombstone_cnt;
+ size_t m_node_cnt;
+ size_t m_alloc_size;
+
+ vpnode *m_root;
+
+ vpnode *build_vptree() {
+ if (m_reccnt == 0) {
+ return nullptr;
}
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
- }
+ size_t lower = 0;
+ size_t upper = m_reccnt - 1;
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
+ auto rng = gsl_rng_alloc(gsl_rng_mt19937);
+ auto root = build_subtree(lower, upper, rng);
+ gsl_rng_free(rng);
+ return root;
+ }
- size_t get_memory_usage() const {
- return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*);
+ void build_map() {
+ // Skip constructing the hashmap if disabled in the
+ // template parameters.
+ if constexpr (!HMAP) {
+ return;
}
- size_t get_aux_memory_usage() const {
- // FIXME: need to return the size of the unordered_map
- return 0;
+ for (size_t i = 0; i < m_reccnt; i++) {
+ // FIXME: Will need to account for tombstones here too. Under
+ // tombstones, it is technically possible for two otherwise identical
+ // instances of the same record to exist within the same shard, so
+ // long as one of them is a tombstone. Because the table is currently
+ // using the unwrapped records for the key, it isn't possible for it
+ // to handle this case right now.
+ m_lookup_map.insert({m_data[i].rec, i});
}
-
- void search(const R &point, size_t k, PriorityQueue<Wrapped<R>,
- DistCmpMax<Wrapped<R>>> &pq) {
- double farthest = std::numeric_limits<double>::max();
-
- internal_search(m_root, point, k, pq, &farthest);
+ }
+
+ vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) {
+ /*
+ * base-case: sometimes happens (probably because of the +1 and -1
+ * in the first recursive call)
+ */
+ if (start > stop) {
+ return nullptr;
}
-private:
- struct vp_ptr {
- Wrapped<R> *ptr;
- double dist;
- };
- Wrapped<R>* m_data;
- vp_ptr* m_ptrs;
- std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_node_cnt;
- size_t m_alloc_size;
-
- vpnode *m_root;
-
- vpnode *build_vptree() {
- if (m_reccnt == 0) {
- return nullptr;
- }
-
- size_t lower = 0;
- size_t upper = m_reccnt - 1;
+ /* base-case: create a leaf node */
+ if (stop - start <= LEAFSZ) {
+ vpnode *node = new vpnode();
+ node->start = start;
+ node->stop = stop;
+ node->leaf = true;
- auto rng = gsl_rng_alloc(gsl_rng_mt19937);
- auto root = build_subtree(lower, upper, rng);
- gsl_rng_free(rng);
- return root;
+ m_node_cnt++;
+ return node;
}
- void build_map() {
- // Skip constructing the hashmap if disabled in the
- // template parameters.
- if constexpr (!HMAP) {
- return;
- }
-
- for (size_t i=0; i<m_reccnt; i++) {
- // FIXME: Will need to account for tombstones here too. Under
- // tombstones, it is technically possible for two otherwise identical
- // instances of the same record to exist within the same shard, so
- // long as one of them is a tombstone. Because the table is currently
- // using the unwrapped records for the key, it isn't possible for it
- // to handle this case right now.
- m_lookup_map.insert({m_data[i].rec, i});
- }
+ /*
+ * select a random element to be the root of the
+ * subtree
+ */
+ auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
+ swap(start, i);
+
+ /* for efficiency, we'll pre-calculate the distances between each point and
+ * the root */
+ for (size_t i = start + 1; i <= stop; i++) {
+ m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
}
- vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) {
- /*
- * base-case: sometimes happens (probably because of the +1 and -1
- * in the first recursive call)
- */
- if (start > stop) {
- return nullptr;
- }
-
- /* base-case: create a leaf node */
- if (stop - start <= LEAFSZ) {
- vpnode *node = new vpnode();
- node->start = start;
- node->stop = stop;
- node->leaf = true;
-
- m_node_cnt++;
- return node;
- }
-
- /*
- * select a random element to be the root of the
- * subtree
- */
- auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
- swap(start, i);
-
- /* for efficiency, we'll pre-calculate the distances between each point and the root */
- for (size_t i=start+1; i<=stop; i++) {
- m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
- }
-
- /*
- * partition elements based on their distance from the start,
- * with those elements with distance falling below the median
- * distance going into the left sub-array and those above
- * the median in the right. This is easily done using QuickSelect.
- */
- auto mid = (start + 1 + stop) / 2;
- quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
-
- /* Create a new node based on this partitioning */
- vpnode *node = new vpnode();
- node->start = start;
+ /*
+ * partition elements based on their distance from the start,
+ * with those elements with distance falling below the median
+ * distance going into the left sub-array and those above
+ * the median in the right. This is easily done using QuickSelect.
+ */
+ auto mid = (start + 1 + stop) / 2;
+ quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
- /* store the radius of the circle used for partitioning the node. */
- node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
- m_ptrs[start].dist = node->radius;
+ /* Create a new node based on this partitioning */
+ vpnode *node = new vpnode();
+ node->start = start;
- /* recursively construct the left and right subtrees */
- node->inside = build_subtree(start + 1, mid-1, rng);
- node->outside = build_subtree(mid, stop, rng);
+ /* store the radius of the circle used for partitioning the node. */
+ node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
+ m_ptrs[start].dist = node->radius;
- m_node_cnt++;
+ /* recursively construct the left and right subtrees */
+ node->inside = build_subtree(start + 1, mid - 1, rng);
+ node->outside = build_subtree(mid, stop, rng);
- return node;
- }
+ m_node_cnt++;
- void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, gsl_rng *rng) {
- if (start == stop) return;
+ return node;
+ }
- auto pivot = partition(start, stop, p, rng);
+ void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p,
+ gsl_rng *rng) {
+ if (start == stop)
+ return;
- if (k < pivot) {
- quickselect(start, pivot - 1, k, p, rng);
- } else if (k > pivot) {
- quickselect(pivot + 1, stop, k, p, rng);
- }
- }
+ auto pivot = partition(start, stop, p, rng);
- // TODO: The quickselect code can probably be generalized and moved out
- // to psudb-common instead.
- size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
- auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
- //double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
-
- swap(pivot, stop);
-
- size_t j = start;
- for (size_t i=start; i<stop; i++) {
- if (m_ptrs[i].dist < m_ptrs[stop].dist) {
- //assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec));
- //if (distances[i - start] < distances[stop - start]) {
- //if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) {
- swap(j, i);
- j++;
- }
- }
-
- swap(j, stop);
- return j;
+ if (k < pivot) {
+ quickselect(start, pivot - 1, k, p, rng);
+ } else if (k > pivot) {
+ quickselect(pivot + 1, stop, k, p, rng);
}
-
- void swap(size_t idx1, size_t idx2) {
- auto tmp = m_ptrs[idx1];
- m_ptrs[idx1] = m_ptrs[idx2];
- m_ptrs[idx2] = tmp;
+ }
+
+ // TODO: The quickselect code can probably be generalized and moved out
+ // to psudb-common instead.
+ size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
+ auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
+ // double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
+
+ swap(pivot, stop);
+
+ size_t j = start;
+ for (size_t i = start; i < stop; i++) {
+ if (m_ptrs[i].dist < m_ptrs[stop].dist) {
+ // assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec));
+ // if (distances[i - start] < distances[stop - start]) {
+ // if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) {
+ swap(j, i);
+ j++;
+ }
}
- void internal_search(vpnode *node, const R &point, size_t k, PriorityQueue<Wrapped<R>,
- DistCmpMax<Wrapped<R>>> &pq, double *farthest) {
-
- if (node == nullptr) return;
+ swap(j, stop);
+ return j;
+ }
- if (node->leaf) {
- for (size_t i=node->start; i<=node->stop; i++) {
- double d = point.calc_distance(m_ptrs[i].ptr->rec);
- if (d < *farthest) {
- if (pq.size() == k) {
- pq.pop();
- }
+ void swap(size_t idx1, size_t idx2) {
+ auto tmp = m_ptrs[idx1];
+ m_ptrs[idx1] = m_ptrs[idx2];
+ m_ptrs[idx2] = tmp;
+ }
- pq.push(m_ptrs[i].ptr);
- if (pq.size() == k) {
- *farthest = point.calc_distance(pq.peek().data->rec);
- }
- }
- }
+ void internal_search(vpnode *node, const R &point, size_t k,
+ PriorityQueue<Wrapped<R>, DistCmpMax<Wrapped<R>>> &pq,
+ double *farthest) {
- return;
- }
-
- double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
+ if (node == nullptr)
+ return;
+ if (node->leaf) {
+ for (size_t i = node->start; i <= node->stop; i++) {
+ double d = point.calc_distance(m_ptrs[i].ptr->rec);
if (d < *farthest) {
- if (pq.size() == k) {
- pq.pop();
- }
- pq.push(m_ptrs[node->start].ptr);
- if (pq.size() == k) {
- *farthest = point.calc_distance(pq.peek().data->rec);
- }
+ if (pq.size() == k) {
+ pq.pop();
+ }
+
+ pq.push(m_ptrs[i].ptr);
+ if (pq.size() == k) {
+ *farthest = point.calc_distance(pq.peek().data->rec);
+ }
}
+ }
- if (d < node->radius) {
- if (d - (*farthest) <= node->radius) {
- internal_search(node->inside, point, k, pq, farthest);
- }
+ return;
+ }
- if (d + (*farthest) >= node->radius) {
- internal_search(node->outside, point, k, pq, farthest);
- }
- } else {
- if (d + (*farthest) >= node->radius) {
- internal_search(node->outside, point, k, pq, farthest);
- }
+ double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
- if (d - (*farthest) <= node->radius) {
- internal_search(node->inside, point, k, pq, farthest);
- }
- }
+ if (d < *farthest) {
+ if (pq.size() == k) {
+ pq.pop();
+ }
+ pq.push(m_ptrs[node->start].ptr);
+ if (pq.size() == k) {
+ *farthest = point.calc_distance(pq.peek().data->rec);
+ }
+ }
+
+ if (d < node->radius) {
+ if (d - (*farthest) <= node->radius) {
+ internal_search(node->inside, point, k, pq, farthest);
+ }
+
+ if (d + (*farthest) >= node->radius) {
+ internal_search(node->outside, point, k, pq, farthest);
+ }
+ } else {
+ if (d + (*farthest) >= node->radius) {
+ internal_search(node->outside, point, k, pq, farthest);
+ }
+
+ if (d - (*farthest) <= node->radius) {
+ internal_search(node->inside, point, k, pq, farthest);
+ }
}
- };
-}
+ }
+};
+} // namespace de
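The traversal in VPTree::internal_search() above prunes subtrees with the standard vantage-point rule: letting d be the distance from the query to the node's vantage point, r the node's partitioning radius, and farthest the distance to the current k-th nearest candidate, the inside region can contain a closer point only if d - farthest <= r, and the outside region only if d + farthest >= r. A small sketch of just that test, separated from the traversal, is given below; the helper names are illustrative and not part of the shard.

/*
 * Sketch of the VP-tree pruning test used by VPTree::internal_search().
 *   dist     - distance from the query point to the node's vantage point
 *   radius   - the node's partitioning radius
 *   farthest - distance to the current k-th nearest candidate
 */
inline bool should_visit_inside(double dist, double radius, double farthest) {
  /* the query ball (radius = farthest) overlaps the inside region */
  return dist - farthest <= radius;
}

inline bool should_visit_outside(double dist, double radius, double farthest) {
  /* the query ball reaches beyond the partitioning radius */
  return dist + farthest >= radius;
}

The real code additionally orders the two recursive calls so that the region containing the query point is searched first, which tends to tighten farthest sooner and improves pruning on the second call.
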
diff --git a/include/util/bf_config.h b/include/util/bf_config.h
index 836e452..7d823a7 100644
--- a/include/util/bf_config.h
+++ b/include/util/bf_config.h
@@ -35,6 +35,8 @@ static size_t BF_HASH_FUNCS = 7;
* Adjust the value of BF_HASH_FUNCS. The argument must be on the interval
* (0, INT64_MAX], or the behavior of bloom filters is undefined.
*/
-[[maybe_unused]] static void BF_SET_HASHFUNC(size_t func_cnt) { BF_HASH_FUNCS = func_cnt; }
+[[maybe_unused]] static void BF_SET_HASHFUNC(size_t func_cnt) {
+ BF_HASH_FUNCS = func_cnt;
+}
} // namespace de
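Since BF_SET_HASHFUNC() only rewrites the global default, it is presumably meant to be called once, before any shards (and therefore any bloom filters) are constructed. A minimal usage sketch, assuming include/ is on the include path:

#include "util/bf_config.h"

int main() {
  /* adjust the bloom filter defaults before building any structures */
  de::BF_SET_HASHFUNC(4); /* must lie in (0, INT64_MAX] */

  /* ... construct the DynamicExtension and insert records afterwards ... */
  return 0;
}
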
diff --git a/include/util/types.h b/include/util/types.h
index 64dc773..be04d05 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -20,8 +20,8 @@
#include <cassert>
#include <cstdint>
#include <cstdlib>
-#include <vector>
#include <memory>
+#include <vector>
namespace de {
@@ -76,21 +76,18 @@ constexpr ShardID buffer_shid = {buffer_level_idx, all_shards_idx};
enum class ReconstructionType {
Invalid, /* placeholder type */
- Flush, /* a flush of the buffer into L0 */
- Merge, /* the merging of shards in two seperate levels */
- Append, /* adding a shard directly to a level */
- Compact /* the merging of shards on one level */
+ Flush, /* a flush of the buffer into L0 */
+  Merge,   /* the merging of shards in two separate levels */
+ Append, /* adding a shard directly to a level */
+ Compact /* the merging of shards on one level */
};
-
-template <typename ShardType>
-struct reconstruction_results {
+template <typename ShardType> struct reconstruction_results {
std::shared_ptr<ShardType> new_shard;
std::vector<std::pair<level_index, const ShardType *>> source_shards;
size_t target_level;
size_t reccnt;
long runtime;
-
};
typedef struct ReconstructionTask {
@@ -121,11 +118,14 @@ public:
total_reccnt += reccnt;
}
- void add_reconstruction(level_index source, level_index target,
- size_t reccnt, ReconstructionType type) {
+ void add_reconstruction(level_index source, level_index target, size_t reccnt,
+ ReconstructionType type) {
if (type == ReconstructionType::Merge) {
- m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}}, target, reccnt, type});
+ m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}},
+ target,
+ reccnt,
+ type});
} else {
m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt, type});
}
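For context, the Merge branch above records both the source and the target level as inputs to the task, while every other reconstruction type records only the source level. A hedged usage sketch follows; the name of the enclosing class, written here as ReconstructionVector, is assumed rather than shown in this hunk.

/* assumed: ReconstructionVector is the class providing add_reconstruction() */
ReconstructionVector tasks;

/* merge the shards of level 0 and level 1 into level 1, moving ~1000 records */
tasks.add_reconstruction(0, 1, 1000, ReconstructionType::Merge);

/* a non-merge task (e.g. Append) lists only the single source level as input */
tasks.add_reconstruction(1, 2, 500, ReconstructionType::Append);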