Skip to content

Commit e24d500

Browse files
committed
Merge branch 'master' into feat/JMP-2328/possibility-to-decode-later
2 parents 3570fbf + 0724419 commit e24d500

18 files changed

+235
-203
lines changed

src/Conf/KafkaConfiguration.php

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,17 @@ class KafkaConfiguration extends RdKafkaConf
2020
*/
2121
protected $topicSubscriptions;
2222

23-
/**
24-
* @var int
25-
*/
26-
protected $timeout;
27-
2823
/**
2924
* @param string[] $brokers
3025
* @param array|TopicSubscription[] $topicSubscriptions
31-
* @param integer $timeout
3226
* @param mixed[] $config
3327
*/
34-
public function __construct(array $brokers, array $topicSubscriptions, int $timeout, array $config = [])
28+
public function __construct(array $brokers, array $topicSubscriptions, array $config = [])
3529
{
3630
parent::__construct();
3731

3832
$this->brokers = $brokers;
3933
$this->topicSubscriptions = $topicSubscriptions;
40-
$this->timeout = $timeout;
4134

4235
$this->initializeConfig($config);
4336
}
@@ -58,14 +51,6 @@ public function getTopicSubscriptions(): array
5851
return $this->topicSubscriptions;
5952
}
6053

61-
/**
62-
* @return integer
63-
*/
64-
public function getTimeout(): int
65-
{
66-
return $this->timeout;
67-
}
68-
6954
/**
7055
* @return string[]
7156
*/

src/Consumer/AbstractKafkaConsumer.php

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,20 @@ public function getConfiguration(): array
8181
* Consumes a message and returns it
8282
* In cases of errors / timeouts an exception is thrown
8383
*
84+
* @param integer $timeoutMs
8485
* @param boolean $autoDecode
8586
* @return KafkaConsumerMessageInterface
8687
* @throws KafkaConsumerConsumeException
8788
* @throws KafkaConsumerEndOfPartitionException
8889
* @throws KafkaConsumerTimeoutException
8990
*/
90-
public function consume(bool $autoDecode = true): KafkaConsumerMessageInterface
91+
public function consume(int $timeoutMs = 10000, bool $autoDecode = true): KafkaConsumerMessageInterface
9192
{
9293
if (false === $this->isSubscribed()) {
9394
throw new KafkaConsumerConsumeException(KafkaConsumerConsumeException::NOT_SUBSCRIBED_EXCEPTION_MESSAGE);
9495
}
9596

96-
if (null === $rdKafkaMessage = $this->kafkaConsume($this->kafkaConfiguration->getTimeout())) {
97+
if (null === $rdKafkaMessage = $this->kafkaConsume($timeoutMs)) {
9798
throw new KafkaConsumerEndOfPartitionException(
9899
rd_kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF),
99100
RD_KAFKA_RESP_ERR__PARTITION_EOF
@@ -134,17 +135,18 @@ public function decodeMessage(KafkaConsumerMessageInterface $message): KafkaCons
134135
* Queries the broker for metadata on a certain topic
135136
*
136137
* @param string $topicName
138+
* @param integer $timeoutMs
137139
* @return RdKafkaMetadataTopic
138140
* @throws RdKafkaException
139141
*/
140-
public function getMetadataForTopic(string $topicName): RdKafkaMetadataTopic
142+
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic
141143
{
142144
$topic = $this->consumer->newTopic($topicName);
143145
return $this->consumer
144146
->getMetadata(
145147
false,
146148
$topic,
147-
$this->kafkaConfiguration->getTimeout()
149+
$timeoutMs
148150
)
149151
->getTopics()
150152
->current();
@@ -154,28 +156,28 @@ public function getMetadataForTopic(string $topicName): RdKafkaMetadataTopic
154156
* Get the earliest offset for a certain timestamp for topic partitions
155157
*
156158
* @param array|RdKafkaTopicPartition[] $topicPartitions
157-
* @param integer $timeout
159+
* @param integer $timeoutMs
158160
* @return array|RdKafkaTopicPartition[]
159161
*/
160-
public function offsetsForTimes(array $topicPartitions, int $timeout): array
162+
public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array
161163
{
162-
return $this->consumer->offsetsForTimes($topicPartitions, $timeout);
164+
return $this->consumer->offsetsForTimes($topicPartitions, $timeoutMs);
163165
}
164166

165167
/**
166168
* Queries the broker for the first offset of a given topic and partition
167169
*
168170
* @param string $topic
169171
* @param integer $partition
170-
* @param integer $timeout
172+
* @param integer $timeoutMs
171173
* @return integer
172174
*/
173-
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeout): int
175+
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int
174176
{
175177
$lowOffset = 0;
176178
$highOffset = 0;
177179

178-
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeout);
180+
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeoutMs);
179181

180182
return $lowOffset;
181183
}
@@ -185,15 +187,15 @@ public function getFirstOffsetForTopicPartition(string $topic, int $partition, i
185187
*
186188
* @param string $topic
187189
* @param integer $partition
188-
* @param integer $timeout
190+
* @param integer $timeoutMs
189191
* @return integer
190192
*/
191-
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeout): int
193+
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int
192194
{
193195
$lowOffset = 0;
194196
$highOffset = 0;
195197

196-
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeout);
198+
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeoutMs);
197199

198200
return $highOffset;
199201
}
@@ -216,8 +218,8 @@ protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMes
216218
}
217219

218220
/**
219-
* @param integer $timeout
221+
* @param integer $timeoutMs
220222
* @return null|RdKafkaMessage
221223
*/
222-
abstract protected function kafkaConsume(int $timeout): ?RdKafkaMessage;
224+
abstract protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage;
223225
}

src/Consumer/KafkaConsumerBuilder.php

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
4747
*/
4848
private $consumerType = self::CONSUMER_TYPE_HIGH_LEVEL;
4949

50-
/**
51-
* @var int
52-
*/
53-
private $timeout = 1000;
54-
5550
/**
5651
* @var callable
5752
*/
@@ -170,20 +165,6 @@ public function withAdditionalConfig(array $config): KafkaConsumerBuilderInterfa
170165
return $that;
171166
}
172167

173-
/**
174-
* Set the timeout for all consumer actions
175-
*
176-
* @param integer $timeout
177-
* @return KafkaConsumerBuilderInterface
178-
*/
179-
public function withTimeout(int $timeout): KafkaConsumerBuilderInterface
180-
{
181-
$that = clone $this;
182-
$that->timeout = $timeout;
183-
184-
return $that;
185-
}
186-
187168
/**
188169
* Set the consumer group
189170
*
@@ -321,7 +302,6 @@ public function build(): KafkaConsumerInterface
321302
$kafkaConfig = new KafkaConfiguration(
322303
$this->brokers,
323304
$this->topics,
324-
$this->timeout,
325305
$this->config
326306
);
327307

src/Consumer/KafkaConsumerBuilderInterface.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,6 @@ public function withSubscription(
5858
*/
5959
public function withAdditionalConfig(array $config): self;
6060

61-
/**
62-
* Set the timeout for all consumer actions
63-
*
64-
* @param integer $timeout
65-
* @return KafkaConsumerBuilderInterface
66-
*/
67-
public function withTimeout(int $timeout): self;
68-
6961
/**
7062
* Set the consumer group
7163
*

src/Consumer/KafkaConsumerInterface.php

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ public function isSubscribed(): bool;
3939
* Consumes a message and returns it
4040
* In cases of errors / timeouts a KafkaConsumerConsumeException is thrown
4141
*
42+
* @param integer $timeoutMs
4243
* @param boolean $autoDecode
4344
* @return KafkaConsumerMessageInterface
4445
*/
45-
public function consume(bool $autoDecode = true): KafkaConsumerMessageInterface;
46+
public function consume(int $timeoutMs = 10000, bool $autoDecode = true): KafkaConsumerMessageInterface;
4647

4748
/**
4849
* Decode consumer message
@@ -71,36 +72,37 @@ public function getConfiguration(): array;
7172
* Queries the broker for metadata on a certain topic
7273
*
7374
* @param string $topicName
75+
* @param integer $timeoutMs
7476
* @return RdKafkaMetadataTopic
7577
*/
76-
public function getMetadataForTopic(string $topicName): RdKafkaMetadataTopic;
78+
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic;
7779

7880
/**
7981
* Get the earliest offset for a certain timestamp for topic partitions
8082
*
8183
* @param array|RdKafkaTopicPartition[] $topicPartitions
82-
* @param integer $timeout
84+
* @param integer $timeoutMs
8385
* @return array|RdKafkaTopicPartition[]
8486
*/
85-
public function offsetsForTimes(array $topicPartitions, int $timeout): array;
87+
public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array;
8688

8789
/**
8890
* Queries the broker for the first offset of a given topic and partition
8991
*
9092
* @param string $topic
9193
* @param integer $partition
92-
* @param integer $timeout
94+
* @param integer $timeoutMs
9395
* @return integer
9496
*/
95-
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeout): int;
97+
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int;
9698

9799
/**
98100
* Queries the broker for the last offset of a given topic and partition
99101
*
100102
* @param string $topic
101103
* @param integer $partition
102-
* @param integer $timeout
104+
* @param integer $timeoutMs
103105
* @return integer
104106
*/
105-
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeout): int;
107+
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int;
106108
}

src/Consumer/KafkaHighLevelConsumer.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,14 @@ public function getAssignment(): array
143143
* Gets the commited offset for a TopicPartition for the configured consumer group
144144
*
145145
* @param array|RdKafkaTopicPartition[] $topicPartitions
146-
* @param integer $timeout
146+
* @param integer $timeoutMs
147147
* @return array|RdKafkaTopicPartition[]
148148
* @throws KafkaConsumerRequestException
149149
*/
150-
public function getCommittedOffsets(array $topicPartitions, int $timeout): array
150+
public function getCommittedOffsets(array $topicPartitions, int $timeoutMs): array
151151
{
152152
try {
153-
return $this->consumer->getCommittedOffsets($topicPartitions, $timeout);
153+
return $this->consumer->getCommittedOffsets($topicPartitions, $timeoutMs);
154154
} catch (RdKafkaException $e) {
155155
throw new KafkaConsumerRequestException($e->getMessage(), $e->getCode());
156156
}
@@ -178,13 +178,13 @@ public function close(): void
178178
}
179179

180180
/**
181-
* @param integer $timeout
181+
* @param integer $timeoutMs
182182
* @return RdKafkaMessage|null
183183
* @throws RdKafkaException
184184
*/
185-
protected function kafkaConsume(int $timeout): ?RdKafkaMessage
185+
protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage
186186
{
187-
return $this->consumer->consume($timeout);
187+
return $this->consumer->consume($timeoutMs);
188188
}
189189

190190
/**

src/Consumer/KafkaHighLevelConsumerInterface.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ public function getAssignment(): array;
3636
* Gets the commited offset for a TopicPartition for the configured consumer group
3737
*
3838
* @param array|RdKafkaTopicPartition[] $topicPartitions
39-
* @param integer $timeout
39+
* @param integer $timeoutMs
4040
* @return array|RdKafkaTopicPartition[]
4141
*/
42-
public function getCommittedOffsets(array $topicPartitions, int $timeout): array;
42+
public function getCommittedOffsets(array $topicPartitions, int $timeoutMs): array;
4343

4444
/**
4545
* Get current offset positions of the consumer

src/Consumer/KafkaLowLevelConsumer.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ public function unsubscribe(): void
141141
}
142142

143143
/**
144-
* @param integer $timeout
144+
* @param integer $timeoutMs
145145
* @return null|RdKafkaMessage
146146
*/
147-
protected function kafkaConsume(int $timeout): ?RdKafkaMessage
147+
protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage
148148
{
149-
return $this->queue->consume($timeout);
149+
return $this->queue->consume($timeoutMs);
150150
}
151151

152152
/**

0 commit comments

Comments
 (0)