Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,37 @@ namespace NKqp {

using namespace NYdb;

TTestHelper::TTestHelper(const TKikimrSettings& settings)
: Kikimr(settings)
, TableClient(Kikimr.GetTableClient())
, Session(TableClient.CreateSession().GetValueSync().GetSession())
{}
TTestHelper::TTestHelper(const TKikimrSettings& settings) {
TKikimrSettings kikimrSettings(settings);
if (!kikimrSettings.FeatureFlags.HasEnableTieringInColumnShard()) {
kikimrSettings.SetEnableTieringInColumnShard(true);
}

Kikimr = std::make_unique<TKikimrRunner>(kikimrSettings);
TableClient = std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient());
Session = std::make_unique<NYdb::NTable::TSession>(TableClient->CreateSession().GetValueSync().GetSession());
}

NKikimr::NKqp::TKikimrRunner& TTestHelper::GetKikimr() {
return Kikimr;
return *Kikimr;
}

TTestActorRuntime& TTestHelper::GetRuntime() {
return *Kikimr.GetTestServer().GetRuntime();
return *Kikimr->GetTestServer().GetRuntime();
}

NYdb::NTable::TSession& TTestHelper::GetSession() {
return Session;
return *Session;
}

void TTestHelper::CreateTable(const TColumnTableBase& table, const EStatus expectedStatus) {
std::cerr << (table.BuildQuery()) << std::endl;
auto result = Session.ExecuteSchemeQuery(table.BuildQuery()).GetValueSync();
auto result = GetSession().ExecuteSchemeQuery(table.BuildQuery()).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), expectedStatus, result.GetIssues().ToString());
}

void TTestHelper::CreateTier(const TString& tierName) {
auto result = Session.ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Expand All @@ -70,43 +75,43 @@ namespace NKqp {
}
]
})";
auto result = Session.ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
return ruleName;
}

void TTestHelper::SetTiering(const TString& tableName, const TString& ruleName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET (TIERING = '" << ruleName << "')";
auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync();
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::ResetTiering(const TString& tableName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TIERING)";
auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync();
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::DropTable(const TString& tableName) {
auto result = Session.DropTable(tableName).GetValueSync();
auto result = GetSession().DropTable(tableName).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) {
Y_UNUSED(opStatus);
NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer());
NKikimr::Tests::NCS::THelper helper(GetKikimr().GetTestServer());
auto batch = updates.BuildArrow();
helper.SendDataViaActorSystem(table.GetName(), batch, opStatus);
}

void TTestHelper::BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) {
Y_UNUSED(opStatus);
NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer());
NKikimr::Tests::NCS::THelper helper(GetKikimr().GetTestServer());
helper.SendDataViaActorSystem(table.GetName(), batch, opStatus);
}

void TTestHelper::ReadData(const TString& query, const TString& expected, const EStatus opStatus /*= EStatus::SUCCESS*/) {
auto it = TableClient.StreamExecuteScanQuery(query).GetValueSync();
auto it = TableClient->StreamExecuteScanQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); // Means stream successfully get
TString result = StreamResultToYson(it, false, opStatus);
if (opStatus == EStatus::SUCCESS) {
Expand All @@ -115,17 +120,17 @@ namespace NKqp {
}

void TTestHelper::RebootTablets(const TString& tableName) {
auto runtime = Kikimr.GetTestServer().GetRuntime();
auto runtime = GetKikimr().GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();
TVector<ui64> shards;
{
auto describeResult = DescribeTable(&Kikimr.GetTestServer(), sender, tableName);
auto describeResult = DescribeTable(&GetKikimr().GetTestServer(), sender, tableName);
for (auto shard : describeResult.GetPathDescription().GetColumnTableDescription().GetSharding().GetColumnShards()) {
shards.push_back(shard);
}
}
for (auto shard : shards) {
Kikimr.GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(
GetKikimr().GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(
new TEvents::TEvPoisonPill(), shard, false));
}
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ namespace NKqp {
};

private:
TKikimrRunner Kikimr;
NYdb::NTable::TTableClient TableClient;
NYdb::NTable::TSession Session;
std::unique_ptr<TKikimrRunner> Kikimr;
std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
std::unique_ptr<NYdb::NTable::TSession> Session;

public:
TTestHelper(const TKikimrSettings& settings);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,5 @@ message TFeatureFlags {
optional bool EnableGranularTimecast = 137 [default = true];
optional bool EnableAlterShardingInColumnShard = 138 [default = false];
optional bool EnablePgSyntax = 139 [default = true];
optional bool EnableTieringInColumnShard = 140 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableBackupService)
FEATURE_FLAG_SETTER(EnableGranularTimecast)
FEATURE_FLAG_SETTER(EnablePgSyntax)
FEATURE_FLAG_SETTER(EnableTieringInColumnShard)

#undef FEATURE_FLAG_SETTER
};
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/olap/operations/alter_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ class TAlterColumnTable: public TSubOperation {
return result;
}

const bool hasTiering = Transaction.HasAlterColumnTable() && Transaction.GetAlterColumnTable().HasAlterTtlSettings() &&
Transaction.GetAlterColumnTable().GetAlterTtlSettings().HasUseTiering();
if (hasTiering && HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) {
Copy link
Collaborator

@ijon ijon Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HasAppData() && !AppDataVerified() -- лишняя защита, здесь мы заведомо находимся в контексте акторной системы.

В других подобных местах так не делается.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Таким образом использующийся флаг будет нельзя менять в runtime -- только через рестарт нод базы

result->SetError(NKikimrScheme::StatusPreconditionFailed, "Tiering functionality is disabled for OLAP tables");
return result;
}

const TString& parentPathStr = Transaction.GetWorkingDir();
const TString& name = Transaction.HasAlterColumnTable() ? Transaction.GetAlterColumnTable().GetName() : Transaction.GetAlterTable().GetName();
LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/olap/ttl/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(
)

PEERDIR(
ydb/core/base
ydb/core/protos
)

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableAddColumsWithDefaults(opts.EnableAddColumsWithDefaults_);
app.SetEnableReplaceIfExistsForExternalEntities(opts.EnableReplaceIfExistsForExternalEntities_);
app.SetEnableChangefeedsOnIndexTables(opts.EnableChangefeedsOnIndexTables_);
app.SetEnableTieringInColumnShard(opts.EnableTieringInColumnShard_);

app.ColumnShardConfig.SetDisabledOnSchemeShard(false);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/test_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ namespace NSchemeShardUT_Private {
OPTION(std::optional<bool>, EnableReplaceIfExistsForExternalEntities, std::nullopt);
OPTION(std::optional<TString>, GraphBackendType, std::nullopt);
OPTION(std::optional<bool>, EnableChangefeedsOnIndexTables, std::nullopt);
OPTION(std::optional<bool>, EnableTieringInColumnShard, std::nullopt);

#undef OPTION
};
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,9 @@ Y_UNIT_TEST_SUITE(TOlap) {

Y_UNIT_TEST(AlterTtl) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
TTestEnvOptions options;
options.EnableTieringInColumnShard(true);
TTestEnv env(runtime, options);
ui64 txId = 100;

TString olapSchema = R"(
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/tiering/rule/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ void TTieringRulesManager::DoPrepareObjectsBeforeModification(std::vector<TTieri
NMetadata::NModifications::TOperationParsingResult TTieringRulesManager::DoBuildPatchFromSettings(
const NYql::TObjectSettingsImpl& settings,
TInternalModificationContext& /*context*/) const {
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) {
return TConclusionStatus::Fail("Tiering functionality is disabled for OLAP tables.");
}

NMetadata::NInternal::TTableRecord result;
result.SetColumn(TTieringRule::TDecoder::TieringRuleId, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId()));
if (settings.GetObjectId().StartsWith("$") || settings.GetObjectId().StartsWith("_")) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/tiering/tier/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ NMetadata::NModifications::TOperationParsingResult TTiersManager::DoBuildPatchFr
const NYql::TObjectSettingsImpl& settings,
TInternalModificationContext& context) const
{
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) {
return TConclusionStatus::Fail("Tiering functionality is disabled for OLAP tables.");
}

NMetadata::NInternal::TTableRecord result;
result.SetColumn(TTierConfig::TDecoder::TierName, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId()));
if (settings.GetObjectId().StartsWith("$") || settings.GetObjectId().StartsWith("_")) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/tiering/ut/ut_tiers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.GrpcPort = grpcPort;
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true);
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
;

Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
Expand Down Expand Up @@ -420,6 +421,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
.SetAppConfig(appConfig);

Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
Expand Down Expand Up @@ -550,6 +552,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
;

Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
Expand Down