From 0357b05e40f8461c426eadcdd71614a38dba1f6a Mon Sep 17 00:00:00 2001 From: Lusa Zhan Date: Mon, 23 Dec 2024 19:54:14 +0000 Subject: [PATCH] feat: Switch DeltaRecordLimitingFileWriter and consumers to native fb Bug: 385338000 Change-Id: Icff8af957772b4ee94b683536763a8e67f8794a8 GitOrigin-RevId: 111aefd786247da76ce21904c11bb0ea26b07fd4 --- public/data_loading/writers/BUILD.bazel | 1 + .../delta_record_limiting_file_writer.cc | 6 ++-- .../delta_record_limiting_file_writer.h | 9 +++-- .../delta_record_limiting_file_writer_test.cc | 26 +++----------- tools/request_simulation/BUILD.bazel | 1 + .../detla_based_realtime_updates_publisher.cc | 36 +++++++++---------- .../realtime_message_batcher.cc | 5 +-- .../realtime_message_batcher.h | 3 +- .../realtime_message_batcher_test.cc | 17 ++++----- 9 files changed, 47 insertions(+), 57 deletions(-) diff --git a/public/data_loading/writers/BUILD.bazel b/public/data_loading/writers/BUILD.bazel index 3c3189bb..19f4542e 100644 --- a/public/data_loading/writers/BUILD.bazel +++ b/public/data_loading/writers/BUILD.bazel @@ -142,6 +142,7 @@ cc_test( deps = [ ":delta_record_limiting_file_writer", "//public/data_loading/readers:delta_record_stream_reader", + "//public/test_util:data_record", "@com_google_googletest//:gtest_main", ], ) diff --git a/public/data_loading/writers/delta_record_limiting_file_writer.cc b/public/data_loading/writers/delta_record_limiting_file_writer.cc index 14b62a55..401c8ff5 100644 --- a/public/data_loading/writers/delta_record_limiting_file_writer.cc +++ b/public/data_loading/writers/delta_record_limiting_file_writer.cc @@ -72,10 +72,10 @@ absl::Status DeltaRecordLimitingFileWriter::ProcessWritingFailure() { } absl::Status DeltaRecordLimitingFileWriter::WriteRecord( - const DataRecordStruct& data_record) { + const DataRecordT& data_record) { file_writer_pos_ = file_writer_.pos(); - if (!record_writer_.WriteRecord( - ToStringView(ToFlatBufferBuilder(data_record)))) { + auto [fbs_buffer, bytes_to_write] = Serialize(data_record); + if (!record_writer_.WriteRecord(bytes_to_write)) { return ProcessWritingFailure(); } diff --git a/public/data_loading/writers/delta_record_limiting_file_writer.h b/public/data_loading/writers/delta_record_limiting_file_writer.h index d439faac..006cc91e 100644 --- a/public/data_loading/writers/delta_record_limiting_file_writer.h +++ b/public/data_loading/writers/delta_record_limiting_file_writer.h @@ -53,10 +53,13 @@ class DeltaRecordLimitingFileWriter : public DeltaRecordWriter { // has reached it's hard size limit. Please create a new // `DeltaRecordLimitingFileWriter` writing to a new file. Note that multiple // records might be dropped, and not just the latest one. - absl::Status WriteRecord(const DataRecordT& data_record) override { - return absl::UnimplementedError("To be implemented"); + absl::Status WriteRecord(const DataRecordT& data_record) override; + [[deprecated("Use corresponding DataRecordT-based function")]] + absl::Status WriteRecord(const DataRecordStruct& record) override { + return absl::UnimplementedError( + "DeltaRecordLimitingFileWriter is updated to use newer data " + "structures"); }; - absl::Status WriteRecord(const DataRecordStruct& data_record) override; const Options& GetOptions() const override; // If ResourceExhaustedStatus is returned, it means that the underlying file // has reached it's hard size limit. Please create a new diff --git a/public/data_loading/writers/delta_record_limiting_file_writer_test.cc b/public/data_loading/writers/delta_record_limiting_file_writer_test.cc index 6f5828df..2dad6b49 100644 --- a/public/data_loading/writers/delta_record_limiting_file_writer_test.cc +++ b/public/data_loading/writers/delta_record_limiting_file_writer_test.cc @@ -21,6 +21,7 @@ #include "gtest/gtest.h" #include "public/data_loading/readers/delta_record_stream_reader.h" +#include "public/test_util/data_record.h" namespace kv_server { namespace { @@ -38,16 +39,8 @@ void Write(std::string file_name, int num_records) { auto record_writer = std::move(*maybe_record_writer); int cur_record = 1; while (cur_record <= num_records) { - const std::string key = "key"; - const std::string value = "value"; - auto kv_mutation_record = KeyValueMutationRecordStruct{ - .mutation_type = kv_server::KeyValueMutationType::Update, - .logical_commit_time = absl::ToUnixSeconds(absl::Now()), - .key = key, - .value = value, - }; auto data_record = - DataRecordStruct{.record = std::move(kv_mutation_record)}; + GetNativeDataRecord(GetKVMutationRecord(GetSimpleStringValue())); auto result = record_writer->WriteRecord(data_record); if (!result.ok()) { return; @@ -63,18 +56,9 @@ int Read(std::ifstream input_stream) { absl::Status status = record_reader.ReadRecords( [&records_count](const kv_server::DataRecord& data_record) { records_count++; - std::unique_ptr data_record_native( - data_record.UnPack()); - auto [fbs_buffer, serialized_string_view] = - Serialize(*data_record_native); - return kv_server::DeserializeDataRecord( - serialized_string_view, - [&records_count](const kv_server::DataRecordStruct& data_record_2) { - auto kv_record = - std::get(data_record_2.record); - EXPECT_EQ(kv_record.key, "key"); - return absl::OkStatus(); - }); + EXPECT_EQ(data_record.UnPack()->record.AsKeyValueMutationRecord()->key, + "key"); + return absl::OkStatus(); }); EXPECT_TRUE(status.ok()); return records_count; diff --git a/tools/request_simulation/BUILD.bazel b/tools/request_simulation/BUILD.bazel index ca6b8cea..481bace8 100644 --- a/tools/request_simulation/BUILD.bazel +++ b/tools/request_simulation/BUILD.bazel @@ -345,6 +345,7 @@ cc_test( deps = [ ":realtime_message_batcher", "//public/data_loading/readers:riegeli_stream_record_reader_factory", + "//public/test_util:data_record", "@com_google_googletest//:gtest_main", ], ) diff --git a/tools/request_simulation/detla_based_realtime_updates_publisher.cc b/tools/request_simulation/detla_based_realtime_updates_publisher.cc index ab43fb12..2765b5e5 100644 --- a/tools/request_simulation/detla_based_realtime_updates_publisher.cc +++ b/tools/request_simulation/detla_based_realtime_updates_publisher.cc @@ -128,25 +128,25 @@ DeltaBasedRealtimeUpdatesPublisher::CreateRealtimeMessagesAndAddToQueue( blob_client.GetBlobReader(location)); }); DataLoadingStats data_loading_stats; - const auto process_data_record_fn = - [this, &data_loading_stats](const DataRecord& data_record) { - if (data_record.record_type() == Record::KeyValueMutationRecord) { - KeyValueMutationRecordStruct kv_mutation_struct; - const auto* kv_mutation_record = - data_record.record_as_KeyValueMutationRecord(); - if (kv_mutation_record->value_type() == Value::StringValue) { - const auto record = - GetTypedRecordStruct(data_record); - realtime_message_batcher_->Insert(std::move(record)); - if (record.mutation_type == KeyValueMutationType::Update) { - data_loading_stats.total_updated_records++; - } else if (record.mutation_type == KeyValueMutationType::Delete) { - data_loading_stats.total_deleted_records++; - } - return absl::OkStatus(); - } + const auto process_data_record_fn = [this, &data_loading_stats]( + const DataRecord& data_record) { + if (data_record.record_type() == Record::KeyValueMutationRecord) { + const auto* kv_mutation_record = + data_record.record_as_KeyValueMutationRecord(); + if (kv_mutation_record->value_type() == Value::StringValue) { + KeyValueMutationRecordT kv_record_struct; + kv_mutation_record->UnPackTo(&kv_record_struct); + realtime_message_batcher_->Insert(std::move(kv_record_struct)); + if (kv_record_struct.mutation_type == KeyValueMutationType::Update) { + data_loading_stats.total_updated_records++; + } else if (kv_record_struct.mutation_type == + KeyValueMutationType::Delete) { + data_loading_stats.total_deleted_records++; } - }; + return absl::OkStatus(); + } + } + }; PS_RETURN_IF_ERROR(record_reader->ReadStreamRecords( [&process_data_record_fn](std::string_view raw) { return DeserializeDataRecord(raw, process_data_record_fn); diff --git a/tools/request_simulation/realtime_message_batcher.cc b/tools/request_simulation/realtime_message_batcher.cc index af20231d..007434ac 100644 --- a/tools/request_simulation/realtime_message_batcher.cc +++ b/tools/request_simulation/realtime_message_batcher.cc @@ -100,11 +100,12 @@ RealtimeMessage RealtimeMessageBatcher::GetMessage(int shard_num) const { } absl::Status RealtimeMessageBatcher::Insert( - kv_server::KeyValueMutationRecordStruct key_value_mutation) { + kv_server::KeyValueMutationRecordT key_value_mutation) { const int shard_num = key_sharder_.GetShardNumForKey(key_value_mutation.key, num_shards_) .shard_num; - auto data_record = DataRecordStruct{.record = std::move(key_value_mutation)}; + DataRecordT data_record; + data_record.record.Set(std::move(key_value_mutation)); auto write_result = GetWriter(shard_num).WriteRecord(data_record); if (write_result.ok()) { return absl::OkStatus(); diff --git a/tools/request_simulation/realtime_message_batcher.h b/tools/request_simulation/realtime_message_batcher.h index d299c680..7d0b6df6 100644 --- a/tools/request_simulation/realtime_message_batcher.h +++ b/tools/request_simulation/realtime_message_batcher.h @@ -36,8 +36,7 @@ namespace kv_server { class RealtimeMessageBatcher { public: // Not thread safe. - absl::Status Insert( - kv_server::KeyValueMutationRecordStruct key_value_mutation); + absl::Status Insert(kv_server::KeyValueMutationRecordT key_value_mutation); ~RealtimeMessageBatcher(); // `queue_mutex` and `realtime_messages` are not owned by // RealtimeMessageBatcher and must outlive it. diff --git a/tools/request_simulation/realtime_message_batcher_test.cc b/tools/request_simulation/realtime_message_batcher_test.cc index c5a486de..09bf4f81 100644 --- a/tools/request_simulation/realtime_message_batcher_test.cc +++ b/tools/request_simulation/realtime_message_batcher_test.cc @@ -21,13 +21,14 @@ #include "absl/strings/substitute.h" #include "gtest/gtest.h" #include "public/data_loading/readers/riegeli_stream_record_reader_factory.h" +#include "public/test_util/data_record.h" namespace kv_server { namespace { -absl::StatusOr> Convert( +absl::StatusOr> Convert( RealtimeMessage rt) { - std::vector rows; + std::vector rows; std::string string_decoded; absl::Base64Unescape(rt.message, &string_decoded); std::istringstream is(string_decoded); @@ -43,10 +44,11 @@ absl::StatusOr> Convert( } auto result = record_reader->ReadStreamRecords([&rows](std::string_view raw) { const auto* data_record = flatbuffers::GetRoot(raw.data()); - const auto kv_record = - GetTypedRecordStruct(*data_record); + DataRecordT data_record_struct; + data_record->UnPackTo(&data_record_struct); + auto kv_record = *data_record_struct.record.AsKeyValueMutationRecord(); EXPECT_TRUE(absl::StartsWith(kv_record.key, "key")); - rows.emplace_back(kv_record); + rows.emplace_back(std::move(kv_record)); return absl::OkStatus(); }); return rows; @@ -63,13 +65,12 @@ void Write(std::queue& realtime_messages, int num_records, int cur_record = 1; while (cur_record <= num_records) { const std::string key = absl::StrCat("key", std::to_string(cur_record)); - const std::string value = "value"; - auto kv_mutation_record = KeyValueMutationRecordStruct{ + KeyValueMutationRecordT kv_mutation_record = { .mutation_type = kv_server::KeyValueMutationType::Update, .logical_commit_time = 1232, .key = key, - .value = value, }; + kv_mutation_record.value.Set(GetSimpleStringValue("value")); auto result = batcher->Insert(kv_mutation_record); if (!result.ok()) { return;