Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ydb/tests/stress/transfer/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
import argparse
from ydb.tests.stress.transfer.workload import Workload

if __name__ == '__main__':
text = """\033[92mTransfer workload\x1b[0m"""
parser = argparse.ArgumentParser(description=text, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used")
parser.add_argument('--database', default=None, required=True, help='A database to connect')
parser.add_argument('--duration', default=10 ** 9, type=lambda x: int(x), help='A duration of workload in seconds.')
parser.add_argument('--mode', default="row", choices=["row", "column"], help='STORE mode for CREATE TABLE')
args = parser.parse_args()
with Workload(args.endpoint, args.database, args.duration, args.mode) as workload:
workload.loop()
35 changes: 35 additions & 0 deletions ydb/tests/stress/transfer/tests/test_workload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
import os
import pytest
import yatest

from ydb.tests.library.harness.kikimr_runner import KiKiMR
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.common.types import Erasure


class TestYdbTransferWorkload(object):
@classmethod
def setup_class(cls):
cls.cluster = KiKiMR(KikimrConfigGenerator(
erasure=Erasure.MIRROR_3_DC,
extra_feature_flags={
"enable_topic_transfer": True,
}
))
cls.cluster.start()

@classmethod
def teardown_class(cls):
cls.cluster.stop()

@pytest.mark.parametrize("store_type", ["row", "column"])
def test(self, store_type):
cmd = [
yatest.common.binary_path(os.getenv("YDB_TEST_PATH")),
"--endpoint", f'grpc://localhost:{self.cluster.nodes[1].grpc_port}',
"--database", "/Root",
"--duration", "60",
"--mode", store_type
]
yatest.common.execute(cmd, wait=True)
33 changes: 33 additions & 0 deletions ydb/tests/stress/transfer/tests/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
IF (NOT WITH_VALGRIND)
PY3TEST()
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
ENV(YDB_ERASURE=mirror_3_dc)
ENV(YDB_USE_IN_MEMORY_PDISKS=true)
ENV(YDB_TEST_PATH="ydb/tests/stress/transfer/transfer")

TEST_SRCS(
test_workload.py
)

IF (SANITIZER_TYPE)
REQUIREMENTS(ram:32)
ENDIF()

SIZE(MEDIUM)

DEPENDS(
ydb/apps/ydbd
ydb/apps/ydb
ydb/tests/stress/transfer
)

PEERDIR(
ydb/tests/library
ydb/tests/stress/transfer/workload
)


END()

ENDIF()
113 changes: 113 additions & 0 deletions ydb/tests/stress/transfer/workload/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
import ydb

import time
import unittest
import uuid


class Workload(unittest.TestCase):
def __init__(self, endpoint, database, duration, mode):
self.database = database
self.endpoint = endpoint
self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
self.pool = ydb.QuerySessionPool(self.driver)
self.duration = duration
self.mode = mode
self.id = f"{uuid.uuid1()}".replace("-", "_")
self.table_name = f"transfer_target_table_{mode}_{self.id}"
self.topic_name = f"transfer_source_topic_{mode}_{self.id}"
self.transfer_name = f"transfer_{mode}_{self.id}"

def create_table(self):
self.pool.execute_with_retries(
f"""
CREATE TABLE {self.table_name} (
partition Uint32 NOT NULL,
offset Uint64 NOT NULL,
message Utf8,
PRIMARY KEY (partition, offset)
) WITH (
STORE = {self.mode}
);
"""
)

def create_topic(self):
self.pool.execute_with_retries(
f"CREATE TOPIC {self.topic_name};"
)

def create_transfer(self):
lmb = '''
$l = ($x) -> {
return [
<|
partition:CAST($x._partition AS Uint32),
offset:CAST($x._offset AS Uint64),
message:CAST($x._data AS Utf8)
|>
];
};
'''

self.pool.execute_with_retries(
f"""
{lmb}

CREATE TRANSFER {self.transfer_name}
FROM {self.topic_name} TO {self.table_name} USING $l
WITH (
CONNECTION_STRING = '{self.endpoint}/?database={self.database}',
FLUSH_INTERVAL = Interval('PT1S'),
BATCH_SIZE_BYTES = 8388608
);
"""
)

def write_to_topic(self):
finished_at = time.time() + self.duration
self.message_count = 0

with self.driver.topic_client.writer(self.topic_name, producer_id="producer-id") as writer:
while time.time() < finished_at:
writer.write(ydb.TopicWriterMessage(f"message-{time.time()}"))
self.message_count += 1

def wait_transfer_finished(self):
iterations = 30

last_offset = -1

for i in range(iterations):
time.sleep(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А точно слип нужен, почему просто не можем фигачить как не в себя?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

И делал бы это время случайное от 10 до 60 секунд, например

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это проверка завершения трансфера, а не создание нагрузки. Здесь не надо ни рандомизировать, ни лопатить постоянно.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А, понял, сорри


rss = self.pool.execute_with_retries(
f"""
SELECT MAX(offset) AS last_offset
FROM {self.table_name};
"""
)
rs = rss[0]
last_offset = rs.rows[0].last_offset

if last_offset + 1 == self.message_count:
return

raise Exception(f"Transfer still work after {iterations} seconds. Last offset is {last_offset}")

def loop(self):
self.create_table()
self.create_topic()
self.create_transfer()

self.write_to_topic()

self.wait_transfer_finished()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.stop()
self.driver.stop()
13 changes: 13 additions & 0 deletions ydb/tests/stress/transfer/workload/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
PY3_LIBRARY()

PY_SRCS(
__init__.py
)

PEERDIR(
library/python/monlib
ydb/public/sdk/python
ydb/public/sdk/python/enable_v3_new_behavior
)

END()
16 changes: 16 additions & 0 deletions ydb/tests/stress/transfer/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
PY3_PROGRAM(transfer)

PY_SRCS(
__main__.py
)

PEERDIR(
ydb/tests/stress/transfer/workload
)

END()

RECURSE_FOR_TESTS(
tests
)

1 change: 1 addition & 0 deletions ydb/tests/stress/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ RECURSE(
oltp_workload
simple_queue
statistics_workload
transfer
)