Skip to content

Commit

Permalink
Merge e31bb33 into 26f95da
Browse files Browse the repository at this point in the history
  • Loading branch information
evanevanevanevannnn authored Sep 23, 2024
2 parents 26f95da + e31bb33 commit 2d0e21a
Show file tree
Hide file tree
Showing 43 changed files with 486 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,32 +127,10 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type, s
case arrow::Type::LIST: { // TODO: is ok?
return false;
}
case arrow::Type::STRUCT: { // TODO: is ok?
auto& structType = *resType.mutable_struct_type();
for (const auto& field : type.fields()) {
auto& member = *structType.add_members();
auto& memberType = *member.mutable_type();
if (!ArrowToYdbType(memberType, *field->type(), config)) {
return false;
}
member.mutable_name()->assign(field->name().data(), field->name().size());
}
return true;
}
case arrow::Type::STRUCT:
case arrow::Type::SPARSE_UNION:
case arrow::Type::DENSE_UNION: { // TODO: is ok?
auto& variant = *resType.mutable_variant_type()->mutable_struct_items();
for (const auto& field : type.fields()) {
auto& member = *variant.add_members();
if (!ArrowToYdbType(*member.mutable_type(), *field->type(), config)) {
return false;
}
if (field->name().empty()) {
return false;
}
member.mutable_name()->assign(field->name().data(), field->name().size());
}
return true;
case arrow::Type::DENSE_UNION: {
return false;
}
case arrow::Type::DICTIONARY: // TODO: is representable?
return false;
Expand Down Expand Up @@ -326,14 +304,15 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
std::vector<Ydb::Column> ydbFields;
for (const auto& field : arrowFields) {
ydbFields.emplace_back();
auto& ydbField = ydbFields.back();
if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type(), file.Config)) {
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
RequesterId_ = {};
return;
Ydb::Column column;
if (!ArrowToYdbType(*column.mutable_type(), *field->type(), file.Config)) {
continue;
}
if (field->name().empty()) {
continue;
}
ydbField.mutable_name()->assign(field->name());
column.mutable_name()->assign(field->name());
ydbFields.push_back(column);
}

ctx.Send(RequesterId_, new TEvInferredFileSchema(file.Path, std::move(ydbFields)));
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <ydb/core/fq/libs/common/cache.h>
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/exceptions/exceptions.h>
#include <ydb/library/yql/utils/exceptions.h>
#include <ydb/core/util/tuples.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
Expand Down Expand Up @@ -213,11 +213,13 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
DatabaseId2Description[std::make_pair(params.Id, params.DatabaseType)] = description;
result.ConstructInPlace(description);
return "";
} catch (const TCodeLineException& ex) {
} catch (const NYql::TCodeLineException& ex) {
LOG_E("ResponseProcessor::Handle(HttpIncomingResponse): " << ex.what());
return TStringBuilder()
<< "response parser error: " << params.ToDebugString() << Endl
<< ex.GetRawMessage();
} catch (...) {
LOG_E("ResponseProcessor::Handle(HttpIncomingResponse): " << CurrentExceptionMessage());
return TStringBuilder()
<< "response parser error: " << params.ToDebugString() << Endl
<< CurrentExceptionMessage();
Expand Down Expand Up @@ -359,7 +361,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

if (aliveHosts.empty()) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE ClickHouse hosts found";
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE ClickHouse hosts found";
}

NYql::IMdbEndpointGenerator::TParams params = {
Expand Down Expand Up @@ -407,7 +409,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

if (aliveHosts.empty()) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE PostgreSQL hosts found";
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE PostgreSQL hosts found";
}

NYql::IMdbEndpointGenerator::TParams params = {
Expand Down Expand Up @@ -445,7 +447,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

if (aliveHost == "") {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE Greenplum hosts found";
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE Greenplum hosts found";
}

NYql::IMdbEndpointGenerator::TParams params = {
Expand Down Expand Up @@ -495,7 +497,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

if (aliveHosts.empty()) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
}

NYql::IMdbEndpointGenerator::TParams params = {
Expand Down
1 change: 0 additions & 1 deletion ydb/core/fq/libs/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ PEERDIR(
ydb/core/fq/libs/db_id_async_resolver_impl
ydb/core/fq/libs/db_schema
ydb/core/fq/libs/events
ydb/core/fq/libs/exceptions
ydb/core/fq/libs/grpc
ydb/core/fq/libs/private_client
ydb/core/fq/libs/rate_limiter/utils
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/fq/libs/config/yq_issue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,28 @@

namespace NFq {

NYql::TIssue MakeFatalIssue(TIssuesIds::EIssueCode id, const TString& message) {
NYql::TIssue MakeFatalIssue(ui32 id, const TString& message) {
NYql::TIssue issue;
issue.SetCode(id, NYql::TSeverityIds::S_FATAL);
issue.SetMessage(message);
return issue;
}

NYql::TIssue MakeErrorIssue(TIssuesIds::EIssueCode id, const TString& message) {
NYql::TIssue MakeErrorIssue(ui32 id, const TString& message) {
NYql::TIssue issue;
issue.SetCode(id, NYql::TSeverityIds::S_ERROR);
issue.SetMessage(message);
return issue;
}

NYql::TIssue MakeWarningIssue(TIssuesIds::EIssueCode id, const TString& message) {
NYql::TIssue MakeWarningIssue(ui32 id, const TString& message) {
NYql::TIssue issue;
issue.SetCode(id, NYql::TSeverityIds::S_WARNING);
issue.SetMessage(message);
return issue;
}

NYql::TIssue MakeInfoIssue(TIssuesIds::EIssueCode id, const TString& message) {
NYql::TIssue MakeInfoIssue(ui32 id, const TString& message) {
NYql::TIssue issue;
issue.SetCode(id, NYql::TSeverityIds::S_INFO);
issue.SetMessage(message);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/fq/libs/config/yq_issue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

namespace NFq {

NYql::TIssue MakeFatalIssue(TIssuesIds::EIssueCode id, const TString& message);
NYql::TIssue MakeFatalIssue(ui32 id, const TString& message);

NYql::TIssue MakeErrorIssue(TIssuesIds::EIssueCode id, const TString& message);
NYql::TIssue MakeErrorIssue(ui32 id, const TString& message);

NYql::TIssue MakeWarningIssue(TIssuesIds::EIssueCode id, const TString& message);
NYql::TIssue MakeWarningIssue(ui32 id, const TString& message);

NYql::TIssue MakeInfoIssue(TIssuesIds::EIssueCode id, const TString& message);
NYql::TIssue MakeInfoIssue(ui32 id, const TString& message);

}
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/control_plane_storage/extractors.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope,
auto validator = [response, entityColumnName, parseProtobufError](NYdb::NTable::TDataQueryResult result) {
const auto& resultSets = result.GetResultSets();
if (resultSets.size() != 1) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size();
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size();
}

NYdb::TResultSetParser parser(resultSets.back());
Expand All @@ -39,7 +39,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope,

if (!response->second.Before.ConstructInPlace().ParseFromString(*parser.ColumnParser(entityColumnName).GetOptionalString())) {
parseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message. Please contact internal support";
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message. Please contact internal support";
}
return false;
};
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam

if (!task.Query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support";
throw NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support";
}
const TInstant deadline = TInstant::Now() + (task.Query.content().automatic() ? std::min(automaticQueriesTtl, resultSetsTtl) : resultSetsTtl);
task.Deadline = deadline;
if (!task.Internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
throw NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
}

*task.Internal.mutable_result_ttl() = NProtoInterop::CastToProto(resultSetsTtl);
Expand Down
Loading

0 comments on commit 2d0e21a

Please sign in to comment.