diff options
| -rw-r--r-- | benchmarks/insert_tail_latency.cpp | 13 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 33 | ||||
| -rw-r--r-- | include/framework/scheduling/FIFOScheduler.h | 1 |
3 files changed, 40 insertions, 7 deletions
diff --git a/benchmarks/insert_tail_latency.cpp b/benchmarks/insert_tail_latency.cpp index 5e32898..1640ce5 100644 --- a/benchmarks/insert_tail_latency.cpp +++ b/benchmarks/insert_tail_latency.cpp @@ -19,7 +19,7 @@ typedef de::Record<int64_t, int64_t> Rec; typedef de::ISAMTree<Rec> ISAM; typedef de::rc::Query<ISAM, Rec> Q; -typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::FIFOScheduler> Ext; +typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext; std::atomic<size_t> total_latency = 0; @@ -53,8 +53,8 @@ void insert_thread(Ext *extension, size_t n, size_t k, size_t rate) { int main(int argc, char **argv) { /* the closeout routine takes _forever_ ... so we'll just leak the memory */ - auto extension = new Ext(100, 1000000, 3); - size_t n = 100000000; + auto extension = new Ext(12000, 12001, 3); + size_t n = 10000000; size_t per_trial = 1000; double selectivity = .001; size_t rate = 1000000; @@ -63,11 +63,12 @@ int main(int argc, char **argv) { gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937); - std::thread i_thrd1(insert_thread, extension, n/2, per_trial, rate); - std::thread i_thrd2(insert_thread, extension, n/2, per_trial, rate); + std::thread i_thrd1(insert_thread, extension, n, per_trial, rate); + //std::thread i_thrd2(insert_thread, extension, n/2, per_trial, rate); + i_thrd1.join(); - i_thrd2.join(); + //i_thrd2.join(); auto avg_latency = total_latency.load() / n; auto throughput = (int64_t) ((double) n / (double) total_latency * 1e9); diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 40f137c..3203945 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -32,7 +32,7 @@ namespace de { template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, - DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler> + DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler> class DynamicExtension { typedef S Shard; typedef MutableBuffer<R> Buffer; @@ -51,6 +51,8 @@ public: , m_max_delete_prop(1) , m_sched(memory_budget, thread_cnt) , m_buffer(new Buffer(buffer_lwm, buffer_hwm)) + , m_core_cnt(thread_cnt) + , m_next_core(0) { auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop); auto epoch = new _Epoch(0, vers, m_buffer, 0); @@ -258,6 +260,9 @@ private: size_t m_scale_factor; double m_max_delete_prop; + std::atomic<int> m_next_core; + size_t m_core_cnt; + void enforce_delete_invariant(_Epoch *epoch) { auto structure = epoch->get_structure(); auto compactions = structure->get_compaction_tasks(); @@ -415,6 +420,8 @@ private: static void reconstruction(void *arguments) { auto args = (ReconstructionArgs<R, S, Q, L> *) arguments; + + ((DynamicExtension *) args->extension)->SetThreadAffinity(); Structure *vers = args->epoch->get_structure(); for (ssize_t i=0; i<args->merges.size(); i++) { @@ -605,6 +612,30 @@ private: return processed_records; } + + void SetThreadAffinity() { + int core = m_next_core.fetch_add(1) % m_core_cnt; + cpu_set_t mask; + CPU_ZERO(&mask); + + switch (core % 2) { + case 0: + // 0 |-> 0 + // 2 |-> 2 + // 4 |-> 4 + core = core; + break; + case 1: + // 1 |-> 28 + // 3 |-> 30 + // 5 |-> 32 + core = (core - 1) + m_core_cnt; + break; + } + CPU_SET(core, &mask); + ::sched_setaffinity(0, sizeof(mask), &mask); + } + }; } diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 0df4d3c..c6baf9b 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -114,6 +114,7 @@ private: } } while(!m_shutdown.load()); } + }; } |