
Getting this package to work with asyncio #185

Closed
amitripshtos opened this issue May 2, 2017 · 23 comments

@amitripshtos

Hey guys,
great job on this package!

I wanted to know if there is a way to use asyncio with this package, since I saw some code that uses the "async/await" syntax.

If so, is there an example?
Thanks,
Amit

@edenhill
Contributor

edenhill commented May 5, 2017

Hi,
we're currently looking into making it compatible with asyncio.

Stay tuned!

related issue:
#100

@edenhill edenhill added this to the next feature milestone May 5, 2017
@ask

ask commented Aug 22, 2017

Are there any updates on this? Is anyone working on it?

@ask ask unassigned hqin Aug 22, 2017
@vineetgoel

+1

@edenhill
Contributor

Not yet; community contributions are very much welcome.
Some initial research on what needs to be done would be great!

@edenhill edenhill removed this from the next feature milestone Sep 1, 2017
@nurikk

nurikk commented Oct 19, 2017

+1

@jeffwidman
Contributor

jeffwidman commented Oct 20, 2017

In the meantime, https://github.com/aio-libs/aiokafka is a Python Kafka library that supports asyncio.

@marirs

marirs commented Jan 11, 2018

Hi, just checking: is it compatible with asyncio yet?

@secretmike

Hi, I started looking at this a bit.

asyncio has the ability to watch file descriptors for read availability (https://docs.python.org/3/library/asyncio-eventloop.html#watch-file-descriptors)

librdkafka can write to a file descriptor when a queue becomes readable, via rd_kafka_queue_io_event_enable() (https://github.com/edenhill/librdkafka/blob/7478b5ef16aadd6543fe38bc6a2deb895c70da98/src/rdkafka.h#L2362)

Would it be possible to expose rd_kafka_queue_io_event_enable() in the Python API? I don't believe the queues are exposed currently, so confluent-kafka-python would have to attach the appropriate internal queue itself.

Ideally you should be able to pass a file descriptor and a payload; when the file descriptor becomes readable, call poll() or consume().

There are still a handful of synchronous operations (like commit(), committed(), etc.), but consuming and producing messages are the most heavily used operations.

asyncio support could probably be integrated more directly in this library, but the above might be a good starting point to allow for an asyncio wrapper.
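A minimal sketch of what such a wrapper might look like, assuming a hypothetical queue_fd file descriptor that librdkafka would signal when messages are ready (no such handle is exposed by the Python client today, so this is purely illustrative):

import asyncio

from confluent_kafka import Consumer

class AsyncConsumerSketch:
    """Illustrative only: `queue_fd` is a hypothetical file descriptor that
    becomes readable whenever the consumer's internal queue has messages."""

    def __init__(self, consumer: Consumer, queue_fd: int):
        self._consumer = consumer
        self._messages = asyncio.Queue()
        self._loop = asyncio.get_event_loop()
        # Wake up on the event loop whenever librdkafka signals the fd.
        self._loop.add_reader(queue_fd, self._on_readable)

    def _on_readable(self):
        # poll(0) returns immediately, so it never blocks the event loop.
        msg = self._consumer.poll(0)
        if msg is not None:
            self._messages.put_nowait(msg)

    async def getone(self):
        return await self._messages.get()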

@tongda

tongda commented Sep 13, 2018

Since the Go wrapper benefits from Go's channel mechanism, which makes it more idiomatic, async/await support here could take inspiration from it.

@vineetgoel

Any updates here?

@CrazyWisdom

+1

@StarLightPL

+1

@andreportela

+1

@madisvain

Leaving this here for reference until this feature gets implemented. It seems it's possible to process the messages with asyncio:

https://stackoverflow.com/a/55498503

I have not tried this out myself yet, but it might be of help.

@h12w

h12w commented Sep 4, 2019

I guess some people might just want it to work with asyncio, not with maximum efficiency. So here is an example that works now:

import asyncio

from confluent_kafka import Consumer

async def run_consumer(consumer):
    while True:
        msg = consumer.poll(timeout=0)
        if msg is None:
            # No message available: yield to the event loop and try again.
            await asyncio.sleep(delay=0)
            continue
        # handle msg
        consumer.commit(msg, asynchronous=True)

# Placeholder config; top-level `await` is not valid in a script, so drive
# the coroutine with asyncio.run().
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'example'})
consumer.subscribe(['my_topic'])
asyncio.run(run_consumer(consumer))

@matrixise

The main issue is Schema Registry support, which does not exist in aiokafka.

@gjcarneiro

asyncio has the ability to watch file descriptors for read availability (https://docs.python.org/3/library/asyncio-eventloop.html#watch-file-descriptors)

Yes, but keep in mind that this API is not very portable, e.g. it does not work with Windows ProactorEventLoop.

@mhowlett
Contributor

mhowlett commented Nov 9, 2019

Keep an eye on the Confluent blog over the next couple of weeks for an in-depth blog post on this client and asyncio. It walks through the producer only, though. @h12w - your solution will busy-loop and will be very inefficient. To be efficient, you need to do blocking consumer calls, and to do that you'll need another thread.
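To make the thread-based approach concrete, here is a rough sketch (not from the blog post) that runs the blocking poll() call on asyncio's default thread-pool executor; the config values are placeholders:

import asyncio

from confluent_kafka import Consumer

async def consume_loop(consumer):
    loop = asyncio.get_running_loop()
    while True:
        # Run the blocking poll() on a worker thread so the event loop stays
        # free while we wait up to 1 second for a message.
        msg = await loop.run_in_executor(None, consumer.poll, 1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error:", msg.error())
            continue
        print("Received:", msg.topic(), msg.partition(), msg.offset())

consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'example'})
consumer.subscribe(['my_topic'])
asyncio.run(consume_loop(consumer))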

@sangmeshcp

@mhowlett any update on the detailed blog post? I couldn't find it.

@mhowlett
Contributor

https://www.confluent.io/blog/kafka-python-asyncio-integration/
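For reference, one way to bridge the producer to asyncio along the lines the post covers is to run the client's poll loop on a background thread and resolve an asyncio future from the delivery callback. A rough sketch follows; the class name and config handling are illustrative, not part of this library's API:

import asyncio
from threading import Thread

import confluent_kafka

class AIOProducerSketch:
    def __init__(self, config, loop=None):
        self._loop = loop or asyncio.get_event_loop()
        self._producer = confluent_kafka.Producer(config)
        self._cancelled = False
        self._poll_thread = Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self):
        # Serve delivery callbacks on a background thread.
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, value):
        result = self._loop.create_future()

        def ack(err, msg):
            # Delivery callbacks fire on the poll thread; hop back to the loop.
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, confluent_kafka.KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)

        self._producer.produce(topic, value, on_delivery=ack)
        return result

Awaiting the future returned by produce() then gives back the delivered message, or raises if delivery failed.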

@galen211

galen211 commented Feb 3, 2020

Has anyone been able to successfully connect to Confluent Cloud with AIOKafka? I tried using a modified version of the ssl_consume_produce.py example from the AIOKafka repo at https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py and was unsuccessful. I've configured my AIOKafkaConsumer and AIOKafkaProducer with what I believe to be the correct parameters but am getting the below runtime error. I've also included my modified script and config further below.

/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
[2020-02-06 17:26:57,060] DEBUG [asyncio]: Using selector: KqueueSelector
[2020-02-06 17:26:57,061] DEBUG [aiokafka.producer.producer]: Starting the Kafka producer
[2020-02-06 17:26:57,061] DEBUG [aiokafka]: Attempting to bootstrap via node at pkc-43n10.us-central1.gcp.confluent.cloud:9092
[2020-02-06 17:26:57,223] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 1: ApiVersionRequest_v0()
[2020-02-06 17:26:57,265] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=2), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=6), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=6), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=4), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=3), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=3), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=10000, min_version=0, max_version=0)])
[2020-02-06 17:26:57,266] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
[2020-02-06 17:26:57,303] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
Traceback (most recent call last):
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 78, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 73, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 34, in produce_and_consume
    start_future = await producer.start()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
    await self.client.bootstrap()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
    version_hint=version_hint)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
    await conn.connect()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
    await self._do_sasl_handshake()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
    payload, expect_response = res
RuntimeError: await wasn't used with future
[2020-02-06 17:26:57,315] ERROR [asyncio]: Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fcee03a84d0>

Process finished with exit code 1

My adapted version of the code is:

import asyncio
import os
import logging

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from kafka.common import TopicPartition
from aiokafka.errors import KafkaError

from aiokafka import AIOKafkaClient

import ccloud_lib

conf = ccloud_lib.read_ccloud_config('kafka_config.conf')
ssl_context = create_ssl_context(cafile='cacert.pem')

log_level = logging.DEBUG
log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)

async def produce_and_consume(loop):
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop = loop,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        ssl_context=ssl_context,
        sasl_plain_username=conf['sasl.username'],
        sasl_plain_password=conf['sasl.password'],
        api_version='0.10'
    )
    try:
        start_future = await producer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))

    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        ssl_context=ssl_context,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )
    try:
        start_future = await consumer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))


    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        loop.run_until_complete(asyncio.sleep(0, loop=loop))
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass

My obfuscated configuration conf looks like this:

bootstrap.servers=*********.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="*********" password\="******************";
sasl.username=*********
sasl.password=*********

Is there something about my configuration which is incorrect or missing?
[NB: also posted at StackOverflow, but thought it might have a better audience here]

@jeffwidman
Contributor

@galen211 you should post the issue on https://github.com/aio-libs/aiokafka/issues or email Confluent support... this repo is unrelated to either that lib or that cloud.

@rnpridgeon
Contributor

As @jeffwidman pointed out, you will need to take this up with aiokafka. I'm closing this issue, with @mhowlett's blog post as the resolution.
