diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 4bfb1ec0ad86..25b897d3dd41 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1300,13 +1300,9 @@ class TKqpExecuterBase : public TActorBootstrapped { const auto& input = stage.GetInputs(inputIndex); // Current assumptions: - // 1. `Broadcast` can not be the 1st stage input unless it's a single input - // 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll` - if (inputIndex == 0) { - if (stage.InputsSize() > 1) { - YQL_ENSURE(input.GetTypeCase() != NKqpProto::TKqpPhyConnection::kBroadcast); - } - } else { + // 1. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll` + // 2. Stages where 1st input is `Broadcast` are not partitioned. + if (inputIndex > 0) { switch (input.GetTypeCase()) { case NKqpProto::TKqpPhyConnection::kBroadcast: case NKqpProto::TKqpPhyConnection::kHashShuffle: diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 2dcce5cd1e2a..b153add48ff5 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3955,6 +3955,55 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { AssertTableReads(result, "/Root/SecondaryKeys/Index/indexImplTable", 1); } + Y_UNIT_TEST(MultipleBroadcastJoin) { + TKikimrSettings kisettings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX); + kisettings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(kisettings); + + auto db = kikimr.GetTableClient(); + auto client = kikimr.GetQueryClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto session = db.CreateSession().GetValueSync().GetSession(); + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + create table demo_ba(id text, some text, ref1 text, ref2 text, primary key(id)); + create table demo_ref1(id text, code text, some text, primary key(id), index ix_code global on (code)); + create table demo_ref2(id text, code text, some text, primary key(id), index ix_code global on (code)); + )").GetValueSync()); + } + + auto query = R"( + select ba_0.id, ba_0.some, + r_1.id, r_1.some, r_1.code, + r_2.id, r_2.some, r_2.code + from demo_ba ba_0 + left join demo_ref1 r_1 on r_1.id=ba_0.ref1 + left join demo_ref2 r_2 on r_2.code=ba_0.ref2 + where ba_0.id in ("ba#10"u,"ba#20"u,"ba#30"u,"ba#40"u,"ba#50"u,"ba#60"u,"ba#70"u,"ba#80"u,"ba#90"u,"ba#100"u); + )"; + + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::YqlV1) + .ConcurrentResultSets(false); + { + auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + //CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0))); + //CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1))); + } + { + auto it = client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + Cerr << StreamResultToYson(it); + } + + } Y_UNIT_TEST_TWIN(ComplexLookupLimit, NewPredicateExtract) { TKikimrSettings settings;