Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ workflows:
dependencyCheckSumFile: "./composer.json"
requires:
- ci-php/install-dependencies
- ci-php/infection-testing:
dockerComposeFile: "./docker/docker-compose.yml"
dependencyCheckSumFile: "./composer.json"
requires:
- ci-php/install-dependencies

jobs:
coverage:
Expand Down
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
.PHONY: clean code-style coverage help test static-analysis update-dependencies xdebug-enable xdebug-disable
.PHONY: clean code-style coverage help test static-analysis update-dependencies xdebug-enable xdebug-disable infection-testing
.DEFAULT_GOAL := test

PHPUNIT = ./vendor/bin/phpunit -c ./phpunit.xml
PHPDBG = phpdbg -qrr ./vendor/bin/phpunit -c ./phpunit.xml
PHPSTAN = ./vendor/bin/phpstan
PHPCS = ./vendor/bin/phpcs --extensions=php
CONSOLE = ./bin/console
INFECTION = ./vendor/bin/infection

clean:
rm -rf ./build ./vendor
Expand Down Expand Up @@ -33,6 +34,11 @@ install-dependencies:
install-dependencies-lowest:
composer install --prefer-lowest

infection-testing:
make coverage
cp -f build/logs/phpunit/junit.xml build/logs/phpunit/coverage/junit.xml
${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=76 --threads=`nproc`

xdebug-enable:
sudo php-ext-enable xdebug

Expand All @@ -50,6 +56,7 @@ help:
# help You're looking at it!
# test (default) Run all the tests with phpunit
# static-analysis Run static analysis using phpstan
# infection-testing Run infection/mutation testing
# install-dependencies Run composer install
# update-dependencies Run composer update
# xdebug-enable Enable xdebug
Expand Down
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
"ext-json": "*"
},
"require-dev": {
"phpunit/phpunit": "^9.1",
"phpunit/phpunit": "^9.3",
"squizlabs/php_codesniffer": "^3.5.4",
"phpstan/phpstan": "0.12.32",
"php-mock/php-mock-phpunit": "^2.6",
"kwn/php-rdkafka-stubs": "^2.0.0",
"rregeer/phpunit-coverage-check": "^0.3.1",
"johnkary/phpunit-speedtrap": "^3.1",
"flix-tech/avro-serde-php": "^1.3"
"flix-tech/avro-serde-php": "^1.3",
"infection/infection": "^0.16"
},
"autoload": {
"psr-4": {
Expand Down
18 changes: 18 additions & 0 deletions infection.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"timeout": 10,
"source": {
"directories": [
"src"
]
},
"logs": {
"text": "build\/logs\/infection\/infection.log",
"summary": "build\/logs\/infection\/infection-summary.log"
},
"mutators": {
"@default": true
},
"phpUnit": {
"customPath": "vendor/bin/phpunit"
}
}
87 changes: 44 additions & 43 deletions phpunit.xml
Original file line number Diff line number Diff line change
@@ -1,45 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit
backupGlobals = "false"
backupStaticAttributes = "false"
colors = "true"
convertErrorsToExceptions = "true"
convertNoticesToExceptions = "true"
convertWarningsToExceptions = "true"
processIsolation = "false"
stopOnFailure = "false"
bootstrap = "tests/bootstrap.php" >
<php>
<ini name="max_execution_time" value="-1"/>
<ini name="html_errors" value="false"/>
<ini name="memory_limit" value="2G"/>

<ini name="xdebug.default_enable" value="1" />
<ini name="xdebug.enable_coverage" value="1" />
<ini name="xdebug.remote_autostart" value="0" />
<ini name="xdebug.remote_enable" value="0" />
<ini name="xdebug.overload_var_dump" value="0" />
<ini name="xdebug.show_mem_delta" value="0" />
</php>

<testsuites>
<testsuite name="Unit">
<directory>./tests/Unit</directory>
</testsuite>
</testsuites>
<logging>
<log type="coverage-text" target="php://stdout" showOnlySummary="true"/>
<log type="coverage-html" target="build/logs/phpunit/coverage"/>
<log type="coverage-xml" target="build/logs/phpunit/coverage/coverage-xml"/>
<log type="coverage-clover" target="clover.xml"/>
<log type="junit" target="build/logs/phpunit/junit.xml"/>
</logging>
<filter>
<whitelist>
<directory>src</directory>
</whitelist>
</filter>
<listeners>
<listener class="JohnKary\PHPUnit\Listener\SpeedTrapListener" />
</listeners>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
backupGlobals="false"
backupStaticAttributes="false"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
bootstrap="tests/bootstrap.php"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<coverage>
<include>
<directory>src</directory>
</include>
<report>
<clover outputFile="clover.xml"/>
<html outputDirectory="build/logs/phpunit/coverage"/>
<text outputFile="php://stdout" showOnlySummary="true"/>
<xml outputDirectory="build/logs/phpunit/coverage/coverage-xml"/>
</report>
</coverage>
<php>
<ini name="max_execution_time" value="-1"/>
<ini name="html_errors" value="false"/>
<ini name="memory_limit" value="2G"/>
<ini name="xdebug.default_enable" value="1"/>
<ini name="xdebug.enable_coverage" value="1"/>
<ini name="xdebug.remote_autostart" value="0"/>
<ini name="xdebug.remote_enable" value="0"/>
<ini name="xdebug.overload_var_dump" value="0"/>
<ini name="xdebug.show_mem_delta" value="0"/>
</php>
<testsuites>
<testsuite name="Unit">
<directory>./tests/Unit</directory>
</testsuite>
</testsuites>
<logging>
<junit outputFile="build/logs/phpunit/junit.xml"/>
</logging>
<listeners>
<listener class="JohnKary\PHPUnit\Listener\SpeedTrapListener"/>
</listeners>
</phpunit>
18 changes: 18 additions & 0 deletions src/Consumer/AbstractKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,24 @@ public function getLastOffsetForTopicPartition(string $topic, int $partition, in
return $highOffset;
}

/**
* @param string $topic
* @return int[]
* @throws RdKafkaException
*/
protected function getAllTopicPartitions(string $topic): array
{

$partitions = [];
$topicMetadata = $this->getMetadataForTopic($topic);

foreach ($topicMetadata->getPartitions() as $partition) {
$partitions[] = $partition->getId();
}

return $partitions;
}

/**
* @param RdKafkaMessage $message
* @return KafkaConsumerMessageInterface
Expand Down
1 change: 1 addition & 0 deletions src/Consumer/KafkaConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public function withSubscription(
int $offset = self::OFFSET_STORED
): KafkaConsumerBuilderInterface {
$that = clone $this;

$that->topics = [new TopicSubscription($topicName, $partitions, $offset)];

return $that;
Expand Down
17 changes: 14 additions & 3 deletions src/Consumer/KafkaHighLevelConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,10 @@ private function getTopicSubscriptions(): array
$subscriptions = [];

foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
if ([] !== $topicSubscription->getPartitions()) {
if (
[] !== $topicSubscription->getPartitions()
|| KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset()
) {
continue;
}
$subscriptions[] = $topicSubscription->getTopicName();
Expand All @@ -263,13 +266,21 @@ private function getTopicAssignments(): array
$assignments = [];

foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
if ([] === $topicSubscription->getPartitions()) {
if (
[] === $topicSubscription->getPartitions()
&& KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset()
) {
continue;
}

$offset = $topicSubscription->getOffset();
$partitions = $topicSubscription->getPartitions();

foreach ($topicSubscription->getPartitions() as $partitionId) {
if ([] === $partitions) {
$partitions = $this->getAllTopicPartitions($topicSubscription->getTopicName());
}

foreach ($partitions as $partitionId) {
$assignments[] = new RdKafkaTopicPartition(
$topicSubscription->getTopicName(),
$partitionId,
Expand Down
18 changes: 0 additions & 18 deletions src/Consumer/KafkaLowLevelConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,4 @@ protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage
{
return $this->queue->consume($timeoutMs);
}

/**
* @param string $topic
* @return int[]
* @throws RdKafkaException
*/
private function getAllTopicPartitions(string $topic): array
{

$partitions = [];
$topicMetadata = $this->getMetadataForTopic($topic);

foreach ($topicMetadata->getPartitions() as $partition) {
$partitions[] = $partition->getId();
}

return $partitions;
}
}
Loading