author    Douglas Rumbaugh <dbr4@psu.edu>  2025-03-03 13:41:19 -0500
committer Douglas Rumbaugh <dbr4@psu.edu>  2025-03-03 13:41:19 -0500
commit    2ded45f5a20f38fdfd9f348c446c38dc713a5591 (patch)
tree      746fb09b49ee4c00fc3e4760d899d60d8d8dcce0 /include
parent    d116b94389538aa8e0e7354fae77693b980de4f0 (diff)
download  dynamic-extension-2ded45f5a20f38fdfd9f348c446c38dc713a5591.tar.gz
Fixed a few concurrency bugs
Diffstat (limited to 'include')
-rw-r--r--  include/framework/DynamicExtension.h                         204
-rw-r--r--  include/framework/reconstruction/BackgroundTieringPolicy.h      2
-rw-r--r--  include/framework/scheduling/FIFOScheduler.h                  21
-rw-r--r--  include/framework/scheduling/LockManager.h                    35
-rw-r--r--  include/framework/scheduling/Task.h                           12
-rw-r--r--  include/framework/scheduling/Version.h                        44
-rw-r--r--  include/framework/structure/BufferView.h                      17
-rw-r--r--  include/framework/structure/MutableBuffer.h                  160
-rw-r--r--  include/framework/util/Configuration.h                         4
9 files changed, 303 insertions, 196 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 8cef4a1..762029e 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -1,7 +1,7 @@
/*
* include/framework/DynamicExtension.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
@@ -161,7 +161,6 @@ public:
static_assert(std::same_as<SchedType, SerialScheduler>,
"Tagging is only supported in single-threaded operation");
- auto view = m_buffer->get_buffer_view();
auto version = get_active_version();
if (version->get_mutable_structure()->tagged_delete(rec)) {
return 1;
@@ -172,7 +171,7 @@ public:
 * likely has the lowest probability of containing the record,
 * so we'll check it last.
*/
- return view.delete_record(rec);
+ return version->get_buffer().delete_record(rec);
}
/*
@@ -256,9 +255,8 @@ public:
* ShardType::get_memory_usage) and the buffer by the index.
*/
size_t get_memory_usage() {
- auto version = get_active_version();
auto t = m_buffer->get_memory_usage() +
- version->get_structure()->get_memory_usage();
+ get_active_version()->get_structure()->get_memory_usage();
return t;
}
@@ -288,8 +286,7 @@ public:
 * the index. Ownership of this object is transferred to the
* caller.
*/
- // FIXME: switch this over to std::unique_ptr
- ShardType *
+ std::unique_ptr<ShardType>
create_static_structure(bool await_reconstruction_completion = false) {
if (await_reconstruction_completion) {
await_version();
@@ -322,7 +319,7 @@ public:
}
}
- ShardType *flattened = new ShardType(shards);
+ auto flattened = std::make_unique<ShardType>(shards);
for (auto shard : shards) {
delete shard;
@@ -348,7 +345,7 @@ public:
* of the counter
*/
if (vid == INVALID_VERSION) {
- vid = m_version_counter.load() - 1;
+ vid = m_version_counter.load();
}
/* versions signal on m_version_advance_cv when they activate */
@@ -384,7 +381,11 @@ public:
* stdout. Each level is on its own line, and each shard is represented.
*/
void print_structure() {
- get_active_version()->get_structure()->print_structure();
+ auto ver = get_active_version();
+ auto bv = ver->get_buffer();
+
+ fprintf(stdout, "[B]:\t(%ld)\n", bv.get_record_count());
+ ver->get_structure()->print_structure();
}
private:
@@ -397,15 +398,15 @@ private:
std::atomic<int> m_next_core;
/* versioning + concurrency variables */
- std::atomic<size_t> m_version_counter;
- std::atomic<std::shared_ptr<VersionType>> m_active_version;
+ alignas(64) std::atomic<size_t> m_version_counter;
+ alignas(64) std::atomic<std::shared_ptr<VersionType>> m_active_version;
- std::condition_variable m_version_advance_cv;
- std::mutex m_version_advance_mtx;
+ alignas(64) std::condition_variable m_version_advance_cv;
+ alignas(64) std::mutex m_version_advance_mtx;
- LockManager m_lock_mngr;
+ alignas(64) LockManager m_lock_mngr;
- std::atomic<size_t> m_preempt_version;
+ alignas(64) std::atomic<size_t> m_preempt_version;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
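
The alignas(64) annotations above give each synchronization variable its own cache line, so heavy writers of one field (the version counter, say) do not invalidate the line holding another (the active-version pointer). A minimal, self-contained illustration of the idea follows; the struct and function names are invented for the example and are not part of the framework.

#include <atomic>
#include <cstddef>
#include <thread>

/* both counters share one cache line, so the two threads contend on it
 * ("false sharing") even though they touch logically unrelated data */
struct Unpadded {
  std::atomic<std::size_t> a{0};
  std::atomic<std::size_t> b{0};
};

/* alignas(64) pads each counter out to its own 64-byte cache line */
struct Padded {
  alignas(64) std::atomic<std::size_t> a{0};
  alignas(64) std::atomic<std::size_t> b{0};
};

template <typename T> void hammer(T &s) {
  std::thread t1([&] { for (int i = 0; i < 1000000; i++) s.a.fetch_add(1); });
  std::thread t2([&] { for (int i = 0; i < 1000000; i++) s.b.fetch_add(1); });
  t1.join();
  t2.join();
}

int main() {
  Unpadded u;  /* the threads ping-pong a single cache line */
  Padded p;    /* the threads write independent lines; typically much faster */
  hammer(u);
  hammer(p);
  return 0;
}
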
@@ -460,7 +461,9 @@ private:
auto extension = (DynamicExtension *)args->extension;
extension->SetThreadAffinity();
-
+ // static std::atomic<size_t> cnt = 0;
+ // size_t recon_id = cnt.fetch_add(1);
+
size_t new_head = 0;
std::vector<reconstruction_results<ShardType>> reconstructions;
@@ -476,21 +479,24 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
+ // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
+ // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
assert(buffview.get_tail() != buffview.get_head());
new_head = buffview.get_tail();
+ // fprintf(stderr, "\t[I] Current Buffer Head:\t%ld (%ld)\n",
+ // buffview.get_head(), recon_id);
+
reconstruction_results<ShardType> flush_recon;
flush_recon.target_level = 0;
flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview));
reconstructions.push_back(flush_recon);
- // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head);
-
/* advance the buffer head for a flush */
bool success = false;
size_t failure_cnt = 0;
@@ -499,24 +505,39 @@ private:
if (!success) {
failure_cnt++;
usleep(1);
- // fprintf(stderr, "[I] Buffer head advance blocked on %ld\n", args->version->get_id());
+ // fprintf(stderr, "\t[I] Buffer head advance blocked on %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
- if (failure_cnt >= extension->m_config.buffer_flush_query_preemption_trigger) {
+ if (failure_cnt >=
+ extension->m_config.buffer_flush_query_preemption_trigger) {
extension->preempt_queries();
+
+ if (failure_cnt > 500000) {
+ // fprintf(stderr,
+ // "[C] Critical failure. Hung on version: %ld (%ld)\n",
+ // extension->m_buffer->debug_get_old_head(), recon_id);
+ }
}
}
}
+ // fprintf(stderr, "\t[I] Buffer head advanced to:\t%ld (%ld)\n",
+ // new_head, recon_id);
} else {
- // fprintf(stderr, "[I] Running background reconstruction\n");
+ // fprintf(stderr, "[I] Running background reconstruction (%ld)\n",
+ // recon_id);
}
/* perform all of the reconstructions */
auto structure = args->version->get_structure();
assert(structure);
-
+
+ // fprintf(stderr, "\t[I] Pre-reconstruction L0 Size\t%ld (%ld)\n",
+ // structure->get_level_vector()[0]->get_shard_count(), recon_id);
+
for (size_t i = 0; i < args->tasks.size(); i++) {
- reconstructions.emplace_back(structure->perform_reconstruction(args->tasks[i]));
+ reconstructions.emplace_back(
+ structure->perform_reconstruction(args->tasks[i]));
}
/*
@@ -524,51 +545,90 @@ private:
* reconstruction isn't a flush), generate one.
*/
if (args->version->get_id() == INVALID_VERSION) {
- args->version->set_id(extension->m_version_counter.fetch_add(1));
- // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
+ assert(args->priority == ReconstructionPriority::MAINT);
+ args->version->set_id(extension->m_version_counter.fetch_add(1) + 1);
+ // fprintf(stderr, "\t[I] Reconstruction version assigned %ld (%ld)\n",
+ // args->version->get_id(), recon_id);
+ } else {
+ assert(args->priority == ReconstructionPriority::FLUSH);
}
-
+
/* wait for our opportunity to install the updates */
extension->await_version(args->version->get_id() - 1);
- /* get a fresh copy of the structure and apply our updates */
- args->version->set_structure(std::move(std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy())));
+ // size_t old_reccnt = args->version->get_structure()->get_record_count();
+
+ /*
+ * this version *should* have an ID one less than the version we are
+ * currently constructing, and should be fully stable (i.e., only the
+ * buffer tail can change)
+ */
+ auto active_version = extension->get_active_version();
+ assert(active_version->get_id() == args->version->get_id() - 1);
+
+ /* get a fresh copy of the structure from the current version */
+ args->version->set_structure(std::move(std::unique_ptr<StructureType>(
+ active_version->get_structure()->copy())));
+
+ // size_t cur_reccnt = args->version->get_structure()->get_record_count();
+ /* apply our updates to the copied structure (adding/removing shards) */
for (auto recon : reconstructions) {
- auto grow = args->version->get_mutable_structure()->append_shard(recon.new_shard, args->version->get_id(), recon.target_level);
+ auto grow = args->version->get_mutable_structure()->append_shard(
+ recon.new_shard, args->version->get_id(), recon.target_level);
args->version->get_mutable_structure()->delete_shards(recon.source_shards);
if (grow) {
extension->m_lock_mngr.add_lock();
}
}
- /* grab the latest buffer head */
+ // size_t new_reccnt = args->version->get_structure()->get_record_count();
+
+ // fprintf(stderr, "\t[I] Post-reconstruction L0 Size\t%ld (%ld)\n",
+ // args->version->get_structure()->get_level_vector()[0]->get_shard_count(),
+ // recon_id);
+
+ /* for maintenance reconstructions, advance the buffer head to match the
+ * currently active version */
if (args->priority == ReconstructionPriority::MAINT) {
- args->version->set_head(extension->get_active_version()->get_head());
+ args->version->set_buffer(extension->m_buffer.get(),
+ active_version->get_head());
+ // fprintf(stderr, "\t[I] Buffer head set to %ld (%ld)\n",
+ // active_version->get_head(), recon_id);
+ // if (new_reccnt != cur_reccnt) {
+ // fprintf(stderr, "ERROR: invalid reccnt (%ld)\n", recon_id);
+ // }
}
+ // fprintf(stderr, "\t[I] Record Counts: %ld %ld %ld (%ld)\n", old_reccnt,
+ // cur_reccnt, new_reccnt, recon_id);
+
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
/* maint reconstructions can now safely release their locks */
if (args->priority == ReconstructionPriority::MAINT) {
std::set<size_t> locked_levels;
- for (size_t i=0; i<args->tasks.size(); i++) {
+ for (size_t i = 0; i < args->tasks.size(); i++) {
for (auto source : args->tasks[i].sources) {
locked_levels.insert(source.level_idx);
}
}
for (auto level : locked_levels) {
- // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id());
- extension->m_lock_mngr.release_lock(level);
+ // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, recon_id);
+ extension->m_lock_mngr.release_lock(level, args->version->get_id());
}
}
-
+
if (args->priority == ReconstructionPriority::FLUSH) {
+ // fprintf(stderr, "\t[I] releasing lock on buffer (%ld)\n", recon_id);
extension->m_lock_mngr.release_buffer_lock();
}
+ // fprintf(stderr, "[I] Reconstruction to Version %ld Finished (%ld)\n",
+ // args->version->get_id(), recon_id);
+
/* manually delete the argument object */
delete args;
}
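
Note the version-assignment change above: IDs are now taken as m_version_counter.fetch_add(1) + 1 (fetch_add returns the pre-increment value), so the counter itself always reads the most recently assigned ID, which is why await_version can fall back to m_version_counter.load() without the old - 1. Installation is then serialized by ID: each worker blocks until its predecessor is active, installs its version, and wakes the waiters. A condensed sketch of that handshake, with invented names, assuming IDs start at zero:

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

class VersionGate {
  std::atomic<std::size_t> m_counter{0};
  std::mutex m_mtx;
  std::condition_variable m_cv;
  std::size_t m_active = 0;  /* id of the currently active version */

public:
  /* claim the next id; fetch_add returns the *old* value, hence the + 1 */
  std::size_t claim() { return m_counter.fetch_add(1) + 1; }

  /* block until the predecessor of `id` has been installed */
  void await_predecessor(std::size_t id) {
    std::unique_lock<std::mutex> lk(m_mtx);
    m_cv.wait(lk, [&] { return m_active == id - 1; });
  }

  /* activate `id` and wake all waiters so the next-in-line can proceed */
  void install(std::size_t id) {
    {
      std::lock_guard<std::mutex> lk(m_mtx);
      m_active = id;
    }
    m_cv.notify_all();
  }
};
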
@@ -596,6 +656,19 @@ private:
/* execute the local/buffer queries and combine the results into output */
QueryResult output;
do {
+ /*
+ * for query preemption--check if the query should be restarted
+ * to prevent blocking buffer flushes for too long
+ */
+ if (args->extension->restart_query(args, version->get_id())) {
+ /* clean up memory allocated for temporary query objects */
+ delete buffer_query;
+ for (size_t i = 0; i < local_queries.size(); i++) {
+ delete local_queries[i];
+ }
+ return;
+ }
+
std::vector<LocalResult> query_results(shards.size() + 1);
for (size_t i = 0; i < query_results.size(); i++) {
if (i == 0) { /* execute buffer query */
@@ -610,19 +683,6 @@ private:
if (query_results[i].size() > 0)
break;
}
-
- /*
- * for query preemption--check if the query should be restarted
- * to prevent blocking buffer flushes for too long
- */
- if (args->extension->restart_query(args, version->get_id())) {
- /* clean up memory allocated for temporary query objects */
- delete buffer_query;
- for (size_t i = 0; i < local_queries.size(); i++) {
- delete local_queries[i];
- }
- return;
- }
}
/*
@@ -656,7 +716,7 @@ private:
* early to minimize activation blocking.
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
- size_t version_id = m_version_counter.fetch_add(1);
+ size_t version_id = m_version_counter.fetch_add(1) + 1;
// fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
@@ -674,17 +734,16 @@ private:
*/
version_ptr create_version_maint(std::unique_ptr<StructureType> structure) {
auto active_version = get_active_version();
- version_ptr new_version =
- std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
+ version_ptr new_version = std::make_shared<VersionType>(
+ INVALID_VERSION, std::move(structure), nullptr, 0);
return new_version;
}
void install_new_version(version_ptr new_version, size_t old_active_version_id) {
- assert(new_version->get_structure());
- assert(new_version->get_id() != INVALID_VERSION);
+ assert(new_version->valid());
- // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ // fprintf(stderr, "\t[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -698,11 +757,7 @@ private:
m_active_version.store(new_version);
m_version_advance_cv.notify_all();
- // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
- }
-
- StructureType *create_scratch_structure() {
- return get_active_version()->get_structure()->copy();
+ // fprintf(stderr, "\t[I] Installed version %ld\n", new_version->get_id());
}
void begin_reconstruction_scheduling() {
@@ -727,6 +782,20 @@ private:
return;
}
+ /*
+ * Double check that we actually need to flush. I was seeing some
+ * flushes running on empty buffers--somehow it seems like a new
+ * flush was getting scheduled immediately after another one finished,
+ * when it wasn't necessary. This should prevent that from happening.
+ *
+ * A bit of a kludge, but it *should* work
+ */
+ if (!m_buffer->is_at_low_watermark()) {
+ m_lock_mngr.release_buffer_lock();
+ end_reconstruction_scheduling();
+ return;
+ }
+
auto active_version = m_active_version.load();
auto *args = new ReconstructionArgs<ShardType, QueryType>();
@@ -768,10 +837,6 @@ private:
auto active_version = m_active_version.load();
auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
- if (reconstructions.size() == 0) {
- // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
- }
-
for (auto &recon : reconstructions) {
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
@@ -783,7 +848,7 @@ private:
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
args->initial_version = active_version->get_id();
- m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args,
RECONSTRUCTION);
}
@@ -806,11 +871,8 @@ private:
}
int internal_append(const RecordType &rec, bool ts) {
- if (m_buffer->is_at_low_watermark()) {
- auto old = false;
- if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) {
- schedule_flush();
- }
+ if (m_buffer->is_at_low_watermark() && !m_lock_mngr.is_buffer_locked()) {
+ schedule_flush();
}
/* this will fail if the HWM is reached and return 0 */
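
internal_append now keys flush scheduling off the lock manager's buffer lock instead of the old m_scheduling_reconstruction compare-and-swap, so "a flush is in flight" has a single source of truth. A rough sketch of the guard, assuming (as the LockManager changes below suggest) that scheduling a flush acquires the buffer lock atomically and the flush job releases it on completion; all names here are stand-ins:

#include <atomic>
#include <cstddef>
#include <cstdio>

std::atomic<bool> buffer_locked{false};       /* stand-in for the LockManager */
std::atomic<std::size_t> buffer_reccnt{900};
constexpr std::size_t LOW_WATERMARK = 512;

bool take_buffer_lock() {
  bool expected = false;
  return buffer_locked.compare_exchange_strong(expected, true);
}

/* at most one flush can be pending: the lock is taken when the flush is
 * scheduled and released only when the flush job finishes */
void maybe_schedule_flush() {
  if (buffer_reccnt.load() >= LOW_WATERMARK && !buffer_locked.load()) {
    if (take_buffer_lock()) {
      std::puts("flush scheduled");  /* stand-in for schedule_flush() */
    }
  }
}

int main() { maybe_schedule_flush(); }
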
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index 9a9ebaa..ab19e24 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -44,7 +44,7 @@ public:
}
for (level_index i = target_level; i > source_level; i--) {
- if (lock_mngr.take_lock(i-1)) {
+ if (lock_mngr.take_lock(i-1, version->get_id())) {
ReconstructionVector recon;
size_t total_reccnt = levels[i - 1]->get_record_count();
std::vector<ShardID> shards;
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 8a4cd8d..16fe111 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -1,7 +1,7 @@
/*
* include/framework/scheduling/FIFOScheduler.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -9,11 +9,6 @@
* determine which jobs to run next. If more jobs are scheduled than there
* are available threads, the excess will stall until a thread becomes
* available and then run in the order they were received by the scheduler.
- *
- * TODO: We need to set up a custom threadpool based on jthreads to support
- * thread preemption for a later phase of this project. That will allow us
- * to avoid blocking epoch transitions on long-running queries, or to pause
- * reconstructions on demand.
*/
#pragma once
@@ -40,7 +35,6 @@ public:
m_thrd_cnt((thread_cnt) ? thread_cnt : DEFAULT_MAX_THREADS),
m_used_memory(0), m_used_thrds(0), m_shutdown(false) {
m_sched_thrd = std::thread(&FIFOScheduler::run, this);
- m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this);
m_thrd_pool.resize(m_thrd_cnt);
}
@@ -50,7 +44,6 @@ public:
}
m_sched_thrd.join();
- m_sched_wakeup_thrd.join();
m_flush_thread.join();
}
@@ -60,13 +53,13 @@ public:
size_t ts = m_counter.fetch_add(1);
if (type == 3) {
- do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock));
+ do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock, &m_cv));
return;
}
std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
- m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
+ m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
m_cv.notify_all();
}
@@ -95,7 +88,6 @@ private:
std::thread m_flush_thread;
std::thread m_sched_thrd;
- std::thread m_sched_wakeup_thrd;
ctpl::thread_pool m_thrd_pool;
std::atomic<size_t> m_used_memory;
@@ -105,13 +97,6 @@ private:
SchedulerStatistics m_stats;
- void periodic_wakeup() {
- do {
- std::this_thread::sleep_for(10us);
- m_cv.notify_all();
- } while (!m_shutdown.load());
- }
-
void schedule_next() {
auto lk = std::unique_lock<std::mutex>(m_queue_lock);
assert(m_task_queue.size() > 0);
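
With the periodic-wakeup thread gone, the scheduler relies on tasks notifying m_cv when they complete (see the Task changes below), so the dispatch loop wakes exactly when a task arrives or a pool thread frees up rather than polling every 10us. A simplified sketch of the dispatcher side, using invented globals in place of the scheduler's members:

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>

std::mutex cv_lock;
std::condition_variable cv;
std::queue<std::function<void()>> task_queue;
int free_threads = 4;
bool shutdown_flag = false;

void dispatch_loop() {
  std::unique_lock<std::mutex> lk(cv_lock);
  while (!shutdown_flag) {
    /* sleep until there is work *and* capacity, or shutdown begins;
     * completing tasks and new submissions both notify cv */
    cv.wait(lk, [] {
      return shutdown_flag || (!task_queue.empty() && free_threads > 0);
    });
    while (!task_queue.empty() && free_threads > 0) {
      auto job = std::move(task_queue.front());
      task_queue.pop();
      --free_threads;  /* a finishing worker re-increments and notifies */
      job();           /* the real scheduler hands this to a pool thread */
    }
  }
}
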
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
index fcc79d1..275c5ae 100644
--- a/include/framework/scheduling/LockManager.h
+++ b/include/framework/scheduling/LockManager.h
@@ -5,6 +5,7 @@
#pragma once
#include <deque>
#include <atomic>
+#include <cassert>
namespace de {
class LockManager {
@@ -13,6 +14,8 @@ public:
for (size_t i=0; i < levels; i++) {
m_lks.emplace_back(false);
}
+
+ m_last_unlocked_version = 0;
}
~LockManager() = default;
@@ -21,25 +24,38 @@ public:
m_lks.emplace_back(false);
}
- void release_lock(size_t idx) {
+ void release_lock(size_t idx, size_t version) {
if (idx < m_lks.size()) {
- m_lks[idx].store(false);
+ assert(m_lks.at(idx).load() == true);
+ m_lks.at(idx).store(false);
+
+ while (m_last_unlocked_version.load() < version) {
+ auto tmp = m_last_unlocked_version.load();
+ m_last_unlocked_version.compare_exchange_strong(tmp, version);
+ }
}
}
- bool is_locked(size_t idx) {
+ bool is_locked(size_t idx, size_t version) {
if (idx < m_lks.size()) {
- return m_lks[idx].load();
+ return m_lks.at(idx).load() && m_last_unlocked_version <= version;
}
return false;
}
- bool take_lock(size_t idx) {
+ bool take_lock(size_t idx, size_t version) {
if (idx < m_lks.size()) {
- bool old = m_lks[idx].load();
+ bool old = m_lks.at(idx).load();
if (!old) {
- return m_lks[idx].compare_exchange_strong(old, true);
+ auto result = m_lks.at(idx).compare_exchange_strong(old, true);
+
+ if (m_last_unlocked_version.load() > version) {
+ m_lks.at(idx).store(false);
+ return false;
+ }
+
+ return result;
}
}
@@ -55,6 +71,10 @@ public:
return false;
}
+ bool is_buffer_locked() {
+ return m_buffer_lk.load();
+ }
+
void release_buffer_lock() {
m_buffer_lk.store(false);
}
@@ -62,5 +82,6 @@ public:
private:
std::deque<std::atomic<bool>> m_lks;
std::atomic<bool> m_buffer_lk;
+ std::atomic<size_t> m_last_unlocked_version;
};
}
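
The lock manager's locks are now version-aware: take_lock optimistically sets the flag, then validates that no lock has already been released by a newer version, rolling back on conflict. This keeps a maintenance task that was planned against a stale version from locking levels out from under a newer reconstruction. A condensed, self-contained rendering of the same logic (assuming C++20, where default-constructed atomics are value-initialized):

#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>

class VersionedLocks {
  std::deque<std::atomic<bool>> m_lks;
  std::atomic<std::size_t> m_last_unlocked_version{0};

public:
  explicit VersionedLocks(std::size_t n) : m_lks(n) {}

  bool take_lock(std::size_t idx, std::size_t version) {
    bool expected = false;
    if (!m_lks.at(idx).compare_exchange_strong(expected, true))
      return false;

    /* validate after acquiring: a release by a newer version means this
     * caller planned against stale state, so undo and fail */
    if (m_last_unlocked_version.load() > version) {
      m_lks.at(idx).store(false);
      return false;
    }
    return true;
  }

  void release_lock(std::size_t idx, std::size_t version) {
    assert(m_lks.at(idx).load());
    m_lks.at(idx).store(false);

    /* monotonically advance the last-unlocked-version high-water mark */
    auto cur = m_last_unlocked_version.load();
    while (cur < version &&
           !m_last_unlocked_version.compare_exchange_weak(cur, version)) {
    }
  }
};
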
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 2d68f56..3dbc9f4 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -1,7 +1,7 @@
/*
* include/framework/scheduling/Task.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -16,6 +16,7 @@
#include <chrono>
#include <functional>
#include <future>
+#include <condition_variable>
#include "framework/scheduling/Version.h"
#include "framework/scheduling/statistics.h"
@@ -49,9 +50,9 @@ typedef std::function<void(void *)> Job;
struct Task {
Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
- SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr)
+ SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr)
: m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
- m_stats(stats), m_lk(lk) {}
+ m_stats(stats), m_lk(lk), m_cv(cv) {}
Job m_job;
size_t m_size;
@@ -60,6 +61,7 @@ struct Task {
size_t m_type;
SchedulerStatistics *m_stats;
std::mutex *m_lk;
+ std::condition_variable *m_cv;
friend bool operator<(const Task &self, const Task &other) {
return self.m_timestamp < other.m_timestamp;
@@ -92,6 +94,10 @@ struct Task {
if (m_lk) {
m_lk->unlock();
}
+
+ if (m_cv) {
+ m_cv->notify_all();
+ }
}
};
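
On the Task side, completion notification is a one-liner at the end of the job wrapper. A minimal sketch of the hook, with illustrative names:

#include <condition_variable>
#include <functional>

/* a task carries an optional cv pointer and signals it after the job body
 * returns, so the scheduler learns about freed capacity without polling */
struct NotifyingTask {
  std::function<void(void *)> job;
  void *args = nullptr;
  std::condition_variable *cv = nullptr;

  void operator()() {
    job(args);
    if (cv)
      cv->notify_all();  /* wake the scheduler's dispatch loop */
  }
};
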
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 4cd73ba..be54c84 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -25,16 +25,24 @@ private:
typedef BufferView<RecordType> BufferViewType;
public:
+
Version(size_t vid = 0)
- : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0),
- m_pending_buffer_head(-1) {}
+ : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {}
Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
size_t head)
: m_buffer(buff), m_structure(std::move(structure)), m_id(number),
- m_buffer_head(head), m_pending_buffer_head(-1) {}
-
- ~Version() = default;
+ m_buffer_head(head) {
+ if (m_buffer) {
+ m_buffer->take_head_reference(m_buffer_head);
+ }
+ }
+
+ ~Version() {
+ if (m_buffer) {
+ m_buffer->release_head_reference(m_buffer_head);
+ }
+ }
/*
* Versions are *not* copyable or movable. Only one can exist, and all users
@@ -72,6 +80,9 @@ public:
auto version = new Version(number);
version->m_buffer = m_buffer;
version->m_buffer_head = m_buffer_head;
+ if (version->m_buffer) {
+ version->m_buffer->take_head_reference(m_buffer_head);
+ }
if (m_structure) {
version->m_structure = std::unique_ptr(m_structure->copy());
@@ -81,8 +92,15 @@ public:
}
bool advance_buffer_head(size_t new_head) {
- m_buffer_head = new_head;
- return m_buffer->advance_head(new_head);
+ m_buffer->release_head_reference(m_buffer_head);
+ if (m_buffer->advance_head(new_head)) {
+ m_buffer_head = new_head;
+ return true;
+ }
+
+ /* if we failed to advance, reclaim our reference */
+ m_buffer->take_head_reference(m_buffer_head);
+ return false;
}
void update_shard_version(size_t version) {
@@ -94,9 +112,16 @@ public:
}
- void set_head(size_t head) {
- // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head);
+ void set_buffer(BufferType *buffer, size_t head) {
+ assert(m_buffer == nullptr);
+
+ m_buffer = buffer;
m_buffer_head = head;
+ m_buffer->take_head_reference(head);
+ }
+
+ bool valid() {
+ return (m_buffer) && (m_buffer_head) && (m_structure) && (m_id);
}
private:
@@ -105,6 +130,5 @@ private:
size_t m_id;
size_t m_buffer_head;
- ssize_t m_pending_buffer_head;
};
} // namespace de
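
Version now owns its buffer-head reference for its whole lifetime: the constructor takes a reference, the destructor releases it, clones re-take it, and advance_buffer_head trades the old reference for the new head (re-taking the old one if the advance fails). That RAII discipline replaces the release callback the BufferView used to carry. The shape of the idea, reduced to a toy with names invented for the example:

#include <atomic>
#include <cstddef>

struct RefCountedHead {
  std::atomic<std::size_t> refcnt{0};
};

class HeadReference {
  RefCountedHead *m_head;

public:
  explicit HeadReference(RefCountedHead *h) : m_head(h) {
    m_head->refcnt.fetch_add(1);  /* taken at construction... */
  }
  ~HeadReference() {
    m_head->refcnt.fetch_sub(1);  /* ...released at destruction */
  }

  /* non-copyable and non-movable, like Version itself */
  HeadReference(const HeadReference &) = delete;
  HeadReference &operator=(const HeadReference &) = delete;
};
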
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index acf1201..a9fb12d 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -1,7 +1,7 @@
/*
* include/framework/structure/BufferView.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -20,8 +20,6 @@
namespace de {
-typedef std::function<void(void)> ReleaseFunction;
-
template <RecordInterface R> class BufferView {
public:
BufferView() = default;
@@ -35,7 +33,6 @@ public:
BufferView(BufferView &&other)
: m_data(std::exchange(other.m_data, nullptr)),
- m_release(std::move(other.m_release)),
m_head(std::exchange(other.m_head, 0)),
m_tail(std::exchange(other.m_tail, 0)),
m_start(std::exchange(other.m_start, 0)),
@@ -48,18 +45,13 @@ public:
BufferView &operator=(BufferView &&other) = delete;
BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail,
- size_t tombstone_cnt, psudb::BloomFilter<R> *filter,
- ReleaseFunction release)
- : m_data(buffer), m_release(release), m_head(head), m_tail(tail),
+ size_t tombstone_cnt, psudb::BloomFilter<R> *filter)
+ : m_data(buffer), m_head(head), m_tail(tail),
m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap),
m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter),
m_active(true) {}
- ~BufferView() {
- if (m_active) {
- m_release();
- }
- }
+ ~BufferView() = default;
bool check_tombstone(const R &rec) {
if (m_tombstone_filter && !m_tombstone_filter->lookup(rec))
@@ -138,7 +130,6 @@ public:
private:
Wrapped<R> *m_data;
- ReleaseFunction m_release;
size_t m_head;
size_t m_tail;
size_t m_start;
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index e62a495..7357915 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -1,7 +1,7 @@
/*
* include/framework/structure/MutableBuffer.h
*
- * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023-2025 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
@@ -109,11 +109,11 @@ public:
size_t get_tombstone_count() const { return m_tscnt.load(); }
bool delete_record(const R &rec) {
- return get_buffer_view().delete_record(rec);
+ return get_buffer_view(m_head.load().head_idx).delete_record(rec);
}
bool check_tombstone(const R &rec) {
- return get_buffer_view().check_tombstone(rec);
+ return get_buffer_view(m_head.load().head_idx).check_tombstone(rec);
}
size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); }
@@ -122,20 +122,9 @@ public:
return m_tombstone_filter->get_memory_usage();
}
- BufferView<R> get_buffer_view(size_t target_head) {
- size_t head = get_head(target_head);
- auto f = std::bind(release_head_reference, (void *)this, head);
-
+ BufferView<R> get_buffer_view(size_t head) {
return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
- m_tombstone_filter, f);
- }
-
- BufferView<R> get_buffer_view() {
- size_t head = get_head(m_head.load().head_idx);
- auto f = std::bind(release_head_reference, (void *)this, head);
-
- return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(),
- m_tombstone_filter, f);
+ m_tombstone_filter);
}
/*
@@ -159,7 +148,7 @@ public:
// fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt);
- buffer_head new_hd = {new_head, 0};
+ buffer_head new_hd = {new_head, 1};
buffer_head cur_hd;
/* replace current head with new head */
@@ -174,32 +163,6 @@ public:
return true;
}
- /*
- * FIXME: If target_head does not match *either* the old_head or the
- * current_head, this routine will loop infinitely.
- */
- size_t get_head(size_t target_head) {
- buffer_head cur_hd, new_hd;
- bool head_acquired = false;
-
-
- //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
- do {
- if (m_old_head.load().head_idx == target_head) {
- cur_hd = m_old_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
- } else if (m_head.load().head_idx == target_head) {
- cur_hd = m_head.load();
- cur_hd.head_idx = target_head;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
- head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
- }
- } while (!head_acquired);
-
- return new_hd.head_idx;
- }
void set_low_watermark(size_t lwm) {
assert(lwm < m_hwm);
@@ -234,8 +197,90 @@ public:
return m_cap - (m_tail.load() - m_old_head.load().head_idx);
}
+ size_t debug_get_old_head() const {
+ return m_old_head.load().head_idx;
+ }
+
+ size_t debug_get_head() const {
+ return m_head.load().head_idx;
+ }
+
+ bool take_head_reference(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head) {
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while (!head_acquired);
+
+ return head_acquired;
+ }
+
+ bool release_head_reference(size_t head) {
+ buffer_head cur_hd, new_hd;
+ bool head_released = false;
+ do {
+ if (m_old_head.load().head_idx == head) {
+ cur_hd = m_old_head;
+
+ assert(cur_hd.refcnt > 0);
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ head_released = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else {
+ cur_hd = m_head;
+
+ /* it's possible the head got pushed from current to old */
+ if (cur_hd.head_idx == head) {
+ assert(cur_hd.refcnt > 0);
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
+ head_released = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ }
+ _mm_pause();
+ } while (!head_released);
+
+ return head_released;
+ }
+
private:
- int64_t try_advance_tail() {
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+
+ //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head) {
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while (!head_acquired);
+
+ return new_hd.head_idx;
+ }
+
+ ssize_t try_advance_tail() {
size_t old_value = m_tail.load();
/* if full, fail to advance the tail */
@@ -257,33 +302,6 @@ private:
size_t to_idx(size_t i, size_t head) { return (head + i) % m_cap; }
- static void release_head_reference(void *buff, size_t head) {
- MutableBuffer<R> *buffer = (MutableBuffer<R> *)buff;
-
- buffer_head cur_hd, new_hd;
- do {
- if (buffer->m_old_head.load().head_idx == head) {
- cur_hd = buffer->m_old_head;
- if (cur_hd.refcnt == 0)
- continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
- if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- } else {
- cur_hd = buffer->m_head;
- if (cur_hd.refcnt == 0)
- continue;
- new_hd = {cur_hd.head_idx, cur_hd.refcnt - 1};
-
- if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
- break;
- }
- }
- _mm_pause();
- } while (true);
- }
-
size_t m_lwm;
size_t m_hwm;
size_t m_cap;
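
MutableBuffer packs each head index together with its reference count in a single buffer_head value, so taking or dropping a reference is one compare-and-swap against either m_head or m_old_head (the retired head kept alive while old versions still read from it). Note that advance_head now installs the new head with a refcount of 1 rather than 0: the advancing version's reference is accounted for up front, and its later release balances this initial count. A sketch of the packed CAS, assuming the pair fits in a std::atomic (which may or may not be lock-free on a given platform):

#include <atomic>
#include <cstddef>

struct buffer_head {
  std::size_t head_idx;
  std::size_t refcnt;
};

std::atomic<buffer_head> g_head{{0, 1}};

bool take_reference(std::size_t target) {
  buffer_head cur = g_head.load();
  if (cur.head_idx != target)
    return false;  /* the head moved; the caller must check the other head */

  buffer_head next{cur.head_idx, cur.refcnt + 1};
  /* the CAS fails if either the head or the count changed since the load */
  return g_head.compare_exchange_strong(cur, next);
}
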
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index fecb2bf..2b8a7fc 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -27,7 +27,7 @@ class DEConfiguration {
/* buffer parameters */
size_t buffer_count = 1;
size_t buffer_size = 8000;
- size_t buffer_flush_trigger = buffer_size / 4;
+ size_t buffer_flush_trigger = buffer_size / 2;
/* reconstruction triggers */
bool recon_enable_seek_trigger = false;
@@ -44,7 +44,7 @@ class DEConfiguration {
size_t minimum_query_threads = 4;
size_t maximum_memory_usage = 0; /* 0 for unbounded */
- size_t buffer_flush_query_preemption_trigger = 10;
+ size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
};
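
For scale, with the default buffer_size of 8000 the flush trigger moves from 2000 records (a quarter full) to 4000 (half full), and a preemption trigger of UINT64_MAX effectively disables flush-driven query preemption by default, since the flush loop's failure counter cannot realistically reach it. Illustrative arithmetic only:

#include <cstddef>
#include <cstdio>

int main() {
  const std::size_t buffer_size = 8000;            /* default from above */
  const std::size_t old_trigger = buffer_size / 4; /* 2000 records */
  const std::size_t new_trigger = buffer_size / 2; /* 4000 records */
  std::printf("flush now scheduled at %zu records (was %zu) of %zu\n",
              new_trigger, old_trigger, buffer_size);
  return 0;
}
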