Skip to content

Commit

Permalink
Add streamlookup lru cache backport (#9763)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Sep 27, 2024
1 parent 67e35be commit 95afda4
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 105 deletions.
263 changes: 167 additions & 96 deletions ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ydb/library/yql/dq/proto/dq_tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ message TDqInputTransformLookupSettings {
repeated string RightJoinKeyNames = 6; //Join column names in the right hand, in the same order as previous
bytes NarrowInputRowType = 7; //Serialized struct type
bytes NarrowOutputRowType = 8; //Serialized struct type
//TODO add lookup cache parameters
uint64 CacheLimit = 9;
uint64 CacheTtlSeconds = 10;
}

message TDqTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace NKikimr::NMiniKQL {
// Never requests system time, expects monotonically increased time points in methods argument
class TUnboxedKeyValueLruCacheWithTtl {
struct TEntry {
TEntry(NUdf::TUnboxedValue key, NUdf::TUnboxedValue value, std::chrono::time_point<std::chrono::steady_clock> expiration)
TEntry(NUdf::TUnboxedValue key, NUdf::TUnboxedValue value, std::chrono::time_point<std::chrono::steady_clock> expiration)
: Key(std::move(key))
, Value(std::move(value))
, Expiration(std::move(expiration))
Expand Down Expand Up @@ -73,16 +73,23 @@ class TUnboxedKeyValueLruCacheWithTtl {
return std::nullopt;
}

// Perform garbage collection.
// Perform garbage collection, single step, O(1) time.
// Must be called periodically
void Tick(const std::chrono::time_point<std::chrono::steady_clock>& now) {
bool Tick(const std::chrono::time_point<std::chrono::steady_clock>& now) {
if (UsageList.empty()) {
return;
return false;
}
if (now < UsageList.front().Expiration) {
return;
return false;
}
RemoveLeastRecentlyUsedEntry();
return true;
}

// Perform garbage collection, O(1) amortized, but O(n) one-time
void Prune(const std::chrono::time_point<std::chrono::steady_clock>& now) {
while (Tick(now)) {
}
}

size_t Size() const {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ namespace NYql::NDqs {
const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn());
Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType));
settings.SetCacheLimit(1'000'000); //TODO configure me
settings.SetCacheTtlSeconds(60); //TODO configure me

const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn());
const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn());
Expand Down
89 changes: 86 additions & 3 deletions ydb/tests/fq/generic/test_streaming_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
DEBUG = 0


def ResequenceId(messages):
def ResequenceId(messages, field="id"):
res = []
i = 1
for pair in messages:
rpair = []
for it in pair:
src = json.loads(it)
src["id"] = i
if field in src:
src[field] = i
rpair += [json.dumps(src)]
res += [tuple(rpair)]
i += 1
Expand Down Expand Up @@ -310,6 +311,88 @@ def freeze(json):
),
],
),
# 6
(
R'''
$input = SELECT * FROM myyds.`{input_topic}`
WITH (
FORMAT=json_each_row,
SCHEMA (
za Int32,
yb STRING,
yc Int32,
zd Int32,
)
) ;
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.db as u
on(e.yb = u.b AND e.za = u.a )
;
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
ResequenceId(
[
(
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101}',
),
(
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107}',
),
(
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}',
),
]
),
),
# 7
(
R'''
$input = SELECT * FROM myyds.`{input_topic}`
WITH (
FORMAT=json_each_row,
SCHEMA (
za Int32,
yb STRING,
yc Int32,
zd Int32,
)
) ;
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.db as u
on(e.za = u.a AND e.yb = u.b)
;
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
ResequenceId(
[
(
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101}',
),
(
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107}',
),
(
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}',
),
]
),
),
]


Expand Down Expand Up @@ -367,7 +450,7 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
@yq_v1
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
@pytest.mark.parametrize("partitions_count", [1, 3])
@pytest.mark.parametrize("partitions_count", [1, 3] if DEBUG else [3])
@pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True])
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
def test_streamlookup(
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/fq/generic/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ TEST_SRCS(
test_ydb.py
)

TIMEOUT(1800)

END()
6 changes: 6 additions & 0 deletions ydb/tests/fq/generic/ydb/01_basic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ set -ex
(56, 12, "2a02:1812:1713:4f00:517e:1d79:c88b:704", "Elena", 2),
(18, 17, "ivalid ip", "newUser", 12);
COMMIT;
CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, PRIMARY KEY(b, a));
COMMIT;
INSERT INTO db (a, b, c, d, e, f) VALUES
(1, "2", 3, 4, 5, 6),
(7, "8", 9, 10, 11, 12);
COMMIT;
'

retVal=$?
Expand Down

0 comments on commit 95afda4

Please sign in to comment.