summaryrefslogtreecommitdiffstats
path: root/include/framework/scheduling
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/scheduling')
-rw-r--r--include/framework/scheduling/Task.h2
-rw-r--r--include/framework/scheduling/Version.h102
2 files changed, 42 insertions, 62 deletions
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 197e8bf..1591909 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -32,7 +32,7 @@ enum class ReconstructionPriority {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
typedef typename ShardType::RECORD RecordType;
- std::atomic<std::shared_ptr<Version<ShardType, QueryType>>> version;
+ std::shared_ptr<Version<ShardType, QueryType>> version;
ReconstructionVector tasks;
void *extension;
ReconstructionPriority priority;
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 2b2b5ba..8d3d038 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -20,30 +20,21 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class Version {
private:
typedef typename ShardType::RECORD RecordType;
- typedef MutableBuffer<RecordType> Buffer;
- typedef ExtensionStructure<ShardType, QueryType> Structure;
- typedef BufferView<RecordType> BufView;
+ typedef MutableBuffer<RecordType> BufferType;
+ typedef ExtensionStructure<ShardType, QueryType> StructureType;
+ typedef BufferView<RecordType> BufferViewType;
public:
- Version(size_t number = 0)
- : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false),
- m_epoch_number(number), m_buffer_head(0) {}
-
- Version(size_t number, Structure *structure, Buffer *buff, size_t head)
- : m_buffer(buff), m_structure(structure), m_active_merge(false),
- m_epoch_number(number), m_buffer_head(head) {
- structure->take_reference();
- }
+ Version(size_t vid = 0)
+ : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0),
+ m_pending_buffer_head(-1) {}
- ~Version() {
- if (m_structure) {
- m_structure->release_reference();
- }
+ Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
+ size_t head)
+ : m_buffer(buff), m_structure(std::move(structure)), m_id(number),
+ m_buffer_head(head), m_pending_buffer_head(-1) {}
- if (m_structure->get_reference_count() == 0) {
- delete m_structure;
- }
- }
+ ~Version() = default;
/*
* Versions are *not* copyable or movable. Only one can exist, and all users
@@ -54,13 +45,26 @@ public:
Version &operator=(const Version &) = delete;
Version &operator=(Version &&) = delete;
- size_t get_epoch_number() const { return m_epoch_number; }
+ size_t get_id() const { return m_id; }
+
+ void set_id(size_t id) { m_id = id;}
- const Structure *get_structure() const { return m_structure.get(); }
+ const StructureType *get_structure() const { return m_structure.get(); }
- Structure *get_mutable_structure() { return m_structure.get(); }
+ StructureType *get_mutable_structure() { return m_structure.get(); }
- BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
+ bool set_structure(std::unique_ptr<StructureType> new_struct) {
+ if (m_structure) {
+ return false;
+ }
+
+ m_structure = std::move(new_struct);
+ return true;
+ }
+
+ BufferViewType get_buffer() const {
+ return m_buffer->get_buffer_view(m_buffer_head);
+ }
/*
* Returns a new Version object that is a copy of this one. The new object
@@ -69,54 +73,29 @@ public:
* provided argument.
*/
Version *clone(size_t number) {
- std::unique_lock<std::mutex> m_buffer_lock;
- auto epoch = new Version(number);
- epoch->m_buffer = m_buffer;
- epoch->m_buffer_head = m_buffer_head;
+ auto version = new Version(number);
+ version->m_buffer = m_buffer;
+ version->m_buffer_head = m_buffer_head;
if (m_structure) {
- epoch->m_structure = m_structure->copy();
- /* the copy routine returns a structure with 0 references */
- epoch->m_structure->take_reference();
+ version->m_structure = std::unique_ptr(m_structure->copy());
}
- return epoch;
+ return version;
}
- /*
- * Check if a merge can be started from this Version. At present, without
- * concurrent merging, this simply checks if there is currently a scheduled
- * merge based on this Version. If there is, returns false. If there isn't,
- * return true and set a flag indicating that there is an active merge.
- */
- bool prepare_reconstruction() {
- auto old = m_active_merge.load();
- if (old) {
- return false;
- }
-
- // FIXME: this needs cleaned up
- while (!m_active_merge.compare_exchange_strong(old, true)) {
- old = m_active_merge.load();
- if (old) {
- return false;
- }
- }
-
- return true;
+ void set_next_buffer_head(size_t new_head) {
+ m_pending_buffer_head = new_head;
}
- bool advance_buffer_head(size_t head) {
- m_buffer_head = head;
+ bool advance_buffer_head() {
+ m_buffer_head = m_pending_buffer_head;
return m_buffer->advance_head(m_buffer_head);
}
private:
- Buffer *m_buffer;
- std::unique_ptr<Structure> m_structure;
-
- std::mutex m_buffer_lock;
- std::atomic<bool> m_active_merge;
+ BufferType *m_buffer;
+ std::unique_ptr<StructureType> m_structure;
/*
* The number of currently active jobs
@@ -124,7 +103,8 @@ private:
* epoch. An epoch can only be retired
* when this number is 0.
*/
- size_t m_epoch_number;
+ size_t m_id;
size_t m_buffer_head;
+ ssize_t m_pending_buffer_head;
};
} // namespace de