summary refs log tree commit diff stats
path: root/include
diff options
context:
space:
mode:
author Douglas Rumbaugh <dbr4@psu.edu> 2023-05-22 14:58:22 -0400
committer Douglas Rumbaugh <dbr4@psu.edu> 2023-05-22 14:58:22 -0400
commit 0cf160ee68d37be93665e665ef22ae6e211a157d (patch)
tree badaca4c5654e7abbe9291b18b08748aeeadc518 /include
parent 08d6c84b9d69b500c964a8ff66e726e1f01f2095 (diff)
download dynamic-extension-0cf160ee68d37be93665e665ef22ae6e211a157d.tar.gz
More updates/restructuring
Diffstat (limited to 'include')
-rw-r--r-- include/ds/PriorityQueue.h 2
-rw-r--r-- include/framework/DynamicExtension.h 46
-rw-r--r-- include/framework/InternalLevel.h 39
-rw-r--r-- include/framework/MutableBuffer.h 2
-rw-r--r-- include/framework/RecordInterface.h (renamed from include/util/Record.h) 2
-rw-r--r-- include/framework/ShardInterface.h 12
-rw-r--r-- include/shard/MemISAM.h 22
-rw-r--r-- include/shard/WIRS.h 113
-rw-r--r-- include/util/Cursor.h 2
9 files changed, 104 insertions, 136 deletions
diff --git a/include/ds/PriorityQueue.h b/include/ds/PriorityQueue.h
index 22582da..0468fa4 100644
--- a/include/ds/PriorityQueue.h
+++ b/include/ds/PriorityQueue.h
@@ -12,7 +12,7 @@
#include <vector>
#include <cassert>
-#include "util/Record.h"
+#include "framework/RecordInterface.h"
namespace de {
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 2009344..a70dda4 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -18,6 +18,7 @@
#include "framework/InternalLevel.h"
#include "framework/ShardInterface.h"
#include "framework/QueryInterface.h"
+#include "framework/RecordInterface.h"
#include "shard/WIRS.h"
#include "ds/Alias.h"
@@ -74,12 +75,15 @@ typedef ssize_t level_index;
template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void>
class DynamicExtension {
+ //typedef typename S<WrappedRecord<R>> Shard;
+ typedef S Shard;
+ typedef MutableBuffer<R> Buffer;
public:
DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
: m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
- m_buffer(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop))
- {}
+ m_buffer(new Buffer(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop))
+ { }
~DynamicExtension() {
delete m_buffer;
@@ -94,7 +98,7 @@ public:
}
int erase(const R &rec) {
- MutableBuffer<R> *buffer;
+ Buffer *buffer;
if constexpr (DELETE_TAGGING) {
auto buffer = get_buffer();
@@ -131,7 +135,7 @@ public:
auto buffer_state = Q::get_buffer_query_state(buffer, parms);
// Get the shard query states
- std::vector<std::pair<ShardID, S*>> shards;
+ std::vector<std::pair<ShardID, Shard*>> shards;
std::vector<void*> states;
for (auto &level : m_levels) {
@@ -218,8 +222,8 @@ public:
return m_buffer->get_capacity();
}
- S *create_ssi() {
- std::vector<S *> shards;
+ Shard *create_ssi() {
+ std::vector<Shard *> shards;
if (m_levels.size() > 0) {
for (int i=m_levels.size() - 1; i>= 0; i--) {
@@ -231,7 +235,7 @@ public:
shards.emplace_back(new S(get_buffer(), nullptr));
- S *shards_array[shards.size()];
+ Shard *shards_array[shards.size()];
size_t j = 0;
for (size_t i=0; i<shards.size(); i++) {
@@ -240,7 +244,7 @@ public:
}
}
- S *flattened = new S(shards_array, j, nullptr);
+ Shard *flattened = new S(shards_array, j, nullptr);
for (auto shard : shards) {
delete shard;
@@ -250,19 +254,19 @@ public:
}
private:
- MutableBuffer<R> *m_buffer;
+ Buffer *m_buffer;
size_t m_scale_factor;
double m_max_delete_prop;
std::vector<InternalLevel<R, S, Q> *> m_levels;
- MutableBuffer<R> *get_buffer() {
+ Buffer *get_buffer() {
return m_buffer;
}
int internal_append(R &rec, bool ts) {
- MutableBuffer<R> *buffer;
+ Buffer *buffer;
while (!(buffer = get_buffer()))
;
@@ -273,7 +277,7 @@ private:
return buffer->append(rec, ts);
}
- std::vector<R> post_process(std::vector<R> records, ShardID shid, MutableBuffer<R> *buffer) {
+ std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) {
std::vector<R> processed_records;
processed_records.reserve(records.size());
@@ -331,7 +335,7 @@ private:
if (new_idx > 0) {
assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
}
- m_levels.emplace_back(new InternalLevel<R, S, Q>(new_idx, new_shard_cnt));
+ m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
return new_idx;
}
@@ -380,7 +384,7 @@ private:
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first merge.
*/
- inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) {
+ inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
if (idx == 0 && m_levels.size() == 0) return -1;
@@ -410,25 +414,25 @@ private:
// merging two memory levels
if (LSM_LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, S, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
mark_as_unused(tmp);
} else {
m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
}
mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R, S, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+ m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
}
- inline void merge_buffer_into_l0(MutableBuffer<R> *buffer) {
+ inline void merge_buffer_into_l0(Buffer *buffer) {
assert(m_levels[0]);
if (LSM_LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R, S, Q>(0, 1);
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<R, S, Q>::merge_levels(old_level, temp_level);
+ auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
@@ -444,7 +448,7 @@ private:
* level may not be able to immediately be deleted, depending upon who
* else is using it.
*/
- inline void mark_as_unused(InternalLevel<R, S, Q> *level) {
+ inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
delete level;
}
@@ -477,7 +481,7 @@ private:
* a pointer to the memory table to use, if desired. Otherwise, there are
* no guarantees about which buffer will be accessed if level_index is -1.
*/
- inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) {
+ inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
assert(idx >= -1);
if (idx == -1) {
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 70da821..6986a61 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -15,18 +15,20 @@
#include "util/types.h"
#include "util/bf_config.h"
#include "framework/ShardInterface.h"
-#include "framework/MutableBuffer.h"
#include "framework/QueryInterface.h"
+#include "framework/RecordInterface.h"
+#include "framework/MutableBuffer.h"
#include "ds/BloomFilter.h"
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q>
class InternalLevel {
-
+ typedef S Shard;
+ typedef MutableBuffer<R> Buffer;
public:
InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<S>(shard_cap, nullptr))
+ : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard>(shard_cap, nullptr))
{}
// Create a new memory level sharing the shards and repurposing it as previous level_no + 1
@@ -45,7 +47,7 @@ public:
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
- S* shards[2];
+ Shard* shards[2];
shards[0] = base_level->m_shards[0];
shards[1] = new_level->m_shards[0];
@@ -53,7 +55,7 @@ public:
return res;
}
- void append_buffer(MutableBuffer<R>* buffer) {
+ void append_buffer(Buffer* buffer) {
assert(m_shard_cnt < m_shards.size());
m_shards[m_shard_cnt] = new S(buffer);
++m_shard_cnt;
@@ -65,8 +67,8 @@ public:
++m_shard_cnt;
}
- S *get_merged_shard() {
- S *shards[m_shard_cnt];
+ Shard *get_merged_shard() {
+ Shard *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
shards[i] = m_shards[i];
@@ -76,7 +78,7 @@ public:
}
// Append the sample range in-order.....
- void get_query_states(std::vector<std::pair<ShardID, S *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
+ void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
for (size_t i=0; i<m_shard_cnt; i++) {
if (m_shards[i]) {
auto shard_state = Q::get_query_state(m_shards[i], query_parms);
@@ -90,23 +92,32 @@ public:
if (m_shard_cnt == 0) return false;
for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
- if (m_shards[i] && m_shards[i]->check_tombstone(rec))
- return true;
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec, true);
+ if (res && res->is_tombstone()) {
+ return true;
+ }
+ }
}
return false;
}
bool delete_record(const R &rec) {
+ if (m_shard_cnt == 0) return false;
+
for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i] && m_shards[i]->delete_record(rec)) {
- return true;
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec);
+ if (res) {
+ res->set_delete();
+ }
}
}
return false;
}
- S* get_shard(size_t idx) {
+ Shard* get_shard(size_t idx) {
return m_shards[idx];
}
@@ -170,7 +181,7 @@ private:
size_t m_shard_cnt;
size_t m_shard_size_cap;
- std::shared_ptr<std::vector<S*>> m_shards;
+ std::shared_ptr<std::vector<Shard*>> m_shards;
};
}
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index 4b45f20..3643a89 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -19,9 +19,9 @@
#include "util/base.h"
#include "util/bf_config.h"
#include "ds/BloomFilter.h"
-#include "util/Record.h"
#include "ds/Alias.h"
#include "util/timer.h"
+#include "framework/RecordInterface.h"
namespace de {
diff --git a/include/util/Record.h b/include/framework/RecordInterface.h
index fc543ed..8afd90a 100644
--- a/include/util/Record.h
+++ b/include/framework/RecordInterface.h
@@ -1,5 +1,5 @@
/*
- * include/util/record.h
+ * include/framework/RecordInterface.h
*
* Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h
index 1f48a45..3aa62df 100644
--- a/include/framework/ShardInterface.h
+++ b/include/framework/ShardInterface.h
@@ -11,12 +11,16 @@
#include <concepts>
#include "util/types.h"
+#include "framework/RecordInterface.h"
-template <typename S>
-concept ShardInterface = requires(S s, void *p) {
- s.point_lookup();
+namespace de {
+//template <template<typename> typename S, typename R>
+template <typename S>
+concept ShardInterface = requires(S s, void *p, bool b) {
+ //{s.point_lookup(r, b) } -> std::same_as<R*>;
{s.get_record_count()} -> std::convertible_to<size_t>;
- {s.get_tombstone_count()} -> std::convertible_to<size_t>;
{s.get_memory_usage()} -> std::convertible_to<size_t>;
};
+
+}
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index 55699be..8ac17e4 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -27,6 +27,8 @@ thread_local size_t mrun_cancelations = 0;
template <RecordInterface R>
class MemISAM {
private:
+ friend class InternalLevel;
+
typedef decltype(R::key) K;
typedef decltype(R::value) V;
@@ -178,6 +180,26 @@ public:
return m_tombstone_cnt;
}
+ R *point_lookup(R &rec, bool filter) {
+
+ if (filter && !m_bf->lookup(rec.key)) {
+ return nullptr;
+ }
+
+ size_t idx = get_lower_bound(rec.key);
+ if (idx >= m_reccnt) {
+ return false;
+ }
+
+ while (idx < m_reccnt && m_data[idx] < rec) ++idx;
+
+ if (m_data[idx] == rec) {
+ return m_data + idx;
+ }
+
+ return nullptr;
+ }
+
bool delete_record(const K& key, const V& val) {
size_t idx = get_lower_bound(key);
if (idx >= m_reccnt) {
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 42dbcfd..7dee496 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -1,4 +1,5 @@
/*
+ {s.get_tombstone_count()} -> std::convertible_to<size_t>;
* include/shard/WIRS.h
*
* Copyright (C) 2023 Dong Xie <dongx@psu.edu>
@@ -8,6 +9,7 @@
*/
#pragma once
+
#include <vector>
#include <cassert>
#include <queue>
@@ -19,8 +21,8 @@
#include "ds/Alias.h"
#include "ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "util/Record.h"
#include "framework/MutableBuffer.h"
+#include "framework/RecordInterface.h"
namespace de {
@@ -32,8 +34,11 @@ struct wirs_query_parms {
decltype(R::key) upper_bound;
};
+class InternalLevel;
+
template <WeightedRecordInterface R>
class WIRS {
+ friend class InternalLevel;
private:
typedef decltype(R::key) K;
@@ -62,8 +67,7 @@ private:
public:
WIRS(MutableBuffer<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
- m_ts_check_cnt(0), m_root(nullptr) {
+ : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
@@ -106,8 +110,7 @@ public:
}
WIRS(WIRS** shards, size_t len)
- : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0),
- m_root(nullptr) {
+ : m_reccnt(0), m_tombstone_cnt(0), m_total_weight(0), m_root(nullptr) {
std::vector<Cursor<R>> cursors;
cursors.reserve(len);
@@ -177,24 +180,25 @@ public:
free_tree(m_root);
}
- bool delete_record(const R& rec) {
+ R *point_lookup(R &rec, bool filter=false) {
+ if (filter && !m_bf.lookup(rec.key)) {
+ return nullptr;
+ }
+
size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
- return false;
+ return nullptr;
}
while (idx < m_reccnt && m_data[idx] < rec) ++idx;
- if (m_data[idx] == R {rec.key, rec.val} && !m_data[idx].is_tombstone()) {
- m_data[idx].set_delete();
- m_deleted_cnt++;
- return true;
+ if (m_data[idx] == rec) {
+ return m_data + idx;
}
- return false;
+ return nullptr;
}
-
R* sorted_output() const {
return m_data;
}
@@ -212,46 +216,12 @@ public:
return m_data + idx;
}
- /*
- // low - high -> decompose to a set of nodes.
- // Build Alias across the decomposed nodes.
- WIRSState<R>* get_query_state(void *query_parameters) {
- auto res = new WIRSState();
- K lower_key = ((wirs_query_parms<R> *) query_parameters)->lower_bound;
- K upper_key = ((wirs_query_parms<R> *) query_parameters)->upper_bound;
-
- // Simulate a stack to unfold recursion.
- double tot_weight = 0.0;
- struct wirs_node<R>* st[64] = {0};
- st[0] = m_root;
- size_t top = 1;
- while(top > 0) {
- auto now = st[--top];
- if (covered_by(now, lower_key, upper_key) ||
- (now->left == nullptr && now->right == nullptr && intersects(now, lower_key, upper_key))) {
- res->nodes.emplace_back(now);
- tot_weight += now->weight;
- } else {
- if (now->left && intersects(now->left, lower_key, upper_key)) st[top++] = now->left;
- if (now->right && intersects(now->right, lower_key, upper_key)) st[top++] = now->right;
- }
- }
-
- std::vector<double> weights;
- for (const auto& node: res->nodes) {
- weights.emplace_back(node->weight / tot_weight);
- }
- res->tot_weight = tot_weight;
- res->top_level_alias = new Alias(weights);
- return res;
+ size_t get_memory_usage() {
+ return 0;
}
- static void delete_query_state(void *state) {
- WIRSState<R> *s = (WIRSState<R> *) state;
- delete s;
- }
- */
+private:
size_t get_lower_bound(const K& key) const {
size_t min = 0;
@@ -271,43 +241,6 @@ public:
return min;
}
- bool check_tombstone(const R& rec) {
- if(!m_bf.lookup(rec.key)) {
- return false;
- }
-
- m_ts_check_cnt++;
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return false;
- }
-
- auto ptr = m_data + get_lower_bound(rec.key);
-
- while (ptr < m_data + m_reccnt && *ptr < rec) {
- ptr ++;
- }
-
- bool result = *ptr == rec && ptr->is_tombstone();
- m_rejection_cnt += result;
-
- return result;
- }
-
- size_t get_memory_usage() {
- return 0;
- }
-
- size_t get_rejection_count() {
- return m_rejection_cnt;
- }
-
- size_t get_ts_check_count() {
- return m_ts_check_cnt;
- }
-
-private:
-
bool covered_by(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) {
auto low_index = node->low * m_group_size;
auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1);
@@ -394,13 +327,7 @@ private:
size_t m_reccnt;
size_t m_tombstone_cnt;
size_t m_group_size;
- size_t m_ts_check_cnt;
- size_t m_deleted_cnt;
BloomFilter<K> m_bf;
-
- // The number of rejections caused by tombstones
- // in this WIRS.
- size_t m_rejection_cnt;
};
}
diff --git a/include/util/Cursor.h b/include/util/Cursor.h
index 2609ae5..815458c 100644
--- a/include/util/Cursor.h
+++ b/include/util/Cursor.h
@@ -10,7 +10,7 @@
#pragma once
#include "util/base.h"
-#include "util/Record.h"
+#include "framework/RecordInterface.h"
#include "io/PagedFile.h"
namespace de {