summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- CMakeLists.txt 28
-rw-r--r-- include/framework/DynamicExtension.h 252
-rw-r--r-- include/framework/QueryRequirements.h 2
-rw-r--r-- include/framework/interface/Query.h 2
-rw-r--r-- include/framework/interface/Scheduler.h 4
-rw-r--r-- include/framework/scheduling/Epoch.h 80
-rw-r--r-- include/framework/structure/BufferView.h 4
-rw-r--r-- include/framework/structure/ExtensionStructure.h 41
-rw-r--r-- include/query/rangequery.h 34
9 files changed, 158 insertions, 289 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1164cee..3b2a1ad 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -44,25 +44,29 @@ if (tests)
target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread)
target_include_directories(mutable_buffer_tests PRIVATE include external/psudb-common/cpp/include)
+ add_executable(rangequery_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/rangequery_tests.cpp)
+ target_link_libraries(rangequery_tests PUBLIC gsl check subunit pthread)
+ target_include_directories(rangequery_tests PRIVATE include external/psudb-common/cpp/include)
+
#add_executable(vptree_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/vptree_tests.cpp)
#target_link_libraries(vptree_tests PUBLIC gsl check subunit pthread)
#target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include)
- #add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
- #target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread)
- #target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
+ add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
+ target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread)
+ target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
- #add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
- #target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread)
- #target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
+ add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
+ target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread)
+ target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
- #add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
- #target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread)
- #target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
+ add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
+ target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread)
+ target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
- #add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
- #target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread)
- #target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
+ add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
+ target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread)
+ target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c5c4a1a..c97b390 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -10,29 +10,23 @@
#pragma once
#include <atomic>
-#include <numeric>
#include <cstdio>
#include <vector>
#include <set>
#include <shared_mutex>
#include <mutex>
+#include "framework/interface/Scheduler.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/SerialScheduler.h"
+
#include "framework/structure/MutableBuffer.h"
-#include "framework/structure/InternalLevel.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
#include "framework/interface/Record.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Scheduler.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/util/Configuration.h"
-#include "framework/scheduling/FIFOScheduler.h"
-#include "framework/scheduling/SerialScheduler.h"
#include "framework/scheduling/Epoch.h"
-#include "psu-util/timer.h"
-#include "psu-ds/Alias.h"
namespace de {
@@ -43,22 +37,19 @@ class DynamicExtension {
typedef MutableBuffer<R> Buffer;
typedef ExtensionStructure<R, S, Q, L> Structure;
typedef Epoch<R, S, Q, L> _Epoch;
- typedef BufferView<R, Q> BufView;
+ typedef BufferView<R> BufView;
public:
- DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0,
+ DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0,
size_t thread_cnt=16)
: m_scale_factor(scale_factor)
- , m_max_delete_prop(max_delete_prop)
+ , m_max_delete_prop(1)
, m_sched(memory_budget, thread_cnt)
- , m_buffer_capacity(buffer_cap)
- , m_buffer_delete_capacity(max_delete_prop*buffer_cap)
+ , m_buffer(new Buffer(buffer_lwm, buffer_hwm))
{
- auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
- auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
- auto epoch = new _Epoch(0, vers, buf);
+ auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
+ auto epoch = new _Epoch(0, vers, m_buffer);
- m_buffers.insert(buf);
m_versions.insert(vers);
m_epochs.insert({0, epoch});
}
@@ -79,9 +70,7 @@ public:
delete e.second;
}
- for (auto e : m_buffers) {
- delete e;
- }
+ delete m_buffer;
for (auto e : m_versions) {
delete e;
@@ -95,10 +84,15 @@ public:
int erase(const R &rec) {
// FIXME: delete tagging will require a lot of extra work to get
// operating "correctly" in a concurrent environment.
+
+ /*
+ * Get a view on the buffer *first*. This will ensure a stronger
+ * ordering than simply accessing the buffer directly, but is
+ * not *strictly* necessary.
+ */
+ auto view = m_buffer->get_buffer_view();
if constexpr (D == DeletePolicy::TAGGING) {
static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation");
- BufView buffers = get_active_epoch()->get_buffer_view();
-
if (get_active_epoch()->get_structure()->tagged_delete(rec)) {
return 1;
}
@@ -108,7 +102,7 @@ public:
* probably has the lowest probability of having the record,
* so we'll check it last.
*/
- return buffers.delete_record(rec);
+ return view.delete_record(rec);
}
/*
@@ -123,7 +117,7 @@ public:
size_t get_record_count() {
auto epoch = get_active_epoch_protected();
- auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count();
+ auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count();
epoch->end_job();
return t;
@@ -131,7 +125,7 @@ public:
size_t get_tombstone_count() {
auto epoch = get_active_epoch_protected();
- auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
+ auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
epoch->end_job();
return t;
@@ -147,7 +141,7 @@ public:
size_t get_memory_usage() {
auto epoch = get_active_epoch_protected();
- auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+ auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
epoch->end_job();
return t;
@@ -155,14 +149,14 @@ public:
size_t get_aux_memory_usage() {
auto epoch = get_active_epoch_protected();
- auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+ auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
epoch->end_job();
return t;
}
size_t get_buffer_capacity() {
- return m_buffer_capacity;
+ return m_buffer->get_capacity();
}
Shard *create_static_structure(bool await_reconstruction_completion=false) {
@@ -171,11 +165,20 @@ public:
}
auto epoch = get_active_epoch_protected();
- auto bv = epoch->get_buffer_view();
-
auto vers = epoch->get_structure();
std::vector<Shard *> shards;
+ /*
+ * construct a shard from the buffer view. We'll hold the view
+ * for as short a time as possible: once the records are exfiltrated
+ * from the buffer, there's no reason to retain a hold on the view's
+ * head pointer any longer
+ */
+ {
+ auto bv = epoch->get_buffer();
+ shards.emplace_back(new S(std::move(bv)));
+ }
+
if (vers->get_levels().size() > 0) {
for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
if (vers->get_levels()[i]) {
@@ -184,12 +187,6 @@ public:
}
}
- // FIXME: With an interface adjustment, this could be done in
- // one call, rather than a loop.
- for (ssize_t i=bv.size() - 1; i>=0; i--) {
- shards.emplace_back(new S(bv.get_buffers()[i]));
- }
-
Shard *shards_array[shards.size()];
size_t j = 0;
@@ -237,10 +234,13 @@ public:
private:
SCHED m_sched;
+ Buffer *m_buffer;
+
std::mutex m_struct_lock;
- std::set<Buffer*> m_buffers;
std::set<Structure *> m_versions;
+ alignas(64) std::atomic<bool> m_reconstruction_scheduled;
+
std::atomic<size_t> m_current_epoch;
std::atomic<size_t> m_newest_epoch;
std::unordered_map<size_t, _Epoch *> m_epochs;
@@ -253,8 +253,6 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
- size_t m_buffer_capacity;
- size_t m_buffer_delete_capacity;
void enforce_delete_invariant(_Epoch *epoch) {
auto structure = epoch->get_structure();
@@ -321,6 +319,7 @@ private:
*/
enforce_delete_invariant(new_epoch);
+ #if 0
/*
* Update the new Epoch to contain the buffers from the old one
* that it doesn't currently have if using a multi-threaded
@@ -339,6 +338,7 @@ private:
new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
}
+ #endif
m_current_epoch.fetch_add(1);
old_epoch->set_inactive();
@@ -373,57 +373,6 @@ private:
return new_epoch;
}
- /*
- * Add a new empty buffer. This is intended to be used
- * when a reconstruction is triggered, to allow for inserts to be sustained in the new
- * buffer while a new epoch is being created in the background. Returns a
- * pointer to the newly created buffer.
- */
- Buffer *add_empty_buffer() {
- /*
- * if there's a current Epoch transition ongoing, a buffer installed
- * into an older Epoch, but not the new one, may be lost. So fail to
- * insert a buffer.
- */
- if (!m_epoch_transition_lk.try_lock()) {
- return nullptr;
- }
-
- /*
- * verify that the currently active buffer is still full, if
- * not, there is no reason to add a new one. This code is
- * protected by the epoch transition lock, so need need to
- * take a protected reference to the epoch.
- */
- auto active_epoch = get_active_epoch();
- if (!active_epoch->get_active_buffer()->is_full()) {
- m_epoch_transition_lk.unlock();
- return nullptr;
- }
-
- /*
- * create a new buffer and install it in the active epoch.
- */
- auto temp_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
-
- std::unique_lock<std::mutex> m_struct_lock;
- auto new_buffer = active_epoch->add_buffer(temp_buffer);
-
- /*
- * if epoch->add_buffer doesn't add the new buffer, this insert
- * won't update the buffer set (duplicate insert)
- */
- m_buffers.insert(new_buffer);
- m_struct_lock.release();
-
- if (new_buffer != temp_buffer) {
- delete temp_buffer;
- }
- m_epoch_transition_lk.unlock();
-
- return new_buffer;
- }
-
void retire_epoch(_Epoch *epoch) {
/*
* Epochs with currently active jobs cannot
@@ -452,21 +401,14 @@ private:
delete epoch;
m_epoch_retire_lk.unlock();
+ /* NOTE: the BufferView mechanism handles freeing unused buffer space */
+
/*
* Following the epoch's destruction, any buffers
* or structures with no remaining references can
* be safely freed.
*/
std::unique_lock<std::mutex> lock(m_struct_lock);
- for (auto itr = m_buffers.begin(); itr != m_buffers.end();) {
- if ((*itr)->get_reference_count() == 0) {
- auto tmp = *itr;
- itr = m_buffers.erase(itr);
- delete tmp;
- } else {
- itr++;
- }
- }
for (auto itr = m_versions.begin(); itr != m_versions.end();) {
if ((*itr)->get_reference_count() == 0) {
@@ -484,21 +426,31 @@ private:
Structure *vers = args->epoch->get_structure();
- // FIXME: with an improved shard interface, multiple full buffers
+ // FIXME: with an improved shard interface, multiple full buffer views
// could be flushed at once here.
- Buffer *buff = (Buffer *) args->epoch->get_buffers()[0];
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
for (ssize_t i=0; i<args->merges.size(); i++) {
vers->reconstruction(args->merges[i].second, args->merges[i].first);
}
/*
- * if performing a compaction, don't push the buffer down,
- * as there is no guarantee that any necessary reconstructions
+ * if performing a compaction, don't flush the buffer, as
+ * there is no guarantee that any necessary reconstructions
* will free sufficient space in L0 to support a flush
*/
if (!args->compaction) {
- vers->flush_buffer(buff);
+ vers->flush_buffer(std::move(buffer_view));
+
+ // FIXME: this may currently fail because there isn't any
+ // query preemption yet. At this point, we'd need to either
+ // 1) wait for all queries on the old_head to finish
+ // 2) kill all queries on the old_head
+ // 3) somehow migrate all queries on the old_head to the new
+ // version
+ auto res = args->epoch->advance_buffer_head(new_head);
+ assert(res);
}
args->epoch->end_job();
@@ -519,27 +471,33 @@ private:
static void async_query(void *arguments) {
QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
- auto buffers = args->epoch->get_buffer_view();
+ auto buffer = args->epoch->get_buffer();
auto vers = args->epoch->get_structure();
void *parms = args->query_parms;
/* Get the buffer query states */
- std::vector<void *> buffer_states = buffers.get_query_states(parms);
+ void *buffer_state = Q::get_buffer_query_state(std::move(buffer), parms);
/* Get the shard query states */
std::vector<std::pair<ShardID, Shard*>> shards;
std::vector<void *> states = vers->get_query_states(shards, parms);
- Q::process_query_states(parms, states, buffer_states);
+ Q::process_query_states(parms, states, buffer_state);
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
for (size_t i=0; i<query_results.size(); i++) {
- std::vector<Wrapped<R>> local_results = (i < buffer_states.size())
- ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms)
- : Q::query(shards[i - buffer_states.size()].second,
- states[i - buffer_states.size()], parms);
- ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first;
- query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));
+ std::vector<Wrapped<R>> local_results;
+ ShardID shid;
+
+ if (i == 0) { /* process the buffer first */
+ local_results = Q::buffer_query(buffer_state, parms);
+ shid = INVALID_SHID;
+ } else {
+ local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+ shid = shards[i - 1].first;
+ }
+
+ query_results[i] = std::move(filter_deletes(local_results, shid, vers));
if constexpr (Q::EARLY_ABORT) {
if (query_results[i].size() > 0) break;
@@ -551,10 +509,7 @@ private:
args->epoch->end_job();
- for (size_t i=0; i<buffer_states.size(); i++) {
- Q::delete_buffer_query_state(buffer_states[i]);
- }
-
+ Q::delete_buffer_query_state(buffer_state);
for (size_t i=0; i<states.size(); i++) {
Q::delete_query_state(states[i]);
}
@@ -575,7 +530,7 @@ private:
//
ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffers()[0]->get_record_count());
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(epoch->get_buffer().get_record_count());
args->extension = this;
args->compaction = false;
/* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
@@ -597,49 +552,16 @@ private:
}
int internal_append(const R &rec, bool ts) {
- Buffer *buffer = nullptr;
- int res = 0;
- do {
- auto epoch = get_active_epoch_protected();
- buffer = epoch->get_active_buffer();
- assert(buffer);
-
- /*
- * If the buffer is full and there is no ongoing reconstruction,
- * schedule a reconstruction and add a new empty buffer. If there
- * is an ongoing reconstruction, then add a new empty buffer
- * to the current epoch.
- */
- if (buffer->is_full()) {
- if constexpr (std::same_as<SCHED, SerialScheduler>) {
- /* single threaded: run reconstruction and then empty buffer */
- epoch->end_job();
- schedule_reconstruction();
- buffer->truncate();
- continue;
- } else if (epoch->prepare_reconstruction()) {
- /*
- * add an empty buffer to allow insert proceed and
- * schedule a reconstruction on a background thread
- */
- buffer = add_empty_buffer();
- schedule_reconstruction();
- } else {
- /* background reconstruction is ongoing, so just add empty buffer */
- buffer = add_empty_buffer();
- }
- }
-
- res = (buffer) ? buffer->append(rec, ts) : 0;
- epoch->end_job();
- } while(!res);
+ if (!m_reconstruction_scheduled.load() && m_buffer->is_at_low_watermark()) {
+ m_reconstruction_scheduled.store(true);
+ schedule_reconstruction();
+ }
- /* internal append should always succeed, eventually */
- return 1;
+ /* this will fail if the HWM is reached and return 0 */
+ return m_buffer->append(rec, ts);
}
- static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid,
- BufView &buffers, Structure *vers) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -672,9 +594,11 @@ private:
continue;
}
- if (buffers.check_tombstone(rec.rec)) {
- continue;
- }
+ // FIXME: need to figure out how best to re-enable the buffer tombstone
+ // check in the correct manner.
+ //if (buffview.check_tombstone(rec.rec)) {
+ //continue;
+ //}
if (shid != INVALID_SHID) {
for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h
index 4d3e97b..dcba67e 100644
--- a/include/framework/QueryRequirements.h
+++ b/include/framework/QueryRequirements.h
@@ -11,7 +11,7 @@
*/
#pragma once
-#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/BufferView.h"
#include "framework/interface/Record.h"
#include "framework/interface/Shard.h"
#include "framework/interface/Query.h"
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 8b92c45..ca742c3 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -23,7 +23,7 @@ concept QueryInterface = requires(Q q, void *p, std::vector<void*> &s) {
{Q::get_query_state(p, p)} -> std::convertible_to<void*>;
{Q::get_buffer_query_state(p, p)} -> std::convertible_to<void *>;
*/
- {Q::process_query_states(p, s, s)};
+ {Q::process_query_states(p, s, p)};
/*
{Q::query(s, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
{Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
index a8544a7..94afe6c 100644
--- a/include/framework/interface/Scheduler.h
+++ b/include/framework/interface/Scheduler.h
@@ -8,10 +8,6 @@
*/
#pragma once
-#include <vector>
-#include <concepts>
-#include "framework/interface/Record.h"
-#include "util/types.h"
#include "framework/scheduling/Task.h"
template <typename S>
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 4e1b8a2..ca85fe2 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -8,6 +8,9 @@
*/
#pragma once
+#include <condition_variable>
+#include <mutex>
+
#include "framework/structure/MutableBuffer.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/structure/BufferView.h"
@@ -20,10 +23,10 @@ class Epoch {
private:
typedef MutableBuffer<R> Buffer;
typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef BufferView<R, Q> BufView;
+ typedef BufferView<R> BufView;
public:
Epoch(size_t number=0)
- : m_buffers()
+ : m_buffer(nullptr)
, m_structure(nullptr)
, m_active_merge(false)
, m_active_jobs(0)
@@ -31,8 +34,8 @@ public:
, m_epoch_number(number)
{}
- Epoch(size_t number, Structure *structure, Buffer *buff)
- : m_buffers()
+ Epoch(size_t number, Structure *structure, Buffer *buff)
+ : m_buffer(buff)
, m_structure(structure)
, m_active_jobs(0)
, m_active_merge(false)
@@ -40,8 +43,6 @@ public:
, m_epoch_number(number)
{
structure->take_reference();
- buff->take_reference();
- m_buffers.push_back(buff);
}
~Epoch() {
@@ -54,35 +55,11 @@ public:
*/
//m_active_cv.notify_all();
- clear_buffers();
-
if (m_structure) {
m_structure->release_reference();
}
}
- Buffer *add_buffer(Buffer *buf, Buffer *cur_buf=nullptr) {
- assert(buf);
-
- std::unique_lock<std::mutex> m_buffer_lock;
- /*
- * if a current buffer is specified, only add the
- * new buffer if the active buffer is the current,
- * otherwise just return the active buffer (poor man's
- * CAS).
- */
- if (cur_buf) {
- auto active_buf = get_active_buffer();
- if (active_buf != cur_buf) {
- return active_buf;
- }
- }
-
- buf->take_reference();
- m_buffers.push_back(buf);
- return buf;
- }
-
void start_job() {
m_active_jobs.fetch_add(1);
}
@@ -109,36 +86,10 @@ public:
return m_structure;
}
- std::vector<Buffer *> &get_buffers() {
- return m_buffers;
- }
-
- BufView get_buffer_view() {
- std::unique_lock<std::mutex> m_buffer_lock;
- return BufView(m_buffers);
- }
-
- Buffer *get_active_buffer() {
- if (m_buffers.size() == 0) return nullptr;
-
- return m_buffers[m_buffers.size() - 1];
+ BufView get_buffer() {
+ return m_buffer->get_buffer_view();
}
- /*
- * Return the number of buffers in this epoch at
- * time of call, and then clear the buffer vector,
- * releasing all references in the process.
- */
- size_t clear_buffers() {
- std::unique_lock<std::mutex> m_buffer_lock;
- size_t buf_cnt = m_buffers.size();
- for (auto buf : m_buffers) {
- if (buf) buf->release_reference();
- }
-
- m_buffers.clear();
- return buf_cnt;
- }
/*
* Returns a new Epoch object that is a copy of this one. The new object will also contain
@@ -148,17 +99,14 @@ public:
Epoch *clone(size_t number) {
std::unique_lock<std::mutex> m_buffer_lock;
auto epoch = new Epoch(number);
- epoch->m_buffers = m_buffers;
+ epoch->m_buffer = m_buffer;
+
if (m_structure) {
epoch->m_structure = m_structure->copy();
/* the copy routine returns a structure with 0 references */
epoch->m_structure->take_reference();
}
- for (auto b : m_buffers) {
- b->take_reference();
- }
-
return epoch;
}
@@ -213,9 +161,13 @@ public:
return true;
}
+ bool advance_buffer_head(size_t head) {
+ return m_buffer->advance_head(head);
+ }
+
private:
Structure *m_structure;
- std::vector<Buffer *> m_buffers;
+ Buffer *m_buffer;
std::condition_variable m_active_cv;
std::mutex m_cv_lock;
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 47c7b9b..ba5e693 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -103,6 +103,10 @@ public:
memcpy(buffer, (std::byte*) (m_data + m_head), get_record_count() * sizeof(Wrapped<R>));
}
+ size_t get_tail() {
+ return m_tail;
+ }
+
private:
Wrapped<R>* m_data;
ReleaseFunction m_release;
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 60016a0..ae566cb 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -10,28 +10,22 @@
#pragma once
#include <atomic>
-#include <numeric>
#include <cstdio>
#include <vector>
-#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/BufferView.h"
#include "framework/structure/InternalLevel.h"
-#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Record.h"
#include "framework/util/Configuration.h"
-#include "framework/scheduling/Task.h"
#include "psu-util/timer.h"
-#include "psu-ds/Alias.h"
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
class ExtensionStructure {
typedef S Shard;
- typedef MutableBuffer<R> Buffer;
+ typedef BufferView<R> BuffView;
public:
ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
@@ -96,14 +90,10 @@ public:
* FIXME: arguably, this should be a method attached to the buffer that
* takes a structure as input.
*/
- inline bool flush_buffer(Buffer *buffer) {
- assert(can_reconstruct_with(0, buffer->get_record_count()));
+ inline bool flush_buffer(BuffView buffer) {
+ assert(can_reconstruct_with(0, buffer.get_record_count()));
- // FIXME: this step makes an extra copy of the buffer,
- // which could be avoided by adjusting the shard
- // reconstruction process a bit, possibly.
- buffer->start_flush();
- flush_buffer_into_l0(buffer);
+ flush_buffer_into_l0(std::move(buffer));
return true;
}
@@ -415,11 +405,11 @@ private:
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first buffer flush.
*/
- inline level_index find_reconstruction_target(level_index idx, Buffer *buffer=nullptr) {
+ inline level_index find_reconstruction_target(level_index idx) {
if (idx == 0 && m_levels.size() == 0) return -1;
- size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
+ size_t incoming_rec_cnt = get_level_record_count(idx);
for (level_index i=idx+1; i<m_levels.size(); i++) {
if (can_reconstruct_with(i, incoming_rec_cnt)) {
return i;
@@ -431,13 +421,13 @@ private:
return -1;
}
- inline void flush_buffer_into_l0(Buffer *buffer) {
+ inline void flush_buffer_into_l0(BuffView buffer) {
assert(m_levels[0]);
if constexpr (L == LayoutPolicy::LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0].get();
auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
- temp_level->append_buffer(buffer);
+ temp_level->append_buffer(std::move(buffer));
if (old_level->get_shard_count() > 0) {
m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level);
@@ -446,7 +436,7 @@ private:
m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level);
}
} else {
- m_levels[0]->append_buffer(buffer);
+ m_levels[0]->append_buffer(std::move(buffer));
}
}
@@ -469,16 +459,9 @@ private:
}
/*
- * Returns the actual number of records present on a specified level. An
- * index value of -1 indicates the memory table. Can optionally pass in
- * a pointer to the memory table to use, if desired. Otherwise, there are
- * no guarantees about which buffer will be accessed if level_index is -1.
+ * Returns the number of records present on a specified level.
*/
- inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
- if (buffer) {
- return buffer->get_record_count();
- }
-
+ inline size_t get_level_record_count(level_index idx) {
return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
}
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index 16dcd86..ad5b767 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -12,7 +12,7 @@
#include "framework/interface/Record.h"
#include "framework/interface/Shard.h"
-#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/BufferView.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
@@ -32,7 +32,10 @@ struct State {
template <RecordInterface R>
struct BufferState {
- size_t cutoff;
+ BufferView<R> buffer;
+
+ BufferState(BufferView<R> buffer)
+ : buffer(std::move(buffer)) {}
};
template <ShardInterface S, RecordInterface R>
@@ -51,14 +54,13 @@ public:
return res;
}
- static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
- auto res = new BufferState<R>();
- res->cutoff = buffer->get_record_count();
+ static void* get_buffer_query_state(BufferView<R> buffer, void *parms) {
+ auto res = new BufferState<R>(std::move(buffer));
return res;
}
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, std::vector<void*> &buffer_states) {
+ static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_states) {
return;
}
@@ -67,17 +69,21 @@ public:
auto p = (Parms<R> *) parms;
auto s = (State<R> *) q_state;
- // if the returned index is one past the end of the
- // records for the PGM, then there are not records
- // in the index falling into the specified range.
+ /*
+ * if the returned index is one past the end of the
+ * records for the PGM, then there are no records
+ * in the index falling into the specified range.
+ */
if (s->start_idx == shard->get_record_count()) {
return records;
}
auto ptr = shard->get_record_at(s->start_idx);
- // roll the pointer forward to the first record that is
- // greater than or equal to the lower bound.
+ /*
+ * roll the pointer forward to the first record that is
+ * greater than or equal to the lower bound.
+ */
while(ptr->rec.key < p->lower_bound) {
ptr++;
}
@@ -90,13 +96,13 @@ public:
return records;
}
- static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
+ static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
auto p = (Parms<R> *) parms;
auto s = (BufferState<R> *) state;
std::vector<Wrapped<R>> records;
- for (size_t i=0; i<s->cutoff; i++) {
- auto rec = buffer->get_data() + i;
+ for (size_t i=0; i<s->buffer.get_record_count(); i++) {
+ auto rec = s->buffer.get(i);
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
records.emplace_back(*rec);
}