Skip to content

Commit

Permalink
Merge a3b6ead into a7ca38e
Browse files Browse the repository at this point in the history
  • Loading branch information
evanevanevanevannnn authored Aug 12, 2024
2 parents a7ca38e + a3b6ead commit d3a7ede
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 27 deletions.
16 changes: 13 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/proto/credentials.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
Expand Down Expand Up @@ -307,15 +308,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
structuredTokenBuilder.SetNoAuth();
}

auto effectiveFilePattern = NYql::NS3::NormalizePath(meta->TableLocation);
if (meta->TableLocation.EndsWith('/')) {
effectiveFilePattern += '*';
}

const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
auto httpGateway = NYql::IHTTPGateway::Make();
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
.Url = meta->DataSourceLocation,
.Credentials = credentials,
.Pattern = meta->TableLocation,
.Pattern = effectiveFilePattern,
}, Nothing(), false);
auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
auto& listRes = listResFut.GetValue();
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
Expand Down Expand Up @@ -349,13 +355,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
auto promise = NThreading::NewPromise<TMetadataResult>();
auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
if (!response.Status.IsSuccess()) {
metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues()));
return;
}
TMetadataResult result;
meta->Changed = true;
meta->Schema.clear_column();
for (const auto& column : response.Fields) {
auto& destColumn = *meta->Schema.add_column();
destColumn = column;
}
TMetadataResult result;
result.SetSuccess();
result.Metadata = meta;
metaPromise.SetValue(std::move(result));
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/external_sources/object_storage/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
#include <ydb/public/api/protos/ydb_value.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>

namespace NKikimr::NExternalSource::NObjectStorage {

Expand Down Expand Up @@ -128,10 +129,16 @@ struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvIn
struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
TEvInferredFileSchema(TString path, std::vector<Ydb::Column>&& fields)
: Path{std::move(path)}
, Status{NYdb::EStatus::SUCCESS, {}}
, Fields{std::move(fields)}
{}
TEvInferredFileSchema(TString path, NYql::TIssues&& issues)
: Path{std::move(path)}
, Status{NYdb::EStatus::INTERNAL_ERROR, std::move(issues)}
{}

TString Path;
NYdb::TStatus Status;
std::vector<Ydb::Column> Fields;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,24 @@ namespace NKikimr::NExternalSource::NObjectStorage::NInference {

namespace {

bool ArrowToYdbType(Ydb::Type& resType, const arrow::DataType& type) {
bool ShouldBeOptional(const arrow::DataType& type) {
switch (type.id()) {
case arrow::Type::NA:
resType.set_null_type(google::protobuf::NullValue::NULL_VALUE);
case arrow::Type::STRING:
case arrow::Type::BINARY:
case arrow::Type::LARGE_BINARY:
case arrow::Type::FIXED_SIZE_BINARY:
return false;
default:
return true;
}
}

bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type) {
auto& resType = ShouldBeOptional(type) ? *maybeOptionalType.mutable_optional_type()->mutable_item() : maybeOptionalType;
switch (type.id()) {
case arrow::Type::NA:
resType.set_type_id(Ydb::Type::UTF8);
return true;
case arrow::Type::BOOL:
resType.set_type_id(Ydb::Type::BOOL);
Expand Down Expand Up @@ -139,6 +153,14 @@ bool ArrowToYdbType(Ydb::Type& resType, const arrow::DataType& type) {
}
return false;
}

TEvInferredFileSchema* MakeErrorSchema(TString path, NFq::TIssuesIds::EIssueCode code, TString message) {
NYql::TIssues issues;
issues.AddIssue(std::move(message));
issues.back().SetCode(code, NYql::TSeverityIds::S_ERROR);
return new TEvInferredFileSchema{std::move(path), std::move(issues)};
}

}

struct FormatConfig {
Expand Down Expand Up @@ -167,14 +189,14 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
.Value(&reader);

if (!readerStatus.ok()) {
return TString{TStringBuilder{} << "couldn't make table from data: " << readerStatus.ToString()};
return TString{TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString()};
}

std::shared_ptr<arrow::Table> table;
auto tableRes = reader->Read().Value(&table);

if (!tableRes.ok()) {
return TStringBuilder{} << "couldn't read table from data: " << readerStatus.ToString();
return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString();
}

return table->fields();
Expand Down Expand Up @@ -245,7 +267,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
auto& file = *ev->Get();
auto mbArrowFields = InferType(Format_, file.File, *Config_);
if (std::holds_alternative<TString>(mbArrowFields)) {
ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
return;
}

Expand All @@ -255,7 +277,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
ydbFields.emplace_back();
auto& ydbField = ydbFields.back();
if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type())) {
ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
return;
}
ydbField.mutable_name()->assign(field->name());
Expand All @@ -265,7 +287,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin

void HandleFileError(TEvFileError::TPtr& ev, const NActors::TActorContext& ctx) {
Cout << "TArrowInferencinator::HandleFileError" << Endl;
ctx.Send(RequesterId_, ev->Release());
ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues)));
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ TEST_F(ArrowInferenceTest, csv_simple) {
ASSERT_NE(response, nullptr);

auto& fields = response->Fields;
ASSERT_TRUE(fields[0].type().has_type_id());
ASSERT_EQ(response->Fields[0].type().type_id(), Ydb::Type::INT64);
ASSERT_EQ(response->Fields[0].name(), "A");
ASSERT_TRUE(fields[0].type().optional_type().item().has_type_id());
ASSERT_EQ(fields[0].type().optional_type().item().type_id(), Ydb::Type::INT64);
ASSERT_EQ(fields[0].name(), "A");

ASSERT_TRUE(fields[1].type().has_type_id());
ASSERT_EQ(fields[1].type().type_id(), Ydb::Type::UTF8);
ASSERT_EQ(fields[1].name(), "B");

ASSERT_TRUE(fields[2].type().has_type_id());
ASSERT_EQ(fields[2].type().type_id(), Ydb::Type::DOUBLE);
ASSERT_TRUE(fields[2].type().optional_type().item().has_type_id());
ASSERT_EQ(fields[2].type().optional_type().item().type_id(), Ydb::Type::DOUBLE);
ASSERT_EQ(fields[2].name(), "C");
}

Expand All @@ -129,16 +129,16 @@ TEST_F(ArrowInferenceTest, tsv_simple) {
ASSERT_NE(response, nullptr);

auto& fields = response->Fields;
ASSERT_TRUE(fields[0].type().has_type_id());
ASSERT_EQ(response->Fields[0].type().type_id(), Ydb::Type::INT64);
ASSERT_EQ(response->Fields[0].name(), "A");
ASSERT_TRUE(fields[0].type().optional_type().item().has_type_id());
ASSERT_EQ(fields[0].type().optional_type().item().type_id(), Ydb::Type::INT64);
ASSERT_EQ(fields[0].name(), "A");

ASSERT_TRUE(fields[1].type().has_type_id());
ASSERT_EQ(fields[1].type().type_id(), Ydb::Type::UTF8);
ASSERT_EQ(fields[1].name(), "B");

ASSERT_TRUE(fields[2].type().has_type_id());
ASSERT_EQ(fields[2].type().type_id(), Ydb::Type::DOUBLE);
ASSERT_TRUE(fields[2].type().optional_type().item().has_type_id());
ASSERT_EQ(fields[2].type().optional_type().item().type_id(), Ydb::Type::DOUBLE);
ASSERT_EQ(fields[2].name(), "C");
}

Expand Down
7 changes: 3 additions & 4 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
tableMeta->ExternalSource.Properties = description.GetProperties();
tableMeta->ExternalSource.DataSourcePath = tableName;
tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path);
return result;
}

Expand Down Expand Up @@ -822,14 +821,14 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta

switch (entry.Kind) {
case EKind::KindExternalDataSource: {
if (externalPath) {
entry.Path = SplitPath(*externalPath);
}
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) {
promise.SetValue(externalDataSourceMetadata);
return;
}
if (externalPath) {
externalDataSourceMetadata.Metadata->ExternalSource.TableLocation = *externalPath;
}
LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem)
.Subscribe([promise, externalDataSourceMetadata, settings](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
{
Expand Down
Loading

0 comments on commit d3a7ede

Please sign in to comment.