Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
externaDataSourceDesc.SetSourceType(GetOrEmpty(settings, "source_type"));
externaDataSourceDesc.SetLocation(GetOrEmpty(settings, "location"));
externaDataSourceDesc.SetInstallation(GetOrEmpty(settings, "installation"));
externaDataSourceDesc.SetReplaceIfExists(settings.GetReplaceIfExists());

TString authMethod = GetOrEmpty(settings, "auth_method");
if (authMethod == "NONE") {
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,10 +855,11 @@ class TKikimrIcGateway : public IKqpGateway {
return profilesPromise.GetFuture();
}

TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk, bool replaceIfExists) override {
Y_UNUSED(metadata);
Y_UNUSED(createDir);
Y_UNUSED(existingOk);
Y_UNUSED(replaceIfExists);
return NotImplemented<TGenericResult>();
}

Expand Down Expand Up @@ -1187,7 +1188,7 @@ class TKikimrIcGateway : public IKqpGateway {

TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
const NYql::TCreateExternalTableSettings& settings,
bool createDir, bool existingOk) override {
bool createDir, bool existingOk, bool replaceIfExists) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

try {
Expand All @@ -1214,7 +1215,7 @@ class TKikimrIcGateway : public IKqpGateway {
schemeTx.SetFailedOnAlreadyExists(!existingOk);

NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, replaceIfExists, settings);
return SendSchemeRequest(ev.Release(), true);
}
catch (yexception& e) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/gateway/utils/scheme_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMay

void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc,
const TString& name,
bool replaceIfExists,
const NYql::TCreateExternalTableSettings& settings)
{
externalTableDesc.SetName(name);
externalTableDesc.SetDataSourcePath(settings.DataSourcePath);
externalTableDesc.SetLocation(settings.Location);
externalTableDesc.SetSourceType("General");
externalTableDesc.SetReplaceIfExists(replaceIfExists);

Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size());
for (const auto& name : settings.ColumnOrder) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/utils/scheme_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMay

void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc,
const TString& name,
bool replaceIfExists,
const NYql::TCreateExternalTableSettings& settings);

std::pair<TString, TString> SplitPathByDirAndBaseNames(const TString& path);
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
return Gateway->LoadTableMetadata(cluster, table, settings);
}

TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk, bool replaceIfExists) override {
Y_UNUSED(replaceIfExists);
CHECK_PREPARED_DDL(CreateTable);

std::pair<TString, TString> pathPair;
Expand Down Expand Up @@ -1243,7 +1244,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
}

TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings,
bool createDir, bool existingOk) override
bool createDir, bool existingOk, bool replaceIfExists) override
{
CHECK_PREPARED_DDL(CreateExternalTable);

Expand All @@ -1270,13 +1271,13 @@ class TKqpGatewayProxy : public IKikimrGateway {
schemeTx.SetFailedOnAlreadyExists(!existingOk);

NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, replaceIfExists, settings);
TGenericResult result;
result.SetSuccess();
phyTxRemover.Forget();
return MakeFuture(result);
} else {
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk);
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk, replaceIfExists);
}
}

Expand Down
17 changes: 12 additions & 5 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
? GetTableTypeFromString(settings.TableType.Cast())
: ETableType::Table; // v0 support

if (mode == "create" || mode == "create_if_not_exists") {
if (mode == "create" || mode == "create_if_not_exists" || mode == "create_or_replace") {
if (!settings.Columns) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "No columns provided for create mode."));
Expand Down Expand Up @@ -757,7 +757,7 @@ class TKikimrDataSink : public TDataProviderBase
? settings.TableType.Cast()
: Build<TCoAtom>(ctx, node->Pos()).Value("table").Done(); // v0 support
auto mode = settings.Mode.Cast();
if (mode == "create" || mode == "create_if_not_exists") {
if (mode == "create" || mode == "create_if_not_exists" || mode == "create_or_replace") {
YQL_ENSURE(settings.Columns);
YQL_ENSURE(!settings.Columns.Cast().Empty());

Expand All @@ -779,6 +779,7 @@ class TKikimrDataSink : public TDataProviderBase
? settings.Temporary.Cast()
: Build<TCoAtom>(ctx, node->Pos()).Value("false").Done();

auto replaceIfExists = (settings.Mode.Cast().Value() == "create_or_replace");
auto existringOk = (settings.Mode.Cast().Value() == "create_if_not_exists");

return Build<TKiCreateTable>(ctx, node->Pos())
Expand All @@ -795,9 +796,12 @@ class TKikimrDataSink : public TDataProviderBase
.ColumnFamilies(settings.ColumnFamilies.Cast())
.TableSettings(settings.TableSettings.Cast())
.TableType(tableType)
.ReplaceIfExists<TCoAtom>()
.Value(replaceIfExists)
.Build()
.ExistingOk<TCoAtom>()
.Value(existringOk)
.Build()
.Build()
.Done()
.Ptr();
} else if (mode == "alter") {
Expand Down Expand Up @@ -886,16 +890,19 @@ class TKikimrDataSink : public TDataProviderBase
.Features(settings.Features)
.Done()
.Ptr();
} else if (mode == "createObject" || mode == "createObjectIfNotExists") {
} else if (mode == "createObject" || mode == "createObjectIfNotExists" || mode == "createObjectOrReplace") {
return Build<TKiCreateObject>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.ObjectId().Build(key.GetObjectId())
.TypeId().Build(key.GetObjectType())
.Features(settings.Features)
.ReplaceIfExists<TCoAtom>()
.Value(mode == "createObjectOrReplace")
.Build()
.ExistingOk<TCoAtom>()
.Value(mode == "createObjectIfNotExists")
.Build()
.Build()
.Done()
.Ptr();
} else if (mode == "alterObject") {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,11 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
NThreading::TFuture<IKikimrGateway::TGenericResult> future;
bool isColumn = (table.Metadata->StoreType == EStoreType::Column);
bool existingOk = (maybeCreate.ExistingOk().Cast().Value() == "1");
bool replaceIfExists = (maybeCreate.ReplaceIfExists().Cast().Value() == "1");
switch (tableTypeItem) {
case ETableType::ExternalTable: {
future = Gateway->CreateExternalTable(cluster,
ParseCreateExternalTableSettings(maybeCreate.Cast(), table.Metadata->TableSettings), true, existingOk);
ParseCreateExternalTableSettings(maybeCreate.Cast(), table.Metadata->TableSettings), true, existingOk, replaceIfExists);
break;
}
case ETableType::TableStore: {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@
{"Index": 10, "Name": "Changefeeds", "Type": "TCoChangefeedList"},
{"Index": 11, "Name": "TableType", "Type": "TCoAtom"},
{"Index": 12, "Name": "Temporary", "Type": "TCoAtom"},
{"Index": 13, "Name": "ExistingOk", "Type": "TCoAtom"}
{"Index": 13, "Name": "ExistingOk", "Type": "TCoAtom"},
{"Index": 14, "Name": "ReplaceIfExists", "Type": "TCoAtom"}
]
},
{
Expand Down Expand Up @@ -254,7 +255,8 @@
{"Index": 2, "Name": "ObjectId", "Type": "TCoAtom"},
{"Index": 3, "Name": "TypeId", "Type": "TCoAtom"},
{"Index": 4, "Name": "Features", "Type": "TCoNameValueTupleList"},
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"}
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"},
{"Index": 6, "Name": "ReplaceIfExists", "Type": "TCoAtom"}
]
},
{
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ class IKikimrGateway : public TThrRefBase {
virtual NThreading::TFuture<TTableMetadataResult> LoadTableMetadata(
const TString& cluster, const TString& table, TLoadTableMetadataSettings settings) = 0;

virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false, bool replaceIfExists = false) = 0;

virtual NThreading::TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster,
const TMaybe<TString>& requestType,
Expand Down Expand Up @@ -843,7 +843,7 @@ class IKikimrGateway : public TThrRefBase {

virtual NThreading::TFuture<TGenericResult> DropTableStore(const TString& cluster, const TDropTableStoreSettings& settings) = 0;

virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir, bool existingOk) = 0;
virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir, bool existingOk, bool replaceIfExists) = 0;

virtual NThreading::TFuture<TGenericResult> AlterExternalTable(const TString& cluster, const TAlterExternalTableSettings& settings) = 0;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void TestCreateExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGa
settings.Columns.insert(std::make_pair("Column2", TKikimrColumnMetadata{"Column2", 0, "String", false}));
settings.ColumnOrder.push_back("Column2");

auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true, false);
auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true, false, false);
responseFuture.Wait();
auto response = responseFuture.GetValue();
response.Issues().PrintTo(Cerr);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,7 @@ message TExternalTableDescription {
optional string Location = 6;
repeated TColumnDescription Columns = 7;
optional bytes Content = 8;
optional bool ReplaceIfExists = 9; // Only applicable for `create external table` operation
}

// Access without authorization
Expand Down Expand Up @@ -1866,6 +1867,7 @@ message TExternalDataSourceDescription {
optional string Installation = 6;
optional TAuth Auth = 7;
optional TExternalDataSourceProperties Properties = 8;
optional bool ReplaceIfExists = 9;
}

message TViewDescription {
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/sql/v1/SQLv1.g.in
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ values_source_row: LPAREN expr_list RPAREN;

simple_values_source: expr_list | select_stmt;

create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE (IF NOT EXISTS)? object_ref
create_external_data_source_stmt: CREATE (OR REPLACE)? EXTERNAL DATA SOURCE (IF NOT EXISTS)? object_ref
with_table_settings
;

Expand Down Expand Up @@ -623,7 +623,7 @@ object_features: object_feature | LPAREN object_feature (COMMA object_feature)*

object_type_ref: an_id_or_type;

create_table_stmt: CREATE (TABLE | TABLESTORE | EXTERNAL TABLE) (IF NOT EXISTS)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
create_table_stmt: CREATE (OR REPLACE)? (TABLE | TABLESTORE | EXTERNAL TABLE) (IF NOT EXISTS)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
table_inherits?
table_partition_by?
with_table_settings?
Expand Down
23 changes: 12 additions & 11 deletions ydb/library/yql/sql/v1/format/sql_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,27 +891,24 @@ friend struct TStaticData;
Visit(msg.GetToken1());
Visit(msg.GetBlock2());
Visit(msg.GetBlock3());
Visit(msg.GetRule_simple_table_ref4());
Visit(msg.GetToken5());
Visit(msg.GetBlock4());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add tests for this new behaviour to ydb/library/yql/sql/v1/format/sql_format_ut.cpp

Visit(msg.GetRule_simple_table_ref5());
Visit(msg.GetToken6());
PushCurrentIndent();
NewLine();
Visit(msg.GetRule_create_table_entry6());
for (const auto& b : msg.GetBlock7()) {
Visit(msg.GetRule_create_table_entry7());
for (const auto& b : msg.GetBlock8()) {
Visit(b.GetToken1());
NewLine();
Visit(b.GetRule_create_table_entry2());
}
if (msg.HasBlock8()) {
Visit(msg.GetBlock8());
if (msg.HasBlock9()) {
Visit(msg.GetBlock9());
}

PopCurrentIndent();
NewLine();
Visit(msg.GetToken9());
if (msg.HasBlock10()) {
NewLine();
Visit(msg.GetBlock10());
}
Visit(msg.GetToken10());
if (msg.HasBlock11()) {
NewLine();
Visit(msg.GetBlock11());
Expand All @@ -924,6 +921,10 @@ friend struct TStaticData;
NewLine();
Visit(msg.GetBlock13());
}
if (msg.HasBlock14()) {
NewLine();
Visit(msg.GetBlock14());
}
}

void VisitDropTable(const TRule_drop_table_stmt& msg) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/sql/v1/format/sql_format_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
"CREATE EXTERNAL DATA SOURCE usEr WITH (a = \"b\");\n"},
{"creAte exTernAl daTa SouRce if not exists usEr With (a = \"b\")",
"CREATE EXTERNAL DATA SOURCE IF NOT EXISTS usEr WITH (a = \"b\");\n"},
{"creAte oR rePlaCe exTernAl daTa SouRce usEr With (a = \"b\")",
"CREATE OR REPLACE EXTERNAL DATA SOURCE usEr WITH (a = \"b\");\n"},
{"create external data source eds with (a=\"a\",b=\"b\",c = true)",
"CREATE EXTERNAL DATA SOURCE eds WITH (\n\ta = \"a\",\n\tb = \"b\",\n\tc = TRUE\n);\n"},
{"alter external data source eds set a true, reset (b, c), set (x=y, z=false)",
Expand Down Expand Up @@ -388,6 +390,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
TCases cases = {
{"creAte exTernAl TabLe usEr (a int) With (a = \"b\")",
"CREATE EXTERNAL TABLE usEr (\n\ta int\n)\nWITH (a = \"b\");\n"},
{"creAte oR rePlaCe exTernAl TabLe usEr (a int) With (a = \"b\")",
"CREATE OR REPLACE EXTERNAL TABLE usEr (\n\ta int\n)\nWITH (a = \"b\");\n"},
{"creAte exTernAl TabLe iF NOt Exists usEr (a int) With (a = \"b\")",
"CREATE EXTERNAL TABLE IF NOT EXISTS usEr (\n\ta int\n)\nWITH (a = \"b\");\n"},
{"create external table user (a int) with (a=\"b\",c=\"d\")",
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/sql/v1/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ namespace NSQLTranslationV1 {
TNodePtr BuildUpsertObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
TNodePtr BuildCreateObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
bool existingOk, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
TNodePtr BuildAlterObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context);
TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
Expand Down
19 changes: 15 additions & 4 deletions ydb/library/yql/sql/v1/object_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,30 @@ class TCreateObject: public TObjectProcessorImpl {
std::set<TString> FeaturesToReset;
protected:
bool ExistingOk = false;
bool ReplaceIfExists = false;
protected:
virtual INode::TPtr BuildOptions() const override {
return Y(Q(Y(Q("mode"), Q(ExistingOk ? "createObjectIfNotExists" : "createObject"))));
TString mode;
if (ExistingOk) {
mode = "createObjectIfNotExists";
} else if (ReplaceIfExists) {
mode = "createObjectOrReplace";
} else {
mode = "createObject";
}

return Y(Q(Y(Q("mode"), Q(mode))));
}
virtual INode::TPtr FillFeatures(INode::TPtr options) const override;
public:
TCreateObject(TPosition pos, const TString& objectId,
const TString& typeId, bool existingOk, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
const TString& typeId, bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
: TBase(pos, objectId, typeId, context)
, Features(std::move(features))
, FeaturesToReset(std::move(featuresToReset))
, ExistingOk(existingOk) {
}
, ExistingOk(existingOk)
, ReplaceIfExists(replaceIfExists) {
}
};

class TUpsertObject final: public TCreateObject {
Expand Down
Loading