Skip to content

Commit 1a7935a

Browse files
authored
Merge pull request #549 from ydb-platform/partition_autosplit
Partition autosplit feature
2 parents 279da78 + 3da64a0 commit 1a7935a

10 files changed

+550
-9
lines changed

examples/topic/autosplit_example.py

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import argparse
2+
import asyncio
3+
import datetime
4+
import logging
5+
6+
import ydb
7+
8+
logger = logging.getLogger(__name__)
9+
logger.setLevel(logging.DEBUG)
10+
11+
12+
async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
13+
config = ydb.DriverConfig(endpoint=endpoint, database=database)
14+
config.credentials = ydb.credentials_from_env_variables()
15+
driver = ydb.aio.Driver(config)
16+
await driver.wait(5, fail_fast=True)
17+
return driver
18+
19+
20+
async def recreate_topic(driver: ydb.aio.Driver, topic: str, consumer: str):
21+
try:
22+
await driver.topic_client.drop_topic(topic)
23+
except ydb.SchemeError:
24+
pass
25+
26+
await driver.topic_client.create_topic(
27+
topic,
28+
consumers=[consumer],
29+
max_active_partitions=100,
30+
auto_partitioning_settings=ydb.TopicAutoPartitioningSettings(
31+
strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
32+
up_utilization_percent=1,
33+
down_utilization_percent=1,
34+
stabilization_window=datetime.timedelta(seconds=1),
35+
),
36+
)
37+
38+
39+
async def write_messages(driver: ydb.aio.Driver, topic: str, id: int = 0):
40+
async with driver.topic_client.writer(topic) as writer:
41+
for i in range(100):
42+
mess = ydb.TopicWriterMessage(data=f"[{id}] mess-{i}", metadata_items={"index": f"{i}"})
43+
await writer.write(mess)
44+
await asyncio.sleep(0.01)
45+
46+
47+
async def read_messages(driver: ydb.aio.Driver, topic: str, consumer: str):
48+
async with driver.topic_client.reader(topic, consumer, auto_partitioning_support=True) as reader:
49+
count = 0
50+
while True:
51+
try:
52+
mess = await asyncio.wait_for(reader.receive_message(), 5)
53+
count += 1
54+
print(mess.data.decode())
55+
reader.commit(mess)
56+
except asyncio.TimeoutError:
57+
assert count == 200
58+
return
59+
60+
61+
async def main():
62+
parser = argparse.ArgumentParser(
63+
formatter_class=argparse.RawDescriptionHelpFormatter,
64+
description="""YDB topic basic example.\n""",
65+
)
66+
parser.add_argument("-d", "--database", default="/local", help="Name of the database to use")
67+
parser.add_argument("-e", "--endpoint", default="grpc://localhost:2136", help="Endpoint url to use")
68+
parser.add_argument("-p", "--path", default="test-topic", help="Topic name")
69+
parser.add_argument("-c", "--consumer", default="consumer", help="Consumer name")
70+
parser.add_argument("-v", "--verbose", default=True, action="store_true")
71+
72+
args = parser.parse_args()
73+
74+
if args.verbose:
75+
logger.addHandler(logging.StreamHandler())
76+
77+
driver = await connect(args.endpoint, args.database)
78+
79+
await recreate_topic(driver, args.path, args.consumer)
80+
81+
await asyncio.gather(
82+
write_messages(driver, args.path, 0),
83+
write_messages(driver, args.path, 1),
84+
read_messages(driver, args.path, args.consumer),
85+
)
86+
87+
88+
if __name__ == "__main__":
89+
asyncio.run(main())

tests/topics/test_control_plane.py

+34
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pytest
44

5+
import ydb
56
from ydb import issues
67

78

@@ -56,6 +57,39 @@ async def test_alter_existed_topic(self, driver, topic_path):
5657
topic_after = await client.describe_topic(topic_path)
5758
assert topic_after.min_active_partitions == target_min_active_partitions
5859

60+
async def test_alter_auto_partitioning_settings(self, driver, topic_path):
61+
client = driver.topic_client
62+
63+
topic_before = await client.describe_topic(topic_path)
64+
65+
expected = topic_before.auto_partitioning_settings
66+
67+
expected.strategy = ydb.TopicAutoPartitioningStrategy.SCALE_UP
68+
69+
await client.alter_topic(
70+
topic_path,
71+
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
72+
set_strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
73+
),
74+
)
75+
76+
topic_after = await client.describe_topic(topic_path)
77+
78+
assert topic_after.auto_partitioning_settings == expected
79+
80+
expected.up_utilization_percent = 88
81+
82+
await client.alter_topic(
83+
topic_path,
84+
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
85+
set_up_utilization_percent=88,
86+
),
87+
)
88+
89+
topic_after = await client.describe_topic(topic_path)
90+
91+
assert topic_after.auto_partitioning_settings == expected
92+
5993

6094
class TestTopicClientControlPlane:
6195
def test_create_topic(self, driver_sync, database):

0 commit comments

Comments
 (0)