Skip to content

Commit

Permalink
Fixed Docker container, YARN runtime, and OpenWhisk runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
SamHjelmfelt committed May 2, 2019
1 parent a0e5240 commit 74c43b7
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 282 deletions.
12 changes: 8 additions & 4 deletions nifi-docker/dockermaven-stateless/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ ENV NIFI_HOME ${NIFI_BASE_DIR}/nifi-current
# Setup NiFi user
RUN addgroup -g ${GID} nifi && adduser -s /bin/sh -u ${UID} -G nifi -D nifi

RUN mkdir -p $NIFI_HOME
RUN mkdir -p $NIFI_HOME && chown nifi:nifi $NIFI_HOME
RUN mkdir -p ${NIFI_HOME}/work/ && chown nifi:nifi ${NIFI_HOME}/work/ && chmod 777 ${NIFI_HOME}/work/

COPY --chown=nifi:nifi $WORKING_DIR ${NIFI_HOME}/work/
COPY --chown=nifi:nifi $STATELESS_LIB_DIR ${NIFI_HOME}/stateless-lib/
COPY --chown=nifi:nifi $WORKING_DIR ${NIFI_HOME}/working/


#NiFi's HDFS processors require core-site.xml or hdfs-site.xml to exist on disk before they can be started...
Expand All @@ -63,13 +65,15 @@ RUN echo '<configuration> \n\
<name>fs.hdfs.impl</name> \n\
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value> \n\
</property> \n\
</configuration>' > /tmp/core-site.xml
RUN chmod 666 /tmp/core-site.xml
</configuration>' > /tmp/core-site.xml && chown nifi /tmp/core-site.xml && chmod 777 /tmp/core-site.xml

RUN mkdir -p /hadoop/yarn/local && chown nifi /hadoop/yarn/local && chmod 777 /hadoop/yarn/local

USER nifi

EXPOSE 8080

WORKDIR ${NIFI_HOME}

ENTRYPOINT ["/usr/bin/java", "-cp", "stateless-lib/*", "org.apache.nifi.stateless.NiFiStateless"]
CMD ["RunOpenwhiskActionServer", "8080"]
18 changes: 17 additions & 1 deletion nifi-docker/dockermaven-stateless/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@
<goal>run</goal>
</goals>
</execution>
<execution>
<id>remove framework nar</id>
<phase>process-sources</phase>
<configuration>
<tasks>
<!--Not needed in docker image-->
<delete dir="${project.basedir}/target/lib" />

<!--Remove conflicting JAR. TODO: create custom assembly instead of using ..nifi-assembly/target/nifi-${nifi.version}-bin.zip-->
<delete dir="${project.basedir}/target/work/nifi-framework-nar-1.10.0-SNAPSHOT.nar-unpacked" />
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
Expand All @@ -84,7 +100,7 @@
<GID>1000</GID>
<NIFI_VERSION>${project.version}</NIFI_VERSION>
<STATELESS_LIB_DIR>target/stateless-lib</STATELESS_LIB_DIR>
<WORKING_DIR>target/working</WORKING_DIR>
<WORKING_DIR>target/work</WORKING_DIR>
</buildArgs>
<repository>apache/nifi-stateless</repository>
<tag>${project.version}-dockermaven</tag>
Expand Down
32 changes: 12 additions & 20 deletions nifi-stateless/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
# Stateless NiFi
# Stateless NiFi
Similar to other stream processing frameworks, receipt of incoming data is not acknowledged until it is written to a destination. In the event of failure, data can be replayed from the source rather than relying on a stateful content repository. This will not work for all cases (e.g. fire-and-forget HTTP/tcp), but a large portion of use cases have a resilient source to retry from.

Note: Provenance, metrics, logs are not extracted at this time. Docker and other container engines can be used for logs and metrics.
### Build:
`mvn package -P docker`

Expand All @@ -25,40 +27,30 @@ After building, the image can be used as follows

Where the arguments dictate the runtime to use:
```
1) RunFromRegistry [Once|Continuous] <NiFi registry URL> <Bucket ID> <Flow ID> <Input Variables> [<Failure Output Ports>] [<Input FlowFile>]
RunFromRegistry [Once|Continuous] --json <JSON>
1) RunFromRegistry [Once|Continuous] --json <JSON>
RunFromRegistry [Once|Continuous] --file <File Name> # Filename of JSON file that matches the examples below.
2) RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> \
<NiFi registry URL> <Bucket ID> <Flow ID> <Input Variables> [<Failure Output Ports>] [<Input FlowFile>]
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --json <JSON>
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>
2) RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --json <JSON>
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>
3) RunOpenwhiskActionServer <Port>
3) RunOpenwhiskActionServer <Port>
```

### Examples:
```
1) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \
RunFromRegistry Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \
"DestinationDirectory-/tmp/nifistateless/output2/" "" "absolute.path-/tmp/nifistateless/input/;filename-test.txt" "absolute.path-/tmp/nifistateless/input/;filename-test2.txt"
2) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \
RunFromRegistry Once --file /Users/nifi/nifi-stateless-configs/flow-abc.json
3) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \
2) docker run --rm -it nifi-stateless:1.10.0-SNAPSHOT-dockermaven \
RunYARNServiceFromRegistry http://127.0.0.1:8088 nifi-stateless:latest kafka-to-solr 3 --file kafka-to-solr.json
4) docker run -d nifi-stateless:1.10.0-SNAPSHOT-dockermaven \
3) docker run -d nifi-stateless:1.10.0-SNAPSHOT-dockermaven \
RunOpenwhiskActionServer 8080
```

###Notes:
```
1) <Input Variables> will be split on ';' and '-' then injected into the flow using the variable registry interface.
2) <Failure Output Ports> will be split on ';'. FlowFiles routed to matching output ports will immediately fail the flow.
3) <Input FlowFile> will be split on ';' and '-' then injected into the flow using the "nifi_content" field as the FlowFile content.
4) Multiple <Input FlowFile> arguments can be provided.
5) The configuration file must be in JSON format.
6) When providing configurations via JSON, the following attributes must be provided: nifi_registry, nifi_bucket, nifi_flow.
All other attributes will be passed to the flow using the variable registry interface
1) The configuration file must be in JSON format.
2) When providing configurations via JSON, the following attributes must be provided: nifi_registry, nifi_bucket, nifi_flow.
All other attributes will be passed to the flow using the variable registry interface
```

### JSON Format
Expand Down
5 changes: 5 additions & 0 deletions nifi-stateless/nifi-stateless-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
<artifactId>nifi-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public static void main(final String[] args) throws IOException, ClassNotFoundEx

final File libDir = new File(nifi_home+"/lib");
final File statelesslibDir = new File(nifi_home+"/stateless-lib");
final File narWorkingDirectory = new File(nifi_home+"/working");
final File narWorkingDirectory = new File(nifi_home+"/work");

if(args[0].equals(EXTRACT_NARS)){
if(args.length >= 1 && args[0].equals(EXTRACT_NARS)){
if (!libDir.exists()) {
System.out.println("Specified lib directory <" + libDir + "> does not exist");
return;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,6 @@ public static SSLContext getSSLContext(final JsonObject config) {
return null;
}

public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args) throws InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException {
return createAndEnqueueFromJSON(args, ClassLoader.getSystemClassLoader(), new File(DEFAULT_WORKING_DIR));
}

public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args, final ClassLoader systemClassLoader, final File narWorkingDir)
throws InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException {
if (args == null) {
Expand Down
Loading

0 comments on commit 74c43b7

Please sign in to comment.