diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-29 12:33:58 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-29 12:33:58 -0400 |
| commit | b00682429988f17152e7573ffeffa1cecfdd3d3a (patch) | |
| tree | b621adaa2bfe3a7a9846970f7426fd950e17aa99 /include/framework | |
| parent | de5fa133758e2f0aad855ac58dff5cfa13d06f74 (diff) | |
| download | dynamic-extension-b00682429988f17152e7573ffeffa1cecfdd3d3a.tar.gz | |
Tests and bugfixes for framework
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 30 | ||||
| -rw-r--r-- | include/framework/InternalLevel.h | 54 | ||||
| -rw-r--r-- | include/framework/MutableBuffer.h | 22 | ||||
| -rw-r--r-- | include/framework/QueryInterface.h | 5 | ||||
| -rw-r--r-- | include/framework/RecordInterface.h | 4 |
5 files changed, 62 insertions, 53 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index a70dda4..68f85e2 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -77,7 +77,7 @@ template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=voi class DynamicExtension { //typedef typename S<WrappedRecord<R>> Shard; typedef S Shard; - typedef MutableBuffer<R> Buffer; + typedef MutableBuffer<WrappedRecord<R>> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) @@ -149,7 +149,7 @@ public: // Execute the query for each shard for (size_t i=0; i<shards.size(); i++) { - query_results[i] = post_process(Q::query(shards[i].second, states[i], parms)); + query_results[i] = filter_deletes(Q::query(shards[i].second, states[i], parms)); } // Merge the results together @@ -233,7 +233,7 @@ public: } } - shards.emplace_back(new S(get_buffer(), nullptr)); + shards.emplace_back(new S(get_buffer())); Shard *shards_array[shards.size()]; @@ -244,7 +244,7 @@ public: } } - Shard *flattened = new S(shards_array, j, nullptr); + Shard *flattened = new S(shards_array, j); for (auto shard : shards) { delete shard; @@ -259,13 +259,13 @@ private: size_t m_scale_factor; double m_max_delete_prop; - std::vector<InternalLevel<R, S, Q> *> m_levels; + std::vector<InternalLevel<WrappedRecord<R>, S, Q> *> m_levels; Buffer *get_buffer() { return m_buffer; } - int internal_append(R &rec, bool ts) { + int internal_append(const R &rec, bool ts) { Buffer *buffer; while (!(buffer = get_buffer())) ; @@ -274,10 +274,12 @@ private: merge_buffer(); } - return buffer->append(rec, ts); + WrappedRecord<R> wrec = static_cast<WrappedRecord<R>>(rec); + + return buffer->append(wrec, ts); } - std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) { + std::vector<R> filter_deletes(std::vector<R> records, ShardID shid, Buffer *buffer) { std::vector<R> processed_records; processed_records.reserve(records.size()); @@ -335,7 +337,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<WrappedRecord<R>, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -414,14 +416,14 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<WrappedRecord<R>, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } @@ -430,9 +432,9 @@ private: if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); + auto temp_level = new InternalLevel<WrappedRecord<R>, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); + auto new_level = InternalLevel<WrappedRecord<R>, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -448,7 +450,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { + inline void mark_as_unused(InternalLevel<WrappedRecord<R>, Shard, Q> *level) { delete level; } diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 6986a61..c489063 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -28,7 +28,7 @@ class InternalLevel { typedef MutableBuffer<R> Buffer; public: InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard>(shard_cap, nullptr)) + : m_level_no(level_no), m_shard_cnt(0), m_shards(new std::vector<Shard*>(shard_cap, nullptr)) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 @@ -36,7 +36,7 @@ public: InternalLevel(InternalLevel* level) : m_level_no(level->m_level_no + 1), m_shard_cnt(level->m_shard_cnt) , m_shards(level->m_shards) { - assert(m_shard_cnt == 1 && m_shards.size() == 1); + assert(m_shard_cnt == 1 && m_shards->size() == 1); } ~InternalLevel() {} @@ -48,22 +48,22 @@ public: auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; Shard* shards[2]; - shards[0] = base_level->m_shards[0]; - shards[1] = new_level->m_shards[0]; + shards[0] = (*base_level->m_shards)[0]; + shards[1] = (*new_level->m_shards)[0]; - res->m_shards[0] = new S(shards, 2); + (*res->m_shards)[0] = new S(shards, 2); return res; } void append_buffer(Buffer* buffer) { - assert(m_shard_cnt < m_shards.size()); - m_shards[m_shard_cnt] = new S(buffer); + assert(m_shard_cnt < m_shards->size()); + (*m_shards)[m_shard_cnt] = new S(buffer); ++m_shard_cnt; } void append_merged_shards(InternalLevel* level) { - assert(m_shard_cnt < m_shards.size()); - m_shards[m_shard_cnt] = new S(level->m_shards, level->m_shard_cnt); + assert(m_shard_cnt < m_shards->size()); + (*m_shards)[m_shard_cnt] = new S(level->m_shards->data(), level->m_shard_cnt); ++m_shard_cnt; } @@ -71,7 +71,7 @@ public: Shard *shards[m_shard_cnt]; for (size_t i=0; i<m_shard_cnt; i++) { - shards[i] = m_shards[i]; + shards[i] = (*m_shards)[i]; } return new S(shards, m_shard_cnt); @@ -80,9 +80,9 @@ public: // Append the sample range in-order..... void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) { for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - auto shard_state = Q::get_query_state(m_shards[i], query_parms); - shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]}); + if ((*m_shards)[i]) { + auto shard_state = Q::get_query_state((*m_shards)[i], query_parms); + shards.push_back({{m_level_no, (ssize_t) i}, (*m_shards)[i]}); shard_states.emplace_back(shard_state); } } @@ -92,8 +92,8 @@ public: if (m_shard_cnt == 0) return false; for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); + if ((*m_shards)[i]) { + auto res = (*m_shards)[i]->point_lookup(rec, true); if (res && res->is_tombstone()) { return true; } @@ -105,9 +105,9 @@ public: bool delete_record(const R &rec) { if (m_shard_cnt == 0) return false; - for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); + for (size_t i = 0; i < (*m_shards)->size(); ++i) { + if ((*m_shards)[i]) { + auto res = (*m_shards)[i]->point_lookup(rec); if (res) { res->set_delete(); } @@ -118,7 +118,7 @@ public: } Shard* get_shard(size_t idx) { - return m_shards[idx]; + return (*m_shards)[idx]; } size_t get_shard_count() { @@ -128,7 +128,7 @@ public: size_t get_record_cnt() { size_t cnt = 0; for (size_t i=0; i<m_shard_cnt; i++) { - cnt += m_shards[i]->get_record_count(); + cnt += (*m_shards)[i]->get_record_count(); } return cnt; @@ -137,7 +137,7 @@ public: size_t get_tombstone_count() { size_t res = 0; for (size_t i = 0; i < m_shard_cnt; ++i) { - res += m_shards[i]->get_tombstone_count(); + res += (*m_shards)[i]->get_tombstone_count(); } return res; } @@ -145,7 +145,7 @@ public: size_t get_aux_memory_usage() { size_t cnt = 0; for (size_t i=0; i<m_shard_cnt; i++) { - cnt += m_shards[i]->get_aux_memory_usage(); + cnt += (*m_shards)[i]->get_aux_memory_usage(); } return cnt; @@ -154,8 +154,8 @@ public: size_t get_memory_usage() { size_t cnt = 0; for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - cnt += m_shards[i]->get_memory_usage(); + if ((*m_shards)[i]) { + cnt += (*m_shards)[i]->get_memory_usage(); } } @@ -166,9 +166,9 @@ public: size_t tscnt = 0; size_t reccnt = 0; for (size_t i=0; i<m_shard_cnt; i++) { - if (m_shards[i]) { - tscnt += m_shards[i]->get_tombstone_count(); - reccnt += m_shards[i]->get_record_count(); + if ((*m_shards)[i]) { + tscnt += (*m_shards)[i]->get_tombstone_count(); + reccnt += (*m_shards[i])->get_record_count(); } } diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index 3643a89..c154001 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -28,14 +28,14 @@ namespace de { template <RecordInterface R> class MutableBuffer { - typedef WrappedRecord<R> WRec; + //typedef WrappedRecord<R> R; public: MutableBuffer(size_t capacity, bool rej_sampling, size_t max_tombstone_cap) : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0) , m_tombstonecnt(0), m_weight(0), m_max_weight(0) { - auto len = capacity * sizeof(WRec); + auto len = capacity * sizeof(R); size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE)); - m_data = (WRec*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); + m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize); m_tombstone_filter = nullptr; if (max_tombstone_cap > 0) { m_tombstone_filter = new BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS); @@ -54,12 +54,10 @@ public: int32_t pos = 0; if ((pos = try_advance_tail()) == -1) return 0; - WRec wrec; - wrec.rec = rec; + R new_rec = rec; + if (tombstone) new_rec.set_tombstone(); - if (tombstone) wrec.set_tombstone(); - - m_data[pos] = wrec; + m_data[pos] = new_rec; m_data[pos].header |= (pos << 2); if (tombstone) { @@ -68,7 +66,7 @@ public: } if constexpr (WeightedRecordInterface<R_>) { - m_weight.fetch_add(rec.weight); + m_weight.fetch_add(new_rec.weight); double old = m_max_weight.load(); while (old < rec.weight) { m_max_weight.compare_exchange_strong(old, rec.weight); @@ -125,7 +123,7 @@ public: auto offset = 0; while (offset < m_reccnt.load()) { - if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) { + if (m_data[offset] == rec && m_data[offset].is_tombstone()) { return true; } offset++;; @@ -149,6 +147,10 @@ public: return m_weight.load(); } + R *get_data() { + return m_data; + } + private: int32_t try_advance_tail() { size_t new_tail = m_reccnt.fetch_add(1); diff --git a/include/framework/QueryInterface.h b/include/framework/QueryInterface.h index eafeeb0..886bdc8 100644 --- a/include/framework/QueryInterface.h +++ b/include/framework/QueryInterface.h @@ -13,10 +13,15 @@ template <typename Q> concept QueryInterface = requires(Q q, void *p) { + +/* {q.get_query_state(p, p)} -> std::convertible_to<void*>; {q.get_buffer_query_state(p, p)}; {q.query(p, p)}; {q.buffer_query(p, p)}; {q.merge()}; {q.delete_query_state(p)}; +*/ + + {Q::delete_query_state(std::declval<void*>())} -> std::same_as<void>; }; diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h index 8afd90a..c8e622f 100644 --- a/include/framework/RecordInterface.h +++ b/include/framework/RecordInterface.h @@ -26,8 +26,8 @@ concept RecordInterface = requires(R r, R s) { }; template<RecordInterface R> -struct WrappedRecord { - R rec; +struct WrappedRecord : R { + //R rec; uint32_t header; inline void set_delete() { |