diff options
| -rw-r--r-- | CMakeLists.txt | 28 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 252 | ||||
| -rw-r--r-- | include/framework/QueryRequirements.h | 2 | ||||
| -rw-r--r-- | include/framework/interface/Query.h | 2 | ||||
| -rw-r--r-- | include/framework/interface/Scheduler.h | 4 | ||||
| -rw-r--r-- | include/framework/scheduling/Epoch.h | 80 | ||||
| -rw-r--r-- | include/framework/structure/BufferView.h | 4 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 41 | ||||
| -rw-r--r-- | include/query/rangequery.h | 34 |
9 files changed, 158 insertions, 289 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 1164cee..3b2a1ad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,25 +44,29 @@ if (tests) target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread) target_include_directories(mutable_buffer_tests PRIVATE include external/psudb-common/cpp/include) + add_executable(rangequery_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/rangequery_tests.cpp) + target_link_libraries(rangequery_tests PUBLIC gsl check subunit pthread) + target_include_directories(rangequery_tests PRIVATE include external/psudb-common/cpp/include) + #add_executable(vptree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/vptree_tests.cpp) #target_link_libraries(vptree_tests PUBLIC gsl check subunit pthread) #target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include) - #add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp) - #target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread) - #target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external) + add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp) + target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread) + target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external) - #add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp) - #target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread) - #target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) + add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp) + target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread) + target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external) - #add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp) - #target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread) - #target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external) + add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp) + target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread) + target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external) - #add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp) - #target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread) - #target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) + add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp) + target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread) + target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external) add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp) target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c5c4a1a..c97b390 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -10,29 +10,23 @@ #pragma once #include <atomic> -#include <numeric> #include <cstdio> #include <vector> #include <set> #include <shared_mutex> #include <mutex> +#include "framework/interface/Scheduler.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "framework/scheduling/SerialScheduler.h" + #include "framework/structure/MutableBuffer.h" -#include "framework/structure/InternalLevel.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" #include "framework/interface/Record.h" -#include "framework/interface/Query.h" -#include "framework/interface/Scheduler.h" #include "framework/structure/ExtensionStructure.h" #include "framework/util/Configuration.h" -#include "framework/scheduling/FIFOScheduler.h" -#include "framework/scheduling/SerialScheduler.h" #include "framework/scheduling/Epoch.h" -#include "psu-util/timer.h" -#include "psu-ds/Alias.h" namespace de { @@ -43,22 +37,19 @@ class DynamicExtension { typedef MutableBuffer<R> Buffer; typedef ExtensionStructure<R, S, Q, L> Structure; typedef Epoch<R, S, Q, L> _Epoch; - typedef BufferView<R, Q> BufView; + typedef BufferView<R> BufView; public: - DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0, + DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0, size_t thread_cnt=16) : m_scale_factor(scale_factor) - , m_max_delete_prop(max_delete_prop) + , m_max_delete_prop(1) , m_sched(memory_budget, thread_cnt) - , m_buffer_capacity(buffer_cap) - , m_buffer_delete_capacity(max_delete_prop*buffer_cap) + , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) { - auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); - auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop); - auto epoch = new _Epoch(0, vers, buf); + auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); + auto epoch = new _Epoch(0, vers, m_buffer); - m_buffers.insert(buf); m_versions.insert(vers); m_epochs.insert({0, epoch}); } @@ -79,9 +70,7 @@ public: delete e.second; } - for (auto e : m_buffers) { - delete e; - } + delete m_buffer; for (auto e : m_versions) { delete e; @@ -95,10 +84,15 @@ public: int erase(const R &rec) { // FIXME: delete tagging will require a lot of extra work to get // operating "correctly" in a concurrent environment. + + /* + * Get a view on the buffer *first*. This will ensure a stronger + * ordering than simply accessing the buffer directly, but is + * not *strictly* necessary. + */ + auto view = m_buffer->get_buffer_view(); if constexpr (D == DeletePolicy::TAGGING) { static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation"); - BufView buffers = get_active_epoch()->get_buffer_view(); - if (get_active_epoch()->get_structure()->tagged_delete(rec)) { return 1; } @@ -108,7 +102,7 @@ public: * probably has the lowest probability of having the record, * so we'll check it last. */ - return buffers.delete_record(rec); + return view.delete_record(rec); } /* @@ -123,7 +117,7 @@ public: size_t get_record_count() { auto epoch = get_active_epoch_protected(); - auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count(); + auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count(); epoch->end_job(); return t; @@ -131,7 +125,7 @@ public: size_t get_tombstone_count() { auto epoch = get_active_epoch_protected(); - auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); + auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); epoch->end_job(); return t; @@ -147,7 +141,7 @@ public: size_t get_memory_usage() { auto epoch = get_active_epoch_protected(); - auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage(); + auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); epoch->end_job(); return t; @@ -155,14 +149,14 @@ public: size_t get_aux_memory_usage() { auto epoch = get_active_epoch_protected(); - auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); + auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); epoch->end_job(); return t; } size_t get_buffer_capacity() { - return m_buffer_capacity; + return m_buffer->get_capacity(); } Shard *create_static_structure(bool await_reconstruction_completion=false) { @@ -171,11 +165,20 @@ public: } auto epoch = get_active_epoch_protected(); - auto bv = epoch->get_buffer_view(); - auto vers = epoch->get_structure(); std::vector<Shard *> shards; + /* + * construct a shard from the buffer view. We'll hold the view + * for as short a time as possible: once the records are exfiltrated + * from the buffer, there's no reason to retain a hold on the view's + * head pointer any longer + */ + { + auto bv = epoch->get_buffer(); + shards.emplace_back(new S(std::move(bv))); + } + if (vers->get_levels().size() > 0) { for (int i=vers->get_levels().size() - 1; i>= 0; i--) { if (vers->get_levels()[i]) { @@ -184,12 +187,6 @@ public: } } - // FIXME: With an interface adjustment, this could be done in - // one call, rather than a loop. - for (ssize_t i=bv.size() - 1; i>=0; i--) { - shards.emplace_back(new S(bv.get_buffers()[i])); - } - Shard *shards_array[shards.size()]; size_t j = 0; @@ -237,10 +234,13 @@ public: private: SCHED m_sched; + Buffer *m_buffer; + std::mutex m_struct_lock; - std::set<Buffer*> m_buffers; std::set<Structure *> m_versions; + alignas(64) std::atomic<bool> m_reconstruction_scheduled; + std::atomic<size_t> m_current_epoch; std::atomic<size_t> m_newest_epoch; std::unordered_map<size_t, _Epoch *> m_epochs; @@ -253,8 +253,6 @@ private: size_t m_scale_factor; double m_max_delete_prop; - size_t m_buffer_capacity; - size_t m_buffer_delete_capacity; void enforce_delete_invariant(_Epoch *epoch) { auto structure = epoch->get_structure(); @@ -321,6 +319,7 @@ private: */ enforce_delete_invariant(new_epoch); + #if 0 /* * Update the new Epoch to contain the buffers from the old one * that it doesn't currently have if using a multi-threaded @@ -339,6 +338,7 @@ private: new_epoch->add_buffer(old_epoch->get_buffers()[i]); } } + #endif m_current_epoch.fetch_add(1); old_epoch->set_inactive(); @@ -373,57 +373,6 @@ private: return new_epoch; } - /* - * Add a new empty buffer. This is intended to be used - * when a reconstruction is triggered, to allow for inserts to be sustained in the new - * buffer while a new epoch is being created in the background. Returns a - * pointer to the newly created buffer. - */ - Buffer *add_empty_buffer() { - /* - * if there's a current Epoch transition ongoing, a buffer installed - * into an older Epoch, but not the new one, may be lost. So fail to - * insert a buffer. - */ - if (!m_epoch_transition_lk.try_lock()) { - return nullptr; - } - - /* - * verify that the currently active buffer is still full, if - * not, there is no reason to add a new one. This code is - * protected by the epoch transition lock, so need need to - * take a protected reference to the epoch. - */ - auto active_epoch = get_active_epoch(); - if (!active_epoch->get_active_buffer()->is_full()) { - m_epoch_transition_lk.unlock(); - return nullptr; - } - - /* - * create a new buffer and install it in the active epoch. - */ - auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity); - - std::unique_lock<std::mutex> m_struct_lock; - auto new_buffer = active_epoch->add_buffer(temp_buffer); - - /* - * if epoch->add_buffer doesn't add the new buffer, this insert - * won't update the buffer set (duplicate insert) - */ - m_buffers.insert(new_buffer); - m_struct_lock.release(); - - if (new_buffer != temp_buffer) { - delete temp_buffer; - } - m_epoch_transition_lk.unlock(); - - return new_buffer; - } - void retire_epoch(_Epoch *epoch) { /* * Epochs with currently active jobs cannot @@ -452,21 +401,14 @@ private: delete epoch; m_epoch_retire_lk.unlock(); + /* NOTE: the BufferView mechanism handles freeing unused buffer space */ + /* * Following the epoch's destruction, any buffers * or structures with no remaining references can * be safely freed. */ std::unique_lock<std::mutex> lock(m_struct_lock); - for (auto itr = m_buffers.begin(); itr != m_buffers.end();) { - if ((*itr)->get_reference_count() == 0) { - auto tmp = *itr; - itr = m_buffers.erase(itr); - delete tmp; - } else { - itr++; - } - } for (auto itr = m_versions.begin(); itr != m_versions.end();) { if ((*itr)->get_reference_count() == 0) { @@ -484,21 +426,31 @@ private: Structure *vers = args->epoch->get_structure(); - // FIXME: with an improved shard interface, multiple full buffers + // FIXME: with an improved shard interface, multiple full buffer_viewers // could be flushed at once here. - Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; + auto buffer_view = args->epoch->get_buffer(); + size_t new_head = buffer_view.get_tail(); for (ssize_t i=0; i<args->merges.size(); i++) { vers->reconstruction(args->merges[i].second, args->merges[i].first); } /* - * if performing a compaction, don't push the buffer down, - * as there is no guarantee that any necessary reconstructions + * if performing a compaction, don't flush the buffer, as + * there is no guarantee that any necessary reconstructions * will free sufficient space in L0 to support a flush */ if (!args->compaction) { - vers->flush_buffer(buff); + vers->flush_buffer(std::move(buffer_view)); + + // FIXME: this may currently fail because there isn't any + // query preemption yet. At this point, we'd need to either + // 1) wait for all queries on the old_head to finish + // 2) kill all queries on the old_head + // 3) somehow migrate all queries on the old_head to the new + // version + auto res = args->epoch->advance_buffer_head(new_head); + assert(res); } args->epoch->end_job(); @@ -519,27 +471,33 @@ private: static void async_query(void *arguments) { QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments; - auto buffers = args->epoch->get_buffer_view(); + auto buffer = args->epoch->get_buffer(); auto vers = args->epoch->get_structure(); void *parms = args->query_parms; /* Get the buffer query states */ - std::vector<void *> buffer_states = buffers.get_query_states(parms); + void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms); /* Get the shard query states */ std::vector<std::pair<ShardID, Shard*>> shards; std::vector<void *> states = vers->get_query_states(shards, parms); - Q::process_query_states(parms, states, buffer_states); + Q::process_query_states(parms, states, buffer_state); - std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size()); + std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); for (size_t i=0; i<query_results.size(); i++) { - std::vector<Wrapped<R>> local_results = (i < buffer_states.size()) - ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms) - : Q::query(shards[i - buffer_states.size()].second, - states[i - buffer_states.size()], parms); - ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first; - query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers)); + std::vector<Wrapped<R>> local_results; + ShardID shid; + + if (i == 0) { /* process the buffer first */ + local_results = Q::buffer_query(buffer_state, parms); + shid = INVALID_SHID; + } else { + local_results = Q::query(shards[i - 1].second, states[i - 1], parms); + shid = shards[i - 1].first; + } + + query_results[i] = std::move(filter_deletes(local_results, shid, vers)); if constexpr (Q::EARLY_ABORT) { if (query_results[i].size() > 0) break; @@ -551,10 +509,7 @@ private: args->epoch->end_job(); - for (size_t i=0; i<buffer_states.size(); i++) { - Q::delete_buffer_query_state(buffer_states[i]); - } - + Q::delete_buffer_query_state(buffer_state); for (size_t i=0; i<states.size(); i++) { Q::delete_query_state(states[i]); } @@ -575,7 +530,7 @@ private: // ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>(); args->epoch = epoch; - args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count()); + args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count()); args->extension = this; args->compaction = false; /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */ @@ -597,49 +552,16 @@ private: } int internal_append(const R &rec, bool ts) { - Buffer *buffer = nullptr; - int res = 0; - do { - auto epoch = get_active_epoch_protected(); - buffer = epoch->get_active_buffer(); - assert(buffer); - - /* - * If the buffer is full and there is no ongoing reconstruction, - * schedule a reconstruction and add a new empty buffer. If there - * is an ongoing reconstruction, then add a new empty buffer - * to the current epoch. - */ - if (buffer->is_full()) { - if constexpr (std::same_as<SCHED, SerialScheduler>) { - /* single threaded: run reconstruction and then empty buffer */ - epoch->end_job(); - schedule_reconstruction(); - buffer->truncate(); - continue; - } else if (epoch->prepare_reconstruction()) { - /* - * add an empty buffer to allow insert proceed and - * schedule a reconstruction on a background thread - */ - buffer = add_empty_buffer(); - schedule_reconstruction(); - } else { - /* background reconstruction is ongoing, so just add empty buffer */ - buffer = add_empty_buffer(); - } - } - - res = (buffer) ? buffer->append(rec, ts) : 0; - epoch->end_job(); - } while(!res); + if (!m_reconstruction_scheduled.load() && m_buffer->is_at_low_watermark()) { + m_reconstruction_scheduled.store(true); + schedule_reconstruction(); + } - /* internal append should always succeed, eventually */ - return 1; + /* this will fail if the HWM is reached and return 0 */ + return m_buffer->append(rec, ts); } - static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, - BufView &buffers, Structure *vers) { + static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers) { if constexpr (!Q::SKIP_DELETE_FILTER) { return records; } @@ -672,9 +594,11 @@ private: continue; } - if (buffers.check_tombstone(rec.rec)) { - continue; - } + // FIXME: need to figure out how best to re-enable the buffer tombstone + // check in the correct manner. + //if (buffview.check_tombstone(rec.rec)) { + //continue; + //} if (shid != INVALID_SHID) { for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h index 4d3e97b..dcba67e 100644 --- a/include/framework/QueryRequirements.h +++ b/include/framework/QueryRequirements.h @@ -11,7 +11,7 @@ */ #pragma once -#include "framework/structure/MutableBuffer.h" +#include "framework/structure/BufferView.h" #include "framework/interface/Record.h" #include "framework/interface/Shard.h" #include "framework/interface/Query.h" diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 8b92c45..ca742c3 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -23,7 +23,7 @@ concept QueryInterface = requires(Q q, void *p, std::vector<void*> &s) { {Q::get_query_state(p, p)} -> std::convertible_to<void*>; {Q::get_buffer_query_state(p, p)} -> std::convertible_to<void *>; */ - {Q::process_query_states(p, s, s)}; + {Q::process_query_states(p, s, p)}; /* {Q::query(s, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>; {Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>; diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h index a8544a7..94afe6c 100644 --- a/include/framework/interface/Scheduler.h +++ b/include/framework/interface/Scheduler.h @@ -8,10 +8,6 @@ */ #pragma once -#include <vector> -#include <concepts> -#include "framework/interface/Record.h" -#include "util/types.h" #include "framework/scheduling/Task.h" template <typename S> diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 4e1b8a2..ca85fe2 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -8,6 +8,9 @@ */ #pragma once +#include <condition_variable> +#include <mutex> + #include "framework/structure/MutableBuffer.h" #include "framework/structure/ExtensionStructure.h" #include "framework/structure/BufferView.h" @@ -20,10 +23,10 @@ class Epoch { private: typedef MutableBuffer<R> Buffer; typedef ExtensionStructure<R, S, Q, L> Structure; - typedef BufferView<R, Q> BufView; + typedef BufferView<R> BufView; public: Epoch(size_t number=0) - : m_buffers() + : m_buffer(nullptr) , m_structure(nullptr) , m_active_merge(false) , m_active_jobs(0) @@ -31,8 +34,8 @@ public: , m_epoch_number(number) {} - Epoch(size_t number, Structure *structure, Buffer *buff) - : m_buffers() + Epoch(size_t number, Structure *structure, Buffer *buff) + : m_buffer(buff) , m_structure(structure) , m_active_jobs(0) , m_active_merge(false) @@ -40,8 +43,6 @@ public: , m_epoch_number(number) { structure->take_reference(); - buff->take_reference(); - m_buffers.push_back(buff); } ~Epoch() { @@ -54,35 +55,11 @@ public: */ //m_active_cv.notify_all(); - clear_buffers(); - if (m_structure) { m_structure->release_reference(); } } - Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) { - assert(buf); - - std::unique_lock<std::mutex> m_buffer_lock; - /* - * if a current buffer is specified, only add the - * new buffer if the active buffer is the current, - * otherwise just return the active buffer (poor man's - * CAS). - */ - if (cur_buf) { - auto active_buf = get_active_buffer(); - if (active_buf != cur_buf) { - return active_buf; - } - } - - buf->take_reference(); - m_buffers.push_back(buf); - return buf; - } - void start_job() { m_active_jobs.fetch_add(1); } @@ -109,36 +86,10 @@ public: return m_structure; } - std::vector<Buffer *> &get_buffers() { - return m_buffers; - } - - BufView get_buffer_view() { - std::unique_lock<std::mutex> m_buffer_lock; - return BufView(m_buffers); - } - - Buffer *get_active_buffer() { - if (m_buffers.size() == 0) return nullptr; - - return m_buffers[m_buffers.size() - 1]; + BufView get_buffer() { + return m_buffer->get_buffer_view(); } - /* - * Return the number of buffers in this epoch at - * time of call, and then clear the buffer vector, - * releasing all references in the process. - */ - size_t clear_buffers() { - std::unique_lock<std::mutex> m_buffer_lock; - size_t buf_cnt = m_buffers.size(); - for (auto buf : m_buffers) { - if (buf) buf->release_reference(); - } - - m_buffers.clear(); - return buf_cnt; - } /* * Returns a new Epoch object that is a copy of this one. The new object will also contain @@ -148,17 +99,14 @@ public: Epoch *clone(size_t number) { std::unique_lock<std::mutex> m_buffer_lock; auto epoch = new Epoch(number); - epoch->m_buffers = m_buffers; + epoch->m_buffer = m_buffer; + if (m_structure) { epoch->m_structure = m_structure->copy(); /* the copy routine returns a structure with 0 references */ epoch->m_structure->take_reference(); } - for (auto b : m_buffers) { - b->take_reference(); - } - return epoch; } @@ -213,9 +161,13 @@ public: return true; } + bool advance_buffer_head(size_t head) { + return m_buffer->advance_head(head); + } + private: Structure *m_structure; - std::vector<Buffer *> m_buffers; + Buffer *m_buffer; std::condition_variable m_active_cv; std::mutex m_cv_lock; diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index 47c7b9b..ba5e693 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -103,6 +103,10 @@ public: memcpy(buffer, (std::byte*) (m_data + m_head), get_record_count() * sizeof(Wrapped<R>)); } + size_t get_tail() { + return m_tail; + } + private: Wrapped<R>* m_data; ReleaseFunction m_release; diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 60016a0..ae566cb 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -10,28 +10,22 @@ #pragma once #include <atomic> -#include <numeric> #include <cstdio> #include <vector> -#include "framework/structure/MutableBuffer.h" +#include "framework/structure/BufferView.h" #include "framework/structure/InternalLevel.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" #include "framework/util/Configuration.h" -#include "framework/scheduling/Task.h" #include "psu-util/timer.h" -#include "psu-ds/Alias.h" namespace de { template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING> class ExtensionStructure { typedef S Shard; - typedef MutableBuffer<R> Buffer; + typedef BufferView<R> BuffView; public: ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) @@ -96,14 +90,10 @@ public: * FIXME: arguably, this should be a method attached to the buffer that * takes a structure as input. */ - inline bool flush_buffer(Buffer *buffer) { - assert(can_reconstruct_with(0, buffer->get_record_count())); + inline bool flush_buffer(BuffView buffer) { + assert(can_reconstruct_with(0, buffer.get_record_count())); - // FIXME: this step makes an extra copy of the buffer, - // which could be avoided by adjusting the shard - // reconstruction process a bit, possibly. - buffer->start_flush(); - flush_buffer_into_l0(buffer); + flush_buffer_into_l0(std::move(buffer)); return true; } @@ -415,11 +405,11 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first buffer flush. */ - inline level_index find_reconstruction_target(level_index idx, Buffer *buffer=nullptr) { + inline level_index find_reconstruction_target(level_index idx) { if (idx == 0 && m_levels.size() == 0) return -1; - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); + size_t incoming_rec_cnt = get_level_record_count(idx); for (level_index i=idx+1; i<m_levels.size(); i++) { if (can_reconstruct_with(i, incoming_rec_cnt)) { return i; @@ -431,13 +421,13 @@ private: return -1; } - inline void flush_buffer_into_l0(Buffer *buffer) { + inline void flush_buffer_into_l0(BuffView buffer) { assert(m_levels[0]); if constexpr (L == LayoutPolicy::LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0].get(); auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); - temp_level->append_buffer(buffer); + temp_level->append_buffer(std::move(buffer)); if (old_level->get_shard_count() > 0) { m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level); @@ -446,7 +436,7 @@ private: m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level); } } else { - m_levels[0]->append_buffer(buffer); + m_levels[0]->append_buffer(std::move(buffer)); } } @@ -469,16 +459,9 @@ private: } /* - * Returns the actual number of records present on a specified level. An - * index value of -1 indicates the memory table. Can optionally pass in - * a pointer to the memory table to use, if desired. Otherwise, there are - * no guarantees about which buffer will be accessed if level_index is -1. + * Returns the number of records present on a specified level. */ - inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { - if (buffer) { - return buffer->get_record_count(); - } - + inline size_t get_level_record_count(level_index idx) { return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; } diff --git a/include/query/rangequery.h b/include/query/rangequery.h index 16dcd86..ad5b767 100644 --- a/include/query/rangequery.h +++ b/include/query/rangequery.h @@ -12,7 +12,7 @@ #include "framework/interface/Record.h" #include "framework/interface/Shard.h" -#include "framework/structure/MutableBuffer.h" +#include "framework/structure/BufferView.h" #include "psu-ds/PriorityQueue.h" #include "util/Cursor.h" @@ -32,7 +32,10 @@ struct State { template <RecordInterface R> struct BufferState { - size_t cutoff; + BufferView<R> buffer; + + BufferState(BufferView<R> buffer) + : buffer(std::move(buffer)) {} }; template <ShardInterface S, RecordInterface R> @@ -51,14 +54,13 @@ public: return res; } - static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) { - auto res = new BufferState<R>(); - res->cutoff = buffer->get_record_count(); + static void* get_buffer_query_state(BufferView<R> buffer, void *parms) { + auto res = new BufferState<R>(std::move(buffer)); return res; } - static void process_query_states(void *query_parms, std::vector<void*> &shard_states, std::vector<void*> &buffer_states) { + static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_states) { return; } @@ -67,17 +69,21 @@ public: auto p = (Parms<R> *) parms; auto s = (State<R> *) q_state; - // if the returned index is one past the end of the - // records for the PGM, then there are not records - // in the index falling into the specified range. + /* + * if the returned index is one past the end of the + * records for the PGM, then there are not records + * in the index falling into the specified range. + */ if (s->start_idx == shard->get_record_count()) { return records; } auto ptr = shard->get_record_at(s->start_idx); - // roll the pointer forward to the first record that is - // greater than or equal to the lower bound. + /* + * roll the pointer forward to the first record that is + * greater than or equal to the lower bound. + */ while(ptr->rec.key < p->lower_bound) { ptr++; } @@ -90,13 +96,13 @@ public: return records; } - static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) { + static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) { auto p = (Parms<R> *) parms; auto s = (BufferState<R> *) state; std::vector<Wrapped<R>> records; - for (size_t i=0; i<s->cutoff; i++) { - auto rec = buffer->get_data() + i; + for (size_t i=0; i<s->buffer.get_record_count(); i++) { + auto rec = s->buffer.get(i); if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) { records.emplace_back(*rec); } |