From d166465dcca3550cb8f3263e0f5b5189a69d531a Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 31 Jan 2024 13:29:49 -0500 Subject: Temporary thread affinity for reconstruction --- benchmarks/insert_tail_latency.cpp | 13 ++++++----- include/framework/DynamicExtension.h | 33 +++++++++++++++++++++++++++- include/framework/scheduling/FIFOScheduler.h | 1 + 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/benchmarks/insert_tail_latency.cpp b/benchmarks/insert_tail_latency.cpp index 5e32898..1640ce5 100644 --- a/benchmarks/insert_tail_latency.cpp +++ b/benchmarks/insert_tail_latency.cpp @@ -19,7 +19,7 @@ typedef de::Record Rec; typedef de::ISAMTree ISAM; typedef de::rc::Query Q; -typedef de::DynamicExtension Ext; +typedef de::DynamicExtension Ext; std::atomic total_latency = 0; @@ -53,8 +53,8 @@ void insert_thread(Ext *extension, size_t n, size_t k, size_t rate) { int main(int argc, char **argv) { /* the closeout routine takes _forever_ ... so we'll just leak the memory */ - auto extension = new Ext(100, 1000000, 3); - size_t n = 100000000; + auto extension = new Ext(12000, 12001, 3); + size_t n = 10000000; size_t per_trial = 1000; double selectivity = .001; size_t rate = 1000000; @@ -63,11 +63,12 @@ int main(int argc, char **argv) { gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); - std::thread i_thrd1(insert_thread, extension, n/2, per_trial, rate); - std::thread i_thrd2(insert_thread, extension, n/2, per_trial, rate); + std::thread i_thrd1(insert_thread, extension, n, per_trial, rate); + //std::thread i_thrd2(insert_thread, extension, n/2, per_trial, rate); + i_thrd1.join(); - i_thrd2.join(); + //i_thrd2.join(); auto avg_latency = total_latency.load() / n; auto throughput = (int64_t) ((double) n / (double) total_latency * 1e9); diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 40f137c..3203945 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -32,7 +32,7 @@ namespace de { template + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> class DynamicExtension { typedef S Shard; typedef MutableBuffer Buffer; @@ -51,6 +51,8 @@ public: , m_max_delete_prop(1) , m_sched(memory_budget, thread_cnt) , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) + , m_core_cnt(thread_cnt) + , m_next_core(0) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); auto epoch = new _Epoch(0, vers, m_buffer, 0); @@ -258,6 +260,9 @@ private: size_t m_scale_factor; double m_max_delete_prop; + std::atomic m_next_core; + size_t m_core_cnt; + void enforce_delete_invariant(_Epoch *epoch) { auto structure = epoch->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -415,6 +420,8 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs *) arguments; + + ((DynamicExtension *) args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); for (ssize_t i=0; imerges.size(); i++) { @@ -605,6 +612,30 @@ private: return processed_records; } + + void SetThreadAffinity() { + int core = m_next_core.fetch_add(1) % m_core_cnt; + cpu_set_t mask; + CPU_ZERO(&mask); + + switch (core % 2) { + case 0: + // 0 |-> 0 + // 2 |-> 2 + // 4 |-> 4 + core = core; + break; + case 1: + // 1 |-> 28 + // 3 |-> 30 + // 5 |-> 32 + core = (core - 1) + m_core_cnt; + break; + } + CPU_SET(core, &mask); + ::sched_setaffinity(0, sizeof(mask), &mask); + } + }; } diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 0df4d3c..c6baf9b 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -114,6 +114,7 @@ private: } } while(!m_shutdown.load()); } + }; } -- cgit v1.2.3