Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RS-261: Add each_n and each_s query parameters #68

Merged
merged 5 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
run: find src/ -name "*.cc" -o -name "*.h" | xargs cpplint
- name: Check code in /tests
run: find tests/ -name "*.cc" -o -name "*.h" | xargs cpplint
- name: Check code in /exampes
- name: Check code in /examples
run: find examples/ -name "*.cc" -o -name "*.h" | xargs cpplint
build:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -69,8 +69,8 @@ jobs:
exclude_token_api_tag: ""
- reductstore_version: "main"
exclude_api_version_tag: ""
# - reductstore_version: "latest"
# exclude_api_version_tag: "~[1_9]"
- reductstore_version: "latest"
exclude_api_version_tag: "~[1_10]"
- license_file: ""
exclude_license_tag: "~[license]"

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- RS-261: Support for `each_n` and `each_s` query parameters, [PR-68](https://github.com/reductstore/reduct-cpp/pull/68)

### Fixed

- Windows compilation, [PR-66](https://github.com/reductstore/reduct-cpp/pull/66)
Expand Down
33 changes: 23 additions & 10 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 Alexey Timin
// Copyright 2022-2024 Alexey Timin

#include "reduct/bucket.h"
#define FMT_HEADER_ONLY 1
Expand Down Expand Up @@ -224,24 +224,37 @@ class Bucket : public IBucket {
url += fmt::format("stop={}&", ToMicroseconds(*stop));
}

if (options.ttl) {
url += fmt::format("ttl={}&", options.ttl->count());
for (const auto& [key, value] : options.include) {
url += fmt::format("&include-{}={}", key, value);
}

if (options.continuous) {
url += "continuous=true&";
for (const auto& [key, value] : options.exclude) {
url += fmt::format("&exclude-{}={}", key, value);
}

if (options.each_s) {
url += fmt::format("each_s={}&", *options.each_s);
}

if (options.each_n) {
url += fmt::format("each_n={}&", *options.each_n);
}

if (options.limit) {
url += fmt::format("limit={}&", *options.limit);
}

for (const auto& [key, value] : options.include) {
url += fmt::format("&include-{}={}", key, value);

if (options.ttl) {
url += fmt::format("ttl={}&", options.ttl->count() / 1000);
}

for (const auto& [key, value] : options.exclude) {
url += fmt::format("&exclude-{}={}", key, value);
if (options.continuous) {
url += "continuous=true&";
}

if (options.head_only) {
url += "head=true&";
}

auto [body, err] = client_->Get(url);
Expand All @@ -258,7 +271,7 @@ class Bucket : public IBucket {
}

while (true) {
bool batched = client_->api_version() >= "1.5";
bool batched = internal::IsCompatible("1.5", client_->api_version());
auto [stopped, record_err] =
ReadRecord(fmt::format("{}/{}{}?q={}", path_, entry_name, batched ? "/batch" : "", id), batched,
options.head_only, callback);
Expand Down
8 changes: 6 additions & 2 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,17 @@ class IBucket {
* Query options
*/
struct QueryOptions {
std::optional<std::chrono::milliseconds> ttl; ///< time to live
LabelMap include; ///< include labels
LabelMap exclude; ///< exclude labels
std::optional<double> each_s; ///< return one record each S seconds
std::optional<size_t> each_n; ///< return each N-th record
std::optional<size_t> limit; ///< limit number of records

std::optional<std::chrono::milliseconds> ttl; ///< time to live

bool continuous; ///< continuous query. If true, the method returns the latest record and waits for the next one
std::chrono::milliseconds poll_interval; ///< poll interval for continuous query
bool head_only; ///< read only metadata
std::optional<size_t> limit; ///< limit number of records
};

/**
Expand Down
32 changes: 31 additions & 1 deletion src/reduct/internal/http_client.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 Alexey Timin
// Copyright 2022-2024 Alexey Timin

#include "reduct/internal/http_client.h"
#undef CPPHTTPLIB_BROTLI_SUPPORT
Expand All @@ -8,6 +8,7 @@
#include <nlohmann/json.hpp>

#include <algorithm>
#include <string>

namespace reduct::internal {

Expand Down Expand Up @@ -176,4 +177,33 @@ class HttpClient : public IHttpClient {
std::unique_ptr<IHttpClient> IHttpClient::Build(std::string_view url, const HttpOptions& options) {
return std::make_unique<HttpClient>(url, options);
}

bool IsCompatible(std::string_view min, std::string_view version) {
if (version.empty()) {
return false;
}

auto parse_version = [](std::string version) {
std::stringstream ss(version);
std::string item;
std::vector<int> result;
while (std::getline(ss, item, '.')) {
result.push_back(std::stoi(item));
}
return result;
};

auto min_version = parse_version(std::string(min));
auto current_version = parse_version(std::string(version));

if (min_version.size() != current_version.size()) {
return false;
}

if (min_version.size() != 2) {
return false;
}

return min_version[0] == current_version[0] && min_version[1] <= current_version[1];
}
} // namespace reduct::internal
2 changes: 2 additions & 0 deletions src/reduct/internal/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,7 @@ class IHttpClient {
static std::unique_ptr<IHttpClient> Build(std::string_view url, const HttpOptions &options);
};

bool IsCompatible(std::string_view min, std::string_view version);

} // namespace reduct::internal
#endif // REDUCT_CPP_HTTP_CLIENT_H
50 changes: 43 additions & 7 deletions tests/reduct/entry_api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ using reduct::IClient;

const auto kBucketName = "test_bucket_3";
using us = std::chrono::microseconds;
using s = std::chrono::seconds;

TEST_CASE("reduct::IBucket should write/read a record", "[entry_api]") {
Fixture ctx;
Expand Down Expand Up @@ -251,6 +252,41 @@ TEST_CASE("reduct::IBucket should query records (huge blobs)", "[entry_api]") {
REQUIRE(received_data[1] == blob2);
}

TEST_CASE("reduct::IBucket should resample data", "[entry_api][1_10]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);
REQUIRE(bucket);

IBucket::Time ts{};
REQUIRE(bucket->Write("entry", ts, [](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);
REQUIRE(bucket->Write("entry", ts + s(1), [](auto rec) { rec->WriteAll("some_data2"); }) == Error::kOk);
REQUIRE(bucket->Write("entry", ts + s(2), [](auto rec) { rec->WriteAll("some_data3"); }) == Error::kOk);

std::vector<std::string> received_data;
auto call_back = [&received_data](auto record) {
auto [data, err] = record.ReadAll();
received_data.push_back(data);
return true;
};

SECTION("return a record each 2 seconds") {
auto err = bucket->Query("entry", std::nullopt, std::nullopt, {.each_s = 2.0}, call_back);

REQUIRE(err == Error::kOk);
REQUIRE(received_data.size() == 2);
REQUIRE(received_data[0] == "some_data1");
REQUIRE(received_data[1] == "some_data3");
}

SECTION("return each 3th record") {
auto err = bucket->Query("entry", std::nullopt, std::nullopt, {.each_n = 3}, call_back);

REQUIRE(err == Error::kOk);
REQUIRE(received_data.size() == 1);
REQUIRE(received_data[0] == "some_data1");
}
}

TEST_CASE("reduct::IBucket should limit records in a query", "[entry_api][1_6]") {
Fixture ctx;
auto [bucket, _] = ctx.client->GetBucket("test_bucket_1");
Expand Down Expand Up @@ -282,13 +318,13 @@ TEST_CASE("reduct::IBucket should write batch of records", "[bucket_api][1_7]")

auto t = IBucket::Time();
REQUIRE(bucket
->WriteBatch(
"entry-1",
[t](IBucket::Batch* batch) {
batch->AddRecord(t, "some_data1");
batch->AddRecord(t + us(1), "some_data2", "text/plain");
batch->AddRecord(t + us(2), "some_data3", "text/plain", {{"key1", "value1"}, {"key2", "value2"}});
})
->WriteBatch("entry-1",
[t](IBucket::Batch* batch) {
batch->AddRecord(t, "some_data1");
batch->AddRecord(t + us(1), "some_data2", "text/plain");
batch->AddRecord(t + us(2), "some_data3", "text/plain",
{{"key1", "value1"}, {"key2", "value2"}});
})
.error == Error::kOk);

REQUIRE(bucket->Read("entry-1", t, [](auto record) {
Expand Down
Loading