Skip to content

Commit

Permalink
feat: Switch DeltaRecordLimitingFileWriter and consumers to native fb
Browse files Browse the repository at this point in the history
Bug: 385338000
Change-Id: Icff8af957772b4ee94b683536763a8e67f8794a8
GitOrigin-RevId: 111aefd786247da76ce21904c11bb0ea26b07fd4
  • Loading branch information
lusayaa authored and copybara-github committed Dec 26, 2024
1 parent 9e1d3b4 commit 0357b05
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 57 deletions.
1 change: 1 addition & 0 deletions public/data_loading/writers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ cc_test(
deps = [
":delta_record_limiting_file_writer",
"//public/data_loading/readers:delta_record_stream_reader",
"//public/test_util:data_record",
"@com_google_googletest//:gtest_main",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ absl::Status DeltaRecordLimitingFileWriter::ProcessWritingFailure() {
}

absl::Status DeltaRecordLimitingFileWriter::WriteRecord(
const DataRecordStruct& data_record) {
const DataRecordT& data_record) {
file_writer_pos_ = file_writer_.pos();
if (!record_writer_.WriteRecord(
ToStringView(ToFlatBufferBuilder(data_record)))) {
auto [fbs_buffer, bytes_to_write] = Serialize(data_record);
if (!record_writer_.WriteRecord(bytes_to_write)) {
return ProcessWritingFailure();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ class DeltaRecordLimitingFileWriter : public DeltaRecordWriter {
// has reached its hard size limit. Please create a new
// `DeltaRecordLimitingFileWriter` writing to a new file. Note that multiple
// records might be dropped, and not just the latest one.
absl::Status WriteRecord(const DataRecordT& data_record) override {
return absl::UnimplementedError("To be implemented");
absl::Status WriteRecord(const DataRecordT& data_record) override;
[[deprecated("Use corresponding DataRecordT-based function")]]
absl::Status WriteRecord(const DataRecordStruct& record) override {
return absl::UnimplementedError(
"DeltaRecordLimitingFileWriter is updated to use newer data "
"structures");
};
absl::Status WriteRecord(const DataRecordStruct& data_record) override;
const Options& GetOptions() const override;
// If ResourceExhaustedStatus is returned, it means that the underlying file
// has reached its hard size limit. Please create a new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "gtest/gtest.h"
#include "public/data_loading/readers/delta_record_stream_reader.h"
#include "public/test_util/data_record.h"

namespace kv_server {
namespace {
Expand All @@ -38,16 +39,8 @@ void Write(std::string file_name, int num_records) {
auto record_writer = std::move(*maybe_record_writer);
int cur_record = 1;
while (cur_record <= num_records) {
const std::string key = "key";
const std::string value = "value";
auto kv_mutation_record = KeyValueMutationRecordStruct{
.mutation_type = kv_server::KeyValueMutationType::Update,
.logical_commit_time = absl::ToUnixSeconds(absl::Now()),
.key = key,
.value = value,
};
auto data_record =
DataRecordStruct{.record = std::move(kv_mutation_record)};
GetNativeDataRecord(GetKVMutationRecord(GetSimpleStringValue()));
auto result = record_writer->WriteRecord(data_record);
if (!result.ok()) {
return;
Expand All @@ -63,18 +56,9 @@ int Read(std::ifstream input_stream) {
absl::Status status = record_reader.ReadRecords(
[&records_count](const kv_server::DataRecord& data_record) {
records_count++;
std::unique_ptr<kv_server::DataRecordT> data_record_native(
data_record.UnPack());
auto [fbs_buffer, serialized_string_view] =
Serialize(*data_record_native);
return kv_server::DeserializeDataRecord(
serialized_string_view,
[&records_count](const kv_server::DataRecordStruct& data_record_2) {
auto kv_record =
std::get<KeyValueMutationRecordStruct>(data_record_2.record);
EXPECT_EQ(kv_record.key, "key");
return absl::OkStatus();
});
EXPECT_EQ(data_record.UnPack()->record.AsKeyValueMutationRecord()->key,
"key");
return absl::OkStatus();
});
EXPECT_TRUE(status.ok());
return records_count;
Expand Down
1 change: 1 addition & 0 deletions tools/request_simulation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ cc_test(
deps = [
":realtime_message_batcher",
"//public/data_loading/readers:riegeli_stream_record_reader_factory",
"//public/test_util:data_record",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
36 changes: 18 additions & 18 deletions tools/request_simulation/detla_based_realtime_updates_publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,25 @@ DeltaBasedRealtimeUpdatesPublisher::CreateRealtimeMessagesAndAddToQueue(
blob_client.GetBlobReader(location));
});
DataLoadingStats data_loading_stats;
const auto process_data_record_fn =
[this, &data_loading_stats](const DataRecord& data_record) {
if (data_record.record_type() == Record::KeyValueMutationRecord) {
KeyValueMutationRecordStruct kv_mutation_struct;
const auto* kv_mutation_record =
data_record.record_as_KeyValueMutationRecord();
if (kv_mutation_record->value_type() == Value::StringValue) {
const auto record =
GetTypedRecordStruct<KeyValueMutationRecordStruct>(data_record);
realtime_message_batcher_->Insert(std::move(record));
if (record.mutation_type == KeyValueMutationType::Update) {
data_loading_stats.total_updated_records++;
} else if (record.mutation_type == KeyValueMutationType::Delete) {
data_loading_stats.total_deleted_records++;
}
return absl::OkStatus();
}
const auto process_data_record_fn = [this, &data_loading_stats](
const DataRecord& data_record) {
if (data_record.record_type() == Record::KeyValueMutationRecord) {
const auto* kv_mutation_record =
data_record.record_as_KeyValueMutationRecord();
if (kv_mutation_record->value_type() == Value::StringValue) {
KeyValueMutationRecordT kv_record_struct;
kv_mutation_record->UnPackTo(&kv_record_struct);
realtime_message_batcher_->Insert(std::move(kv_record_struct));
if (kv_record_struct.mutation_type == KeyValueMutationType::Update) {
data_loading_stats.total_updated_records++;
} else if (kv_record_struct.mutation_type ==
KeyValueMutationType::Delete) {
data_loading_stats.total_deleted_records++;
}
};
return absl::OkStatus();
}
}
};
PS_RETURN_IF_ERROR(record_reader->ReadStreamRecords(
[&process_data_record_fn](std::string_view raw) {
return DeserializeDataRecord(raw, process_data_record_fn);
Expand Down
5 changes: 3 additions & 2 deletions tools/request_simulation/realtime_message_batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ RealtimeMessage RealtimeMessageBatcher::GetMessage(int shard_num) const {
}

absl::Status RealtimeMessageBatcher::Insert(
kv_server::KeyValueMutationRecordStruct key_value_mutation) {
kv_server::KeyValueMutationRecordT key_value_mutation) {
const int shard_num =
key_sharder_.GetShardNumForKey(key_value_mutation.key, num_shards_)
.shard_num;
auto data_record = DataRecordStruct{.record = std::move(key_value_mutation)};
DataRecordT data_record;
data_record.record.Set(std::move(key_value_mutation));
auto write_result = GetWriter(shard_num).WriteRecord(data_record);
if (write_result.ok()) {
return absl::OkStatus();
Expand Down
3 changes: 1 addition & 2 deletions tools/request_simulation/realtime_message_batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ namespace kv_server {
class RealtimeMessageBatcher {
public:
// Not thread safe.
absl::Status Insert(
kv_server::KeyValueMutationRecordStruct key_value_mutation);
absl::Status Insert(kv_server::KeyValueMutationRecordT key_value_mutation);
~RealtimeMessageBatcher();
// `queue_mutex` and `realtime_messages` are not owned by
// RealtimeMessageBatcher and must outlive it.
Expand Down
17 changes: 9 additions & 8 deletions tools/request_simulation/realtime_message_batcher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
#include "absl/strings/substitute.h"
#include "gtest/gtest.h"
#include "public/data_loading/readers/riegeli_stream_record_reader_factory.h"
#include "public/test_util/data_record.h"

namespace kv_server {
namespace {

absl::StatusOr<std::vector<kv_server::KeyValueMutationRecordStruct>> Convert(
absl::StatusOr<std::vector<kv_server::KeyValueMutationRecordT>> Convert(
RealtimeMessage rt) {
std::vector<kv_server::KeyValueMutationRecordStruct> rows;
std::vector<kv_server::KeyValueMutationRecordT> rows;
std::string string_decoded;
absl::Base64Unescape(rt.message, &string_decoded);
std::istringstream is(string_decoded);
Expand All @@ -43,10 +44,11 @@ absl::StatusOr<std::vector<kv_server::KeyValueMutationRecordStruct>> Convert(
}
auto result = record_reader->ReadStreamRecords([&rows](std::string_view raw) {
const auto* data_record = flatbuffers::GetRoot<DataRecord>(raw.data());
const auto kv_record =
GetTypedRecordStruct<KeyValueMutationRecordStruct>(*data_record);
DataRecordT data_record_struct;
data_record->UnPackTo(&data_record_struct);
auto kv_record = *data_record_struct.record.AsKeyValueMutationRecord();
EXPECT_TRUE(absl::StartsWith(kv_record.key, "key"));
rows.emplace_back(kv_record);
rows.emplace_back(std::move(kv_record));
return absl::OkStatus();
});
return rows;
Expand All @@ -63,13 +65,12 @@ void Write(std::queue<RealtimeMessage>& realtime_messages, int num_records,
int cur_record = 1;
while (cur_record <= num_records) {
const std::string key = absl::StrCat("key", std::to_string(cur_record));
const std::string value = "value";
auto kv_mutation_record = KeyValueMutationRecordStruct{
KeyValueMutationRecordT kv_mutation_record = {
.mutation_type = kv_server::KeyValueMutationType::Update,
.logical_commit_time = 1232,
.key = key,
.value = value,
};
kv_mutation_record.value.Set(GetSimpleStringValue("value"));
auto result = batcher->Insert(kv_mutation_record);
if (!result.ok()) {
return;
Expand Down

0 comments on commit 0357b05

Please sign in to comment.