Skip to content

Commit

Permalink
RS-462: Improve batching (#78)
Browse files Browse the repository at this point in the history
* fix batch writing

* optimize batch writing

* update CHANGELOG
  • Loading branch information
atimin authored Oct 4, 2024
1 parent bfe0bf9 commit 578a519
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- RS-389: Support `QuotaType::kHard`, [PR-75](https://github.com/reductstore/reduct-cpp/pull/75)
- RS-388: `IBucket::RenameEntry` to rename entry in bucket, [PR-76](https://github.com/reductstore/reduct-cpp/pull/76)
- RS-419: Add IBucket::Rename method to rename bucket, [PR-77](https://github.com/reductstore/reduct-cpp/pull/77)
- RS-462: Improve batching, [PR-78](https://github.com/reductstore/reduct-cpp/pull/78)

## [1.11.0] - 2022-08-19

Expand Down
4 changes: 2 additions & 2 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,11 +589,11 @@ class Bucket : public IBucket {
Result<std::tuple<std::string, IHttpClient::Headers>> resp_result;
switch (type) {
case BatchType::kWrite: {
const auto content_length = batch.body().size();
const auto content_length = batch.size();
resp_result =
client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
return std::pair{true, batch.Slice(offset, size)};
});
break;
}
Expand Down
37 changes: 31 additions & 6 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ class IBucket {
* @param content_type
* @param labels
*/
void AddRecord(Time timestamp, const std::string& data, std::string content_type = "application/octet-stream",
LabelMap labels = {}) {
void AddRecord(Time timestamp, std::string data, std::string content_type = "", LabelMap labels = {}) {
records_[timestamp] = Record{timestamp, data.size(), std::move(content_type), std::move(labels)};
body_ += data;
size_ += data.size();
body_.push_back(std::move(data));
}

/**
Expand All @@ -191,11 +191,37 @@ class IBucket {
}

[[nodiscard]] const std::map<Time, Record>& records() const { return records_; }
[[nodiscard]] const std::string& body() const { return body_; }

[[nodiscard]] std::string Slice(size_t offset, size_t size) const {
if (offset >= size_) {
return "";
}

std::string result;
for (const auto& data : body_) {
if (offset < data.size()) {
auto n = std::min(size, data.size() - offset);
result.append(data.substr(offset, n));
size -= n;
offset = 0;
} else {
offset -= data.size();
}

if (size == 0) {
break;
}
}

return result;
}

[[nodiscard]] uint64_t size() const { return size_; }

private:
std::map<Time, Record> records_;
std::string body_;
std::vector<std::string> body_;
uint64_t size_ = 0;
};

/**
Expand Down Expand Up @@ -382,7 +408,6 @@ class IBucket {
virtual Result<uint64_t> RemoveQuery(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options) const noexcept = 0;


/**
* @brief Rename an entry
* @param old_name entry name to rename
Expand Down
28 changes: 28 additions & 0 deletions tests/reduct/entry_api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,31 @@ TEST_CASE("reduct::IBucket should rename an entry", "[bucket_api][1_12]") {
return true;
}) == Error::kOk);
}

TEST_CASE("Batch should slice data", "[batch]") {
auto batch = IBucket::Batch();

batch.AddRecord(IBucket::Time(), "1111111111");
batch.AddRecord(IBucket::Time() + us(1), "2222222222");
batch.AddRecord(IBucket::Time() + us(2), "3333333333");

REQUIRE(batch.size() == 30);
REQUIRE(batch.records().size() == 3);

SECTION("slice smaller record") {
REQUIRE(batch.Slice(0, 6) == "111111");
REQUIRE(batch.Slice(6, 6) == "111122");
REQUIRE(batch.Slice(12, 6) == "222222");
REQUIRE(batch.Slice(18, 6) == "223333");
REQUIRE(batch.Slice(24, 6) == "333333");
}

SECTION("slice bigger record") {
REQUIRE(batch.Slice(0, 15) == "111111111122222");
REQUIRE(batch.Slice(15, 15) == "222223333333333");
}

SECTION("slice all record") { REQUIRE(batch.Slice(0, 30) == "111111111122222222223333333333"); }

SECTION("slice out of range") { REQUIRE(batch.Slice(0, 31) == "111111111122222222223333333333"); }
}

0 comments on commit 578a519

Please sign in to comment.