Diffstat (limited to 'include')
| -rw-r--r-- | include/framework/DynamicExtension.h        | 161 |
| -rw-r--r-- | include/framework/scheduling/Epoch.h         |   4 |
| -rw-r--r-- | include/framework/scheduling/Task.h          |   1 |
| -rw-r--r-- | include/framework/structure/BufferView.h     |   4 |
| -rw-r--r-- | include/framework/structure/InternalLevel.h  |  21 |
| -rw-r--r-- | include/shard/WIRS.h                         |  14 |
6 files changed, 106 insertions, 99 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6936247..d2a6b7a 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -39,7 +39,7 @@ class DynamicExtension {
     typedef S Shard;
     typedef MutableBuffer<R> Buffer;
     typedef ExtensionStructure<R, S, Q, L> Structure;
-    typedef Epoch<R, S, Q, L> Epoch;
+    typedef Epoch<R, S, Q, L> _Epoch;
     typedef BufferView<R, Q> BufView;
 
 public:
@@ -53,20 +53,24 @@ public:
     {
         auto buf = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
         auto vers = new Structure(m_buffer_capacity, m_scale_factor, m_max_delete_prop);
-        auto epoch = new Epoch(vers, buf);
+        auto epoch = new _Epoch(vers, buf);
 
-        m_buffers.push_back(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
-        m_versions.push_back(new Structure(buffer_cap, scale_factor, max_delete_prop));
-        m_epochs.push_back({0, epoch});
+        m_buffers.insert(new Buffer(buffer_cap, max_delete_prop*buffer_cap));
+        m_versions.insert(new Structure(buffer_cap, scale_factor, max_delete_prop));
+        m_epochs.insert({0, epoch});
     }
 
     ~DynamicExtension() {
-        for (size_t i=0; i<m_buffers.size(); i++) {
-            delete m_buffers[i];
+        for (auto e : m_buffers) {
+            delete e;
         }
 
-        for (size_t i=0; i<m_versions.size(); i++) {
-            delete m_versions[i];
+        for (auto e : m_versions) {
+            delete e;
+        }
+
+        for (auto e : m_epochs) {
+            delete e.second;
         }
     }
 
@@ -87,7 +91,7 @@ public:
          * probably has the lowest probability of having the record,
         * so we'll check it last.
         */
-        return buffers->delete_record(rec);
+        return buffers.delete_record(rec);
     }
 
     /*
@@ -97,7 +101,7 @@
     }
 
     std::future<std::vector<R>> query(void *parms) {
-        return schedule_query(get_active_epoch()->get_structure(), get_active_epoch()->get_buffers()[0], parms);
+        return schedule_query(parms);
     }
 
     size_t get_record_count() {
@@ -183,36 +187,37 @@ private:
     std::set<Structure *> m_versions;
 
     std::atomic<size_t> m_current_epoch;
-    std::unordered_map<size_t, Epoch *> m_epochs;
+    std::unordered_map<size_t, _Epoch *> m_epochs;
 
     size_t m_scale_factor;
     double m_max_delete_prop;
     size_t m_buffer_capacity;
     size_t m_buffer_delete_capacity;
 
-    Epoch *get_active_epoch() {
+    _Epoch *get_active_epoch() {
         return m_epochs[m_current_epoch.load()];
     }
 
     void advance_epoch() {
         size_t new_epoch_num = m_current_epoch.load() + 1;
-        Epoch *new_epoch = m_epochs[new_epoch_num];
-        Epoch *old_epoch = m_epochs[m_current_epoch.load()];
+        _Epoch *new_epoch = m_epochs[new_epoch_num];
+        _Epoch *old_epoch = m_epochs[m_current_epoch.load()];
 
         // Update the new Epoch to contain the buffers
         // from the old one that it doesn't currently have
         size_t old_buffer_cnt = new_epoch->clear_buffers();
         for (size_t i=old_buffer_cnt; i<old_epoch->get_buffers().size(); i++) {
-            new_epoch->add_buffer(old_epoch->get_buffers[i]);
+            new_epoch->add_buffer(old_epoch->get_buffers()[i]);
        }
 
         m_current_epoch.fetch_add(1);
+        retire_epoch(old_epoch);
     }
 
     /*
      * Creates a new epoch by copying the currently active one. The new epoch's
      * structure will be a shallow copy of the old one's.
      */
-    Epoch *create_new_epoch() {
+    _Epoch *create_new_epoch() {
         auto new_epoch = get_active_epoch()->clone();
         std::unique_lock<std::mutex> m_struct_lock;
         m_versions.insert(new_epoch->get_structure());
@@ -228,8 +233,8 @@ private:
      * buffer while a new epoch is being created in the background. Returns a
      * pointer to the newly created buffer.
      */
-    Buffer *add_empty_buffer(Epoch *epoch) {
-        auto new_buffer = Buffer(m_buffer_capacity, m_buffer_delete_capacity);
+    Buffer *add_empty_buffer(_Epoch *epoch) {
+        auto new_buffer = new Buffer(m_buffer_capacity, m_buffer_delete_capacity);
 
         std::unique_lock<std::mutex> m_struct_lock;
         m_buffers.insert(new_buffer);
@@ -239,7 +244,7 @@ private:
         return new_buffer;
     }
 
-    void retire_epoch(Epoch *epoch) {
+    void retire_epoch(_Epoch *epoch) {
         /*
          * Epochs with currently active jobs cannot
          * be retired. By the time retire_epoch is called,
@@ -294,26 +299,15 @@ private:
         vers->merge_buffer(buff);
 
-        args->result.set_value(true);
         args->epoch->end_job();
-        delete args;
-    }
-
-    static std::vector<R> finalize_query_result(std::vector<std::vector<Wrapped<R>>> &query_results, void *parms,
-                                                std::vector<void *> &shard_states, std::vector<void *> &buffer_states) {
-        auto result = Q::merge(query_results, parms);
-
-        for (size_t i=0; i<buffer_states.size(); i++) {
-            Q::delete_buffer_query_state(buffer_states[i]);
-        }
-
-        for (size_t i=0; i<states.size(); i++) {
-            Q::delete_query_state(shard_states[i]);
-        }
+        args->result.set_value(true);
 
-        return result;
+        ((DynamicExtension *) args->extension)->advance_epoch();
+
+        // FIXME: this might break things... not sure
+        delete args;
     }
-
+
     static void async_query(void *arguments) {
         QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
@@ -322,58 +316,56 @@ private:
         void *parms = args->query_parms;
 
         // Get the buffer query states
-        std::vector<void *> buffer_states = buffers->get_buffer_query_states(parms);
+        std::vector<void *> buffer_states = buffers.get_query_states(parms);
 
         // Get the shard query states
         std::vector<std::pair<ShardID, Shard*>> shards;
-        std::vector<void *> shard_states = vers->get_query_states(shards, parms);
+        std::vector<void *> states = vers->get_query_states(shards, parms);
 
-        Q::process_query_states(parms, shard_states, buffer_states);
+        Q::process_query_states(parms, states, buffer_states);
 
         std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + buffer_states.size());
+        for (size_t i=0; i<query_results.size(); i++) {
+            std::vector<Wrapped<R>> local_results = (i < buffer_states.size())
+                ? Q::buffer_query(buffers.get_buffers()[i], buffer_states[i], parms)
+                : Q::query(shards[i - buffer_states.size()].second,
+                           states[i - buffer_states.size()], parms);
+            ShardID shid = (i < buffer_states.size()) ? INVALID_SHID : shards[i - buffer_states.size()].first;
+            query_results[i] = std::move(filter_deletes(local_results, shid, buffers, vers));
 
-        // Execute the query for the buffer
-        std::vector<std::vector<Wrapped<R>>> buffer_results(buffer_states.size());
-        for (size_t i=0; i<buffer_states.size(); i++) {
-            auto buffer_results = Q::buffer_query(buffers->get_buffers[i], buffer_states[i], parms);
-            query_results[i] = std::move(filter_deletes(buffer_results, {-1, -1}, buffers, vers));
-
-            if constexpr (Q::EARLY_ABORT) {
-                if (query_results[i] > 0) {
-                    return finalize_query_result(query_results, parms, buffer_states, shard_states);
-                }
-            }
-        }
-
-        // Execute the query for each shard
-        for (size_t i=0; i<shards.size(); i++) {
-            auto shard_results = Q::query(shards[i].second, states[i], parms);
-            query_results[i+buffer_states.size()] = std::move(filter_deletes(shard_results, shards[i].first, buffers, vers));
             if constexpr (Q::EARLY_ABORT) {
-                if (query_results[i+buffer_states.size()].size() > 0) {
-                    return finalize_query_result(query_results, parms, buffer_states, shard_states);
-                }
+                if (query_results[i].size() > 0) break;
             }
         }
-
-        // Merge the results together and finalize the job
-        auto result = finalize_query_result(query_results, parms, buffer_states, shard_states);
+
+        auto result = Q::merge(query_results, parms);
         args->result_set.set_value(std::move(result));
 
         args->epoch->end_job();
+
+        for (size_t i=0; i<buffer_states.size(); i++) {
+            Q::delete_buffer_query_state(buffer_states[i]);
+        }
+
+        for (size_t i=0; i<states.size(); i++) {
+            Q::delete_query_state(states[i]);
+        }
+
+        // FIXME: this might break things... not sure
         delete args;
     }
 
-    std::future<bool> schedule_merge() {
-        auto epoch = get_active_epoch();
+    void schedule_merge() {
+        auto epoch = create_new_epoch();
         epoch->start_job();
 
         MergeArgs<R, S, Q, L> *args = new MergeArgs<R, S, Q, L>();
         args->epoch = epoch;
-        args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]);
+        // FIXME: all full buffers can be merged at this point--but that requires
+        //        retooling the shard interface a bit to do efficiently.
+        args->merges = epoch->get_structure()->get_merge_tasks(epoch->get_buffers()[0]->get_record_count());
+        args->extension = this;
         m_sched.schedule_job(merge, 0, args);
-
-        return args->result.get_future();
     }
 
     std::future<std::vector<R>> schedule_query(void *query_parms) {
@@ -389,20 +381,29 @@
     }
 
     int internal_append(const R &rec, bool ts) {
-        Buffer *buffer;
-        while (!(buffer = get_active_epoch()->get_active_buffer()))
-            ;
-
-        if (buffer->is_full()) {
-            auto vers = get_active_epoch()->get_structure();
-            auto res = schedule_merge(vers, buffer);
-            res.get();
-        }
+        Buffer *buffer = nullptr;
+        do {
+            auto epoch = get_active_epoch();
+
+            while (!(buffer = epoch->get_active_buffer()))
+                ;
+
+            /* if the buffer is full, schedule a merge and add a new empty buffer */
+            if (buffer->is_full()) {
+                // FIXME: possible race here--two identical merges could be scheduled
+                auto vers = epoch->get_structure();
+                schedule_merge();
+                buffer = add_empty_buffer(epoch);
+            }
+
+        } while(!buffer->append(rec, ts));
 
-        return buffer->append(rec, ts);
+        /* internal append should always succeed, eventually */
+        return 1;
     }
 
-    static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, BufView *buffers, Structure *vers) {
+    static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid,
+                                                  BufView &buffers, Structure *vers) {
         if constexpr (!Q::SKIP_DELETE_FILTER) {
             return records;
         }
@@ -431,7 +432,7 @@ private:
                 continue;
             }
 
-            if (buffers->check_tombstone(rec.rec)) {
+            if (buffers.check_tombstone(rec.rec)) {
                 continue;
            }
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index a1f865c..fe63c86 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -41,7 +41,7 @@ public:
         assert(m_active_jobs.load() == 0);
 
         for (auto buf : m_buffers) {
-            buf.release_reference();
+            buf->release_reference();
         }
 
         if (m_structure) {
@@ -111,6 +111,8 @@ public:
         if (m_structure) {
             epoch->m_structure = m_structure->copy();
         }
+
+        return epoch;
     }
 
 private:
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 94c4d0a..d25c7c0 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -17,6 +17,7 @@ struct MergeArgs {
     Epoch<R, S, Q, L> *epoch;
     std::vector<MergeTask> merges;
     std::promise<bool> result;
+    void *extension;
 };
 
 template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 1efc1ac..14abedc 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -35,7 +35,7 @@ public:
     BufferView(std::vector<Buffer*> buffers)
         : m_buffers(buffers)
-        , m_cutoff(buffers[buffers->size()-1]->get_record_count())
+        , m_cutoff(buffers[buffers.size()-1]->get_record_count())
     {}
 
     ~BufferView() = default;
@@ -102,7 +102,7 @@ public:
         return m_buffers[0]->get_tombstone_capacity();
     }
 
-    std::vector<void *> get_buffer_states(void *parms) {
+    std::vector<void *> get_query_states(void *parms) {
         std::vector<void *> states;
 
         for (auto buf : m_buffers) {
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index b9230f4..342a2c7 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -231,6 +231,17 @@ public:
         return (double) tscnt / (double) (tscnt + reccnt);
     }
 
+    std::shared_ptr<InternalLevel> clone() {
+        auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
+        for (size_t i=0; i<m_shard_cnt; i++) {
+            new_level->m_shards[i] = m_shards[i];
+            new_level->m_owns[i] = true;
+            m_owns[i] = false;
+        }
+
+        return new_level;
+    }
+
 private:
 
     ssize_t m_level_no;
@@ -243,16 +254,6 @@ private:
 
     std::vector<bool> m_owns;
 
-    std::shared_ptr<InternalLevel> clone() {
-        auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
-        for (size_t i=0; i<m_shard_cnt; i++) {
-            new_level->m_shards[i] = m_shards[i];
-            new_level->m_owns[i] = true;
-            m_owns[i] = false;
-        }
-
-        return new_level;
-    }
 };
 
 }
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 8583cb0..83573c8 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -448,17 +448,21 @@ public:
         return state;
     }
 
-    static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void *buff_state) {
+    static void process_query_states(void *query_parms, std::vector<void*> &shard_states, std::vector<void*> &buff_states) {
+        // FIXME: need to redo for the buffer vector interface
         auto p = (wirs_query_parms<R> *) query_parms;
-        auto bs = (WIRSBufferState<R> *) buff_state;
 
         std::vector<size_t> shard_sample_sizes(shard_states.size()+1, 0);
         size_t buffer_sz = 0;
 
+        decltype(R::weight) total_weight = 0;
         std::vector<decltype(R::weight)> weights;
-        weights.push_back(bs->total_weight);
+        for (auto &s : buff_states) {
+            auto state = (WIRSBufferState<R> *) s;
+            total_weight += state->total_weight;
+            weights.push_back(state->total_weight);
+        }
 
-        decltype(R::weight) total_weight = 0;
         for (auto &s : shard_states) {
             auto state = (WIRSState<R> *) s;
             total_weight += state->total_weight;
@@ -480,8 +484,6 @@ public:
             }
         }
 
-
-        bs->sample_size = buffer_sz;
         for (size_t i=0; i<shard_states.size(); i++) {
             auto state = (WIRSState<R> *) shard_states[i];
             state->sample_size = shard_sample_sizes[i+1];
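The core behavioural change in this commit is that a merge now runs against a freshly cloned epoch (create_new_epoch()), and on completion the merge task calls back into the extension to advance_epoch(), which retires the old epoch once its active jobs have drained (see retire_epoch() and Epoch's start_job()/end_job()). Below is a minimal, self-contained sketch of that job-counting retirement pattern; the class and method names (EpochDemo, retire) are illustrative only and are not the framework's actual API.

```cpp
// Illustrative sketch only -- not the framework's Epoch class. It shows the
// pattern behind retire_epoch(): an epoch may only be torn down after every
// job that pinned it has called end_job().
#include <atomic>
#include <cassert>
#include <thread>

class EpochDemo {
public:
    void start_job() { m_active_jobs.fetch_add(1); }
    void end_job()   { m_active_jobs.fetch_sub(1); }

    // Wait until all outstanding jobs have finished; only then is it safe to
    // free the buffers and structure referenced by this epoch.
    void retire() {
        while (m_active_jobs.load() > 0) {
            std::this_thread::yield();
        }
        assert(m_active_jobs.load() == 0);
    }

private:
    std::atomic<size_t> m_active_jobs{0};
};

int main() {
    EpochDemo epoch;

    epoch.start_job();                 // pin the epoch before handing it to a job
    std::thread job([&] {
        // ... query or merge work against the epoch's structure ...
        epoch.end_job();               // unpin when the job finishes
    });

    epoch.retire();                    // blocks until the job above calls end_job()
    job.join();
    return 0;
}
```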
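For callers, the visible interface change is that query() now returns a std::future and internal_append() retries until some buffer accepts the record, scheduling merges in the background. A hypothetical caller-side sketch follows; only query() returning std::future<std::vector<R>> is visible in this diff, while the record layout, template arguments, public insert() wrapper, the WIRSQuery name, and the wirs_query_parms fields are assumptions made purely for illustration.

```cpp
// Hypothetical usage sketch -- names other than DynamicExtension, WIRS and
// wirs_query_parms are stand-ins; constructor argument order is inferred from
// this diff and not verified.
#include <cstdint>
#include <future>
#include <vector>

#include "framework/DynamicExtension.h"
#include "shard/WIRS.h"

struct Rec {
    uint64_t key;
    uint64_t value;
    double weight;    // WIRS samples records with probability proportional to weight
};

int main() {
    /* (buffer capacity, scale factor, max delete proportion) */
    DynamicExtension<Rec, WIRS<Rec>, WIRSQuery<Rec>> ext(1000, 8, 0.05);

    for (uint64_t i = 0; i < 100000; i++) {
        /* internally retries until a buffer accepts the record, scheduling a
         * background merge and adding an empty buffer whenever one fills up */
        ext.insert(Rec{i, 2 * i, 1.0});
    }

    wirs_query_parms<Rec> parms;        // sample size, rng, etc. (fields assumed)
    auto fut = ext.query(&parms);       // dispatches async_query on the scheduler
    std::vector<Rec> sample = fut.get();

    return 0;
}
```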