summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-01-31 13:29:49 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2024-01-31 13:29:49 -0500
commitd166465dcca3550cb8f3263e0f5b5189a69d531a (patch)
tree74314867661080b3ef27130ffcbdc2308fa6413e /include/framework
parent51a85013236f4b2bd596caf179d90e67c848963c (diff)
downloaddynamic-extension-d166465dcca3550cb8f3263e0f5b5189a69d531a.tar.gz
Temporary thread affinity for reconstruction
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h33
-rw-r--r--include/framework/scheduling/FIFOScheduler.h1
2 files changed, 33 insertions, 1 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 40f137c..3203945 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -32,7 +32,7 @@
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING,
- DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler>
+ DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
@@ -51,6 +51,8 @@ public:
, m_max_delete_prop(1)
, m_sched(memory_budget, thread_cnt)
, m_buffer(new Buffer(buffer_lwm, buffer_hwm))
+ , m_core_cnt(thread_cnt)
+ , m_next_core(0)
{
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
auto epoch = new _Epoch(0, vers, m_buffer, 0);
@@ -258,6 +260,9 @@ private:
size_t m_scale_factor;
double m_max_delete_prop;
+ std::atomic<int> m_next_core;
+ size_t m_core_cnt;
+
void enforce_delete_invariant(_Epoch *epoch) {
auto structure = epoch->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -415,6 +420,8 @@ private:
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<R, S, Q, L> *) arguments;
+
+ ((DynamicExtension *) args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
for (ssize_t i=0; i<args->merges.size(); i++) {
@@ -605,6 +612,30 @@ private:
return processed_records;
}
+
+ void SetThreadAffinity() {
+ int core = m_next_core.fetch_add(1) % m_core_cnt;
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+
+ switch (core % 2) {
+ case 0:
+ // 0 |-> 0
+ // 2 |-> 2
+ // 4 |-> 4
+ core = core;
+ break;
+ case 1:
+ // 1 |-> 28
+ // 3 |-> 30
+ // 5 |-> 32
+ core = (core - 1) + m_core_cnt;
+ break;
+ }
+ CPU_SET(core, &mask);
+ ::sched_setaffinity(0, sizeof(mask), &mask);
+ }
+
};
}
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 0df4d3c..c6baf9b 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -114,6 +114,7 @@ private:
}
} while(!m_shutdown.load());
}
+
};
}