From 2baf6d39d866d2d6817471977820b8532918b659 Mon Sep 17 00:00:00 2001 From: Marcos Schroh <2828842+marcosschroh@users.noreply.github.com> Date: Tue, 19 Dec 2023 16:16:07 +0100 Subject: [PATCH] fix(Stream): handle errors.ConsumerStoppedError exception for the new consumtion way (#150) --- kstreams/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kstreams/streams.py b/kstreams/streams.py index fe765b45..b75c95a7 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -250,6 +250,8 @@ async def func_wrapper_with_typing(self, calling_type: UDFType) -> None: else: # typing with cr and stream await self.func(cr, self) + except errors.ConsumerStoppedError: + return except Exception as e: logger.exception(f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}")