Skip to content

Commit

Permalink
Update example.py (dpkp#2081)
Browse files Browse the repository at this point in the history
Co-authored-by: MostafaElmenabawy <momenabawy@synapse-analytics.io>
  • Loading branch information
mmenbawy and MostafaElmenabawy authored Sep 7, 2020
1 parent f6677cf commit 3a9d830
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import threading, time

from kafka import KafkaConsumer, KafkaProducer
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic


class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.stop_event = threading.Event()

def stop(self):
self.stop_event.set()

Expand All @@ -23,14 +23,15 @@ def run(self):

producer.close()

class Consumer(multiprocessing.Process):

class Consumer(threading.Thread):
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_event = multiprocessing.Event()
threading.Thread.__init__(self)
self.stop_event = threading.Event()

def stop(self):
self.stop_event.set()

def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
Expand All @@ -44,29 +45,38 @@ def run(self):
break

consumer.close()


def main():
# Create 'my-topic' Kafka topic
try:
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

topic = NewTopic(name='my-topic',
num_partitions=1,
replication_factor=1)
admin.create_topics([topic])
except Exception:
pass

tasks = [
Producer(),
Consumer()
]

# Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
for t in tasks:
t.start()

time.sleep(10)


# Stop threads
for task in tasks:
task.stop()

for task in tasks:
task.join()


if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
main()

0 comments on commit 3a9d830

Please sign in to comment.