summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-31 13:29:49 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-31 13:29:49 -0500
commitd166465dcca3550cb8f3263e0f5b5189a69d531a (patch)
tree74314867661080b3ef27130ffcbdc2308fa6413e
parent51a85013236f4b2bd596caf179d90e67c848963c (diff)
downloaddynamic-extension-d166465dcca3550cb8f3263e0f5b5189a69d531a.tar.gz
Temporary thread affinity for reconstruction
-rw-r--r--benchmarks/insert_tail_latency.cpp13
-rw-r--r--include/framework/DynamicExtension.h33
-rw-r--r--include/framework/scheduling/FIFOScheduler.h1
3 files changed, 40 insertions, 7 deletions
diff --git a/benchmarks/insert_tail_latency.cpp b/benchmarks/insert_tail_latency.cpp
index 5e32898..1640ce5 100644
--- a/benchmarks/insert_tail_latency.cpp
+++ b/benchmarks/insert_tail_latency.cpp
@@ -19,7 +19,7 @@
typedef de::Record<int64_t, int64_t> Rec;
typedef de::ISAMTree<Rec> ISAM;
typedef de::rc::Query<ISAM, Rec> Q;
-typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::FIFOScheduler> Ext;
+typedef de::DynamicExtension<Rec, ISAM, Q, de::LayoutPolicy::TEIRING, de::DeletePolicy::TAGGING, de::SerialScheduler> Ext;
std::atomic<size_t> total_latency = 0;
@@ -53,8 +53,8 @@ void insert_thread(Ext *extension, size_t n, size_t k, size_t rate) {
int main(int argc, char **argv) {
/* the closeout routine takes _forever_ ... so we'll just leak the memory */
- auto extension = new Ext(100, 1000000, 3);
- size_t n = 100000000;
+ auto extension = new Ext(12000, 12001, 3);
+ size_t n = 10000000;
size_t per_trial = 1000;
double selectivity = .001;
size_t rate = 1000000;
@@ -63,11 +63,12 @@ int main(int argc, char **argv) {
gsl_rng * rng = gsl_rng_alloc(gsl_rng_mt19937);
- std::thread i_thrd1(insert_thread, extension, n/2, per_trial, rate);
- std::thread i_thrd2(insert_thread, extension, n/2, per_trial, rate);
+ std::thread i_thrd1(insert_thread, extension, n, per_trial, rate);
+ //std::thread i_thrd2(insert_thread, extension, n/2, per_trial, rate);
+
i_thrd1.join();
- i_thrd2.join();
+ //i_thrd2.join();
auto avg_latency = total_latency.load() / n;
auto throughput = (int64_t) ((double) n / (double) total_latency * 1e9);
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 40f137c..3203945 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -32,7 +32,7 @@
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
- DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler>
+ DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
@@ -51,6 +51,8 @@ public:
, m_max_delete_prop(1)
, m_sched(memory_budget, thread_cnt)
, m_buffer(new Buffer(buffer_lwm, buffer_hwm))
+ , m_core_cnt(thread_cnt)
+ , m_next_core(0)
{
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
auto epoch = new _Epoch(0, vers, m_buffer, 0);
@@ -258,6 +260,9 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
+ std::atomic<int> m_next_core;
+ size_t m_core_cnt;
+
void enforce_delete_invariant(_Epoch *epoch) {
auto structure = epoch->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -415,6 +420,8 @@ private:
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<R, S, Q, L> *) arguments;
+
+ ((DynamicExtension *) args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
for (ssize_t i=0; i<args->merges.size(); i++) {
@@ -605,6 +612,30 @@ private:
return processed_records;
}
+
+ void SetThreadAffinity() {
+ int core = m_next_core.fetch_add(1) % m_core_cnt;
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+
+ switch (core % 2) {
+ case 0:
+ // 0 |-> 0
+ // 2 |-> 2
+ // 4 |-> 4
+ core = core;
+ break;
+ case 1:
+ // 1 |-> 28
+ // 3 |-> 30
+ // 5 |-> 32
+ core = (core - 1) + m_core_cnt;
+ break;
+ }
+ CPU_SET(core, &mask);
+ ::sched_setaffinity(0, sizeof(mask), &mask);
+ }
+
};
}
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 0df4d3c..c6baf9b 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -114,6 +114,7 @@ private:
}
} while(!m_shutdown.load());
}
+
};
}