From e53d221a547bdb49d0167231c112dfb2cb4e513e Mon Sep 17 00:00:00 2001 From: Levi Starrett Date: Mon, 3 Jun 2024 09:48:29 -0400 Subject: [PATCH 1/4] Add command line configuration option for auto.offset.reset in Kafka utility --- core-cpp/kafka/include/kafka/Kafka.hh | 2 ++ core-cpp/kafka/src/Consumer.cc | 2 ++ core-cpp/kafka/src/Kafka.cc | 8 +++++--- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core-cpp/kafka/include/kafka/Kafka.hh b/core-cpp/kafka/include/kafka/Kafka.hh index 6525451f..7d193004 100644 --- a/core-cpp/kafka/include/kafka/Kafka.hh +++ b/core-cpp/kafka/include/kafka/Kafka.hh @@ -6,6 +6,8 @@ namespace Kafka { extern const char *const BrokersOption; extern const char *const GroupIdOption; extern const char *const NamespaceOption; +extern const char *const NamespaceOption; +extern const char *const OffsetResetOption; } diff --git a/core-cpp/kafka/src/Consumer.cc b/core-cpp/kafka/src/Consumer.cc index 1e18ded9..d7ba011c 100644 --- a/core-cpp/kafka/src/Consumer.cc +++ b/core-cpp/kafka/src/Consumer.cc @@ -21,6 +21,7 @@ void Consumer::run() { // Get command line options const std::string brokers = SWA::CommandLine::getInstance().getOption(BrokersOption); + const std::string offsetReset = SWA::CommandLine::getInstance().getOption(OffsetResetOption, "earliest"); std::string groupId; if (SWA::CommandLine::getInstance().optionPresent(GroupIdOption)) { @@ -37,6 +38,7 @@ void Consumer::run() { // Construct the configuration cppkafka::Configuration config = {{"metadata.broker.list", brokers}, {"group.id", groupId}, + {"auto.offset.reset", offsetReset}, {"enable.auto.commit", false}}; // Create the consumer diff --git a/core-cpp/kafka/src/Kafka.cc b/core-cpp/kafka/src/Kafka.cc index fc3c1ae9..4dcc43d2 100644 --- a/core-cpp/kafka/src/Kafka.cc +++ b/core-cpp/kafka/src/Kafka.cc @@ -12,9 +12,10 @@ namespace Kafka { -const char *const BrokersOption = "-kafka-broker-list"; -const char *const GroupIdOption = "-kafka-group-id"; -const char *const NamespaceOption = "-kafka-namespace"; +const char *const BrokersOption = "-kafka-broker-list"; +const char *const GroupIdOption = "-kafka-group-id"; +const char *const NamespaceOption = "-kafka-namespace"; +const char *const OffsetResetOption = "-kafka-offset-reset"; bool startup() { if (ProcessHandler::getInstance().hasRegisteredServices()) { @@ -35,6 +36,7 @@ struct Init { SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(BrokersOption, std::string("Kafka Brokers"), true, "brokerList", true, false)); SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(GroupIdOption, std::string("Kafka Group ID"), false, "groupId", true, false)); SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(NamespaceOption, std::string("Kafka Topic Namespace"), false, "namespace", true, false)); + SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(OffsetResetOption, std::string("Consumer Offset Reset Policy"), false, "offset", true, false)); SWA::Process::getInstance().registerStartedListener(&startup); } From 2354a8fbce0fe4058f157c3ac5da9076d02d2873 Mon Sep 17 00:00:00 2001 From: Levi Starrett Date: Mon, 3 Jun 2024 11:19:33 -0400 Subject: [PATCH 2/4] Update kafka utility inputs/outputs to be byte vectors rather than strings --- core-cpp/kafka/include/kafka/Producer.hh | 2 ++ core-cpp/kafka/include/kafka/ServiceHandler.hh | 2 +- core-cpp/kafka/src/Consumer.cc | 2 +- core-cpp/kafka/src/Producer.cc | 16 ++++++++++++++-- .../masl/translate/kafka/ServiceTranslator.java | 6 +++--- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/core-cpp/kafka/include/kafka/Producer.hh b/core-cpp/kafka/include/kafka/Producer.hh index addfb66d..f2cddacb 100644 --- a/core-cpp/kafka/include/kafka/Producer.hh +++ b/core-cpp/kafka/include/kafka/Producer.hh @@ -12,6 +12,7 @@ class Producer { public: Producer(); + void publish(int domainId, int serviceId, std::vector data, std::vector partKey); void publish(int domainId, int serviceId, std::string data, std::string partKey); static Producer &getInstance(); @@ -20,6 +21,7 @@ private: typedef std::map> TopicLookup; TopicLookup topicLookup; std::unique_ptr prod; + void publish(int domainId, int serviceId, cppkafka::Buffer& data, cppkafka::Buffer& partKey); }; } // namespace Kafka diff --git a/core-cpp/kafka/include/kafka/ServiceHandler.hh b/core-cpp/kafka/include/kafka/ServiceHandler.hh index 1bc1b8e7..f2adb1ad 100644 --- a/core-cpp/kafka/include/kafka/ServiceHandler.hh +++ b/core-cpp/kafka/include/kafka/ServiceHandler.hh @@ -10,7 +10,7 @@ typedef std::function Callable; class ServiceHandler { public: - virtual Callable getInvoker(std::string data) const { + virtual Callable getInvoker(std::vector data) const { return Callable(); } virtual ~ServiceHandler(); diff --git a/core-cpp/kafka/src/Consumer.cc b/core-cpp/kafka/src/Consumer.cc index d7ba011c..98032887 100644 --- a/core-cpp/kafka/src/Consumer.cc +++ b/core-cpp/kafka/src/Consumer.cc @@ -90,7 +90,7 @@ void Consumer::handleMessages(cppkafka::Consumer& consumer) { // TODO maybe this is the right spot to handle errors and check conditions on the msg instance? // get the service invoker - Callable service = ProcessHandler::getInstance().getServiceHandler(msg.get_topic()).getInvoker(std::string(msg.get_payload())); + Callable service = ProcessHandler::getInstance().getServiceHandler(msg.get_topic()).getInvoker(std::vector(msg.get_payload())); // run the service service(); diff --git a/core-cpp/kafka/src/Producer.cc b/core-cpp/kafka/src/Producer.cc index 933117e8..3fd2e009 100644 --- a/core-cpp/kafka/src/Producer.cc +++ b/core-cpp/kafka/src/Producer.cc @@ -16,7 +16,7 @@ Producer::Producer() { prod = std::unique_ptr(new cppkafka::Producer(config)); } -void Producer::publish(int domainId, int serviceId, std::string data, std::string partKey) { +void Producer::publish(int domainId, int serviceId, cppkafka::Buffer& data, cppkafka::Buffer& partKey) { // find/create a message builder std::shared_ptr msgBuilder; TopicLookup::iterator entry = topicLookup.find(std::make_pair(domainId, serviceId)); @@ -30,7 +30,7 @@ void Producer::publish(int domainId, int serviceId, std::string data, std::strin } // set the partion key - if (!partKey.empty()) { + if (partKey.begin() != partKey.end()) { msgBuilder->key(partKey); } @@ -41,6 +41,18 @@ void Producer::publish(int domainId, int serviceId, std::string data, std::strin prod->produce(*msgBuilder); } +void Producer::publish(int domainId, int serviceId, std::vector data, std::vector partKey) { + cppkafka::Buffer dataBuffer = cppkafka::Buffer(data); + cppkafka::Buffer keyBuffer = cppkafka::Buffer(partKey); + publish(domainId, serviceId, dataBuffer, keyBuffer); +} + +void Producer::publish(int domainId, int serviceId, std::string data, std::string partKey) { + cppkafka::Buffer dataBuffer = cppkafka::Buffer(data); + cppkafka::Buffer keyBuffer = cppkafka::Buffer(partKey); + publish(domainId, serviceId, dataBuffer, keyBuffer); +} + Producer &Producer::getInstance() { static Producer instance; return instance; diff --git a/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java b/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java index 16c63b63..493d353f 100644 --- a/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java +++ b/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java @@ -71,7 +71,7 @@ void addServiceHandler (final CodeFile codeFile) final DeclarationGroup functions = invokerClass.createDeclarationGroup(); final Function constructor = invokerClass.createConstructor(functions, Visibility.PUBLIC); constructor.declareInClass(true); - final Expression paramData = constructor.createParameter(new TypeUsage(Std.string), "param_data").asExpression(); + final Expression paramData = constructor.createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "param_data").asExpression(); // create invoker function final Function invoker = invokerClass.createMemberFunction(functions, "operator()", Visibility.PUBLIC); @@ -97,7 +97,7 @@ void addServiceHandler (final CodeFile codeFile) paramType.getBasicType().getActualType() == ActualType.DEVICE || paramType.getBasicType().getActualType() == ActualType.ANY_INSTANCE)) { final Variable arg = invokerClass.createMemberVariable(vars, Mangler.mangleName(param), type, Visibility.PRIVATE); - Expression paramAccess = paramData; + Expression paramAccess = Std.string.callConstructor(new Function("begin").asFunctionCall(paramData, false), new Function("end").asFunctionCall(paramData, false)); if (!noParseJson) { paramAccess = NlohmannJson.get(service.getParameters().size() > 1 ? new ArrayAccess(paramJson.asExpression(), Literal.createStringLiteral(param.getName())) : paramJson.asExpression(), type); } @@ -124,7 +124,7 @@ void addServiceHandler (final CodeFile codeFile) codeFile.addClassDeclaration(invokerClass); // add implementation of 'getInvoker' to the file - final Expression paramData2 = getInvoker.createParameter(new TypeUsage(Std.string), "param_data").asExpression(); + final Expression paramData2 = getInvoker.createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "param_data").asExpression(); getInvoker.getCode().appendStatement(new ReturnStatement(invokerClass.callConstructor(paramData2))); codeFile.addFunctionDefinition(getInvoker); From 7387d0f1359b0a4da505a0588b0c06ca4bc90360 Mon Sep 17 00:00:00 2001 From: Levi Starrett Date: Mon, 3 Jun 2024 11:44:35 -0400 Subject: [PATCH 3/4] Restructure to allow multiple consumers to be created --- core-cpp/kafka/include/kafka/Consumer.hh | 7 +++-- .../kafka/include/kafka/ProcessHandler.hh | 2 ++ core-cpp/kafka/src/Consumer.cc | 28 ++++++++----------- core-cpp/kafka/src/Kafka.cc | 2 +- core-cpp/kafka/src/ProcessHandler.cc | 6 ++++ 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/core-cpp/kafka/include/kafka/Consumer.hh b/core-cpp/kafka/include/kafka/Consumer.hh index f5a4fbbe..13267368 100644 --- a/core-cpp/kafka/include/kafka/Consumer.hh +++ b/core-cpp/kafka/include/kafka/Consumer.hh @@ -29,15 +29,16 @@ private: class Consumer { public: + Consumer(std::vector topics); void run(); - static Consumer &getInstance(); private: MessageQueue messageQueue; + std::unique_ptr consumer; - void handleMessages(cppkafka::Consumer& consumer); + void handleMessages(); - void createTopics(cppkafka::Consumer& consumer, std::vector topics); + void createTopics(std::vector topics); }; } // namespace Kafka diff --git a/core-cpp/kafka/include/kafka/ProcessHandler.hh b/core-cpp/kafka/include/kafka/ProcessHandler.hh index 2834f685..ca248530 100644 --- a/core-cpp/kafka/include/kafka/ProcessHandler.hh +++ b/core-cpp/kafka/include/kafka/ProcessHandler.hh @@ -27,6 +27,8 @@ public: std::string getTopicName(int domainId, int serviceId); + void startConsumer(); + static ProcessHandler &getInstance(); diff --git a/core-cpp/kafka/src/Consumer.cc b/core-cpp/kafka/src/Consumer.cc index 98032887..2976ef1b 100644 --- a/core-cpp/kafka/src/Consumer.cc +++ b/core-cpp/kafka/src/Consumer.cc @@ -17,8 +17,7 @@ namespace Kafka { -void Consumer::run() { - +Consumer::Consumer(std::vector topics) { // Get command line options const std::string brokers = SWA::CommandLine::getInstance().getOption(BrokersOption); const std::string offsetReset = SWA::CommandLine::getInstance().getOption(OffsetResetOption, "earliest"); @@ -42,19 +41,21 @@ void Consumer::run() { {"enable.auto.commit", false}}; // Create the consumer - cppkafka::Consumer consumer(config); + consumer = std::unique_ptr(new cppkafka::Consumer(config)); // create topics if they don't already exist - createTopics(consumer, ProcessHandler::getInstance().getTopicNames()); + createTopics(ProcessHandler::getInstance().getTopicNames()); // short delay to avoid race conditions if other processes initiated topic creation SWA::delay(SWA::Duration::fromMillis(100)); // Subscribe to topics - consumer.subscribe(ProcessHandler::getInstance().getTopicNames()); + consumer->subscribe(ProcessHandler::getInstance().getTopicNames()); +} +void Consumer::run() { // Create a consumer dispatcher - cppkafka::ConsumerDispatcher dispatcher(consumer); + cppkafka::ConsumerDispatcher dispatcher(*consumer); // Stop processing on SIGINT // TODO set up lifecycle event to stop dispatcher @@ -63,7 +64,7 @@ void Consumer::run() { // create a signal listener SWA::RealTimeSignalListener listener( - [this, &consumer](int pid, int uid) { this->handleMessages(consumer); }, + [this](int pid, int uid) { this->handleMessages(); }, SWA::Process::getInstance().getActivityMonitor()); // Now run the dispatcher, providing a callback to handle messages, one to @@ -80,7 +81,7 @@ void Consumer::run() { ); } -void Consumer::handleMessages(cppkafka::Consumer& consumer) { +void Consumer::handleMessages() { // drain the message queue if (!messageQueue.empty()) { std::vector msgs = messageQueue.dequeue_all(); @@ -96,12 +97,12 @@ void Consumer::handleMessages(cppkafka::Consumer& consumer) { service(); // commit offset - consumer.commit(msg); + consumer->commit(msg); } } } -void Consumer::createTopics(cppkafka::Consumer& consumer, std::vector topics) { +void Consumer::createTopics(std::vector topics) { // TODO clean up error handling in this routine for (auto it = topics.begin(); it != topics.end(); it++) { @@ -109,7 +110,7 @@ void Consumer::createTopics(cppkafka::Consumer& consumer, std::vectorget_handle(); rd_kafka_NewTopic_t *newt[1]; const size_t newt_cnt = 1; rd_kafka_AdminOptions_t *options; @@ -171,11 +172,6 @@ void Consumer::createTopics(cppkafka::Consumer& consumer, std::vector vec(data.begin(), data.end()); */ diff --git a/core-cpp/kafka/src/Kafka.cc b/core-cpp/kafka/src/Kafka.cc index 4dcc43d2..dc2c369b 100644 --- a/core-cpp/kafka/src/Kafka.cc +++ b/core-cpp/kafka/src/Kafka.cc @@ -20,7 +20,7 @@ const char *const OffsetResetOption = "-kafka-offset-reset"; bool startup() { if (ProcessHandler::getInstance().hasRegisteredServices()) { // only start the consumer if there are registered services - std::thread{[] { Consumer::getInstance().run(); }}.detach(); + std::thread{[] { ProcessHandler::getInstance().startConsumer(); }}.detach(); } return true; diff --git a/core-cpp/kafka/src/ProcessHandler.cc b/core-cpp/kafka/src/ProcessHandler.cc index 1d6fd734..1d4e25c5 100644 --- a/core-cpp/kafka/src/ProcessHandler.cc +++ b/core-cpp/kafka/src/ProcessHandler.cc @@ -1,6 +1,7 @@ #include "kafka/ProcessHandler.hh" #include "kafka/Kafka.hh" +#include "kafka/Consumer.hh" #include "swa/CommandLine.hh" #include "swa/Process.hh" @@ -62,6 +63,11 @@ std::string ProcessHandler::getTopicName(int domainId, int serviceId) { return name; } +void ProcessHandler::startConsumer() { + Consumer consumer(getTopicNames()); + consumer.run(); +} + ProcessHandler &ProcessHandler::getInstance() { static ProcessHandler instance; return instance; From b43276ccca240ae5546c86c215e7b6080ff5c474 Mon Sep 17 00:00:00 2001 From: Levi Starrett Date: Wed, 5 Jun 2024 10:27:47 -0400 Subject: [PATCH 4/4] Extend Kafka utility to allow marking outbound terminator services as polled kafka topics --- core-cpp/kafka/CMakeLists.txt | 2 + core-cpp/kafka/include/kafka/Consumer.hh | 6 +- core-cpp/kafka/include/kafka/DataConsumer.hh | 18 ++ core-cpp/kafka/include/kafka/Producer.hh | 2 - .../kafka/include/kafka/ServiceHandler.hh | 3 +- core-cpp/kafka/src/Consumer.cc | 25 +- core-cpp/kafka/src/DataConsumer.cc | 7 + core-java/.gitignore | 4 + .../kafka/DomainServiceTranslator.java | 271 ++++++++++++++++++ .../DomainTerminatorServiceTranslator.java | 156 ++++++++++ .../translate/kafka/DomainTranslator.java | 157 +++++----- .../org/xtuml/masl/translate/kafka/Kafka.java | 32 +-- .../translate/kafka/ServiceTranslator.java | 261 ++++------------- 13 files changed, 628 insertions(+), 316 deletions(-) create mode 100644 core-cpp/kafka/include/kafka/DataConsumer.hh create mode 100644 core-cpp/kafka/src/DataConsumer.cc create mode 100644 core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainServiceTranslator.java create mode 100644 core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTerminatorServiceTranslator.java diff --git a/core-cpp/kafka/CMakeLists.txt b/core-cpp/kafka/CMakeLists.txt index 0bbd0c3a..2d9563a9 100644 --- a/core-cpp/kafka/CMakeLists.txt +++ b/core-cpp/kafka/CMakeLists.txt @@ -9,6 +9,7 @@ simple_add_shared_library ( NAME Kafka SOURCES Consumer.cc + DataConsumer.cc Kafka.cc ProcessHandler.cc Producer.cc @@ -24,6 +25,7 @@ simple_add_shared_library ( EXPORT MaslCore INCLUDES kafka/Consumer.hh + kafka/DataConsumer.hh kafka/Kafka.hh kafka/ProcessHandler.hh kafka/Producer.hh diff --git a/core-cpp/kafka/include/kafka/Consumer.hh b/core-cpp/kafka/include/kafka/Consumer.hh index 13267368..85f36b7a 100644 --- a/core-cpp/kafka/include/kafka/Consumer.hh +++ b/core-cpp/kafka/include/kafka/Consumer.hh @@ -4,6 +4,8 @@ #include "cppkafka/consumer.h" #include "cppkafka/message.h" +#include "DataConsumer.hh" + #include #include #include @@ -29,15 +31,17 @@ private: class Consumer { public: + Consumer(std::string topic); Consumer(std::vector topics); + bool consumeOne(DataConsumer& dataConsumer); void run(); private: MessageQueue messageQueue; std::unique_ptr consumer; + void initialize(std::vector topics); void handleMessages(); - void createTopics(std::vector topics); }; diff --git a/core-cpp/kafka/include/kafka/DataConsumer.hh b/core-cpp/kafka/include/kafka/DataConsumer.hh new file mode 100644 index 00000000..43848312 --- /dev/null +++ b/core-cpp/kafka/include/kafka/DataConsumer.hh @@ -0,0 +1,18 @@ +#ifndef Kafka_DataConsumer_HH +#define Kafka_DataConsumer_HH + +#include +#include + +namespace Kafka { + +class DataConsumer { +public: + virtual void accept(std::vector data) const { + } + virtual ~DataConsumer(); +}; + +} // namespace Kafka + +#endif diff --git a/core-cpp/kafka/include/kafka/Producer.hh b/core-cpp/kafka/include/kafka/Producer.hh index f2cddacb..a3156333 100644 --- a/core-cpp/kafka/include/kafka/Producer.hh +++ b/core-cpp/kafka/include/kafka/Producer.hh @@ -1,8 +1,6 @@ #ifndef Kafka_Producer_HH #define Kafka_Producer_HH -#include - #include "cppkafka/message_builder.h" #include "cppkafka/producer.h" diff --git a/core-cpp/kafka/include/kafka/ServiceHandler.hh b/core-cpp/kafka/include/kafka/ServiceHandler.hh index f2adb1ad..28f7b8d1 100644 --- a/core-cpp/kafka/include/kafka/ServiceHandler.hh +++ b/core-cpp/kafka/include/kafka/ServiceHandler.hh @@ -1,8 +1,9 @@ #ifndef Kafka_ServiceHandler_HH #define Kafka_ServiceHandler_HH -#include +#include #include +#include namespace Kafka { diff --git a/core-cpp/kafka/src/Consumer.cc b/core-cpp/kafka/src/Consumer.cc index 2976ef1b..d5e48ec1 100644 --- a/core-cpp/kafka/src/Consumer.cc +++ b/core-cpp/kafka/src/Consumer.cc @@ -18,6 +18,16 @@ namespace Kafka { Consumer::Consumer(std::vector topics) { + initialize(topics); +} + +Consumer::Consumer(std::string topic) { + std::vector topics; + topics.push_back(topic); + initialize(topics); +} + +void Consumer::initialize(std::vector topics) { // Get command line options const std::string brokers = SWA::CommandLine::getInstance().getOption(BrokersOption); const std::string offsetReset = SWA::CommandLine::getInstance().getOption(OffsetResetOption, "earliest"); @@ -44,13 +54,13 @@ Consumer::Consumer(std::vector topics) { consumer = std::unique_ptr(new cppkafka::Consumer(config)); // create topics if they don't already exist - createTopics(ProcessHandler::getInstance().getTopicNames()); + createTopics(topics); // short delay to avoid race conditions if other processes initiated topic creation SWA::delay(SWA::Duration::fromMillis(100)); // Subscribe to topics - consumer->subscribe(ProcessHandler::getInstance().getTopicNames()); + consumer->subscribe(topics); } void Consumer::run() { @@ -102,6 +112,17 @@ void Consumer::handleMessages() { } } +bool Consumer::consumeOne(DataConsumer& dataConsumer) { + cppkafka::Message msg = consumer->poll(); + if (msg) { + dataConsumer.accept(std::vector(msg.get_payload())); + consumer->commit(msg); + return true; + } else { + return false; + } +} + void Consumer::createTopics(std::vector topics) { // TODO clean up error handling in this routine for (auto it = topics.begin(); it != topics.end(); it++) { diff --git a/core-cpp/kafka/src/DataConsumer.cc b/core-cpp/kafka/src/DataConsumer.cc new file mode 100644 index 00000000..81b02378 --- /dev/null +++ b/core-cpp/kafka/src/DataConsumer.cc @@ -0,0 +1,7 @@ +#include "kafka/DataConsumer.hh" + +namespace Kafka { + +DataConsumer::~DataConsumer() {} + +} // namespace Kafka diff --git a/core-java/.gitignore b/core-java/.gitignore index cac48bb0..d194ce91 100644 --- a/core-java/.gitignore +++ b/core-java/.gitignore @@ -6,3 +6,7 @@ libs resources scripts tmp +.classpath +.project +.settings/ +bin/ diff --git a/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainServiceTranslator.java b/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainServiceTranslator.java new file mode 100644 index 00000000..75d0964b --- /dev/null +++ b/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainServiceTranslator.java @@ -0,0 +1,271 @@ +package org.xtuml.masl.translate.kafka; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.xtuml.masl.cppgen.ArrayAccess; +import org.xtuml.masl.cppgen.BinaryExpression; +import org.xtuml.masl.cppgen.BinaryOperator; +import org.xtuml.masl.cppgen.Class; +import org.xtuml.masl.cppgen.CodeFile; +import org.xtuml.masl.cppgen.DeclarationGroup; +import org.xtuml.masl.cppgen.Expression; +import org.xtuml.masl.cppgen.ExpressionStatement; +import org.xtuml.masl.cppgen.Function; +import org.xtuml.masl.cppgen.FunctionObjectCall; +import org.xtuml.masl.cppgen.FundamentalType; +import org.xtuml.masl.cppgen.Literal; +import org.xtuml.masl.cppgen.NewExpression; +import org.xtuml.masl.cppgen.ReturnStatement; +import org.xtuml.masl.cppgen.Std; +import org.xtuml.masl.cppgen.TypeUsage; +import org.xtuml.masl.cppgen.TypedefType; +import org.xtuml.masl.cppgen.Variable; +import org.xtuml.masl.cppgen.VariableDefinitionStatement; +import org.xtuml.masl.cppgen.Visibility; +import org.xtuml.masl.metamodel.common.ParameterDefinition; +import org.xtuml.masl.metamodel.domain.DomainService; +import org.xtuml.masl.translate.main.Architecture; +import org.xtuml.masl.translate.main.Mangler; +import org.xtuml.masl.translate.main.NlohmannJson; +import org.xtuml.masl.translate.main.ParameterTranslator; +import org.xtuml.masl.translate.main.Types; + +public class DomainServiceTranslator extends ServiceTranslator { + + private final CodeFile consumerCodeFile; + private final CodeFile publisherCodeFile; + + private Class handlerClass; + private Class invokerClass; + private Function getInvokerFn; + private Variable topicNameSet; + private Variable consumerRegisteredVar; + private Variable publisherRegisteredVar; + private Function publishFn; + + DomainServiceTranslator(DomainService service, DomainTranslator domainTranslator, CodeFile consumerCodeFile, + CodeFile publisherCodeFile) { + super(service, domainTranslator); + this.consumerCodeFile = consumerCodeFile; + this.publisherCodeFile = publisherCodeFile; + } + + @Override + DomainService getService() { + return (DomainService) super.getService(); + } + + @Override + List getFilePopulators() { + return List.of(() -> consumerCodeFile.addClassDeclaration(handlerClass), + () -> consumerCodeFile.addClassDeclaration(invokerClass), + () -> consumerCodeFile.addFunctionDefinition(getInvokerFn), + topicNameSet != null ? () -> consumerCodeFile.addVariableDefinition(topicNameSet) : () -> {}, + () -> consumerCodeFile.addVariableDefinition(consumerRegisteredVar), + () -> publisherCodeFile.addFunctionDefinition(publishFn), + topicNameSet != null ? () -> publisherCodeFile.addVariableDefinition(topicNameSet) : () -> {}, + () -> publisherCodeFile.addVariableDefinition(publisherRegisteredVar)); + } + + @Override + void translate() { + translateConsumer(); + translatePublisher(); + translateCustomTopicName(); + } + + void translateConsumer() { + // create and add the handler class to the file + handlerClass = new Class(Mangler.mangleName(getService()) + "Handler", getDomainNamespace()); + handlerClass.addSuperclass(Kafka.serviceHandlerClass, Visibility.PUBLIC); + final DeclarationGroup group = handlerClass.createDeclarationGroup(); + getInvokerFn = handlerClass.createMemberFunction(group, "getInvoker", Visibility.PUBLIC); + getInvokerFn.setReturnType(new TypeUsage(Kafka.callable)); + getInvokerFn.setConst(true); + + // create the invoker class + invokerClass = new Class(Mangler.mangleName(getService()) + "Invoker", getDomainNamespace()); + final DeclarationGroup functions = invokerClass.createDeclarationGroup(); + final Function constructor = invokerClass.createConstructor(functions, Visibility.PUBLIC); + constructor.declareInClass(true); + final Expression paramData = constructor + .createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "param_data").asExpression(); + + // create invoker function + final Function invoker = invokerClass.createMemberFunction(functions, "operator()", Visibility.PUBLIC); + invoker.declareInClass(true); + + // if the service has a single string parameter, just pass through + final boolean noParseJson = hasSingleStringParameter(getService()); + + // handle parameters + final List invokeArgs = new ArrayList<>(); + final DeclarationGroup vars = invokerClass.createDeclarationGroup(); + final Variable paramJson = new Variable(new TypeUsage(NlohmannJson.json), "params", + NlohmannJson.parse(paramData)); + if (!noParseJson && getService().getParameters().stream().map(p -> p.getType().getBasicType()) + .anyMatch(ServiceTranslator::isTypeSerializable)) { + // Only build a JSON object if it will be used + constructor.getCode().appendStatement(new VariableDefinitionStatement(paramJson)); + } + for (final ParameterDefinition param : getService().getParameters()) { + final TypeUsage type = Types.getInstance().getType(param.getType()); + if (isTypeSerializable(param.getType().getBasicType())) { + final Variable arg = invokerClass.createMemberVariable(vars, Mangler.mangleName(param), type, + Visibility.PRIVATE); + Expression paramAccess = Std.string.callConstructor( + new Function("begin").asFunctionCall(paramData, false), + new Function("end").asFunctionCall(paramData, false)); + if (!noParseJson) { + paramAccess = NlohmannJson.get(getService().getParameters().size() > 1 + ? new ArrayAccess(paramJson.asExpression(), Literal.createStringLiteral(param.getName())) + : paramJson.asExpression(), type); + } + constructor.getCode().appendStatement( + new BinaryExpression(arg.asExpression(), BinaryOperator.ASSIGN, paramAccess).asStatement()); + invokeArgs.add(arg.asExpression()); + } else { + final Variable arg = new Variable(type, Mangler.mangleName(param)); + invoker.getCode().appendStatement(arg.asStatement()); + invokeArgs.add(arg.asExpression()); + } + } + + // create the call to the service interceptor + final org.xtuml.masl.translate.main.DomainServiceTranslator serviceTranslator = org.xtuml.masl.translate.main.DomainServiceTranslator + .getInstance(getService()); + final TypedefType serviceInterceptor = serviceTranslator.getServiceInterceptor(); + final Function serviceFunction = new Function("callService"); + final Expression instanceFnCall = serviceInterceptor.asClass().callStaticFunction("instance"); + Expression invokeExpression = new FunctionObjectCall(serviceFunction.asFunctionCall(instanceFnCall, false), + invokeArgs); + invoker.getCode().appendStatement(invokeExpression.asStatement()); + + // add implementation of 'getInvoker' to the file + final Expression paramData2 = getInvokerFn + .createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "param_data").asExpression(); + getInvokerFn.getCode().appendStatement(new ReturnStatement(invokerClass.callConstructor(paramData2))); + + // void addTopicRegistration(final CodeFile codeFile) { + final Expression processHandler = Kafka.processHandlerClass.callStaticFunction("getInstance"); + final Expression domainId = new Function("getId") + .asFunctionCall(new Function("getDomain").asFunctionCall(Architecture.process, false, + Literal.createStringLiteral(getDomainTranslator().getDomain().getName())), false); + final Expression serviceId = org.xtuml.masl.translate.main.DomainServiceTranslator + .getInstance((DomainService) getService()).getServiceId(); + final Expression handler = Std.shared_ptr(new TypeUsage(handlerClass)) + .callConstructor(new NewExpression(new TypeUsage(handlerClass))); + final Function registerServiceFunc = new Function("registerServiceHandler"); + final Expression registerService = registerServiceFunc.asFunctionCall(processHandler, false, domainId, + serviceId, handler); + consumerRegisteredVar = new Variable(new TypeUsage(FundamentalType.BOOL), + Mangler.mangleName(getService()) + "_registered", getDomainNamespace(), registerService); + } + + void translatePublisher() { + // create service function + publishFn = new Function(Mangler.mangleName(getService()), getDomainNamespace()); + publishFn.setReturnType(getDomainTranslator().getTypes().getType(getService().getReturnType())); + + // if the service has a single string parameter, just pass through + final boolean noParseJson = hasSingleStringParameter(getService()); + + // create output buffer + final Variable paramData = new Variable(new TypeUsage(NlohmannJson.json), "param_data"); + if (!noParseJson) { + // Only build a JSON object if it will be used + publishFn.getCode().appendStatement(new VariableDefinitionStatement(paramData)); + } + + // create partition key buffer + final Variable partKey = new Variable(new TypeUsage(NlohmannJson.json), "part_key"); + final boolean includePartKey = getService().getParameters().stream().anyMatch( + param -> getService().getDeclarationPragmas().hasPragma(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA) + && getService().getDeclarationPragmas() + .getPragmaValues(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA) + .contains(param.getName())); + if (includePartKey) { + publishFn.getCode().appendStatement(new VariableDefinitionStatement(partKey)); + } + + // handle parameters + final Map paramTranslators = new HashMap<>(); + for (final ParameterDefinition param : getService().getParameters()) { + final ParameterTranslator paramTrans = new ParameterTranslator(param, publishFn); + paramTranslators.put(param, paramTrans); + final Expression jsonAccess = getService().getParameters().size() > 1 + ? new ArrayAccess(paramData.asExpression(), Literal.createStringLiteral(param.getName())) + : paramData.asExpression(); + final Expression writeExpr = new BinaryExpression(jsonAccess, BinaryOperator.ASSIGN, + paramTrans.getVariable().asExpression()); + if (!noParseJson) { + publishFn.getCode().appendStatement(new ExpressionStatement(writeExpr)); + } + if (getService().getDeclarationPragmas().hasPragma(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA) + && getService().getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA) + .contains(param.getName())) { + final Expression keyJsonAccess = getService().getDeclarationPragmas() + .getPragmaValues(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA).size() > 1 + ? new ArrayAccess(partKey.asExpression(), Literal.createStringLiteral(param.getName())) + : partKey.asExpression(); + final Expression keyWriteExpr = new BinaryExpression(keyJsonAccess, BinaryOperator.ASSIGN, + paramTrans.getVariable().asExpression()); + publishFn.getCode().appendStatement(new ExpressionStatement(keyWriteExpr)); + } + } + + // call publisher + final Expression producer = Kafka.producerClass.callStaticFunction("getInstance"); + final Function publishFunc = new Function("publish"); + final Expression domainId = new Function("getId") + .asFunctionCall(new Function("getDomain").asFunctionCall(Architecture.process, false, + Literal.createStringLiteral(getDomainTranslator().getDomain().getName())), false); + final Expression serviceId = org.xtuml.masl.translate.main.DomainServiceTranslator + .getInstance((DomainService) getService()).getServiceId(); + final Expression publishExpr = publishFunc.asFunctionCall(producer, false, domainId, serviceId, + noParseJson + ? Std.string.callConstructor( + paramTranslators.get(getService().getParameters().get(0)).getVariable().asExpression()) + : NlohmannJson.dump(paramData.asExpression()), + includePartKey ? NlohmannJson.dump(partKey.asExpression()) : Literal.createStringLiteral("")); + publishFn.getCode().appendStatement(new ExpressionStatement(publishExpr)); + + // add service registration + final org.xtuml.masl.translate.main.DomainServiceTranslator serviceTranslator = org.xtuml.masl.translate.main.DomainServiceTranslator + .getInstance((DomainService) getService()); + final TypedefType serviceInterceptor = serviceTranslator.getServiceInterceptor(); + final Expression interceptorFnCall = serviceInterceptor.asClass().callStaticFunction("instance"); + final Function registerFunction = new Function("registerLocal"); + final Expression initialValue = registerFunction.asFunctionCall(interceptorFnCall, false, + publishFn.asFunctionPointer()); + publisherRegisteredVar = new Variable(new TypeUsage(FundamentalType.BOOL, TypeUsage.Const), + "localServiceRegistration_" + Mangler.mangleName(getService()), getDomainNamespace(), initialValue); + publisherRegisteredVar.setStatic(true); + } + + void translateCustomTopicName() { + if (getService().getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_TOPIC_PRAGMA).size() == 1) { + final String topicNameString = getService().getDeclarationPragmas() + .getPragmaValues(DomainTranslator.KAFKA_TOPIC_PRAGMA).get(0); + if (!isBoolean(topicNameString) && !isNumeric(topicNameString)) { + final Expression processHandler = Kafka.processHandlerClass.callStaticFunction("getInstance"); + final Expression domainId = new Function("getId") + .asFunctionCall( + new Function("getDomain").asFunctionCall(Architecture.process, false, + Literal.createStringLiteral(getDomainTranslator().getDomain().getName())), + false); + final Expression serviceId = org.xtuml.masl.translate.main.DomainServiceTranslator + .getInstance((DomainService) getService()).getServiceId(); + final Expression topicName = Literal.createStringLiteral(topicNameString); + final Function setTopicNameFunc = new Function("setCustomTopicName"); + final Expression setTopicName = setTopicNameFunc.asFunctionCall(processHandler, false, domainId, + serviceId, topicName); + topicNameSet = new Variable(new TypeUsage(FundamentalType.BOOL), + Mangler.mangleName(getService()) + "_topic_name_set", getDomainNamespace(), setTopicName); + } + } + } +} diff --git a/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTerminatorServiceTranslator.java b/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTerminatorServiceTranslator.java new file mode 100644 index 00000000..d4492af6 --- /dev/null +++ b/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTerminatorServiceTranslator.java @@ -0,0 +1,156 @@ +package org.xtuml.masl.translate.kafka; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +import org.xtuml.masl.cppgen.ArrayAccess; +import org.xtuml.masl.cppgen.BinaryExpression; +import org.xtuml.masl.cppgen.BinaryOperator; +import org.xtuml.masl.cppgen.Class; +import org.xtuml.masl.cppgen.CodeFile; +import org.xtuml.masl.cppgen.DeclarationGroup; +import org.xtuml.masl.cppgen.Expression; +import org.xtuml.masl.cppgen.Function; +import org.xtuml.masl.cppgen.FundamentalType; +import org.xtuml.masl.cppgen.Literal; +import org.xtuml.masl.cppgen.ReturnStatement; +import org.xtuml.masl.cppgen.Std; +import org.xtuml.masl.cppgen.TypeUsage; +import org.xtuml.masl.cppgen.Variable; +import org.xtuml.masl.cppgen.VariableDefinitionStatement; +import org.xtuml.masl.cppgen.Visibility; +import org.xtuml.masl.metamodel.common.ParameterDefinition; +import org.xtuml.masl.metamodel.common.ParameterDefinition.Mode; +import org.xtuml.masl.metamodel.domain.DomainTerminatorService; +import org.xtuml.masl.metamodel.type.BasicType; +import org.xtuml.masl.metamodel.type.TypeDefinition.ActualType; +import org.xtuml.masl.metamodelImpl.type.StringType; +import org.xtuml.masl.translate.main.Mangler; +import org.xtuml.masl.translate.main.NlohmannJson; +import org.xtuml.masl.translate.main.ParameterTranslator; +import org.xtuml.masl.translate.main.TerminatorServiceTranslator; +import org.xtuml.masl.translate.main.Types; + +public class DomainTerminatorServiceTranslator extends ServiceTranslator { + + private final CodeFile codeFile; + + private Function acceptFn; + private Variable consumer; + private Function overrider; + private Variable register; + private Class consumerClass; + + DomainTerminatorServiceTranslator(DomainTerminatorService service, DomainTranslator domainTranslator, + CodeFile codeFile) { + super(service, domainTranslator); + this.codeFile = codeFile; + } + + @Override + DomainTerminatorService getService() { + return (DomainTerminatorService) super.getService(); + } + + @Override + List getFilePopulators() { + return List.of(() -> codeFile.addClassDeclaration(consumerClass), + () -> codeFile.addVariableDefinition(consumer), () -> codeFile.addFunctionDefinition(acceptFn), + () -> codeFile.addFunctionDefinition(overrider), () -> codeFile.addVariableDefinition(register)); + } + + @Override + void translate() { + + // if the service has a single string parameter, just pass through without + // parsing JSON + final boolean noParseJson = getService().getParameters().size() == 1 + && getService().getParameters().get(0).getType().isAssignableFrom(StringType.createAnonymous()); + + // create and add the consumer class to the file + consumerClass = new Class(Mangler.mangleName(getService()) + "Consumer", getDomainNamespace()); + consumerClass.addSuperclass(Kafka.dataConsumerClass, Visibility.PUBLIC); + final DeclarationGroup functions = consumerClass.createDeclarationGroup(); + + // create the constructor + final Function constructor = consumerClass.createConstructor(functions, Visibility.PUBLIC); + constructor.declareInClass(true); + + // create the accept function + acceptFn = consumerClass.createMemberFunction(functions, "accept", Visibility.PUBLIC); + final Expression data = acceptFn.createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "data") + .asExpression(); + acceptFn.setConst(true); + final Predicate typeIsSerializable = paramType -> !(paramType.getBasicType() + .getActualType() == ActualType.EVENT || paramType.getBasicType().getActualType() == ActualType.DEVICE + || paramType.getBasicType().getActualType() == ActualType.ANY_INSTANCE); + final Variable paramJson = new Variable(new TypeUsage(NlohmannJson.json), "params", NlohmannJson.parse(data)); + if (!noParseJson && getService().getParameters().stream().map(p -> p.getType().getBasicType()) + .anyMatch(typeIsSerializable)) { + // Only build a JSON object if it will be used + acceptFn.getCode().appendStatement(new VariableDefinitionStatement(paramJson)); + } + + // create the overrider function + overrider = new Function(Mangler.mangleName(getService()), getDomainNamespace()); + overrider.setReturnType(Types.getInstance().getType(getService().getReturnType())); + + final List consumerArgs = new ArrayList<>(); + final DeclarationGroup vars = consumerClass.createDeclarationGroup(); + for (final ParameterDefinition param : getService().getParameters()) { + final TypeUsage type = Types.getInstance().getType(param.getType()); + + // add the parameter to the overrider function + final ParameterTranslator paramTrans = new ParameterTranslator(param, overrider); + + // only process "out" parameters + if (param.getMode() == Mode.OUT) { + + // capture each parameter as a member variable + final Variable constructorParam = constructor.createParameter(type.getReferenceType(), + Mangler.mangleName(param)); + final Variable memberVar = consumerClass.createMemberVariable(vars, Mangler.mangleName(param), + type.getReferenceType(), Visibility.PRIVATE); + constructor.setInitialValue(memberVar, constructorParam.asExpression()); + + // parse out each parameter and assign it to the member variable + if (typeIsSerializable.test(param.getType().getBasicType())) { + Expression paramAccess = Std.string.callConstructor( + new Function("begin").asFunctionCall(data, false), + new Function("end").asFunctionCall(data, false)); + if (!noParseJson) { + paramAccess = NlohmannJson + .get(getService().getParameters().size() > 1 + ? new ArrayAccess(paramJson.asExpression(), + Literal.createStringLiteral(param.getName())) + : paramJson.asExpression(), type); + } + acceptFn.getCode().appendStatement( + new BinaryExpression(memberVar.asExpression(), BinaryOperator.ASSIGN, paramAccess) + .asStatement()); + } + + // add to the list for the consumer call + consumerArgs.add(paramTrans.getVariable().asExpression()); + } + } + + // create consumer instance + consumer = new Variable(new TypeUsage(Kafka.consumerClass), "consumer_" + Mangler.mangleName(getService()), + getDomainNamespace(), + Kafka.consumerClass.callConstructor(Std.string.callConstructor(getTopicName(getService())))); + + // add the call to consume to the overrider + final Variable dataConsumer = new Variable(new TypeUsage(consumerClass), "dataConsumer", + consumerClass.callConstructor(consumerArgs)); + overrider.getCode().appendStatement(new VariableDefinitionStatement(dataConsumer)); + overrider.getCode().appendStatement(new ReturnStatement(new Function("consumeOne") + .asFunctionCall(consumer.asExpression(), false, dataConsumer.asExpression()))); + + // register the overrider function + register = new Variable(new TypeUsage(FundamentalType.BOOL), "register_" + Mangler.mangleName(getService()), + getDomainNamespace(), TerminatorServiceTranslator.getInstance(getService()).getRegisterOverride() + .asFunctionCall(overrider.asFunctionPointer())); + } +} diff --git a/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTranslator.java b/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTranslator.java index 09264a21..015b6ade 100644 --- a/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTranslator.java +++ b/core-java/src/main/java/org/xtuml/masl/translate/kafka/DomainTranslator.java @@ -1,5 +1,6 @@ package org.xtuml.masl.translate.kafka; +import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -8,101 +9,91 @@ import org.xtuml.masl.cppgen.Namespace; import org.xtuml.masl.cppgen.SharedLibrary; import org.xtuml.masl.metamodel.domain.Domain; -import org.xtuml.masl.metamodel.type.TypeDeclaration; +import org.xtuml.masl.metamodelImpl.type.BooleanType; import org.xtuml.masl.translate.Alias; import org.xtuml.masl.translate.Default; import org.xtuml.masl.translate.main.Mangler; - @Alias("Kafka") @Default -public class DomainTranslator extends org.xtuml.masl.translate.DomainTranslator -{ - - public static final String KAFKA_TOPIC_PRAGMA = "kafka_topic"; - public static final String KAFKA_PARTITION_KEY_PRAGMA = "kafka_partition_key"; - - private final org.xtuml.masl.translate.main.DomainTranslator mainTranslator; - private final Namespace domainNamespace; - private final Library library; - private final Library interfaceLibrary; - - public static DomainTranslator getInstance ( final Domain domain ) - { - return getInstance(DomainTranslator.class, domain); - } - - private DomainTranslator ( final Domain domain ) - { - super(domain); - mainTranslator = org.xtuml.masl.translate.main.DomainTranslator.getInstance(domain); - domainNamespace = new Namespace(Mangler.mangleName(domain), Kafka.kafkaNamespace); - - - library = new SharedLibrary(mainTranslator.getLibrary().getName() + "_kafka").inBuildSet(mainTranslator.getBuildSet()).withCCDefaultExtensions(); - library.addDependency(Kafka.library); - library.addDependency(Kafka.cppkafkaLibrary); - library.addDependency(Kafka.rdkafkaLibrary); - - interfaceLibrary = new SharedLibrary(mainTranslator.getLibrary().getName() + "_if_kafka").inBuildSet(mainTranslator.getBuildSet()).withCCDefaultExtensions(); - interfaceLibrary.addDependency(Kafka.library); - interfaceLibrary.addDependency(Kafka.cppkafkaLibrary); - interfaceLibrary.addDependency(Kafka.rdkafkaLibrary); - } - - @Override - public void translate () - { - // create code files - final CodeFile consumerCodeFile = library.createBodyFile("Kafka" + Mangler.mangleFile(domain)); - final CodeFile publisherCodeFile = interfaceLibrary.createBodyFile("Kafka_publishers" + Mangler.mangleFile(domain)); - - // create service translators - final List serviceTranslators = domain.getServices().stream() - .filter(service -> service.getDeclarationPragmas().hasPragma(KAFKA_TOPIC_PRAGMA) && !service.isFunction() && !service.isExternal() && !service.isScenario()) - .map(service -> new ServiceTranslator(service, this)).collect(Collectors.toList()); - - - // translate service handlers - for (final ServiceTranslator serviceTranslator : serviceTranslators) - { - serviceTranslator.addServiceHandler(consumerCodeFile); - } +public class DomainTranslator extends org.xtuml.masl.translate.DomainTranslator { - // handle custom topics (consumer) - for (final ServiceTranslator serviceTranslator : serviceTranslators) - { - serviceTranslator.addCustomTopicName(consumerCodeFile); - } + public static final String KAFKA_TOPIC_PRAGMA = "kafka_topic"; + public static final String KAFKA_PARTITION_KEY_PRAGMA = "kafka_partition_key"; - // add topic registrations - for (final ServiceTranslator serviceTranslator : serviceTranslators) - { - serviceTranslator.addTopicRegistration(consumerCodeFile); - } + private final org.xtuml.masl.translate.main.DomainTranslator mainTranslator; + private final Namespace domainNamespace; + private final Library library; + private final Library interfaceLibrary; - // create publisher services - for (final ServiceTranslator serviceTranslator : serviceTranslators) - { - serviceTranslator.addPublisher(publisherCodeFile); + public static DomainTranslator getInstance(final Domain domain) { + return getInstance(DomainTranslator.class, domain); } - // handle custom topics (publisher) - for (final ServiceTranslator serviceTranslator : serviceTranslators) - { - serviceTranslator.addCustomTopicName(publisherCodeFile); + private DomainTranslator(final Domain domain) { + super(domain); + mainTranslator = org.xtuml.masl.translate.main.DomainTranslator.getInstance(domain); + domainNamespace = new Namespace(Mangler.mangleName(domain), Kafka.kafkaNamespace); + + library = new SharedLibrary(mainTranslator.getLibrary().getName() + "_kafka") + .inBuildSet(mainTranslator.getBuildSet()).withCCDefaultExtensions(); + library.addDependency(Kafka.library); + library.addDependency(Kafka.cppkafkaLibrary); + library.addDependency(Kafka.rdkafkaLibrary); + + interfaceLibrary = new SharedLibrary(mainTranslator.getLibrary().getName() + "_if_kafka") + .inBuildSet(mainTranslator.getBuildSet()).withCCDefaultExtensions(); + interfaceLibrary.addDependency(Kafka.library); + interfaceLibrary.addDependency(Kafka.cppkafkaLibrary); + interfaceLibrary.addDependency(Kafka.rdkafkaLibrary); } - } - - Namespace getNamespace() - { - return domainNamespace; - } - - org.xtuml.masl.translate.main.DomainTranslator getMainTranslator() - { - return mainTranslator; - } + @Override + public void translate() { + // create code files + final CodeFile consumerCodeFile = library.createBodyFile("Kafka_consumers" + Mangler.mangleFile(domain)); + final CodeFile pollerCodeFile = library.createBodyFile("Kafka_pollers" + Mangler.mangleFile(domain)); + final CodeFile publisherCodeFile = interfaceLibrary + .createBodyFile("Kafka_publishers" + Mangler.mangleFile(domain)); + + // create domain service translators + final List domainServiceTranslators = domain.getServices().stream() + .filter(service -> service.getDeclarationPragmas().hasPragma(KAFKA_TOPIC_PRAGMA) + && !service.isFunction() && !service.isExternal() && !service.isScenario()) + .map(service -> new DomainServiceTranslator(service, this, consumerCodeFile, publisherCodeFile)) + .collect(Collectors.toList()); + + // translate domain service handlers + domainServiceTranslators.forEach(DomainServiceTranslator::translate); + + // populate the code for the domain services + List> domainServiceFilePopulators = domainServiceTranslators.stream() + .map(ServiceTranslator::getFilePopulators).map(List::iterator).collect(Collectors.toList()); + while (domainServiceFilePopulators.stream().anyMatch(Iterator::hasNext)) { + domainServiceFilePopulators.stream().filter(Iterator::hasNext).map(Iterator::next).forEach(Runnable::run); + } + + // create domain terminator service translators + final List terminatorServiceTranslators = domain.getTerminators().stream() + .flatMap(terminator -> terminator.getServices().stream()) + .filter(service -> service.getDeclarationPragmas().hasPragma(KAFKA_TOPIC_PRAGMA) && service.isFunction() + && service.getReturnType().isAssignableFrom(BooleanType.createAnonymous())) + .map(service -> new DomainTerminatorServiceTranslator(service, this, pollerCodeFile)) + .collect(Collectors.toList()); + + // translate domain terminator service handlers + terminatorServiceTranslators.forEach(DomainTerminatorServiceTranslator::translate); + + // populate the code for the terminator services + List> terminatorServiceFilePopulators = terminatorServiceTranslators.stream() + .map(ServiceTranslator::getFilePopulators).map(List::iterator).collect(Collectors.toList()); + while (terminatorServiceFilePopulators.stream().anyMatch(Iterator::hasNext)) { + terminatorServiceFilePopulators.stream().filter(Iterator::hasNext).map(Iterator::next) + .forEach(Runnable::run); + } + } + Namespace getNamespace() { + return domainNamespace; + } } diff --git a/core-java/src/main/java/org/xtuml/masl/translate/kafka/Kafka.java b/core-java/src/main/java/org/xtuml/masl/translate/kafka/Kafka.java index 3a8cb36d..778e7aa3 100644 --- a/core-java/src/main/java/org/xtuml/masl/translate/kafka/Kafka.java +++ b/core-java/src/main/java/org/xtuml/masl/translate/kafka/Kafka.java @@ -8,23 +8,23 @@ import org.xtuml.masl.cppgen.Namespace; import org.xtuml.masl.translate.main.Architecture; +public class Kafka { -public class Kafka -{ + static Namespace kafkaNamespace = new Namespace("Kafka"); + static Library library = new InterfaceLibrary("Kafka").inBuildSet(Architecture.buildSet); + static Library cppkafkaLibrary = new ImportedLibrary("cppkafka"); + static Library rdkafkaLibrary = new ImportedLibrary("rdkafka"); - static Namespace kafkaNamespace = new Namespace("Kafka"); - static Library library = new InterfaceLibrary("Kafka").inBuildSet(Architecture.buildSet); - static Library cppkafkaLibrary = new ImportedLibrary("cppkafka"); - static Library rdkafkaLibrary = new ImportedLibrary("rdkafka"); - - static CodeFile bufferedIOInc = library.createInterfaceHeader("kafka/BufferedIO.hh"); - static CodeFile processHandlerInc = library.createInterfaceHeader("kafka/ProcessHandler.hh"); - static CodeFile producerInc = library.createInterfaceHeader("kafka/Producer.hh"); - static CodeFile serviceHandlerInc = library.createInterfaceHeader("kafka/ServiceHandler.hh"); - - static Class processHandlerClass = new Class("ProcessHandler", kafkaNamespace, processHandlerInc); - static Class producerClass = new Class("Producer", kafkaNamespace, producerInc); - static Class callable = new Class("Callable", kafkaNamespace, processHandlerInc); - static Class serviceHandlerClass = new Class("ServiceHandler", kafkaNamespace, serviceHandlerInc); + static CodeFile processHandlerInc = library.createInterfaceHeader("kafka/ProcessHandler.hh"); + static CodeFile producerInc = library.createInterfaceHeader("kafka/Producer.hh"); + static CodeFile serviceHandlerInc = library.createInterfaceHeader("kafka/ServiceHandler.hh"); + static CodeFile dataConsumerInc = library.createInterfaceHeader("kafka/DataConsumer.hh"); + static CodeFile consumerInc = library.createInterfaceHeader("kafka/Consumer.hh"); + static Class processHandlerClass = new Class("ProcessHandler", kafkaNamespace, processHandlerInc); + static Class producerClass = new Class("Producer", kafkaNamespace, producerInc); + static Class callable = new Class("Callable", kafkaNamespace, processHandlerInc); + static Class serviceHandlerClass = new Class("ServiceHandler", kafkaNamespace, serviceHandlerInc); + static Class dataConsumerClass = new Class("DataConsumer", kafkaNamespace, dataConsumerInc); + static Class consumerClass = new Class("Consumer", kafkaNamespace, consumerInc); } diff --git a/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java b/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java index 493d353f..ea3a8829 100644 --- a/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java +++ b/core-java/src/main/java/org/xtuml/masl/translate/kafka/ServiceTranslator.java @@ -1,243 +1,82 @@ package org.xtuml.masl.translate.kafka; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.xtuml.masl.cppgen.ArrayAccess; -import org.xtuml.masl.cppgen.BinaryExpression; -import org.xtuml.masl.cppgen.BinaryOperator; -import org.xtuml.masl.cppgen.Class; -import org.xtuml.masl.cppgen.CodeFile; -import org.xtuml.masl.cppgen.DeclarationGroup; import org.xtuml.masl.cppgen.Expression; -import org.xtuml.masl.cppgen.ExpressionStatement; import org.xtuml.masl.cppgen.Function; -import org.xtuml.masl.cppgen.FunctionObjectCall; -import org.xtuml.masl.cppgen.FundamentalType; import org.xtuml.masl.cppgen.Literal; import org.xtuml.masl.cppgen.Namespace; -import org.xtuml.masl.cppgen.NewExpression; -import org.xtuml.masl.cppgen.ReturnStatement; -import org.xtuml.masl.cppgen.Std; -import org.xtuml.masl.cppgen.TypeUsage; -import org.xtuml.masl.cppgen.TypedefType; -import org.xtuml.masl.cppgen.Variable; -import org.xtuml.masl.cppgen.VariableDefinitionStatement; -import org.xtuml.masl.cppgen.Visibility; -import org.xtuml.masl.metamodel.common.ParameterDefinition; -import org.xtuml.masl.metamodel.domain.DomainService; +import org.xtuml.masl.metamodel.common.Service; +import org.xtuml.masl.metamodel.domain.DomainTerminatorService; import org.xtuml.masl.metamodel.type.BasicType; import org.xtuml.masl.metamodel.type.TypeDefinition.ActualType; -import org.xtuml.masl.metamodelImpl.common.PragmaDefinition; import org.xtuml.masl.metamodelImpl.type.StringType; import org.xtuml.masl.translate.main.Architecture; -import org.xtuml.masl.translate.main.DomainServiceTranslator; -import org.xtuml.masl.translate.main.Mangler; -import org.xtuml.masl.translate.main.NlohmannJson; -import org.xtuml.masl.translate.main.ParameterTranslator; -import org.xtuml.masl.translate.main.Types; +import org.xtuml.masl.translate.main.TerminatorServiceTranslator; +abstract class ServiceTranslator { -class ServiceTranslator -{ + private final Service service; + private final DomainTranslator domainTranslator; - private final DomainService service; - private final DomainTranslator domainTranslator; - private final Class handlerClass; - private final Namespace domainNamespace; - - ServiceTranslator ( final DomainService service, final DomainTranslator domainTranslator ) - { - this.service = service; - this.domainTranslator = domainTranslator; - domainNamespace = domainTranslator.getNamespace(); - handlerClass = new Class(Mangler.mangleName(service) + "Handler", domainNamespace); - } - - void addServiceHandler (final CodeFile codeFile) - { - // create and add the handler class to the file - handlerClass.addSuperclass(Kafka.serviceHandlerClass, Visibility.PUBLIC); - final DeclarationGroup group = handlerClass.createDeclarationGroup(); - final Function getInvoker = handlerClass.createMemberFunction(group, "getInvoker", Visibility.PUBLIC); - getInvoker.setReturnType(new TypeUsage(Kafka.callable)); - getInvoker.setConst(true); - codeFile.addClassDeclaration(handlerClass); - - // create the invoker class - final Class invokerClass = new Class(Mangler.mangleName(service) + "Invoker", domainNamespace); - final DeclarationGroup functions = invokerClass.createDeclarationGroup(); - final Function constructor = invokerClass.createConstructor(functions, Visibility.PUBLIC); - constructor.declareInClass(true); - final Expression paramData = constructor.createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "param_data").asExpression(); - - // create invoker function - final Function invoker = invokerClass.createMemberFunction(functions, "operator()", Visibility.PUBLIC); - invoker.declareInClass(true); - - // if the service has a single string parameter, just pass through - final boolean noParseJson = service.getParameters().size() == 1 && service.getParameters().get(0).getType().isAssignableFrom(StringType.createAnonymous()); - - // handle parameters - final List invokeArgs = new ArrayList(); - final DeclarationGroup vars = invokerClass.createDeclarationGroup(); - final Variable paramJson = new Variable(new TypeUsage(NlohmannJson.json), "params", NlohmannJson.parse(paramData)); - if (service.getParameters().stream().map(p -> p.getType().getBasicType()).anyMatch(paramType -> - !(paramType.getBasicType().getActualType() == ActualType.EVENT || - paramType.getBasicType().getActualType() == ActualType.DEVICE || paramType.getBasicType().getActualType() == ActualType.ANY_INSTANCE)) && !noParseJson) { - constructor.getCode().appendStatement(new VariableDefinitionStatement(paramJson)); + ServiceTranslator(final Service service, final DomainTranslator domainTranslator) { + this.service = service; + this.domainTranslator = domainTranslator; } - for ( final ParameterDefinition param : service.getParameters() ) - { - final TypeUsage type = Types.getInstance().getType(param.getType()); - final BasicType paramType = param.getType().getBasicType(); - if (!(paramType.getBasicType().getActualType() == ActualType.EVENT || - paramType.getBasicType().getActualType() == ActualType.DEVICE || paramType.getBasicType().getActualType() == ActualType.ANY_INSTANCE)) - { - final Variable arg = invokerClass.createMemberVariable(vars, Mangler.mangleName(param), type, Visibility.PRIVATE); - Expression paramAccess = Std.string.callConstructor(new Function("begin").asFunctionCall(paramData, false), new Function("end").asFunctionCall(paramData, false)); - if (!noParseJson) { - paramAccess = NlohmannJson.get(service.getParameters().size() > 1 ? new ArrayAccess(paramJson.asExpression(), Literal.createStringLiteral(param.getName())) : paramJson.asExpression(), type); - } - constructor.getCode().appendStatement(new BinaryExpression(arg.asExpression(), BinaryOperator.ASSIGN, paramAccess).asStatement()); - invokeArgs.add(arg.asExpression()); - } - else - { - final Variable arg = new Variable(type, Mangler.mangleName(param)); - invoker.getCode().appendStatement(arg.asStatement()); - invokeArgs.add(arg.asExpression()); - } - } - - // create the call to the service interceptor - final DomainServiceTranslator serviceTranslator = DomainServiceTranslator.getInstance(service); - final TypedefType serviceInterceptor = serviceTranslator.getServiceInterceptor(); - final Function serviceFunction = new Function("callService"); - final Expression instanceFnCall = serviceInterceptor.asClass().callStaticFunction("instance"); - Expression invokeExpression = new FunctionObjectCall(serviceFunction.asFunctionCall(instanceFnCall, false), invokeArgs); - invoker.getCode().appendStatement(invokeExpression.asStatement()); - - // add the invoker class to the file - codeFile.addClassDeclaration(invokerClass); - - // add implementation of 'getInvoker' to the file - final Expression paramData2 = getInvoker.createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "param_data").asExpression(); - getInvoker.getCode().appendStatement(new ReturnStatement(invokerClass.callConstructor(paramData2))); - codeFile.addFunctionDefinition(getInvoker); - - } - void addTopicRegistration(final CodeFile codeFile) - { - final Expression processHandler = Kafka.processHandlerClass.callStaticFunction("getInstance"); - final Expression domainId = new Function("getId").asFunctionCall(new Function("getDomain").asFunctionCall(Architecture.process, false, Literal.createStringLiteral(domainTranslator.getDomain().getName())), false); - final Expression serviceId = domainTranslator.getMainTranslator().getServiceTranslator(service).getServiceId(); - final Expression handler = Std.shared_ptr(new TypeUsage(handlerClass)).callConstructor(new NewExpression(new TypeUsage(handlerClass))); - final Function registerServiceFunc = new Function("registerServiceHandler"); - final Expression registerService = registerServiceFunc.asFunctionCall(processHandler, false, domainId, serviceId, handler); - final Variable registered = new Variable(new TypeUsage(FundamentalType.BOOL), Mangler.mangleName(service) + "_registered", new Namespace(""), registerService); - codeFile.addVariableDefinition(registered); - } + abstract void translate(); - void addPublisher(final CodeFile codeFile) - { - // create service function - final Function function = new Function(Mangler.mangleName(service), domainNamespace); - function.setReturnType(domainTranslator.getTypes().getType(service.getReturnType())); + abstract List getFilePopulators(); - // if the service has a single string parameter, just pass through - final boolean noParseJson = service.getParameters().size() == 1 && StringType.createAnonymous().isAssignableFrom(service.getParameters().get(0).getType()); - - // create output buffer - final Variable paramData = new Variable(new TypeUsage(NlohmannJson.json), "param_data"); - if (!noParseJson) { - function.getCode().appendStatement(new VariableDefinitionStatement(paramData)); + Service getService() { + return service; } - // create partition key buffer - final Variable partKey = new Variable(new TypeUsage(NlohmannJson.json), "part_key"); - final boolean includePartKey = service.getParameters().stream().anyMatch(param -> service.getDeclarationPragmas().hasPragma(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA) && - service.getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA).contains(param.getName())); - if (includePartKey) { - function.getCode().appendStatement(new VariableDefinitionStatement(partKey)); + DomainTranslator getDomainTranslator() { + return domainTranslator; } - // handle parameters - final Map paramTranslators = new HashMap<>(); - for ( final ParameterDefinition param : service.getParameters() ) - { - final ParameterTranslator paramTrans = new ParameterTranslator(param, function); - paramTranslators.put(param, paramTrans); - final Expression jsonAccess = service.getParameters().size() > 1 ? new ArrayAccess(paramData.asExpression(), Literal.createStringLiteral(param.getName())) : paramData.asExpression(); - final Expression writeExpr = new BinaryExpression(jsonAccess, BinaryOperator.ASSIGN, paramTrans.getVariable().asExpression()); - if (!noParseJson) { - function.getCode().appendStatement(new ExpressionStatement(writeExpr)); - } - if ( service.getDeclarationPragmas().hasPragma(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA) && - service.getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA).contains(param.getName()) ) - { - final Expression keyJsonAccess = service.getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_PARTITION_KEY_PRAGMA).size() > 1 ? new ArrayAccess(partKey.asExpression(), Literal.createStringLiteral(param.getName())) : partKey.asExpression(); - final Expression keyWriteExpr = new BinaryExpression(keyJsonAccess, BinaryOperator.ASSIGN, paramTrans.getVariable().asExpression()); - function.getCode().appendStatement(new ExpressionStatement(keyWriteExpr)); - } + Namespace getDomainNamespace() { + return getDomainTranslator().getNamespace(); } - // call publisher - final Expression producer = Kafka.producerClass.callStaticFunction("getInstance"); - final Function publishFunc = new Function("publish"); - final Expression domainId = new Function("getId").asFunctionCall(new Function("getDomain").asFunctionCall(Architecture.process, false, Literal.createStringLiteral(domainTranslator.getDomain().getName())), false); - final Expression serviceId = domainTranslator.getMainTranslator().getServiceTranslator(service).getServiceId(); - final Expression publishExpr = publishFunc.asFunctionCall(producer, false, domainId, serviceId, noParseJson ? - Std.string.callConstructor(paramTranslators.get(service.getParameters().get(0)).getVariable().asExpression()) : - NlohmannJson.dump(paramData.asExpression()), - includePartKey ? NlohmannJson.dump(partKey.asExpression()) : Literal.createStringLiteral("")); - function.getCode().appendStatement(new ExpressionStatement(publishExpr)); - - // add function to file - codeFile.addFunctionDefinition(function); - - // add service registration - final DomainServiceTranslator serviceTranslator = DomainServiceTranslator.getInstance(service); - final TypedefType serviceInterceptor = serviceTranslator.getServiceInterceptor(); - final Expression interceptorFnCall = serviceInterceptor.asClass().callStaticFunction("instance"); - final Function registerFunction = new Function("registerLocal"); - final Expression initialValue = registerFunction.asFunctionCall(interceptorFnCall, false, function.asFunctionPointer()); - final Variable registrationVar = new Variable(new TypeUsage(FundamentalType.BOOL, TypeUsage.Const), "localServiceRegistration_" + Mangler.mangleName(service), domainNamespace, initialValue); - registrationVar.setStatic(true); - codeFile.addVariableDefinition(registrationVar); - - } - - void addCustomTopicName(final CodeFile codeFile) { - if (service.getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_TOPIC_PRAGMA).size() == 1) { - final String topicNameString = service.getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_TOPIC_PRAGMA).get(0); - if (!isBoolean(topicNameString) && !isNumeric(topicNameString)) { + Expression getTopicName(final DomainTerminatorService service) { + if (service.getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_TOPIC_PRAGMA).size() == 1) { + final String topicNameString = service.getDeclarationPragmas() + .getPragmaValues(DomainTranslator.KAFKA_TOPIC_PRAGMA).get(0); + if (!isBoolean(topicNameString) && !isNumeric(topicNameString)) { + return Literal.createStringLiteral(topicNameString); + } + } final Expression processHandler = Kafka.processHandlerClass.callStaticFunction("getInstance"); - final Expression domainId = new Function("getId").asFunctionCall(new Function("getDomain").asFunctionCall(Architecture.process, false, Literal.createStringLiteral(domainTranslator.getDomain().getName())), false); - final Expression serviceId = domainTranslator.getMainTranslator().getServiceTranslator(service).getServiceId(); - final Expression topicName = Literal.createStringLiteral(topicNameString); - final Function setTopicNameFunc = new Function("setCustomTopicName"); - final Expression setTopicName = setTopicNameFunc.asFunctionCall(processHandler, false, domainId, serviceId, topicName); - final Variable topicNameSet = new Variable(new TypeUsage(FundamentalType.BOOL), Mangler.mangleName(service) + "_topic_name_set", new Namespace(""), setTopicName); - codeFile.addVariableDefinition(topicNameSet); - } + final Expression domainId = new Function("getId") + .asFunctionCall(new Function("getDomain").asFunctionCall(Architecture.process, false, + Literal.createStringLiteral(getDomainTranslator().getDomain().getName())), false); + final Expression serviceId = TerminatorServiceTranslator.getInstance(service).getServiceId(); + return new Function("getTopicName").asFunctionCall(processHandler, false, domainId, serviceId); } - } - private boolean isBoolean(final String value) { - return "true".equals(value) || "false".equals(value); - } + static boolean isBoolean(final String value) { + return "true".equals(value) || "false".equals(value); + } - private boolean isNumeric(final String value) { - try { - double d = Double.parseDouble(value); - return true; - } catch (NumberFormatException e) { - return false; + static boolean isNumeric(final String value) { + try { + Double.parseDouble(value); + return true; + } catch (NumberFormatException e) { + return false; + } } - } + static boolean hasSingleStringParameter(Service service) { + return service.getParameters().size() == 1 + && service.getParameters().get(0).getType().isAssignableFrom(StringType.createAnonymous()); + } + + static boolean isTypeSerializable(BasicType type) { + return !(type.getBasicType().getActualType() == ActualType.EVENT + || type.getBasicType().getActualType() == ActualType.DEVICE + || type.getBasicType().getActualType() == ActualType.ANY_INSTANCE); + } }