From d2279e1b96d352a0af1d425dcaaf93e8a26a8d52 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 30 Oct 2023 17:15:05 -0400 Subject: General Comment + Consistency updates --- include/framework/DynamicExtension.h | 46 +++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 19 deletions(-) (limited to 'include/framework/DynamicExtension.h') diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index f2bbacc..9129060 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -1,7 +1,7 @@ /* * include/framework/DynamicExtension.h * - * Copyright (C) 2023 Douglas Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * Dong Xie * * All rights reserved. Published under the Modified BSD License. @@ -107,8 +107,7 @@ public: } size_t get_record_count() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_record_count() + epoch->get_structure()->get_record_count(); epoch->end_job(); @@ -116,8 +115,7 @@ public: } size_t get_tombstone_count() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_tombstone_count() + epoch->get_structure()->get_tombstone_count(); epoch->end_job(); @@ -129,8 +127,7 @@ public: } size_t get_memory_usage() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t= epoch->get_buffer_view().get_memory_usage() + epoch->get_structure()->get_memory_usage(); epoch->end_job(); @@ -138,8 +135,7 @@ public: } size_t get_aux_memory_usage() { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); auto t = epoch->get_buffer_view().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage(); epoch->end_job(); @@ -151,9 +147,8 @@ public: } Shard *create_static_structure() { - auto epoch = get_active_epoch(); + auto epoch = get_active_epoch_protected(); auto bv = epoch->get_buffer_view(); - epoch->start_job(); auto vers = epoch->get_structure(); std::vector shards; @@ -219,6 +214,11 @@ private: return m_epochs[m_current_epoch.load()]; } + _Epoch *get_active_epoch_protected() { + m_epochs[m_current_epoch.load()]->start_job(); + return m_epochs[m_current_epoch.load()]; + } + void advance_epoch() { size_t new_epoch_num = m_current_epoch.load() + 1; _Epoch *new_epoch = m_epochs[new_epoch_num]; @@ -241,6 +241,12 @@ private: * structure will be a shallow copy of the old one's. */ _Epoch *create_new_epoch() { + /* + * This epoch access is _not_ protected under the assumption that + * only one merge will be able to trigger at a time. If that condition + * is violated, it is possible that this code will clone a retired + * epoch. + */ auto new_epoch = get_active_epoch()->clone(); std::unique_lock m_struct_lock; m_versions.insert(new_epoch->get_structure()); @@ -311,6 +317,8 @@ private: MergeArgs *args = (MergeArgs *) arguments; Structure *vers = args->epoch->get_structure(); + // FIXME: with an improved shard interface, multiple full buffers + // could be merged at once here. Buffer *buff = (Buffer *) args->epoch->get_buffers()[0]; for (ssize_t i=args->merges.size() - 1; i>=0; i--) { @@ -387,24 +395,23 @@ private: } std::future> schedule_query(void *query_parms) { - auto epoch = get_active_epoch(); - epoch->start_job(); + auto epoch = get_active_epoch_protected(); QueryArgs *args = new QueryArgs(); args->epoch = epoch; args->query_parms = query_parms; + auto result = args->result_set.get_future(); + m_sched.schedule_job(async_query, 0, args); - return args->result_set.get_future(); + return result; } int internal_append(const R &rec, bool ts) { Buffer *buffer = nullptr; do { - auto epoch = get_active_epoch(); - - while (!(buffer = epoch->get_active_buffer())) - ; + auto epoch = get_active_epoch_protected(); + buffer = epoch->get_active_buffer(); /* if the buffer is full, schedule a merge and add a new empty buffer */ if (buffer->is_full()) { @@ -413,7 +420,8 @@ private: schedule_merge(); buffer = add_empty_buffer(epoch); } - + // FIXME: not exactly the best spot for this + epoch->end_job(); } while(!buffer->append(rec, ts)); /* internal append should always succeed, eventually */ -- cgit v1.2.3