diff --git a/nifi-docker/dockermaven-stateless/Dockerfile b/nifi-docker/dockermaven-stateless/Dockerfile index 8340487d2528e..9dece1f15137e 100644 --- a/nifi-docker/dockermaven-stateless/Dockerfile +++ b/nifi-docker/dockermaven-stateless/Dockerfile @@ -48,9 +48,11 @@ ENV NIFI_HOME ${NIFI_BASE_DIR}/nifi-current # Setup NiFi user RUN addgroup -g ${GID} nifi && adduser -s /bin/sh -u ${UID} -G nifi -D nifi -RUN mkdir -p $NIFI_HOME +RUN mkdir -p $NIFI_HOME && chown nifi:nifi $NIFI_HOME +RUN mkdir -p ${NIFI_HOME}/work/ && chown nifi:nifi ${NIFI_HOME}/work/ && chmod 777 ${NIFI_HOME}/work/ + +COPY --chown=nifi:nifi $WORKING_DIR ${NIFI_HOME}/work/ COPY --chown=nifi:nifi $STATELESS_LIB_DIR ${NIFI_HOME}/stateless-lib/ -COPY --chown=nifi:nifi $WORKING_DIR ${NIFI_HOME}/working/ #NiFi's HDFS processors require core-site.xml or hdfs-site.xml to exist on disk before they can be started... @@ -63,13 +65,15 @@ RUN echo ' \n\ fs.hdfs.impl \n\ org.apache.hadoop.hdfs.DistributedFileSystem \n\ \n\ - ' > /tmp/core-site.xml -RUN chmod 666 /tmp/core-site.xml + ' > /tmp/core-site.xml && chown nifi /tmp/core-site.xml && chmod 777 /tmp/core-site.xml + +RUN mkdir -p /hadoop/yarn/local && chown nifi /hadoop/yarn/local && chmod 777 /hadoop/yarn/local USER nifi EXPOSE 8080 WORKDIR ${NIFI_HOME} + ENTRYPOINT ["/usr/bin/java", "-cp", "stateless-lib/*", "org.apache.nifi.stateless.NiFiStateless"] CMD ["RunOpenwhiskActionServer", "8080"] \ No newline at end of file diff --git a/nifi-docker/dockermaven-stateless/pom.xml b/nifi-docker/dockermaven-stateless/pom.xml index fec7ac2f6c6fd..97dadaec32667 100644 --- a/nifi-docker/dockermaven-stateless/pom.xml +++ b/nifi-docker/dockermaven-stateless/pom.xml @@ -66,6 +66,22 @@ run + + remove framework nar + process-sources + + + + + + + + + + + run + + @@ -84,7 +100,7 @@ 1000 ${project.version} target/stateless-lib - target/working + target/work apache/nifi-stateless ${project.version}-dockermaven diff --git a/nifi-stateless/README.md b/nifi-stateless/README.md index 943c382544362..566afac60dcef 100644 --- a/nifi-stateless/README.md +++ b/nifi-stateless/README.md @@ -12,8 +12,10 @@ See the License for the specific language governing permissions and limitations under the License. --> -# Stateless NiFi +# Stateless NiFi +Similar to other stream processing frameworks, receipt of incoming data is not acknowledged until it is written to a destination. In the event of failure, data can be replayed from the source rather than relying on a stateful content repository. This will not work for all cases (e.g. fire-and-forget HTTP/tcp), but a large portion of use cases have a resilient source to retry from. +Note: Provenance, metrics, logs are not extracted at this time. Docker and other container engines can be used for logs and metrics. ### Build: `mvn package -P docker` @@ -25,40 +27,30 @@ After building, the image can be used as follows Where the arguments dictate the runtime to use: ``` -1) RunFromRegistry [Once|Continuous] [] [] - RunFromRegistry [Once|Continuous] --json +1) RunFromRegistry [Once|Continuous] --json RunFromRegistry [Once|Continuous] --file # Filename of JSON file that matches the examples below. -2) RunYARNServiceFromRegistry <# of Containers> \ - [] [] - RunYARNServiceFromRegistry <# of Containers> --json - RunYARNServiceFromRegistry <# of Containers> --file +2) RunYARNServiceFromRegistry <# of Containers> --json + RunYARNServiceFromRegistry <# of Containers> --file -3) RunOpenwhiskActionServer +3) RunOpenwhiskActionServer ``` ### Examples: ``` 1) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \ - RunFromRegistry Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \ - "DestinationDirectory-/tmp/nifistateless/output2/" "" "absolute.path-/tmp/nifistateless/input/;filename-test.txt" "absolute.path-/tmp/nifistateless/input/;filename-test2.txt" -2) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \ RunFromRegistry Once --file /Users/nifi/nifi-stateless-configs/flow-abc.json -3) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \ +2) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \ RunYARNServiceFromRegistry http://127.0.0.1:8088 nifi-stateless:latest kafka-to-solr 3 --file kafka-to-solr.json -4) docker run -d nifi-stateless:1.10.0-SNAPSHOT-dockermaven \ +3) docker run -d nifi-stateless:1.10.0-SNAPSHOT-dockermaven \ RunOpenwhiskActionServer 8080 ``` ###Notes: ``` -1) will be split on ';' and '-' then injected into the flow using the variable registry interface. - 2) will be split on ';'. FlowFiles routed to matching output ports will immediately fail the flow. - 3) will be split on ';' and '-' then injected into the flow using the "nifi_content" field as the FlowFile content. - 4) Multiple arguments can be provided. - 5) The configuration file must be in JSON format. - 6) When providing configurations via JSON, the following attributes must be provided: nifi_registry, nifi_bucket, nifi_flow. - All other attributes will be passed to the flow using the variable registry interface +1) The configuration file must be in JSON format. +2) When providing configurations via JSON, the following attributes must be provided: nifi_registry, nifi_bucket, nifi_flow. + All other attributes will be passed to the flow using the variable registry interface ``` ### JSON Format diff --git a/nifi-stateless/nifi-stateless-assembly/pom.xml b/nifi-stateless/nifi-stateless-assembly/pom.xml index 6395ae99a30ff..5a86602835af2 100644 --- a/nifi-stateless/nifi-stateless-assembly/pom.xml +++ b/nifi-stateless/nifi-stateless-assembly/pom.xml @@ -75,6 +75,11 @@ nifi-api 1.10.0-SNAPSHOT + + org.apache.nifi + nifi-utils + 1.10.0-SNAPSHOT + org.apache.nifi nifi-framework-api diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java index 8446843b03ba4..778881bd4ad17 100644 --- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java +++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java @@ -48,9 +48,9 @@ public static void main(final String[] args) throws IOException, ClassNotFoundEx final File libDir = new File(nifi_home+"/lib"); final File statelesslibDir = new File(nifi_home+"/stateless-lib"); - final File narWorkingDirectory = new File(nifi_home+"/working"); + final File narWorkingDirectory = new File(nifi_home+"/work"); - if(args[0].equals(EXTRACT_NARS)){ + if(args.length >= 1 && args[0].equals(EXTRACT_NARS)){ if (!libDir.exists()) { System.out.println("Specified lib directory <" + libDir + "> does not exist"); return; diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/bootstrap/RunnableFlowFactory.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/bootstrap/RunnableFlowFactory.java deleted file mode 100644 index 570a5b311250f..0000000000000 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/bootstrap/RunnableFlowFactory.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stateless.bootstrap; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.nifi.controller.exception.ProcessorInstantiationException; -import org.apache.nifi.stateless.core.StatelessFlow; -import org.apache.nifi.stateless.core.RegistryUtil; -import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.registry.VariableDescriptor; -import org.apache.nifi.registry.client.NiFiRegistryException; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedProcessGroup; -import org.apache.nifi.reporting.InitializationException; - -import javax.net.ssl.SSLContext; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.security.NoSuchAlgorithmException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -public class RunnableFlowFactory { - - public static RunnableFlow fromJson(final String json) throws NiFiRegistryException, InitializationException, - IOException, ProcessorInstantiationException { - final JsonObject config = new JsonParser().parse(json).getAsJsonObject(); - return StatelessFlow.createAndEnqueueFromJSON(config); - } - - public static RunnableFlow fromJsonFile(final String filename, final ClassLoader systemClassLoader, final File narWorkingDir) throws IOException, - NiFiRegistryException, ProcessorInstantiationException, InitializationException { - final String json = new String(Files.readAllBytes(Paths.get(filename))); - final JsonObject config = new JsonParser().parse(json).getAsJsonObject(); - return StatelessFlow.createAndEnqueueFromJSON(config, systemClassLoader, narWorkingDir); - } - - public static RunnableFlow fromCommandLineArgs(final String[] args) throws InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException { - //Initialize flow - final String registryUrl = args[2]; - final String bucketID = args[3]; - final String flowID = args[4]; - final Map inputVariables = new HashMap<>(); - - if (args.length >= 6) { - final String[] variables = args[5].split(";"); - for (final String v : variables) { - String[] tokens = v.split("-"); - inputVariables.put(new VariableDescriptor(tokens[0]), tokens[1]); - } - } - - final String[] failureOutputPorts = args.length >= 7 ? args[6].split(";") : new String[]{}; - final SSLContext sslContext; - try { - sslContext = SSLContext.getDefault(); - } catch (NoSuchAlgorithmException e) { - throw new NiFiRegistryException("Could not get Default SSL Context", e); - } - - final VersionedFlowSnapshot snapshot = new RegistryUtil(registryUrl, sslContext).getFlowByID(bucketID, flowID); - final VersionedProcessGroup versionedFlow = snapshot.getFlowContents(); - - final ExtensionManager extensionManager = ExtensionDiscovery.discover(new File("./work"), ClassLoader.getSystemClassLoader()); - final StatelessFlow flow = new StatelessFlow(versionedFlow, extensionManager, () -> inputVariables, Arrays.asList(failureOutputPorts), true, sslContext); - - // Enqueue all provided flow files - if (7 < args.length) { - int i = 7; - while (i++ < args.length) { - final Map attributes = new HashMap<>(); - byte[] content = {}; - - final String[] attributesArr = args[i].split(";"); - for (final String v : attributesArr) { - final String[] tokens = v.split("-"); - if (tokens[0].equals(StatelessFlow.CONTENT)) { - content = tokens[1].getBytes(); - } else { - attributes.put(tokens[0], tokens[1]); - } - } - - flow.enqueueFlowFile(content, attributes); - } - } - - return flow; - } -} diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java index ba89d3de541cf..a2a8f042f5c40 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java @@ -347,10 +347,6 @@ public static SSLContext getSSLContext(final JsonObject config) { return null; } - public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args) throws InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException { - return createAndEnqueueFromJSON(args, ClassLoader.getSystemClassLoader(), new File(DEFAULT_WORKING_DIR)); - } - public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args, final ClassLoader systemClassLoader, final File narWorkingDir) throws InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException { if (args == null) { diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java index 0d56f383ab917..e26d0a4ef8c00 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java @@ -16,21 +16,21 @@ */ package org.apache.nifi.stateless.runtimes; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; import org.apache.nifi.stateless.bootstrap.RunnableFlow; -import org.apache.nifi.stateless.bootstrap.RunnableFlowFactory; import org.apache.nifi.stateless.core.StatelessFlow; import org.apache.nifi.stateless.runtimes.openwhisk.StatelessNiFiOpenWhiskAction; import org.apache.nifi.stateless.runtimes.yarn.YARNServiceUtil; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; +import java.util.*; public class Program { @@ -40,6 +40,25 @@ public class Program { public static void launch(final String[] args, final ClassLoader systemClassLoader, final File narWorkingDirectory) throws Exception { + + //Workaround for YARN + //TODO make configurable + String hadoopTokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); + if(hadoopTokenFileLocation != null && !hadoopTokenFileLocation.equals("")) { + File targetFile = new File(hadoopTokenFileLocation); + File parent = targetFile.getParentFile(); + if (!parent.exists() && !parent.mkdirs()) { + throw new IllegalStateException("Couldn't create dir: " + parent); + } + try (FileOutputStream fos = new FileOutputStream(targetFile)) { + fos.write("HDTS".getBytes(StandardCharsets.UTF_8)); + fos.write((byte) 0x00); + fos.write((byte) 0x00); + fos.write((byte) 0x00); + } + System.out.println("Created empty hadoop token file: " + System.getenv("HADOOP_TOKEN_FILE_LOCATION")); + } + if (args.length == 0) { printUsage(); System.exit(1); @@ -48,7 +67,7 @@ public static void launch(final String[] args, final ClassLoader systemClassLoad } else if (args[0].equals(RUN_YARN_SERVICE_FROM_REGISTRY) && args.length >= 7) { runOnYarn(args); } else if (args[0].equals(RUN_OPENWHISK_ACTION_SERVER) && args.length == 2) { - runOnOpenWhisk(args); + runOnOpenWhisk(args, systemClassLoader, narWorkingDirectory); } else { System.out.println("Invalid input: " + String.join(",", args)); printUsage(); @@ -56,8 +75,8 @@ public static void launch(final String[] args, final ClassLoader systemClassLoad } } - private static void runOnOpenWhisk(final String[] args) throws IOException { - StatelessNiFiOpenWhiskAction action = new StatelessNiFiOpenWhiskAction(Integer.parseInt(args[1])); + private static void runOnOpenWhisk(final String[] args, final ClassLoader systemClassLoader, final File narWorkingDirectory) throws IOException { + StatelessNiFiOpenWhiskAction action = new StatelessNiFiOpenWhiskAction(Integer.parseInt(args[1]), systemClassLoader, narWorkingDirectory); action.start(); } @@ -66,48 +85,51 @@ private static void runOnYarn(final String[] args) throws IOException { String imageName = args[2]; String serviceName = args[3]; int numberOfContainers = Integer.parseInt(args[4]); - List launchCommand = Arrays.asList(RUN_FROM_REGISTRY, "Continuous"); + String json; if (args[5].equals("--file")) { - launchCommand.add("--json"); - launchCommand.add(new String(Files.readAllBytes(Paths.get(args[6])))); + json = new String(Files.readAllBytes(Paths.get(args[6]))); } else if (args[5].equals("--json")) { - launchCommand.add("--json"); - launchCommand.add(args[6]); - } - - if (args.length >= 9) { - for (int i = 5; i < args.length; i++) { - launchCommand.add(args[i]); - } + json = args[6]; } else { System.out.println("Invalid input: " + String.join(",", args)); printUsage(); System.exit(1); + return; } + String[] launchCommand = { + RUN_FROM_REGISTRY, + "Continuous", + "--json", + new JsonParser().parse(json).toString() //validate and minify + }; StringBuilder message = new StringBuilder(); YARNServiceUtil yarnServiceUtil = new YARNServiceUtil(YARNUrl, imageName); - yarnServiceUtil.launchYARNService(serviceName, numberOfContainers, launchCommand.toArray(new String[0]), message); + yarnServiceUtil.launchYARNService(serviceName, numberOfContainers, launchCommand, message); System.out.println(message); } private static void runLocal(final String[] args, final ClassLoader systemClassLoader, final File narWorkingDirectory) throws Exception { final boolean once = args[1].equalsIgnoreCase("Once"); - final RunnableFlow flow; + final String json; if (args[2].equals("--file")) { - flow = RunnableFlowFactory.fromJsonFile(args[3], systemClassLoader, narWorkingDirectory); + json = new String(Files.readAllBytes(Paths.get(args[3]))); } else if (args[2].equals("--json")) { - flow = RunnableFlowFactory.fromJson(args[3]); - } else if (args.length >= 5) { - flow = RunnableFlowFactory.fromCommandLineArgs(args); + json = args[3]; + } else if (args[2].equals("--yarnjson")) { + json = args[3].replace(';',','); } else { System.out.println("Invalid input: " + String.join(",", args)); printUsage(); System.exit(1); return; } + JsonObject jsonObject = new JsonParser().parse(json).getAsJsonObject(); + System.out.println("Running from json:"); + System.out.println(jsonObject.toString()); + final RunnableFlow flow = StatelessFlow.createAndEnqueueFromJSON(jsonObject, systemClassLoader, narWorkingDirectory); // Run Flow final Queue outputFlowFiles = new LinkedList<>(); @@ -131,33 +153,22 @@ private static void runLocal(final String[] args, final ClassLoader systemClassL private static void printUsage() { System.out.println("Usage:"); - System.out.println(" 1) " + RUN_FROM_REGISTRY + " [Once|Continuous] [] []"); System.out.println(" " + RUN_FROM_REGISTRY + " [Once|Continuous] --json "); System.out.println(" " + RUN_FROM_REGISTRY + " [Once|Continuous] --file "); System.out.println(); - System.out.println(" 2) " + RUN_YARN_SERVICE_FROM_REGISTRY + " <# of Containers> \\"); - System.out.println(" [] []"); System.out.println(" " + RUN_YARN_SERVICE_FROM_REGISTRY + " <# of Containers> --json "); System.out.println(" " + RUN_YARN_SERVICE_FROM_REGISTRY + " <# of Containers> --file "); System.out.println(); System.out.println(" 3) " + RUN_OPENWHISK_ACTION_SERVER + " "); System.out.println(); System.out.println("Examples:"); - System.out.println(" 1) " + RUN_FROM_REGISTRY + " Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \\"); - System.out.println(" \"DestinationDirectory-/tmp/nifistateless/output2/\" \"\" \"absolute.path-/tmp/nifistateless/input/;filename-test.txt\" \"absolute.path-/tmp/nifistateless/input/;" + - "filename-test2.txt\""); - System.out.println(" 2) " + RUN_FROM_REGISTRY + " Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \\"); - System.out.println(" \"DestinationDirectory-/tmp/nifistateless/output2/\" \"f25c9204-6c95-3aa9-b0a8-c556f5f61849\" \"absolute.path-/tmp/nifistateless/input/;filename-test.txt\""); - System.out.println(" 3) " + RUN_YARN_SERVICE_FROM_REGISTRY + " http://127.0.0.1:8088 nifi-stateless:latest kafka-to-solr 3 --file kafka-to-solr.json"); - System.out.println(" 4) " + RUN_OPENWHISK_ACTION_SERVER + " 8080"); + System.out.println(" 1) " + RUN_FROM_REGISTRY + " Once --json \"{\\\"registryUrl\\\":\\\"http://172.26.198.107:61080\\\",\\\"bucketId\\\":\\\"5eec8794-01b3-4cd7-8536-0167c8b4ce8c\\\",\\\"flowId\\\": \\\"c5fa1d4f-b453-4bf5-8ff3-352352c418f3\\\"}\""); + System.out.println(" 2) " + RUN_YARN_SERVICE_FROM_REGISTRY + " http://127.0.0.1:8088 nifi-stateless:latest kafka-to-solr 3 --file kafka-to-solr.json"); + System.out.println(" 3) " + RUN_OPENWHISK_ACTION_SERVER + " 8080"); System.out.println(); System.out.println("Notes:"); - System.out.println(" 1) will be split on ';' and '-' then injected into the flow using the variable registry interface."); - System.out.println(" 2) will be split on ';'. FlowFiles routed to matching output ports will immediately fail the flow."); - System.out.println(" 3) will be split on ';' and '-' then injected into the flow using the \"" + StatelessFlow.CONTENT + "\" field as the FlowFile content."); - System.out.println(" 4) Multiple arguments can be provided."); - System.out.println(" 5) The configuration file must be in JSON format. "); - System.out.println(" 6) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + ", " + StatelessFlow.FLOWID + "."); + System.out.println(" 1) The configuration file must be in JSON format. "); + System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + ", " + StatelessFlow.FLOWID + "."); System.out.println(" All other attributes will be passed to the flow using the variable registry interface"); System.out.println(); } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/JavaAction.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/JavaAction.java deleted file mode 100644 index 1b5ce82cda3b2..0000000000000 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/JavaAction.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stateless.runtimes.openwhisk; - -import com.google.gson.JsonObject; -import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; -import org.apache.nifi.stateless.core.StatelessFlow; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.LinkedList; -import java.util.Queue; - -public class JavaAction { - - //wsk action create nifistateless NiFiStateless/target/NiFi-Stateless-1.0-SNAPSHOT.jar --main org.apache.nifi.stateless.runtimes.OpenWhisk.JavaAction#OpenWhiskJavaEntry - //wsk action invoke -br nifistateless -p registryurl http://172.0.0.1:61080 -p bucket e53b8a0d-5c85-4fcd-912a-1c549a586c83 -p flow 6cf8277a-c402-4957-8623-0fa9890dd45d -p variable1 val1 -p variable2 val2 - public static JsonObject OpenWhiskJavaEntry(JsonObject args) { - - JsonObject result = new JsonObject(); - try { - StatelessFlow flow = StatelessFlow.createAndEnqueueFromJSON(args); - Queue output = new LinkedList<>(); - boolean successful = flow.runOnce(output); - - StringBuilder response = new StringBuilder(); - for (InMemoryFlowFile flowFile : output) { - response.append("\n").append(flowFile); - } - - result.addProperty("success", successful); - result.addProperty("message", response.toString()); - } catch (Exception ex) { - StringWriter sw = new StringWriter(); - ex.printStackTrace(new PrintWriter(sw)); - result.addProperty("success", false); - result.addProperty("message", "Flow exception: " + ex.getMessage() + "--" + sw.toString()); - } - return result; - } -} diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java index 9c89adb34a9fe..849ce135d9845 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java @@ -22,25 +22,30 @@ import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; +import org.apache.nifi.stateless.bootstrap.RunnableFlow; import org.apache.nifi.stateless.core.StatelessFlow; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; +import java.io.*; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.LinkedList; +import java.util.Map; import java.util.Queue; +import java.util.stream.Collectors; public class StatelessNiFiOpenWhiskAction { private HttpServer server; private boolean initialized = false; - private StatelessFlow flow = null; + private RunnableFlow flow = null; + private ClassLoader systemClassLoader; + private File narWorkingDirectory; + + public StatelessNiFiOpenWhiskAction(int port, final ClassLoader systemClassLoader, final File narWorkingDirectory) throws IOException { + + this.systemClassLoader = systemClassLoader; + this.narWorkingDirectory = narWorkingDirectory; - public StatelessNiFiOpenWhiskAction(int port) throws IOException { this.server = HttpServer.create(new InetSocketAddress(port), -1); this.server.createContext("/init", new InitHandler()); @@ -48,24 +53,23 @@ public StatelessNiFiOpenWhiskAction(int port) throws IOException { this.server.setExecutor(null); // creates a default executor } + public void start() { server.start(); } private static void writeResponse(HttpExchange t, int code, String content) throws IOException { - byte[] bytes = content.getBytes(StandardCharsets.UTF_8); + if(content.isEmpty()) + content = "success"; + + JsonObject message = new JsonObject(); + message.addProperty("result", content); + byte[] bytes = message.toString().getBytes(StandardCharsets.UTF_8); t.sendResponseHeaders(code, bytes.length); OutputStream os = t.getResponseBody(); os.write(bytes); os.close(); } - - private static void writeError(HttpExchange t, String errorMessage) throws IOException { - JsonObject message = new JsonObject(); - message.addProperty("error", errorMessage); - writeResponse(t, 502, message.toString()); - } - private static void writeLogMarkers() { System.out.println("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"); System.err.println("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"); @@ -75,28 +79,34 @@ private static void writeLogMarkers() { private class InitHandler implements HttpHandler { public void handle(HttpExchange t) throws IOException { - initialized = true; - writeResponse(t, 200, "Initialized"); - - InputStream is = t.getRequestBody(); - JsonParser parser = new JsonParser(); - JsonObject body = parser.parse(new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))).getAsJsonObject(); - System.out.println("Init input: " + body); - String code = body.get("value").getAsJsonObject().get("code").getAsString(); - - if (code.equals("GENERIC")) { - initialized = true; - writeResponse(t, 200, "Initialized Generic Action"); - } else { - JsonObject flowDefinition = parser.parse(code).getAsJsonObject(); - try { - flow = StatelessFlow.createAndEnqueueFromJSON(flowDefinition); + System.out.println("Initializing"); + + try { + InputStream is = t.getRequestBody(); + JsonParser parser = new JsonParser(); + JsonObject body = parser.parse(new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))).getAsJsonObject(); + System.out.println("Init input: " + body); + String code = body.get("value").getAsJsonObject().get("code").getAsJsonPrimitive().getAsString(); + System.out.println("Code input: " + code); + + if (code.equals("GENERIC")) { initialized = true; - writeResponse(t, 200, "Initialized " + flow); - } catch (Exception e) { - e.printStackTrace(System.err); - writeResponse(t, 400, "Error: " + e.getMessage()); + writeResponse(t, 200, "Initialized Generic Action"); + } else { + + final JsonObject config = new JsonParser().parse(code).getAsJsonObject(); + flow = StatelessFlow.createAndEnqueueFromJSON(config, systemClassLoader, narWorkingDirectory); + + initialized = true; + writeResponse(t, 200, "Initialized Flow"); } + } catch (Exception e) { + e.printStackTrace(System.err); + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + String sStackTrace = sw.toString(); + writeResponse(t, 500, "Error: " + e.getMessage()+"\n"+sStackTrace); } } } @@ -104,7 +114,7 @@ public void handle(HttpExchange t) throws IOException { private class RunHandler implements HttpHandler { public void handle(HttpExchange t) throws IOException { if (!initialized) { - StatelessNiFiOpenWhiskAction.writeError(t, "Cannot invoke an uninitialized action."); + writeResponse(t, 500, "Cannot invoke an uninitialized action."); return; } @@ -119,7 +129,7 @@ public void handle(HttpExchange t) throws IOException { "activation_id":"e212d293aa73479d92d293aa73c79dc9", "action_name":"/guest/nifistateless", "deadline":"1541729057462", - "api_key":"23bc46b1-71f6-4ed5-8c54-816aa4f8c502:123zO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP", + "api_key":"23bc46b1-71f6-4ed5-8c54-816aa4f8c500:123zO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP", "value":{"registry":"http://172.26.224.116:61080","SourceCluster":"hdfs://172.26.224.119:8020","SourceFile":"test.txt", "SourceDirectory":"hdfs://172.26.224.119:8020/tmp/nifistateless/input/","flow":"6cf8277a-c402-4957-8623-0fa9890dd45d","bucket":"e53b8a0d-5c85-4fcd-912a-1c549a586c83", "DestinationDirectory":"hdfs://172.26.224.119:8020/tmp/nifistateless/output"}, @@ -139,10 +149,19 @@ public void handle(HttpExchange t) throws IOException { Queue output = new LinkedList<>(); boolean successful; if (flow == null) { - StatelessFlow tempFlow = StatelessFlow.createAndEnqueueFromJSON(inputObject); + System.out.println(inputObject.toString()); + + final JsonObject config = new JsonParser().parse(inputObject.get("code").getAsJsonPrimitive().getAsString()).getAsJsonObject(); + RunnableFlow tempFlow = StatelessFlow.createAndEnqueueFromJSON(config, systemClassLoader, narWorkingDirectory); successful = tempFlow.runOnce(output); } else { - flow.enqueueFromJSON(inputObject); + System.out.println("Input:"); + inputObject.entrySet().forEach(item -> System.out.println(item.getKey()+":"+item.getValue().getAsString())); + + Map Attributes = inputObject.entrySet() + .stream() + .collect(Collectors.toMap(item -> item.getKey(), item -> item.getValue().getAsString())); + ((StatelessFlow)flow).enqueueFlowFile(new byte[0],Attributes); successful = flow.runOnce(output); } @@ -151,11 +170,15 @@ public void handle(HttpExchange t) throws IOException { response.append("\n").append(flowFile); } - StatelessNiFiOpenWhiskAction.writeResponse(t, successful ? 200 : 400, response.toString()); + writeResponse(t, successful ? 200 : 500, response.toString()); } catch (Exception e) { e.printStackTrace(System.err); - StatelessNiFiOpenWhiskAction.writeError(t, "An error has occurred (see logs for details): " + e); + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + String sStackTrace = sw.toString(); + writeResponse(t, 500, "An error has occurred (see logs for details): " + e.getMessage()+"\n"+sStackTrace); } finally { writeLogMarkers(); } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/yarn/YARNServiceUtil.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/yarn/YARNServiceUtil.java index ec238a685c4e2..0e8c4102b7594 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/yarn/YARNServiceUtil.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/yarn/YARNServiceUtil.java @@ -36,12 +36,23 @@ public YARNServiceUtil(String YARNUrl, String imageName) { this.imageName = imageName; } - public boolean launchYARNService(String name, int containerCount, String[] launchCommand, StringBuilder outMessage) { + public boolean launchYARNService(String name, int containerCount, String[] argLaunchCommand, StringBuilder outMessage) { + + //YARN cannot handle commas in a launch command... + String[] updatedLaunchCommand = new String[argLaunchCommand.length]; + for(int i = 0; i < argLaunchCommand.length; i++){ + if(argLaunchCommand[i].equals("--json")) + updatedLaunchCommand[i] = "--yarnjson"; + else + updatedLaunchCommand[i] = argLaunchCommand[i] + .replace(',',';') + .replace("}}","} }"); + } JsonObject spec = new JsonObject(); - spec.addProperty("name", name.substring(0, 25)); + spec.addProperty("name", name.substring(0, Math.min(name.length(), 25))); //truncate name spec.addProperty("version", "1.0.0"); - spec.addProperty("description", "Stateless NiFi service launched with the following command: " + String.join(",", launchCommand)); + spec.addProperty("description", "Stateless NiFi service"); JsonObject component = new JsonObject(); component.addProperty("name", "mc"); @@ -52,11 +63,11 @@ public boolean launchYARNService(String name, int containerCount, String[] launc artifact.addProperty("type", "DOCKER"); component.add("artifact", artifact); - component.addProperty("launch_command", String.join(",", launchCommand)); + component.addProperty("launch_command", String.join(",", updatedLaunchCommand)); JsonObject resource = new JsonObject(); resource.addProperty("cpus", 1); - resource.addProperty("memory", "256"); + resource.addProperty("memory", "512"); component.add("resource", resource); JsonObject env = new JsonObject(); @@ -72,6 +83,10 @@ public boolean launchYARNService(String name, int containerCount, String[] launc HttpPost request = new HttpPost( this.YARNUrl + "/app/v1/services?user.name=" + System.getProperty("user.name") ); + System.out.println("Running YARN service with the following definition:"); + System.out.println(spec); + System.out.println("Launch Command"); + System.out.println(String.join(",", updatedLaunchCommand)); try { request.setEntity(new StringEntity(spec.toString(), " application/json", StandardCharsets.UTF_8.toString())); diff --git a/pom.xml b/pom.xml index 8623dcbb67d07..2b1e3fb79741a 100644 --- a/pom.xml +++ b/pom.xml @@ -35,8 +35,8 @@ nifi-maven-archetypes nifi-external nifi-toolkit - nifi-docker nifi-stateless + nifi-docker http://nifi.apache.org