RS-261: Add each_n and each_s query parameters (#68)
* fix version check

* support each_n and each_s query parameters

* update CHANGELOG

* fix ttl calculation

* fix cppcheck folder
atimin authored May 18, 2024
1 parent c3b9d50 commit 818c108
Showing 7 changed files with 112 additions and 23 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
run: find src/ -name "*.cc" -o -name "*.h" | xargs cpplint
- name: Check code in /tests
run: find tests/ -name "*.cc" -o -name "*.h" | xargs cpplint
- name: Check code in /exampes
- name: Check code in /examples
run: find examples/ -name "*.cc" -o -name "*.h" | xargs cpplint
build:
runs-on: ubuntu-latest
@@ -69,8 +69,8 @@ jobs:
exclude_token_api_tag: ""
- reductstore_version: "main"
exclude_api_version_tag: ""
# - reductstore_version: "latest"
# exclude_api_version_tag: "~[1_9]"
- reductstore_version: "latest"
exclude_api_version_tag: "~[1_10]"
- license_file: ""
exclude_license_tag: "~[license]"

4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- RS-261: Support for `each_n` and `each_s` query parameters, [PR-68](https://github.com/reductstore/reduct-cpp/pull/68)

### Fixed

- Windows compilation, [PR-66](https://github.com/reductstore/reduct-cpp/pull/66)
33 changes: 23 additions & 10 deletions src/reduct/bucket.cc
@@ -1,4 +1,4 @@
// Copyright 2022-2023 Alexey Timin
// Copyright 2022-2024 Alexey Timin

#include "reduct/bucket.h"
#define FMT_HEADER_ONLY 1
@@ -224,24 +224,37 @@ class Bucket : public IBucket {
url += fmt::format("stop={}&", ToMicroseconds(*stop));
}

if (options.ttl) {
url += fmt::format("ttl={}&", options.ttl->count());
for (const auto& [key, value] : options.include) {
url += fmt::format("&include-{}={}", key, value);
}

if (options.continuous) {
url += "continuous=true&";
for (const auto& [key, value] : options.exclude) {
url += fmt::format("&exclude-{}={}", key, value);
}

if (options.each_s) {
url += fmt::format("each_s={}&", *options.each_s);
}

if (options.each_n) {
url += fmt::format("each_n={}&", *options.each_n);
}

if (options.limit) {
url += fmt::format("limit={}&", *options.limit);
}

for (const auto& [key, value] : options.include) {
url += fmt::format("&include-{}={}", key, value);

if (options.ttl) {
url += fmt::format("ttl={}&", options.ttl->count() / 1000);
}

for (const auto& [key, value] : options.exclude) {
url += fmt::format("&exclude-{}={}", key, value);
if (options.continuous) {
url += "continuous=true&";
}

if (options.head_only) {
url += "head=true&";
}

auto [body, err] = client_->Get(url);
@@ -258,7 +271,7 @@ class Bucket : public IBucket {
}

while (true) {
bool batched = client_->api_version() >= "1.5";
bool batched = internal::IsCompatible("1.5", client_->api_version());
auto [stopped, record_err] =
ReadRecord(fmt::format("{}/{}{}?q={}", path_, entry_name, batched ? "/batch" : "", id), batched,
options.head_only, callback);
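The `ttl` fix and the new resampling parameters are easiest to see in isolation. Below is a minimal, self-contained sketch (not part of the commit) of how the updated query code assembles its query string; it assumes, as the division by 1000 in the diff implies, that the server-side `ttl` parameter is expressed in seconds while `QueryOptions::ttl` is `std::chrono::milliseconds`:

```cpp
#include <chrono>
#include <iostream>
#include <string>

#define FMT_HEADER_ONLY 1
#include <fmt/format.h>

int main() {
  std::string url;

  // QueryOptions::ttl is std::chrono::milliseconds; the server-side `ttl`
  // parameter is assumed to take seconds, hence the division by 1000.
  std::chrono::milliseconds ttl{60000};
  url += fmt::format("ttl={}&", ttl.count() / 1000);  // -> "ttl=60&"

  // The new resampling parameters are appended the same way when set.
  double each_s = 2.0;  // at most one record per 2 seconds
  size_t each_n = 3;    // only every 3rd record
  url += fmt::format("each_s={}&", each_s);  // e.g. "each_s=2&"
  url += fmt::format("each_n={}&", each_n);  // -> "each_n=3&"

  std::cout << url << std::endl;  // prints the assembled query fragment
}
```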
8 changes: 6 additions & 2 deletions src/reduct/bucket.h
@@ -252,13 +252,17 @@ class IBucket {
* Query options
*/
struct QueryOptions {
std::optional<std::chrono::milliseconds> ttl; ///< time to live
LabelMap include; ///< include labels
LabelMap exclude; ///< exclude labels
std::optional<double> each_s; ///< return one record each S seconds
std::optional<size_t> each_n; ///< return each N-th record
std::optional<size_t> limit; ///< limit number of records

std::optional<std::chrono::milliseconds> ttl; ///< time to live

bool continuous; ///< continuous query. If true, the method returns the latest record and waits for the next one
std::chrono::milliseconds poll_interval; ///< poll interval for continuous query
bool head_only; ///< read only metadata
std::optional<size_t> limit; ///< limit number of records
};

/**
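For context, here is a brief sketch (not from the repository) of how client code could use the new `each_s` and `each_n` fields together with `Query`; the server URL, bucket name, and entry name are placeholders:

```cpp
#include <iostream>
#include <optional>

#include <reduct/client.h>

using reduct::Error;
using reduct::IClient;

int main() {
  // Placeholder address; point this at a running ReductStore instance.
  auto client = IClient::Build("http://127.0.0.1:8383");

  auto [bucket, err] = client->GetBucket("example_bucket");  // placeholder bucket
  if (err != Error::kOk) {
    return 1;
  }

  // Down-sample the query: at most one record per 2 seconds, and of those
  // only every 3rd record, across the whole time range.
  auto query_err = bucket->Query("entry", std::nullopt, std::nullopt,
                                 {.each_s = 2.0, .each_n = 3},
                                 [](auto record) {
                                   auto [data, read_err] = record.ReadAll();
                                   std::cout << data << std::endl;
                                   return true;  // keep receiving records
                                 });
  return query_err == Error::kOk ? 0 : 1;
}
```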
32 changes: 31 additions & 1 deletion src/reduct/internal/http_client.cc
@@ -1,4 +1,4 @@
// Copyright 2022-2023 Alexey Timin
// Copyright 2022-2024 Alexey Timin

#include "reduct/internal/http_client.h"
#undef CPPHTTPLIB_BROTLI_SUPPORT
@@ -8,6 +8,7 @@
#include <nlohmann/json.hpp>

#include <algorithm>
#include <string>

namespace reduct::internal {

@@ -176,4 +177,33 @@ class HttpClient : public IHttpClient {
std::unique_ptr<IHttpClient> IHttpClient::Build(std::string_view url, const HttpOptions& options) {
return std::make_unique<HttpClient>(url, options);
}

bool IsCompatible(std::string_view min, std::string_view version) {
if (version.empty()) {
return false;
}

auto parse_version = [](std::string version) {
std::stringstream ss(version);
std::string item;
std::vector<int> result;
while (std::getline(ss, item, '.')) {
result.push_back(std::stoi(item));
}
return result;
};

auto min_version = parse_version(std::string(min));
auto current_version = parse_version(std::string(version));

if (min_version.size() != current_version.size()) {
return false;
}

if (min_version.size() != 2) {
return false;
}

return min_version[0] == current_version[0] && min_version[1] <= current_version[1];
}
} // namespace reduct::internal
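The helper above replaces the lexicographic comparison used previously (`client_->api_version() >= "1.5"` in bucket.cc), which gives the wrong answer once the minor version reaches two digits, because "1.10" sorts before "1.5" as a string. A small sketch of the intended behavior, assuming the internal header is reachable on the include path (it lives under src/):

```cpp
#include <cassert>
#include <string>

#include "reduct/internal/http_client.h"

int main() {
  using reduct::internal::IsCompatible;

  // Lexicographic string comparison mis-orders two-digit minor versions:
  assert(std::string("1.10") < std::string("1.5"));  // true, yet 1.10 is newer

  // Numeric major.minor comparison handles it correctly:
  assert(IsCompatible("1.5", "1.10"));  // minor 10 >= 5, same major
  assert(!IsCompatible("1.5", "1.4"));  // server too old
  assert(!IsCompatible("1.5", "2.0"));  // different major version is rejected
  assert(!IsCompatible("1.5", ""));     // unknown server version
}
```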
2 changes: 2 additions & 0 deletions src/reduct/internal/http_client.h
@@ -47,5 +47,7 @@ class IHttpClient {
static std::unique_ptr<IHttpClient> Build(std::string_view url, const HttpOptions &options);
};

bool IsCompatible(std::string_view min, std::string_view version);

} // namespace reduct::internal
#endif // REDUCT_CPP_HTTP_CLIENT_H
50 changes: 43 additions & 7 deletions tests/reduct/entry_api_test.cc
@@ -14,6 +14,7 @@ using reduct::IClient;

const auto kBucketName = "test_bucket_3";
using us = std::chrono::microseconds;
using s = std::chrono::seconds;

TEST_CASE("reduct::IBucket should write/read a record", "[entry_api]") {
Fixture ctx;
@@ -251,6 +252,41 @@ TEST_CASE("reduct::IBucket should query records (huge blobs)", "[entry_api]") {
REQUIRE(received_data[1] == blob2);
}

TEST_CASE("reduct::IBucket should resample data", "[entry_api][1_10]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);
REQUIRE(bucket);

IBucket::Time ts{};
REQUIRE(bucket->Write("entry", ts, [](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);
REQUIRE(bucket->Write("entry", ts + s(1), [](auto rec) { rec->WriteAll("some_data2"); }) == Error::kOk);
REQUIRE(bucket->Write("entry", ts + s(2), [](auto rec) { rec->WriteAll("some_data3"); }) == Error::kOk);

std::vector<std::string> received_data;
auto call_back = [&received_data](auto record) {
auto [data, err] = record.ReadAll();
received_data.push_back(data);
return true;
};

SECTION("return a record each 2 seconds") {
auto err = bucket->Query("entry", std::nullopt, std::nullopt, {.each_s = 2.0}, call_back);

REQUIRE(err == Error::kOk);
REQUIRE(received_data.size() == 2);
REQUIRE(received_data[0] == "some_data1");
REQUIRE(received_data[1] == "some_data3");
}

SECTION("return each 3th record") {
auto err = bucket->Query("entry", std::nullopt, std::nullopt, {.each_n = 3}, call_back);

REQUIRE(err == Error::kOk);
REQUIRE(received_data.size() == 1);
REQUIRE(received_data[0] == "some_data1");
}
}

TEST_CASE("reduct::IBucket should limit records in a query", "[entry_api][1_6]") {
Fixture ctx;
auto [bucket, _] = ctx.client->GetBucket("test_bucket_1");
@@ -282,13 +318,13 @@ TEST_CASE("reduct::IBucket should write batch of records", "[bucket_api][1_7]")

auto t = IBucket::Time();
REQUIRE(bucket
->WriteBatch(
"entry-1",
[t](IBucket::Batch* batch) {
batch->AddRecord(t, "some_data1");
batch->AddRecord(t + us(1), "some_data2", "text/plain");
batch->AddRecord(t + us(2), "some_data3", "text/plain", {{"key1", "value1"}, {"key2", "value2"}});
})
->WriteBatch("entry-1",
[t](IBucket::Batch* batch) {
batch->AddRecord(t, "some_data1");
batch->AddRecord(t + us(1), "some_data2", "text/plain");
batch->AddRecord(t + us(2), "some_data3", "text/plain",
{{"key1", "value1"}, {"key2", "value2"}});
})
.error == Error::kOk);

REQUIRE(bucket->Read("entry-1", t, [](auto record) {
