diff --git a/pom.xml b/pom.xml index 828a419fc5..9843b35c0b 100644 --- a/pom.xml +++ b/pom.xml @@ -298,6 +298,11 @@ io.siddhi.distribution.store.api.rest ${io.siddhi.distribution.version} + + io.siddhi.distribution + io.siddhi.parser + ${io.siddhi.distribution.version} + io.siddhi.distribution io.siddhi.distribution.msf4j.interceptor.common.feature @@ -317,6 +322,12 @@ ${io.siddhi.distribution.version} zip + + io.siddhi.distribution + io.siddhi.parser.service.feature + ${io.siddhi.distribution.version} + zip + io.siddhi.distribution io.siddhi.distribution.store.api.rest.feature @@ -1103,8 +1114,8 @@ 5.1.0-SNAPSHOT - 5.1.0 - 5.1.0 + 5.1.2 + 5.1.2 [5.0.0, 6.0.0) 6.1.30 [6.1.0, 6.2.0) @@ -1123,8 +1134,8 @@ 1.5.10.wso2v1 - 5.2.8 - 5.2.8 + 5.2.10 + 5.2.10 6.0.295 3.0.3 5.2.2 diff --git a/resources/carbon-home/bin/runner.bat b/resources/carbon-home/bin/runner.bat index 3492444fde..603e81e766 100644 --- a/resources/carbon-home/bin/runner.bat +++ b/resources/carbon-home/bin/runner.bat @@ -53,6 +53,9 @@ rem find CARBON_HOME if it does not exist due to either an invalid value passed rem by the user or the %0 problem on Windows 9x if not exist "%CARBON_HOME%\bin\version.txt" goto noServerHome +REM Installing jars +java -cp ".\*;..\bin\tools\*" -Dwso2.carbon.tool="install-jars" org.wso2.carbon.tools.CarbonToolExecutor "%CURRENT_DIR%" + goto startServer :noServerHome diff --git a/resources/carbon-home/bin/runner.sh b/resources/carbon-home/bin/runner.sh index 4aa9201580..2bc2433b9a 100755 --- a/resources/carbon-home/bin/runner.sh +++ b/resources/carbon-home/bin/runner.sh @@ -55,6 +55,9 @@ PRGDIR=`dirname "$PRG"` [ -z "$RUNTIME_HOME" ] && RUNTIME_HOME=`cd "$PRGDIR/../wso2/runner" ; pwd` +# Installing jars +java -cp "../bin/tools/*" -Dwso2.carbon.tool="install-jars" org.wso2.carbon.tools.CarbonToolExecutor "$CARBON_HOME" + ########################################################################### NAME=start-runner # Daemon name, where is the actual executable diff --git a/resources/carbon-home/bin/tooling.bat b/resources/carbon-home/bin/tooling.bat index 224add96b7..d5d4dccb1d 100644 --- a/resources/carbon-home/bin/tooling.bat +++ b/resources/carbon-home/bin/tooling.bat @@ -53,6 +53,9 @@ rem find CARBON_HOME if it does not exist due to either an invalid value passed rem by the user or the %0 problem on Windows 9x if not exist "%CARBON_HOME%\bin\version.txt" goto noServerHome +REM Installing jars +java -cp ".\*;..\bin\tools\*" -Dwso2.carbon.tool="install-jars" org.wso2.carbon.tools.CarbonToolExecutor "%CURRENT_DIR%" + goto startServer :noServerHome diff --git a/resources/carbon-home/bin/tooling.sh b/resources/carbon-home/bin/tooling.sh index f1e0a64652..ae40a2765b 100755 --- a/resources/carbon-home/bin/tooling.sh +++ b/resources/carbon-home/bin/tooling.sh @@ -55,6 +55,9 @@ PRGDIR=`dirname "$PRG"` [ -z "$RUNTIME_HOME" ] && RUNTIME_HOME=`cd "$PRGDIR/../wso2/tooling" ; pwd` +# Installing jars +java -cp "../bin/tools/*" -Dwso2.carbon.tool="install-jars" org.wso2.carbon.tools.CarbonToolExecutor "$CARBON_HOME" + ########################################################################### NAME=start-tooling # Daemon name, where is the actual executable diff --git a/runner/components/io.siddhi.distribution.common/src/main/java/io/siddhi/distribution/common/common/utils/config/FileConfigManager.java b/runner/components/io.siddhi.distribution.common/src/main/java/io/siddhi/distribution/common/common/utils/config/FileConfigManager.java index 835e656f7d..1bd6434618 100644 --- a/runner/components/io.siddhi.distribution.common/src/main/java/io/siddhi/distribution/common/common/utils/config/FileConfigManager.java +++ b/runner/components/io.siddhi.distribution.common/src/main/java/io/siddhi/distribution/common/common/utils/config/FileConfigManager.java @@ -42,26 +42,22 @@ public class FileConfigManager implements ConfigManager { private static final Logger LOGGER = LoggerFactory.getLogger(FileConfigManager.class); private ConfigProvider configProvider; - private List extensions; - private List references; - private Map properties; - - public FileConfigManager(ConfigProvider configProvider) { - this.configProvider = configProvider; - } + private List extensions = new ArrayList<>(); + private List references = new ArrayList<>(); + private Map properties = new HashMap<>(); public void init() { if (configProvider != null) { initialiseExtensions(); initialiseReferences(); initaliseProperties(); - } else { - extensions = new ArrayList<>(); - references = new ArrayList<>(); - properties = new HashMap<>(); } } + public FileConfigManager(ConfigProvider configProvider) { + this.configProvider = configProvider; + } + private void initaliseProperties() { // load siddhi properties try { @@ -90,7 +86,6 @@ private void initaliseProperties() { } } catch (ConfigurationException e) { LOGGER.error("Could not initiate the siddhi configuration object, " + e.getMessage(), e); - this.properties = new HashMap<>(); } } @@ -111,7 +106,6 @@ private void initialiseReferences() { } } catch (Exception e) { LOGGER.error("Could not initiate the refs configuration object, " + e.getMessage(), e); - this.references = new ArrayList<>(); } } @@ -133,7 +127,6 @@ private void initialiseExtensions() { } } catch (Exception e) { LOGGER.error("Could not initiate the extensions configuration object, " + e.getMessage(), e); - this.extensions = new ArrayList<>(); } } diff --git a/runner/components/io.siddhi.parser/findbugs-exclude.xml b/runner/components/io.siddhi.parser/findbugs-exclude.xml new file mode 100644 index 0000000000..6e81a77f6e --- /dev/null +++ b/runner/components/io.siddhi.parser/findbugs-exclude.xml @@ -0,0 +1,20 @@ + + + + + + diff --git a/runner/components/io.siddhi.parser/pom.xml b/runner/components/io.siddhi.parser/pom.xml new file mode 100644 index 0000000000..0a4b53e095 --- /dev/null +++ b/runner/components/io.siddhi.parser/pom.xml @@ -0,0 +1,203 @@ + + + + + + io.siddhi.distribution + io.siddhi.distribution.parent + 5.1.0-SNAPSHOT + ../../../pom.xml + + + 4.0.0 + io.siddhi.parser + bundle + + Siddhi Parser + http://wso2.org + + + javax.ws.rs + javax.ws.rs-api + + + javax.servlet + javax.servlet-api + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.datatype + jackson-datatype-joda + + + + + io.siddhi + siddhi-query-api + ${siddhi.version} + + + io.siddhi + siddhi-core + ${siddhi.version} + + + io.siddhi + siddhi-query-compiler + ${siddhi.version} + + + io.siddhi + siddhi-annotations + ${siddhi.version} + + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + org.wso2.msf4j + msf4j-core + + + io.siddhi.distribution + io.siddhi.distribution.common + + + io.siddhi.distribution + io.siddhi.distribution.msf4j.interceptor.common + + + org.testng + testng + ${testng.version} + test + + + org.testcontainers + testcontainers + ${org.testcontainers.version} + test + + + io.nats + java-nats-streaming + ${io.nats.version} + test + + + io.siddhi.extension.io.http + siddhi-io-http + test + + + io.siddhi.extension.io.nats + siddhi-io-nats + test + + + io.siddhi.extension.map.json + siddhi-map-json + test + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.9.1 + + + add-source + generate-sources + + add-source + + + + src/gen/java + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${project.build.outputDirectory}/io/siddhi/parser/core + + + + org.jacoco + jacoco-maven-plugin + + + default-instrument + + instrument + + + + default-restore-instrumented-classes + + restore-instrumented-classes + + + + + + * + + + + + + + findbugs-exclude.xml + + io.siddhi.parser.* + + + org.osgi.framework.*;version="${osgi.framework.import.version.range}", + org.wso2.msf4j.*;version="${msf4j.import.version.range}", + org.slf4j.*;version="${slf4j.logging.package.import.version.range}", + javax.ws.rs.*;version="${javax.ws.rs.version.range}", + *;resolution:=optional + + + diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/SiddhiParserDataHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/SiddhiParserDataHolder.java new file mode 100644 index 0000000000..702e71fa22 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/SiddhiParserDataHolder.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.parser; + +import io.siddhi.core.SiddhiManager; +import org.wso2.carbon.config.provider.ConfigProvider; + +public class SiddhiParserDataHolder { + private static ConfigProvider configFileReader; + private static SiddhiManager siddhiManager; + + public static SiddhiManager getSiddhiManager() { + return siddhiManager; + } + + public static void setSiddhiManager(SiddhiManager siddhiManager) { + SiddhiParserDataHolder.siddhiManager = siddhiManager; + } + + public static ConfigProvider getConfigProvider() { + return configFileReader; + } + + public static void setConfigProvider(ConfigProvider configProvider) { + SiddhiParserDataHolder.configFileReader = configProvider; + } + +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/SiddhiAppCreator.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/SiddhiAppCreator.java new file mode 100644 index 0000000000..743bf3e1d6 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/SiddhiAppCreator.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core; + + +import io.siddhi.parser.core.appcreator.DeployableSiddhiQueryGroup; +import io.siddhi.parser.core.topology.SiddhiTopology; +import io.siddhi.parser.service.model.MessagingSystem; + +import java.util.List; +/** + * This interface is utilized by which will be implemented by different + * distributed deployment implementations. Implementor can either choose to + * implement from scratch using this interface or use. + */ +public interface SiddhiAppCreator { + /** + * Create valid concrete Siddhi Apps for each Query Group in the given {@link SiddhiTopology}. + * + * @param topology Input topology to create Siddhi Apps + * @return List of {@link DeployableSiddhiQueryGroup}s. Length of the list should be equal to no. of groups user + * has defined. Length of the list should be greater than zero always. + */ + List createApps(SiddhiTopology topology, MessagingSystem messagingSystem); +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/SiddhiTopologyCreator.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/SiddhiTopologyCreator.java new file mode 100644 index 0000000000..ef9d78e9dc --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/SiddhiTopologyCreator.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core; + +import io.siddhi.parser.core.topology.SiddhiTopology; + +/** + * Topology Creator will consume a Siddhi App and produce a {@link SiddhiTopology} based on distributed annotations. + * Implementation of this should not change depending on different distribution methodologies(Ex: default, yarn + * based, container based). + */ +public interface SiddhiTopologyCreator { + /** + * consume a Siddhi App and produce a {@link SiddhiTopology} based on distributed annotations. + * + * @param userDefinedSiddhiApp Siddhi app deployed by the user + * @return {@link SiddhiTopology} representing the given siddhi app + */ + SiddhiTopology createTopology(String userDefinedSiddhiApp); +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/AbstractSiddhiAppCreator.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/AbstractSiddhiAppCreator.java new file mode 100644 index 0000000000..165f7c5ab5 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/AbstractSiddhiAppCreator.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.appcreator; + +import io.siddhi.parser.core.SiddhiAppCreator; +import io.siddhi.parser.core.topology.SiddhiQueryGroup; +import io.siddhi.parser.core.topology.SiddhiTopology; +import io.siddhi.parser.service.model.MessagingSystem; +import org.apache.commons.lang3.text.StrSubstitutor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Abstract implementation of {@link SiddhiAppCreator}. Developers can use this extension point + * to implement custom Siddhi App Creator based on the distribute implementation. + */ +public abstract class AbstractSiddhiAppCreator implements SiddhiAppCreator { + + public List createApps(SiddhiTopology topology, MessagingSystem messagingSystem) { + List deployableSiddhiQueryGroupList = + new ArrayList(topology.getQueryGroupList().size()); + for (SiddhiQueryGroup queryGroup : topology.getQueryGroupList()) { + DeployableSiddhiQueryGroup deployableQueryGroup = + new DeployableSiddhiQueryGroup(queryGroup.getName(), + queryGroup.isReceiverQueryGroup(), /*queryGroup.isUserGivenSource(),*/ + queryGroup.getParallelism()); + deployableQueryGroup.setSiddhiQueries(createApps(topology.getName(), queryGroup, messagingSystem)); + deployableSiddhiQueryGroupList.add(deployableQueryGroup); + } + return deployableSiddhiQueryGroupList; + } + + /** + * This method should return valid concrete Siddhi App/s as Strings. No. of returned Siddhi + * Apps should equal the parallelism count for parse group. + * + * @param queryGroup Input parse group to produce Siddhi Apps. + * @return List of valid concrete Siddhi Apps as String. + */ + protected abstract List createApps(String siddhiAppName, + SiddhiQueryGroup queryGroup, MessagingSystem messagingSystem); + + protected List getPartitionNumbers(int appParallelism, int availablePartitionCount, + int currentAppNum) { + List partitionNumbers = new ArrayList(); + if (availablePartitionCount == appParallelism) { + partitionNumbers.add(currentAppNum); + return partitionNumbers; + } else { + //availablePartitionCount < appParallelism scenario cannot occur according to design. + // Hence if availablePartitionCount > appParallelism + //// TODO: 10/19/17 improve logic + int partitionsPerNode = availablePartitionCount / appParallelism; + if (currentAppNum + 1 == appParallelism) { //if last app + int remainingPartitions = availablePartitionCount - ((appParallelism - 1) * + partitionsPerNode); + for (int j = 0; j < remainingPartitions; j++) { + partitionNumbers.add((currentAppNum * partitionsPerNode) + j); + } + return partitionNumbers; + } else { + for (int j = 0; j < partitionsPerNode; j++) { + partitionNumbers.add((currentAppNum * partitionsPerNode) + j); + } + return partitionNumbers; + } + } + } + + protected List generateQueryList(String queryTemplate, String queryGroupName, + int parallelism) { + List queries = new ArrayList(parallelism); + for (int i = 0; i < parallelism; i++) { + Map valuesMap = new HashMap(1); + String appName = queryGroupName + "-" + (i + 1); + valuesMap.put("appName", appName); + StrSubstitutor substitutor = new StrSubstitutor(valuesMap); + queries.add(new SiddhiQuery(appName, substitutor.replace(queryTemplate), + false)); + } + return queries; + } + + protected void updateQueryList(List queryList, Map valuesMap) { + StrSubstitutor substitutor = new StrSubstitutor(valuesMap); + for (SiddhiQuery query : queryList) { + String updatedQuery = substitutor.replace(query.getApp()); + query.setApp(updatedQuery); + } + } + + protected String getUpdatedQuery(String query, Map valuesMap) { + StrSubstitutor substitutor = new StrSubstitutor(valuesMap); + return substitutor.replace(query); + } + + /** + *@param siddhiAppName Name of the userdefined siddhi app + * @param streamName Currently processing stream name + * @param groupingField Partition key field, if available otherwise null + * @return created topic name + * + * Creates the topic name from above parameters + */ + protected String getTopicName(String siddhiAppName, String streamName, String groupingField) { + return siddhiAppName + "_" + streamName + (groupingField == null ? "" + : ("_" + groupingField)); + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/DeployableSiddhiQueryGroup.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/DeployableSiddhiQueryGroup.java new file mode 100644 index 0000000000..bc959dc67c --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/DeployableSiddhiQueryGroup.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.appcreator; + +import java.util.ArrayList; +import java.util.List; + +/** + * POJO Class to hold created Siddhi Apps belonging to a single parse group. + */ +public class DeployableSiddhiQueryGroup { + + private String groupName; + private List siddhiQueries; + private boolean isReceiverQueryGroup; + private int parallelism; + private boolean isUserGivenSource; + + public DeployableSiddhiQueryGroup(String groupName, boolean receiverQueryGroup, int parallelism) { + this.groupName = groupName; + siddhiQueries = new ArrayList(); + isReceiverQueryGroup = receiverQueryGroup; + this.parallelism = parallelism; + } + + public List getSiddhiQueries() { + return siddhiQueries; + } + + public void setSiddhiQueries(List siddhiQueries) { + this.siddhiQueries = siddhiQueries; + } + + public String getGroupName() { + return groupName; + } + + public boolean isReceiverQueryGroup() { + return isReceiverQueryGroup; + } + + public void setReceiverQueryGroup(boolean receiverQueryGroup) { + isReceiverQueryGroup = receiverQueryGroup; + } + + public int getParallelism() { + return parallelism; + } + + public boolean isUserGivenSource() { + return isUserGivenSource; + } + + public void setUserGivenSource(boolean userGivenSource) { + isUserGivenSource = userGivenSource; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DeployableSiddhiQueryGroup that = (DeployableSiddhiQueryGroup) o; + if (getGroupName() != null ? !getGroupName().equals(that.getGroupName()) : that.getGroupName() != null) { + return false; + } + return getSiddhiQueries() != null ? getSiddhiQueries().equals(that.getSiddhiQueries()) + : that.getSiddhiQueries() == null; + } + + @Override + public int hashCode() { + int result = getGroupName() != null ? getGroupName().hashCode() : 0; + result = 31 * result + (getSiddhiQueries() != null ? getSiddhiQueries().hashCode() : 0); + return result; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/NatsSiddhiAppCreator.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/NatsSiddhiAppCreator.java new file mode 100644 index 0000000000..52228111bf --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/NatsSiddhiAppCreator.java @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.parser.core.appcreator; + +import io.siddhi.parser.core.topology.InputStreamDataHolder; +import io.siddhi.parser.core.topology.OutputStreamDataHolder; +import io.siddhi.parser.core.topology.PublishingStrategyDataHolder; +import io.siddhi.parser.core.topology.SiddhiQueryGroup; +import io.siddhi.parser.core.topology.SubscriptionStrategyDataHolder; +import io.siddhi.parser.core.util.TransportStrategy; +import io.siddhi.parser.service.model.MessagingSystem; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Creates distributed siddhi application which can be distributed using Nats-streaming. + */ +public class NatsSiddhiAppCreator extends AbstractSiddhiAppCreator { + + private static final Logger log = Logger.getLogger(NatsSiddhiAppCreator.class); + //App creator constants + public static final String APP_NAME = "appName"; + public static final String TOPIC_LIST = "topicList"; + public static final String CONSUMER_GROUP_ID = "groupID"; + public static final String BOOTSTRAP_SERVER_URL = "bootstrapServerURL"; + public static final String PARTITION_LIST = "partitionList"; + public static final String PARTITION_KEY = "partitionKey"; + public static final String DESTINATIONS = "destinations"; + public static final String PARTITION_NO = "partitionNo"; + public static final String MAPPING = "text"; + public static final String PARTITION_TOPIC = "partitionTopic"; + public static final String DESTINATION_TOPIC = "@destination(destination = '${" + + PARTITION_TOPIC + "}')"; + public static final String CLUSTER_ID = "clusterid"; + public static final String NATS_SERVER_URL = "natsserverurl"; + public static final String PARTITIONED_NATS_SINK_TEMPLATE = "@sink(type='nats'," + + "cluster.id='${" + CLUSTER_ID + "}'," + + "@distribution(strategy='partitioned', partitionKey='${" + PARTITION_KEY + "}'," + + "${" + DESTINATIONS + "}), bootstrap.servers=" + + "'${" + NATS_SERVER_URL + "}',@map(type='" + MAPPING + "'))"; + public static final String DEFAULT_NATS_SINK_TEMPLATE = "@sink(type='nats'," + + "cluster.id='${" + CLUSTER_ID + "}'," + + "destination = '${" + TOPIC_LIST + "}', bootstrap.servers=" + + "'${" + NATS_SERVER_URL + "}',@map(type='" + MAPPING + "'))"; + public static final String DEFAULT_NATS_SOURCE_TEMPLATE = "@source(type='nats'," + + "cluster.id='${" + CLUSTER_ID + "}'," + + "destination = '${" + TOPIC_LIST + "}', bootstrap.servers=" + + "'${" + NATS_SERVER_URL + "}',@map(type='" + MAPPING + "'))"; + public static final String QUEUE_GROUP_NAME = "queueGroupName"; + public static final String RR_NATS_SOURCE_TEMPLATE = "@source(type='nats'," + + "cluster.id='${" + CLUSTER_ID + "}'," + + "queue.group.name='${" + QUEUE_GROUP_NAME + "}'," + + "destination = '${" + TOPIC_LIST + "}', bootstrap.servers=" + + "'${" + NATS_SERVER_URL + "}',@map(type='" + MAPPING + "'))"; + + private String clusterId = ""; + private String natsServerUrl = ""; + + @Override + protected List createApps(String siddhiAppName, SiddhiQueryGroup queryGroup, + MessagingSystem messagingSystem) { + String groupName = queryGroup.getName(); + String queryTemplate = queryGroup.getSiddhiApp(); + List queryList = generateQueryList(queryTemplate, groupName, queryGroup + .getParallelism()); + if (messagingSystem != null && messagingSystem.getConfig() != null) { + natsServerUrl = messagingSystem.getConfig().getBootstrapServerURLs(); + clusterId = messagingSystem.getConfig().getClusterId(); + } + processInputStreams(siddhiAppName, groupName, queryList, queryGroup.getInputStreams().values()); + processOutputStreams(siddhiAppName, queryList, queryGroup.getOutputStreams().values()); + if (log.isDebugEnabled()) { + log.debug("Following parse list is created for the Siddhi Query Group " + queryGroup.getName() + " " + + "representing Siddhi App " + siddhiAppName + "."); + for (SiddhiQuery siddhiQuery : queryList) { + log.debug(siddhiQuery.getApp()); + } + } + return queryList; + } + + /** + * @param siddhiAppName Name of the initial user defined siddhi application. + * @param queryList Contains the parse of the current execution group replicated + * to the parallelism of the group. + * @param outputStreams Collection of current execution group's output streams + * Assigns the nats sink configurations for output streams. + */ + private void processOutputStreams(String siddhiAppName, List queryList, + Collection outputStreams) { + + Map sinkValuesMap = new HashMap(); + sinkValuesMap.put(CLUSTER_ID, clusterId); + sinkValuesMap.put(NATS_SERVER_URL, natsServerUrl); + + for (OutputStreamDataHolder outputStream : outputStreams) { + Map sinkList = new HashMap(); + Map partitionKeys = new HashMap(); + + for (PublishingStrategyDataHolder holder : outputStream.getPublishingStrategyList()) { + sinkValuesMap.put("topicList", siddhiAppName + "_" + + outputStream.getStreamName() + (holder.getGroupingField() == null ? "" : ("_" + holder + .getGroupingField()))); + if (holder.getStrategy() == TransportStrategy.FIELD_GROUPING) { + if (partitionKeys.get(holder.getGroupingField()) != null && + partitionKeys.get(holder.getGroupingField()) > holder.getParallelism()) { + continue; + } + + partitionKeys.put(holder.getGroupingField(), holder.getParallelism()); + sinkValuesMap.put(PARTITION_KEY, holder.getGroupingField()); + List destinations = new ArrayList(holder.getParallelism()); + + for (int i = 0; i < holder.getParallelism(); i++) { + Map destinationMap = new HashMap(holder.getParallelism()); + destinationMap.put(PARTITION_TOPIC, + sinkValuesMap.get(TOPIC_LIST) + + "_" + String.valueOf(i)); + destinations.add(getUpdatedQuery(DESTINATION_TOPIC, + destinationMap)); + } + + sinkValuesMap.put(DESTINATIONS, + StringUtils.join(destinations, ",")); + String sinkString = + getUpdatedQuery(PARTITIONED_NATS_SINK_TEMPLATE, + sinkValuesMap); + sinkList.put(sinkValuesMap.get(TOPIC_LIST), + sinkString); + } else { + //ATM we are handling both strategies in same manner. Later will improve to have multiple + // partitions for RR + String sinkString = getUpdatedQuery(DEFAULT_NATS_SINK_TEMPLATE, + sinkValuesMap); + sinkList.put(sinkValuesMap.get(TOPIC_LIST), sinkString); + } + } + Map queryValuesMap = new HashMap(1); + queryValuesMap.put(outputStream.getStreamName(), StringUtils.join(sinkList.values(), "\n")); + updateQueryList(queryList, queryValuesMap); + } + } + + /** + * @param siddhiAppName Name of the initial user defined siddhi application. + * @param queryList Contains the parse of the current execution group replicated + * to the parallelism of the group. + * @param inputStreams Collection of current execution group's input streams + * Assigns the nats source configurations for input streams. + */ + private void processInputStreams(String siddhiAppName, String groupName, List queryList, + Collection inputStreams) { + + Map sourceValuesMap = new HashMap(); + for (InputStreamDataHolder inputStream : inputStreams) { + SubscriptionStrategyDataHolder subscriptionStrategy = inputStream.getSubscriptionStrategy(); + sourceValuesMap.put(CLUSTER_ID, clusterId); + sourceValuesMap.put(NATS_SERVER_URL, natsServerUrl); + + if (!inputStream.isUserGiven()) { + if (subscriptionStrategy.getStrategy() == TransportStrategy.FIELD_GROUPING) { + sourceValuesMap.put(TOPIC_LIST, getTopicName(siddhiAppName, + inputStream.getStreamName(), inputStream.getSubscriptionStrategy().getPartitionKey())); + for (int i = 0; i < queryList.size(); i++) { + List sourceQueries = new ArrayList(); + List partitionNumbers = getPartitionNumbers(queryList.size(), subscriptionStrategy + .getOfferedParallelism(), i); + for (int topicCount : partitionNumbers) { + String topicName = getTopicName(siddhiAppName, inputStream.getStreamName(), + inputStream.getSubscriptionStrategy().getPartitionKey()) + "_" + + Integer.toString(topicCount); + + sourceValuesMap.put(TOPIC_LIST, topicName); + String sourceQuery = getUpdatedQuery(DEFAULT_NATS_SOURCE_TEMPLATE, sourceValuesMap); + sourceQueries.add(sourceQuery); + } + + String combinedQueryHeader = StringUtils.join(sourceQueries, + System.lineSeparator()); + Map queryValuesMap = new HashMap(1); + queryValuesMap.put(inputStream.getStreamName(), combinedQueryHeader); + String updatedQuery = getUpdatedQuery(queryList.get(i).getApp() + , queryValuesMap); + queryList.get(i).setApp(updatedQuery); + } + + } else if (subscriptionStrategy.getStrategy() == TransportStrategy.ROUND_ROBIN) { + sourceValuesMap.put(TOPIC_LIST, getTopicName(siddhiAppName, + inputStream.getStreamName(), null)); + sourceValuesMap.put(QUEUE_GROUP_NAME, groupName); + String sourceString = getUpdatedQuery(RR_NATS_SOURCE_TEMPLATE, sourceValuesMap); + Map queryValuesMap = new HashMap(1); + queryValuesMap.put(inputStream.getStreamName(), sourceString); + updateQueryList(queryList, queryValuesMap); + + } else if (subscriptionStrategy.getStrategy() == TransportStrategy.ALL) { + + sourceValuesMap.put(TOPIC_LIST, getTopicName(siddhiAppName, + inputStream.getStreamName(), null)); + for (SiddhiQuery aQueryList : queryList) { + String sourceString = getUpdatedQuery(DEFAULT_NATS_SOURCE_TEMPLATE, sourceValuesMap); + Map queryValuesMap = new HashMap(1); + queryValuesMap.put(inputStream.getStreamName(), sourceString); + String updatedQuery = getUpdatedQuery(aQueryList.getApp(), queryValuesMap); + aQueryList.setApp(updatedQuery); + } + } + } + } + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/SiddhiQuery.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/SiddhiQuery.java new file mode 100644 index 0000000000..ee6ff403f5 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/appcreator/SiddhiQuery.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.appcreator; + +/** + * Gives details about siddhi application. + */ +public class SiddhiQuery { + private String appName; + private String app; + private boolean isReceiverQuery; + + private SiddhiQuery() { + // Avoiding empty initialization + } + + public SiddhiQuery(String appName, String app, boolean isReceiverQuery) { + this.appName = appName; + this.app = app; + this.isReceiverQuery = isReceiverQuery; + } + + public String getAppName() { + return appName; + } + + public SiddhiQuery setAppName(String appName) { + this.appName = appName; + return this; + } + + public String getApp() { + return app; + } + + public SiddhiQuery setApp(String app) { + this.app = app; + return this; + } + + public boolean isReceiverQuery() { + return isReceiverQuery; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SiddhiQuery that = (SiddhiQuery) o; + if (getAppName() != null ? !getAppName().equals(that.getAppName()) : that.getAppName() != null) { + return false; + } + return getApp() != null ? getApp().equals(that.getApp()) : that.getApp() == null; + } + + @Override + public int hashCode() { + int result = getAppName() != null ? getAppName().hashCode() : 0; + result = 31 * result + (getApp() != null ? getApp().hashCode() : 0); + return result; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/InputStreamDataHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/InputStreamDataHolder.java new file mode 100644 index 0000000000..e74059c8da --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/InputStreamDataHolder.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import io.siddhi.parser.core.util.EventHolder; +import io.siddhi.parser.core.util.SiddhiTopologyCreatorConstants; + +/** + * Data Holder to hold required details of Input Streams in {@link SiddhiTopology}. + */ +public class InputStreamDataHolder { + private String streamName; + private String streamDefinition; + private EventHolder eventHolderType; + private boolean isUserGiven; + private SubscriptionStrategyDataHolder subscriptionStrategy; + private boolean isInnerGroupStream; + + public InputStreamDataHolder(String streamName, String streamDefinition, EventHolder eventHolderType, + boolean isUserGiven, SubscriptionStrategyDataHolder subscriptionStrategy) { + this.streamName = streamName; + this.streamDefinition = streamDefinition; + this.eventHolderType = eventHolderType; + this.isUserGiven = isUserGiven; + this.subscriptionStrategy = subscriptionStrategy; + isInnerGroupStream = false; + } + + public String getStreamDefinition() { + return streamDefinition; + } + + public void setStreamDefinition(String streamDefinition) { + this.streamDefinition = streamDefinition; + } + + public String getStreamName() { + return streamName; + } + + public SubscriptionStrategyDataHolder getSubscriptionStrategy() { + return subscriptionStrategy; + } + + public boolean isUserGiven() { + return isUserGiven; + } + + public void setUserGiven(boolean userGiven) { + isUserGiven = userGiven; + } + + public EventHolder getEventHolderType() { + return eventHolderType; + } + + public boolean isInnerGroupStream() { + return isInnerGroupStream; + } + + public void setInnerGroupStream(boolean innerGroupStream) { + isInnerGroupStream = innerGroupStream; + } + + public boolean isUserGivenSource() { + return streamDefinition.toLowerCase().contains(SiddhiTopologyCreatorConstants.SOURCE_IDENTIFIER); + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/OutputStreamDataHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/OutputStreamDataHolder.java new file mode 100644 index 0000000000..8a710a5b29 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/OutputStreamDataHolder.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import io.siddhi.parser.core.util.EventHolder; + +import java.util.ArrayList; +import java.util.List; + +/** + * Data Holder to hold required details of Output Streams in {@link SiddhiTopology}. + */ +public class OutputStreamDataHolder { + private String streamName; + private String streamDefinition; + private EventHolder eventHolderType; + private List publishingStrategyList; + private boolean isUserGiven; + private boolean isInnerGroupStream; + private boolean isSinkBridgeAdded; + + public OutputStreamDataHolder(String streamName, String streamDefinition, EventHolder eventHolderType, + boolean isUserGiven) { + this.streamName = streamName; + this.streamDefinition = streamDefinition; + this.eventHolderType = eventHolderType; + this.isUserGiven = isUserGiven; + this.publishingStrategyList = new ArrayList(); + isInnerGroupStream = false; + isSinkBridgeAdded = false; + } + + public String getStreamDefinition() { + return streamDefinition; + } + + public void setStreamDefinition(String streamDefinition) { + this.streamDefinition = streamDefinition; + } + + public String getStreamName() { + return streamName; + } + + public List getPublishingStrategyList() { + return publishingStrategyList; + } + + public boolean isUserGiven() { + return isUserGiven; + } + + public void addPublishingStrategy(PublishingStrategyDataHolder publishingStrategyDataHolder) { + publishingStrategyList.add(publishingStrategyDataHolder); + } + + public EventHolder getEventHolderType() { + return eventHolderType; + } + + public boolean isInnerGroupStream() { + return isInnerGroupStream; + } + + public void setInnerGroupStream(boolean innerGroupStream) { + isInnerGroupStream = innerGroupStream; + } + + public boolean isSinkBridgeAdded() { + return isSinkBridgeAdded; + } + + public void setSinkBridgeAdded(boolean sinkBridgeAdded) { + isSinkBridgeAdded = sinkBridgeAdded; + } +} + + diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/PublishingStrategyDataHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/PublishingStrategyDataHolder.java new file mode 100644 index 0000000000..d9fa008a32 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/PublishingStrategyDataHolder.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import io.siddhi.parser.core.util.TransportStrategy; + +/** + * Data Holder to hold required details of Publishing Strategy of an Output Stream. Given Output Stream can have + * multiple publishing strategies for each consumer. + */ +public class PublishingStrategyDataHolder { + private TransportStrategy strategy; + private String groupingField = null; + private int parallelism; + + public PublishingStrategyDataHolder(TransportStrategy strategy, int parallelism) { + this.strategy = strategy; + this.parallelism = parallelism; + } + + public PublishingStrategyDataHolder(TransportStrategy strategy, String groupingField, int parallelism) { + this.strategy = strategy; + this.groupingField = groupingField; + this.parallelism = parallelism; + } + + public TransportStrategy getStrategy() { + return strategy; + } + + public String getGroupingField() { + return groupingField; + } + + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiQueryGroup.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiQueryGroup.java new file mode 100644 index 0000000000..6943252fcb --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiQueryGroup.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.parser.core.topology; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static io.siddhi.parser.core.appcreator.NatsSiddhiAppCreator.APP_NAME; + +/** + * Data Holder to hold required details of Query Groups in {@link SiddhiTopology}. + */ +public class SiddhiQueryGroup { + private String name; + private int parallelism; + private String siddhiApp; + private Map inputStreams; + private Map outputStreams; + private boolean messagingSourceAvailable = false; + private List queryList; + private boolean isReceiverQueryGroup; + + public SiddhiQueryGroup(String name, int parallelism) { + this.name = name; + this.parallelism = parallelism; + this.queryList = new ArrayList(); + siddhiApp = " "; + inputStreams = new HashMap(); + outputStreams = new HashMap(); + + } + + public List getQueryList() { + return queryList; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getParallelism() { + return parallelism; + } + + public String getSiddhiApp() { + //combination of InputStream definitions , OutputStream and queries + StringBuilder stringBuilder = new StringBuilder("@App:name(\"${" + APP_NAME + "}\") \n"); + for (InputStreamDataHolder inputStreamDataHolder : inputStreams.values()) { + siddhiApp = inputStreamDataHolder.getStreamDefinition(); + if (siddhiApp != null) { + stringBuilder.append(siddhiApp).append(";\n"); + } + } + for (OutputStreamDataHolder outputStreamDataHolder : outputStreams.values()) { + siddhiApp = outputStreamDataHolder.getStreamDefinition(); + if (siddhiApp != null) { + stringBuilder.append(siddhiApp).append(";\n"); + } + } + for (String aQueryList : queryList) { + stringBuilder.append(aQueryList).append(";\n"); + } + siddhiApp = stringBuilder.toString(); + return stringBuilder.toString(); + } + + public void addQuery(String query) { + queryList.add(query); + } + + public void addQueryAtFirst(String query) { + queryList.add(0, query); + } + + public void addOutputStream(String key, OutputStreamDataHolder outputStreamDataHolder) { + if (outputStreamDataHolder != null) { + outputStreams.put(key, outputStreamDataHolder); + } + } + + public void addInputStreams(Map inputStreamDataHolderMap) { + if (inputStreamDataHolderMap != null) { + this.inputStreams.putAll(inputStreamDataHolderMap); + } + } + + public Map getInputStreams() { + return inputStreams; + } + + public Map getOutputStreams() { + return outputStreams; + } + + public boolean isReceiverQueryGroup() { + return isReceiverQueryGroup; + } + + public void setReceiverQueryGroup(boolean receiverQueryGroup) { + isReceiverQueryGroup = receiverQueryGroup; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public boolean isMessagingSourceAvailable() { + return messagingSourceAvailable; + } + + public void setMessagingSourceAvailable(boolean messagingSourceAvailable) { + this.messagingSourceAvailable = messagingSourceAvailable; + } + +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopology.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopology.java new file mode 100644 index 0000000000..1a4df3685f --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopology.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import java.util.List; + +/** + * Distributed Topology of a user defined distributed Siddhi App. Topology contains all the required details for the + * underlying distribution provider to create new Apps. + */ +public class SiddhiTopology { + private String name; + private List queryGroupList; + private boolean transportChannelCreationEnabled; + private boolean isStatefulApp; + private boolean userGiveSourceStateful; + + public SiddhiTopology(String name, List queryGroupList, boolean transportChannelCreationEnabled, + boolean isStatefulApp, boolean userGiveSourceStateful) { + this.name = name; + this.queryGroupList = queryGroupList; + this.transportChannelCreationEnabled = transportChannelCreationEnabled; + this.isStatefulApp = isStatefulApp; + this.userGiveSourceStateful = userGiveSourceStateful; + } + + public String getName() { + return name; + } + + public List getQueryGroupList() { + return queryGroupList; + } + + public boolean isTransportChannelCreationEnabled() { + return transportChannelCreationEnabled; + } + + public void setTransportChannelCreationEnabled(boolean transportChannelCreationEnabled) { + this.transportChannelCreationEnabled = transportChannelCreationEnabled; + } + + public boolean isStatefulApp() { + return isStatefulApp; + } + + public boolean isUserGiveSourceStateful() { + return userGiveSourceStateful; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopologyCreatorImpl.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopologyCreatorImpl.java new file mode 100644 index 0000000000..03c489928c --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopologyCreatorImpl.java @@ -0,0 +1,676 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.partition.PartitionRuntime; +import io.siddhi.core.query.QueryRuntime; +import io.siddhi.core.stream.input.source.Source; +import io.siddhi.core.stream.output.sink.Sink; +import io.siddhi.core.table.Table; +import io.siddhi.core.window.Window; +import io.siddhi.parser.SiddhiParserDataHolder; +import io.siddhi.parser.core.SiddhiTopologyCreator; +import io.siddhi.parser.core.util.EventHolder; +import io.siddhi.parser.core.util.SiddhiTopologyCreatorConstants; +import io.siddhi.parser.core.util.TransportStrategy; +import io.siddhi.query.api.SiddhiApp; +import io.siddhi.query.api.annotation.Annotation; +import io.siddhi.query.api.annotation.Element; +import io.siddhi.query.api.definition.AbstractDefinition; +import io.siddhi.query.api.definition.StreamDefinition; +import io.siddhi.query.api.exception.SiddhiAppValidationException; +import io.siddhi.query.api.execution.ExecutionElement; +import io.siddhi.query.api.execution.partition.Partition; +import io.siddhi.query.api.execution.partition.PartitionType; +import io.siddhi.query.api.execution.partition.ValuePartitionType; +import io.siddhi.query.api.execution.query.Query; +import io.siddhi.query.api.execution.query.input.stream.InputStream; +import io.siddhi.query.api.expression.Variable; +import io.siddhi.query.api.util.AnnotationHelper; +import io.siddhi.query.api.util.ExceptionUtil; +import io.siddhi.query.compiler.SiddhiCompiler; +import org.apache.commons.lang3.text.StrSubstitutor; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.UUID; + +/** + * Consumes a Siddhi App and produce a {@link SiddhiTopology} based on distributed annotations. + */ + +public class SiddhiTopologyCreatorImpl implements SiddhiTopologyCreator { + + private static final Logger log = Logger.getLogger(SiddhiTopologyCreatorImpl.class); + private static final String DEFAULT_MESSAGING_SYSTEM = "nats"; + private SiddhiTopologyDataHolder siddhiTopologyDataHolder; + private SiddhiApp siddhiApp; + private String siddhiAppName; + private SiddhiAppRuntime siddhiAppRuntime; + private String userDefinedSiddhiApp; + private boolean transportChannelCreationEnabled = true; + private boolean isUserGiveSourceStateful = false; + + @Override + public SiddhiTopology createTopology(String userDefinedSiddhiApp) { + this.userDefinedSiddhiApp = userDefinedSiddhiApp; + this.siddhiApp = SiddhiCompiler.parse(userDefinedSiddhiApp); + this.siddhiAppRuntime = SiddhiParserDataHolder.getSiddhiManager().createSiddhiAppRuntime(userDefinedSiddhiApp); + this.siddhiAppName = getSiddhiAppName(); + this.siddhiTopologyDataHolder = new SiddhiTopologyDataHolder(siddhiAppName, userDefinedSiddhiApp); + + SiddhiQueryGroup siddhiQueryGroup; + String execGroupName; + String defaultExecGroupName = siddhiAppName + "-" + UUID.randomUUID(); + //Parallelism is set to default parallelism, since distributed deployment is not supported at the moment + int parallelism = SiddhiTopologyCreatorConstants.DEFAULT_PARALLEL; + + for (ExecutionElement executionElement : siddhiApp.getExecutionElementList()) { + //groupName is set to default, since all elements go under a single group + execGroupName = defaultExecGroupName; + siddhiQueryGroup = createSiddhiQueryGroup(execGroupName, parallelism); + addExecutionElement(executionElement, siddhiQueryGroup, execGroupName); + } + + checkUserGivenSourceDistribution(); + assignPublishingStrategyOutputStream(); + cleanInnerGroupStreams(siddhiTopologyDataHolder.getSiddhiQueryGroupMap().values()); + if (log.isDebugEnabled()) { + log.debug("Topology was created with " + siddhiTopologyDataHolder.getSiddhiQueryGroupMap().values().size + () + " groups. Following are the partial Siddhi apps."); + for (SiddhiQueryGroup debugSiddhiQueryGroup : siddhiTopologyDataHolder.getSiddhiQueryGroupMap().values()) { + log.debug(debugSiddhiQueryGroup.getSiddhiApp()); + } + } + return new SiddhiTopology(siddhiTopologyDataHolder.getSiddhiAppName(), new ArrayList<> + (siddhiTopologyDataHolder.getSiddhiQueryGroupMap().values()), + transportChannelCreationEnabled, isStatefulApp(), isUserGiveSourceStateful); + } + + private boolean isStatefulApp() { + for (List sourceList : siddhiAppRuntime.getSources()) { + for (Source source : sourceList) { + if (source.isStateful()) { + if (source.getType().equalsIgnoreCase(DEFAULT_MESSAGING_SYSTEM)) { + isUserGiveSourceStateful = true; + } + return true; + } + } + } + for (List sinkList : siddhiAppRuntime.getSinks()) { + for (Sink sink : sinkList) { + if (sink.isStateful()) { + return true; + } + } + } + for (QueryRuntime queryRuntime : siddhiAppRuntime.getQueries()) { + if (queryRuntime.isStateful()) { + return true; + } + } + for (PartitionRuntime partitionRuntime : siddhiAppRuntime.getPartitions()) { + for (QueryRuntime queryRuntime : partitionRuntime.getQueries()) { + if (queryRuntime.isStateful()) { + return true; + } + } + } + for (Table table : siddhiAppRuntime.getTables()) { + if (table.isStateful()) { + return true; + } + } + for (Window window : siddhiAppRuntime.getWindows()) { + if (window.isStateful()) { + return true; + } + } + return false; + } + + /** + * Clean input and output streams of the group by removing streams that are only used within that group. + * + * @param siddhiQueryGroups Collection of Siddhi Query Groups + */ + private void cleanInnerGroupStreams(Collection siddhiQueryGroups) { + for (SiddhiQueryGroup siddhiQueryGroup : siddhiQueryGroups) { + siddhiQueryGroup.getInputStreams() + .entrySet().removeIf(stringInputStreamDataHolderEntry -> stringInputStreamDataHolderEntry.getValue() + .isInnerGroupStream()); + siddhiQueryGroup.getOutputStreams() + .entrySet().removeIf( + stringOutputStreamDataHolderEntry -> stringOutputStreamDataHolderEntry.getValue() + .isInnerGroupStream()); + } + } + + /** + * Get SiddhiAppName if given by the user unless a unique SiddhiAppName is returned. + * + * @return String SiddhiAppName + */ + private String getSiddhiAppName() { + Element element = + AnnotationHelper.getAnnotationElement(SiddhiTopologyCreatorConstants.SIDDHIAPP_NAME_IDENTIFIER, + null, siddhiApp.getAnnotations()); + if (element == null) { + return SiddhiTopologyCreatorConstants.DEFAULT_SIDDHIAPP_NAME + "-" + UUID.randomUUID(); + } else { + return element.getValue(); + } + } + + /** + * If the corresponding {@link SiddhiQueryGroup} exists that object will be returned unless new object is created. + */ + private SiddhiQueryGroup createSiddhiQueryGroup(String execGroupName, int parallel) { + SiddhiQueryGroup siddhiQueryGroup; + if (!siddhiTopologyDataHolder.getSiddhiQueryGroupMap().containsKey(execGroupName)) { + siddhiQueryGroup = new SiddhiQueryGroup(execGroupName, parallel); + } else { + siddhiQueryGroup = siddhiTopologyDataHolder.getSiddhiQueryGroupMap().get(execGroupName); + } + return siddhiQueryGroup; + } + + /** + * If {@link Query,Partition} string contains {@link SiddhiTopologyCreatorConstants#DISTRIBUTED_IDENTIFIER}, + * meta info related to distributed deployment is removed. + */ + private String removeMetaInfoQuery(ExecutionElement executionElement, String queryElement) { + int[] queryContextStartIndex; + int[] queryContextEndIndex; + + for (Annotation annotation : executionElement.getAnnotations()) { + if (annotation.getName().toLowerCase() + .equals(SiddhiTopologyCreatorConstants.DISTRIBUTED_IDENTIFIER)) { + queryContextStartIndex = annotation.getQueryContextStartIndex(); + queryContextEndIndex = annotation.getQueryContextEndIndex(); + queryElement = queryElement.replace( + ExceptionUtil.getContext(queryContextStartIndex, queryContextEndIndex, + siddhiTopologyDataHolder.getUserDefinedSiddhiApp()), ""); + break; + } + } + return queryElement; + } + + /** + * Get Map of {@link InputStreamDataHolder} corresponding to a {@link Query}. + * + * @return Map of StreamID and {@link InputStreamDataHolder} + */ + private Map getInputStreamHolderInfo(Query executionElement, + SiddhiQueryGroup siddhiQueryGroup, + boolean isQuery) { + Map inputStreamDataHolderMap = new HashMap<>(); + int parallel = siddhiQueryGroup.getParallelism(); + String execGroupName = siddhiQueryGroup.getName(); + StreamDataHolder streamDataHolder; + InputStream inputStream = (executionElement).getInputStream(); + for (String inputStreamId : inputStream.getUniqueStreamIds()) { + //not an inner Stream + if (!inputStreamId.startsWith(SiddhiTopologyCreatorConstants.INNERSTREAM_IDENTIFIER)) { + streamDataHolder = extractStreamHolderInfo(inputStreamId, execGroupName); + // Handle transportStrategy when parallelism > 1 + TransportStrategy transportStrategy = TransportStrategy.ALL; + InputStreamDataHolder inputStreamDataHolder = siddhiQueryGroup.getInputStreams().get(inputStreamId); + String partitionKey = siddhiTopologyDataHolder.getPartitionKeyMap().get(inputStreamId); + inputStreamDataHolder = new InputStreamDataHolder(inputStreamId, + streamDataHolder.getStreamDefinition(), + streamDataHolder.getEventHolderType(), + streamDataHolder.isUserGiven(), + new SubscriptionStrategyDataHolder(parallel, transportStrategy, partitionKey)); + inputStreamDataHolderMap.put(inputStreamId, inputStreamDataHolder); + } + } + return inputStreamDataHolderMap; + } + + /** + * Get {@link OutputStreamDataHolder} for an OutputStream of a {@link Query}. + * + * @return {@link OutputStreamDataHolder} + */ + private OutputStreamDataHolder getOutputStreamHolderInfo(String outputStreamId, int parallel, + String execGroupName) { + if (!outputStreamId.startsWith(SiddhiTopologyCreatorConstants.INNERSTREAM_IDENTIFIER)) { + StreamDataHolder streamDataHolder = extractStreamHolderInfo(outputStreamId, execGroupName); + return new OutputStreamDataHolder(outputStreamId, streamDataHolder.getStreamDefinition(), + streamDataHolder.getEventHolderType(), streamDataHolder.isUserGiven()); + } else { + return null; + } + } + + /** + * Extract primary information corresponding to {@link InputStreamDataHolder} and {@link OutputStreamDataHolder}. + * Information is retrieved and assigned to {@link StreamDataHolder} . + * + * @return StreamDataHolder + */ + private StreamDataHolder extractStreamHolderInfo(String streamId, String groupName) { + String streamDefinition; + int[] queryContextEndIndex; + int[] queryContextStartIndex; + StreamDataHolder streamDataHolder = new StreamDataHolder(true); + Map streamDefinitionMap = siddhiApp.getStreamDefinitionMap(); + boolean isUserGivenTransport; + if (streamDefinitionMap.containsKey(streamId)) { + StreamDefinition definition = streamDefinitionMap.get(streamId); + queryContextStartIndex = definition.getQueryContextStartIndex(); + queryContextEndIndex = definition.getQueryContextEndIndex(); + streamDefinition = ExceptionUtil.getContext(queryContextStartIndex, queryContextEndIndex, + siddhiTopologyDataHolder.getUserDefinedSiddhiApp()); + isUserGivenTransport = isUserGivenTransport(streamDefinition); + if (!isUserGivenTransport && !siddhiApp.getTriggerDefinitionMap().containsKey(streamId)) { + streamDefinition = "${" + streamId + "}" + streamDefinition; + } + streamDataHolder = + new StreamDataHolder(streamDefinition, EventHolder.STREAM, isUserGivenTransport); + + } else if (siddhiApp.getTableDefinitionMap().containsKey(streamId)) { + AbstractDefinition tableDefinition = siddhiApp.getTableDefinitionMap().get(streamId); + streamDataHolder.setEventHolderType(EventHolder.INMEMORYTABLE); + for (Annotation annotation : tableDefinition.getAnnotations()) { + if (annotation.getName().toLowerCase().equals( + SiddhiTopologyCreatorConstants.PERSISTENCETABLE_IDENTIFIER)) { + streamDataHolder.setEventHolderType(EventHolder.TABLE); + siddhiTopologyDataHolder.setStatefulApp(true); + break; + } + } + queryContextStartIndex = siddhiApp.getTableDefinitionMap().get(streamId).getQueryContextStartIndex(); + queryContextEndIndex = siddhiApp.getTableDefinitionMap().get(streamId).getQueryContextEndIndex(); + streamDataHolder.setStreamDefinition(ExceptionUtil.getContext(queryContextStartIndex, + queryContextEndIndex, siddhiTopologyDataHolder.getUserDefinedSiddhiApp())); + if (!streamDataHolder.getEventHolderType().equals(EventHolder.INMEMORYTABLE) && !siddhiTopologyDataHolder + .getInMemoryMap().containsKey(streamId)) { + siddhiTopologyDataHolder.getInMemoryMap().put(streamId, groupName); + } + } else if (siddhiApp.getWindowDefinitionMap().containsKey(streamId)) { + siddhiTopologyDataHolder.setStatefulApp(true); + queryContextStartIndex = siddhiApp.getWindowDefinitionMap().get(streamId).getQueryContextStartIndex(); + queryContextEndIndex = siddhiApp.getWindowDefinitionMap().get(streamId).getQueryContextEndIndex(); + streamDataHolder.setStreamDefinition(ExceptionUtil.getContext(queryContextStartIndex, + queryContextEndIndex, siddhiTopologyDataHolder.getUserDefinedSiddhiApp())); + streamDataHolder.setEventHolderType(EventHolder.WINDOW); + if (!siddhiTopologyDataHolder.getInMemoryMap().containsKey(streamId)) { + siddhiTopologyDataHolder.getInMemoryMap().put(streamId, groupName); + } + //if stream definition is an inferred definition + } else if (streamDataHolder.getStreamDefinition() == null) { + if (siddhiAppRuntime.getStreamDefinitionMap().containsKey(streamId)) { + streamDataHolder = new StreamDataHolder( + "${" + streamId + "}" + + siddhiAppRuntime.getStreamDefinitionMap().get(streamId).toString(), + EventHolder.STREAM, false); + } + } + return streamDataHolder; + } + + /** + * Checks whether a given Stream definition contains Sink or Source configurations. + */ + private boolean isUserGivenTransport(String streamDefinition) { + return streamDefinition.toLowerCase().contains( + SiddhiTopologyCreatorConstants.SOURCE_IDENTIFIER) || streamDefinition.toLowerCase().contains + (SiddhiTopologyCreatorConstants.SINK_IDENTIFIER); + } + + private void checkUserGivenSourceDistribution() { + boolean passthroughQueriesAvailable = false; + List siddhiQueryGroupsList = + new ArrayList<>(siddhiTopologyDataHolder.getSiddhiQueryGroupMap().values()); + List passthroughQueries = new ArrayList<>(); + + for (SiddhiQueryGroup siddhiQueryGroup : siddhiQueryGroupsList) { + for (Entry entry : siddhiQueryGroup.getInputStreams().entrySet()) { + String streamId = entry.getKey(); + InputStreamDataHolder inputStreamDataHolder = entry.getValue(); + if (inputStreamDataHolder.getEventHolderType() != null && inputStreamDataHolder + .getEventHolderType().equals(EventHolder.STREAM) && + inputStreamDataHolder.isUserGivenSource()) { + String runtimeDefinition = removeMetaInfoStream(streamId, + inputStreamDataHolder.getStreamDefinition(), SiddhiTopologyCreatorConstants + .SOURCE_IDENTIFIER); + StreamDefinition streamDefinition = siddhiAppRuntime.getStreamDefinitionMap() + .get(inputStreamDataHolder.getStreamName()); + int nonMessagingSources = 0; + for (Annotation annotation : streamDefinition.getAnnotations()) { + if (annotation.getName().equalsIgnoreCase(SiddhiTopologyCreatorConstants.SOURCE_IDENTIFIER + .replace("@", ""))) { + if (annotation.getElement("type").equalsIgnoreCase(DEFAULT_MESSAGING_SYSTEM)) { + siddhiQueryGroup.setMessagingSourceAvailable(true); + } else { + nonMessagingSources++; + } + } + } + if ((!siddhiQueryGroup.isMessagingSourceAvailable() || nonMessagingSources > 0) + && isStatefulApp()) { + passthroughQueriesAvailable = true; + passthroughQueries.addAll(generatePassthroughQueryList( + inputStreamDataHolder, runtimeDefinition)); + inputStreamDataHolder.setStreamDefinition(runtimeDefinition); + inputStreamDataHolder.setUserGiven(false); + InputStreamDataHolder holder = siddhiQueryGroup.getInputStreams().get(streamId); + String consumingStream = "${" + streamId + "} " + removeMetaInfoStream(streamId, + holder.getStreamDefinition(), SiddhiTopologyCreatorConstants.SOURCE_IDENTIFIER); + holder.setStreamDefinition(consumingStream); + holder.setUserGiven(false); + } + } + } + } + //Created SiddhiQueryGroup object will be moved as the first element of the linkedHashMap of already created + // SiddhiApps only if at least one Passthrough parse is being created. + if (passthroughQueriesAvailable) { + addFirst(passthroughQueries); + } + } + + private List generatePassthroughQueryList(InputStreamDataHolder inputStreamDataHolder, + String runtimeDefinition) { + List passthroughQueryGroupList = new ArrayList<>(); + SiddhiQueryGroup passthroughQueryGroup = createPassthroughQueryGroup(inputStreamDataHolder, + runtimeDefinition, SiddhiTopologyCreatorConstants.DEFAULT_PARALLEL); + passthroughQueryGroup.setReceiverQueryGroup(true); + passthroughQueryGroupList.add(passthroughQueryGroup); + return passthroughQueryGroupList; + } + + /** + * If the Stream definition string contains {@link SiddhiTopologyCreatorConstants#SINK_IDENTIFIER} or + * {@link SiddhiTopologyCreatorConstants#SOURCE_IDENTIFIER} ,meta info related to Sink/Source configuration is + * removed. + * + * @return Stream definition String after removing Sink/Source configuration + */ + private String removeMetaInfoStream(String streamId, String streamDefinition, String identifier) { + int[] queryContextStartIndex; + int[] queryContextEndIndex; + for (Annotation annotation : siddhiApp.getStreamDefinitionMap().get(streamId).getAnnotations()) { + if (annotation.getName().toLowerCase().equals(identifier.replace("@", ""))) { + queryContextStartIndex = annotation.getQueryContextStartIndex(); + queryContextEndIndex = annotation.getQueryContextEndIndex(); + streamDefinition = streamDefinition.replace( + ExceptionUtil.getContext(queryContextStartIndex, queryContextEndIndex, + siddhiTopologyDataHolder.getUserDefinedSiddhiApp()), ""); + } + } + return streamDefinition; + } + + /** + * PassthroughSiddhiQueryGroups will be moved as the first elements of the existing linkedHashMap of + * SiddhiQueryGroups. + * + * @param passthroughSiddhiQueryGroupList List of Passthrough queries + */ + private void addFirst(List passthroughSiddhiQueryGroupList) { + + Map output = new LinkedHashMap(); + for (SiddhiQueryGroup passthroughSiddhiQueryGroup : passthroughSiddhiQueryGroupList) { + output.put(passthroughSiddhiQueryGroup.getName(), passthroughSiddhiQueryGroup); + } + output.putAll(siddhiTopologyDataHolder.getSiddhiQueryGroupMap()); + siddhiTopologyDataHolder.getSiddhiQueryGroupMap().clear(); + siddhiTopologyDataHolder.getSiddhiQueryGroupMap().putAll(output); + } + + /** + * Passthrough parse is created using {@link SiddhiTopologyCreatorConstants#DEFAULT_PASSTROUGH_QUERY_TEMPLATE} + * and required {@link InputStreamDataHolder} and {@link OutputStreamDataHolder} are assigned to the + * SiddhiQueryGroup. + */ + private SiddhiQueryGroup createPassthroughQueryGroup(InputStreamDataHolder inputStreamDataHolder, + String runtimeDefinition, int parallelism) { + + String passthroughExecGroupName = siddhiTopologyDataHolder.getSiddhiAppName() + "-" + + SiddhiTopologyCreatorConstants.PASSTHROUGH + "-" + new Random().nextInt(99999); + SiddhiQueryGroup siddhiQueryGroup = new SiddhiQueryGroup(passthroughExecGroupName, + parallelism); + String streamId = inputStreamDataHolder.getStreamName(); + Map valuesMap = new HashMap(); + String inputStreamID = SiddhiTopologyCreatorConstants.PASSTHROUGH + inputStreamDataHolder.getStreamName(); + valuesMap.put(SiddhiTopologyCreatorConstants.INPUTSTREAMID, inputStreamID); + valuesMap.put(SiddhiTopologyCreatorConstants.OUTPUTSTREAMID, streamId); + StrSubstitutor substitutor = new StrSubstitutor(valuesMap); + String passThroughQuery = substitutor.replace(SiddhiTopologyCreatorConstants.DEFAULT_PASSTROUGH_QUERY_TEMPLATE); + siddhiQueryGroup.addQuery(passThroughQuery); + String inputStreamDefinition = inputStreamDataHolder.getStreamDefinition().replace(streamId, inputStreamID); + String outputStreamDefinition = "${" + streamId + "} " + runtimeDefinition; + siddhiQueryGroup.getInputStreams() + .put(inputStreamID, new InputStreamDataHolder(inputStreamID, + inputStreamDefinition, EventHolder.STREAM, true, + new SubscriptionStrategyDataHolder( + SiddhiTopologyCreatorConstants.DEFAULT_PARALLEL, + TransportStrategy.ALL, null))); + siddhiQueryGroup.getOutputStreams().put(streamId, new OutputStreamDataHolder(streamId, outputStreamDefinition, + EventHolder.STREAM, false)); + return siddhiQueryGroup; + } + + /** + * Adds the given execution element to the SiddhiQueryGroup. + * + * @param executionElement The execution element to be added to the SiddhiQueryGroup. + * @param siddhiQueryGroup SiddhiQueryGroup which the execution element to be added. + * @param queryGroupName Name of the above SiddhiQueryGroup. + */ + private void addExecutionElement(ExecutionElement executionElement, SiddhiQueryGroup siddhiQueryGroup, + String queryGroupName) { + int[] queryContextEndIndex; + int[] queryContextStartIndex; + int parallelism = siddhiQueryGroup.getParallelism(); + if (executionElement instanceof Query) { + //set parse string + queryContextStartIndex = ((Query) executionElement).getQueryContextStartIndex(); + queryContextEndIndex = ((Query) executionElement).getQueryContextEndIndex(); + siddhiQueryGroup.addQuery(removeMetaInfoQuery(executionElement, ExceptionUtil + .getContext(queryContextStartIndex, queryContextEndIndex, userDefinedSiddhiApp))); + siddhiQueryGroup.addInputStreams(getInputStreamHolderInfo((Query) executionElement, + siddhiQueryGroup, true)); + String outputStreamId = ((Query) executionElement).getOutputStream().getId(); + siddhiQueryGroup.addOutputStream(outputStreamId, getOutputStreamHolderInfo(outputStreamId, parallelism, + queryGroupName)); + } else if (executionElement instanceof Partition) { + queryContextStartIndex = ((Partition) executionElement) + .getQueryContextStartIndex(); + queryContextEndIndex = ((Partition) executionElement).getQueryContextEndIndex(); + siddhiQueryGroup.addQuery(removeMetaInfoQuery(executionElement, ExceptionUtil + .getContext(queryContextStartIndex, queryContextEndIndex, userDefinedSiddhiApp))); + + //store partition details + storePartitionInfo((Partition) executionElement, queryGroupName); + //for a partition iterate over containing queries to identify required inputStreams and OutputStreams + for (Query query : ((Partition) executionElement).getQueryList()) { + siddhiQueryGroup.addInputStreams(getInputStreamHolderInfo(query, siddhiQueryGroup, false)); + String outputStreamId = query.getOutputStream().getId(); + siddhiQueryGroup.addOutputStream(outputStreamId, getOutputStreamHolderInfo(outputStreamId, + parallelism, queryGroupName)); + } + + } + siddhiTopologyDataHolder.getSiddhiQueryGroupMap().put(queryGroupName, siddhiQueryGroup); + } + + + /** + * Publishing strategies for an OutputStream is assigned if the corresponding outputStream is being used as an + * InputStream in a separate group. + */ + private void assignPublishingStrategyOutputStream() { + int i = 0; + List siddhiQueryGroupsList = + new ArrayList<>(siddhiTopologyDataHolder.getSiddhiQueryGroupMap().values()); + + for (SiddhiQueryGroup siddhiQueryGroup1 : siddhiQueryGroupsList) { + for (Entry entry : siddhiQueryGroup1.getOutputStreams().entrySet()) { + OutputStreamDataHolder outputStreamDataHolder = entry.getValue(); + String streamId = entry.getKey(); + + if (outputStreamDataHolder.getEventHolderType().equals(EventHolder.STREAM)) { + Map> fieldGroupingSubscriptions = new HashMap<>(); + boolean isInnerGroupStream = true; + for (SiddhiQueryGroup siddhiQueryGroup2 : siddhiQueryGroupsList.subList(i + 1, + siddhiQueryGroupsList.size())) { + if (siddhiQueryGroup2.getInputStreams().containsKey(streamId)) { + isInnerGroupStream = false; + InputStreamDataHolder inputStreamDataHolder = siddhiQueryGroup2.getInputStreams() + .get(streamId); + + //Checks if the Distributed SiddhiApp contains OutputStreams with Sink Configurations + //which are used as an inputStream in a different execGroup + //If above then a outputStream Sink configuration is concatenated with placeholder + //corresponding to the outputStreamId + //InputStream definition will be replaced. + if (outputStreamDataHolder.isUserGiven()) { + String runtimeStreamDefinition = removeMetaInfoStream(streamId, + inputStreamDataHolder.getStreamDefinition(), + SiddhiTopologyCreatorConstants.SINK_IDENTIFIER); + + if (!outputStreamDataHolder.isSinkBridgeAdded()) { + String outputStreamDefinition = outputStreamDataHolder. + getStreamDefinition().replace(runtimeStreamDefinition, "\n" + + "${" + streamId + "} ") + runtimeStreamDefinition; + outputStreamDataHolder.setStreamDefinition(outputStreamDefinition); + outputStreamDataHolder.setSinkBridgeAdded(true); + } + + inputStreamDataHolder.setStreamDefinition("${" + streamId + "} " + + runtimeStreamDefinition); + inputStreamDataHolder.setUserGiven(false); + } + + SubscriptionStrategyDataHolder subscriptionStrategy = inputStreamDataHolder. + getSubscriptionStrategy(); + if (subscriptionStrategy.getStrategy().equals(TransportStrategy.FIELD_GROUPING)) { + String partitionKey = subscriptionStrategy.getPartitionKey(); + if (fieldGroupingSubscriptions.containsKey(partitionKey)) { + fieldGroupingSubscriptions.get(partitionKey).add( + subscriptionStrategy); + } else { + List strategyList = new ArrayList<>(); + strategyList.add(subscriptionStrategy); + fieldGroupingSubscriptions.put(partitionKey, strategyList); + } + + } else { + outputStreamDataHolder.addPublishingStrategy( + new PublishingStrategyDataHolder(subscriptionStrategy.getStrategy(), + siddhiQueryGroup2.getParallelism())); + } + + } + } + if (isInnerGroupStream && !outputStreamDataHolder.isUserGiven()) { + siddhiQueryGroup1.getOutputStreams().get(streamId).setInnerGroupStream(true); + if (siddhiQueryGroup1.getInputStreams().get(streamId) != null) { + siddhiQueryGroup1.getInputStreams().get(streamId).setInnerGroupStream(true); + } + + } + for (Entry> subscriptionParentEntry : + fieldGroupingSubscriptions.entrySet()) { + String partitionKey = subscriptionParentEntry.getKey(); + List strategyList = subscriptionParentEntry.getValue(); + strategyList.sort(new StrategyParallelismComparator().reversed()); + int parallelism = strategyList.get(0).getOfferedParallelism(); + for (SubscriptionStrategyDataHolder holder : strategyList) { + holder.setOfferedParallelism(parallelism); + } + outputStreamDataHolder.addPublishingStrategy( + new PublishingStrategyDataHolder( + TransportStrategy.FIELD_GROUPING, partitionKey, parallelism)); + } + } + + } + i++; + } + } + + /** + * Details required while processing Partitions are stored. + */ + private void storePartitionInfo(Partition partition, String execGroupName) { + + List partitionGroupList; //contains all the execGroups containing partitioned streamId + String partitionKey; + + //assign partitionGroupMap + for (Entry partitionTypeEntry : partition.getPartitionTypeMap().entrySet()) { + + String streamID = partitionTypeEntry.getKey(); + if (siddhiTopologyDataHolder.getPartitionGroupMap().containsKey(streamID)) { + partitionGroupList = siddhiTopologyDataHolder.getPartitionGroupMap().get(streamID); + } else { + partitionGroupList = new ArrayList<>(); + } + + partitionGroupList.add(execGroupName); + siddhiTopologyDataHolder.getPartitionGroupMap().put(streamID, partitionGroupList); + + if (partitionTypeEntry.getValue() instanceof ValuePartitionType) { + partitionKey = ((Variable) ((ValuePartitionType) partitionTypeEntry.getValue()).getExpression()) + .getAttributeName(); + + //More than one partition corresponding to same partition-key of a stream can not reside in the same + // execGroup. + if (siddhiTopologyDataHolder.getPartitionKeyGroupMap().get(streamID + partitionKey) != null && + siddhiTopologyDataHolder.getPartitionKeyGroupMap().get(streamID + partitionKey) + .equals(execGroupName)) { + throw new SiddhiAppValidationException("Unsupported in distributed setup :More than 1 partition " + + "residing on the same execGroup " + execGroupName + " for " + streamID + " " + + partitionKey); + } + siddhiTopologyDataHolder.getPartitionKeyGroupMap().put(streamID + partitionKey, execGroupName); + siddhiTopologyDataHolder.getPartitionKeyMap().put(streamID, partitionKey); + updateInputStreamDataHolders(streamID, partitionKey); + } else { + //Not yet supported + throw new SiddhiAppValidationException("Unsupported: " + + execGroupName + " Range PartitionType not Supported in Distributed SetUp"); + } + } + } + + private void updateInputStreamDataHolders(String streamID, String partitionKey) { + for (SiddhiQueryGroup siddhiQueryGroup : siddhiTopologyDataHolder.getSiddhiQueryGroupMap().values()) { + InputStreamDataHolder holder = siddhiQueryGroup.getInputStreams().get(streamID); + if (holder != null && holder.getSubscriptionStrategy().getStrategy() != TransportStrategy.FIELD_GROUPING) { + holder.getSubscriptionStrategy().setPartitionKey(partitionKey); + } + } + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopologyDataHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopologyDataHolder.java new file mode 100644 index 0000000000..7a24bfd161 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SiddhiTopologyDataHolder.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Data holder for data required for {@link SiddhiTopologyCreatorImpl} which is responsible for creating. + * {@link SiddhiTopology} + */ +public class SiddhiTopologyDataHolder { + + private String siddhiAppName; + private String userDefinedSiddhiApp; + private Map inMemoryMap; //InMemoryTables and Defined Windows using + private Map partitionKeyMap; // + private Map> partitionGroupMap; // + private Map partitionKeyGroupMap; // + private Map siddhiQueryGroupMap; + private boolean isStatefulApp = false; + + public SiddhiTopologyDataHolder(String siddhiAppName, String userDefinedSiddhiApp) { + this.siddhiAppName = siddhiAppName; + this.userDefinedSiddhiApp = userDefinedSiddhiApp; + this.siddhiQueryGroupMap = new LinkedHashMap<>(); + this.partitionKeyMap = new HashMap<>(); + this.partitionGroupMap = new HashMap<>(); + this.partitionKeyGroupMap = new HashMap<>(); + this.inMemoryMap = new HashMap<>(); + } + + public Map getPartitionKeyGroupMap() { + + return partitionKeyGroupMap; + } + + public Map getInMemoryMap() { + return inMemoryMap; + } + + public String getSiddhiAppName() { + return siddhiAppName; + } + + public String getUserDefinedSiddhiApp() { + + return userDefinedSiddhiApp; + } + + public Map getSiddhiQueryGroupMap() { + return siddhiQueryGroupMap; + } + + public Map getPartitionKeyMap() { + return partitionKeyMap; + } + + public Map> getPartitionGroupMap() { + return partitionGroupMap; + } + + public boolean isStatefulApp() { + return isStatefulApp; + } + + public void setStatefulApp(boolean statefulApp) { + isStatefulApp = statefulApp; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/StrategyParallelismComparator.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/StrategyParallelismComparator.java new file mode 100644 index 0000000000..c848c0b2c2 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/StrategyParallelismComparator.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * Comparator for subscription strategies. + */ +public class StrategyParallelismComparator implements Comparator, Serializable { + @Override + public int compare(SubscriptionStrategyDataHolder o1, SubscriptionStrategyDataHolder o2) { + return o1.getOfferedParallelism() - o2.getOfferedParallelism(); + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/StreamDataHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/StreamDataHolder.java new file mode 100644 index 0000000000..137715cf1c --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/StreamDataHolder.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import io.siddhi.parser.core.util.EventHolder; + +/** + * Temporary class for enable mapping. + */ +public class StreamDataHolder { + + private String streamDefinition; + private EventHolder eventHolderType; + private boolean isUserGiven; + + public StreamDataHolder(String streamDefinition, EventHolder eventHolderType, boolean isUserGiven) { + this.streamDefinition = streamDefinition; + this.eventHolderType = eventHolderType; + this.isUserGiven = isUserGiven; + } + + public StreamDataHolder(boolean isUserGiven) { + this.isUserGiven = isUserGiven; + } + + public boolean isUserGiven() { + return isUserGiven; + } + + public String getStreamDefinition() { + return streamDefinition; + } + + public void setStreamDefinition(String streamDefinition) { + this.streamDefinition = streamDefinition; + } + + public EventHolder getEventHolderType() { + return eventHolderType; + } + + public void setEventHolderType(EventHolder eventHolderType) { + this.eventHolderType = eventHolderType; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SubscriptionStrategyDataHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SubscriptionStrategyDataHolder.java new file mode 100644 index 0000000000..3114baceca --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/topology/SubscriptionStrategyDataHolder.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.topology; + +import io.siddhi.parser.core.util.TransportStrategy; + +/** + * Holder for Subscription Strategy information. + */ +public class SubscriptionStrategyDataHolder { + private TransportStrategy strategy; + private int offeredParallelism; + private String partitionKey; + + public SubscriptionStrategyDataHolder(int offeredParallelism, TransportStrategy strategy, String partitionKey) { + this.offeredParallelism = offeredParallelism; + this.strategy = strategy; + this.partitionKey = partitionKey; + } + + public TransportStrategy getStrategy() { + return strategy; + } + + public int getOfferedParallelism() { + return offeredParallelism; + } + + public String getPartitionKey() { + return partitionKey; + } + + public void setOfferedParallelism(int offeredParallelism) { + this.offeredParallelism = offeredParallelism; + } + + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/EventHolder.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/EventHolder.java new file mode 100644 index 0000000000..7a02e5d38c --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/EventHolder.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.util; + +/** + * Details of Event Holder. + */ +public enum EventHolder { + STREAM, + INMEMORYTABLE, + TABLE, + WINDOW, +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/SiddhiTopologyCreatorConstants.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/SiddhiTopologyCreatorConstants.java new file mode 100644 index 0000000000..cd40e2730e --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/SiddhiTopologyCreatorConstants.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.util; + +/** + * This class contains constants needed for the Topology creation. + */ +public class SiddhiTopologyCreatorConstants { + public static final Integer DEFAULT_PARALLEL = 1; + + public static final String INNERSTREAM_IDENTIFIER = "#"; + + public static final String SINK_IDENTIFIER = "@sink"; + + public static final String SOURCE_IDENTIFIER = "@source"; + + public static final String PERSISTENCETABLE_IDENTIFIER = "store"; + + public static final String TYPE_IDENTIFIER = "type"; + + public static final String DEFAULT_SIDDHIAPP_NAME = "SiddhiApp"; + + public static final String DISTRIBUTED_IDENTIFIER = "dist"; + + public static final String PARALLEL_IDENTIFIER = "parallel"; + + public static final String EXECGROUP_IDENTIFIER = "execGroup"; + + public static final String INFO_IDENTIFIER = "info"; + + public static final String SIDDHIAPP_NAME_IDENTIFIER = "name"; + + public static final String INPUTSTREAMID = "inputStreamID"; + + public static final String OUTPUTSTREAMID = "outputStreamID"; + + public static final String DEFAULT_PASSTROUGH_QUERY_TEMPLATE = "from ${" + INPUTSTREAMID + "} select * " + "insert" + + " into " + "${" + OUTPUTSTREAMID + "}"; + + public static final String TRANSPORT_CHANNEL_CREATION_IDENTIFIER = "transportChannelCreationEnabled"; + + public static final String PASSTHROUGH = "passthrough"; + + public static final String AGGREGATION = "aggregation"; + + public static final String INMEMORY = "in-memory"; + + public static final String EXECUTION_ELEMENT = "Query/Partition"; +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/TransportStrategy.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/TransportStrategy.java new file mode 100644 index 0000000000..c954334751 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/core/util/TransportStrategy.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser.core.util; + +/** + * Details of TransportStrategy. + */ + +public enum TransportStrategy { + FIELD_GROUPING, + ROUND_ROBIN, + ALL +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/SiddhiParserApi.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/SiddhiParserApi.java new file mode 100644 index 0000000000..e8f9d31666 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/SiddhiParserApi.java @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://wso2.com) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.siddhi.parser.service; + +import io.siddhi.core.SiddhiAppRuntime; +import io.siddhi.core.SiddhiManager; +import io.siddhi.core.stream.ServiceDeploymentInfo; +import io.siddhi.core.stream.input.source.Source; +import io.siddhi.distribution.common.common.SiddhiAppRuntimeService; +import io.siddhi.distribution.common.common.utils.config.FileConfigManager; +import io.siddhi.parser.SiddhiParserDataHolder; +import io.siddhi.parser.core.SiddhiAppCreator; +import io.siddhi.parser.core.SiddhiTopologyCreator; +import io.siddhi.parser.core.appcreator.DeployableSiddhiQueryGroup; +import io.siddhi.parser.core.appcreator.NatsSiddhiAppCreator; +import io.siddhi.parser.core.appcreator.SiddhiQuery; +import io.siddhi.parser.core.topology.SiddhiTopology; +import io.siddhi.parser.core.topology.SiddhiTopologyCreatorImpl; +import io.siddhi.parser.service.model.ApiResponseMessage; +import io.siddhi.parser.service.model.DeployableSiddhiApp; +import io.siddhi.parser.service.model.MessagingSystem; +import io.siddhi.parser.service.model.SiddhiParserRequest; +import io.siddhi.parser.service.model.SourceDeploymentConfig; +import org.osgi.framework.BundleContext; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.carbon.config.ConfigurationException; +import org.wso2.carbon.config.provider.ConfigProvider; +import org.wso2.msf4j.MicroservicesRunner; +import org.wso2.msf4j.config.TransportsFileConfiguration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; + +/** + * Siddhi Parser Service used by the Siddhi Kubernetes Operator to parse the Siddhi Apps. + */ +@Component( + name = "siddhi-parser-service", + immediate = true +) +@Path("/siddhi-parser") +public class SiddhiParserApi { + + private static final Logger log = LoggerFactory.getLogger(SiddhiParserApi.class); + private static final String TRANSPORT_ROOT_CONFIG_ELEMENT = "wso2.transport.http"; + private static final String SIDDHI_PARSER_ACTIVATION_SYS_PROPERTY = "siddhi-parser"; + private static TransportsFileConfiguration transportsFileConfiguration; + private static MicroservicesRunner microservicesRunner; + private static volatile boolean microserviceActive; + private static SiddhiAppCreator appCreator = new NatsSiddhiAppCreator(); + private static SiddhiTopologyCreator siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); + + public SiddhiParserApi() { + SiddhiManager siddhiManager = new SiddhiManager(); + FileConfigManager fileConfigManager = new FileConfigManager(SiddhiParserDataHolder.getConfigProvider()); + fileConfigManager.init(); + siddhiManager.setConfigManager(fileConfigManager); + SiddhiParserDataHolder.setSiddhiManager(siddhiManager); + } + + @GET + @Path("/") + public String get() { + return "Siddhi Parser Service is up and running."; + } + + @POST + @Path("/parse") + @Consumes({"application/json"}) + @Produces({"application/json"}) + public Response parseSiddhiApp(SiddhiParserRequest request) throws NotFoundException { + try { + List deployableSiddhiApps = new ArrayList<>(); + List userGivenApps = populateAppWithEnvs(request.getPropertyMap(), request.getSiddhiApps()); + for (String app : userGivenApps) { + List sourceDeploymentConfigs = getSourceDeploymentConfigs(app); + SiddhiTopology topology = siddhiTopologyCreator.createTopology(app); + boolean isAppStateful = topology.isStatefulApp(); + MessagingSystem messagingSystemConfig = request.getMessagingSystem(); + + if (messagingSystemConfig != null && !messagingSystemConfig.isEmpty()) { + List queryGroupList = appCreator.createApps(topology, + messagingSystemConfig); + + for (DeployableSiddhiQueryGroup deployableSiddhiQueryGroup : queryGroupList) { + if (deployableSiddhiQueryGroup.isReceiverQueryGroup()) { + for (SiddhiQuery siddhiQuery : deployableSiddhiQueryGroup.getSiddhiQueries()) { + deployableSiddhiApps.add(new DeployableSiddhiApp(siddhiQuery.getApp(), + sourceDeploymentConfigs, topology.isUserGiveSourceStateful())); + } + } else { + for (SiddhiQuery siddhiQuery : deployableSiddhiQueryGroup.getSiddhiQueries()) { + DeployableSiddhiApp deployableSiddhiApp = new DeployableSiddhiApp(siddhiQuery.getApp(), + isAppStateful); + if (deployableSiddhiQueryGroup.isUserGivenSource()) { + deployableSiddhiApp.setSourceDeploymentConfigs(sourceDeploymentConfigs); + } + deployableSiddhiApps.add(deployableSiddhiApp); + } + } + } + } else { + DeployableSiddhiApp deployableSiddhiApp = new DeployableSiddhiApp(app, isAppStateful); + if (sourceDeploymentConfigs != null && sourceDeploymentConfigs.size() != 0) { + deployableSiddhiApp.setSourceDeploymentConfigs(sourceDeploymentConfigs); + } + deployableSiddhiApps.add(deployableSiddhiApp); + } + } + return Response.ok().entity(deployableSiddhiApps).build(); + } catch (Exception e) { + log.error("Exception caught while parsing the app. " + e.getMessage(), e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(new ApiResponseMessage(ApiResponseMessage.ERROR, + "Exception caught while parsing the app. " + e.getMessage())).build(); + } + } + + private List populateAppWithEnvs(Map envMap, List siddhiApps) { + List populatedApps = new ArrayList<>(); + if (siddhiApps != null) { + for (String siddhiApp : siddhiApps) { + if (siddhiApp.contains("$")) { + if (envMap != null) { + String envPattern = "\\$\\{(\\w+)\\}"; + Pattern expr = Pattern.compile(envPattern); + Matcher matcher = expr.matcher(siddhiApp); + while (matcher.find()) { + for (int i = 1; i <= matcher.groupCount(); i++) { + String envValue = envMap.getOrDefault(matcher.group(i), ""); + envValue = envValue.replace("\\", "\\\\"); + Pattern subexpr = Pattern.compile("\\$\\{" + matcher.group(i) + "\\}"); + siddhiApp = subexpr.matcher(siddhiApp).replaceAll(envValue); + } + } + } + } + populatedApps.add(siddhiApp); + } + } + return populatedApps; + } + + private List getSourceDeploymentConfigs(String siddhiApp) { + List sourceDeploymentConfigs = new ArrayList<>(); + SiddhiAppRuntime siddhiAppRuntime = SiddhiParserDataHolder.getSiddhiManager().createSiddhiAppRuntime(siddhiApp); + Collection> sources = siddhiAppRuntime.getSources(); + for (List sourceList : sources) { + for (Source source : sourceList) { + SourceDeploymentConfig response; + ServiceDeploymentInfo serviceDeploymentInfo = source.getServiceDeploymentInfo(); + if (serviceDeploymentInfo != null) { + response = new SourceDeploymentConfig(serviceDeploymentInfo.getPort(), + serviceDeploymentInfo.getServiceProtocol().name(), + serviceDeploymentInfo.isSecured(), + serviceDeploymentInfo.isPulling(), + serviceDeploymentInfo.getDeploymentProperties()); + sourceDeploymentConfigs.add(response); + } + } + } + return sourceDeploymentConfigs; + } + + /** + * This is the activation method of Siddhi Parser Api Service Component. This will be called when its references are + * satisfied. + * + * @param bundleContext the bundle context instance of this bundle. + * @throws Exception this will be thrown if an issue occurs while executing the activate method + */ + @Activate + protected void start(BundleContext bundleContext) throws Exception { + if (transportsFileConfiguration != null) { + microservicesRunner = new MicroservicesRunner(transportsFileConfiguration); + } + String toolIdentifier = System.getProperty(SIDDHI_PARSER_ACTIVATION_SYS_PROPERTY); + Optional.ofNullable(toolIdentifier) + .ifPresent(identifier -> { + startStoresApiMicroservice(); + }); + } + + /** + * This is the deactivation method of Siddhi Parser Api Service Component. This will be called when this component + * is being stopped or references are satisfied during runtime. + * + * @throws Exception this will be thrown if an issue occurs while executing the de-activate method + */ + @Deactivate + protected void stop() throws Exception { + log.debug("Siddhi Parser API deactivated."); + stopStoresApiMicroservice(); + } + + /** + * This is the activation method of Parser Api Microservice. + */ + public static void startStoresApiMicroservice() { + if (microservicesRunner != null && !microserviceActive) { + microservicesRunner.deploy(new SiddhiParserApi()); + microservicesRunner.start(); + microserviceActive = true; + } + log.info("Siddhi Parser REST API activated."); + } + + /** + * This is the deactivate method of Parser Api Microservice. + */ + public static void stopStoresApiMicroservice() { + if (microservicesRunner != null && microserviceActive) { + microservicesRunner.stop(); + microserviceActive = false; + } + } + + @Reference( + name = "carbon.config.provider", + service = ConfigProvider.class, + cardinality = ReferenceCardinality.MANDATORY, + policy = ReferencePolicy.DYNAMIC, + unbind = "unregisterConfigProvider" + ) + protected void registerConfigProvider(ConfigProvider configProvider) { + SiddhiParserDataHolder.setConfigProvider(configProvider); + try { + transportsFileConfiguration = configProvider.getConfigurationObject(TRANSPORT_ROOT_CONFIG_ELEMENT, + TransportsFileConfiguration.class); + } catch (ConfigurationException e) { + log.error("Error while loading TransportsConfiguration for " + TRANSPORT_ROOT_CONFIG_ELEMENT, e); + } + } + + protected void unregisterConfigProvider(ConfigProvider configProvider) { + SiddhiParserDataHolder.setConfigProvider(null); + } + + @Reference( + name = "siddhi.app.runtime.service.reference", + service = SiddhiAppRuntimeService.class, + cardinality = ReferenceCardinality.AT_LEAST_ONE, + policy = ReferencePolicy.DYNAMIC, + unbind = "unsetSiddhiAppRuntimeService" + ) + protected void setSiddhiAppRuntimeService(SiddhiAppRuntimeService siddhiAppRuntimeService) { + } + + protected void unsetSiddhiAppRuntimeService(SiddhiAppRuntimeService siddhiAppRuntimeService) { + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/ApiResponseMessage.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/ApiResponseMessage.java new file mode 100644 index 0000000000..64c38af4d4 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/ApiResponseMessage.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package io.siddhi.parser.service.model; + +/** + * API response message class. + */ +public class ApiResponseMessage { + public static final int ERROR = 1; + public static final int WARNING = 2; + public static final int INFO = 3; + public static final int OK = 4; + public static final int TOO_BUSY = 5; + + int code; + String type; + String message; + + public ApiResponseMessage() { + } + + public ApiResponseMessage(int code, String message) { + this.code = code; + switch (code) { + case ERROR: + setType("error"); + break; + case WARNING: + setType("warning"); + break; + case INFO: + setType("info"); + break; + case OK: + setType("ok"); + break; + case TOO_BUSY: + setType("too busy"); + break; + default: + setType("unknown"); + break; + } + this.message = message; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/DeployableSiddhiApp.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/DeployableSiddhiApp.java new file mode 100644 index 0000000000..c74b5494b6 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/DeployableSiddhiApp.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.parser.service.model; + +import java.util.List; + +/** + * Represents a deployable partial Siddhi App. + */ +public class DeployableSiddhiApp { + private boolean persistenceEnabled = false; + int replicas = 1; + private String siddhiApp; + private List sourceDeploymentConfigs = null; + + public void setSourceDeploymentConfigs(List sourceDeploymentConfigs) { + this.sourceDeploymentConfigs = sourceDeploymentConfigs; + } + + public DeployableSiddhiApp(String siddhiApp) { + this.siddhiApp = siddhiApp; + } + + public DeployableSiddhiApp(String siddhiApp, List sourceList, boolean persistenceEnabled) { + this.siddhiApp = siddhiApp; + this.sourceDeploymentConfigs = sourceList; + this.persistenceEnabled = persistenceEnabled; + } + + public DeployableSiddhiApp(String siddhiApp, boolean persistenceEnabled) { + this.siddhiApp = siddhiApp; + this.persistenceEnabled = persistenceEnabled; + } + + public boolean isPersistenceEnabled() { + return persistenceEnabled; + } + + public int getReplicas() { + return replicas; + } + + public String getSiddhiApp() { + return siddhiApp; + } + + public List getSourceDeploymentConfigs() { + return sourceDeploymentConfigs; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/MessagingConfig.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/MessagingConfig.java new file mode 100644 index 0000000000..c5c5ea393d --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/MessagingConfig.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.parser.service.model; + +import com.google.gson.annotations.SerializedName; + +import java.util.Arrays; + +/** + * Messaging System Configuration. + */ +public class MessagingConfig { + + @SerializedName("clusterId") + private String clusterId; + + @SerializedName("bootstrapServers") + private String[] bootstrapServers; + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + + public String getBootstrapServerURLs() { + if (bootstrapServers != null && bootstrapServers.length > 0) { + StringBuilder stringBuilder = new StringBuilder(); + for (String urls : bootstrapServers) { + stringBuilder.append(urls.replace("'", "\\'")).append(","); + } + stringBuilder.deleteCharAt(stringBuilder.length() - 1); + return stringBuilder.toString(); + } else { + return ""; + } + } + + public MessagingConfig(String clusterId, String[] bootstrapServers) { + this.clusterId = clusterId; + this.bootstrapServers = Arrays.copyOf(bootstrapServers, bootstrapServers.length); + } + + public boolean isEmpty() { + return clusterId == null || clusterId.isEmpty() || bootstrapServers == null || bootstrapServers.length == 0 + || bootstrapServers[0].isEmpty(); + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/MessagingSystem.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/MessagingSystem.java new file mode 100644 index 0000000000..2e5847acb6 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/MessagingSystem.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.siddhi.parser.service.model; + +import com.google.gson.annotations.SerializedName; + +/** + * Messaging System root namespace. + */ +public class MessagingSystem { + + @SerializedName("type") + private String type; + + @SerializedName("config") + private MessagingConfig config; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public MessagingConfig getConfig() { + return config; + } + + public void setConfig(MessagingConfig config) { + this.config = config; + } + + public boolean isEmpty() { + return type == null || type.isEmpty() || (config == null) || config.isEmpty(); + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/SiddhiParserRequest.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/SiddhiParserRequest.java new file mode 100644 index 0000000000..1f5aab5c3a --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/SiddhiParserRequest.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://wso2.com) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.siddhi.parser.service.model; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Map; + +/** + * Siddhi Parser Request Model. + */ +public class SiddhiParserRequest { + + @SerializedName("siddhiApps") + private List siddhiApps = null; + + @SerializedName("propertyMap") + private Map propertyMap = null; + + @SerializedName("messagingSystem") + private MessagingSystem messagingSystem = null; + + public Map getPropertyMap() { + return propertyMap; + } + + public SiddhiParserRequest setPropertyMap(Map propertyMap) { + this.propertyMap = propertyMap; + return this; + } + + public List getSiddhiApps() { + return siddhiApps; + } + + public void setSiddhiApps(List siddhiApps) { + this.siddhiApps = siddhiApps; + } + + public MessagingSystem getMessagingSystem() { + return messagingSystem; + } + + public void setMessagingSystem(MessagingSystem messagingSystem) { + this.messagingSystem = messagingSystem; + } +} diff --git a/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/SourceDeploymentConfig.java b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/SourceDeploymentConfig.java new file mode 100644 index 0000000000..1f6f5a3877 --- /dev/null +++ b/runner/components/io.siddhi.parser/src/main/java/io/siddhi/parser/service/model/SourceDeploymentConfig.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://wso2.com) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.siddhi.parser.service.model; + +import java.util.HashMap; +import java.util.Map; + +/** + * Source Deployment Configuration Model. + */ +public class SourceDeploymentConfig { + private String serviceProtocol; + private boolean secured = false; + private int port; + private boolean isPulling; + private Map deploymentProperties = new HashMap<>(); + + public SourceDeploymentConfig(int port, String serviceProtocol, boolean secured, boolean isPulling, Map deploymentProperties) { + this.port = port; + this.serviceProtocol = serviceProtocol; + this.secured = secured; + this.isPulling = isPulling; + this.deploymentProperties = deploymentProperties; + } + + public String getServiceProtocol() { + return serviceProtocol; + } + + public SourceDeploymentConfig setServiceProtocol(String serviceProtocol) { + this.serviceProtocol = serviceProtocol; + return this; + } + + public boolean isSecured() { + return secured; + } + + public SourceDeploymentConfig setSecured(boolean secured) { + this.secured = secured; + return this; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public boolean isPulling() { + return isPulling; + } + + public void setPulling(boolean pulling) { + isPulling = pulling; + } + + public Map getDeploymentProperties() { + return deploymentProperties; + } + + public SourceDeploymentConfig setDeploymentProperties(Map deploymentProperties) { + this.deploymentProperties = deploymentProperties; + return this; + } +} diff --git a/runner/components/io.siddhi.parser/src/test/java/io/siddhi/parser/NatsAppCreatorTestCase.java b/runner/components/io.siddhi.parser/src/test/java/io/siddhi/parser/NatsAppCreatorTestCase.java new file mode 100644 index 0000000000..0baef12b5f --- /dev/null +++ b/runner/components/io.siddhi.parser/src/test/java/io/siddhi/parser/NatsAppCreatorTestCase.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.parser; + + +import io.siddhi.core.SiddhiManager; +import io.siddhi.extension.io.http.source.HttpSource; +import io.siddhi.extension.io.nats.source.NATSSource; +import io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper; +import io.siddhi.parser.core.SiddhiAppCreator; +import io.siddhi.parser.core.appcreator.DeployableSiddhiQueryGroup; +import io.siddhi.parser.core.appcreator.NatsSiddhiAppCreator; +import io.siddhi.parser.core.topology.SiddhiTopology; +import io.siddhi.parser.core.topology.SiddhiTopologyCreatorImpl; +import io.siddhi.parser.service.model.MessagingConfig; +import io.siddhi.parser.service.model.MessagingSystem; +import org.apache.log4j.Logger; +import org.testcontainers.containers.GenericContainer; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.List; + +/** + * Create the distributed partial siddhi applications which communicates using nats broker. + */ +public class NatsAppCreatorTestCase { + + private static final Logger log = Logger.getLogger(NatsAppCreatorTestCase.class); + private static final String CLUSTER_ID = "test-cluster"; + private static String NATS_SERVER_URL = "nats://localhost:4222"; + private int port; + private MessagingSystem messagingSystem = new MessagingSystem(); + + @BeforeClass + private void initializeDockerContainer() throws InterruptedException { + SiddhiManager siddhiManager = new SiddhiManager(); + siddhiManager.setExtension("http-source", HttpSource.class); + siddhiManager.setExtension("map-json", JsonSourceMapper.class); + siddhiManager.setExtension("nats-source", NATSSource.class); + SiddhiParserDataHolder.setSiddhiManager(siddhiManager); + GenericContainer simpleWebServer + = new GenericContainer("nats-streaming:0.11.2"); + simpleWebServer.setPrivilegedMode(true); + simpleWebServer.start(); + port = simpleWebServer.getMappedPort(4222); + NATS_SERVER_URL += port; + messagingSystem.setConfig(new MessagingConfig(CLUSTER_ID,new String[]{NATS_SERVER_URL})); + messagingSystem.setType("nats"); + Thread.sleep(500); + } + + /** Test the topology creation for a particular siddhi app includes nats transport. + */ + @Test + public void testSiddhiTopologyCreator() { + String siddhiApp = "@App:name('Energy-Alert-App')\n" + + "@App:description('Energy consumption and anomaly detection')\n" + + "@source(type = 'http', topic = 'device-power', @map(type = 'json'))\n" + + "@source(type='nats',cluster.id='test-cluster',destination = 'dummy-destination', " + + "bootstrap.servers='nats://localhost:8080',@map(type='json')) \n" + + "define stream DevicePowerStream (type string, deviceID string, power int," + + " roomID string);\n" + + "@sink(type = 'log')\n" + + "define stream AlertStream (deviceID string, roomID string, initialPower double, " + + "finalPower double,autorityContactEmail string);\n" + + "@info(name = 'monitered-filter')\n" + + "from DevicePowerStream[type == 'monitored']\n" + + "select deviceID, power, roomID\n" + + "insert current events into MonitoredDevicesPowerStream;\n" + + "@info(name = 'power-increase-pattern')\n" + + "partition with (deviceID of MonitoredDevicesPowerStream)\n" + + "begin\n" + + "@info(name = 'avg-calculator')\n" + + "from MonitoredDevicesPowerStream#window.time(2 min)\n" + + "select deviceID, avg(power) as avgPower, roomID\n" + + "insert current events into #AvgPowerStream;\n" + + "@info(name = 'power-increase-detector')\n" + + "from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) " + + "<= avgPower] within 10 min\n" + + "select e1.deviceID as deviceID, e1.avgPower as initialPower, " + + "e2.avgPower as finalPower, e1.roomID\n" + + "insert current events into RisingPowerStream;\n" + + "end;\n" + + "@info(name = 'power-range-filter')\n" + + "from RisingPowerStream[finalPower > 100]\n" + + "select deviceID, roomID, initialPower, finalPower, " + + "'no-reply@powermanagement.com' as autorityContactEmail\n" + + "insert current events into AlertStream;\n" + + "@info(name = 'internal-filter')\n" + + "from DevicePowerStream[type == 'internal']\n" + + "select deviceID, power\n" + + "insert current events into InternaltDevicesPowerStream;\n"; + SiddhiTopologyCreatorImpl siddhiTopologyCreator = new SiddhiTopologyCreatorImpl(); + SiddhiTopology topology = siddhiTopologyCreator.createTopology(siddhiApp); + SiddhiAppCreator appCreator = new NatsSiddhiAppCreator(); + List queryGroupList = appCreator.createApps(topology, messagingSystem); + + Assert.assertEquals(queryGroupList.size(), 2); + } +} diff --git a/runner/components/io.siddhi.parser/src/test/resources/testng.xml b/runner/components/io.siddhi.parser/src/test/resources/testng.xml new file mode 100644 index 0000000000..c613762c2c --- /dev/null +++ b/runner/components/io.siddhi.parser/src/test/resources/testng.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + diff --git a/runner/features/io.siddhi.parser.service.feature/pom.xml b/runner/features/io.siddhi.parser.service.feature/pom.xml new file mode 100644 index 0000000000..f23ee75e72 --- /dev/null +++ b/runner/features/io.siddhi.parser.service.feature/pom.xml @@ -0,0 +1,80 @@ + + + + + + io.siddhi.distribution + io.siddhi.distribution.parent + 5.1.0-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + io.siddhi.parser.service.feature + carbon-feature + Siddhi Distribution - Store Query REST API Feature + http://wso2.org + This feature contains the bundles related to Siddhi Store REST API + + + + io.siddhi.distribution + io.siddhi.parser + ${io.siddhi.distribution.version} + + + + + + + org.wso2.carbon.maven + carbon-feature-plugin + ${carbon.feature.plugin.version} + true + + + 1-p2-feature-generation + + generate + + + ../etc/feature.properties + + + org.wso2.carbon.p2.category.type + server + + + org.eclipse.equinox.p2.type.group + true + + + + + io.siddhi.parser + ${project.version} + + + + + + + + + diff --git a/runner/pom.xml b/runner/pom.xml index ad20c50626..b9bd0fc125 100644 --- a/runner/pom.xml +++ b/runner/pom.xml @@ -40,6 +40,7 @@ components/io.siddhi.distribution.store.api.rest components/io.siddhi.distribution.msf4j.interceptor.common components/io.siddhi.distribution.health.check.core + components/io.siddhi.parser features/io.siddhi.distribution.core.feature features/io.siddhi.distribution.common.feature @@ -48,6 +49,7 @@ features/io.siddhi.distribution.store.api.rest.feature features/io.siddhi.distribution.msf4j.interceptor.common.feature features/io.siddhi.distribution.health.check.core.feature + features/io.siddhi.parser.service.feature @@ -168,6 +170,11 @@ io.siddhi.distribution.core.feature zip + + io.siddhi.distribution + io.siddhi.parser.service.feature + zip + io.siddhi.distribution io.siddhi.distribution.common.feature @@ -229,6 +236,21 @@ io.siddhi.distribution.health.check.core.feature zip + + org.testcontainers + testcontainers + test + + + io.nats + java-nats-streaming + test + + + io.nats + java-nats-streaming + test + @@ -570,6 +592,10 @@ io.siddhi.distribution.core.feature ${io.siddhi.distribution.version} + + io.siddhi.parser.service.feature + ${io.siddhi.distribution.version} + io.siddhi.distribution.common.feature ${io.siddhi.distribution.version} @@ -744,6 +770,10 @@ io.siddhi.distribution.core.feature.group ${io.siddhi.distribution.version} + + io.siddhi.parser.service.feature.group + ${io.siddhi.distribution.version} + io.siddhi.distribution.common.feature.group ${io.siddhi.distribution.version} @@ -871,6 +901,14 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + +