Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
This closes #8262

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
EndzeitBegins authored and exceptionfactory committed Jan 18, 2024
1 parent 53bb995 commit 359ffa5
Show file tree
Hide file tree
Showing 31 changed files with 137 additions and 52 deletions.
5 changes: 5 additions & 0 deletions nifi-code-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,11 @@
<artifactId>nifi-event-transport</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-transfer</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
Expand Down
59 changes: 59 additions & 0 deletions nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-file-transfer</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-listed-entity</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.nifi.processors.standard;
package org.apache.nifi.processor.util.file.transfer;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
Expand All @@ -32,8 +32,6 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;

Expand All @@ -58,19 +56,19 @@
*/
public abstract class FetchFileTransfer extends AbstractProcessor {

static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason";
public static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
public static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
public static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
public static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason";

static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The fully-qualified hostname or IP address of the host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
public static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("The port to connect to on the remote host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand All @@ -91,7 +89,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
public static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
.name("Completion Strategy")
.description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be "
+ "logged but the data will still be transferred.")
Expand All @@ -100,14 +98,14 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.defaultValue(COMPLETION_NONE.getValue())
.required(true)
.build();
static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder()
public static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(FileTransfer.CREATE_DIRECTORY).description(String.format("Used when '%s' is '%s'. %s",
COMPLETION_STRATEGY.getDisplayName(),
COMPLETION_MOVE.getDisplayName(),
FileTransfer.CREATE_DIRECTORY.getDescription()))
.required(false)
.build();
static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
public static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
.name("Move Destination Directory")
.description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+ "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
Expand All @@ -118,7 +116,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.required(false)
.build();

static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
.displayName("Log level when file not found")
.name("fetchfiletransfer-notfound-loglevel")
.description("Log level to use in case the file does not exist when the processor is triggered")
Expand All @@ -131,15 +129,15 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.name("success")
.description("All FlowFiles that are received are routed to success")
.build();
static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
public static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
.name("comms.failure")
.description("Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.")
.build();
static final Relationship REL_NOT_FOUND = new Relationship.Builder()
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not.found")
.description("Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.")
.build();
static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder()
public static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder()
.name("permission.denied")
.description("Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
package org.apache.nifi.processor.util.file.transfer;

import java.io.File;
import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
package org.apache.nifi.processor.util.file.transfer;

import java.io.Closeable;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
package org.apache.nifi.processor.util.file.transfer;

import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -25,8 +25,6 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.util.StopWatch;

import java.io.File;
Expand Down Expand Up @@ -67,6 +65,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
public static final String FILE_GROUP_ATTRIBUTE = "file.group";
public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
public static final String FILE_SIZE_ATTRIBUTE = "file.size";
public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern(FILE_MODIFY_DATE_ATTR_FORMAT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.nifi.processors.standard;
package org.apache.nifi.processor.util.file.transfer;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
Expand All @@ -24,8 +24,6 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
Expand All @@ -46,7 +44,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
public static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("The port to connect to on the remote host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand Down Expand Up @@ -76,15 +74,15 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
@Override
protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
final Map<String, String> attributes = new HashMap<>();
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(ListFile.FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(GetFileTransfer.FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
attributes.put(getProtocolName() + ".remote.port", context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue());
attributes.put(getProtocolName() + ".listing.user", context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
attributes.put(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE, dateTimeFormatter.format(Instant.ofEpochMilli(fileInfo.getLastModifiedTime()).atZone(ZoneId.systemDefault())));
attributes.put(ListFile.FILE_PERMISSIONS_ATTRIBUTE, fileInfo.getPermissions());
attributes.put(ListFile.FILE_OWNER_ATTRIBUTE, fileInfo.getOwner());
attributes.put(ListFile.FILE_GROUP_ATTRIBUTE, fileInfo.getGroup());
attributes.put(ListFile.FILE_SIZE_ATTRIBUTE, Long.toString(fileInfo.getSize()));
attributes.put(GetFileTransfer.FILE_LAST_MODIFY_TIME_ATTRIBUTE, dateTimeFormatter.format(Instant.ofEpochMilli(fileInfo.getLastModifiedTime()).atZone(ZoneId.systemDefault())));
attributes.put(GetFileTransfer.FILE_PERMISSIONS_ATTRIBUTE, fileInfo.getPermissions());
attributes.put(GetFileTransfer.FILE_OWNER_ATTRIBUTE, fileInfo.getOwner());
attributes.put(GetFileTransfer.FILE_GROUP_ATTRIBUTE, fileInfo.getGroup());
attributes.put(GetFileTransfer.FILE_SIZE_ATTRIBUTE, Long.toString(fileInfo.getSize()));
attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
final String fullPath = fileInfo.getFullPathFileName();
if (fullPath != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.nifi.processors.standard.util;
package org.apache.nifi.processor.util.file.transfer;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
package org.apache.nifi.processor.util.file.transfer;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
Expand All @@ -26,9 +26,6 @@
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

Expand Down Expand Up @@ -127,7 +124,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
@Override
public void process(final InputStream in) throws IOException {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
if (workingDirPath != null && context.getProperty(SFTPTransfer.CREATE_DIRECTORY).asBoolean()) {
if (workingDirPath != null && context.getProperty(FileTransfer.CREATE_DIRECTORY).asBoolean()) {
transfer.ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.nifi.processors.standard.util;
package org.apache.nifi.processor.util.file.transfer;

import org.junit.jupiter.api.Test;

Expand Down
1 change: 1 addition & 0 deletions nifi-nar-bundles/nifi-extension-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<module>nifi-event-listen</module>
<module>nifi-event-put</module>
<module>nifi-event-transport</module>
<module>nifi-file-transfer</module>
<module>nifi-hadoop-utils</module>
<module>nifi-kerberos-test-utils</module>
<module>nifi-listed-entity</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<artifactId>nifi-event-transport</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-transfer</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-syslog-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.file.transfer.FetchFileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processor.util.file.transfer.FileTransfer;

// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.file.transfer.FetchFileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processor.util.file.transfer.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;

// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.file.transfer.GetFileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processor.util.file.transfer.FileTransfer;

@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"FTP", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.file.transfer.GetFileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processor.util.file.transfer.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;

@InputRequirement(Requirement.INPUT_FORBIDDEN)
Expand Down
Loading

0 comments on commit 359ffa5

Please sign in to comment.