Producer
A commonly used configuration property on the producer is LingerMs, the minimum time between batches of messages being sent to the cluster. Larger values allow for more batching, which increases throughput. Smaller values may reduce latency. The tradeoff becomes more complicated as throughput increases, because a smaller LingerMs results in more broker requests, which puts greater load on the server and may in turn reduce throughput.
A good general-purpose setting is 5 (the default is 0.5); both values are in milliseconds.
Set EnableIdempotence to true. This has very little overhead, so there is not much downside to having it on by default.
Before the idempotent producer was available, you could preserve message ordering across retries by setting max.in.flight to 1 (at the expense of reduced throughput).
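As a rough sketch of the two settings discussed so far, assuming the strongly typed ProducerConfig class from Confluent.Kafka 1.x (the broker address is a placeholder, not something taken from this page):

```csharp
using Confluent.Kafka;

var config = new ProducerConfig
{
    // Placeholder address (assumption); replace with your own cluster.
    BootstrapServers = "localhost:9092",
    // Wait up to 5 ms for more messages before sending a batch.
    LingerMs = 5,
    // Keep ordering and avoid duplicates when a send is retried.
    EnableIdempotence = true
};
```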
Messages are sent in the same order as the calls to ProduceAsync are made. However, on failure (a temporary network failure, for example), Confluent.Kafka will try to resend the data, which can cause reordering. This does not happen frequently, only on failure.
If order is critical in your use case, you will have to produce synchronously (wait for each message to be delivered before producing the next one), which reduces throughput considerably. Some configuration settings can improve this: set queue.buffering.max.ms and socket.blocking.max.ms to 1.
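A minimal sketch of producing one message at a time is shown here. It assumes the Confluent.Kafka 1.x API (ProducerBuilder and Message<TKey, TValue>), which may differ from the client version the rest of this page describes. The broker address, topic name, and class name are placeholders chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class OrderedProducerSketch
{
    static async Task Main()
    {
        // Placeholder broker address (assumption).
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            foreach (var value in new[] { "first", "second", "third" })
            {
                // Awaiting each call means the next message is not produced
                // until this one has been acknowledged or has failed.
                var result = await producer.ProduceAsync(
                    "my-topic", // placeholder topic name (assumption)
                    new Message<Null, string> { Value = value });

                Console.WriteLine($"Delivered '{result.Message.Value}' to {result.TopicPartitionOffset}");
            }
        }
    }
}
```

Because only one message is in flight at any time, a retry cannot overtake a later message. The cost is one full round trip to the broker per message.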
By default, librdkafka does not send messages immediately: it waits for some time to batch messages together and does its best to improve bandwidth and throughput.
If you need to send messages immediately and don't want batching, you can use the following configuration as a starting point:
    // Low-latency settings: no retries, no batching, no buffering delay.
    var config = new Dictionary<string, object>
    {
        ["bootstrap.servers"] = bootstrapServers,
        ["retries"] = 0,
        ["client.id"] = clientId,
        ["batch.num.messages"] = 1,
        ["socket.blocking.max.ms"] = 1,
        ["socket.nagle.disable"] = true,
        ["queue.buffering.max.ms"] = 0,
        ["default.topic.config"] = new Dictionary<string, object>
        {
            ["acks"] = 1
        }
    };
Kafka uses byte arrays for keys and values. You can use a serializing producer to make serialization easier to manage.
There are two ways to use it:
1. Create a Producer<TKey, TValue>(config, keySerializer, valueSerializer), which will create a new client instance.
2. Call GetSerializingProducer<TKey, TValue>(keySerializer, valueSerializer) on an existing standard Producer, which will use the existing producer as a proxy.
If you only need to create a Producer once in your process, use 1. If you need multiple Producers in your application, create a single Producer instance and use 2; see here for more information.
Only basic serializers are provided for now (StringSerializer, IntSerializer...). An Avro serializer will be provided in the future. You can easily create your own serializer by implementing Confluent.Kafka.Serialization.ISerializer.
If you don't have keys, you can use NullSerializer and provide null as the key.
- If librdkafka can't send the message or there was an error, ProduceAsync still works, but the Message received will have a non-null Error.
- If you set delivery.report.only.error or call ProduceAsync with disableDeliveryCallback, librdkafka will not invoke callbacks for successfully produced messages. You must not use the Task-returning ProduceAsync in this case, as the task will never complete (although it can still fail fast). Nor should you use a delivery handler that awaits a callback for every message sent.
- At present, ProduceAsync may throw an exception in some cases (a message that is too big, for example). This may change in a future release (no exception thrown, but a message returned with an Error instead).
- When a message fails, there are two ways to be informed of it:
  - ProduceAsync fails (fail-fast: the message wasn't even sent to the broker).
  - The Message passed to IDeliveryHandler has a Message.Error different from Error.NoError.