diff --git a/ydb/public/lib/json_value/ydb_json_value.cpp b/ydb/public/lib/json_value/ydb_json_value.cpp index bd64cdfae60c..d92537dd8684 100644 --- a/ydb/public/lib/json_value/ydb_json_value.cpp +++ b/ydb/public/lib/json_value/ydb_json_value.cpp @@ -677,22 +677,20 @@ namespace { ValueBuilder.Decimal(jsonValue.GetString()); break; - case TTypeParser::ETypeKind::Pg: { - TPgType pgType(""); // TODO: correct type? - if (jsonValue.GetType() == NJson::JSON_STRING) { - ValueBuilder.Pg(TPgValue(TPgValue::VK_TEXT, jsonValue.GetString(), pgType)); - } else if (jsonValue.GetType() == NJson::JSON_NULL) { - ValueBuilder.Pg(TPgValue(TPgValue::VK_NULL, {}, pgType)); - } else { - EnsureType(jsonValue, NJson::JSON_ARRAY); - if (jsonValue.GetArray().size() != 1) { - ThrowFatalError(TStringBuilder() << "Pg type should be encoded as array with size 1, but not " << jsonValue.GetArray().size()); - } - auto& innerJsonValue = jsonValue.GetArray().at(0); - EnsureType(innerJsonValue, NJson::JSON_STRING); - auto binary = JsonStringToBinaryString(innerJsonValue.GetString()); - ValueBuilder.Pg(TPgValue(TPgValue::VK_BINARY, binary, pgType)); + case TTypeParser::ETypeKind::Pg: + if (jsonValue.GetType() == NJson::JSON_STRING) { + ValueBuilder.Pg(TPgValue(TPgValue::VK_TEXT, jsonValue.GetString(), TypeParser.GetPg())); + } else if (jsonValue.GetType() == NJson::JSON_NULL) { + ValueBuilder.Pg(TPgValue(TPgValue::VK_NULL, {}, TypeParser.GetPg())); + } else { + EnsureType(jsonValue, NJson::JSON_ARRAY); + if (jsonValue.GetArray().size() != 1) { + ThrowFatalError(TStringBuilder() << "Pg type should be encoded as array with size 1, but not " << jsonValue.GetArray().size()); } + auto& innerJsonValue = jsonValue.GetArray().at(0); + EnsureType(innerJsonValue, NJson::JSON_STRING); + auto binary = JsonStringToBinaryString(innerJsonValue.GetString()); + ValueBuilder.Pg(TPgValue(TPgValue::VK_BINARY, binary, TypeParser.GetPg())); } break; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp index ac0c7f00dd78..f089a1b7a014 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp @@ -244,7 +244,9 @@ int TCommandImportFromCsv::Run(TConfig& config) { settings.Header(Header); settings.NewlineDelimited(NewlineDelimited); settings.HeaderRow(HeaderRow); - settings.NullValue(NullValue); + if (config.ParseResult->Has("null-value")) { + settings.NullValue(NullValue); + } if (Delimiter.size() != 1) { throw TMisuseException() diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp index 5837bc73bcdb..2e6a89d1db67 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp @@ -10,8 +10,9 @@ namespace { class TCsvToYdbConverter { public: - explicit TCsvToYdbConverter(TTypeParser& parser) + explicit TCsvToYdbConverter(TTypeParser& parser, const std::optional& nullValue) : Parser(parser) + , NullValue(nullValue) { } @@ -40,12 +41,12 @@ class TCsvToYdbConverter { size_t cnt; try { auto value = StringToArithmetic(token, cnt); - if (cnt != token.Size() || value < std::numeric_limits::min() || value > std::numeric_limits::max()) { + if (cnt != token.Size() || value < std::numeric_limits::lowest() || value > std::numeric_limits::max()) { throw yexception(); } return static_cast(value); } catch (std::exception& e) { - throw TMisuseException() << "Expected " << Parser.GetPrimitive() << " value, recieved: \"" << token << "\"."; + throw TMisuseException() << "Expected " << Parser.GetPrimitive() << " value, recieved: \"" << token << "\"."; } } @@ -105,15 +106,30 @@ class TCsvToYdbConverter { case EPrimitiveType::DyNumber: Builder.DyNumber(token); break; - case EPrimitiveType::Date: - Builder.Date(TInstant::Days(GetArithmetic(token))); + case EPrimitiveType::Date: { + TInstant date; + if (!TInstant::TryParseIso8601(token, date)) { + date = TInstant::Days(GetArithmetic(token)); + } + Builder.Date(date); break; - case EPrimitiveType::Datetime: - Builder.Datetime(TInstant::Seconds(GetArithmetic(token))); + } + case EPrimitiveType::Datetime: { + TInstant datetime; + if (!TInstant::TryParseIso8601(token, datetime)) { + datetime = TInstant::Seconds(GetArithmetic(token)); + } + Builder.Datetime(datetime); break; - case EPrimitiveType::Timestamp: - Builder.Timestamp(TInstant::MicroSeconds(GetArithmetic(token))); + } + case EPrimitiveType::Timestamp: { + TInstant timestamp; + if (!TInstant::TryParseIso8601(token, timestamp)) { + timestamp = TInstant::MicroSeconds(GetArithmetic(token)); + } + Builder.Timestamp(timestamp); break; + } case EPrimitiveType::Interval: Builder.Interval(GetArithmetic(token)); break; @@ -133,17 +149,17 @@ class TCsvToYdbConverter { void BuildValue(TStringBuf token) { switch (Parser.GetKind()) { - case TTypeParser::ETypeKind::Primitive: + case TTypeParser::ETypeKind::Primitive: { BuildPrimitive(TString(token)); break; - - case TTypeParser::ETypeKind::Decimal: + } + case TTypeParser::ETypeKind::Decimal: { Builder.Decimal(TString(token)); break; - - case TTypeParser::ETypeKind::Optional: + } + case TTypeParser::ETypeKind::Optional: { Parser.OpenOptional(); - if (token == NullValue) { + if (NullValue && token == NullValue) { Builder.EmptyOptional(GetType()); } else { Builder.BeginOptional(); @@ -152,23 +168,31 @@ class TCsvToYdbConverter { } Parser.CloseOptional(); break; - - case TTypeParser::ETypeKind::Null: + } + case TTypeParser::ETypeKind::Null: { EnsureNull(token); break; - - case TTypeParser::ETypeKind::Void: + } + case TTypeParser::ETypeKind::Void: { EnsureNull(token); break; - - case TTypeParser::ETypeKind::Tagged: + } + case TTypeParser::ETypeKind::Tagged: { Parser.OpenTagged(); Builder.BeginTagged(Parser.GetTag()); BuildValue(token); Builder.EndTagged(); Parser.CloseTagged(); break; - + } + case TTypeParser::ETypeKind::Pg: { + if (NullValue && token == NullValue) { + Builder.Pg(TPgValue(TPgValue::VK_NULL, {}, Parser.GetPg())); + } else { + Builder.Pg(TPgValue(TPgValue::VK_TEXT, TString(token), Parser.GetPg())); + } + break; + } default: throw TMisuseException() << "Unsupported type kind: " << Parser.GetKind(); } @@ -200,6 +224,10 @@ class TCsvToYdbConverter { Parser.CloseTagged(); break; + case TTypeParser::ETypeKind::Pg: + typeBuilder.Pg(Parser.GetPg()); + break; + default: throw TMisuseException() << "Unsupported type kind: " << Parser.GetKind(); } @@ -222,6 +250,9 @@ class TCsvToYdbConverter { } void EnsureNull(TStringBuf token) const { + if (!NullValue) { + throw TMisuseException() << "Expected null value instead of \"" << token << "\", but null value is not set."; + } if (token != NullValue) { throw TMisuseException() << "Expected null value: \"" << NullValue << "\", recieved: \"" << token << "\"."; } @@ -234,15 +265,18 @@ class TCsvToYdbConverter { private: TTypeParser& Parser; - const TString NullValue = ""; + const std::optional NullValue = ""; TValueBuilder Builder; }; } -TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::map& paramTypes, const std::map& paramSources) +TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional& nullValue, + const std::map* paramTypes, + const std::map* paramSources) : HeaderRow(std::move(headerRow)) , Delimeter(delimeter) + , NullValue(nullValue) , ParamTypes(paramTypes) , ParamSources(paramSources) { @@ -250,12 +284,23 @@ TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::map Header = static_cast>(splitter); } -TValue TCsvParser::FieldToValue(TTypeParser& parser, TStringBuf token) { - TCsvToYdbConverter converter(parser); +TCsvParser::TCsvParser(TVector&& header, const char delimeter, const std::optional& nullValue, + const std::map* paramTypes, + const std::map* paramSources) + : Header(std::move(header)) + , Delimeter(delimeter) + , NullValue(nullValue) + , ParamTypes(paramTypes) + , ParamSources(paramSources) +{ +} + +TValue TCsvParser::FieldToValue(TTypeParser& parser, TStringBuf token) const { + TCsvToYdbConverter converter(parser, NullValue); return converter.Convert(token); } -void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) { +void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) const { NCsvFormat::CsvSplitter splitter(data, Delimeter); auto headerIt = Header.begin(); do { @@ -264,14 +309,16 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) { throw TMisuseException() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << data << "\""; } TString fullname = "$" + *headerIt; - auto paramIt = ParamTypes.find(fullname); - if (paramIt == ParamTypes.end()) { + auto paramIt = ParamTypes->find(fullname); + if (paramIt == ParamTypes->end()) { ++headerIt; continue; } - auto paramSource = ParamSources.find(fullname); - if (paramSource != ParamSources.end()) { - throw TMisuseException() << "Parameter " << fullname << " value found in more than one source: stdin, " << paramSource->second << "."; + if (ParamSources) { + auto paramSource = ParamSources->find(fullname); + if (paramSource != ParamSources->end()) { + throw TMisuseException() << "Parameter " << fullname << " value found in more than one source: stdin, " << paramSource->second << "."; + } } TTypeParser parser(paramIt->second); builder.AddParam(fullname, FieldToValue(parser, token)); @@ -283,20 +330,20 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder) { } } -void TCsvParser::GetValue(TString&& data, const TType& type, TValueBuilder& builder) { +void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& type) const { NCsvFormat::CsvSplitter splitter(data, Delimeter); - auto headerIt = Header.begin(); + auto headerIt = Header.cbegin(); std::map fields; do { TStringBuf token = splitter.Consume(); - if (headerIt == Header.end()) { + if (headerIt == Header.cend()) { throw TMisuseException() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << data << "\""; } fields[*headerIt] = token; ++headerIt; } while (splitter.Step()); - if (headerIt != Header.end()) { + if (headerIt != Header.cend()) { throw TMisuseException() << "Header contains more fields than data. Header: \"" << HeaderRow << "\", data: \"" << data << "\""; } builder.BeginStruct(); @@ -304,6 +351,9 @@ void TCsvParser::GetValue(TString&& data, const TType& type, TValueBuilder& buil parser.OpenStruct(); while (parser.TryNextMember()) { TString name = parser.GetMemberName(); + if (name == "__ydb_skip_column_name") { + continue; + } auto fieldIt = fields.find(name); if (fieldIt == fields.end()) { throw TMisuseException() << "No member \"" << name << "\" in csv string for YDB struct type"; @@ -314,5 +364,19 @@ void TCsvParser::GetValue(TString&& data, const TType& type, TValueBuilder& buil builder.EndStruct(); } +TType TCsvParser::GetColumnsType() const { + TTypeBuilder builder; + builder.BeginStruct(); + for (const auto& colName : Header) { + if (ParamTypes->find(colName) != ParamTypes->end()) { + builder.AddMember(colName, ParamTypes->at(colName)); + } else { + builder.AddMember("__ydb_skip_column_name", TTypeBuilder().Build()); + } + } + builder.EndStruct(); + return builder.Build(); +} + } } \ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.h b/ydb/public/lib/ydb_cli/common/csv_parser.h index f690d4e484bc..fa3ab304a327 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.h +++ b/ydb/public/lib/ydb_cli/common/csv_parser.h @@ -9,19 +9,34 @@ namespace NConsoleClient { class TCsvParser { public: - TCsvParser(TString&& headerRow, const char delimeter, const std::map& paramTypes, const std::map& paramSources); + TCsvParser() = default; - void GetParams(TString&& data, TParamsBuilder& builder); - void GetValue(TString&& data, const TType& type, TValueBuilder& builder); + TCsvParser(const TCsvParser&) = delete; + TCsvParser(TCsvParser&&) = default; + TCsvParser& operator=(const TCsvParser&) = delete; + TCsvParser& operator=(TCsvParser&&) = default; + ~TCsvParser() = default; + + TCsvParser(TString&& headerRow, const char delimeter, const std::optional& nullValue, + const std::map* paramTypes = nullptr, + const std::map* paramSources = nullptr); + TCsvParser(TVector&& header, const char delimeter, const std::optional& nullValue, + const std::map* paramTypes = nullptr, + const std::map* paramSources = nullptr); + + void GetParams(TString&& data, TParamsBuilder& builder) const; + void GetValue(TString&& data, TValueBuilder& builder, const TType& type) const; + TType GetColumnsType() const; private: - TValue FieldToValue(TTypeParser& parser, TStringBuf token); + TValue FieldToValue(TTypeParser& parser, TStringBuf token) const; TVector Header; TString HeaderRow; - const char Delimeter; - const std::map& ParamTypes; - const std::map& ParamSources; + char Delimeter; + std::optional NullValue; + const std::map* ParamTypes; + const std::map* ParamSources; }; } diff --git a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp index ca09248901bf..9887c9fc7ea2 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp @@ -9,7 +9,6 @@ using namespace NYdb::NConsoleClient; Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { bool CompareValues(const TValue& lhs, const TValue& rhs) { - lhs.GetProto(); TString stringFirst, stringSecond; NProtoBuf::TextFormat::PrintToString(lhs.GetProto(), &stringFirst); NProtoBuf::TextFormat::PrintToString(rhs.GetProto(), &stringSecond); @@ -22,7 +21,7 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { paramTypes.insert({name, value.GetType()}); } std::map paramSources; - TCsvParser parser(std::move(header), ',', paramTypes, paramSources); + TCsvParser parser(std::move(header), ',', "", ¶mTypes, ¶mSources); TParamsBuilder paramBuilder; parser.GetParams(std::move(data), paramBuilder); auto values = paramBuilder.Build().GetValues(); @@ -40,9 +39,9 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { paramTypes.insert({member.name(), member.type()}); } - TCsvParser parser(std::move(header), ',', paramTypes, {}); + TCsvParser parser(std::move(header), ',', "", ¶mTypes, nullptr); TValueBuilder valueBuilder; - parser.GetValue(std::move(data), result.GetType(), valueBuilder); + parser.GetValue(std::move(data), valueBuilder, result.GetType()); UNIT_ASSERT(CompareValues(valueBuilder.Build(), result)); } @@ -73,6 +72,9 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { } Y_UNIT_TEST(DateTypesTestParams) { + CommonTestParams("name", "\"2001-01-01\"", {{"$name", TValueBuilder().Date(TInstant::ParseIso8601("2001-01-01")).Build()}}); + CommonTestParams("name", "\"2001-01-01T12:12:12\"", {{"$name", TValueBuilder().Datetime(TInstant::ParseIso8601("2001-01-01T12:12:12")).Build()}}); + CommonTestParams("name", "\"2001-01-01T12:12:12.111111\"", {{"$name", TValueBuilder().Timestamp(TInstant::ParseIso8601("2001-01-01T12:12:12.111111")).Build()}}); CommonTestParams("name", "12000", {{"$name", TValueBuilder().Date(TInstant::Days(12000)).Build()}}); CommonTestParams("name", "1200000", {{"$name", TValueBuilder().Datetime(TInstant::Seconds(1200000)).Build()}}); CommonTestParams("name", "120000000", {{"$name", TValueBuilder().Timestamp(TInstant::MicroSeconds(120000000)).Build()}}); @@ -83,6 +85,9 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { } Y_UNIT_TEST(DateTypesTestValue) { + CommonTestValue("name", "\"2001-01-01\"", MakeStruct("name", TValueBuilder().Date(TInstant::ParseIso8601("2001-01-01")).Build())); + CommonTestValue("name", "\"2001-01-01T12:12:12\"", MakeStruct("name", TValueBuilder().Datetime(TInstant::ParseIso8601("2001-01-01T12:12:12")).Build())); + CommonTestValue("name", "\"2001-01-01T12:12:12.111111\"", MakeStruct("name", TValueBuilder().Timestamp(TInstant::ParseIso8601("2001-01-01T12:12:12.111111")).Build())); CommonTestValue("name", "12000", MakeStruct("name", TValueBuilder().Date(TInstant::Days(12000)).Build())); CommonTestValue("name", "1200000", MakeStruct("name", TValueBuilder().Datetime(TInstant::Seconds(1200000)).Build())); CommonTestValue("name", "120000000", MakeStruct("name", TValueBuilder().Timestamp(TInstant::MicroSeconds(120000000)).Build())); @@ -105,6 +110,7 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { CommonTestParams("name", "\"{\"\"a\"\":10, \"\"b\"\":\"\"string\"\"}\"", {{"$name", TValueBuilder().Json("{\"a\":10, \"b\":\"string\"}").Build()}}); CommonTestParams("name", "строка", {{"$name", TValueBuilder().OptionalUtf8("строка").Build()}}); CommonTestParams("name", "\"\"", {{"$name", TValueBuilder().OptionalUtf8({}).Build()}}); + CommonTestParams("name", "данные", {{"$name", TValueBuilder().Pg(TPgValue(TPgValue::VK_TEXT, "данные", TPgType("some_type"))).Build()}}); } Y_UNIT_TEST(OtherPrimitiveTypesTestValue) { @@ -120,6 +126,7 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { CommonTestValue("name", "\"{\"\"a\"\":10, \"\"b\"\":\"\"string\"\"}\"", MakeStruct("name", TValueBuilder().Json("{\"a\":10, \"b\":\"string\"}").Build())); CommonTestValue("name", "строка", MakeStruct("name", TValueBuilder().OptionalUtf8("строка").Build())); CommonTestValue("name", "\"\"", MakeStruct("name", TValueBuilder().OptionalUtf8({}).Build())); + CommonTestValue("name", "данные", MakeStruct("name", TValueBuilder().Pg(TPgValue(TPgValue::VK_TEXT, "данные", TPgType("some_type"))).Build())); } Y_UNIT_TEST(EdgeValuesTestParams) { @@ -137,6 +144,11 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { CommonTestParams("name", "-32768", {{"$name", TValueBuilder().Int16(-32768).Build()}}); CommonTestParams("name", "-2147483648", {{"$name", TValueBuilder().Int32(-2147483648).Build()}}); CommonTestParams("name", "-9223372036854775808", {{"$name", TValueBuilder().Int64(std::numeric_limits::min()).Build()}}); + + double minDouble = std::numeric_limits::lowest(); + double maxDouble = std::numeric_limits::max(); + CommonTestParams("name", std::to_string(minDouble), {{"$name", TValueBuilder().Double(minDouble).Build()}}); + CommonTestParams("name", std::to_string(maxDouble), {{"$name", TValueBuilder().Double(maxDouble).Build()}}); } Y_UNIT_TEST(MultipleFields) { diff --git a/ydb/public/lib/ydb_cli/common/parameters.cpp b/ydb/public/lib/ydb_cli/common/parameters.cpp index 668940b7f016..30638b8079b3 100644 --- a/ydb/public/lib/ydb_cli/common/parameters.cpp +++ b/ydb/public/lib/ydb_cli/common/parameters.cpp @@ -23,7 +23,11 @@ void TCommandWithParameters::ParseParameters(TClientCommand::TConfig& config) { switch (StdinFormat) { case EOutputFormat::Csv: + Delimiter = ','; + break; case EOutputFormat::Tsv: + Delimiter = '\t'; + break; case EOutputFormat::Raw: break; case EOutputFormat::Default: @@ -223,11 +227,7 @@ bool TCommandWithParameters::GetNextParams(THolder& paramBuilder } --SkipRows; } - if (StdinFormat == EOutputFormat::Csv) { - CsvParser = MakeHolder(std::move(headerRow), ',', ParamTypes, ParameterSources); - } else { - CsvParser = MakeHolder(std::move(headerRow), '\t', ParamTypes, ParameterSources); - } + CsvParser = TCsvParser(std::move(headerRow), Delimiter, "", &ParamTypes, &ParameterSources); } else { Input = MakeHolder(); } @@ -257,7 +257,7 @@ bool TCommandWithParameters::GetNextParams(THolder& paramBuilder } case EOutputFormat::Csv: case EOutputFormat::Tsv: { - CsvParser->GetParams(std::move(*data), *paramBuilder); + CsvParser.GetParams(std::move(*data), *paramBuilder); break; } default: @@ -300,7 +300,7 @@ bool TCommandWithParameters::GetNextParams(THolder& paramBuilder case EOutputFormat::Csv: case EOutputFormat::Tsv: { TValueBuilder valueBuilder; - CsvParser->GetValue(std::move(*data), type, valueBuilder); + CsvParser.GetValue(std::move(*data), valueBuilder, type); paramBuilder->AddParam(fullname, valueBuilder.Build()); break; } @@ -379,7 +379,7 @@ bool TCommandWithParameters::GetNextParams(THolder& paramBuilder case EOutputFormat::Csv: case EOutputFormat::Tsv: { valueBuilder.AddListItem(); - CsvParser->GetValue(std::move(*data), type.GetProto().list_type().item(), valueBuilder); + CsvParser.GetValue(std::move(*data), valueBuilder, type.GetProto().list_type().item()); break; } default: diff --git a/ydb/public/lib/ydb_cli/common/parameters.h b/ydb/public/lib/ydb_cli/common/parameters.h index 87f854c86e74..d88fae053110 100644 --- a/ydb/public/lib/ydb_cli/common/parameters.h +++ b/ydb/public/lib/ydb_cli/common/parameters.h @@ -41,7 +41,8 @@ class TCommandWithParameters : public TCommandWithExamples, public TCommandWithF THolder Input; bool IsFirstEncounter = true; size_t SkipRows = 0; - THolder CsvParser; + char Delimiter; + TCsvParser CsvParser; protected: TVector ParameterOptions, ParameterFiles, StdinParameters; diff --git a/ydb/public/lib/ydb_cli/common/progress_bar.cpp b/ydb/public/lib/ydb_cli/common/progress_bar.cpp index e2e22358a731..c2643418a26e 100644 --- a/ydb/public/lib/ydb_cli/common/progress_bar.cpp +++ b/ydb/public/lib/ydb_cli/common/progress_bar.cpp @@ -24,6 +24,12 @@ void TProgressBar::AddProgress(size_t value) { Render(); } +TProgressBar::~TProgressBar() { + if (!Finished) { + Cout << Endl; + } +} + void TProgressBar::Render() { std::optional barLenOpt = GetTerminalWidth(); @@ -31,7 +37,8 @@ void TProgressBar::Render() return; size_t barLen = *barLenOpt; - TString output = ToString(CurProgress * 100 / Capacity); + TString output = "\r"; + output += ToString(CurProgress * 100 / Capacity); output += "% |"; TString outputEnd = "| ["; outputEnd += ToString(CurProgress); @@ -39,8 +46,8 @@ void TProgressBar::Render() outputEnd += ToString(Capacity); outputEnd += "]"; - if (barLen > output.Size()) { - barLen -= output.Size(); + if (barLen > output.Size() - 1) { + barLen -= output.Size() - 1; } else { barLen = 1; } @@ -55,10 +62,10 @@ void TProgressBar::Render() output += TString("█") * filledBarLen; output += TString("░") * (barLen - filledBarLen); output += outputEnd; - output += "\r"; Cout << output; if (CurProgress == Capacity) { Cout << "\n"; + Finished = true; } Cout.Flush(); } diff --git a/ydb/public/lib/ydb_cli/common/progress_bar.h b/ydb/public/lib/ydb_cli/common/progress_bar.h index fb55cb1d568c..12cbbe03da60 100644 --- a/ydb/public/lib/ydb_cli/common/progress_bar.h +++ b/ydb/public/lib/ydb_cli/common/progress_bar.h @@ -9,6 +9,8 @@ class TProgressBar { public: explicit TProgressBar(size_t capacity); + ~TProgressBar(); + void SetProcess(size_t progress); void AddProgress(size_t value); @@ -18,6 +20,7 @@ class TProgressBar { size_t Capacity = 0; size_t CurProgress = 0; + bool Finished = false; }; } // namespace NConsoleClient diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index 601fc6670ea5..3677437c15a4 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -249,22 +250,6 @@ TStatus TImportFileClient::Import(const TVector& filePaths, const TStri .OperationTimeout(settings.OperationTimeout_) .ClientTimeout(settings.ClientTimeout_); - switch (settings.Format_) { - case EOutputFormat::Default: - case EOutputFormat::Csv: - case EOutputFormat::Tsv: - SetupUpsertSettingsCsv(settings); - break; - case EOutputFormat::Json: - case EOutputFormat::JsonUnicode: - case EOutputFormat::JsonBase64: - case EOutputFormat::Parquet: - break; - default: - return MakeStatus(EStatus::BAD_REQUEST, - TStringBuilder() << "Unsupported format #" << (int) settings.Format_); - } - bool isStdoutInteractive = IsStdoutInteractive(); size_t filePathsSize = filePaths.size(); std::mutex progressWriteLock; @@ -363,15 +348,18 @@ TStatus TImportFileClient::Import(const TVector& filePaths, const TStri auto finish = TInstant::Now(); auto duration = finish - start; + progressBar.SetProcess(100); Cout << "Elapsed: " << duration.SecondsFloat() << " sec\n"; return MakeStatus(EStatus::SUCCESS); } inline -TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TString& buffer) { - auto upsert = [this, dbPath, buffer](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus { - return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::CSV, buffer, {}, UpsertSettings) +TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) { + auto upsert = [this, dbPath, rows = builder.Build()] + (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus { + NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto()); + return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings) .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { NYdb::TStatus status = bulkUpsertResult.GetValueSync(); return NThreading::MakeFuture(status); @@ -380,61 +368,58 @@ TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TSt return TableClient->RetryOperation(upsert, RetrySettings); } -void TImportFileClient::SetupUpsertSettingsCsv(const TImportFileSettings& settings) { - Ydb::Formats::CsvSettings csvSettings; - bool special = false; - if (settings.Delimiter_ != settings.DefaultDelimiter) { - csvSettings.set_delimiter(settings.Delimiter_); - special = true; - } - - if (settings.NullValue_.size()) { - csvSettings.set_null_value(settings.NullValue_); - special = true; - } - - if (settings.Header_ || settings.HeaderRow_) { - csvSettings.set_header(true); - special = true; - } - - if (special) { - TString formatSettings; - Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings); - UpsertSettings.FormatSettings(formatSettings); - } -} - TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings, std::optional inputSizeHint, ProgressCallbackFunc & progressCallback) { - TString localHeader; - TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount); TCountingInput countInput(&input); NCsvFormat::TLinesSplitter splitter(countInput); - TString headerRow; + TCsvParser parser; bool RemoveLastDelimiter = false; + + NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync(); + if (!sessionResult.IsSuccess()) + return sessionResult; + NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync(); + if (!tableResult.IsSuccess()) + return tableResult; + + auto columnTypes = GetColumnTypes(tableResult.GetTableDescription()); + ValidateTable(tableResult.GetTableDescription()); + if (settings.Header_ || settings.HeaderRow_) { + TString headerRow; if (settings.Header_) { headerRow = splitter.ConsumeLine(); } if (settings.HeaderRow_) { headerRow = settings.HeaderRow_; } + if (headerRow.EndsWith("\r\n")) { + headerRow.erase(headerRow.Size() - 2); + } + if (headerRow.EndsWith("\n")) { + headerRow.erase(headerRow.Size() - 1); + } if (headerRow.EndsWith(settings.Delimiter_)) { RemoveLastDelimiter = true; headerRow.erase(headerRow.Size() - settings.Delimiter_.Size()); } - headerRow += '\n'; - localHeader = headerRow; + parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, &columnTypes); + } else { + TVector columns; + for (const auto& column : tableResult.GetTableDescription().GetColumns()) { + columns.push_back(column.Name); + } + parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, &columnTypes); } - // Do not use csvSettings.skip_rows. for (ui32 i = 0; i < settings.SkipRows_; ++i) { splitter.ConsumeLine(); } + TType lineType = parser.GetColumnsType(); + THolder pool = CreateThreadPool(settings.Threads_); ui32 row = settings.SkipRows_; @@ -444,9 +429,24 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, TString line; std::vector inFlightRequests; - TString buffer = localHeader; + std::vector buffer; + + auto upsertCsv = [&](std::vector&& buffer) { + TValueBuilder builder; + builder.BeginList(); + for (auto&& line : buffer) { + builder.AddListItem(); + parser.GetValue(std::move(line), builder, lineType); + } + builder.EndList(); + return UpsertTValueBuffer(dbPath, builder).ExtractValueSync(); + }; + while (TString line = splitter.ConsumeLine()) { ++row; + if (line.empty()) { + continue; + } readBytes += line.Size(); batchBytes += line.Size(); @@ -458,8 +458,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, line.erase(line.Size() - settings.Delimiter_.Size()); } - buffer += line; - buffer += '\n'; + buffer.push_back(line); if (readBytes >= nextBorder && RetrySettings.Verbose_) { nextBorder += VerboseModeReadSize; @@ -474,14 +473,12 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, progressCallback(readBytes, *inputSizeHint); } - auto asyncUpsertCSV = [&, buffer = std::move(buffer)]() { - auto value = UpsertCsvBuffer(dbPath, buffer); - return value.ExtractValueSync(); + auto asyncUpsertCSV = [&, buffer = std::move(buffer)]() mutable { + return upsertCsv(std::move(buffer)); }; batchBytes = 0; buffer.clear(); - buffer += localHeader; inFlightRequests.push_back(NThreading::Async(asyncUpsertCSV, *pool)); @@ -491,8 +488,8 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, } } - if (!buffer.Empty() && countInput.Counter() > 0) { - inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer)); + if (!buffer.empty() && countInput.Counter() > 0) { + upsertCsv(std::move(buffer)); } return WaitForQueue(0, inFlightRequests); @@ -501,34 +498,73 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) { TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount); TString headerRow; + TCsvParser parser; TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter); bool RemoveLastDelimiter = false; + NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync(); + if (!sessionResult.IsSuccess()) + return sessionResult; + NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync(); + if (!tableResult.IsSuccess()) + return tableResult; + + auto columnTypes = GetColumnTypes(tableResult.GetTableDescription()); + ValidateTable(tableResult.GetTableDescription()); + if (settings.Header_ || settings.HeaderRow_) { if (settings.HeaderRow_) { headerRow = settings.HeaderRow_; } + if (headerRow.EndsWith("\r\n")) { + headerRow.erase(headerRow.Size() - 2); + } + if (headerRow.EndsWith("\n")) { + headerRow.erase(headerRow.Size() - 1); + } if (headerRow.EndsWith(settings.Delimiter_)) { RemoveLastDelimiter = true; headerRow.erase(headerRow.Size() - settings.Delimiter_.Size()); } - headerRow += '\n'; + parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, &columnTypes); + } else { + TVector columns; + for (const auto& column : tableResult.GetTableDescription().GetColumns()) { + columns.push_back(column.Name); + } + parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, &columnTypes); } + TType lineType = parser.GetColumnsType(); + TVector threadResults(splitter.GetSplitCount()); THolder pool = CreateThreadPool(splitter.GetSplitCount()); for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) { - auto loadCsv = [this, &settings, &headerRow, &splitter, &dbPath, threadId, RemoveLastDelimiter] () { + auto loadCsv = [&, threadId] () { + auto upsertCsv = [&](std::vector&& buffer) { + TValueBuilder builder; + builder.BeginList(); + for (auto&& line : buffer) { + builder.AddListItem(); + parser.GetValue(std::move(line), builder, lineType); + } + builder.EndList(); + return UpsertTValueBuffer(dbPath, builder); + }; std::vector inFlightRequests; - TString buffer; - buffer = headerRow; + std::vector buffer; ui32 idx = settings.SkipRows_; - ui64 readSize = 0; + ui64 readBytes = 0; + ui64 batchBytes = 0; ui64 nextBorder = VerboseModeReadSize; TAsyncStatus status; TString line; while (splitter.GetChunk(threadId).ConsumeLine(line)) { - readSize += line.size(); + if (line.empty()) { + continue; + } + readBytes += line.size(); + batchBytes += line.size(); if (RemoveLastDelimiter) { if (!line.EndsWith(settings.Delimiter_)) { return MakeStatus(EStatus::BAD_REQUEST, @@ -536,28 +572,28 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr } line.erase(line.Size() - settings.Delimiter_.Size()); } - buffer += line; - buffer += '\n'; + buffer.push_back(line); ++idx; - if (readSize >= nextBorder && RetrySettings.Verbose_) { + if (readBytes >= nextBorder && RetrySettings.Verbose_) { nextBorder += VerboseModeReadSize; TStringBuilder builder; - builder << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl; + builder << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << idx << " records" << Endl; Cerr << builder; } - if (buffer.Size() >= settings.BytesPerRequest_) { + if (batchBytes >= settings.BytesPerRequest_) { + batchBytes = 0; auto status = WaitForQueue(splitter.GetThreadLimit(threadId), inFlightRequests); if (!status.IsSuccess()) { return status; } - inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer)); - buffer = headerRow; + inFlightRequests.push_back(upsertCsv(std::move(buffer))); + buffer.clear(); } } - if (!buffer.Empty() && splitter.GetChunk(threadId).GetReadCount() != 0) { - inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer)); + if (!buffer.empty() && splitter.GetChunk(threadId).GetReadCount() != 0) { + inFlightRequests.push_back(upsertCsv(std::move(buffer))); } return WaitForQueue(0, inFlightRequests); @@ -573,21 +609,6 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr return MakeStatus(); } -inline -TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder) { - auto upsert = [this, dbPath, rows = builder.Build()] - (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus { - NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto()); - return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings) - .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { - NYdb::TStatus status = bulkUpsertResult.GetValueSync(); - return NThreading::MakeFuture(status); - }); - }; - return TableClient->RetryOperation(upsert, RetrySettings); -} - - TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings, std::optional inputSizeHint, ProgressCallbackFunc & progressCallback) { NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync(); @@ -598,6 +619,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath return tableResult; const TType tableType = GetTableType(tableResult.GetTableDescription()); + ValidateTable(tableResult.GetTableDescription()); const NYdb::EBinaryStringEncoding stringEncoding = (settings.Format_ == EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 : NYdb::EBinaryStringEncoding::Unicode; @@ -612,7 +634,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath std::vector batchLines; std::vector inFlightRequests; - auto upsertJson = [&](std::vector batchLines) { + auto upsertJson = [&](const std::vector& batchLines) { TValueBuilder batch; batch.BeginList(); @@ -622,7 +644,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath batch.EndList(); - auto value = UpsertJsonBuffer(dbPath, batch); + auto value = UpsertTValueBuffer(dbPath, batch); return value.ExtractValueSync(); }; @@ -800,12 +822,35 @@ TType TImportFileClient::GetTableType(const NTable::TTableDescription& tableDesc TTypeBuilder typeBuilder; typeBuilder.BeginStruct(); const auto& columns = tableDescription.GetTableColumns(); - for (auto it = columns.begin(); it!=columns.end(); it++) { + for (auto it = columns.begin(); it != columns.end(); it++) { typeBuilder.AddMember((*it).Name, (*it).Type); } typeBuilder.EndStruct(); return typeBuilder.Build(); } +std::map TImportFileClient::GetColumnTypes(const NTable::TTableDescription& tableDescription) { + std::map columnTypes; + const auto& columns = tableDescription.GetTableColumns(); + for (auto it = columns.begin(); it != columns.end(); it++) { + columnTypes.insert({(*it).Name, (*it).Type}); + } + return columnTypes; +} + +void TImportFileClient::ValidateTable(const NTable::TTableDescription& tableDescription) { + auto columnTypes = GetColumnTypes(tableDescription); + bool hasPgType = false; + for (const auto& [_, type] : columnTypes) { + if (TTypeParser(type).GetKind() == TTypeParser::ETypeKind::Pg) { + hasPgType = true; + break; + } + } + if (tableDescription.GetStoreType() == NTable::EStoreType::Column && hasPgType) { + throw TMisuseException() << "Import into column table with Pg type columns in not supported"; + } +} + } } diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h index 366d0e0394d3..fde269301824 100644 --- a/ydb/public/lib/ydb_cli/import/import.h +++ b/ydb/public/lib/ydb_cli/import/import.h @@ -51,7 +51,7 @@ struct TImportFileSettings : public TOperationRequestSettings, NullValue, std::nullopt); }; class TImportFileClient { @@ -79,16 +79,16 @@ class TImportFileClient { using ProgressCallbackFunc = std::function; - void SetupUpsertSettingsCsv(const TImportFileSettings& settings); TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings, std::optional inputSizeHint, ProgressCallbackFunc & progressCallback); TStatus UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings); - TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& buffer); + TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder); TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings, std::optional inputSizeHint, ProgressCallbackFunc & progressCallback); - TAsyncStatus UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder); TType GetTableType(const NTable::TTableDescription& tableDescription); + std::map GetColumnTypes(const NTable::TTableDescription& tableDescription); + void ValidateTable(const NTable::TTableDescription& tableDescription); TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings, ProgressCallbackFunc & progressCallback);