Suggested Enhancements to Python Producer #1106

Open
chuck-confluent opened this issue Jul 15, 2022 · 0 comments
Labels
enhancement New feature or request


chuck-confluent commented Jul 15, 2022

I wrote a Python producer app that consistently hit Local: Queue full errors (BufferError), and I found this code sample from Magnus (author of librdkafka) helpful:

    for i in range(msg_count):
        while True:
            try:
                producer.produce(topic, value=msg_payload)
                producer.poll(0)
                break
            except BufferError as e:
                print(e, file=sys.stderr)
                producer.poll(1)

In the normal case, the record is produced on the first attempt and the break exits the while loop, returning control to the for loop. If produce() raises BufferError instead, the while loop retries the same record after polling, so no record is skipped before the for loop moves on.
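The retry pattern above could also be wrapped in a small helper, sketched here under some assumptions. The name `produce_with_retry` and the `max_attempts` cap are hypothetical and not part of Magnus' sample, which retries without a limit. The `producer` argument is assumed to behave like a confluent_kafka Producer: produce() raises BufferError when the local queue is full, and poll() serves delivery callbacks.

```python
import sys


def produce_with_retry(producer, topic, value, poll_timeout=1.0, max_attempts=10):
    """Produce one record, retrying while the local queue is full.

    Hypothetical helper: `producer` is assumed to behave like
    confluent_kafka.Producer. Returns the number of attempts used.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            producer.produce(topic, value=value)
            # Serve any pending delivery callbacks without blocking.
            producer.poll(0)
            return attempt
        except BufferError as err:
            print(err, file=sys.stderr)
            # Block for up to poll_timeout seconds while delivery
            # callbacks are served, which frees space in the queue.
            producer.poll(poll_timeout)
    # Assumption: give up after max_attempts instead of looping forever.
    raise BufferError(f"queue still full after {max_attempts} attempts")
```

The cap is a design choice: it turns a broker outage into a visible error instead of an endless loop, at the cost of having to pick a limit.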

I would also suggest including some performance tuning knobs in the example.

I have a code snippet that might help. My app reads a CSV file and produces to two different topics. Here is the relevant part:

# configure schema registry client and avro serializer
schema_registry_client = SchemaRegistryClient(SCHEMA_REGISTRY_CONFIG)
my_avro_serializer = AvroSerializer(
    schema_registry_client=schema_registry_client,
    schema_str=MY_AVRO_SCHEMA
)
producer_config = KAFKA_CONFIG.copy()
producer_config['value.serializer'] = my_avro_serializer

# Configure other producer properties to tune performance.
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
producer_config['queue.buffering.max.messages'] = 1000000
producer_config['linger.ms'] = 500
producer_config['batch.size'] = 2000000
producer_config['compression.type'] = 'snappy'

# Create producer and produce data
producer = SerializingProducer(producer_config)
records_produced = 0
with open('./my_data.csv') as data:
    reader = csv.DictReader(data, fieldnames=MY_FIELDNAMES)
    for row in reader:
        # preprocess csv row to use float values instead of scientific notation
        row_with_float_values = {field: float(value) for field, value in row.items()}
        while True:
            try:
                producer.produce(
                    topic=MY_TOPIC,
                    value=row_with_float_values,
                    on_delivery=log_kafka_errors)
                producer.poll(0)
                records_produced += 1
                break
            except BufferError as err:
                print(err, file=sys.stderr)
                print(f"produced {records_produced} records so far", file=sys.stdout)
                producer.flush()
print(f"produced a total of {records_produced} records", file=sys.stdout)
producer.flush()
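The snippet passes `log_kafka_errors` as the delivery callback without showing it. A minimal sketch of what such a callback might look like follows; the body is an assumption, not the code from the original app. confluent_kafka calls a delivery callback with `(err, msg)`, where `err` is None on success.

```python
import sys


def log_kafka_errors(err, msg):
    """Delivery callback sketch: report failed deliveries, stay quiet on success.

    Hypothetical body; only the (err, msg) signature comes from the client.
    """
    if err is not None:
        print(f"delivery to {msg.topic()} failed: {err}", file=sys.stderr)
```

The callback only runs from inside poll() or flush(), which is one reason the produce loop calls poll(0) after each record.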

There is some discussion about the extent to which calling poll(0) after each produce affects performance. Magnus says it's negligible but here's someone who doesn't agree.

It made sense to me to call producer.flush() when we hit a BufferError to clear everything out, but Magnus' example calls poll(1), which waits up to 1 second while serving delivery callbacks for sent messages. I found that this caused me to hit the BufferError more often. To be honest, I don't have a good sense from reading the docs of how poll() works or when I should call it, so more comments in the example about how to use poll() and flush() would be good.
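One possible middle ground between poll(1) and a full flush() is to keep polling until the queue has drained to a chosen level. This is a sketch, not a recommendation from the librdkafka maintainers. The helper name `wait_for_queue_space` and its parameters are hypothetical; it relies on `len(producer)` reporting the number of messages and requests still waiting to be delivered, as the confluent_kafka Producer does.

```python
def wait_for_queue_space(producer, max_pending, poll_timeout=0.1, max_polls=100):
    """Serve delivery callbacks until at most max_pending items remain queued.

    Hypothetical helper: `producer` is assumed to support len() and
    poll(timeout) like confluent_kafka.Producer. Returns the number of
    items still pending when it stops.
    """
    polls = 0
    while len(producer) > max_pending and polls < max_polls:
        # poll() blocks for up to poll_timeout seconds and runs any
        # delivery callbacks that are ready.
        producer.poll(poll_timeout)
        polls += 1
    return len(producer)
```

Called from the `except BufferError` branch, this waits longer than a single poll(1) when the queue is badly backed up, but it returns as soon as there is room again instead of waiting for every outstanding message the way flush() does.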

For reference, my full working example is here:

chuck-confluent added the enhancement label Jul 15, 2022