/*
 * include/framework/DynamicExtension.h
 *
 * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
 *                         Dong Xie <dongx@psu.edu>
 *
 * Distributed under the Modified BSD License.
 *
 */
#pragma once

#include <atomic>
#include <cassert>
#include <concepts>
#include <condition_variable>
#include <cstdio>
#include <future>
#include <mutex>
#include <utility>
#include <vector>

#include <immintrin.h> /* _mm_pause() */
#include <unistd.h>    /* usleep() */

#ifdef _GNU_SOURCE
#include <sched.h> /* cpu_set_t, CPU_SET, sched_setaffinity() */
#endif

#include "framework/interface/Scheduler.h"
#include "framework/scheduling/SerialScheduler.h"

#include "framework/structure/ExtensionStructure.h"
#include "framework/structure/MutableBuffer.h"

#include "framework/scheduling/Epoch.h"
#include "framework/util/Configuration.h"

namespace de {

template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
          LayoutPolicy L = LayoutPolicy::TEIRING,
          DeletePolicy D = DeletePolicy::TAGGING,
          SchedulerInterface SchedType = SerialScheduler>
class DynamicExtension {
  /* for unit testing purposes */
public:
  LayoutPolicy Layout = L;

private:
  /* convenience typedefs for commonly used types within the class */
  typedef typename ShardType::RECORD RecordType;
  typedef MutableBuffer<RecordType> Buffer;
  typedef ExtensionStructure<ShardType, QueryType, L> Structure;
  typedef Epoch<ShardType, QueryType, L> _Epoch;
  typedef BufferView<RecordType> BufView;

  typedef typename QueryType::Parameters Parameters;
  typedef typename QueryType::LocalQuery LocalQuery;
  typedef typename QueryType::LocalQueryBuffer BufferQuery;
  typedef typename QueryType::LocalResultType LocalResult;
  typedef typename QueryType::ResultType QueryResult;
  

  static constexpr size_t QUERY = 1;
  static constexpr size_t RECONSTRUCTION = 2;

  struct epoch_ptr {
    _Epoch *epoch;
    size_t refcnt;
  };

public:
  /**
   * Create a new Dynamized version of a data structure, supporting
   * inserts and, possibly, deletes. The following parameters are used
   * to configure the structure:
   *
   * @param buffer_low_watermark The number of records that can be
   *        inserted before a buffer flush is initiated
   *
   * @param buffer_high_watermark The maximum buffer capacity; inserts
   *        will begin to fail once this number is reached, until the
   *        buffer flush has completed. Has no effect in single-threaded
   *        operation
   *
   * @param scale_factor The rate at which the capacity of levels 
   *        grows; should be at least 2 for reasonable performance
   *
   * @param memory_budget Unused at this time
   *
   * @param thread_cnt The maximum number of threads available to the
   *        framework's scheduler for use in answering queries and 
   *        performing compactions and flushes, etc.
   */
  DynamicExtension(size_t buffer_low_watermark, size_t buffer_high_watermark,
                   size_t scale_factor, size_t memory_budget = 0,
                   size_t thread_cnt = 16)
      : m_scale_factor(scale_factor), m_max_delete_prop(1),
        m_sched(memory_budget, thread_cnt),
        m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)),
        m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0) {
    if constexpr (L == LayoutPolicy::BSM) {
      assert(scale_factor == 2);
    }

    auto vers =
        new Structure(buffer_high_watermark, m_scale_factor, m_max_delete_prop);
    m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
    m_previous_epoch.store({nullptr, 0});
    m_next_epoch.store({nullptr, 0});
  }
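
  /*
   * Illustrative usage sketch (not part of this header). The shard,
   * query, and record types below are hypothetical placeholders; any
   * types satisfying ShardInterface and QueryInterface may be used:
   *
   *   using Shard = ExampleShard<ExampleRecord>;
   *   using Query = ExampleQuery<Shard>;
   *
   *   de::DynamicExtension<Shard, Query> ext(1000, 12000, 8);
   */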

  /**
   *  Destructor for DynamicExtension. Will block until the completion of
   *  any outstanding epoch transition, shut down the scheduler, and free
   *  all currently allocated shards, buffers, etc., by calling their
   *  destructors.
   */
  ~DynamicExtension() {

    /* let any in-flight epoch transition finish */
    await_next_epoch();

    /* shutdown the scheduler */
    m_sched.shutdown();

    /* delete all held resources */
    delete m_next_epoch.load().epoch;
    delete m_current_epoch.load().epoch;
    delete m_previous_epoch.load().epoch;

    delete m_buffer;
  }

  /**
   *  Inserts a record into the index. Returns 1 if the insert succeeds,
   *  and 0 if it fails. Inserts may fail if the DynamicExtension's buffer
   *  has reached the high water mark; in this case, the insert should be
   *  retried when the buffer has flushed. The record will be immediately
   *  visible inside the index upon the return of this function.
   *
   *  @param rec The record to be inserted
   *
   *  @return 1 on success, 0 on failure (in which case the insert should
   *          be retried)
   */
  int insert(const RecordType &rec) { return internal_append(rec, false); }
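
  /*
   * Illustrative sketch of the retry pattern described above (`ext` and
   * `rec` are hypothetical names for an existing DynamicExtension and
   * record; std::this_thread::yield requires <thread>):
   *
   *   while (!ext.insert(rec)) {
   *     // the buffer is at its high watermark; back off until the
   *     // flush completes, then retry
   *     std::this_thread::yield();
   *   }
   */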

  /**
   *  Erases a record from the index, according to the DeletePolicy 
   *  template parameter. Returns 1 on success and 0 on failure. The
   *  equality comparison operator of RecordType is used to identify
   *  the record to be deleted. 
   * 
   *  Deletes behave differently depending on the DeletePolicy. For
   *  Tombstone deletes, a tombstone record will be inserted into the
   *  index. The presence of the deleted record is not checked first, so
   *  deleting a record that does not exist will result in an unnecessary
   *  tombstone record being written. 
   *
   *  Deletes using Tagging will perform a point lookup for the record to
   *  be removed, and mark it as deleted in its header. 
   *
   *  @param rec The record to be deleted. The argument to this function 
   *         should compare equal to the record to be deleted.
   *
   *  @return 1 on success, and 0 on failure. For tombstone deletes, a
   *          failure will occur if the insert fails due to the buffer
   *          being full, and the erase can be retried. For tagging deletes,
   *          a failure means that the record to be deleted could not be
   *          found in the index, and the erase should *not* be retried.
   */
  int erase(const RecordType &rec) {
    // FIXME: delete tagging will require a lot of extra work to get
    //        operating "correctly" in a concurrent environment.

    /*
     * Get a view on the buffer *first*. This will ensure a stronger
     * ordering than simply accessing the buffer directly, but is
     * not *strictly* necessary.
     */
    if constexpr (D == DeletePolicy::TAGGING) {
      static_assert(std::same_as<SchedType, SerialScheduler>,
                    "Tagging is only supported in single-threaded operation");

      auto view = m_buffer->get_buffer_view();

      auto epoch = get_active_epoch();
      if (epoch->get_structure()->tagged_delete(rec)) {
        end_job(epoch);
        return 1;
      }

      end_job(epoch);

      /*
       * the buffer will take the longest amount of time, and
       * probably has the lowest probability of having the record,
       * so we'll check it last.
       */
      return view.delete_record(rec);
    }

    /*
     * If tagging isn't used, then delete using a tombstone
     */
    return internal_append(rec, true);
  }
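
  /*
   * Illustrative sketch: with tombstone deletes a failed erase can be
   * retried just like a failed insert, while with tagging deletes a
   * return of 0 means the record was not found (`ext` and `rec` are
   * hypothetical):
   *
   *   if (!ext.erase(rec)) {
   *     // tombstone deletes: buffer full, retry later
   *     // tagging deletes:   record not present, do not retry
   *   }
   */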

  /**
   *  Schedules the execution of a query with the specified parameters and
   *  returns a future that can be used to access the results. The query
   *  is executed asynchronously.
   *  @param parms An rvalue reference to the query parameters.
   *
   *  @return A future, from which the query results can be retrieved upon
   *          query completion
   */
  std::future<std::vector<QueryResult>>
  query(Parameters &&parms) {
    return schedule_query(std::move(parms));
  }
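
  /*
   * Illustrative sketch of asynchronous query usage (`ext` and `parms`
   * are hypothetical names for an existing DynamicExtension and a
   * QueryType::Parameters object):
   *
   *   auto fut = ext.query(std::move(parms));
   *   auto results = fut.get(); // blocks until the query completes
   */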

  /**
   *  Get the number of records (including tagged records and
   *  tombstones) currently within the framework. This number is used
   *  to determine when and how reconstructions occur.
   *
   *  @return The number of records within the index
   */
  size_t get_record_count() {
    auto epoch = get_active_epoch();
    auto t = epoch->get_buffer().get_record_count() +
             epoch->get_structure()->get_record_count();
    end_job(epoch);

    return t;
  }

  /**
   *  Returns the number of tombstone records currently within the
   *  index. This function can still be called when tagged deletes are
   *  used, but it will always return 0 in that case.
   *
   *  @return The number of tombstone records within the index
   */ 
  size_t get_tombstone_count() {
    auto epoch = get_active_epoch();
    auto t = epoch->get_buffer().get_tombstone_count() +
             epoch->get_structure()->get_tombstone_count();
    end_job(epoch);

    return t;
  }

  /**
   *  Get the number of levels within the framework. This count will
   *  include any empty levels, but will not include the buffer. Note that
   *  this is *not* the same as the number of shards when tiering is used,
   *  as each level can contain multiple shards in that case.
   *
   *  @return The number of levels within the index
   */ 
  size_t get_height() {
    auto epoch = get_active_epoch();
    auto t = epoch->get_structure()->get_height();
    end_job(epoch);

    return t;
  }

  /**
   *  Get the number of bytes of memory allocated across the framework for
   *  storing records and associated index information (i.e., internal
   *  ISAM tree nodes). This includes memory that is allocated but
   *  currently unused in the buffer, or in shards themselves
   *  (overallocation due to delete cancellation, etc.).
   *
   *  @return The number of bytes of memory used for shards (as reported by
   *          ShardType::get_memory_usage) and the buffer by the index.
   */
  size_t get_memory_usage() {
    auto epoch = get_active_epoch();
    auto t = m_buffer->get_memory_usage() +
             epoch->get_structure()->get_memory_usage();
    end_job(epoch);

    return t;
  }

  /**
   *  Get the number of bytes of memory allocated across the framework for
   *  auxiliary structures. This can include bloom filters, aux
   *  hashtables, etc.
   *
   *  @return The number of bytes of memory used for auxiliary structures
   *          (as reported by ShardType::get_aux_memory_usage) by the index.
   */
  size_t get_aux_memory_usage() {
    auto epoch = get_active_epoch();
    auto t = epoch->get_structure()->get_aux_memory_usage();
    end_job(epoch);

    return t;
  }

  /**
   *  Create a new single Shard object containing all of the records
   *  within the framework (buffer and shards). 
   *
   *  @param await_reconstruction_completion Specifies whether the currently
   *         active state of the index should be used to create the shard
   *         (false), or if shard construction should wait for any active
   *         reconstructions to finish first (true). Defaults to false.
   *
   *  @return A new shard object, containing a copy of all records within
   *          the index. Ownership of this object is transferred to the
   *          caller.
   */
  ShardType *
  create_static_structure(bool await_reconstruction_completion = false) {
    if (await_reconstruction_completion) {
      await_next_epoch();
    }

    auto epoch = get_active_epoch();
    auto vers = epoch->get_structure();
    std::vector<ShardType *> shards;

    if (vers->get_levels().size() > 0) {
      for (int i = vers->get_levels().size() - 1; i >= 0; i--) {
        if (vers->get_levels()[i] &&
            vers->get_levels()[i]->get_record_count() > 0) {
          shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
        }
      }
    }

    /*
     * construct a shard from the buffer view. We'll hold the view
     * for as short a time as possible: once the records are exfiltrated
     * from the buffer, there's no reason to retain a hold on the view's
     * head pointer any longer
     */
    {
      auto bv = epoch->get_buffer();
      if (bv.get_record_count() > 0) {
        shards.emplace_back(new ShardType(std::move(bv)));
      }
    }

    ShardType *flattened = new ShardType(shards);

    for (auto shard : shards) {
      delete shard;
    }

    end_job(epoch);
    return flattened;
  }
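
  /*
   * Illustrative sketch: the returned shard is owned by the caller and
   * must be freed explicitly (`ext` is a hypothetical DynamicExtension):
   *
   *   auto *snapshot = ext.create_static_structure();
   *   // ... query the flattened shard directly ...
   *   delete snapshot;
   */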

  /**
   *  Block until any in-flight epoch transition has completed and the
   *  newest epoch has been installed as the current one. If no transition
   *  is in progress, this returns immediately.
   */
  void await_next_epoch() {
    while (m_next_epoch.load().epoch != nullptr) {
      std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
      m_epoch_cv.wait(lk);
    }
  }

  /**
   *  Verify that the currently active version of the index does not 
   *  violate tombstone proportion invariants. Exposed for unit-testing
   *  purposes.
   *
   *  @return True if the tombstone proportion invariant is satisfied,
   *          and false if it is not.
   */
  bool validate_tombstone_proportion() {
    auto epoch = get_active_epoch();
    auto t = epoch->get_structure()->validate_tombstone_proportion();
    end_job(epoch);
    return t;
  }

  /**
   * Calls SchedType::print_statistics, which should write a report of
   * scheduler performance statistics to stdout.
   */
  void print_scheduler_statistics() const { m_sched.print_statistics(); }

private:
  size_t m_scale_factor;
  double m_max_delete_prop;

  SchedType m_sched;
  Buffer *m_buffer;

  size_t m_core_cnt;
  std::atomic<int> m_next_core;
  std::atomic<size_t> m_epoch_cnt;
  
  /* must start false; set via CAS when a reconstruction is scheduled and
   * cleared by the reconstruction job once it completes */
  alignas(64) std::atomic<bool> m_reconstruction_scheduled{false};

  std::atomic<epoch_ptr> m_next_epoch;
  std::atomic<epoch_ptr> m_current_epoch;
  std::atomic<epoch_ptr> m_previous_epoch;

  std::condition_variable m_epoch_cv;
  std::mutex m_epoch_cv_lk;
  void enforce_delete_invariant(_Epoch *epoch) {
    auto structure = epoch->get_structure();
    auto compactions = structure->get_compaction_tasks();

    while (compactions.size() > 0) {

      /* schedule a compaction */
      ReconstructionArgs<ShardType, QueryType, L> *args =
          new ReconstructionArgs<ShardType, QueryType, L>();
      args->epoch = epoch;
      args->merges = compactions;
      args->extension = this;
      args->compaction = true;
      /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
       * here */

      auto wait = args->result.get_future();

      m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);

      /* wait for compaction completion */
      wait.get();

      /* get a new batch of compactions to perform, if needed */
      compactions = structure->get_compaction_tasks();
    }
  }

  _Epoch *get_active_epoch() {
    epoch_ptr old, new_ptr;

    do {
      /*
       * during an epoch transition, a nullptr will be installed in
       * m_current_epoch. At this moment, the "new" current epoch will
       * soon be installed, but the outgoing current epoch has already
       * been moved back to m_previous_epoch, so we pin that one instead.
       */
      if (m_current_epoch.load().epoch == nullptr) {
        old = m_previous_epoch;
        new_ptr = {old.epoch, old.refcnt + 1};
        if (old.epoch != nullptr &&
            m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
          break;
        }
      } else {
        old = m_current_epoch;
        new_ptr = {old.epoch, old.refcnt + 1};
        if (old.epoch != nullptr &&
            m_current_epoch.compare_exchange_strong(old, new_ptr)) {
          break;
        }
      }
    } while (true);

    assert(new_ptr.refcnt > 0);

    return new_ptr.epoch;
  }
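
  /*
   * Note: every successful get_active_epoch() call pins the returned
   * epoch by incrementing its reference count. Callers must release the
   * pin with a matching end_job() call once they are finished with it:
   *
   *   auto epoch = get_active_epoch();
   *   // ... use epoch->get_structure(), epoch->get_buffer(), ...
   *   end_job(epoch);
   */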

  void advance_epoch(size_t buffer_head) {

    retire_epoch(m_previous_epoch.load().epoch);

    epoch_ptr tmp = {nullptr, 0};
    epoch_ptr cur;
    do {
      cur = m_current_epoch;
    } while (!m_current_epoch.compare_exchange_strong(cur, tmp));

    m_previous_epoch.store(cur);

    // FIXME: this may currently block because there isn't any
    // query preemption yet. At this point, we'd need to either
    // 1) wait for all queries on the old_head to finish
    // 2) kill all queries on the old_head
    // 3) somehow migrate all queries on the old_head to the new
    //    version
    while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
      _mm_pause();
    }

    m_current_epoch.store(m_next_epoch);
    m_next_epoch.store({nullptr, 0});

    /* notify any blocking threads that the new epoch is available */
    m_epoch_cv_lk.lock();
    m_epoch_cv.notify_all();
    m_epoch_cv_lk.unlock();
  }

  /*
   * Creates a new epoch by copying the currently active one. The new epoch's
   * structure will be a shallow copy of the old one's.
   */
  _Epoch *create_new_epoch() {
    /*
     * This epoch access is _not_ protected under the assumption that
     * only one reconstruction will be able to trigger at a time. If that
     * condition is violated, it is possible that this code will clone a retired
     * epoch.
     */
    assert(m_next_epoch.load().epoch == nullptr);
    auto current_epoch = get_active_epoch();

    m_epoch_cnt.fetch_add(1);
    m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});

    end_job(current_epoch);

    return m_next_epoch.load().epoch;
  }

  void retire_epoch(_Epoch *epoch) {
    /*
     * Epochs with currently active jobs cannot
     * be retired. By the time retire_epoch is called,
     * it is assumed that a new epoch is active, meaning
     * that the epoch to be retired should no longer
     * accumulate new active jobs. Eventually, this
     * number will hit zero and the function will
     * proceed.
     */

    if (epoch == nullptr) {
      return;
    }

    epoch_ptr old, new_ptr;
    new_ptr = {nullptr, 0};
    do {
      old = m_previous_epoch.load();

      /*
       * If running in single-threaded mode, a failure to retire
       * an Epoch here would block the only thread of execution
       * indefinitely, so assert that the refcount has already
       * dropped to zero rather than spinning.
       */
      if constexpr (std::same_as<SchedType, SerialScheduler>) {
        if (old.epoch == epoch)
          assert(old.refcnt == 0);
      }

      if (old.epoch == epoch && old.refcnt == 0 &&
          m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
        break;
      }
      usleep(1);

    } while (true);

    delete epoch;
  }

  static void reconstruction(void *arguments) {
    auto args = (ReconstructionArgs<ShardType, QueryType, L> *)arguments;

    ((DynamicExtension *)args->extension)->SetThreadAffinity();
    Structure *vers = args->epoch->get_structure();

    if constexpr (L == LayoutPolicy::BSM) {
      if (args->merges.size() > 0) {
        vers->reconstruction(args->merges[0]);
      }
    } else {
      for (size_t i = 0; i < args->merges.size(); i++) {
        vers->reconstruction(args->merges[i].target,
                             args->merges[i].sources[0]);
      }
    }

    /*
     * we'll grab the buffer AFTER doing the internal reconstruction, so we
     * can flush as many records as possible in one go. The reconstruction
     * was done so as to make room for the full buffer anyway, so there's
     * no real benefit to doing this first.
     */
    auto buffer_view = args->epoch->get_buffer();
    size_t new_head = buffer_view.get_tail();

    /*
     * if performing a compaction, don't flush the buffer, as
     * there is no guarantee that any necessary reconstructions
     * will free sufficient space in L0 to support a flush
     */
    if (!args->compaction) {
      vers->flush_buffer(std::move(buffer_view));
    }

    args->result.set_value(true);

    /*
     * Compactions occur on an epoch _before_ it becomes active,
     * and as a result the active epoch should _not_ be advanced as
     * part of a compaction
     */
    if (!args->compaction) {
      ((DynamicExtension *)args->extension)->advance_epoch(new_head);
    }

    ((DynamicExtension *)args->extension)
        ->m_reconstruction_scheduled.store(false);

    delete args;
  }

  static void async_query(void *arguments) {
    auto *args = 
      (QueryArgs<ShardType, QueryType, DynamicExtension> *) arguments;

    auto epoch = args->extension->get_active_epoch();

    auto buffer = epoch->get_buffer();
    auto vers = epoch->get_structure();
    auto *parms = &(args->query_parms);

    /* create initial buffer query */
    auto buffer_query = QueryType::local_preproc_buffer(&buffer, parms);

    /* create initial local queries */
    std::vector<std::pair<ShardID, ShardType *>> shards;
    std::vector<LocalQuery *> local_queries =
        vers->get_local_queries(shards, parms);

    /* process local/buffer queries to create the final version */
    QueryType::distribute_query(parms, local_queries, buffer_query);

    /* execute the local/buffer queries and combine the results into output */
    std::vector<QueryResult> output;
    do {
      std::vector<std::vector<LocalResult>>
          query_results(shards.size() + 1);
      for (size_t i = 0; i < query_results.size(); i++) {
        std::vector<LocalResult> local_results;
        ShardID shid;

        if (i == 0) { /* execute buffer query */
          local_results = QueryType::local_query_buffer(buffer_query);
          shid = INVALID_SHID;
        } else { /* execute local queries */
          local_results = QueryType::local_query(shards[i - 1].second,
                                                 local_queries[i - 1]);
          shid = shards[i - 1].first;
        }

        /* framework-level, automatic delete filtering */
        query_results[i] = std::move(local_results);

        /* end query early if EARLY_ABORT is set and a result exists */
        if constexpr (QueryType::EARLY_ABORT) {
          if (query_results[i].size() > 0)
            break;
        }
      }

      /*
       * combine the results of the local queries, also translating
       * from LocalResultType to ResultType
       */
      QueryType::combine(query_results, parms, output);

      /* optionally repeat the local queries if necessary */
    } while (QueryType::repeat(parms, output, local_queries, buffer_query));

    /* return the output vector to caller via the future */
    args->result_set.set_value(std::move(output));

    /* officially end the query job, releasing the pin on the epoch */
    args->extension->end_job(epoch);

    /* clean up memory allocated for temporary query objects */
    delete buffer_query;
    for (size_t i = 0; i < local_queries.size(); i++) {
      delete local_queries[i];
    }

    delete args;
  }

  void schedule_reconstruction() {
    auto epoch = create_new_epoch();

    ReconstructionArgs<ShardType, QueryType, L> *args =
        new ReconstructionArgs<ShardType, QueryType, L>();
    args->epoch = epoch;
    args->merges = epoch->get_structure()->get_reconstruction_tasks(
        m_buffer->get_high_watermark());
    args->extension = this;
    args->compaction = false;
    /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
     * here */

    m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
  }

  std::future<std::vector<QueryResult>>
  schedule_query(Parameters &&query_parms) {
    auto args =
        new QueryArgs<ShardType, QueryType, DynamicExtension>();
    args->extension = this;
    args->query_parms = std::move(query_parms);
    auto result = args->result_set.get_future();

    m_sched.schedule_job(async_query, 0, (void *)args, QUERY);

    return result;
  }

  int internal_append(const RecordType &rec, bool ts) {
    if (m_buffer->is_at_low_watermark()) {
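      /*
       * Use a CAS on m_reconstruction_scheduled so that only one thread
       * schedules a reconstruction per low-watermark crossing; the flag
       * is cleared by the reconstruction job itself when it completes.
       */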
      auto old = false;

      if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) {
        schedule_reconstruction();
      }
    }

    /* this will fail if the HWM is reached and return 0 */
    return m_buffer->append(rec, ts);
  }

#ifdef _GNU_SOURCE
  void SetThreadAffinity() {
    if constexpr (std::same_as<SchedType, SerialScheduler>) {
      return;
    }

    int core = m_next_core.fetch_add(1) % m_core_cnt;
    cpu_set_t mask;
    CPU_ZERO(&mask);

    switch (core % 2) {
    case 0:
      // 0 |-> 0
      // 2 |-> 2
      // 4 |-> 4
      core = core + 0;
      break;
    case 1:
      // 1 |-> 28
      // 3 |-> 30
      // 5 |-> 32
      core = (core - 1) + m_core_cnt;
      break;
    }
    CPU_SET(core, &mask);
    ::sched_setaffinity(0, sizeof(mask), &mask);
  }
#else
  void SetThreadAffinity() {}
#endif

  void end_job(_Epoch *epoch) {
    epoch_ptr old, new_ptr;

    do {
      if (m_previous_epoch.load().epoch == epoch) {
        old = m_previous_epoch;
        /*
         * This could happen if we get into the system during a
         * transition. In this case, we can just back out and retry
         */
        if (old.epoch == nullptr) {
          continue;
        }

        assert(old.refcnt > 0);

        new_ptr = {old.epoch, old.refcnt - 1};
        if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
          break;
        }
      } else {
        old = m_current_epoch;
        /*
         * This could happen if we get into the system during a
         * transition. In this case, we can just back out and retry
         */
        if (old.epoch == nullptr) {
          continue;
        }

        assert(old.refcnt > 0);

        new_ptr = {old.epoch, old.refcnt - 1};
        if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
          break;
        }
      }
    } while (true);
  }
};
} // namespace de