Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/Consumer/KafkaConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ public function withAdditionalSubscription(
array $partitions = [],
int $offset = self::OFFSET_STORED
): KafkaConsumerBuilderInterface {
$this->validateSubscription($partitions, $offset);

$that = clone $this;

$that->topics[] = new TopicSubscription($topicName, $partitions, $offset);
Expand All @@ -145,7 +147,10 @@ public function withSubscription(
array $partitions = [],
int $offset = self::OFFSET_STORED
): KafkaConsumerBuilderInterface {
$this->validateSubscription($partitions, $offset);

$that = clone $this;

$that->topics = [new TopicSubscription($topicName, $partitions, $offset)];

return $that;
Expand Down Expand Up @@ -359,4 +364,17 @@ private function registerCallbacks(KafkaConfiguration $conf): void
$conf->setOffsetCommitCb($this->rebalanceCallback);
}
}

/**
* @param array $partitions
* @param int $offset
*/
private function validateSubscription(array $partitions, int $offset): void
{
if (0 <= $offset && [] === $partitions) {
throw new KafkaConsumerBuilderException(
KafkaConsumerBuilderException::TOPIC_SUBSCRIPTION_OFFSET_WITHOUT_PARTITIONS
);
}
}
}
1 change: 1 addition & 0 deletions src/Exception/KafkaConsumerBuilderException.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ class KafkaConsumerBuilderException extends \Exception
public const NO_BROKER_EXCEPTION_MESSAGE = 'You need add at least one broker to connect to.';
public const NO_TOPICS_EXCEPTION_MESSAGE = 'No topics defined to subscribe to.';
public const UNSUPPORTED_CALLBACK_EXCEPTION_MESSAGE = 'The callback %s is not supported for %s';
public const TOPIC_SUBSCRIPTION_OFFSET_WITHOUT_PARTITIONS = 'If you define an offset for a topic subscription, partition(s) must be defined as well';
}
11 changes: 11 additions & 0 deletions tests/Unit/Consumer/KafkaConsumerBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ public function testSubscribeToTopic(): void
self::isInstanceOf(TopicSubscription::class, $reflectionProperty->getValue($clone));
}

/**
* @return void
*/
public function testSubscribeWithOffsetWithoutPartitions(): void
{
self::expectException(KafkaConsumerBuilderException::class);
self::expectExceptionMessage(KafkaConsumerBuilderException::TOPIC_SUBSCRIPTION_OFFSET_WITHOUT_PARTITIONS);

$this->kafkaConsumerBuilder->withAdditionalSubscription('test-topic', [], 0);
}

/**
* @return void
* @throws \ReflectionException
Expand Down