From 2cf60cbb114e6c9159a0b20bec66cad9c2307566 Mon Sep 17 00:00:00 2001 From: LeeC20 <956155441@qq.com> Date: Sat, 16 Nov 2024 18:20:00 +0800 Subject: [PATCH] modify: sasl kafka to close connection --- funboost/consumers/kafka_consumer_manually_commit.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/funboost/consumers/kafka_consumer_manually_commit.py b/funboost/consumers/kafka_consumer_manually_commit.py index dccd0d4c..3e9ac82b 100644 --- a/funboost/consumers/kafka_consumer_manually_commit.py +++ b/funboost/consumers/kafka_consumer_manually_commit.py @@ -145,7 +145,8 @@ def _shedual_task(self): 'sasl.password': BrokerConnConfig.KFFKA_SASL_CONFIG['sasl_plain_password'], 'group.id': self.consumer_params.broker_exclusive_config["group_id"], 'auto.offset.reset': self.consumer_params.broker_exclusive_config["auto_offset_reset"], - 'enable.auto.commit': False + 'enable.auto.commit': False, + "enable.auto.offset.store": False, }) self._confluent_consumer.subscribe([self._queue_name]) @@ -170,3 +171,6 @@ def _shedual_task(self): self.logger.debug( f'从kafka的 [{self._queue_name}] 主题,分区 {msg.partition()} 中 的 offset {msg.offset()} 取出的消息是: {msg.value()}') # noqa self._submit_task(kw) + + def __exit__(self): + self._confluent_consumer.close()