Diffstat (limited to 'include/framework/structure')

-rw-r--r--  include/framework/structure/ExtensionStructure.h |  6 +++---
-rw-r--r--  include/framework/structure/MutableBuffer.h      | 59 ++----------
2 files changed, 13 insertions(+), 52 deletions(-)

diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 2ced439..f5657af 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -93,11 +93,11 @@ public:
     inline bool merge_buffer(Buffer *buffer) {
         assert(can_merge_with(0, buffer->get_record_count()));
 
+        // FIXME: this step makes an extra copy of the buffer,
+        // which could be avoided by adjusting the shard
+        // reconstruction process a bit, possibly.
         buffer->start_merge();
         merge_buffer_into_l0(buffer);
-        buffer->finish_merge();
-
-        buffer->truncate();
 
         return true;
     }
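
The net effect on ExtensionStructure is that merge_buffer() no longer finalizes or truncates the buffer itself. As a reading aid, here is the post-image of the function reconstructed from the hunk above; a sketch of the resulting code, not a verbatim copy of the file:

    inline bool merge_buffer(Buffer *buffer) {
        assert(can_merge_with(0, buffer->get_record_count()));

        // FIXME: this step makes an extra copy of the buffer,
        // which could be avoided by adjusting the shard
        // reconstruction process a bit, possibly.
        buffer->start_merge();        // now just memcpy()s records into m_merge_data
        merge_buffer_into_l0(buffer);

        // finish_merge() and truncate() are no longer called here; the
        // buffer is presumably reset by the caller once readers drain.
        return true;
    }
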
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index e0a6962..a70b86b 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -42,13 +42,10 @@ public:
         }
 
         m_refcnt.store(0);
-        m_deferred_truncate.store(false);
-        m_merging.store(false);
     }
 
     ~MutableBuffer() {
         assert(m_refcnt.load() == 0);
-        assert(m_merging.load() == false);
 
         if (m_data) free(m_data);
         if (m_tombstone_filter) delete m_tombstone_filter;
@@ -90,23 +87,12 @@ public:
     }
 
     bool truncate() {
-
-        while (active_merge() || m_refcnt.load() > 0)
-            ;
-
-        m_merge_lock.lock();
-
-        while (m_refcnt > 0)
-            ;
-
         m_tombstonecnt.store(0);
         m_reccnt.store(0);
         m_weight.store(0);
         m_max_weight.store(0);
 
         if (m_tombstone_filter) m_tombstone_filter->clear();
-        m_merge_lock.unlock();
-
         return true;
     }
 
@@ -176,26 +162,15 @@ public:
         return m_max_weight;
     }
 
+    /*
+     * This operation assumes that no other threads have write access
+     * to the buffer. This will be the case in normal operation, at
+     * present, but may change (in which case this approach will need
+     * to be adjusted). Other threads having read access is perfectly
+     * acceptable, however.
+     */
     bool start_merge() {
-        if (m_merge_lock.try_lock()) {
-            /* there cannot already been an active merge */
-            if (m_merging.load()) {
-                m_merge_lock.unlock();
-                return false;
-            }
-
-            m_merging.store(true);
-            memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
-            return true;
-        }
-
-        /* lock could not be obtained */
-        return false;
-    }
-
-    bool finish_merge() {
-        m_merge_lock.unlock();
-        m_merging.store(false);
+        memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
         return true;
     }
 
@@ -208,12 +183,8 @@ public:
     }
 
     bool release_reference() {
+        assert(m_refcnt > 0);
         m_refcnt.fetch_add(-1);
-
-        if (m_refcnt.load() == 0 && m_deferred_truncate.load()) {
-            assert(this->truncate());
-        }
-
         return true;
     }
 
@@ -221,10 +192,6 @@ public:
         return m_refcnt.load();
     }
 
-    bool active_merge() {
-        return m_merging.load();
-    }
-
 private:
     int32_t try_advance_tail() {
         size_t new_tail = m_reccnt.fetch_add(1);
@@ -245,14 +212,8 @@ private:
 
     alignas(64) std::atomic<uint32_t> m_reccnt;
     alignas(64) std::atomic<double> m_weight;
    alignas(64) std::atomic<double> m_max_weight;
-    alignas(64) std::atomic<bool> m_merging;
-    alignas(64) std::atomic<bool> m_deferred_truncate;
-    alignas(64) std::atomic<size_t> m_refcnt;
-
-    alignas(64) std::mutex m_merge_lock;
-    alignas(64) std::mutex m_trunc_lock;
-    alignas(64) std::condition_variable m_trunc_signal;
+    alignas(64) std::atomic<size_t> m_refcnt;
 };
 
 }
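
Taken together, these hunks leave the reference count as MutableBuffer's only synchronization state: the merge and deferred-truncate flags, both mutexes, and the condition variable are gone, and truncate() reduces to a series of atomic resets. Below is a minimal sketch of the resulting protocol, assuming a single merging thread as the new start_merge() comment requires; get_reference_count() is a guessed name for the accessor whose body (return m_refcnt.load();) appears above, and the spin-wait is illustrative, not taken from the framework:

    // Sketch only: one writer, any number of readers holding references.
    template <typename R>
    void merge_and_reset(MutableBuffer<R> *buf) {
        buf->start_merge();                     // copies m_data into m_merge_data

        /* ... reconstruct the L0 shard from the merge copy ... */

        while (buf->get_reference_count() > 0)  // accessor name is a guess
            ;                                   // wait for readers to drain
        buf->truncate();                        // now just lock-free counter resets
    }

Moving the wait-for-readers loop out of truncate() takes the busy-wait and locking out of the buffer itself, at the cost of making the caller responsible for ordering the reset after the merge and after the last reader releases its reference.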