Producer
If message order matters, configure `max.in.flight` to 1, so that a retried request cannot be overtaken by a later one.
When you produce a message, the data is handed to librdkafka, which puts it on an internal queue and sends it to Kafka. You then get a delivery report, in the form of a `Message`, when polling. By default, polling is done automatically in a dedicated `LongRunning` task.
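The flow above can be pictured with a small in-process model. This is a toy sketch, not Confluent.Kafka's implementation: `ToyProducer`, its `Produce` method and its two queues are hypothetical, a second background task stands in for librdkafka's network thread, and the "broker" acknowledges everything instantly. It only shows that producing returns as soon as the message is enqueued, and that delivery reports arrive later on a dedicated `LongRunning` task.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical toy model of the produce/poll flow; not Confluent.Kafka's code.
public sealed class ToyProducer : IDisposable
{
    private readonly BlockingCollection<(string Value, Action<string> OnDelivered)> outbound = new();
    private readonly BlockingCollection<(string Value, Action<string> OnDelivered)> reports = new();
    private readonly Task sender;
    private readonly Task poller;

    public ToyProducer()
    {
        // Stands in for librdkafka's internal thread that transmits queued messages.
        sender = Task.Factory.StartNew(() =>
        {
            foreach (var item in outbound.GetConsumingEnumerable())
                reports.Add(item); // pretend the broker acknowledged the message
            reports.CompleteAdding();
        }, TaskCreationOptions.LongRunning);

        // Stands in for the dedicated polling task that serves delivery reports.
        poller = Task.Factory.StartNew(() =>
        {
            foreach (var (value, onDelivered) in reports.GetConsumingEnumerable())
                onDelivered(value);
        }, TaskCreationOptions.LongRunning);
    }

    // Returns immediately: the message is only enqueued here.
    public void Produce(string value, Action<string> onDelivered) =>
        outbound.Add((value, onDelivered));

    // Stops accepting messages, then waits until every report has been delivered.
    public void Dispose()
    {
        outbound.CompleteAdding();
        Task.WaitAll(sender, poller);
        outbound.Dispose();
        reports.Dispose();
    }
}
```

Disposing the toy producer drains both queues, loosely mirroring why a real producer should be flushed or disposed before the process exits.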
There are two ways to send data:
- `void ProduceAsync(..., IDeliveryHandler handler)`

  Your data is sent to librdkafka, and the delivery report is passed to `handler.HandleDeliveryReport`. You can reuse the same delivery handler for multiple messages. `HandleDeliveryReport` is guaranteed to be called in the same order in which the broker received the messages, so if order is important to you, this is the method to use.
- `Task<Message> ProduceAsync(...)`

  This creates a `TaskCompletionSource` linked to the message being sent. The task completes when librdkafka invokes the delivery callback, so you can await it.
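The Task-returning overload can be thought of as a thin layer over a callback: one `TaskCompletionSource` per message, completed when that message's delivery report comes back. The sketch below assumes a hypothetical callback interface (`ICallbackProducer`) and a hypothetical `DeliveryReport` record in place of `Message`; it illustrates the pattern and is not the library's source.

```csharp
using System;
using System.Threading.Tasks;
#nullable enable

// Hypothetical stand-in for a delivery report; Error is null on success.
public sealed record DeliveryReport(string Value, long Offset, string? Error);

// Hypothetical callback-style API, playing the role of the IDeliveryHandler overload.
public interface ICallbackProducer
{
    void Produce(string value, Action<DeliveryReport> onDelivered);
}

public static class TaskProduceExtensions
{
    // One TaskCompletionSource per message, completed by that message's report.
    public static Task<DeliveryReport> ProduceAsync(this ICallbackProducer producer, string value)
    {
        // RunContinuationsAsynchronously keeps awaiting code off the thread that delivers reports.
        var tcs = new TaskCompletionSource<DeliveryReport>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        producer.Produce(value, report => tcs.TrySetResult(report));
        return tcs.Task;
    }
}

// Toy implementation that "delivers" synchronously with increasing offsets.
public sealed class ImmediateProducer : ICallbackProducer
{
    private long nextOffset;

    public void Produce(string value, Action<DeliveryReport> onDelivered) =>
        onDelivered(new DeliveryReport(value, nextOffset++, null));
}
```

A failed delivery is reported by setting `Error` rather than faulting the task, so callers still have to inspect the result after awaiting it.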
Messages are sent in the same order in which `ProduceAsync` is called. However, on failure (a temporary network error, for example), Confluent.Kafka will try to resend the data, which can reorder messages. This only happens when a send fails, so it is not frequent.
If order is critical in your use case, you will have to produce synchronously (wait for each message to be delivered before producing the next one), which greatly reduces throughput. To mitigate this, set `queue.buffering.max.ms` and `socket.blocking.max.ms` to 1.
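Producing synchronously amounts to awaiting each delivery before producing the next message. A minimal sketch, assuming a hypothetical `produceAsync` delegate that stands in for the Task-returning `ProduceAsync` and yields `null` on success or an error description otherwise:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
#nullable enable

public static class OrderedSend
{
    // Sends values strictly one at a time: the next message is produced only after
    // the previous delivery report arrived, so a retry can never overtake it.
    // produceAsync is a hypothetical stand-in: null means delivered, otherwise an error.
    public static async Task<int> SendInOrderAsync(
        IEnumerable<string> values, Func<string, Task<string?>> produceAsync)
    {
        var sent = 0;
        foreach (var value in values)
        {
            var error = await produceAsync(value);
            if (error != null)
                break; // stop rather than send later messages past a failed one
            sent++;
        }
        return sent;
    }
}
```

Stopping at the first failure is only one option; retrying the failed message before moving on would also preserve order, at the cost of extra latency.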
By default, librdkafka does not send messages immediately: it waits a short time to batch messages together, which improves bandwidth usage and throughput.
If you need to send each message immediately without batching, you can start from the following configuration:
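```csharp
using System.Collections.Generic;

// bootstrapServers and clientId are assumed to be defined elsewhere.
var config = new Dictionary<string, object>
{
    ["bootstrap.servers"] = bootstrapServers,
    ["retries"] = 0,                  // never resend, so no retry can reorder messages
    ["client.id"] = clientId,
    ["batch.num.messages"] = 1,       // at most one message per batch
    ["socket.blocking.max.ms"] = 1,   // socket operations block for at most 1 ms
    ["socket.nagle.disable"] = true,  // disable Nagle's algorithm (TCP_NODELAY)
    ["queue.buffering.max.ms"] = 0,   // do not wait to accumulate messages
    ["default.topic.config"] = new Dictionary<string, object>
    {
        ["acks"] = 1                  // only the partition leader must acknowledge
    }
};
```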
Kafka uses byte arrays for keys and values. A serializing producer makes serialization easier to manage.
There are two ways to use it:
1. Create a `Producer<TKey, TValue>(config, keySerializer, valueSerializer)`, which creates a new client instance.
2. Call `GetSerializingProducer<TKey, TValue>(keySerializer, valueSerializer)` on an existing standard `Producer`, which uses the existing producer as a proxy.
If you only need to create one producer in your process, use option 1. If you need several serializing producers in your application, create a single `Producer` instance and use option 2 (see here for more information).
Only basic serializers are provided for now (`StringSerializer`, `IntSerializer`...); an Avro serializer will be provided in the future. You can easily write your own serializer by implementing `Confluent.Kafka.Serialization.ISerializer`.
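As an illustration of a custom serializer, the sketch below encodes values as UTF-8 JSON with `System.Text.Json`. The one-method `ISerializer<T>` interface here is a local, hypothetical stand-in: the signature of `Confluent.Kafka.Serialization.ISerializer` has changed between releases, so check it against the version you use before adapting this. `Order` and `Utf8JsonSerializer<T>` are example names.

```csharp
using System.Text.Json;

// Hypothetical stand-in for the library's serializer interface; the real
// Confluent.Kafka.Serialization.ISerializer signature may differ by version.
public interface ISerializer<T>
{
    byte[] Serialize(T data);
}

// Example payload type.
public sealed record Order(string Id, int Quantity);

// Encodes any value as UTF-8 JSON bytes, ready to be used as a Kafka key or value.
public sealed class Utf8JsonSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data) => JsonSerializer.SerializeToUtf8Bytes(data);
}
```

JSON keeps the payload readable by other clients; a binary format such as Avro trades that readability for smaller messages and schema enforcement.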
If you don't have keys, you can use `NullSerializer` and provide `null` as the key.
- If librdkafka can't send the message or another error occurs, `ProduceAsync` still works, but the `Message` you receive will have its `Error` set.
- If you set `delivery.report.only.error`, or call `ProduceAsync` with `disableDeliveryCallback`, librdkafka will not invoke callbacks for successfully produced messages. In that case you must not use the Task-returning `ProduceAsync`, as its task would never complete for a successful message (it would only complete on failure). Likewise, do not use a delivery handler that waits for a callback for every message sent.
- Currently, `ProduceAsync` may throw an exception in some cases (a message that is too big, for example). This may change in a future release, which would return a `Message` with an `Error` instead of throwing.
- When a message fails, there are two ways to be informed of it:
  - `ProduceAsync` fails (fail-fast: the message was never even sent to the broker).
  - The `Message` passed to `IDeliveryHandler` has a `Message.Error` different from `Error.NoError`.
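With the Task-returning overload, both failure paths can be handled in one place. In this sketch, `Report` is a hypothetical stand-in for `Message` in which a `null` `Error` plays the role of `Error.NoError`, and the `produceAsync` delegate stands in for a call to `ProduceAsync`. The `try` wraps the call itself, so an exception thrown synchronously (fail-fast) is caught as well as a faulted task.

```csharp
using System;
using System.Threading.Tasks;
#nullable enable

// Hypothetical stand-in for Message: Error is null when delivery succeeded.
public sealed record Report(string Value, string? Error);

public enum Outcome { Delivered, FailedFast, DeliveryFailed }

public static class DeliveryCheck
{
    // Classifies one send attempt, covering both ways a failure can surface.
    public static async Task<Outcome> SendAsync(Func<Task<Report>> produceAsync)
    {
        Report report;
        try
        {
            report = await produceAsync();
        }
        catch (Exception)
        {
            // Fail-fast: the message never reached the broker (e.g. it was too big).
            return Outcome.FailedFast;
        }
        // The report arrived, but it may still carry a delivery error.
        return report.Error == null ? Outcome.Delivered : Outcome.DeliveryFailed;
    }
}
```

In real code you would log or rethrow the caught exception rather than discard it; it is swallowed here only to keep the classification visible.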