diff --git a/ydb/core/formats/arrow/converter.cpp b/ydb/core/formats/arrow/converter.cpp index d832275dcf5d..58bcc0f8aafe 100644 --- a/ydb/core/formats/arrow/converter.cpp +++ b/ydb/core/formats/arrow/converter.cpp @@ -1,9 +1,11 @@ #include "converter.h" #include "switch_type.h" +#include #include #include +#include #include #include @@ -47,16 +49,21 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP } static bool ConvertColumn(const NScheme::TTypeInfo colType, std::shared_ptr& column, std::shared_ptr& field) { - if (colType.GetTypeId() == NScheme::NTypeIds::Decimal) { + switch (colType.GetTypeId()) { + case NScheme::NTypeIds::Decimal: return false; + case NScheme::NTypeIds::JsonDocument: { + const static TSet jsonDocArrowTypes{ arrow::Type::BINARY, arrow::Type::STRING }; + if (!jsonDocArrowTypes.contains(column->type()->id())) { + return false; + } + break; } - - if ((colType.GetTypeId() == NScheme::NTypeIds::JsonDocument) && - (column->type()->id() == arrow::Type::BINARY || column->type()->id() == arrow::Type::STRING)) - { - ; - } else if (column->type()->id() != arrow::Type::BINARY) { - return false; + default: + if (column->type()->id() != arrow::Type::BINARY) { + return false; + } + break; } auto& binaryArray = static_cast(*column); @@ -81,9 +88,16 @@ static bool ConvertColumn(const NScheme::TTypeInfo colType, std::shared_ptrData(), binaryJson->Size()).ok()) { - return false; + const TStringBuf valueBuf(value.data(), value.size()); + if (NBinaryJson::IsValidBinaryJson(valueBuf)) { + if (!builder.Append(value).ok()) { + return false; + } + } else { + const auto binaryJson = NBinaryJson::SerializeToBinaryJson(valueBuf); + if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) { + return false; + } } } } diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index bef64ef18078..0f30a84aca39 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -5925,8 +5926,39 @@ Y_UNIT_TEST_SUITE(KqpOlapTypes) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); } } -} + Y_UNIT_TEST(JsonImport) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + + TTestHelper testHelper(runnerSettings); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("json").SetType(NScheme::NTypeIds::Json).SetNullable(true), + TTestHelper::TColumnSchema().SetName("json_doc").SetType(NScheme::NTypeIds::JsonDocument).SetNullable(true), + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema); + testHelper.CreateTable(testTable); + std::string jsonString = R"({"col1": "val1", "obj": {"obj_col2_int": 16}})"; + auto maybeJsonDoc = NBinaryJson::SerializeToBinaryJson(jsonString); + Y_ABORT_UNLESS(maybeJsonDoc.Defined()); + const std::string jsonBin(maybeJsonDoc->Data(), maybeJsonDoc->Size()); + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(1).AddNull().Add(jsonString); + tableInserter.AddRow().Add(2).Add(jsonString).Add(jsonBin); + testHelper.BulkUpsert(testTable, tableInserter); + } + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(3).Add(jsonBin).AddNull(); + testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::SCHEME_ERROR); + } + } +} } // namespace NKqp } // namespace NKikimr