Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions src/Producer/KafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ public function __construct(
* If a schema name was given, the message body will be avro serialized.
*
* @param KafkaProducerMessageInterface $message
* @param boolean $autoPoll
* @param integer $pollTimeoutMs
* @return void
*/
public function produce(KafkaProducerMessageInterface $message, int $pollTimeoutMs = 0): void
public function produce(KafkaProducerMessageInterface $message, bool $autoPoll = true, int $pollTimeoutMs = 0): void
{
$message = $this->encoder->encode($message);

Expand All @@ -73,11 +74,49 @@ public function produce(KafkaProducerMessageInterface $message, int $pollTimeout
$message->getHeaders()
);

while ($this->producer->getOutQLen() > 0) {
if (true === $autoPoll) {
$this->producer->poll($pollTimeoutMs);
}
}

/**
* Produces a message to the topic and partition defined in the message
* If a schema name was given, the message body will be avro serialized.
* Wait for an event to arrive before continuing (blocking)
*
* @param KafkaProducerMessageInterface $message
* @return void
*/
public function syncProduce(KafkaProducerMessageInterface $message): void
{
$this->produce($message, true, -1);
}

/**
* Poll for producer event, pass 0 for non-blocking, pass -1 to block until an event arrives
*
* @param integer $timeoutMs
* @return void
*/
public function poll(int $timeoutMs = 0): void
{
$this->producer->poll($timeoutMs);
}

/**
* Poll for producer events until the number of $queueSize events remain
*
* @param integer $timeoutMs
* @param integer $queueSize
* @return void
*/
public function pollUntilQueueSizeReached(int $timeoutMs = 0, int $queueSize = 0): void
{
while ($this->producer->getOutQLen() > $queueSize) {
$this->producer->poll($timeoutMs);
}
}

/**
* Purge producer messages that are in flight
*
Expand Down
34 changes: 33 additions & 1 deletion src/Producer/KafkaProducerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,42 @@ interface KafkaProducerInterface

/**
* @param KafkaProducerMessageInterface $message
* @param boolean $autoPoll
* @param integer $pollTimeoutMs
* @return void
*/
public function produce(KafkaProducerMessageInterface $message, int $pollTimeoutMs = 0): void;
public function produce(
KafkaProducerMessageInterface $message,
bool $autoPoll = true,
int $pollTimeoutMs = 0
): void;

/**
* Produces a message to the topic and partition defined in the message
* If a schema name was given, the message body will be avro serialized.
* Wait for the message to event to arrive before continuing (blocking)
*
* @param KafkaProducerMessageInterface $message
* @return void
*/
public function syncProduce(KafkaProducerMessageInterface $message): void;

/**
* Poll for producer event, pass 0 for non-blocking, pass -1 to block until an event arrives
*
* @param integer $timeoutMs
* @return void
*/
public function poll(int $timeoutMs = 0): void;

/**
* Poll for producer events until the number of $queueSize events remain
*
* @param integer $timeoutMs
* @param integer $queueSize
* @return void
*/
public function pollUntilQueueSizeReached(int $timeoutMs = 0, int $queueSize = 0): void;

/**
* Purge producer messages that are in flight
Expand Down
91 changes: 89 additions & 2 deletions tests/Unit/Producer/KafkaProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,92 @@ public function testProduceError(): void
}

public function testProduceSuccess()
{
$message = KafkaProducerMessage::create('test-topic', 1)
->withKey('asdf-asdf-asfd-asdf')
->withBody('some test content')
->withHeaders([ 'key' => 'value' ]);

/** @var RdKafkaProducerTopic|MockObject $rdKafkaProducerTopicMock */
$rdKafkaProducerTopicMock = $this->createMock(RdKafkaProducerTopic::class);
$rdKafkaProducerTopicMock
->expects(self::once())
->method('producev')
->with(
$message->getPartition(),
RD_KAFKA_MSG_F_BLOCK,
$message->getBody(),
$message->getKey(),
$message->getHeaders()
);

$this->encoderMock
->expects(self::once())
->method('encode')
->with($message)
->willReturn($message);
$this->rdKafkaProducerMock
->expects(self::once())
->method('newTopic')
->with('test-topic')
->willReturn($rdKafkaProducerTopicMock);
$this->rdKafkaProducerMock
->expects(self::once())
->method('poll')
->with(0);

$this->kafkaProducer->produce($message);
}

public function testSyncProduceSuccess()
{
$message = KafkaProducerMessage::create('test-topic', 1)
->withKey('asdf-asdf-asfd-asdf')
->withBody('some test content')
->withHeaders([ 'key' => 'value' ]);

/** @var RdKafkaProducerTopic|MockObject $rdKafkaProducerTopicMock */
$rdKafkaProducerTopicMock = $this->createMock(RdKafkaProducerTopic::class);
$rdKafkaProducerTopicMock
->expects(self::once())
->method('producev')
->with(
$message->getPartition(),
RD_KAFKA_MSG_F_BLOCK,
$message->getBody(),
$message->getKey(),
$message->getHeaders()
);

$this->encoderMock
->expects(self::once())
->method('encode')
->with($message)
->willReturn($message);
$this->rdKafkaProducerMock
->expects(self::once())
->method('newTopic')
->with('test-topic')
->willReturn($rdKafkaProducerTopicMock);
$this->rdKafkaProducerMock
->expects(self::once())
->method('poll')
->with(-1);

$this->kafkaProducer->syncProduce($message);
}

public function testPoll()
{
$this->rdKafkaProducerMock
->expects(self::once())
->method('poll')
->with(1000);

$this->kafkaProducer->poll(1000);
}

public function testPollUntilQueueSizeReached()
{
$message = KafkaProducerMessage::create('test-topic', 1)
->withKey('asdf-asdf-asfd-asdf')
Expand Down Expand Up @@ -134,9 +220,10 @@ function () {
$this->rdKafkaProducerMock
->expects(self::exactly(2))
->method('poll')
->with(1000);
->with(0);

$this->kafkaProducer->produce($message, 1000);
$this->kafkaProducer->produce($message, false);
$this->kafkaProducer->pollUntilQueueSizeReached();
}

/**
Expand Down