Skip to content

Commit 2eea6af

Browse files
committed
Stop rotation batch queue after split
1 parent f68af4c commit 2eea6af

File tree

2 files changed

+29
-9
lines changed

2 files changed

+29
-9
lines changed

ydb/_topic_reader/datatypes.py

+11
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,16 @@ def close(self):
121121
def closed(self):
122122
return self.state == PartitionSession.State.Stopped
123123

124+
def end(self):
125+
if self.closed:
126+
return
127+
128+
self.state = PartitionSession.State.Ended
129+
130+
@property
131+
def ended(self):
132+
return self.state == PartitionSession.State.Ended
133+
124134
def _ensure_not_closed(self):
125135
if self.state == PartitionSession.State.Stopped:
126136
raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
@@ -129,6 +139,7 @@ class State(enum.Enum):
129139
Active = 1
130140
GracefulShutdown = 2
131141
Stopped = 3
142+
Ended = 4
132143

133144
@dataclass(order=True)
134145
class CommitAckWaiter:

ydb/_topic_reader/topic_reader_asyncio.py

+18-9
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,6 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
335335
self._started = True
336336
self._stream = stream
337337

338-
print(init_message)
339-
340338
stream.write(StreamReadMessage.FromClient(client_message=init_message))
341339
init_response = await stream.receive() # type: StreamReadMessage.FromServer
342340
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
@@ -390,6 +388,14 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
390388
partition_session_id, batch = self._message_batches.popitem(last=False)
391389
return partition_session_id, batch
392390

391+
def _return_batch_to_queue(self, part_sess_id: int, batch: datatypes.PublicBatch):
392+
self._message_batches[part_sess_id] = batch
393+
394+
# In case of auto-split we should return all parent messages ASAP
395+
# without queue rotation to prevent child's messages before parent's.
396+
if self._partition_sessions[part_sess_id].ended:
397+
self._message_batches.move_to_end(part_sess_id, last=False)
398+
393399
def receive_batch_nowait(self, max_messages: Optional[int] = None):
394400
if self._get_first_error():
395401
raise self._get_first_error()
@@ -405,7 +411,8 @@ def receive_batch_nowait(self, max_messages: Optional[int] = None):
405411

406412
cutted_batch = batch._pop_batch(message_count=max_messages)
407413

408-
self._message_batches[part_sess_id] = batch
414+
self._return_batch_to_queue(part_sess_id, batch)
415+
409416
self._buffer_release_bytes(cutted_batch._bytes_size)
410417

411418
return cutted_batch
@@ -425,7 +432,7 @@ def receive_message_nowait(self):
425432
self._buffer_release_bytes(batch._bytes_size)
426433
else:
427434
# TODO: we should somehow release bytes from single message as well
428-
self._message_batches[part_sess_id] = batch
435+
self._return_batch_to_queue(part_sess_id, batch)
429436

430437
return message
431438

@@ -584,13 +591,15 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes
584591
)
585592

586593
def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession):
587-
logger.info(
588-
f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}"
594+
logger.debug(
595+
f"End partition session with id: {message.partition_session_id}, "
596+
f"child partitions: {message.child_partition_ids}"
589597
)
590598

591-
print(
592-
f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}"
593-
)
599+
if message.partition_session_id in self._partition_sessions:
600+
# Mark partition session as ended not to shuffle messages.
601+
self._partition_sessions[message.partition_session_id].end()
602+
594603

595604
def _on_read_response(self, message: StreamReadMessage.ReadResponse):
596605
self._buffer_consume_bytes(message.bytes_size)

0 commit comments

Comments
 (0)