Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated kafka returner to use confluent-kafka. #50579

Merged
merged 3 commits into from
Nov 23, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 31 additions & 28 deletions salt/returners/kafka_return.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,68 @@
'''
Return data to a Kafka topic

:maintainer: Christer Edwards (christer.edwards@gmail.com)
:maturity: 0.1
:depends: kafka-python
:maintainer: Justin Desilets (justin.desilets@gmail.com)
:maturity: 20181119
:depends: confluent-kafka
:platform: all

To enable this returner install kafka-python and enable the following settings
in the minion config:
To enable this returner install confluent-kafka and enable the following
settings in the minion config:

returner.kafka.hostnames:
- "server1"
- "server2"
- "server3"
returner.kafka.bootstrap:
- "server1:9092"
- "server2:9092"
- "server3:9092"

returner.kafka.topic: 'topic'

To use the kafka returner, append '--return kafka' to the Salt command, eg;
To use the kafka returner, append `--return kafka` to the Salt command, eg;

salt '*' test.ping --return kafka

'''

# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import salt.utils.json

# Import third-party libs
try:
from kafka import KafkaClient, SimpleProducer
from confluent_kafka import Producer
HAS_KAFKA = True
except ImportError:
HAS_KAFKA = False

log = logging.getLogger(__name__)


__virtualname__ = 'kafka'


def __virtual__():
if not HAS_KAFKA:
return False, 'Could not import kafka returner; kafka-python is not installed.'
return False, 'Could not import kafka returner; confluent-kafka is not installed.'
return __virtualname__


def _get_conn(ret=None):
def _get_conn():
'''
Return a kafka connection
'''
if __salt__['config.option']('returner.kafka.hostnames'):
hostnames = __salt__['config.option']('returner.kafka.hostnames')
return KafkaClient(hostnames)
if __salt__['config.option']('returner.kafka.bootstrap'):
bootstrap = ','.join(__salt__['config.option']('returner.kafka.bootstrap'))
else:
log.error('Unable to find kafka returner config option: hostnames')
log.error('Unable to find kafka returner config option: bootstrap')
return None
return bootstrap


def _close_conn(conn):
'''
Close the kafka connection
'''
conn.close()
def _delivery_report(err, msg):
''' Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). '''
if err is not None:
log.error('Message delivery failed: %s', err)
else:
log.debug('Message delivered to %s [%s]', msg.topic(), msg.partition())


def returner(ret):
Expand All @@ -72,10 +74,11 @@ def returner(ret):
if __salt__['config.option']('returner.kafka.topic'):
topic = __salt__['config.option']('returner.kafka.topic')

conn = _get_conn(ret)
producer = SimpleProducer(conn)
producer.send_messages(topic, salt.utils.json.dumps(ret))
conn = _get_conn()
producer = Producer({'bootstrap.servers': conn})
producer.poll(0)
producer.produce(topic, salt.utils.json.dumps(ret), str(ret).encode('utf-8'), callback=_delivery_report)

_close_conn(conn)
producer.flush()
else:
log.error('Unable to find kafka returner config option: topic')