From 02934ab21ae3ea6dd0dab738fc6e9fe7cba354c8 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Mon, 12 Aug 2024 12:32:45 +0200 Subject: [PATCH 1/2] [CDC] Do not lose presition during float/double to json serialization (#7625) --- .../change_record_cdc_serializer.cpp | 14 ++- ydb/tests/functional/replication/main.cpp | 113 ++++++++++++++++++ ydb/tests/functional/replication/ya.make | 26 ++++ 3 files changed, 149 insertions(+), 4 deletions(-) create mode 100644 ydb/tests/functional/replication/main.cpp create mode 100644 ydb/tests/functional/replication/ya.make diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp index 6e6eb098d28a..aa7d0e0a0d9c 100644 --- a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp +++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp @@ -91,10 +91,16 @@ class TJsonSerializer: public TBaseSerializer { friend class TChangeRecord; // used in GetPartitionKey() static NJson::TJsonWriterConfig DefaultJsonConfig() { - NJson::TJsonWriterConfig jsonConfig; - jsonConfig.ValidateUtf8 = false; - jsonConfig.WriteNanAsString = true; - return jsonConfig; + constexpr ui32 doubleNDigits = std::numeric_limits::max_digits10; + constexpr ui32 floatNDigits = std::numeric_limits::max_digits10; + constexpr EFloatToStringMode floatMode = EFloatToStringMode::PREC_NDIGITS; + return NJson::TJsonWriterConfig { + .DoubleNDigits = doubleNDigits, + .FloatNDigits = floatNDigits, + .FloatToStringMode = floatMode, + .ValidateUtf8 = false, + .WriteNanAsString = true, + }; } protected: diff --git a/ydb/tests/functional/replication/main.cpp b/ydb/tests/functional/replication/main.cpp new file mode 100644 index 000000000000..ef643253d302 --- /dev/null +++ b/ydb/tests/functional/replication/main.cpp @@ -0,0 +1,113 @@ +#include +#include + +#include +#include +#include +#include + +#include + +using namespace NYdb; +using namespace NYdb::NTable; + +namespace { + +std::pair DoRead(TSession& s, const TString& table) { + auto res = s.ExecuteDataQuery( + Sprintf("SELECT * FROM `/local/%s`; SELECT COUNT(*) AS __count FROM `/local/%s`;", + table.data(), table.data()), TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + auto rs = NYdb::TResultSetParser(res.GetResultSet(1)); + UNIT_ASSERT(rs.TryNextRow()); + auto count = rs.ColumnParser("__count").GetUint64(); + + const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0)); + return {count, proto}; +} + +} // namespace + +Y_UNIT_TEST_SUITE(Replication) +{ + Y_UNIT_TEST(Types) + { + TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE"); + auto config = TDriverConfig(connectionString); + auto driver = TDriver(config); + auto tableClient = TTableClient(driver); + auto session = tableClient.GetSession().GetValueSync().GetSession(); + + { + auto res = session.ExecuteSchemeQuery(R"( + CREATE TABLE `/local/ProducerUuidValue` ( + Key Uint32, + v01 Uuid, + v02 Uuid NOT NULL, + v03 Double, + PRIMARY KEY (Key) + ); + )").GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + + { + auto sessionResult = tableClient.GetSession().GetValueSync(); + UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString()); + auto s = sessionResult.GetSession(); + + { + const TString query = "UPSERT INTO ProducerUuidValue (Key,v01,v02,v03) VALUES" + "(1, " + "CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea01\" as Uuid), " + "UNWRAP(CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea02\" as Uuid)), " + "CAST(\"311111111113.222222223\" as Double) " + ");"; + auto res = s.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + + { + const TString query = Sprintf("CREATE ASYNC REPLICATION `replication` FOR" + "`ProducerUuidValue` AS `ConsumerUuidValue`" + "WITH (" + "CONNECTION_STRING = 'grpc://%s'," + "TOKEN = 'root@builtin'" + ");", connectionString.data()); + auto res = s.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + // TODO: Make CREATE ASYNC REPLICATION to be a sync call + Sleep(TDuration::Seconds(10)); + } + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + + auto sessionResult = tableClient.GetSession().GetValueSync(); + UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString()); + + auto s = sessionResult.GetSession(); + TUuidValue expectedV1("5b99a330-04ef-4f1a-9b64-ba6d5f44ea01"); + TUuidValue expectedV2("5b99a330-04ef-4f1a-9b64-ba6d5f44ea02"); + double expectedV3 = 311111111113.222222223; + ui32 attempt = 10; + while (--attempt) { + auto res = DoRead(s, "ConsumerUuidValue"); + if (res.first == 1) { + const Ydb::ResultSet& proto = res.second; + UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(0).uint32_value(), 1); + UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).low_128(), expectedV1.Buf_.Halfs[0]); + UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).high_128(), expectedV1.Buf_.Halfs[1]); + UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(2).low_128(), expectedV2.Buf_.Halfs[0]); + UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(2).high_128(), expectedV2.Buf_.Halfs[1]); + UNIT_ASSERT_DOUBLES_EQUAL(proto.rows(0).items(3).double_value(), expectedV3, 0.0001); + break; + } + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(attempt, "Unable to wait replication result"); + } +} + diff --git a/ydb/tests/functional/replication/ya.make b/ydb/tests/functional/replication/ya.make new file mode 100644 index 000000000000..591b22d58014 --- /dev/null +++ b/ydb/tests/functional/replication/ya.make @@ -0,0 +1,26 @@ +UNITTEST() + +ENV(YDB_USE_IN_MEMORY_PDISKS=true) + +ENV(YDB_ERASURE=block_4-2) + +PEERDIR( + library/cpp/threading/local_executor + ydb/public/sdk/cpp/client/ydb_table + ydb/public/sdk/cpp/client/ydb_proto + ydb/public/sdk/cpp/client/draft +) + +SRCS( + main.cpp +) + +INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc) + +SIZE(MEDIUM) + +IF (SANITIZER_TYPE) + REQUIREMENTS(ram:16) +ENDIF() + +END() From c972d5dfbfa967c43656d77c172c5debea63f26c Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 13 Aug 2024 16:13:36 +0000 Subject: [PATCH 2/2] add recurce --- ydb/tests/functional/ya.make | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/tests/functional/ya.make b/ydb/tests/functional/ya.make index 4aa762a48af5..fee5a01f2f77 100644 --- a/ydb/tests/functional/ya.make +++ b/ydb/tests/functional/ya.make @@ -18,6 +18,7 @@ RECURSE( query_cache rename restarts + replication scheme_shard scheme_tests script_execution