Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated kafka returner to use confluent-kafka. #50579

Merged
merged 3 commits into from
Nov 23, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 37 additions & 27 deletions salt/returners/kafka_return.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,66 @@
'''
Return data to a Kafka topic

:maintainer: Christer Edwards (christer.edwards@gmail.com)
:maturity: 0.1
:depends: kafka-python
:maintainer: Justin Desilets (justin.desilets@gmail.com)
:maturity: 20181119
:depends: confluent-kafka
:platform: all

To enable this returner install kafka-python and enable the following settings
To enable this returner install confluent-kafka and enable the following settings
in the minion config:

returner.kafka.hostnames:
- "server1"
- "server2"
- "server3"
returner.kafka.bootstrap:
- "server1:9092"
- "server2:9092"
- "server3:9092"

returner.kafka.topic: 'topic'

To use the kafka returner, append '--return kafka' to the Salt command, eg;
To use the kafka returner, append `--return kafka` to the Salt command, eg;

salt '*' test.ping --return kafka

'''

# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import salt.utils.json

# Import third-party libs
try:
from kafka import KafkaClient, SimpleProducer
from confluent_kafka import Producer
HAS_KAFKA = True
except ImportError:
HAS_KAFKA = False

log = logging.getLogger(__name__)

__virtualname__ = 'kafka'

__virtualname__ = 'kafka'

def __virtual__():
if not HAS_KAFKA:
return False, 'Could not import kafka returner; kafka-python is not installed.'
return False, 'Could not import kafka returner; confluent-kafka is not installed.'
return __virtualname__


def _get_conn(ret=None):
'''
Return a kafka connection
'''
if __salt__['config.option']('returner.kafka.hostnames'):
hostnames = __salt__['config.option']('returner.kafka.hostnames')
return KafkaClient(hostnames)
if __salt__['config.option']('returner.kafka.bootstrap'):
bootstrap = ','.join(__salt__['config.option']('returner.kafka.bootstrap'))
return bootstrap
else:
log.error('Unable to find kafka returner config option: hostnames')
log.error('Unable to find kafka returner config option: bootstrap')


def _close_conn(conn):
'''
Close the kafka connection
'''
conn.close()
def _delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not be printing right to standard out like this.

else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def returner(ret):
Expand All @@ -73,9 +73,19 @@ def returner(ret):
topic = __salt__['config.option']('returner.kafka.topic')

conn = _get_conn(ret)
producer = SimpleProducer(conn)
producer.send_messages(topic, salt.utils.json.dumps(ret))
producer = Producer({'bootstrap.servers': conn})

# Trigger any available delivery report callbacks from previous produce() calls
producer.poll(0)

# Asynchronously produce a message, the delivery report callback
# will be triggered from poll() above, or flush() below, when the message has
# been successfully delivered or failed permanently.
producer.produce(topic, salt.utils.json.dumps(ret), str(ret).encode('utf-8'), callback=_delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
producer.flush()

_close_conn(conn)
else:
log.error('Unable to find kafka returner config option: topic')
log.error('Unable to find kafka returner config option: topic')