Skip to content

Commit

Permalink
Introduce Kafka::asyncPublish() (#312)
Browse files Browse the repository at this point in the history
* Introduce Kafka::asyncPublish() that will not flush on each send/batchSend but only once when the application is terminating (#310)

* upd: async production (that has only one flush at the end of the application) [see discussion #309]

* fix: added broker parameter to the facade helper [see discussion #309]

* ref: simplify Builder construct extension [see discussion #309]

---------

Co-authored-by: Alexander (SASh) Alexiev <alex@ampeco.global>

* wip

* Deprecate batch messages

* Docs

* wip

* wip

* wip

* wip

* Update docs for v2

* Remove cache folder

* Update gitignore

* Store the builder in memory

* Add tests

* Update tests

---------

Co-authored-by: SASh <alexiev@gmail.com>
Co-authored-by: Alexander (SASh) Alexiev <alex@ampeco.global>
  • Loading branch information
3 people authored Aug 18, 2024
1 parent 4911ebb commit 0864373
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 8 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ node_modules/
.idea
tests/reports
.php-cs-fixer.cache
composer.lock
composer.lock
.phpunit.cache/
1 change: 0 additions & 1 deletion .phpunit.cache/test-results

This file was deleted.

13 changes: 12 additions & 1 deletion docs/producing-messages/1-producing-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,15 @@ Kafka::publish('broker')->onTopic('topic-name')
```

This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer.
The following lines describes these methods.
The following lines describes these methods.

If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class:

```php
use Junges\Kafka\Facades\Kafka;

Kafka::asyncPublish('broker')->onTopic('topic-name')
```

The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send or batch send.
This reduces the overhead when you want to send a lot of messages in your request handlers.
2 changes: 2 additions & 0 deletions docs/producing-messages/6-producing-message-batch-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ title: Producing message batch to kafka
weight: 6
---

> This is deprecated and will be removed in a future version. Please use [async producers](1-producing-messages.md) instead of batch messaging.
You can publish multiple messages at the same time by using message batches.
To use a message batch, you must create a `Junges\Kafka\Producers\MessageBatch` instance.
Then create as many messages as you want and push them to the `MesageBatch` instance.
Expand Down
2 changes: 2 additions & 0 deletions src/Contracts/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Junges\Kafka\Contracts;

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Producers\MessageBatch;
use Junges\Kafka\Producers\Producer;
use Junges\Kafka\Support\Testing\Fakes\ProducerFake;
Expand Down Expand Up @@ -65,6 +66,7 @@ public function build(): Producer|ProducerFake;
* Send a message batch to Kafka.
*
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage
* @deprecated Please use {@see Kafka::asyncPublish()} instead of batch messages.
*/
public function sendBatch(MessageBatch $messageBatch): int;
}
1 change: 1 addition & 0 deletions src/Facades/Kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

/**
* @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null)
* @method static \Junges\Kafka\Contracts\MessageProducer asyncPublish(string $broker = null)
* @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null)
* @method static void assertPublishedTimes(int $times = 1, ProducerMessage $expectedMessage = null, callable $callback = null)
Expand Down
31 changes: 31 additions & 0 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class Factory implements Manager
/** @var array<int, ConsumerMessage> This array is passed to the underlying consumer when faking macroed consumers. */
private array $fakeMessages = [];

private ?ProducerBuilder $builder = null;

/** Creates a new ProducerBuilder instance, setting brokers and topic. */
public function publish(string $broker = null): MessageProducer
{
Expand All @@ -31,6 +33,35 @@ public function publish(string $broker = null): MessageProducer
);
}

/**
* Creates a new ProducerBuilder instance, optionally setting the brokers.
* The producer will be flushed only when the application terminates,
* and doing SEND does not mean that the message was flushed!
*/
public function asyncPublish(string $broker = null): MessageProducer
{
if ($this->shouldFake) {
return Kafka::fake()->publish($broker);
}

if ($this->builder instanceof ProducerBuilder) {
return $this->builder;
}

$this->builder = new ProducerBuilder(
broker: $broker ?? config('kafka.brokers'),
asyncProducer: true
);

return $this->builder;
}

/** This is an alias for the asyncPublish method. */
public function publishAsync(string $broker = null): MessageProducer
{
return $this->asyncPublish($broker);
}

/** Return a ConsumerBuilder instance. */
public function consumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder
{
Expand Down
17 changes: 15 additions & 2 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Builder implements MessageProducer
private array $options = [];
private ProducerMessage $message;
private MessageSerializer $serializer;
private Producer $producer;
private ?Producer $producer = null;
private string $topic = '';
private ?Sasl $saslConfig = null;
private readonly string $broker;
Expand All @@ -27,6 +27,7 @@ class Builder implements MessageProducer

public function __construct(
?string $broker = null,
private readonly bool $asyncProducer = false,
) {
/** @var ProducerMessage $message */
$message = app(ProducerMessage::class);
Expand Down Expand Up @@ -177,6 +178,7 @@ public function send(): bool
* Send a message batch to Kafka.
*
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage
* @deprecated Please use {@see Kafka::asyncPublish()} instead of batch messages.
*/
public function sendBatch(MessageBatch $messageBatch): int
{
Expand All @@ -191,6 +193,10 @@ public function sendBatch(MessageBatch $messageBatch): int

public function build(): Producer
{
if ($this->asyncProducer && $this->producer) {
return $this->producer;
}

$conf = new Config(
broker: $this->broker,
topics: [],
Expand All @@ -200,9 +206,16 @@ public function build(): Producer
callbacks: $this->callbacks,
);

return app(Producer::class, [
$producer = app(Producer::class, [
'config' => $conf,
'serializer' => $this->serializer,
'async' => $this->asyncProducer,
]);

if ($this->asyncProducer) {
$this->producer = $producer;
}

return $producer;
}
}
2 changes: 2 additions & 0 deletions src/Producers/MessageBatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
namespace Junges\Kafka\Producers;

use Illuminate\Support\Str;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
use SplDoublyLinkedList;

/**
* Class stores multiple messages to produce them to kafka topic as a batch
*
* @see MessageProducer::sendBatch
* @deprecated Please use {@see Kafka::asyncPublish()} instead of batch messaging.
*/
class MessageBatch
{
Expand Down
20 changes: 18 additions & 2 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,18 @@ class Producer implements ProducerContract
public function __construct(
private readonly Config $config,
private readonly MessageSerializer $serializer,
private readonly bool $async = false,
) {
$this->producer = app(KafkaProducer::class, [
'conf' => $this->getConf($this->config->getProducerOptions()),
]);
$this->dispatcher = App::make(Dispatcher::class);

if ($this->async) {
app()->terminating(function () {
$this->flush();
});
}
}

/** Set the Kafka Configuration. */
Expand Down Expand Up @@ -72,10 +79,17 @@ public function produce(ProducerMessage $message): bool

$this->producer->poll(0);

if ($this->async) {
return true;
}

return $this->flush();
}

/** @inheritDoc */
/**
* @inheritDoc
* @deprecated This will be removed in the future. Please use asyncPublish instead of batch messages.
*/
public function produceBatch(MessageBatch $messageBatch): int
{
$this->assertTopicWasSetForAllBatchMessages($messageBatch);
Expand Down Expand Up @@ -106,7 +120,9 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced++;
}

$this->flush();
if (! $this->async) {
$this->flush();
}

$this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced));

Expand Down
5 changes: 4 additions & 1 deletion src/Support/Testing/Fakes/ProducerBuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ public function send(): bool
return $producer->produce($this->getMessage());
}

/** Send a message batch to Kafka. */
/**
* Send a message batch to Kafka.
* @deprecated Please use {@see Kafka::asyncPublish()} instead of batch messages.
*/
public function sendBatch(MessageBatch $messageBatch): int
{
$producer = $this->build();
Expand Down
Loading

0 comments on commit 0864373

Please sign in to comment.