Skip to content

Commit

Permalink
streamlookupjoin: fix joining from multi-partition stream (backport y…
Browse files (browse the repository at this point in the history)
  • Loading branch information
yumkam authored Sep 4, 2024
1 parent 12d1cd4 commit 0fb80c7
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 82 deletions.
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/tasks/dq_connection_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void BuildStreamLookupChannels(TGraph& graph, const NNodes::TDqPhyStage& stage,
auto& originStageInfo = graph.GetStageInfo(cnStreamLookup.Output().Stage());
auto outputIndex = FromString<ui32>(cnStreamLookup.Output().Index().Value());

BuildMapChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
BuildUnionAllChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
}

template <typename TGraph>
Expand Down
205 changes: 124 additions & 81 deletions ydb/tests/fq/generic/test_streaming_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
import json
import sys
from collections import Counter
from operator import itemgetter

import ydb.public.api.protos.draft.fq_pb2 as fq
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
Expand All @@ -11,6 +13,31 @@
from ydb.tests.fq.generic.utils.settings import Settings

DEBUG = 0


def ResequenceId(messages):
    """Renumber the ``"id"`` field of every JSON message, group by group.

    ``messages`` is an iterable of groups (e.g. ``(input, expected)`` pairs)
    whose items are JSON-encoded objects.  Every item of the N-th group
    (counting from 1) gets ``"id"`` set to N and is re-serialized with
    ``json.dumps``.

    Returns a list with one tuple of re-encoded JSON strings per input group.
    """
    resequenced = []
    for seq, group in enumerate(messages, start=1):
        rewritten = []
        for raw in group:
            doc = json.loads(raw)
            # All items of one group share the same sequence number.
            doc["id"] = seq
            rewritten.append(json.dumps(doc))
        resequenced.append(tuple(rewritten))
    return resequenced


def freeze(json):
    """Recursively convert a decoded JSON value into a hashable equivalent.

    * mappings (``dict`` and its subclasses) become a ``frozenset`` of
      ``(key, frozen_value)`` pairs, so key order does not matter;
    * ``list`` values (and subclasses) become a ``tuple`` of frozen items;
    * anything else is returned unchanged.

    The result can be used as a ``set`` member or a ``Counter``/``dict`` key,
    which allows order-insensitive comparison of collections of JSON values.

    Note: the parameter is named ``json`` for backward compatibility; inside
    this function it shadows the ``json`` module.
    """
    # isinstance (rather than an exact type() comparison) so that dict/list
    # subclasses such as OrderedDict are frozen too instead of being returned
    # as-is and failing later with "unhashable type".
    if isinstance(json, dict):
        return frozenset((k, freeze(v)) for k, v in json.items())
    if isinstance(json, list):
        return tuple(map(freeze, json))
    return json


TESTCASES = [
# 0
(
Expand Down Expand Up @@ -96,17 +123,19 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
]
* 20,
ResequenceId(
[
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
]
* 20
),
),
# 3
(
Expand Down Expand Up @@ -137,37 +166,39 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
(
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
),
(
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
),
]
* 10,
ResequenceId(
[
(
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
),
(
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
),
]
* 10
),
),
# 4
(
Expand Down Expand Up @@ -200,37 +231,39 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000,
ResequenceId(
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000
),
),
# 5
(
Expand Down Expand Up @@ -334,12 +367,23 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
@yq_v1
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
@pytest.mark.parametrize("streamlookup", [False, True])
@pytest.mark.parametrize("partitions_count", [1, 3])
@pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True])
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
def test_streamlookup(
self, kikimr, testcase, streamlookup, fq_client: FederatedQueryClient, settings: Settings, yq_version
self,
kikimr,
testcase,
streamlookup,
partitions_count,
fq_client: FederatedQueryClient,
settings: Settings,
yq_version,
):
self.init_topics(f"pq_yq_streaming_test_lookup_{streamlookup}{testcase}_{yq_version}")
self.init_topics(
f"pq_yq_str_lookup_{partitions_count}{streamlookup}{testcase}_{yq_version}",
partitions_count=partitions_count,
)
fq_client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

table_name = 'join_table'
Expand All @@ -359,7 +403,7 @@ def test_streamlookup(
)

query_id = fq_client.create_query(
f"streamlookup_{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
f"streamlookup_{partitions_count}{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
).result.query_id
fq_client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
kikimr.compute_plane.wait_zero_checkpoint(query_id)
Expand All @@ -375,10 +419,9 @@ def test_streamlookup(
print(streamlookup, testcase, file=sys.stderr)
print(sql, file=sys.stderr)
print(*zip(messages, read_data), file=sys.stderr, sep="\n")
for r, exp in zip(read_data, messages):
r = json.loads(r)
exp = json.loads(exp[1])
assert r == exp
read_data_ctr = Counter(map(freeze, map(json.loads, read_data)))
messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages))))
assert read_data_ctr == messages_ctr

fq_client.abort_query(query_id)
fq_client.wait_query(query_id)
Expand Down

0 comments on commit 0fb80c7

Please sign in to comment.