Skip to content

Commit

Permalink
Move MQTT and nats connectors to their own module
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Oct 22, 2023
1 parent 0e80873 commit 7717209
Show file tree
Hide file tree
Showing 40 changed files with 390 additions and 287 deletions.
2 changes: 2 additions & 0 deletions streampipes-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
<module>streampipes-sources-watertank-simulator</module>
<module>streampipes-sources-vehicle-simulator</module>
<module>streampipes-connectors-tubemq</module>
<module>streampipes-connectors-nats</module>
<module>streampipes-connectors-mqtt</module>
</modules>

<properties>
Expand Down
14 changes: 10 additions & 4 deletions streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@
<artifactId>streampipes-connectors-kafka</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-mqtt</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-nats</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-pulsar</artifactId>
Expand Down Expand Up @@ -156,10 +166,6 @@
<artifactId>plc4j-driver-modbus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>tubemq-client</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.apache.streampipes.connect.iiot.protocol.stream.FileReplayAdapter;
import org.apache.streampipes.connect.iiot.protocol.stream.HttpServerProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.HttpStreamProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol;
import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
import org.apache.streampipes.extensions.connectors.mqtt.adapter.MqttProtocol;
import org.apache.streampipes.extensions.connectors.nats.adapter.NatsProtocol;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import org.apache.streampipes.extensions.connectors.pulsar.adapter.PulsarProtocol;
import org.apache.streampipes.extensions.connectors.rocketmq.adapter.RocketMQProtocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
import org.apache.streampipes.extensions.management.connect.adapter.parser.JsonParsers;
import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonObjectParser;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConfig;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConsumer;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
Expand Down
5 changes: 5 additions & 0 deletions streampipes-extensions/streampipes-connect-adapters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

<dependencies>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-mqtt</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-pipeline-elements-shared</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConfig;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConsumer;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConfig;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConsumer;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConfig;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConsumer;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.apache.streampipes.sdk.helpers.Labels;
Expand Down
55 changes: 55 additions & 0 deletions streampipes-extensions/streampipes-connectors-mqtt/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-extensions</artifactId>
<version>0.93.0-SNAPSHOT</version>
</parent>

<artifactId>streampipes-connectors-mqtt</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-extensions-api</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-extensions-management</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-sdk</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*
*/
package org.apache.streampipes.connect.iiot.protocol.stream;
package org.apache.streampipes.extensions.connectors.mqtt.adapter;

import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
Expand All @@ -25,14 +25,14 @@
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConfig;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils;
import org.apache.streampipes.pe.shared.config.mqtt.MqttConsumer;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*
*/
package org.apache.streampipes.pe.shared.config.mqtt;
package org.apache.streampipes.extensions.connectors.mqtt.shared;

public class MqttConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.pe.shared.config.mqtt;
package org.apache.streampipes.extensions.connectors.mqtt.shared;

import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*
*/
package org.apache.streampipes.pe.shared.config.mqtt;
package org.apache.streampipes.extensions.connectors.mqtt.shared;

import org.apache.streampipes.messaging.InternalEventProcessor;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.streampipes.extensions.connectors.mqtt.sink;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.connectors.mqtt.sink.common.MqttClient;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;

import java.util.Arrays;

public class MqttPublisherSink implements IStreamPipesDataSink {

private static final int DEFAULT_MQTT_PORT = 1883;
private static final int DEFAULT_RECONNECT_PERIOD = 30;
private static final int DEFAULT_KEEP_ALIVE = 30;

public static final String TOPIC = "topic";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String AUTH_MODE = "auth-mode";
public static final String NO_AUTH_ALTERNATIVE = "no-auth-alternative";
public static final String AUTH_ALTERNATIVE = "basic-auth-alternative";
public static final String USERNAME_GROUP = "username-group";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String QOS_LEVEL_KEY = "qos-level";
public static final String CLEAN_SESSION_KEY = "clean-session";
public static final String WILL_RETAIN = "will-retain";
public static final String ENCRYPTION_MODE = "encryption-mode";
public static final String RECONNECT_PERIOD_IN_SEC = "reconnect-period";
public static final String WILL_MODE = "lwt-mode";
public static final String NO_WILL_ALTERNATIVE = "no-lwt-alternative";
public static final String WILL_ALTERNATIVE = "lwt-alternative";
public static final String WILL_GROUP = "lwt-group";
public static final String WILL_TOPIC = "lwt-topic";
public static final String WILL_MESSAGE = "lwt-message";
public static final String WILL_QOS = "lwt-qos-level";
public static final String RETAIN = "retain";
public static final String KEEP_ALIVE_IN_SEC = "keep-alive";
public static final String MQTT_COMPLIANT = "mqtt-version-compliant";

private MqttClient mqttClient;

@Override
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
MqttPublisherSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.mqtt")
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder.any())
.requiredTextParameter(Labels.withId(TOPIC))
.requiredTextParameter(Labels.withId(HOST))
.requiredIntegerParameter(Labels.withId(PORT), DEFAULT_MQTT_PORT)
.requiredAlternatives(
Labels.withId(AUTH_MODE),
Alternatives.from(Labels.withId(NO_AUTH_ALTERNATIVE), true),
Alternatives.from(Labels.withId(AUTH_ALTERNATIVE),
StaticProperties.group(Labels.withId(USERNAME_GROUP),
StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME)),
StaticProperties.secretValue(Labels.withId(PASSWORD)))))
.requiredSingleValueSelection(
Labels.withId(ENCRYPTION_MODE),
Arrays.asList(
new Option("TCP", true),
// SSL not yet supported
new Option("SSL/TLS", false)))
.requiredSingleValueSelection(
Labels.withId(QOS_LEVEL_KEY),
Arrays.asList(
new Option("0 - at-most-once", false),
new Option("1 - at-least-once", true),
new Option("2 - exactly-once", false)))
.requiredSingleValueSelection(
Labels.withId(RETAIN),
Arrays.asList(
new Option("Yes", false),
new Option("No", true)))
.requiredSingleValueSelection(
Labels.withId(CLEAN_SESSION_KEY),
Arrays.asList(
new Option("Yes", true),
new Option("No", false)))
.requiredIntegerParameter(Labels.withId(RECONNECT_PERIOD_IN_SEC), DEFAULT_RECONNECT_PERIOD)
.requiredIntegerParameter(Labels.withId(KEEP_ALIVE_IN_SEC), DEFAULT_KEEP_ALIVE)
.requiredSingleValueSelection(
Labels.withId(MQTT_COMPLIANT),
Arrays.asList(
new Option("Yes", true),
new Option("No", false)))
.requiredAlternatives(
Labels.withId(WILL_MODE),
Alternatives.from(Labels.withId(NO_WILL_ALTERNATIVE), true),
Alternatives.from(Labels.withId(WILL_ALTERNATIVE),
StaticProperties.group(Labels.withId(WILL_GROUP),
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_TOPIC)),
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_MESSAGE)),
StaticProperties.singleValueSelection(Labels.withId(WILL_RETAIN),
Arrays.asList(
new Option("Yes", false),
new Option("No", true))),
StaticProperties.singleValueSelection(
Labels.withId(WILL_QOS),
Arrays.asList(
new Option("0 - at-most-once", true),
new Option("1 - at-least-once", false),
new Option("2 - exactly-once", false))))))
.build()
);
}

@Override
public void onPipelineStarted(IDataSinkParameters params, EventSinkRuntimeContext runtimeContext) {
this.mqttClient = new MqttClient(params);
this.mqttClient.connect();
}

@Override
public void onEvent(Event event) throws SpRuntimeException {
this.mqttClient.publish(event);
}

@Override
public void onPipelineStopped() {
this.mqttClient.disconnect();
}
}
Loading

0 comments on commit 7717209

Please sign in to comment.