diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 71 |
1 files changed, 48 insertions, 23 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 7ea5370..e2e2784 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -54,6 +54,10 @@ public: , m_next_core(0) , m_epoch_cnt(0) { + if constexpr (L == LayoutPolicy::BSM) { + assert(scale_factor == 2); + } + auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0}); m_previous_epoch.store({nullptr, 0}); @@ -201,7 +205,7 @@ public: */ size_t get_memory_usage() { auto epoch = get_active_epoch(); - auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage(); + auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage(); end_job(epoch); return t; @@ -214,7 +218,7 @@ public: */ size_t get_aux_memory_usage() { auto epoch = get_active_epoch(); - auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); + auto t = epoch->get_structure()->get_aux_memory_usage(); end_job(epoch); return t; @@ -487,10 +491,17 @@ private: ((DynamicExtension *) args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); - for (ssize_t i=0; i<args->merges.size(); i++) { - vers->reconstruction(args->merges[i].second, args->merges[i].first); + if constexpr (L == LayoutPolicy::BSM) { + if (args->merges.size() > 0) { + vers->reconstruction(args->merges[0]); + } + } else { + for (ssize_t i=0; i<args->merges.size(); i++) { + vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]); + } } + /* * we'll grab the buffer AFTER doing the internal reconstruction, so we * can flush as many records as possible in one go. The reconstruction @@ -546,30 +557,34 @@ private: std::vector<std::pair<ShardID, Shard*>> shards; std::vector<void *> states = vers->get_query_states(shards, parms); + std::vector<R> results; Q::process_query_states(parms, states, buffer_state); - std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); - for (size_t i=0; i<query_results.size(); i++) { - std::vector<Wrapped<R>> local_results; - ShardID shid; - - if (i == 0) { /* process the buffer first */ - local_results = Q::buffer_query(buffer_state, parms); - shid = INVALID_SHID; - } else { - local_results = Q::query(shards[i - 1].second, states[i - 1], parms); - shid = shards[i - 1].first; - } + do { + std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1); + for (size_t i=0; i<query_results.size(); i++) { + std::vector<Wrapped<R>> local_results; + ShardID shid; + + if (i == 0) { /* process the buffer first */ + local_results = Q::buffer_query(buffer_state, parms); + shid = INVALID_SHID; + } else { + local_results = Q::query(shards[i - 1].second, states[i - 1], parms); + shid = shards[i - 1].first; + } - query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); + query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer)); - if constexpr (Q::EARLY_ABORT) { - if (query_results[i].size() > 0) break; + if constexpr (Q::EARLY_ABORT) { + if (query_results[i].size() > 0) break; + } } - } + Q::merge(query_results, parms, results); + + } while (Q::repeat(parms, results, states, buffer_state)); - auto result = Q::merge(query_results, parms); - args->result_set.set_value(std::move(result)); + args->result_set.set_value(std::move(results)); ((DynamicExtension *) args->extension)->end_job(epoch); @@ -624,7 +639,7 @@ private: static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) { if constexpr (Q::SKIP_DELETE_FILTER) { - return records; + return std::move(records); } std::vector<Wrapped<R>> processed_records; @@ -685,7 +700,12 @@ private: return processed_records; } +#ifdef _GNU_SOURCE void SetThreadAffinity() { + if constexpr (std::same_as<SCHED, SerialScheduler>) { + return; + } + int core = m_next_core.fetch_add(1) % m_core_cnt; cpu_set_t mask; CPU_ZERO(&mask); @@ -707,6 +727,11 @@ private: CPU_SET(core, &mask); ::sched_setaffinity(0, sizeof(mask), &mask); } +#else + void SetThreadAffinity() { + + } +#endif void end_job(_Epoch *epoch) { |