diff --git a/.circleci/config.yml b/.circleci/config.yml index 2fb67fd..a7b384c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,6 +27,11 @@ workflows: dependencyCheckSumFile: "./composer.json" requires: - ci-php/install-dependencies + - ci-php/infection-testing: + dockerComposeFile: "./docker/docker-compose.yml" + dependencyCheckSumFile: "./composer.json" + requires: + - ci-php/install-dependencies jobs: coverage: diff --git a/Makefile b/Makefile index 94a5255..51e326f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: clean code-style coverage help test static-analysis update-dependencies xdebug-enable xdebug-disable +.PHONY: clean code-style coverage help test static-analysis update-dependencies xdebug-enable xdebug-disable infection-testing .DEFAULT_GOAL := test PHPUNIT = ./vendor/bin/phpunit -c ./phpunit.xml @@ -6,6 +6,7 @@ PHPDBG = phpdbg -qrr ./vendor/bin/phpunit -c ./phpunit.xml PHPSTAN = ./vendor/bin/phpstan PHPCS = ./vendor/bin/phpcs --extensions=php CONSOLE = ./bin/console +INFECTION = ./vendor/bin/infection clean: rm -rf ./build ./vendor @@ -33,6 +34,11 @@ install-dependencies: install-dependencies-lowest: composer install --prefer-lowest +infection-testing: + make coverage + cp -f build/logs/phpunit/junit.xml build/logs/phpunit/coverage/junit.xml + ${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=76 --threads=`nproc` + xdebug-enable: sudo php-ext-enable xdebug @@ -50,6 +56,7 @@ help: # help You're looking at it! # test (default) Run all the tests with phpunit # static-analysis Run static analysis using phpstan + # infection-testing Run infection/mutation testing # install-dependencies Run composer install # update-dependencies Run composer update # xdebug-enable Enable xdebug diff --git a/composer.json b/composer.json index eacdb0f..6748312 100644 --- a/composer.json +++ b/composer.json @@ -22,14 +22,15 @@ "ext-json": "*" }, "require-dev": { - "phpunit/phpunit": "^9.1", + "phpunit/phpunit": "^9.3", "squizlabs/php_codesniffer": "^3.5.4", "phpstan/phpstan": "0.12.32", "php-mock/php-mock-phpunit": "^2.6", "kwn/php-rdkafka-stubs": "^2.0.0", "rregeer/phpunit-coverage-check": "^0.3.1", "johnkary/phpunit-speedtrap": "^3.1", - "flix-tech/avro-serde-php": "^1.3" + "flix-tech/avro-serde-php": "^1.3", + "infection/infection": "^0.16" }, "autoload": { "psr-4": { diff --git a/infection.json b/infection.json new file mode 100644 index 0000000..3205417 --- /dev/null +++ b/infection.json @@ -0,0 +1,18 @@ +{ + "timeout": 10, + "source": { + "directories": [ + "src" + ] + }, + "logs": { + "text": "build\/logs\/infection\/infection.log", + "summary": "build\/logs\/infection\/infection-summary.log" + }, + "mutators": { + "@default": true + }, + "phpUnit": { + "customPath": "vendor/bin/phpunit" + } +} diff --git a/phpunit.xml b/phpunit.xml index 462409d..13c6ec3 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,45 +1,46 @@ - - - - - - - - - - - - - - - - - ./tests/Unit - - - - - - - - - - - - src - - - - - + + + + src + + + + + + + + + + + + + + + + + + + + + + ./tests/Unit + + + + + + + + diff --git a/src/Consumer/AbstractKafkaConsumer.php b/src/Consumer/AbstractKafkaConsumer.php index cba0612..af4ff01 100644 --- a/src/Consumer/AbstractKafkaConsumer.php +++ b/src/Consumer/AbstractKafkaConsumer.php @@ -200,6 +200,24 @@ public function getLastOffsetForTopicPartition(string $topic, int $partition, in return $highOffset; } + /** + * @param string $topic + * @return int[] + * @throws RdKafkaException + */ + protected function getAllTopicPartitions(string $topic): array + { + + $partitions = []; + $topicMetadata = $this->getMetadataForTopic($topic); + + foreach ($topicMetadata->getPartitions() as $partition) { + $partitions[] = $partition->getId(); + } + + return $partitions; + } + /** * @param RdKafkaMessage $message * @return KafkaConsumerMessageInterface diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index a8f479d..818867d 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -146,6 +146,7 @@ public function withSubscription( int $offset = self::OFFSET_STORED ): KafkaConsumerBuilderInterface { $that = clone $this; + $that->topics = [new TopicSubscription($topicName, $partitions, $offset)]; return $that; diff --git a/src/Consumer/KafkaHighLevelConsumer.php b/src/Consumer/KafkaHighLevelConsumer.php index 7baacf8..6355617 100644 --- a/src/Consumer/KafkaHighLevelConsumer.php +++ b/src/Consumer/KafkaHighLevelConsumer.php @@ -246,7 +246,10 @@ private function getTopicSubscriptions(): array $subscriptions = []; foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) { - if ([] !== $topicSubscription->getPartitions()) { + if ( + [] !== $topicSubscription->getPartitions() + || KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset() + ) { continue; } $subscriptions[] = $topicSubscription->getTopicName(); @@ -263,13 +266,21 @@ private function getTopicAssignments(): array $assignments = []; foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) { - if ([] === $topicSubscription->getPartitions()) { + if ( + [] === $topicSubscription->getPartitions() + && KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset() + ) { continue; } $offset = $topicSubscription->getOffset(); + $partitions = $topicSubscription->getPartitions(); - foreach ($topicSubscription->getPartitions() as $partitionId) { + if ([] === $partitions) { + $partitions = $this->getAllTopicPartitions($topicSubscription->getTopicName()); + } + + foreach ($partitions as $partitionId) { $assignments[] = new RdKafkaTopicPartition( $topicSubscription->getTopicName(), $partitionId, diff --git a/src/Consumer/KafkaLowLevelConsumer.php b/src/Consumer/KafkaLowLevelConsumer.php index aa1d644..c7fc6b8 100644 --- a/src/Consumer/KafkaLowLevelConsumer.php +++ b/src/Consumer/KafkaLowLevelConsumer.php @@ -148,22 +148,4 @@ protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage { return $this->queue->consume($timeoutMs); } - - /** - * @param string $topic - * @return int[] - * @throws RdKafkaException - */ - private function getAllTopicPartitions(string $topic): array - { - - $partitions = []; - $topicMetadata = $this->getMetadataForTopic($topic); - - foreach ($topicMetadata->getPartitions() as $partition) { - $partitions[] = $partition->getId(); - } - - return $partitions; - } } diff --git a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php index ae11cec..ebada42 100644 --- a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php +++ b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php @@ -13,8 +13,13 @@ use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; use PHPUnit\Framework\TestCase; use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer; +use RdKafka\ConsumerTopic as RdKafkaConsumerTopic; use RdKafka\Exception as RdKafkaException; use RdKafka\Message; +use RdKafka\Metadata as RdKafkaMetadata; +use RdKafka\Metadata\Collection as RdKafkaMetadataCollection; +use RdKafka\Metadata\Partition as RdKafkaMetadataPartition; +use RdKafka\Metadata\Topic as RdKafkaMetadataTopic; /** * @covers \Jobcloud\Kafka\Consumer\AbstractKafkaConsumer @@ -31,8 +36,7 @@ public function testSubscribeSuccess(): void $topics = [new TopicSubscription('testTopic')]; $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); - $kafkaConfigurationMock->expects(self::at(0))->method('getTopicSubscriptions')->willReturn($topics); - $kafkaConfigurationMock->expects(self::at(1))->method('getTopicSubscriptions')->willReturn([]); + $kafkaConfigurationMock->expects(self::exactly(2))->method('getTopicSubscriptions')->willReturnOnConsecutiveCalls($topics, []); $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); @@ -44,13 +48,12 @@ public function testSubscribeSuccess(): void /** * @throws KafkaConsumerSubscriptionException */ - public function testSubscribeSuccessWithAssignment(): void + public function testSubscribeSuccessWithAssignmentWithPartitions(): void { $topics = [new TopicSubscription('testTopic', [1,2], RD_KAFKA_OFFSET_BEGINNING)]; $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); - $kafkaConfigurationMock->expects(self::at(0))->method('getTopicSubscriptions')->willReturn([]); - $kafkaConfigurationMock->expects(self::at(1))->method('getTopicSubscriptions')->willReturn($topics); + $kafkaConfigurationMock->expects(self::exactly(2))->method('getTopicSubscriptions')->willReturnOnConsecutiveCalls([], $topics); $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); @@ -59,6 +62,67 @@ public function testSubscribeSuccessWithAssignment(): void $kafkaConsumer->subscribe($topics); } + /** + * @throws KafkaConsumerSubscriptionException + */ + public function testSubscribeSuccessWithAssignmentWithOffsetOnly(): void + { + $partitions = [ + $this->getMetadataPartitionMock(1), + $this->getMetadataPartitionMock(2) + ]; + + /** @var RdKafkaConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ + $rdKafkaConsumerTopicMock = $this->createMock(RdKafkaConsumerTopic::class); + + /** @var RdKafkaMetadataTopic|MockObject $rdKafkaMetadataTopicMock */ + $rdKafkaMetadataTopicMock = $this->createMock(RdKafkaMetadataTopic::class); + $rdKafkaMetadataTopicMock + ->expects(self::once()) + ->method('getPartitions') + ->willReturn($partitions); + + /** @var RdKafkaMetadata|MockObject $rdKafkaMetadataMock */ + $rdKafkaMetadataMock = $this->createMock(RdKafkaMetadata::class); + $rdKafkaMetadataMock + ->expects(self::once()) + ->method('getTopics') + ->willReturnCallback( + function () use ($rdKafkaMetadataTopicMock) { + /** @var RdKafkaMetadataCollection|MockObject $collection */ + $collection = $this->createMock(RdKafkaMetadataCollection::class); + $collection + ->expects(self::once()) + ->method('current') + ->willReturn($rdKafkaMetadataTopicMock); + + return $collection; + } + ); + + $topics = [new TopicSubscription('testTopic', [], RD_KAFKA_OFFSET_END)]; + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $kafkaConfigurationMock->expects(self::exactly(2))->method('getTopicSubscriptions')->willReturnOnConsecutiveCalls([], $topics); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $rdKafkaConsumerMock->expects(self::once())->method('assign'); + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('getMetadata') + ->with(false, $rdKafkaConsumerTopicMock, 10000) + ->willReturn($rdKafkaMetadataMock); + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('newTopic') + ->with('testTopic') + ->willReturn($rdKafkaConsumerTopicMock); + + + $kafkaConsumer->subscribe($topics); + } + /** * @throws KafkaConsumerSubscriptionException @@ -320,8 +384,7 @@ public function testKafkaConsumeWithDecode(): void ->with(10000) ->willReturn($message); $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); - $kafkaConfigurationMock->expects(self::at(0))->method('getTopicSubscriptions')->willReturn($topics); - $kafkaConfigurationMock->expects(self::at(1))->method('getTopicSubscriptions')->willReturn([]); + $kafkaConfigurationMock->expects(self::exactly(2))->method('getTopicSubscriptions')->willReturnOnConsecutiveCalls($topics, []); $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); $decoderMock->expects(self::once())->method('decode')->with( $this->callback( @@ -365,8 +428,7 @@ public function testKafkaConsumeWithoutDecode(): void ->with(10000) ->willReturn($message); $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); - $kafkaConfigurationMock->expects(self::at(0))->method('getTopicSubscriptions')->willReturn($topics); - $kafkaConfigurationMock->expects(self::at(1))->method('getTopicSubscriptions')->willReturn([]); + $kafkaConfigurationMock->expects(self::exactly(2))->method('getTopicSubscriptions')->willReturnOnConsecutiveCalls($topics, []); $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); $decoderMock->expects(self::never())->method('decode'); $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); @@ -479,4 +541,23 @@ public function testClose(): void $kafkaConsumer->close(); } + + /** + * @param int $partitionId + * @return RdKafkaMetadataPartition|MockObject + */ + private function getMetadataPartitionMock(int $partitionId): RdKafkaMetadataPartition + { + $partitionMock = $this->getMockBuilder(RdKafkaMetadataPartition::class) + ->disableOriginalConstructor() + ->onlyMethods(['getId']) + ->getMock(); + + $partitionMock + ->expects(self::once()) + ->method('getId') + ->willReturn($partitionId); + + return $partitionMock; + } } diff --git a/tests/Unit/Message/Decoder/AvroDecoderTest.php b/tests/Unit/Message/Decoder/AvroDecoderTest.php index 13b9f0a..05cb10e 100644 --- a/tests/Unit/Message/Decoder/AvroDecoderTest.php +++ b/tests/Unit/Message/Decoder/AvroDecoderTest.php @@ -60,8 +60,13 @@ public function testDecodeWithSchema() $registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true); $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); - $recordSerializer->expects(self::at(0))->method('decodeMessage')->with($message->getKey(), $schemaDefinition)->willReturn('decoded-key'); - $recordSerializer->expects(self::at(1))->method('decodeMessage')->with($message->getBody(), $schemaDefinition)->willReturn(['test']); + $recordSerializer->expects(self::exactly(2)) + ->method('decodeMessage') + ->withConsecutive( + [$message->getKey(), $schemaDefinition], + [$message->getBody(), $schemaDefinition], + ) + ->willReturnOnConsecutiveCalls('decoded-key', ['test']); $decoder = new AvroDecoder($registry, $recordSerializer); diff --git a/tests/Unit/Message/Encoder/AvroEncoderTest.php b/tests/Unit/Message/Encoder/AvroEncoderTest.php index 97a3456..866e43a 100644 --- a/tests/Unit/Message/Encoder/AvroEncoderTest.php +++ b/tests/Unit/Message/Encoder/AvroEncoderTest.php @@ -87,8 +87,13 @@ public function testEncodeSuccessWithSchema() $producerMessage->expects(self::once())->method('withKey')->with('encodedKey')->willReturn($producerMessage); $recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock(); - $recordSerializer->expects(self::at(0))->method('encodeRecord')->with($avroSchema->getName(), $avroSchema->getDefinition(), [])->willReturn('encodedValue'); - $recordSerializer->expects(self::at(1))->method('encodeRecord')->with($avroSchema->getName(), $avroSchema->getDefinition(), 'test-key')->willReturn('encodedKey'); + $recordSerializer + ->expects(self::exactly(2)) + ->method('encodeRecord') + ->withConsecutive( + [$avroSchema->getName(), $avroSchema->getDefinition(), []], + [$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key'] + )->willReturnOnConsecutiveCalls('encodedValue', 'encodedKey'); $encoder = new AvroEncoder($registry, $recordSerializer);