From 150256bd2b271d7dcbff44a513f20fba487e0de2 Mon Sep 17 00:00:00 2001
From: Vladislav Oleshko <vlad@dragonflydb.io>
Date: Fri, 19 Jul 2024 17:11:27 +0300
Subject: [PATCH] feat(tiering): Faster serialization of small bins
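
Group small external values by the disk page they reside on and issue their
reads together, so the tiered storage can collapse reads to the same page and
fewer IO requests are issued. While a snapshot is in progress, deletion of
emptied small-bin segments is deferred so their pages remain readable.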

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
---
 src/server/snapshot.cc           | 107 ++++++++++++++++++++++++-------
 src/server/snapshot.h            |  12 +---
 src/server/tiered_storage.cc     |  14 ++++
 tests/dragonfly/snapshot_test.py |   8 +--
 4 files changed, 105 insertions(+), 36 deletions(-)

diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index f3036005453e..09f7e6ffbd5b 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -4,6 +4,7 @@
 
 #include "server/snapshot.h"
 
+#include <absl/container/btree_set.h>
 #include <absl/functional/bind_front.h>
 #include <absl/strings/match.h>
 #include <absl/strings/str_cat.h>
@@ -37,8 +38,80 @@ size_t SliceSnapshot::DbRecord::size() const {
   return HeapSize(value);
 }
 
+struct SliceSnapshot::TieredSerializer {
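+  // Serializes values stored in tiered (external) storage. Small values are
+  // accumulated per disk page and their reads are issued together, so reads to
+  // the same page can be collapsed into fewer IO requests.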
+  struct ValueBase {
+    DbIndex dbid;
+    CompactObj key;
+    time_t expire;
+  };
+
+  using PendingValue = std::pair<ValueBase, util::fb2::Future<PrimeValue>>;
+  using DelayedValue = std::pair<ValueBase, PrimeValue /* stores segment and encoding flags */>;
+
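+  // Maximum number of pages to keep accumulated; beyond it the fullest pages are flushed first.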
+  static const int kMaxPageAccum = 100000;
+
+  void Save(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv, time_t expire_time) {
+    ValueBase base{db_indx, PrimeKey(pk.ToString()), expire_time};
+
+    // Delay serialization of small values in the hope of accumulating more entries
+    // from the same page, reducing the total number of issued IO requests.
+    if (auto [offset, size] = pv.GetExternalSlice(); size < TieredStorage::kMinOccupancySize) {
+      PrimeValue pv_copy;
+      pv_copy.ImportExternal(pv);
+      size_t page = offset / tiering::kPageSize;
+      auto& entries = delayed_[page];
+      {
+        delayed_sizes_.erase({entries.size(), page});
+        entries.emplace_back(std::move(base), std::move(pv_copy));
+        delayed_sizes_.insert({entries.size(), page});
+      }
+    } else {
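+      // Values above the small-bin threshold are read right away.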
+      pending_.push_back(Read(std::move(base), pv));
+    }
+  }
+
+  void FlushDelayed(bool force) {
+    // Flush the pages with the most accumulated entries first, or all pages if forced.
+    // Issuing the reads is enough, because the tiered storage collapses reads to the same page internally.
+    while ((force && !delayed_.empty()) || delayed_.size() > kMaxPageAccum) {
+      DCHECK(!delayed_sizes_.empty());
+      auto [size, page] = delayed_sizes_.extract(delayed_sizes_.begin()).value();
+      auto entries = delayed_.extract(page);
+      for (auto& [base, value] : entries.mapped()) {
+        DCHECK(value.IsExternal());
+        pending_.push_back(Read(std::move(base), value));
+      }
+    }
+  }
+
+  void Serialize(RdbSerializer* serializer) {
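+    // Await all issued reads and write the resolved values to the serializer.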
+    for (auto& [base, value] : pending_)
+      serializer->SaveEntry(base.key, value.Get(), base.expire, base.dbid);
+    pending_.clear();
+  }
+
+ private:
+  static PendingValue Read(ValueBase value, const PrimeValue& pv) {
+    auto* ts = EngineShard::tlocal()->tiered_storage();
+    util::fb2::Future<PrimeValue> future;  // store PrimeValue directly to avoid further copies
+    ts->Read(value.dbid, value.key.ToString(), pv,
+             [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
+    return PendingValue{std::move(value), std::move(future)};
+  }
+
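+  // Entries whose reads were already issued, waiting to be written out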
+  std::vector<PendingValue> pending_;
+
+  // Small values with delayed serialization, grouped by the disk page they reside on
+  absl::flat_hash_map<size_t /* page */, std::vector<DelayedValue>> delayed_;
+  // (entry count, page) pairs for the delayed map, ordered by count descending
+  absl::btree_set<std::pair<size_t /* size */, size_t /* page */>, std::greater<>> delayed_sizes_;
+};
+
 SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
-    : db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
+    : db_slice_(slice),
+      dest_(dest),
+      tiered_serializer_(new TieredSerializer{}),
+      compression_mode_(compression_mode) {
   db_array_ = slice->databases();
   tl_slice_snapshots.insert(this);
 }
@@ -308,12 +381,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
   }
 
   if (pv.IsExternal()) {
-    // We can't block, so we just schedule a tiered read and append it to the delayed entries
-    util::fb2::Future<PrimeValue> future;
-    EngineShard::tlocal()->tiered_storage()->Read(
-        db_indx, pk.ToString(), pv,
-        [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
-    delayed_entries_.push_back({db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time});
+    tiered_serializer_->Save(db_indx, pk, pv, expire_time);
     ++type_freq_map_[RDB_TYPE_STRING];
   } else {
     io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
@@ -335,29 +403,19 @@ size_t SliceSnapshot::Serialize() {
   DbRecord db_rec{.id = id, .value = std::move(sfile.val)};
 
   dest_->Push(std::move(db_rec));
-  if (serialized != 0) {
-    VLOG(2) << "Pushed with Serialize() " << serialized << " bytes";
-  }
+
+  VLOG_IF(2, serialized != 0) << "Pushed with Serialize() " << serialized << " bytes";
   return serialized;
 }
 
 bool SliceSnapshot::PushSerializedToChannel(bool force) {
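+  // Issue reads for accumulated small values (all of them if forced) and write
+  // out all pending tiered entries, waiting for their reads to complete.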
+  tiered_serializer_->FlushDelayed(force);
+  tiered_serializer_->Serialize(serializer_.get());
+
   if (!force && serializer_->SerializedLen() < 4096)
     return false;
 
-  // Flush any of the leftovers to avoid interleavings
-  const auto serialized = Serialize();
-
-  // Bucket serialization might have accumulated some delayed values.
-  // Because we can finally block in this function, we'll await and serialize them
-  while (!delayed_entries_.empty()) {
-    auto& entry = delayed_entries_.back();
-    serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid);
-    delayed_entries_.pop_back();
-  }
-
-  const auto total_serialized = Serialize() + serialized;
-  return total_serialized > 0;
+  return Serialize() > 0;
 }
 
 void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@@ -377,6 +435,9 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
       stats_.side_saved += SerializeBucket(db_index, it);
     });
   }
+
+  // Flush delayed tiered entries to avoid reordering them with journal entries
+  tiered_serializer_->FlushDelayed(true);
 }
 
 // For any key any journal entry must arrive at the replica strictly after its first original rdb
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index 95529dca3beb..a42b178a7379 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -91,6 +91,8 @@ class SliceSnapshot {
   void Cancel();
 
  private:
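+  // Serializes values stored in tiered storage; defined in snapshot.cc.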
+  struct TieredSerializer;
+
   // Main fiber that iterates over all buckets in the db slice
   // and submits them to SerializeBucket.
   void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);
@@ -140,14 +142,6 @@ class SliceSnapshot {
   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;
 
  private:
-  // An entry whose value must be awaited
-  struct DelayedEntry {
-    DbIndex dbid;
-    CompactObj key;
-    util::fb2::Future<PrimeValue> value;
-    time_t expire;
-  };
-
   DbSlice* db_slice_;
   DbTableArray db_array_;
 
@@ -157,7 +151,7 @@ class SliceSnapshot {
   DbIndex current_db_;
 
   std::unique_ptr<RdbSerializer> serializer_;
-  std::vector<DelayedEntry> delayed_entries_;  // collected during atomic bucket traversal
+  std::unique_ptr<TieredSerializer> tiered_serializer_;
 
   // Used for sanity checks.
   bool serialize_bucket_running_ = false;
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index 984c3dc73778..272021a4671f 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -158,7 +158,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     return db_slice_.memory_budget() - memory_margin_ - value_len > 0;
   }
 
+  void ClearDelayed() {
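+    // Apply segment deletions that were deferred while a snapshot was running.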
+    for (auto segment : delayed_deletes_)
+      OpManager::DeleteOffloaded(segment);
+    delayed_deletes_.clear();
+  }
+
   int64_t memory_margin_ = 0;
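+  // Disk segments whose deletion was deferred because a snapshot was in progress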
+  std::vector<tiering::DiskSegment> delayed_deletes_;
 
   struct {
     size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
@@ -228,6 +235,11 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
 
   auto bin = ts_->bins_->Delete(segment);
   if (bin.empty) {
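+    // Defer deleting the emptied segment while a snapshot may still need to read it.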
+    if (SliceSnapshot::IsSnaphotInProgress()) {
+      delayed_deletes_.push_back(segment);
+      return false;
+    }
+
     return true;
   }
 
@@ -422,6 +434,8 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
   if (SliceSnapshot::IsSnaphotInProgress())
     return;
 
+  op_manager_->ClearDelayed();
+
   // Don't run offloading if there's only very little space left
   auto disk_stats = op_manager_->GetStats().disk_stats;
   if (disk_stats.allocated_bytes + kMaxIterations / 2 * tiering::kPageSize >
diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py
index ca6f04c9885f..8d9880faf6f6 100644
--- a/tests/dragonfly/snapshot_test.py
+++ b/tests/dragonfly/snapshot_test.py
@@ -430,10 +430,10 @@ async def test_bgsave_and_save(async_client: aioredis.Redis):
 async def test_tiered_entries(async_client: aioredis.Redis):
     """This test makes sure tiered entries are correctly persisted"""
 
-    # With variance 4: 512 - 8192 we include small and large values
-    await StaticSeeder(key_target=5000, data_size=1024, variance=4, types=["STRING"]).run(
-        async_client
-    )
+    # With variance 8 (sizes 128 - 8192 bytes) we include both small and large values
+    await StaticSeeder(
+        key_target=6000, data_size=1024, variance=8, types=["STRING"], samples=30
+    ).run(async_client)
 
     # Compute the capture, this brings all items back to memory... so we'll wait for offloading
     start_capture = await StaticSeeder.capture(async_client)