 #include <csp/adapters/kafka/KafkaConsumer.h>
 #include <csp/adapters/kafka/KafkaPublisher.h>
 #include <csp/adapters/kafka/KafkaSubscriber.h>
-#include <csp/engine/Dictionary.h>
 #include <csp/core/Platform.h>
+#include <csp/engine/Dictionary.h>
 
 #include <iostream>
 #include <librdkafka/rdkafkacpp.h>
 
 namespace csp
 {
 
-INIT_CSP_ENUM( csp::adapters::kafka::KafkaStatusMessageType,
-               "OK",
-               "MSG_DELIVERY_FAILED",
-               "MSG_SEND_ERROR",
-               "MSG_RECV_ERROR"
-);
+INIT_CSP_ENUM( csp::adapters::kafka::KafkaStatusMessageType, "OK", "MSG_DELIVERY_FAILED", "MSG_SEND_ERROR",
+               "MSG_RECV_ERROR" );
 
 }
 
@@ -26,59 +22,68 @@ namespace csp::adapters::kafka
 class DeliveryReportCb : public RdKafka::DeliveryReportCb
 {
 public:
-    DeliveryReportCb( KafkaAdapterManager * mgr ) : m_adapterManager( mgr )
+    DeliveryReportCb( KafkaAdapterManager * mgr )
+        : m_adapterManager( mgr )
     {
     }
 
-    void dr_cb( RdKafka::Message &message ) final
+    void dr_cb( RdKafka::Message & message ) final
     {
         /* If message.err() is non-zero the message delivery failed permanently
          * for the message. */
         if( message.err() )
         {
-            std::string msg = "KafkaPublisher: Message delivery failed for topic " + message.topic_name() + ". Failure: " + message.errstr();
-            m_adapterManager -> pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::MSG_DELIVERY_FAILED, msg );
+            std::string msg = "KafkaPublisher: Message delivery failed for topic " + message.topic_name()
+                              + ". Failure: " + message.errstr();
+            m_adapterManager->pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::MSG_DELIVERY_FAILED, msg );
         }
     }
+
 private:
     KafkaAdapterManager * m_adapterManager;
 };
 
 class EventCb : public RdKafka::EventCb
 {
 public:
-    EventCb( KafkaAdapterManager * mgr ) : m_adapterManager( mgr ) {}
+    EventCb( KafkaAdapterManager * mgr )
+        : m_adapterManager( mgr )
+    {
+    }
 
     void event_cb( RdKafka::Event & event ) override
     {
         if( event.type() == RdKafka::Event::EVENT_LOG )
         {
             if( event.severity() < RdKafka::Event::EVENT_SEVERITY_NOTICE )
             {
-                std::string errmsg = "KafkaConsumer: error " + RdKafka::err2str( ( RdKafka::ErrorCode ) event.err() ) + ". Reason: " + event.str();
-                m_adapterManager -> pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::GENERIC_ERROR, errmsg );
+                std::string errmsg = "KafkaConsumer: error " + RdKafka::err2str( (RdKafka::ErrorCode)event.err() )
+                                     + ". Reason: " + event.str();
+                m_adapterManager->pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::GENERIC_ERROR, errmsg );
             }
         }
         else if( event.type() == RdKafka::Event::EVENT_ERROR )
        {
-            // We shutdown the app if its a fatal error OR if its an authentication issue which has plagued users multiple times
+            // We shutdown the app if its a fatal error OR if its an authentication issue which has plagued users
+            // multiple times
             if( event.fatal() || event.err() == RdKafka::ErrorCode::ERR__AUTHENTICATION )
-                m_adapterManager -> forceShutdown( RdKafka::err2str( ( RdKafka::ErrorCode ) event.err() ) + event.str() );
+                m_adapterManager->forceShutdown( RdKafka::err2str( (RdKafka::ErrorCode)event.err() ) + event.str() );
         }
     }
 
 private:
     KafkaAdapterManager * m_adapterManager;
 };
 
-KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary & properties ) : AdapterManager( engine ),
-    m_consumerIdx( 0 ),
-    m_producerPollThreadActive( false )
+KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary & properties )
+    : AdapterManager( engine )
+    , m_consumerIdx( 0 )
+    , m_producerPollThreadActive( false )
 {
-    m_maxThreads = properties.get<uint64_t>( "max_threads" );
+    m_maxThreads    = properties.get<uint64_t>( "max_threads" );
     m_pollTimeoutMs = properties.get<TimeDelta>( "poll_timeout" ).asMilliseconds();
 
-    m_eventCb = std::make_unique<EventCb>( this );
+    m_eventCb    = std::make_unique<EventCb>( this );
     m_producerCb = std::make_unique<DeliveryReportCb>( this );
 
     std::string errstr;
@@ -90,19 +95,19 @@ KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary
     setConfProperties( m_consumerConf.get(), *properties.get<DictionaryPtr>( "rd_kafka_consumer_conf_properties" ) );
     if( properties.exists( "start_offset" ) )
     {
-        // used later in start since we need starttime
+        // used later in start since we need starttime
         m_startOffsetProperty = properties.getUntypedValue( "start_offset" );
     }
 
-    if( m_consumerConf -> set( "event_cb", m_eventCb.get(), errstr ) != RdKafka::Conf::CONF_OK )
+    if( m_consumerConf->set( "event_cb", m_eventCb.get(), errstr ) != RdKafka::Conf::CONF_OK )
         CSP_THROW( RuntimeException, "Failed to set consumer error cb: " << errstr );
 
     m_producerConf.reset( RdKafka::Conf::create( RdKafka::Conf::CONF_GLOBAL ) );
     setConfProperties( m_producerConf.get(), rdKafkaProperties );
     setConfProperties( m_producerConf.get(), *properties.get<DictionaryPtr>( "rd_kafka_producer_conf_properties" ) );
-    if( m_producerConf -> set( "dr_cb", m_producerCb.get(), errstr ) != RdKafka::Conf::CONF_OK )
+    if( m_producerConf->set( "dr_cb", m_producerCb.get(), errstr ) != RdKafka::Conf::CONF_OK )
         CSP_THROW( RuntimeException, "Failed to set producer callback: " << errstr );
-    if( m_producerConf -> set( "event_cb", m_eventCb.get(), errstr ) != RdKafka::Conf::CONF_OK )
+    if( m_producerConf->set( "event_cb", m_eventCb.get(), errstr ) != RdKafka::Conf::CONF_OK )
         CSP_THROW( RuntimeException, "Failed to set producer error cb: " << errstr );
 }
 
@@ -112,7 +117,7 @@ KafkaAdapterManager::~KafkaAdapterManager()
     if( m_producerPollThreadActive )
     {
         m_producerPollThreadActive = false;
-        m_producerPollThread -> join();
+        m_producerPollThread->join();
     }
 }
 
@@ -122,9 +127,9 @@ void KafkaAdapterManager::setConfProperties( RdKafka::Conf * conf, const Diction
 
     for( auto it = properties.begin(); it != properties.end(); ++it )
     {
-        std::string key = it.key();
+        std::string key   = it.key();
         std::string value = properties.get<std::string>( key );
-        if( conf -> set( key, value, errstr ) != RdKafka::Conf::CONF_OK )
+        if( conf->set( key, value, errstr ) != RdKafka::Conf::CONF_OK )
             CSP_THROW( RuntimeException, "Failed to set property " << key << ": " << errstr );
     }
 }
@@ -134,18 +139,18 @@ void KafkaAdapterManager::forceShutdown( const std::string & err )
     forceConsumerReplayComplete();
     try
     {
-        CSP_THROW ( RuntimeException, "Kafka fatal error. " + err );
+        CSP_THROW( RuntimeException, "Kafka fatal error. " + err );
     }
     catch( const RuntimeException & )
     {
-        rootEngine() -> shutdown( std::current_exception() );
+        rootEngine()->shutdown( std::current_exception() );
     }
 }
 
 void KafkaAdapterManager::forceConsumerReplayComplete()
 {
     for( auto & consumer : m_consumerVector )
-        consumer -> forceReplayCompleted();
+        consumer->forceReplayCompleted();
 }
 
 void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
@@ -155,29 +160,29 @@ void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
     if( !m_staticPublishers.empty() || !m_dynamicPublishers.empty() )
     {
         m_producer.reset( RdKafka::Producer::create( m_producerConf.get(), errstr ) );
-        if ( !m_producer )
+        if( !m_producer )
         {
             CSP_THROW( RuntimeException, "Failed to create producer: " << errstr );
         }
     }
 
     // start all consumers
     for( auto & it : m_consumerVector )
-        it -> start( starttime );
+        it->start( starttime );
 
     // start all publishers
     for( auto & it : m_staticPublishers )
-        it.second -> start( m_producer );
+        it.second->start( m_producer );
 
     for( auto & it : m_dynamicPublishers )
-        it -> start( m_producer );
+        it->start( m_producer );
 
     AdapterManager::start( starttime, endtime );
 
     if( !m_staticPublishers.empty() || !m_dynamicPublishers.empty() )
     {
         m_producerPollThreadActive = true;
-        m_producerPollThread = std::make_unique<std::thread>( [ this ](){ pollProducers(); } );
+        m_producerPollThread       = std::make_unique<std::thread>( [this]() { pollProducers(); } );
     }
 }
 
@@ -187,20 +192,20 @@ void KafkaAdapterManager::stop()
 
     // stop all consumers
     for( auto & it : m_consumerVector )
-        it -> stop();
+        it->stop();
 
     if( m_producerPollThreadActive )
     {
         m_producerPollThreadActive = false;
-        m_producerPollThread -> join();
+        m_producerPollThread->join();
     }
 
     // stop all publishers
     for( auto & it : m_staticPublishers )
-        it.second -> stop();
+        it.second->stop();
 
     for( auto & it : m_dynamicPublishers )
-        it -> stop();
+        it->stop();
 
     m_staticPublishers.clear();
     m_dynamicPublishers.clear();
@@ -218,44 +223,46 @@ void KafkaAdapterManager::pollProducers()
 {
     while( m_producerPollThreadActive )
     {
-        m_producer -> poll( 1000 );
+        m_producer->poll( 1000 );
     }
 
     try
     {
         while( true )
         {
-            auto rc = m_producer -> flush( 10000 );
+            auto rc = m_producer->flush( 10000 );
             if( !rc )
                 break;
 
             if( rc && rc != RdKafka::ERR__TIMED_OUT )
-                CSP_THROW( RuntimeException, "KafkaProducer failed to flush pending msgs on shutdown: " << RdKafka::err2str( rc ) );
+                CSP_THROW( RuntimeException,
+                           "KafkaProducer failed to flush pending msgs on shutdown: " << RdKafka::err2str( rc ) );
         }
     }
     catch( ... )
     {
-        rootEngine() -> shutdown( std::current_exception() );
+        rootEngine()->shutdown( std::current_exception() );
     }
 }
 
-PushInputAdapter * KafkaAdapterManager::getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties )
+PushInputAdapter * KafkaAdapterManager::getInputAdapter( CspTypePtr & type, PushMode pushMode,
+                                                         const Dictionary & properties )
 {
-    std::string topic = properties.get<std::string>( "topic" );
-    std::string key = properties.get<std::string>( "key" );
-    KafkaSubscriber * subscriber = this -> getSubscriber( topic, key, properties );
-    return subscriber -> getInputAdapter( type, pushMode, properties );
+    std::string topic            = properties.get<std::string>( "topic" );
+    std::string key              = properties.get<std::string>( "key" );
+    KafkaSubscriber * subscriber = this->getSubscriber( topic, key, properties );
+    return subscriber->getInputAdapter( type, pushMode, properties );
 }
 
 OutputAdapter * KafkaAdapterManager::getOutputAdapter( CspTypePtr & type, const Dictionary & properties )
 {
     std::string topic = properties.get<std::string>( "topic" );
     try
     {
-        auto key = properties.get<std::string>( "key" );
-        auto pair = TopicKeyPair( topic, key );
-        KafkaPublisher * publisher = this -> getStaticPublisher( pair, properties );
-        return publisher -> getOutputAdapter( type, properties, key );
+        auto key                   = properties.get<std::string>( "key" );
+        auto pair                  = TopicKeyPair( topic, key );
+        KafkaPublisher * publisher = this->getStaticPublisher( pair, properties );
+        return publisher->getOutputAdapter( type, properties, key );
     }
     catch( TypeError & e )
     {
@@ -264,8 +271,8 @@ OutputAdapter * KafkaAdapterManager::getOutputAdapter( CspTypePtr & type, const
         for( auto & it : key )
             keyFields.emplace_back( std::get<std::string>( it._data ) );
 
-        KafkaPublisher * publisher = this -> getDynamicPublisher( topic, properties );
-        return publisher -> getOutputAdapter( type, properties, keyFields );
+        KafkaPublisher * publisher = this->getDynamicPublisher( topic, properties );
+        return publisher->getOutputAdapter( type, properties, keyFields );
     }
 }
 
@@ -276,37 +283,38 @@ KafkaConsumer * KafkaAdapterManager::getConsumer( const std::string & topic, con
     // If we have reached m_maxThreads, then round-robin the topic onto a consumer (and insert it into the map)
     if( m_consumerMap.find( topic ) != m_consumerMap.end() )
     {
-        return m_consumerMap[ topic ].get();
+        return m_consumerMap[topic].get();
     }
     if( m_consumerVector.size() < m_maxThreads )
     {
         auto consumer = std::make_shared<KafkaConsumer>( this, properties );
         m_consumerVector.emplace_back( consumer );
         m_consumerMap.emplace( topic, consumer );
-        return m_consumerMap[ topic ].get();
+        return m_consumerMap[topic].get();
     }
 
-    auto consumer = m_consumerVector[ m_consumerIdx++ ];
+    auto consumer = m_consumerVector[m_consumerIdx++];
     m_consumerMap.emplace( topic, consumer );
     if( m_consumerIdx >= m_maxThreads )
         m_consumerIdx = 0;
     return consumer.get();
 }
 
-KafkaSubscriber * KafkaAdapterManager::getSubscriber( const std::string & topic, const std::string & key, const Dictionary & properties )
+KafkaSubscriber * KafkaAdapterManager::getSubscriber( const std::string & topic, const std::string & key,
+                                                      const Dictionary & properties )
 {
     auto pair = TopicKeyPair( topic, key );
-    auto rv = m_subscribers.emplace( pair, nullptr );
+    auto rv   = m_subscribers.emplace( pair, nullptr );
 
     if( rv.second )
     {
         std::unique_ptr<KafkaSubscriber> subscriber( new KafkaSubscriber( this, properties ) );
-        rv.first -> second = std::move( subscriber );
+        rv.first->second = std::move( subscriber );
 
-        this -> getConsumer( topic, properties ) -> addSubscriber( topic, key, rv.first -> second.get() );
+        this->getConsumer( topic, properties )->addSubscriber( topic, key, rv.first->second.get() );
     }
 
-    return rv.first -> second.get();
+    return rv.first->second.get();
 }
 
 // for static (string) keys, we create one publisher instance per <topic, key> pair
@@ -317,10 +325,10 @@ KafkaPublisher * KafkaAdapterManager::getStaticPublisher( const TopicKeyPair & p
     if( rv.second )
     {
         std::unique_ptr<KafkaPublisher> publisher( new KafkaPublisher( this, properties, pair.first ) );
-        rv.first -> second = std::move( publisher );
+        rv.first->second = std::move( publisher );
     }
 
-    KafkaPublisher * p = rv.first -> second.get();
+    KafkaPublisher * p = rv.first->second.get();
     return p;
 }
 
@@ -332,4 +340,4 @@ KafkaPublisher * KafkaAdapterManager::getDynamicPublisher( const std::string & t
     return p;
 }
 
-}
+} // namespace csp::adapters::kafka