Skip to content

Commit d72fa56

Browse files
committed
Partition autosplit feature
1 parent 228bb52 commit d72fa56

File tree

3 files changed

+35
-0
lines changed

3 files changed

+35
-0
lines changed

ydb/_grpc/grpcwrapper/ydb_topic.py

+24
Original file line numberDiff line numberDiff line change
@@ -419,12 +419,14 @@ def from_proto(
419419
class InitRequest(IToProto):
420420
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
421421
consumer: str
422+
auto_partitioning_support: bool
422423

423424
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
424425
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
425426
res.consumer = self.consumer
426427
for settings in self.topics_read_settings:
427428
res.topics_read_settings.append(settings.to_proto())
429+
res.auto_partitioning_support = self.auto_partitioning_support
428430
return res
429431

430432
@dataclass
@@ -696,6 +698,20 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRespon
696698
partition_session_id=self.partition_session_id,
697699
)
698700

701+
@dataclass
702+
class EndPartitionSession(IFromProto):
703+
partition_session_id: int
704+
adjacent_partition_ids: List[int]
705+
child_partition_ids: List[int]
706+
707+
@staticmethod
708+
def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession):
709+
return StreamReadMessage.EndPartitionSession(
710+
partition_session_id=msg.partition_session_id,
711+
adjacent_partition_ids=list(msg.adjacent_partition_ids),
712+
child_partition_ids=list(msg.child_partition_ids),
713+
)
714+
699715
@dataclass
700716
class FromClient(IToProto):
701717
client_message: "ReaderMessagesFromClientToServer"
@@ -775,6 +791,13 @@ def from_proto(
775791
msg.partition_session_status_response
776792
),
777793
)
794+
elif mess_type == "end_partition_session":
795+
return StreamReadMessage.FromServer(
796+
server_status=server_status,
797+
server_message=StreamReadMessage.EndPartitionSession.from_proto(
798+
msg.end_partition_session,
799+
)
800+
)
778801
else:
779802
raise issues.UnexpectedGrpcMessage(
780803
"Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type
@@ -799,6 +822,7 @@ def from_proto(
799822
UpdateTokenResponse,
800823
StreamReadMessage.StartPartitionSessionRequest,
801824
StreamReadMessage.StopPartitionSessionRequest,
825+
StreamReadMessage.EndPartitionSession,
802826
]
803827

804828

ydb/_topic_reader/topic_reader.py

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class PublicReaderSettings:
4545
consumer: str
4646
topic: TopicSelectorTypes
4747
buffer_size_bytes: int = 50 * 1024 * 1024
48+
auto_partitioning_support: bool = False
4849

4950
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
5051
"""decoders: map[codec_code] func(encoded_bytes)->decoded_bytes"""
@@ -77,6 +78,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
7778
return StreamReadMessage.InitRequest(
7879
topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore
7980
consumer=self.consumer,
81+
auto_partitioning_support=self.auto_partitioning_support,
8082
)
8183

8284
def _retry_settings(self) -> RetrySettings:

ydb/_topic_reader/topic_reader_asyncio.py

+9
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,12 @@ async def _read_messages_loop(self):
498498
):
499499
self._on_partition_session_stop(message.server_message)
500500

501+
elif isinstance(
502+
message.server_message,
503+
StreamReadMessage.EndPartitionSession,
504+
):
505+
self._on_end_partition_session(message.server_message)
506+
501507
elif isinstance(message.server_message, UpdateTokenResponse):
502508
self._update_token_event.set()
503509

@@ -575,6 +581,9 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes
575581
)
576582
)
577583

584+
def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession):
585+
logger.debug(f"End partition session with id: {message.partition_session_id}")
586+
578587
def _on_read_response(self, message: StreamReadMessage.ReadResponse):
579588
self._buffer_consume_bytes(message.bytes_size)
580589

0 commit comments

Comments
 (0)