Skip to content

Commit ed572a7

Browse files
committed
Forbid writing/reading directly to external data source
1 parent f78fb7b commit ed572a7

File tree

11 files changed

+231
-33
lines changed

11 files changed

+231
-33
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
176176
counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters);
177177
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
178178
std::make_shared<TKqpTableMetadataLoader>(
179-
TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
179+
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
180180
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
181181
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
182182
Gateway->SetToken(QueryId.Cluster, UserToken);

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -806,12 +806,7 @@ class TKikimrIcGateway : public IKqpGateway {
806806
return InvalidCluster<TTableMetadataResult>(cluster);
807807
}
808808

809-
settings.WithExternalDatasources_ = !CheckCluster(cluster);
810-
// In the case of reading from an external data source,
811-
// we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
812-
// In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
813-
// To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
814-
return MetadataLoader->LoadTableMetadata(settings.WithExternalDatasources_ ? GetDefaultCluster() : cluster, settings.WithExternalDatasources_ ? cluster : table, settings, Database, UserToken);
809+
return MetadataLoader->LoadTableMetadata(cluster, table, settings, Database, UserToken);
815810
} catch (yexception& e) {
816811
return MakeFuture(ResultFromException<TTableMetadataResult>(e));
817812
}

ydb/core/kqp/gateway/kqp_metadata_loader.cpp

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC
229229
tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
230230
tableMeta->SchemaVersion = description.GetVersion();
231231
tableMeta->Kind = NYql::EKikimrTableKind::External;
232+
tableMeta->TableType = NYql::ETableType::ExternalTable;
232233

233234
tableMeta->Attributes = entry.Attributes;
234235

@@ -255,7 +256,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC
255256
}
256257

257258
TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
258-
const TString& cluster, const TString& tableName) {
259+
const TString& cluster, const TString& mainCluster, const TString& tableName) {
259260
const auto& description = entry.ExternalDataSourceInfo->Description;
260261
TTableMetadataResult result;
261262
result.SetSuccess();
@@ -265,6 +266,11 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
265266
tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
266267
tableMeta->SchemaVersion = description.GetVersion();
267268
tableMeta->Kind = NYql::EKikimrTableKind::External;
269+
if (cluster == mainCluster) { // resolved external data source itself
270+
tableMeta->TableType = NYql::ETableType::Unknown;
271+
} else {
272+
tableMeta->TableType = NYql::ETableType::Table; // wanted to resolve table in external data source
273+
}
268274

269275
tableMeta->Attributes = entry.Attributes;
270276

@@ -302,7 +308,7 @@ TTableMetadataResult GetViewMetadataResult(
302308
}
303309

304310
TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
305-
const TString& cluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) {
311+
const TString& cluster, const TString& mainCluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) {
306312
using TResult = NYql::IKikimrGateway::TTableMetadataResult;
307313
using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
308314
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
@@ -341,7 +347,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
341347
result = GetExternalTableMetadataResult(entry, cluster, tableName);
342348
break;
343349
case EKind::KindExternalDataSource:
344-
result = GetExternalDataSourceMetadataResult(entry, cluster, tableName);
350+
result = GetExternalDataSourceMetadataResult(entry, cluster, mainCluster, tableName);
345351
break;
346352
case EKind::KindView:
347353
result = GetViewMetadataResult(entry, cluster, tableName);
@@ -699,16 +705,30 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
699705
using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
700706
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
701707

702-
const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_);
703-
Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only");
704-
auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(cluster,
705-
id, settings, TempTablesState);
708+
// In the case of reading from an external data source,
709+
// we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
710+
// In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
711+
// To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
712+
const bool resolveEntityInsideDataSource = (cluster != Cluster);
713+
TPath entityName = id;
714+
if constexpr (std::is_same_v<TPath, TString>) {
715+
if (resolveEntityInsideDataSource) {
716+
entityName = cluster;
717+
}
718+
} else {
719+
Y_ENSURE(!resolveEntityInsideDataSource);
720+
}
721+
722+
const auto externalEntryItem = CreateNavigateExternalEntry(entityName, resolveEntityInsideDataSource);
723+
Y_ABORT_UNLESS(!resolveEntityInsideDataSource || externalEntryItem, "External data source must be resolved using path only");
724+
auto resNavigate = resolveEntityInsideDataSource ? *externalEntryItem : CreateNavigateEntry(Cluster,
725+
entityName, settings, TempTablesState);
706726
const auto entry = resNavigate.Entry;
707727
const auto queryName = resNavigate.QueryName;
708-
const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<NavigateEntryResult>{} : externalEntryItem;
709-
const ui64 expectedSchemaVersion = GetExpectedVersion(id);
728+
const auto externalEntry = resolveEntityInsideDataSource ? std::optional<NavigateEntryResult>{} : externalEntryItem;
729+
const ui64 expectedSchemaVersion = GetExpectedVersion(entityName);
710730

711-
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id));
731+
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(entityName));
712732

713733
auto navigate = MakeHolder<TNavigate>();
714734
navigate->ResultSet.emplace_back(entry);
@@ -730,7 +750,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
730750
ActorSystem,
731751
schemeCacheId,
732752
ev.Release(),
733-
[userToken, database, cluster, table, settings, expectedSchemaVersion, this, queryName]
753+
[userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName]
734754
(TPromise<TResult> promise, TResponse&& response) mutable
735755
{
736756
try {
@@ -741,7 +761,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
741761
auto& entry = InferEntry(navigate.ResultSet);
742762

743763
if (entry.Status != EStatus::Ok) {
744-
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table));
764+
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table));
745765
return;
746766
}
747767

@@ -763,7 +783,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
763783

764784
switch (entry.Kind) {
765785
case EKind::KindExternalDataSource: {
766-
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, table);
786+
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
767787
if (!externalDataSourceMetadata.Success()) {
768788
promise.SetValue(externalDataSourceMetadata);
769789
return;
@@ -779,7 +799,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
779799
case EKind::KindExternalTable: {
780800
YQL_ENSURE(entry.ExternalTableInfo, "expected external table info");
781801
const auto& dataSourcePath = entry.ExternalTableInfo->Description.GetDataSourcePath();
782-
auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, table);
802+
auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
783803
if (!externalTableMetadata.Success()) {
784804
promise.SetValue(externalTableMetadata);
785805
return;
@@ -810,7 +830,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
810830
}
811831
break;
812832
default: {
813-
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table, queryName));
833+
promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table, queryName));
814834
}
815835
}
816836
}

ydb/core/kqp/gateway/kqp_metadata_loader.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@ namespace NKikimr::NKqp {
1414
class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLoader {
1515
public:
1616

17-
explicit TKqpTableMetadataLoader(TActorSystem* actorSystem,
18-
NYql::TKikimrConfiguration::TPtr config,
19-
bool needCollectSchemeData = false,
17+
explicit TKqpTableMetadataLoader(const TString& cluster,
18+
TActorSystem* actorSystem,
19+
NYql::TKikimrConfiguration::TPtr config,
20+
bool needCollectSchemeData = false,
2021
TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
2122
TDuration maximalSecretsSnapshotWaitTime = TDuration::Seconds(20))
22-
: NeedCollectSchemeData(needCollectSchemeData)
23+
: Cluster(cluster)
24+
, NeedCollectSchemeData(needCollectSchemeData)
2325
, ActorSystem(actorSystem)
2426
, Config(config)
2527
, TempTablesState(std::move(tempTablesState))
2628
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
27-
{};
29+
{}
2830

2931
NThreading::TFuture<NYql::IKikimrGateway::TTableMetadataResult> LoadTableMetadata(
3032
const TString& cluster, const TString& table, const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, const TString& database,
@@ -56,6 +58,7 @@ class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLo
5658

5759
void OnLoadedTableMetadata(NYql::IKikimrGateway::TTableMetadataResult& loadTableMetadataResult);
5860

61+
const TString Cluster;
5962
TVector<NKikimrKqp::TKqpTableMetadataProto> CollectedSchemeData;
6063
TMutex Lock;
6164
bool NeedCollectSchemeData;

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,11 @@ class TKikimrDataSink : public TDataProviderBase
557557
return true;
558558
}
559559

560+
if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
561+
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Attempt to write to external data source \"" << key.GetTablePath() << "\". Please specify table to write to"));
562+
return false;
563+
}
564+
560565
if (tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalDataSource && tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalTable) {
561566
YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType;
562567
return true;

ydb/core/kqp/provider/yql_kikimr_datasource.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase {
256256
}
257257
}
258258
break;
259-
default:
259+
default:
260260
break;
261261
}
262262
*result = value;
@@ -679,6 +679,11 @@ class TKikimrDataSource : public TDataProviderBase {
679679
auto& tableDesc = SessionCtx->Tables().GetTable(cluster, tablePath);
680680
if (key.GetKeyType() == TKikimrKey::Type::Table) {
681681
if (tableDesc.Metadata->Kind == EKikimrTableKind::External) {
682+
if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
683+
ctx.AddError(TIssue(node->Pos(ctx),
684+
TStringBuilder() << "Attempt to read from external data source \"" << tablePath << "\". Please specify table to read from"));
685+
return nullptr;
686+
}
682687
if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) {
683688
const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type);
684689
ctx.Step.Repeat(TExprStep::DiscoveryIO)

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
923923
auto tableTypeItem = table.Metadata->TableType;
924924
if (tableTypeItem == ETableType::ExternalTable && !SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) {
925925
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()),
926-
TStringBuilder() << "External table are disabled. Please contact your system administrator to enable it"));
926+
TStringBuilder() << "External tables are disabled. Please contact your system administrator to enable it"));
927927
return SyncError();
928928
}
929929

ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) {
7373
counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters);
7474
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);
7575

76-
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
76+
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
7777
return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
7878
server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig());
7979
}

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
181181
QueryState->RequestEv.reset(ev->Release().Release());
182182

183183
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(
184-
TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
184+
Settings.Cluster, TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
185185
Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader),
186186
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, QueryServiceConfig);
187187

0 commit comments

Comments
 (0)