summaryrefslogtreecommitdiffstats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
authorDouglas B. Rumbaugh <dbr4@psu.edu>2024-05-14 16:31:05 -0400
committerGitHub <noreply@github.com>2024-05-14 16:31:05 -0400
commit47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc (patch)
treeee5613ce182b2c9caa228d3abeb65dc27fef2db3 /include/framework/DynamicExtension.h
parent4a834497d5f82c817d634925250158d85ca825c2 (diff)
parent8643fe194dec05b4e3f3ea31e162ac0b2b00e162 (diff)
downloaddynamic-extension-47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc.tar.gz
Merge pull request #4 from dbrumbaugh/master
Updates for VLDB revision
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r--include/framework/DynamicExtension.h71
1 files changed, 48 insertions, 23 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7ea5370..e2e2784 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,6 +54,10 @@ public:
, m_next_core(0)
, m_epoch_cnt(0)
{
+ if constexpr (L == LayoutPolicy::BSM) {
+ assert(scale_factor == 2);
+ }
+
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
m_previous_epoch.store({nullptr, 0});
@@ -201,7 +205,7 @@ public:
*/
size_t get_memory_usage() {
auto epoch = get_active_epoch();
- auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+ auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage();
end_job(epoch);
return t;
@@ -214,7 +218,7 @@ public:
*/
size_t get_aux_memory_usage() {
auto epoch = get_active_epoch();
- auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+ auto t = epoch->get_structure()->get_aux_memory_usage();
end_job(epoch);
return t;
@@ -487,10 +491,17 @@ private:
((DynamicExtension *) args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
- for (ssize_t i=0; i<args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].second, args->merges[i].first);
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (args->merges.size() > 0) {
+ vers->reconstruction(args->merges[0]);
+ }
+ } else {
+ for (ssize_t i=0; i<args->merges.size(); i++) {
+ vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]);
+ }
}
+
/*
* we'll grab the buffer AFTER doing the internal reconstruction, so we
* can flush as many records as possible in one go. The reconstruction
@@ -546,30 +557,34 @@ private:
std::vector<std::pair<ShardID, Shard*>> shards;
std::vector<void *> states = vers->get_query_states(shards, parms);
+ std::vector<R> results;
Q::process_query_states(parms, states, buffer_state);
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
- for (size_t i=0; i<query_results.size(); i++) {
- std::vector<Wrapped<R>> local_results;
- ShardID shid;
-
- if (i == 0) { /* process the buffer first */
- local_results = Q::buffer_query(buffer_state, parms);
- shid = INVALID_SHID;
- } else {
- local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
- shid = shards[i - 1].first;
- }
+ do {
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+ for (size_t i=0; i<query_results.size(); i++) {
+ std::vector<Wrapped<R>> local_results;
+ ShardID shid;
+
+ if (i == 0) { /* process the buffer first */
+ local_results = Q::buffer_query(buffer_state, parms);
+ shid = INVALID_SHID;
+ } else {
+ local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+ shid = shards[i - 1].first;
+ }
- query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
+ query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) break;
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) break;
+ }
}
- }
+ Q::merge(query_results, parms, results);
+
+ } while (Q::repeat(parms, results, states, buffer_state));
- auto result = Q::merge(query_results, parms);
- args->result_set.set_value(std::move(result));
+ args->result_set.set_value(std::move(results));
((DynamicExtension *) args->extension)->end_job(epoch);
@@ -624,7 +639,7 @@ private:
static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
if constexpr (Q::SKIP_DELETE_FILTER) {
- return records;
+ return std::move(records);
}
std::vector<Wrapped<R>> processed_records;
@@ -685,7 +700,12 @@ private:
return processed_records;
}
+#ifdef _GNU_SOURCE
void SetThreadAffinity() {
+ if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ return;
+ }
+
int core = m_next_core.fetch_add(1) % m_core_cnt;
cpu_set_t mask;
CPU_ZERO(&mask);
@@ -707,6 +727,11 @@ private:
CPU_SET(core, &mask);
::sched_setaffinity(0, sizeof(mask), &mask);
}
+#else
+ void SetThreadAffinity() {
+
+ }
+#endif
void end_job(_Epoch *epoch) {