summary refs log tree commit diff stats
path: root/include/shard
diff options
context:
space:
mode:
Diffstat (limited to 'include/shard')
-rw-r--r--include/shard/MemISAM.h72
-rw-r--r--include/shard/WIRS.h108
2 files changed, 94 insertions, 86 deletions
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index 6d97f95..dd2fd85 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -23,10 +23,11 @@ namespace de {
thread_local size_t mrun_cancelations = 0;
-template <typename K, typename V>
+template <RecordInterface R>
class MemISAM {
private:
-typedef Record<K, V> Rec;
+typedef decltype(R::key) K;
+typedef decltype(R::value) V;
constexpr static size_t inmem_isam_node_size = 256;
constexpr static size_t inmem_isam_fanout = inmem_isam_node_size / (sizeof(K) + sizeof(char*));
@@ -36,24 +37,23 @@ struct InMemISAMNode {
char* child[inmem_isam_fanout];
};
-constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(Rec);
+constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(R);
constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout;
static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match");
-
public:
MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf, bool tagging)
: m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0), m_tagging(tagging) {
// read the stored data from the file
- size_t alloc_size = (record_cnt * sizeof(Rec)) + (CACHELINE_SIZE - (record_cnt * sizeof(Rec)) % CACHELINE_SIZE);
+ size_t alloc_size = (record_cnt * sizeof(R)) + (CACHELINE_SIZE - (record_cnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, alloc_size);
FILE *file = fopen(data_fname.c_str(), "rb");
assert(file);
- auto res = fread(m_data, sizeof(Rec), m_reccnt, file);
+ auto res = fread(m_data, sizeof(R), m_reccnt, file);
assert (res == m_reccnt);
fclose(file);
@@ -71,34 +71,34 @@ public:
}
}
- MemISAM(MutableBuffer<K,V>* buffer, BloomFilter* bf, bool tagging)
+ MemISAM(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging)
:m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0), m_tagging(tagging) {
- size_t alloc_size = (buffer->get_record_count() * sizeof(Rec)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Rec)) % CACHELINE_SIZE);
+ size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
TIMER_INIT();
size_t offset = 0;
m_reccnt = 0;
TIMER_START();
- Rec* base = buffer->sorted_output();
+ R* base = buffer->sorted_output();
TIMER_STOP();
auto sort_time = TIMER_RESULT();
- Rec* stop = base + buffer->get_record_count();
+ R* stop = base + buffer->get_record_count();
TIMER_START();
while (base < stop) {
if (!m_tagging) {
if (!base->is_tombstone() && (base + 1 < stop)
- && base->match(base + 1) && (base + 1)->is_tombstone()) {
+ && *base == *(base + 1) && (base + 1)->is_tombstone()) {
base += 2;
mrun_cancelations++;
continue;
}
- } else if (base->get_delete_status()) {
+ } else if (base->is_deleted()) {
base += 1;
continue;
}
@@ -123,15 +123,15 @@ public:
TIMER_STOP();
auto level_time = TIMER_RESULT();
- fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
+ //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time);
}
MemISAM(MemISAM** runs, size_t len, BloomFilter* bf, bool tagging)
:m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr), m_tagging(tagging) {
- std::vector<Cursor<K,V>> cursors;
+ std::vector<Cursor<R>> cursors;
cursors.reserve(len);
- PriorityQueue<K,V> pq(len);
+ PriorityQueue<R> pq(len);
size_t attemp_reccnt = 0;
@@ -142,21 +142,21 @@ public:
attemp_reccnt += runs[i]->get_record_count();
pq.push(cursors[i].ptr, i);
} else {
- cursors.emplace_back(Cursor<K,V>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
}
}
- size_t alloc_size = (attemp_reccnt * sizeof(Rec)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Rec)) % CACHELINE_SIZE);
+ size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
size_t offset = 0;
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K,V>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr &&
- now.data->match(next.data) && next.data->is_tombstone()) {
+ *now.data == *next.data && next.data->is_tombstone()) {
pq.pop(); pq.pop();
auto& cursor1 = cursors[now.version];
@@ -165,7 +165,7 @@ public:
if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
} else {
auto& cursor = cursors[now.version];
- if (!m_tagging || !cursor.ptr->get_delete_status()) {
+ if (!m_tagging || !cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
if (cursor.ptr->is_tombstone()) {
++m_tombstone_cnt;
@@ -188,7 +188,7 @@ public:
if (m_isam_nodes) free(m_isam_nodes);
}
- Rec* sorted_output() const {
+ R* sorted_output() const {
return m_data;
}
@@ -208,7 +208,7 @@ public:
while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx;
- if (m_data[idx].match(key, val, false)) {
+ if (m_data[idx] == R {key, val}) {
m_data[idx].set_delete_status();
m_deleted_cnt++;
return true;
@@ -217,7 +217,7 @@ public:
return false;
}
- const Rec* get_record_at(size_t idx) const {
+ const R* get_record_at(size_t idx) const {
return (idx < m_reccnt) ? m_data + idx : nullptr;
}
@@ -235,7 +235,7 @@ public:
now = next ? next : reinterpret_cast<const InMemISAMNode*>(now->child[inmem_isam_fanout - 1]);
}
- const Rec* pos = reinterpret_cast<const Rec*>(now);
+ const R* pos = reinterpret_cast<const R*>(now);
while (pos < m_data + m_reccnt && pos->key < key) pos++;
return pos - m_data;
@@ -255,7 +255,7 @@ public:
now = next ? next : reinterpret_cast<const InMemISAMNode*>(now->child[inmem_isam_fanout - 1]);
}
- const Rec* pos = reinterpret_cast<const Rec*>(now);
+ const R* pos = reinterpret_cast<const R*>(now);
while (pos < m_data + m_reccnt && pos->key <= key) pos++;
return pos - m_data;
@@ -267,20 +267,20 @@ public:
return false;
}
- Rec* ptr = m_data + idx;
+ R* ptr = m_data + idx;
while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++;
- return ptr->match(key, val, true);
+ return *ptr == R {key, val} && ptr->is_tombstone();
}
size_t get_memory_utilization() {
- return m_reccnt * sizeof(Rec) + m_internal_node_cnt * inmem_isam_node_size;
+ return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size;
}
void persist_to_file(std::string data_fname) {
FILE *file = fopen(data_fname.c_str(), "wb");
assert(file);
- fwrite(m_data, sizeof(Rec), m_reccnt, file);
+ fwrite(m_data, sizeof(R), m_reccnt, file);
fclose(file);
}
@@ -303,14 +303,14 @@ private:
InMemISAMNode* current_node = m_isam_nodes;
- const Rec* leaf_base = m_data;
- const Rec* leaf_stop = m_data + m_reccnt;
+ const R* leaf_base = m_data;
+ const R* leaf_stop = m_data + m_reccnt;
while (leaf_base < leaf_stop) {
size_t fanout = 0;
for (size_t i = 0; i < inmem_isam_fanout; ++i) {
auto rec_ptr = leaf_base + inmem_isam_leaf_fanout * i;
if (rec_ptr >= leaf_stop) break;
- const Rec* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1);
+ const R* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1);
current_node->keys[i] = sep_key->key;
current_node->child[i] = (char*)rec_ptr;
++fanout;
@@ -350,7 +350,7 @@ private:
}
// Members: sorted data, internal ISAM levels, reccnt;
- Rec* m_data;
+ R* m_data;
InMemISAMNode* m_isam_nodes;
InMemISAMNode* m_root;
size_t m_reccnt;
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 39337bf..41766b9 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -12,6 +12,7 @@
#include <cassert>
#include <queue>
#include <memory>
+#include <concepts>
#include "ds/PriorityQueue.h"
#include "util/Cursor.h"
@@ -24,19 +25,26 @@ namespace de {
thread_local size_t wirs_cancelations = 0;
-template <typename K, typename V, typename W>
+template <WeightedRecordInterface R>
class WIRS {
private:
+
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ typedef decltype(R::weight) W;
+
+ template <WeightedRecordInterface R_ = R>
struct wirs_node {
- struct wirs_node *left, *right;
+ struct wirs_node<R_> *left, *right;
K low, high;
W weight;
Alias* alias;
};
+ template <WeightedRecordInterface R_ = R>
struct WIRSState {
W tot_weight;
- std::vector<wirs_node*> nodes;
+ std::vector<wirs_node<R_>*> nodes;
Alias* top_level_alias;
~WIRSState() {
@@ -45,13 +53,13 @@ private:
};
public:
- WIRS(MutableBuffer<K, V, W>* buffer, BloomFilter* bf, bool tagging)
+ WIRS(MutableBuffer<R>* buffer, BloomFilter* bf, bool tagging)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0),
m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) {
- size_t alloc_size = (buffer->get_record_count() * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Record<K, V, W>)) % CACHELINE_SIZE);
+ size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
size_t offset = 0;
m_reccnt = 0;
@@ -61,13 +69,13 @@ public:
while (base < stop) {
if (!m_tagging) {
if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->match(base + 1) && (base + 1)->is_tombstone()) {
+ if (*base == *(base + 1) && (base + 1)->is_tombstone()) {
base += 2;
wirs_cancelations++;
continue;
}
}
- } else if (base->get_delete_status()) {
+ } else if (base->is_deleted()) {
base += 1;
continue;
}
@@ -92,10 +100,10 @@ public:
WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging)
: m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0),
m_tagging(tagging), m_root(nullptr) {
- std::vector<Cursor<K,V,W>> cursors;
+ std::vector<Cursor<R>> cursors;
cursors.reserve(len);
- PriorityQueue<K, V, W> pq(len);
+ PriorityQueue<R> pq(len);
size_t attemp_reccnt = 0;
@@ -106,28 +114,28 @@ public:
attemp_reccnt += shards[i]->get_record_count();
pq.push(cursors[i].ptr, i);
} else {
- cursors.emplace_back(Cursor<K,V,W>{nullptr, nullptr, 0, 0});
+ cursors.emplace_back(Cursor<R>{nullptr, nullptr, 0, 0});
}
}
- size_t alloc_size = (attemp_reccnt * sizeof(Record<K, V, W>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Record<K, V, W>)) % CACHELINE_SIZE);
+ size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE);
assert(alloc_size % CACHELINE_SIZE == 0);
- m_data = (Record<K, V, W>*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
+ m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size);
while (pq.size()) {
auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<K, V, W>{nullptr, 0};
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<R>{nullptr, 0};
if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr &&
- now.data->match(next.data) && next.data->is_tombstone()) {
+ *now.data == *next.data && next.data->is_tombstone()) {
pq.pop(); pq.pop();
auto& cursor1 = cursors[now.version];
auto& cursor2 = cursors[next.version];
- if (advance_cursor<K,V,W>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<K,V,W>(cursor2)) pq.push(cursor2.ptr, next.version);
+ if (advance_cursor<R>(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor<R>(cursor2)) pq.push(cursor2.ptr, next.version);
} else {
auto& cursor = cursors[now.version];
- if (!m_tagging || !cursor.ptr->get_delete_status()) {
+ if (!m_tagging || !cursor.ptr->is_deleted()) {
m_data[m_reccnt++] = *cursor.ptr;
m_total_weight += cursor.ptr->weight;
if (bf && cursor.ptr->is_tombstone()) {
@@ -137,7 +145,7 @@ public:
}
pq.pop();
- if (advance_cursor<K,V,W>(cursor)) pq.push(cursor.ptr, now.version);
+ if (advance_cursor<R>(cursor)) pq.push(cursor.ptr, now.version);
}
}
@@ -155,16 +163,16 @@ public:
free_tree(m_root);
}
- bool delete_record(const K& key, const V& val) {
- size_t idx = get_lower_bound(key);
+ bool delete_record(const R& rec) {
+ size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
return false;
}
- while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx;
+ while (idx < m_reccnt && m_data[idx] < rec) ++idx;
- if (m_data[idx].match(key, val, false)) {
- m_data[idx].set_delete_status();
+ if (m_data[idx] == R {rec.key, rec.val} && !m_data[idx].is_tombstone()) {
+ m_data[idx].set_delete();
m_deleted_cnt++;
return true;
}
@@ -172,7 +180,7 @@ public:
return false;
}
- void free_tree(struct wirs_node* node) {
+ void free_tree(struct wirs_node<R>* node) {
if (node) {
delete node->alias;
free_tree(node->left);
@@ -181,7 +189,7 @@ public:
}
}
- Record<K, V, W>* sorted_output() const {
+ R* sorted_output() const {
return m_data;
}
@@ -193,19 +201,19 @@ public:
return m_tombstone_cnt;
}
- const Record<K, V, W>* get_record_at(size_t idx) const {
+ const R* get_record_at(size_t idx) const {
if (idx >= m_reccnt) return nullptr;
return m_data + idx;
}
// low - high -> decompose to a set of nodes.
// Build Alias across the decomposed nodes.
- WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) {
- WIRSState* res = new WIRSState();
+ WIRSState<R>* get_sample_shard_state(const K& lower_key, const K& upper_key) {
+ auto res = new WIRSState();
// Simulate a stack to unfold recursion.
double tot_weight = 0.0;
- struct wirs_node* st[64] = {0};
+ struct wirs_node<R>* st[64] = {0};
st[0] = m_root;
size_t top = 1;
while(top > 0) {
@@ -231,15 +239,15 @@ public:
}
static void delete_state(void *state) {
- auto s = (WIRSState *) state;
+ WIRSState<R> *s = (WIRSState<R> *) state;
delete s;
}
// returns the number of records sampled
// NOTE: This operation returns records strictly between the lower and upper bounds, not
// including them.
- size_t get_samples(void* shard_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
- WIRSState *state = (WIRSState *) shard_state;
+ size_t get_samples(void* shard_state, std::vector<R> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+ WIRSState<R> *state = (WIRSState<R> *) shard_state;
if (sample_sz == 0) {
return 0;
}
@@ -295,30 +303,30 @@ public:
auto ptr = m_data + get_lower_bound(key);
- while (ptr < m_data + m_reccnt && ptr->lt(key, val)) {
+ while (ptr < m_data + m_reccnt && *ptr < R {key, val}) {
ptr ++;
}
- bool result = (m_tagging) ? ptr->get_delete_status()
- : ptr->match(key, val, true);
+ bool result = (m_tagging) ? ptr->is_deleted()
+ : *ptr == R {key, val} && ptr->is_tombstone();
m_rejection_cnt += result;
return result;
}
- bool check_tombstone(const K& key, const V& val) {
+ bool check_tombstone(const R& rec) {
m_ts_check_cnt++;
- size_t idx = get_lower_bound(key);
+ size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
return false;
}
- auto ptr = m_data + get_lower_bound(key);
+ auto ptr = m_data + get_lower_bound(rec.key);
- while (ptr < m_data + m_reccnt && ptr->lt(key, val)) {
+ while (ptr < m_data + m_reccnt && *ptr < rec) {
ptr ++;
}
- bool result = ptr->match(key, val, true);
+ bool result = *ptr == rec && ptr->is_tombstone();
m_rejection_cnt += result;
return result;
@@ -340,21 +348,21 @@ public:
private:
- bool covered_by(struct wirs_node* node, const K& lower_key, const K& upper_key) {
+ bool covered_by(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) {
auto low_index = node->low * m_group_size;
auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1);
return lower_key < m_data[low_index].key && m_data[high_index].key < upper_key;
}
- bool intersects(struct wirs_node* node, const K& lower_key, const K& upper_key) {
+ bool intersects(struct wirs_node<R>* node, const K& lower_key, const K& upper_key) {
auto low_index = node->low * m_group_size;
auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1);
return lower_key < m_data[high_index].key && m_data[low_index].key < upper_key;
}
- struct wirs_node* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) {
+ struct wirs_node<R>* construct_wirs_node(const std::vector<W>& weights, size_t low, size_t high) {
if (low == high) {
- return new wirs_node{nullptr, nullptr, low, high, weights[low], new Alias({1.0})};
+ return new wirs_node<R>{nullptr, nullptr, low, high, weights[low], new Alias({1.0})};
} else if (low > high) return nullptr;
std::vector<double> node_weights;
@@ -370,9 +378,9 @@ private:
size_t mid = (low + high) / 2;
- return new wirs_node{construct_wirs_node(weights, low, mid),
- construct_wirs_node(weights, mid + 1, high),
- low, high, sum, new Alias(node_weights)};
+ return new wirs_node<R>{construct_wirs_node(weights, low, mid),
+ construct_wirs_node(weights, mid + 1, high),
+ low, high, sum, new Alias(node_weights)};
}
@@ -410,9 +418,9 @@ private:
m_root = construct_wirs_node(weights, 0, n_groups-1);
}
- Record<K, V, W>* m_data;
+ R* m_data;
std::vector<Alias *> m_alias;
- wirs_node* m_root;
+ wirs_node<R>* m_root;
bool m_tagging;
W m_total_weight;
size_t m_reccnt;