diff --git a/CHANGELOG.md b/CHANGELOG.md index 2aa0ac8..9cfb635 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - RS-389: Support `QuotaType::kHard`, [PR-75](https://github.com/reductstore/reduct-cpp/pull/75) - RS-388: `IBucket::RenameEntry` to rename entry in bucket, [PR-76](https://github.com/reductstore/reduct-cpp/pull/76) - RS-419: Add IBucket::Rename method to rename bucket, [PR-77](https://github.com/reductstore/reduct-cpp/pull/77) +- RS-462: Improve batching, [PR-78](https://github.com/reductstore/reduct-cpp/pull/78) ## [1.11.0] - 2022-08-19 diff --git a/src/reduct/bucket.cc b/src/reduct/bucket.cc index 9aad90a..44ee187 100644 --- a/src/reduct/bucket.cc +++ b/src/reduct/bucket.cc @@ -589,11 +589,11 @@ class Bucket : public IBucket { Result> resp_result; switch (type) { case BatchType::kWrite: { - const auto content_length = batch.body().size(); + const auto content_length = batch.size(); resp_result = client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length, std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) { - return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)}; + return std::pair{true, batch.Slice(offset, size)}; }); break; } diff --git a/src/reduct/bucket.h b/src/reduct/bucket.h index 2e2e65c..f2bdb07 100644 --- a/src/reduct/bucket.h +++ b/src/reduct/bucket.h @@ -174,10 +174,10 @@ class IBucket { * @param content_type * @param labels */ - void AddRecord(Time timestamp, const std::string& data, std::string content_type = "application/octet-stream", - LabelMap labels = {}) { + void AddRecord(Time timestamp, std::string data, std::string content_type = "", LabelMap labels = {}) { records_[timestamp] = Record{timestamp, data.size(), std::move(content_type), std::move(labels)}; - body_ += data; + size_ += data.size(); + body_.push_back(std::move(data)); } /** @@ -191,11 +191,37 @@ class IBucket { } [[nodiscard]] const std::map& records() const { return records_; } - [[nodiscard]] const std::string& body() const { return body_; } + + [[nodiscard]] std::string Slice(size_t offset, size_t size) const { + if (offset >= size_) { + return ""; + } + + std::string result; + for (const auto& data : body_) { + if (offset < data.size()) { + auto n = std::min(size, data.size() - offset); + result.append(data.substr(offset, n)); + size -= n; + offset = 0; + } else { + offset -= data.size(); + } + + if (size == 0) { + break; + } + } + + return result; + } + + [[nodiscard]] uint64_t size() const { return size_; } private: std::map records_; - std::string body_; + std::vector body_; + uint64_t size_ = 0; }; /** @@ -382,7 +408,6 @@ class IBucket { virtual Result RemoveQuery(std::string_view entry_name, std::optional