diff --git a/src/Callback/KafkaErrorCallback.php b/src/Callback/KafkaErrorCallback.php index b3be9ae..c024e96 100644 --- a/src/Callback/KafkaErrorCallback.php +++ b/src/Callback/KafkaErrorCallback.php @@ -22,7 +22,8 @@ final class KafkaErrorCallback */ public function __invoke($kafka, int $errorCode, string $reason) { - if (RD_KAFKA_RESP_ERR__TRANSPORT === $errorCode) { + // non fatal errors are retried by librdkafka + if (RD_KAFKA_RESP_ERR__FATAL !== $errorCode) { return; } diff --git a/tests/Unit/Callback/KafkaErrorCallbackTest.php b/tests/Unit/Callback/KafkaErrorCallbackTest.php index b39a37c..b036a56 100644 --- a/tests/Unit/Callback/KafkaErrorCallbackTest.php +++ b/tests/Unit/Callback/KafkaErrorCallbackTest.php @@ -11,51 +11,17 @@ */ class KafkaErrorCallbackTest extends TestCase { - - public function getConsumerMock() - { - return $this->getMockBuilder(RdKafkaConsumer::class) - ->disableOriginalConstructor() - ->onlyMethods(['unsubscribe', 'getSubscription']) - ->getMock(); - } - public function testInvokeWithBrokerException() { self::expectException('Jobcloud\Kafka\Exception\KafkaBrokerException'); - - $consumerMock = $this->getConsumerMock(); - - $consumerMock - ->expects(self::any()) - ->method('unsubscribe') - ->willReturn(null); - - $consumerMock - ->expects(self::any()) - ->method('getSubscription') - ->willReturn([]); - $callback = new KafkaErrorCallback(); - call_user_func($callback, $consumerMock, 1, 'error'); + call_user_func($callback, null, RD_KAFKA_RESP_ERR__FATAL, 'error'); } public function testInvokeWithAcceptableError() { - $consumerMock = $this->getConsumerMock(); - - $consumerMock - ->expects(self::any()) - ->method('unsubscribe') - ->willReturn(null); - - $consumerMock - ->expects(self::any()) - ->method('getSubscription') - ->willReturn([]); - $callback = new KafkaErrorCallback(); - $result = call_user_func($callback, $consumerMock, RD_KAFKA_RESP_ERR__TRANSPORT, 'error'); + $result = call_user_func($callback, null, RD_KAFKA_RESP_ERR__TRANSPORT, 'error'); self::assertNull($result); }