 include/framework/DynamicExtension.h        | 161
 include/framework/scheduling/Epoch.h        |   4
 include/framework/scheduling/Task.h         |   1
 include/framework/structure/BufferView.h    |   4
 include/framework/structure/InternalLevel.h |  21
 include/shard/WIRS.h                        |  14
 6 files changed, 106 insertions(+), 99 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6936247..d2a6b7a 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -39,7 +39,7 @@ class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
typedef ExtensionStructure<R, S, Q, L> Structure;
- typedef Epoch<R, S, Q, L> Epoch;
+ typedef Epoch<R, S, Q, L> _Epoch;
typedef BufferView<R, Q> BufView;
public:
@@ -53,20 +53,24 @@ public:
{
auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
- auto epoch = new Epoch(vers, buf);
+ auto epoch = new _Epoch(vers, buf);
- m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
- m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop));
- m_epochs.push_back({0, epoch});
+ m_buffers.insert(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
+ m_versions.insert(new Structure(buffer_cap, scale_factor, max_delete_prop));
+ m_epochs.insert({0, epoch});
}
~DynamicExtension() {
- for (size_t i=0; i<m_buffers.size(); i++) {
- delete m_buffers[i];
+ for (auto e : m_buffers) {
+ delete e;
}
- for (size_t i=0; i<m_versions.size(); i++) {
- delete m_versions[i];
+ for (auto e : m_versions) {
+ delete e;
+ }
+
+ for (auto e : m_epochs) {
+ delete e.second;
}
}
@@ -87,7 +91,7 @@ public:
* probably has the lowest probability of having the record,
* so we'll check it last.
*/
- return buffers->delete_record(rec);
+ return buffers.delete_record(rec);
}
/*
@@ -97,7 +101,7 @@ public:
}
std::future<std::vector<R>> query(void *parms) {
- return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms);
+ return schedule_query(parms);
}
size_t get_record_count() {
@@ -183,36 +187,37 @@ private:
std::set<Structure *> m_versions;
std::atomic<size_t> m_current_epoch;
- std::unordered_map<size_t, Epoch *> m_epochs;
+ std::unordered_map<size_t, _Epoch *> m_epochs;
size_t m_scale_factor;
double m_max_delete_prop;
size_t m_buffer_capacity;
size_t m_buffer_delete_capacity;
- Epoch *get_active_epoch() {
+ _Epoch *get_active_epoch() {
return m_epochs[m_current_epoch.load()];
}
void advance_epoch() {
size_t new_epoch_num = m_current_epoch.load() + 1;
- Epoch *new_epoch = m_epochs[new_epoch_num];
- Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+ _Epoch *new_epoch = m_epochs[new_epoch_num];
+ _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
// Update the new Epoch to contain the buffers
// from the old one that it doesn't currently have
size_t old_buffer_cnt = new_epoch->clear_buffers();
for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
- new_epoch->add_buffer(old_epoch->get_buffers[i]);
+ new_epoch->add_buffer(old_epoch->get_buffers()[i]);
}
m_current_epoch.fetch_add(1);
+ retire_epoch(old_epoch);
}
/*
* Creates a new epoch by copying the currently active one. The new epoch's
* structure will be a shallow copy of the old one's.
*/
- Epoch *create_new_epoch() {
+ _Epoch *create_new_epoch() {
auto new_epoch = get_active_epoch()->clone();
std::unique_lock<std::mutex> m_struct_lock;
m_versions.insert(new_epoch->get_structure());
@@ -228,8 +233,8 @@ private:
* buffer while a new epoch is being created in the background. Returns a
* pointer to the newly created buffer.
*/
- Buffer *add_empty_buffer(Epoch *epoch) {
- auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+ Buffer *add_empty_buffer(_Epoch *epoch) {
+ auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
std::unique_lock<std::mutex> m_struct_lock;
m_buffers.insert(new_buffer);
@@ -239,7 +244,7 @@ private:
return new_buffer;
}
- void retire_epoch(Epoch *epoch) {
+ void retire_epoch(_Epoch *epoch) {
/*
* Epochs with currently active jobs cannot
* be retired. By the time retire_epoch is called,
@@ -294,26 +299,15 @@ private:
vers->merge_buffer(buff);
- args->result.set_value(true);
args->epoch->end_job();
- delete args;
- }
-
- static std::vector<R> finalize_query_result(std::vector<std::vector<Wrapped<R>>> &query_results, void *parms,
- std::vector<void *> &shard_states, std::vector<void *> &buffer_states) {
- auto result = Q::merge(query_results, parms);
-
- for (size_t i=0; i<buffer_states.size(); i++) {
- Q::delete_buffer_query_state(buffer_states[i]);
- }
-
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(shard_states[i]);
- }
+ args->result.set_value(true);
- return result;
+ ((DynamicExtension *) args->extension)->advance_epoch();
+
+ // FIXME: this might break things... not sure
+ delete args;
}
-
+
static void async_query(void *arguments) {
QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
@@ -322,58 +316,56 @@ private:
void *parms = args->query_parms;
// Get the buffer query states
- std::vector<void *> buffer_states = buffers->get_buffer_query_states(parms);
+ std::vector<void *> buffer_states = buffers.get_query_states(parms);
// Get the shard query states
std::vector<std::pair<ShardID, Shard*>> shards;
- std::vector<void *> shard_states = vers->get_query_states(shards, parms);
+ std::vector<void *> states = vers->get_query_states(shards, parms);
- Q::process_query_states(parms, shard_states, buffer_states);
+ Q::process_query_states(parms, states, buffer_states);
std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
+ for (size_t i=0; i<query_results.size(); i++) {
+ std::vector<Wrapped<R>> local_results = (i < buffer_states.size())
+ ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms)
+ : Q::query(shards[i - buffer_states.size()].second,
+ states[i - buffer_states.size()], parms);
+ ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first;
+ query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));
- // Execute the query for the buffer
- std::vector<std::vector<Wrapped<R>>> buffer_results(buffer_states.size());
- for (size_t i=0; i<buffer_states.size(); i++) {
- auto buffer_results = Q::buffer_query(buffers->get_buffers[i], buffer_states[i], parms);
- query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers));
-
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i] > 0) {
- return finalize_query_result(query_results, parms, buffer_states, shard_states);
- }
- }
- }
-
- // Execute the query for each shard
- for (size_t i=0; i<shards.size(); i++) {
- auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+buffer_states.size()] = std::move(filter_deletes(shard_results, shards[i].first, buffers, vers));
if constexpr (Q::EARLY_ABORT) {
- if (query_results[i+buffer_states.size()].size() > 0) {
- return finalize_query_result(query_results, parms, buffer_states, shard_states);
- }
+ if (query_results[i].size() > 0) break;
}
}
-
- // Merge the results together and finalize the job
- auto result = finalize_query_result(query_results, parms, buffer_states, shard_states);
+
+ auto result = Q::merge(query_results, parms);
args->result_set.set_value(std::move(result));
args->epoch->end_job();
+
+ for (size_t i=0; i<buffer_states.size(); i++) {
+ Q::delete_buffer_query_state(buffer_states[i]);
+ }
+
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
+ }
+
+ // FIXME: this might break things... not sure
delete args;
}
- std::future<bool> schedule_merge() {
- auto epoch = get_active_epoch();
+ void schedule_merge() {
+ auto epoch = create_new_epoch();
epoch->start_job();
MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
args->epoch = epoch;
- args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]);
+ // FIXME: all full buffers can be merged at this point--but that requires
+ // retooling the shard interface a bit to do efficiently.
+ args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
+ args->extension = this;
m_sched.schedule_job(merge, 0, args);
-
- return args->result.get_future();
}
std::future<std::vector<R>> schedule_query(void *query_parms) {
@@ -389,20 +381,29 @@ private:
}
int internal_append(const R &rec, bool ts) {
- Buffer *buffer;
- while (!(buffer = get_active_epoch()->get_active_buffer()))
- ;
-
- if (buffer->is_full()) {
- auto vers = get_active_epoch()->get_structure();
- auto res = schedule_merge(vers, buffer);
- res.get();
- }
+ Buffer *buffer = nullptr;
+ do {
+ auto epoch = get_active_epoch();
+
+ while (!(buffer = epoch->get_active_buffer()))
+ ;
+
+ /* if the buffer is full, schedule a merge and add a new empty buffer */
+ if (buffer->is_full()) {
+ // FIXME: possible race here--two identical merges could be scheduled
+ auto vers = epoch->get_structure();
+ schedule_merge();
+ buffer = add_empty_buffer(epoch);
+ }
+
+ } while(!buffer->append(rec, ts));
- return buffer->append(rec, ts);
+ /* internal append should always succeed, eventually */
+ return 1;
}
- static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, BufView *buffers, Structure *vers) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid,
+ BufView &buffers, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -431,7 +432,7 @@ private:
continue;
}
- if (buffers->check_tombstone(rec.rec)) {
+ if (buffers.check_tombstone(rec.rec)) {
continue;
}
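
Note on the epoch machinery DynamicExtension.h now leans on: queries and
merges pin an epoch with start_job()/end_job(), advance_epoch() installs the
successor before the predecessor is retired, and retire_epoch() must wait for
pinned jobs to drain. A minimal, self-contained sketch of that lifecycle,
using invented MiniEpoch/MiniExtension names rather than the framework's
actual classes:

    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <unordered_map>

    struct MiniEpoch {
        std::atomic<size_t> active_jobs{0};
        void start_job() { active_jobs.fetch_add(1); }
        void end_job()   { active_jobs.fetch_sub(1); }
    };

    struct MiniExtension {
        std::atomic<size_t> current{0};
        std::unordered_map<size_t, MiniEpoch*> epochs;

        MiniEpoch *get_active_epoch() { return epochs[current.load()]; }

        /* mirrors the diff: the successor epoch exists before the counter
         * is bumped, and only then is the predecessor retired */
        void advance_epoch() {
            size_t next = current.load() + 1;
            epochs[next] = new MiniEpoch();
            MiniEpoch *old = epochs[current.load()];
            current.fetch_add(1);
            retire_epoch(old);
        }

        /* an epoch with active jobs cannot be freed yet */
        void retire_epoch(MiniEpoch *e) {
            while (e->active_jobs.load() > 0) { /* wait for readers */ }
            delete e;
        }
    };

    int main() {
        MiniExtension ext;
        ext.epochs[0] = new MiniEpoch();
        auto *e = ext.get_active_epoch();
        e->start_job();          /* a query pins the epoch */
        e->end_job();            /* ...and releases it */
        ext.advance_epoch();     /* old epoch retired once jobs drain */
        assert(ext.current.load() == 1);
        delete ext.epochs[1];
    }

Spinning in retire_epoch() is for illustration only; a real implementation
would presumably hand retirement to the scheduler rather than stall the
writer on readers.
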
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index a1f865c..fe63c86 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -41,7 +41,7 @@ public:
assert(m_active_jobs.load() == 0);
for (auto buf : m_buffers) {
- buf.release_reference();
+ buf->release_reference();
}
if (m_structure) {
@@ -111,6 +111,8 @@ public:
if (m_structure) {
epoch->m_structure = m_structure->copy();
}
+
+ return epoch;
}
private:
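
The added "return epoch;" is more than tidiness: in C++, flowing off the end
of a value-returning function is undefined behavior, so clone() previously
handed its caller garbage. A minimal sketch (invented RefBuffer/MiniEpoch
types, not the framework's) of the clone-plus-reference-count pattern this
file suggests, including the buf->release_reference() pointer fix from the
first hunk:

    #include <cassert>
    #include <vector>

    struct RefBuffer {
        int refs = 1;
        void take_reference()    { ++refs; }
        bool release_reference() { return --refs == 0; }
    };

    struct MiniEpoch {
        std::vector<RefBuffer*> buffers;

        /* the clone shares the parent's buffers, so each gains a reference;
         * without the final return, the caller would read garbage */
        MiniEpoch *clone() {
            auto *epoch = new MiniEpoch();
            for (auto *buf : buffers) {
                buf->take_reference();
                epoch->buffers.push_back(buf);
            }
            return epoch;   /* the line this commit adds */
        }

        ~MiniEpoch() {
            for (auto *buf : buffers) {
                if (buf->release_reference()) delete buf;
            }
        }
    };

    int main() {
        auto *parent = new MiniEpoch();
        parent->buffers.push_back(new RefBuffer());
        auto *child = parent->clone();
        assert(parent->buffers[0]->refs == 2);
        delete parent;   /* buffer survives: child still holds a reference */
        assert(child->buffers[0]->refs == 1);
        delete child;    /* last reference released, buffer freed */
    }
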
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 94c4d0a..d25c7c0 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -17,6 +17,7 @@ struct MergeArgs {
Epoch<R, S, Q, L> *epoch;
std::vector<MergeTask> merges;
std::promise<bool> result;
+ void *extension;
};
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
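
MergeArgs carries extension as a void * because the scheduler traffics in a
plain function pointer plus an opaque argument block; the merge callback
recovers its owner by casting, as the
((DynamicExtension *) args->extension)->advance_epoch() call in
DynamicExtension.h does. A minimal sketch of that pattern, with invented
Owner/Args/schedule_job names:

    #include <cstdio>

    struct Owner {
        void advance_epoch() { std::puts("merge finished; epoch advanced"); }
    };

    struct Args {
        void *extension;   /* type-erased back-pointer, as in MergeArgs */
    };

    static void merge_job(void *arguments) {
        auto *args = static_cast<Args *>(arguments);
        static_cast<Owner *>(args->extension)->advance_epoch();
        delete args;       /* the job owns its argument block */
    }

    /* a trivial stand-in scheduler that runs the job inline */
    static void schedule_job(void (*job)(void *), void *args) { job(args); }

    int main() {
        Owner owner;
        schedule_job(merge_job, new Args{&owner});
    }
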
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 1efc1ac..14abedc 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -35,7 +35,7 @@ public:
BufferView(std::vector<Buffer*> buffers)
: m_buffers(buffers)
- , m_cutoff(buffers[buffers->size()-1]->get_record_count())
+ , m_cutoff(buffers[buffers.size()-1]->get_record_count())
{}
~BufferView() = default;
@@ -102,7 +102,7 @@ public:
return m_buffers[0]->get_tombstone_capacity();
}
- std::vector<void *> get_buffer_states(void *parms) {
+ std::vector<void *> get_query_states(void *parms) {
std::vector<void *> states;
for (auto buf : m_buffers) {
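
BufferView holds its buffers by value in a std::vector (hence the
buffers.size() fix above), and the renamed get_query_states() gathers one
opaque state per buffer. A rough sketch with invented types standing in for
whatever per-buffer state the query class actually builds:

    #include <cstddef>
    #include <vector>

    struct Buffer      { std::size_t record_count = 0; };
    struct BufferState { std::size_t cutoff; };

    struct MiniBufferView {
        std::vector<Buffer*> m_buffers;   /* a vector member, so .size() */

        std::vector<void *> get_query_states(void * /*parms*/) {
            std::vector<void *> states;
            for (auto *buf : m_buffers) {
                /* one opaque state per buffer; the real query class builds
                 * these from parms and frees them after the query */
                states.push_back(new BufferState{buf->record_count});
            }
            return states;
        }
    };

    int main() {
        MiniBufferView view;
        Buffer b1, b2;
        view.m_buffers = {&b1, &b2};
        auto states = view.get_query_states(nullptr);
        for (auto *s : states) delete static_cast<BufferState *>(s);
    }
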
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index b9230f4..342a2c7 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -231,6 +231,17 @@ public:
return (double) tscnt / (double) (tscnt + reccnt);
}
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ new_level->m_shards[i] = m_shards[i];
+ new_level->m_owns[i] = true;
+ m_owns[i] = false;
+ }
+
+ return new_level;
+ }
+
private:
ssize_t m_level_no;
@@ -243,16 +254,6 @@ private:
std::vector<bool> m_owns;
- std::shared_ptr<InternalLevel> clone() {
- auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
- for (size_t i=0; i<m_shard_cnt; i++) {
- new_level->m_shards[i] = m_shards[i];
- new_level->m_owns[i] = true;
- m_owns[i] = false;
- }
-
- return new_level;
- }
};
}
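
Moving clone() from private to public is what lets the epoch layer take
shallow copies of a level; the subtlety is the m_owns hand-off, which shares
the shard pointers but transfers delete responsibility to the clone. A small
sketch of that ownership transfer, with invented MiniLevel/Shard types:

    #include <cassert>
    #include <cstddef>
    #include <memory>
    #include <vector>

    struct Shard { int data = 42; };

    struct MiniLevel {
        std::vector<Shard*> shards;
        std::vector<bool>   owns;

        std::shared_ptr<MiniLevel> clone() {
            auto lvl = std::make_shared<MiniLevel>();
            for (std::size_t i = 0; i < shards.size(); i++) {
                lvl->shards.push_back(shards[i]); /* shallow pointer copy */
                lvl->owns.push_back(true);        /* clone takes ownership */
                owns[i] = false;                  /* source gives it up */
            }
            return lvl;
        }

        ~MiniLevel() {
            for (std::size_t i = 0; i < shards.size(); i++)
                if (owns[i]) delete shards[i];
        }
    };

    int main() {
        auto *level = new MiniLevel();
        level->shards.push_back(new Shard());
        level->owns.push_back(true);

        auto copy = level->clone();
        delete level;   /* shard not freed: ownership moved to the clone */
        assert(copy->shards[0]->data == 42);
    }   /* copy destroyed here; shard freed exactly once */

After clone() the source level merely borrows its shards, so destroying it
leaves the clone's pointers valid and each shard is freed exactly once.
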
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 8583cb0..83573c8 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -448,17 +448,21 @@ public:
return state;
}
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void *buff_state) {
+ static void process_query_states(void *query_parms, std::vector<void*> &shard_states, std::vector<void*> &buff_states) {
+ // FIXME: need to redo for the buffer vector interface
auto p = (wirs_query_parms<R> *) query_parms;
- auto bs = (WIRSBufferState<R> *) buff_state;
std::vector<size_t> shard_sample_sizes(shard_states.size()+1, 0);
size_t buffer_sz = 0;
+ decltype(R::weight) total_weight = 0;
std::vector<decltype(R::weight)> weights;
- weights.push_back(bs->total_weight);
+ for (auto &s : buff_states) {
+ auto state = (WIRSBufferState<R> *) s;
+ total_weight += state->total_weight;
+ weights.push_back(state->total_weight);
+ }
- decltype(R::weight) total_weight = 0;
for (auto &s : shard_states) {
auto state = (WIRSState<R> *) s;
total_weight += state->total_weight;
@@ -480,8 +484,6 @@ public:
}
}
-
- bs->sample_size = buffer_sz;
for (size_t i=0; i<shard_states.size(); i++) {
auto state = (WIRSState<R> *) shard_states[i];
state->sample_size = shard_sample_sizes[i+1];
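
The reworked process_query_states folds the buffer states into the same
weighted allocation as the shard states: every source contributes its
total_weight, and each receives a share of the requested sample proportional
to that weight. A toy sketch of the allocation rule (the weights and the
rounding here are assumptions, not WIRS.h's exact code):

    #include <cstddef>
    #include <cstdio>
    #include <vector>

    int main() {
        std::size_t sample_size = 1000;
        /* total_weight of each source: buffer states first, then shards */
        std::vector<double> weights = {10.0, 30.0, 60.0};

        double total_weight = 0;
        for (double w : weights) total_weight += w;

        /* each source samples in proportion to the weight it carries */
        for (std::size_t i = 0; i < weights.size(); i++) {
            auto share =
                (std::size_t)(sample_size * (weights[i] / total_weight));
            std::printf("source %zu: %zu samples\n", i, share);
        }
    }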