From 9431ec9eb38fbf9a5e7e82fb59cbfbb51de1427f Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Sat, 18 May 2024 13:42:50 +0200 Subject: [PATCH 1/5] fix version check --- src/reduct/bucket.cc | 4 ++-- src/reduct/bucket.h | 8 ++++++-- src/reduct/internal/http_client.cc | 32 +++++++++++++++++++++++++++++- src/reduct/internal/http_client.h | 2 ++ 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/reduct/bucket.cc b/src/reduct/bucket.cc index b714389..1355a61 100644 --- a/src/reduct/bucket.cc +++ b/src/reduct/bucket.cc @@ -1,4 +1,4 @@ -// Copyright 2022-2023 Alexey Timin +// Copyright 2022-2024 Alexey Timin #include "reduct/bucket.h" #define FMT_HEADER_ONLY 1 @@ -258,7 +258,7 @@ class Bucket : public IBucket { } while (true) { - bool batched = client_->api_version() >= "1.5"; + bool batched = internal::IsCompatible("1.5", client_->api_version()); auto [stopped, record_err] = ReadRecord(fmt::format("{}/{}{}?q={}", path_, entry_name, batched ? "/batch" : "", id), batched, options.head_only, callback); diff --git a/src/reduct/bucket.h b/src/reduct/bucket.h index c6c0802..af312b7 100644 --- a/src/reduct/bucket.h +++ b/src/reduct/bucket.h @@ -252,13 +252,17 @@ class IBucket { * Query options */ struct QueryOptions { - std::optional ttl; ///< time to live LabelMap include; ///< include labels LabelMap exclude; ///< exclude labels + std::optional each_s; ///< return one record each S seconds + std::optional each_n; ///< return each N-th record + std::optional limit; ///< limit number of records + + std::optional ttl; ///< time to live + bool continuous; ///< continuous query. If true, the method returns the latest record and waits for the next one std::chrono::milliseconds poll_interval; ///< poll interval for continuous query bool head_only; ///< read only metadata - std::optional limit; ///< limit number of records }; /** diff --git a/src/reduct/internal/http_client.cc b/src/reduct/internal/http_client.cc index 119262b..fc3fdd8 100644 --- a/src/reduct/internal/http_client.cc +++ b/src/reduct/internal/http_client.cc @@ -1,4 +1,4 @@ -// Copyright 2022-2023 Alexey Timin +// Copyright 2022-2024 Alexey Timin #include "reduct/internal/http_client.h" #undef CPPHTTPLIB_BROTLI_SUPPORT @@ -8,6 +8,7 @@ #include #include +#include namespace reduct::internal { @@ -176,4 +177,33 @@ class HttpClient : public IHttpClient { std::unique_ptr IHttpClient::Build(std::string_view url, const HttpOptions& options) { return std::make_unique(url, options); } + +bool IsCompatible(std::string_view min, std::string_view version) { + if (version.empty()) { + return false; + } + + auto parse_version = [](std::string version) { + std::stringstream ss(version); + std::string item; + std::vector result; + while (std::getline(ss, item, '.')) { + result.push_back(std::stoi(item)); + } + return result; + }; + + auto min_version = parse_version(std::string(min)); + auto current_version = parse_version(std::string(version)); + + if (min_version.size() != current_version.size()) { + return false; + } + + if (min_version.size() != 2) { + return false; + } + + return min_version[0] == current_version[0] && min_version[1] <= current_version[1]; +} } // namespace reduct::internal diff --git a/src/reduct/internal/http_client.h b/src/reduct/internal/http_client.h index e7baefe..cff4571 100644 --- a/src/reduct/internal/http_client.h +++ b/src/reduct/internal/http_client.h @@ -47,5 +47,7 @@ class IHttpClient { static std::unique_ptr Build(std::string_view url, const HttpOptions &options); }; +bool IsCompatible(std::string_view min, std::string_view version); + } // namespace reduct::internal #endif // REDUCT_CPP_HTTP_CLIENT_H From 3735dd7a9576c70a6357bbb24536eaa869ca1181 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Sat, 18 May 2024 14:06:15 +0200 Subject: [PATCH 2/5] support each_n and each_s query parameters --- .github/workflows/ci.yml | 4 +-- src/reduct/bucket.cc | 25 +++++++++++------ tests/reduct/entry_api_test.cc | 50 +++++++++++++++++++++++++++++----- 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 18e47fb..bb1734e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,8 +69,8 @@ jobs: exclude_token_api_tag: "" - reductstore_version: "main" exclude_api_version_tag: "" -# - reductstore_version: "latest" -# exclude_api_version_tag: "~[1_9]" + - reductstore_version: "latest" + exclude_api_version_tag: "~[1_10]" - license_file: "" exclude_license_tag: "~[license]" diff --git a/src/reduct/bucket.cc b/src/reduct/bucket.cc index 1355a61..5522e83 100644 --- a/src/reduct/bucket.cc +++ b/src/reduct/bucket.cc @@ -224,24 +224,33 @@ class Bucket : public IBucket { url += fmt::format("stop={}&", ToMicroseconds(*stop)); } - if (options.ttl) { - url += fmt::format("ttl={}&", options.ttl->count()); + for (const auto& [key, value] : options.include) { + url += fmt::format("&include-{}={}", key, value); } - if (options.continuous) { - url += "continuous=true&"; + for (const auto& [key, value] : options.exclude) { + url += fmt::format("&exclude-{}={}", key, value); + } + + if (options.each_s) { + url += fmt::format("each_s={}&", *options.each_s); + } + + if (options.each_n) { + url += fmt::format("each_n={}&", *options.each_n); } if (options.limit) { url += fmt::format("limit={}&", *options.limit); } - for (const auto& [key, value] : options.include) { - url += fmt::format("&include-{}={}", key, value); + + if (options.ttl) { + url += fmt::format("ttl={}&", options.ttl->count()); } - for (const auto& [key, value] : options.exclude) { - url += fmt::format("&exclude-{}={}", key, value); + if (options.continuous) { + url += "continuous=true&"; } auto [body, err] = client_->Get(url); diff --git a/tests/reduct/entry_api_test.cc b/tests/reduct/entry_api_test.cc index ca22abd..e804838 100644 --- a/tests/reduct/entry_api_test.cc +++ b/tests/reduct/entry_api_test.cc @@ -14,6 +14,7 @@ using reduct::IClient; const auto kBucketName = "test_bucket_3"; using us = std::chrono::microseconds; +using s = std::chrono::seconds; TEST_CASE("reduct::IBucket should write/read a record", "[entry_api]") { Fixture ctx; @@ -251,6 +252,41 @@ TEST_CASE("reduct::IBucket should query records (huge blobs)", "[entry_api]") { REQUIRE(received_data[1] == blob2); } +TEST_CASE("reduct::IBucket should resample data", "[entry_api][1_10]") { + Fixture ctx; + auto [bucket, _] = ctx.client->CreateBucket(kBucketName); + REQUIRE(bucket); + + IBucket::Time ts{}; + REQUIRE(bucket->Write("entry", ts, [](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk); + REQUIRE(bucket->Write("entry", ts + s(1), [](auto rec) { rec->WriteAll("some_data2"); }) == Error::kOk); + REQUIRE(bucket->Write("entry", ts + s(2), [](auto rec) { rec->WriteAll("some_data3"); }) == Error::kOk); + + std::vector received_data; + auto call_back = [&received_data](auto record) { + auto [data, err] = record.ReadAll(); + received_data.push_back(data); + return true; + }; + + SECTION("return a record each 2 seconds") { + auto err = bucket->Query("entry", std::nullopt, std::nullopt, {.each_s = 2.0}, call_back); + + REQUIRE(err == Error::kOk); + REQUIRE(received_data.size() == 2); + REQUIRE(received_data[0] == "some_data1"); + REQUIRE(received_data[1] == "some_data3"); + } + + SECTION("return each 3th record") { + auto err = bucket->Query("entry", std::nullopt, std::nullopt, {.each_n = 3}, call_back); + + REQUIRE(err == Error::kOk); + REQUIRE(received_data.size() == 1); + REQUIRE(received_data[0] == "some_data1"); + } +} + TEST_CASE("reduct::IBucket should limit records in a query", "[entry_api][1_6]") { Fixture ctx; auto [bucket, _] = ctx.client->GetBucket("test_bucket_1"); @@ -282,13 +318,13 @@ TEST_CASE("reduct::IBucket should write batch of records", "[bucket_api][1_7]") auto t = IBucket::Time(); REQUIRE(bucket - ->WriteBatch( - "entry-1", - [t](IBucket::Batch* batch) { - batch->AddRecord(t, "some_data1"); - batch->AddRecord(t + us(1), "some_data2", "text/plain"); - batch->AddRecord(t + us(2), "some_data3", "text/plain", {{"key1", "value1"}, {"key2", "value2"}}); - }) + ->WriteBatch("entry-1", + [t](IBucket::Batch* batch) { + batch->AddRecord(t, "some_data1"); + batch->AddRecord(t + us(1), "some_data2", "text/plain"); + batch->AddRecord(t + us(2), "some_data3", "text/plain", + {{"key1", "value1"}, {"key2", "value2"}}); + }) .error == Error::kOk); REQUIRE(bucket->Read("entry-1", t, [](auto record) { From 0cb5ae7d30b10e9b309e697d7eb75e2e9526a01c Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Sat, 18 May 2024 14:12:34 +0200 Subject: [PATCH 3/5] update CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a482984..1e947cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- RS-261: Support for `each_n` and `each_s` query parameters, [PR-68](https://github.com/reductstore/reduct-cpp/pull/68) + ### Fixed - Windows compilation, [PR-66](https://github.com/reductstore/reduct-cpp/pull/66) From a20e0c67ddd7b30fd865e1f7628bea63281028da Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Sat, 18 May 2024 15:40:02 +0200 Subject: [PATCH 4/5] fix ttl calculation --- src/reduct/bucket.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/reduct/bucket.cc b/src/reduct/bucket.cc index 5522e83..7209147 100644 --- a/src/reduct/bucket.cc +++ b/src/reduct/bucket.cc @@ -246,13 +246,17 @@ class Bucket : public IBucket { if (options.ttl) { - url += fmt::format("ttl={}&", options.ttl->count()); + url += fmt::format("ttl={}&", options.ttl->count() / 1000); } if (options.continuous) { url += "continuous=true&"; } + if (options.head_only) { + url += "head=true&"; + } + auto [body, err] = client_->Get(url); if (err) { return err; From fa6c7232522bc5da307769e782c4f50acabd1657 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Sat, 18 May 2024 15:46:07 +0200 Subject: [PATCH 5/5] fix cppcheck folder --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bb1734e..c8e447a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: run: find src/ -name "*.cc" -o -name "*.h" | xargs cpplint - name: Check code in /tests run: find tests/ -name "*.cc" -o -name "*.h" | xargs cpplint - - name: Check code in /exampes + - name: Check code in /examples run: find examples/ -name "*.cc" -o -name "*.h" | xargs cpplint build: runs-on: ubuntu-latest