Skip to content

Commit

Permalink
Merge pull request #56 from xtuml/lps_kafka_polling
Browse files Browse the repository at this point in the history
Implement polled kafka topics
  • Loading branch information
cortlandstarrett authored Jun 13, 2024
2 parents cc8a3bc + b43276c commit e122983
Show file tree
Hide file tree
Showing 18 changed files with 677 additions and 340 deletions.
2 changes: 2 additions & 0 deletions core-cpp/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ simple_add_shared_library (
NAME Kafka
SOURCES
Consumer.cc
DataConsumer.cc
Kafka.cc
ProcessHandler.cc
Producer.cc
Expand All @@ -24,6 +25,7 @@ simple_add_shared_library (
EXPORT MaslCore
INCLUDES
kafka/Consumer.hh
kafka/DataConsumer.hh
kafka/Kafka.hh
kafka/ProcessHandler.hh
kafka/Producer.hh
Expand Down
13 changes: 9 additions & 4 deletions core-cpp/kafka/include/kafka/Consumer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "cppkafka/consumer.h"
#include "cppkafka/message.h"

#include "DataConsumer.hh"

#include <condition_variable>
#include <mutex>
#include <queue>
Expand All @@ -29,15 +31,18 @@ private:
class Consumer {

public:
Consumer(std::string topic);
Consumer(std::vector<std::string> topics);
bool consumeOne(DataConsumer& dataConsumer);
void run();
static Consumer &getInstance();

private:
MessageQueue messageQueue;
std::unique_ptr<cppkafka::Consumer> consumer;

void handleMessages(cppkafka::Consumer& consumer);

void createTopics(cppkafka::Consumer& consumer, std::vector<std::string> topics);
void initialize(std::vector<std::string> topics);
void handleMessages();
void createTopics(std::vector<std::string> topics);
};

} // namespace Kafka
Expand Down
18 changes: 18 additions & 0 deletions core-cpp/kafka/include/kafka/DataConsumer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef Kafka_DataConsumer_HH
#define Kafka_DataConsumer_HH

#include <cstdint>
#include <vector>

namespace Kafka {

class DataConsumer {
public:
virtual void accept(std::vector<std::uint8_t> data) const {
}
virtual ~DataConsumer();
};

} // namespace Kafka

#endif
2 changes: 2 additions & 0 deletions core-cpp/kafka/include/kafka/Kafka.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace Kafka {
extern const char *const BrokersOption;
extern const char *const GroupIdOption;
extern const char *const NamespaceOption;
extern const char *const NamespaceOption;
extern const char *const OffsetResetOption;

}

Expand Down
2 changes: 2 additions & 0 deletions core-cpp/kafka/include/kafka/ProcessHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public:

std::string getTopicName(int domainId, int serviceId);

void startConsumer();

static ProcessHandler &getInstance();


Expand Down
4 changes: 2 additions & 2 deletions core-cpp/kafka/include/kafka/Producer.hh
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#ifndef Kafka_Producer_HH
#define Kafka_Producer_HH

#include <nlohmann/json.hpp>

#include "cppkafka/message_builder.h"
#include "cppkafka/producer.h"

Expand All @@ -12,6 +10,7 @@ class Producer {

public:
Producer();
void publish(int domainId, int serviceId, std::vector<std::uint8_t> data, std::vector<std::uint8_t> partKey);
void publish(int domainId, int serviceId, std::string data, std::string partKey);
static Producer &getInstance();

Expand All @@ -20,6 +19,7 @@ private:
typedef std::map<ServiceKey, std::shared_ptr<cppkafka::MessageBuilder>> TopicLookup;
TopicLookup topicLookup;
std::unique_ptr<cppkafka::Producer> prod;
void publish(int domainId, int serviceId, cppkafka::Buffer& data, cppkafka::Buffer& partKey);
};

} // namespace Kafka
Expand Down
5 changes: 3 additions & 2 deletions core-cpp/kafka/include/kafka/ServiceHandler.hh
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#ifndef Kafka_ServiceHandler_HH
#define Kafka_ServiceHandler_HH

#include <nlohmann/json.hpp>
#include <cstdint>
#include <functional>
#include <vector>

namespace Kafka {

typedef std::function<void()> Callable;

class ServiceHandler {
public:
virtual Callable getInvoker(std::string data) const {
virtual Callable getInvoker(std::vector<std::uint8_t> data) const {
return Callable();
}
virtual ~ServiceHandler();
Expand Down
51 changes: 35 additions & 16 deletions core-cpp/kafka/src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@

namespace Kafka {

void Consumer::run() {
Consumer::Consumer(std::vector<std::string> topics) {
initialize(topics);
}

Consumer::Consumer(std::string topic) {
std::vector<std::string> topics;
topics.push_back(topic);
initialize(topics);
}

void Consumer::initialize(std::vector<std::string> topics) {
// Get command line options
const std::string brokers = SWA::CommandLine::getInstance().getOption(BrokersOption);
const std::string offsetReset = SWA::CommandLine::getInstance().getOption(OffsetResetOption, "earliest");

std::string groupId;
if (SWA::CommandLine::getInstance().optionPresent(GroupIdOption)) {
Expand All @@ -37,22 +47,25 @@ void Consumer::run() {
// Construct the configuration
cppkafka::Configuration config = {{"metadata.broker.list", brokers},
{"group.id", groupId},
{"auto.offset.reset", offsetReset},
{"enable.auto.commit", false}};

// Create the consumer
cppkafka::Consumer consumer(config);
consumer = std::unique_ptr<cppkafka::Consumer>(new cppkafka::Consumer(config));

// create topics if they don't already exist
createTopics(consumer, ProcessHandler::getInstance().getTopicNames());
createTopics(topics);

// short delay to avoid race conditions if other processes initiated topic creation
SWA::delay(SWA::Duration::fromMillis(100));

// Subscribe to topics
consumer.subscribe(ProcessHandler::getInstance().getTopicNames());
consumer->subscribe(topics);
}

void Consumer::run() {
// Create a consumer dispatcher
cppkafka::ConsumerDispatcher dispatcher(consumer);
cppkafka::ConsumerDispatcher dispatcher(*consumer);

// Stop processing on SIGINT
// TODO set up lifecycle event to stop dispatcher
Expand All @@ -61,7 +74,7 @@ void Consumer::run() {

// create a signal listener
SWA::RealTimeSignalListener listener(
[this, &consumer](int pid, int uid) { this->handleMessages(consumer); },
[this](int pid, int uid) { this->handleMessages(); },
SWA::Process::getInstance().getActivityMonitor());

// Now run the dispatcher, providing a callback to handle messages, one to
Expand All @@ -78,7 +91,7 @@ void Consumer::run() {
);
}

void Consumer::handleMessages(cppkafka::Consumer& consumer) {
void Consumer::handleMessages() {
// drain the message queue
if (!messageQueue.empty()) {
std::vector<cppkafka::Message> msgs = messageQueue.dequeue_all();
Expand All @@ -88,26 +101,37 @@ void Consumer::handleMessages(cppkafka::Consumer& consumer) {
// TODO maybe this is the right spot to handle errors and check conditions on the msg instance?

// get the service invoker
Callable service = ProcessHandler::getInstance().getServiceHandler(msg.get_topic()).getInvoker(std::string(msg.get_payload()));
Callable service = ProcessHandler::getInstance().getServiceHandler(msg.get_topic()).getInvoker(std::vector<uint8_t>(msg.get_payload()));

// run the service
service();

// commit offset
consumer.commit(msg);
consumer->commit(msg);
}
}
}

void Consumer::createTopics(cppkafka::Consumer& consumer, std::vector<std::string> topics) {
bool Consumer::consumeOne(DataConsumer& dataConsumer) {
cppkafka::Message msg = consumer->poll();
if (msg) {
dataConsumer.accept(std::vector<uint8_t>(msg.get_payload()));
consumer->commit(msg);
return true;
} else {
return false;
}
}

void Consumer::createTopics(std::vector<std::string> topics) {
// TODO clean up error handling in this routine
for (auto it = topics.begin(); it != topics.end(); it++) {

const char* topicname = (*it).data();
int partition_cnt = 1;
int replication_factor = 1;

rd_kafka_t *rk = consumer.get_handle();
rd_kafka_t *rk = consumer->get_handle();
rd_kafka_NewTopic_t *newt[1];
const size_t newt_cnt = 1;
rd_kafka_AdminOptions_t *options;
Expand Down Expand Up @@ -169,11 +193,6 @@ void Consumer::createTopics(cppkafka::Consumer& consumer, std::vector<std::strin
}
}

Consumer &Consumer::getInstance() {
static Consumer instance;
return instance;
}

void MessageQueue::enqueue(cppkafka::Message &msg) {
/* const cppkafka::Buffer &data = msg.get_payload(); */
/* std::vector<unsigned char> vec(data.begin(), data.end()); */
Expand Down
7 changes: 7 additions & 0 deletions core-cpp/kafka/src/DataConsumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include "kafka/DataConsumer.hh"

namespace Kafka {

DataConsumer::~DataConsumer() {}

} // namespace Kafka
10 changes: 6 additions & 4 deletions core-cpp/kafka/src/Kafka.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@

namespace Kafka {

const char *const BrokersOption = "-kafka-broker-list";
const char *const GroupIdOption = "-kafka-group-id";
const char *const NamespaceOption = "-kafka-namespace";
const char *const BrokersOption = "-kafka-broker-list";
const char *const GroupIdOption = "-kafka-group-id";
const char *const NamespaceOption = "-kafka-namespace";
const char *const OffsetResetOption = "-kafka-offset-reset";

bool startup() {
if (ProcessHandler::getInstance().hasRegisteredServices()) {
// only start the consumer if there are registered services
std::thread{[] { Consumer::getInstance().run(); }}.detach();
std::thread{[] { ProcessHandler::getInstance().startConsumer(); }}.detach();
}

return true;
Expand All @@ -35,6 +36,7 @@ struct Init {
SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(BrokersOption, std::string("Kafka Brokers"), true, "brokerList", true, false));
SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(GroupIdOption, std::string("Kafka Group ID"), false, "groupId", true, false));
SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(NamespaceOption, std::string("Kafka Topic Namespace"), false, "namespace", true, false));
SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(OffsetResetOption, std::string("Consumer Offset Reset Policy"), false, "offset", true, false));

SWA::Process::getInstance().registerStartedListener(&startup);
}
Expand Down
6 changes: 6 additions & 0 deletions core-cpp/kafka/src/ProcessHandler.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kafka/ProcessHandler.hh"

#include "kafka/Kafka.hh"
#include "kafka/Consumer.hh"

#include "swa/CommandLine.hh"
#include "swa/Process.hh"
Expand Down Expand Up @@ -62,6 +63,11 @@ std::string ProcessHandler::getTopicName(int domainId, int serviceId) {
return name;
}

void ProcessHandler::startConsumer() {
Consumer consumer(getTopicNames());
consumer.run();
}

ProcessHandler &ProcessHandler::getInstance() {
static ProcessHandler instance;
return instance;
Expand Down
16 changes: 14 additions & 2 deletions core-cpp/kafka/src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Producer::Producer() {
prod = std::unique_ptr<cppkafka::Producer>(new cppkafka::Producer(config));
}

void Producer::publish(int domainId, int serviceId, std::string data, std::string partKey) {
void Producer::publish(int domainId, int serviceId, cppkafka::Buffer& data, cppkafka::Buffer& partKey) {
// find/create a message builder
std::shared_ptr<cppkafka::MessageBuilder> msgBuilder;
TopicLookup::iterator entry = topicLookup.find(std::make_pair(domainId, serviceId));
Expand All @@ -30,7 +30,7 @@ void Producer::publish(int domainId, int serviceId, std::string data, std::strin
}

// set the partion key
if (!partKey.empty()) {
if (partKey.begin() != partKey.end()) {
msgBuilder->key(partKey);
}

Expand All @@ -41,6 +41,18 @@ void Producer::publish(int domainId, int serviceId, std::string data, std::strin
prod->produce(*msgBuilder);
}

void Producer::publish(int domainId, int serviceId, std::vector<std::uint8_t> data, std::vector<std::uint8_t> partKey) {
cppkafka::Buffer dataBuffer = cppkafka::Buffer(data);
cppkafka::Buffer keyBuffer = cppkafka::Buffer(partKey);
publish(domainId, serviceId, dataBuffer, keyBuffer);
}

void Producer::publish(int domainId, int serviceId, std::string data, std::string partKey) {
cppkafka::Buffer dataBuffer = cppkafka::Buffer(data);
cppkafka::Buffer keyBuffer = cppkafka::Buffer(partKey);
publish(domainId, serviceId, dataBuffer, keyBuffer);
}

Producer &Producer::getInstance() {
static Producer instance;
return instance;
Expand Down
4 changes: 4 additions & 0 deletions core-java/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ libs
resources
scripts
tmp
.classpath
.project
.settings/
bin/
Loading

0 comments on commit e122983

Please sign in to comment.