Skip to content

Commit 0724419

Browse files
authored
JMP-2204: align timeout handling (#15)
* adjust timeout handling * fix tests * adapt interface * add poll, change poll logic (#16) * add poll, change poll logic * fix cs
1 parent e1e235a commit 0724419

18 files changed

+235
-203
lines changed

src/Conf/KafkaConfiguration.php

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,17 @@ class KafkaConfiguration extends RdKafkaConf
2020
*/
2121
protected $topicSubscriptions;
2222

23-
/**
24-
* @var int
25-
*/
26-
protected $timeout;
27-
2823
/**
2924
* @param string[] $brokers
3025
* @param array|TopicSubscription[] $topicSubscriptions
31-
* @param integer $timeout
3226
* @param mixed[] $config
3327
*/
34-
public function __construct(array $brokers, array $topicSubscriptions, int $timeout, array $config = [])
28+
public function __construct(array $brokers, array $topicSubscriptions, array $config = [])
3529
{
3630
parent::__construct();
3731

3832
$this->brokers = $brokers;
3933
$this->topicSubscriptions = $topicSubscriptions;
40-
$this->timeout = $timeout;
4134

4235
$this->initializeConfig($config);
4336
}
@@ -58,14 +51,6 @@ public function getTopicSubscriptions(): array
5851
return $this->topicSubscriptions;
5952
}
6053

61-
/**
62-
* @return integer
63-
*/
64-
public function getTimeout(): int
65-
{
66-
return $this->timeout;
67-
}
68-
6954
/**
7055
* @return string[]
7156
*/

src/Consumer/AbstractKafkaConsumer.php

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,19 @@ public function getConfiguration(): array
8181
* Consumes a message and returns it
8282
* In cases of errors / timeouts an exception is thrown
8383
*
84+
* @param integer $timeoutMs
8485
* @return KafkaConsumerMessageInterface
8586
* @throws KafkaConsumerConsumeException
8687
* @throws KafkaConsumerEndOfPartitionException
8788
* @throws KafkaConsumerTimeoutException
8889
*/
89-
public function consume(): KafkaConsumerMessageInterface
90+
public function consume(int $timeoutMs = 10000): KafkaConsumerMessageInterface
9091
{
9192
if (false === $this->isSubscribed()) {
9293
throw new KafkaConsumerConsumeException(KafkaConsumerConsumeException::NOT_SUBSCRIBED_EXCEPTION_MESSAGE);
9394
}
9495

95-
if (null === $rdKafkaMessage = $this->kafkaConsume($this->kafkaConfiguration->getTimeout())) {
96+
if (null === $rdKafkaMessage = $this->kafkaConsume($timeoutMs)) {
9697
throw new KafkaConsumerEndOfPartitionException(
9798
rd_kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF),
9899
RD_KAFKA_RESP_ERR__PARTITION_EOF
@@ -118,17 +119,18 @@ public function consume(): KafkaConsumerMessageInterface
118119
* Queries the broker for metadata on a certain topic
119120
*
120121
* @param string $topicName
122+
* @param integer $timeoutMs
121123
* @return RdKafkaMetadataTopic
122124
* @throws RdKafkaException
123125
*/
124-
public function getMetadataForTopic(string $topicName): RdKafkaMetadataTopic
126+
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic
125127
{
126128
$topic = $this->consumer->newTopic($topicName);
127129
return $this->consumer
128130
->getMetadata(
129131
false,
130132
$topic,
131-
$this->kafkaConfiguration->getTimeout()
133+
$timeoutMs
132134
)
133135
->getTopics()
134136
->current();
@@ -138,28 +140,28 @@ public function getMetadataForTopic(string $topicName): RdKafkaMetadataTopic
138140
* Get the earliest offset for a certain timestamp for topic partitions
139141
*
140142
* @param array|RdKafkaTopicPartition[] $topicPartitions
141-
* @param integer $timeout
143+
* @param integer $timeoutMs
142144
* @return array|RdKafkaTopicPartition[]
143145
*/
144-
public function offsetsForTimes(array $topicPartitions, int $timeout): array
146+
public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array
145147
{
146-
return $this->consumer->offsetsForTimes($topicPartitions, $timeout);
148+
return $this->consumer->offsetsForTimes($topicPartitions, $timeoutMs);
147149
}
148150

149151
/**
150152
* Queries the broker for the first offset of a given topic and partition
151153
*
152154
* @param string $topic
153155
* @param integer $partition
154-
* @param integer $timeout
156+
* @param integer $timeoutMs
155157
* @return integer
156158
*/
157-
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeout): int
159+
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int
158160
{
159161
$lowOffset = 0;
160162
$highOffset = 0;
161163

162-
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeout);
164+
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeoutMs);
163165

164166
return $lowOffset;
165167
}
@@ -169,15 +171,15 @@ public function getFirstOffsetForTopicPartition(string $topic, int $partition, i
169171
*
170172
* @param string $topic
171173
* @param integer $partition
172-
* @param integer $timeout
174+
* @param integer $timeoutMs
173175
* @return integer
174176
*/
175-
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeout): int
177+
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int
176178
{
177179
$lowOffset = 0;
178180
$highOffset = 0;
179181

180-
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeout);
182+
$this->consumer->queryWatermarkOffsets($topic, $partition, $lowOffset, $highOffset, $timeoutMs);
181183

182184
return $highOffset;
183185
}
@@ -200,8 +202,8 @@ protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMes
200202
}
201203

202204
/**
203-
* @param integer $timeout
205+
* @param integer $timeoutMs
204206
* @return null|RdKafkaMessage
205207
*/
206-
abstract protected function kafkaConsume(int $timeout): ?RdKafkaMessage;
208+
abstract protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage;
207209
}

src/Consumer/KafkaConsumerBuilder.php

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
4747
*/
4848
private $consumerType = self::CONSUMER_TYPE_HIGH_LEVEL;
4949

50-
/**
51-
* @var int
52-
*/
53-
private $timeout = 1000;
54-
5550
/**
5651
* @var callable
5752
*/
@@ -170,20 +165,6 @@ public function withAdditionalConfig(array $config): KafkaConsumerBuilderInterfa
170165
return $that;
171166
}
172167

173-
/**
174-
* Set the timeout for all consumer actions
175-
*
176-
* @param integer $timeout
177-
* @return KafkaConsumerBuilderInterface
178-
*/
179-
public function withTimeout(int $timeout): KafkaConsumerBuilderInterface
180-
{
181-
$that = clone $this;
182-
$that->timeout = $timeout;
183-
184-
return $that;
185-
}
186-
187168
/**
188169
* Set the consumer group
189170
*
@@ -321,7 +302,6 @@ public function build(): KafkaConsumerInterface
321302
$kafkaConfig = new KafkaConfiguration(
322303
$this->brokers,
323304
$this->topics,
324-
$this->timeout,
325305
$this->config
326306
);
327307

src/Consumer/KafkaConsumerBuilderInterface.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,6 @@ public function withSubscription(
5858
*/
5959
public function withAdditionalConfig(array $config): self;
6060

61-
/**
62-
* Set the timeout for all consumer actions
63-
*
64-
* @param integer $timeout
65-
* @return KafkaConsumerBuilderInterface
66-
*/
67-
public function withTimeout(int $timeout): self;
68-
6961
/**
7062
* Set the consumer group
7163
*

src/Consumer/KafkaConsumerInterface.php

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ public function isSubscribed(): bool;
3939
* Consumes a message and returns it
4040
* In cases of errors / timeouts a KafkaConsumerConsumeException is thrown
4141
*
42+
* @param integer $timeoutMs
4243
* @return KafkaConsumerMessageInterface
4344
*/
44-
public function consume(): KafkaConsumerMessageInterface;
45+
public function consume(int $timeoutMs = 10000): KafkaConsumerMessageInterface;
4546

4647
/**
4748
* Commits the offset to the broker for the given message(s)
@@ -62,36 +63,37 @@ public function getConfiguration(): array;
6263
* Queries the broker for metadata on a certain topic
6364
*
6465
* @param string $topicName
66+
* @param integer $timeoutMs
6567
* @return RdKafkaMetadataTopic
6668
*/
67-
public function getMetadataForTopic(string $topicName): RdKafkaMetadataTopic;
69+
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic;
6870

6971
/**
7072
* Get the earliest offset for a certain timestamp for topic partitions
7173
*
7274
* @param array|RdKafkaTopicPartition[] $topicPartitions
73-
* @param integer $timeout
75+
* @param integer $timeoutMs
7476
* @return array|RdKafkaTopicPartition[]
7577
*/
76-
public function offsetsForTimes(array $topicPartitions, int $timeout): array;
78+
public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array;
7779

7880
/**
7981
* Queries the broker for the first offset of a given topic and partition
8082
*
8183
* @param string $topic
8284
* @param integer $partition
83-
* @param integer $timeout
85+
* @param integer $timeoutMs
8486
* @return integer
8587
*/
86-
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeout): int;
88+
public function getFirstOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int;
8789

8890
/**
8991
* Queries the broker for the last offset of a given topic and partition
9092
*
9193
* @param string $topic
9294
* @param integer $partition
93-
* @param integer $timeout
95+
* @param integer $timeoutMs
9496
* @return integer
9597
*/
96-
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeout): int;
98+
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int;
9799
}

src/Consumer/KafkaHighLevelConsumer.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,14 @@ public function getAssignment(): array
143143
* Gets the commited offset for a TopicPartition for the configured consumer group
144144
*
145145
* @param array|RdKafkaTopicPartition[] $topicPartitions
146-
* @param integer $timeout
146+
* @param integer $timeoutMs
147147
* @return array|RdKafkaTopicPartition[]
148148
* @throws KafkaConsumerRequestException
149149
*/
150-
public function getCommittedOffsets(array $topicPartitions, int $timeout): array
150+
public function getCommittedOffsets(array $topicPartitions, int $timeoutMs): array
151151
{
152152
try {
153-
return $this->consumer->getCommittedOffsets($topicPartitions, $timeout);
153+
return $this->consumer->getCommittedOffsets($topicPartitions, $timeoutMs);
154154
} catch (RdKafkaException $e) {
155155
throw new KafkaConsumerRequestException($e->getMessage(), $e->getCode());
156156
}
@@ -178,13 +178,13 @@ public function close(): void
178178
}
179179

180180
/**
181-
* @param integer $timeout
181+
* @param integer $timeoutMs
182182
* @return RdKafkaMessage|null
183183
* @throws RdKafkaException
184184
*/
185-
protected function kafkaConsume(int $timeout): ?RdKafkaMessage
185+
protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage
186186
{
187-
return $this->consumer->consume($timeout);
187+
return $this->consumer->consume($timeoutMs);
188188
}
189189

190190
/**

src/Consumer/KafkaHighLevelConsumerInterface.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ public function getAssignment(): array;
3636
* Gets the commited offset for a TopicPartition for the configured consumer group
3737
*
3838
* @param array|RdKafkaTopicPartition[] $topicPartitions
39-
* @param integer $timeout
39+
* @param integer $timeoutMs
4040
* @return array|RdKafkaTopicPartition[]
4141
*/
42-
public function getCommittedOffsets(array $topicPartitions, int $timeout): array;
42+
public function getCommittedOffsets(array $topicPartitions, int $timeoutMs): array;
4343

4444
/**
4545
* Get current offset positions of the consumer

src/Consumer/KafkaLowLevelConsumer.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ public function unsubscribe(): void
141141
}
142142

143143
/**
144-
* @param integer $timeout
144+
* @param integer $timeoutMs
145145
* @return null|RdKafkaMessage
146146
*/
147-
protected function kafkaConsume(int $timeout): ?RdKafkaMessage
147+
protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage
148148
{
149-
return $this->queue->consume($timeout);
149+
return $this->queue->consume($timeoutMs);
150150
}
151151

152152
/**

0 commit comments

Comments
 (0)