Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement polled kafka topics #56

Merged
merged 4 commits into from
Jun 13, 2024
Merged

Implement polled kafka topics #56

merged 4 commits into from
Jun 13, 2024

Conversation

leviathan747
Copy link
Contributor

Allow terminator services to be marked as a kafka topic.

Eligible terminator services must:

  • return boolean
  • carry only "out" parameters of serializable types

When a terminator service marked as a kafka topic is called, the architecture will poll the kafka broker for one message. If a message is available, it will attempt to parse out the individual parameters from the received JSON object and populate the "out" parameters. It will then return true. If no messages are available, it will leave the parameters unchanged and return false. If the message payload is not valid JSON or if the fields do not match the names and types of the parameters, an exception will be raised and the process will exit.

If the "-util Kafka" flag is not passed to the process on startup, the code in the local terminator service body will be executed.

@leviathan747 leviathan747 requested a review from tristanpye June 5, 2024 14:43
@leviathan747
Copy link
Contributor Author

I did a decent amount of code restructuring as part of this pull request. The key areas to look at are consumeOne in Consumer.cc:115 and DomainTerminatorServiceTranslator.java. Also, here is an example of the generated code for a terminator service marked as a kafka topic:

#include "JobManagement_OOA/__JobManagement_terminators.hh"
#include "kafka/Consumer.hh"
#include "kafka/DataConsumer.hh"
#include <stdint.h>
#include <string>
#include "swa/FunctionOverrider.hh"
#include "swa/String.hh"
#include <vector>

namespace Kafka
{
  namespace masld_JobManagement
  {
    class masls_obtainEventConsumer
      : public DataConsumer
    {

      public:
        masls_obtainEventConsumer ( ::SWA::String& maslp_auditEvent )
          : maslp_auditEvent(maslp_auditEvent)

        {
        }
        void accept ( ::std::vector<uint8_t> data ) const;


      private:
        ::SWA::String& maslp_auditEvent;


    };
    Consumer consumer_masls_obtainEvent = Consumer( ::std::string( "Protocol_Verifier_Reception" ) );

    void masls_obtainEventConsumer::accept ( ::std::vector<uint8_t> data ) const
    {
      maslp_auditEvent = ::std::string( data.begin(), data.end() );
    }

    bool masls_obtainEvent ( ::SWA::String& maslp_auditEvent )
    {
      masls_obtainEventConsumer dataConsumer = masls_obtainEventConsumer( maslp_auditEvent );
      return consumer_masls_obtainEvent.consumeOne( dataConsumer );
    }

    bool register_masls_obtainEvent = ::masld_JobManagement::maslb_Reception::register_masls_obtainEvent( &masls_obtainEvent );

  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants