Skip to content

Commit c09ee6b

Browse files
authored
Merge a527b28 into 04801c0
2 parents 04801c0 + a527b28 commit c09ee6b

File tree

4 files changed

+137
-0
lines changed

4 files changed

+137
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# -*- coding: utf-8 -*-
2+
import pytest
3+
import time
4+
import ydb
5+
6+
from ydb.tests.library.harness.kikimr_runner import KiKiMR
7+
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
8+
from ydb.tests.library.common.types import Erasure
9+
10+
11+
class TestYdbTransferWorkload(object):
12+
@classmethod
13+
def setup_class(cls):
14+
cls.cluster = KiKiMR(KikimrConfigGenerator(
15+
erasure=Erasure.MIRROR_3_DC,
16+
extra_feature_flags={
17+
"enable_topic_transfer": True,
18+
}
19+
))
20+
cls.cluster.start()
21+
22+
@classmethod
23+
def teardown_class(cls):
24+
cls.cluster.stop()
25+
26+
@pytest.mark.parametrize("store_type", ["row", "column"])
27+
def test(self, store_type):
28+
self.database = '/Root'
29+
self.message_count = 10000
30+
31+
driver_config = ydb.DriverConfig(f'grpc://localhost:{self.cluster.nodes[1].grpc_port}', self.database)
32+
33+
with ydb.Driver(driver_config) as driver:
34+
driver.wait(timeout=5)
35+
36+
with ydb.QuerySessionPool(driver) as pool:
37+
print(f"CREATE TABLE {store_type}")
38+
pool.execute_with_retries(
39+
f"""
40+
CREATE TABLE target_table_{store_type} (
41+
partition Uint32 NOT NULL,
42+
offset Uint64 NOT NULL,
43+
message Utf8,
44+
PRIMARY KEY (partition, offset)
45+
) WITH (
46+
STORE = {store_type}
47+
);
48+
"""
49+
)
50+
51+
print("CREATE TOPIC")
52+
pool.execute_with_retries(
53+
f"""
54+
CREATE TOPIC source_topic_{store_type};
55+
"""
56+
)
57+
58+
lmb = '''
59+
$l = ($x) -> {
60+
return [
61+
<|
62+
partition:CAST($x._partition AS Uint32),
63+
offset:CAST($x._offset AS Uint64),
64+
message:CAST($x._data AS Utf8)
65+
|>
66+
];
67+
};
68+
'''
69+
70+
pool.execute_with_retries(
71+
"""
72+
{0}
73+
74+
CREATE TRANSFER transfer_{1}
75+
FROM source_topic_{1} TO target_table_{1} USING $l
76+
WITH (
77+
CONNECTION_STRING = 'grpc://localhost:{2}/?database={3}',
78+
FLUSH_INTERVAL = Interval('PT1S'),
79+
BATCH_SIZE_BYTES = 8388608
80+
);
81+
""".format(lmb, store_type, self.cluster.nodes[1].grpc_port, self.database)
82+
)
83+
84+
with driver.topic_client.writer(f"source_topic_{store_type}", producer_id="producer-id") as writer:
85+
for i in range(self.message_count):
86+
writer.write(ydb.TopicWriterMessage(f"message-{i}"))
87+
88+
for i in range(30):
89+
time.sleep(1)
90+
91+
rss = pool.execute_with_retries(
92+
f"""
93+
SELECT MAX(offset) AS last_offset
94+
FROM target_table_{store_type};
95+
"""
96+
)
97+
rs = rss[0]
98+
row = rs.rows[0]
99+
100+
print("Last offset={row.last_offset}, expected={self.message_count}")
101+
if row.last_offset == (self.message_count - 1):
102+
return
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
IF (NOT WITH_VALGRIND)
2+
PY3TEST()
3+
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
4+
ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
5+
ENV(YDB_ERASURE=mirror_3_dc)
6+
ENV(YDB_USE_IN_MEMORY_PDISKS=true)
7+
8+
TEST_SRCS(
9+
test_workload.py
10+
)
11+
12+
IF (SANITIZER_TYPE)
13+
REQUIREMENTS(ram:32)
14+
ENDIF()
15+
16+
SIZE(MEDIUM)
17+
18+
DEPENDS(
19+
ydb/apps/ydbd
20+
ydb/apps/ydb
21+
)
22+
23+
PEERDIR(
24+
ydb/tests/library
25+
)
26+
27+
28+
END()
29+
30+
ENDIF()

ydb/tests/stress/transfer/ya.make

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
RECURSE_FOR_TESTS(
2+
tests
3+
)
4+

ydb/tests/stress/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ RECURSE(
66
oltp_workload
77
simple_queue
88
statistics_workload
9+
transfer
910
)

0 commit comments

Comments
 (0)