Skip to content

Commit

Permalink
NIFI-12671 Added S3FileResourceService
Browse files Browse the repository at this point in the history
This closes apache#8368.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
  • Loading branch information
balazsgerner authored and turcsanyip committed Feb 26, 2024
1 parent 03bba70 commit 40d9750
Show file tree
Hide file tree
Showing 26 changed files with 513 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
Expand Down Expand Up @@ -58,6 +57,8 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;

/**
* Base class for AWS processors that uses AWSCredentialsProvider interface for creating AWS clients.
*
Expand Down Expand Up @@ -92,14 +93,6 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends


// Property Descriptors
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.description("The AWS Region to connect to.")
.required(true)
.allowableValues(getAvailableRegions())
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();

public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("The amount of time to wait in order to establish a connection to AWS or receive data from AWS before timing out.")
Expand Down Expand Up @@ -177,19 +170,6 @@ public void onStopped() {
this.clientCache.cleanUp();
}

public static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName());
}

public static AllowableValue[] getAvailableRegions() {
final List<AllowableValue> values = new ArrayList<>();
for (final Regions region : Regions.values()) {
values.add(createAllowableValue(region));
}
return values.toArray(new AllowableValue[0]);
}


@Override
public void migrateProperties(final PropertyConfiguration config) {
migrateAuthenticationProperties(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.amazonaws.auth.Signer;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3Client;
Expand All @@ -35,10 +34,8 @@
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
Expand All @@ -49,7 +46,6 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil;
Expand All @@ -62,11 +58,13 @@
import java.util.Map;
import java.util.function.Consumer;

import static java.lang.String.format;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.ATTRIBUTE_DEFINED_REGION;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveS3Region;

public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor<AmazonS3Client> {

Expand Down Expand Up @@ -182,16 +180,6 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
.dynamicallyModifiesClasspath(true)
.build();

public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region",
"Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
"Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region.");

public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(REGION)
.allowableValues(getAvailableS3Regions())
.build();

public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
.name("encryption-service")
.displayName("Encryption Service")
Expand Down Expand Up @@ -292,7 +280,7 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final
* @return The created S3 client
*/
protected AmazonS3Client getS3Client(final ProcessContext context, final Map<String, String> attributes) {
final Region region = resolveRegion(context, attributes);
final Region region = resolveS3Region(context, attributes);
return getClient(context, region);
}

Expand All @@ -303,7 +291,7 @@ protected AmazonS3Client getS3Client(final ProcessContext context, final Map<Str
* @return The newly created S3 client
*/
protected AmazonS3Client createClient(final ProcessContext context, final Map<String, String> attributes) {
final Region region = resolveRegion(context, attributes);
final Region region = resolveS3Region(context, attributes);
return createClient(context, region);
}

Expand Down Expand Up @@ -451,36 +439,8 @@ protected final CannedAccessControlList createCannedACL(final ProcessContext con
return cannedAcl;
}

private Region parseRegionValue(String regionValue) {
if (regionValue == null) {
throw new ProcessException(format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE));
}

try {
return Region.getRegion(Regions.fromName(regionValue));
} catch (Exception e) {
throw new ProcessException(format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
}
}

private Region resolveRegion(final ProcessContext context, final Map<String, String> attributes) {
String regionValue = context.getProperty(S3_REGION).getValue();

if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
regionValue = attributes.get(S3_REGION_ATTRIBUTE);
}

return parseRegionValue(regionValue);
}

private boolean isAttributeDefinedRegion(final ProcessContext context) {
String regionValue = context.getProperty(S3_REGION).getValue();
return ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue);
}

private static AllowableValue[] getAvailableS3Regions() {
final AllowableValue[] availableRegions = getAvailableRegions();
return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.util;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.exception.ProcessException;

import java.util.Arrays;
import java.util.Map;

/**
* Utility class for AWS region methods. This class uses AWS SDK v1.
*
*/
public final class RegionUtilV1 {

private RegionUtilV1() {
}

public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
public static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region",
"Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
"Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region.");

public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.description("The AWS Region to connect to.")
.required(true)
.allowableValues(getAvailableRegions())
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();

public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(REGION)
.allowableValues(getAvailableS3Regions())
.build();

public static Region resolveS3Region(final PropertyContext context, final Map<String, String> attributes) {
String regionValue = context.getProperty(S3_REGION).getValue();

if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
regionValue = attributes.get(S3_REGION_ATTRIBUTE);
}

return parseS3RegionValue(regionValue);
}

public static AllowableValue[] getAvailableS3Regions() {
final AllowableValue[] availableRegions = getAvailableRegions();
return ArrayUtils.add(availableRegions, ATTRIBUTE_DEFINED_REGION);
}

public static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName());
}

public static AllowableValue[] getAvailableRegions() {
return Arrays.stream(Regions.values())
.map(RegionUtilV1::createAllowableValue)
.toArray(AllowableValue[]::new);
}

private static Region parseS3RegionValue(String regionValue) {
if (regionValue == null) {
throw new ProcessException(String.format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE));
}

try {
return Region.getRegion(Regions.fromName(regionValue));
} catch (Exception e) {
throw new ProcessException(String.format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public abstract class AbstractAwsProcessor<T extends SdkClient> extends Abstract
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.required(true)
.allowableValues(RegionUtil.getAvailableRegions())
.defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue())
.allowableValues(RegionUtilV2.getAvailableRegions())
.defaultValue(RegionUtilV2.createAllowableValue(Region.US_WEST_2).getValue())
.build();

public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import java.util.List;

/**
* Utility class for AWS region methods.
* Utility class for AWS region methods. This class uses AWS SDK v2.
*
*/
public abstract class RegionUtil {
public final class RegionUtilV2 {

private RegionUtilV2() {
}

/**
* Creates an AllowableValue from a Region.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;


@SupportsBatching
@WritesAttributes({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;

@SupportsBatching
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;

@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
Expand Down Expand Up @@ -518,7 +520,7 @@ private void listNoTracking(ProcessContext context, ProcessSession session) {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue());
}

try {
Expand All @@ -540,7 +542,7 @@ private void listNoTracking(ProcessContext context, ProcessSession session) {
ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);

// Write the entity to the listing
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue());

listCount++;
}
Expand Down Expand Up @@ -605,7 +607,7 @@ private void listByTrackingTimestamps(ProcessContext context, ProcessSession ses
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue());
}

try {
Expand All @@ -626,7 +628,7 @@ private void listByTrackingTimestamps(ProcessContext context, ProcessSession ses
// Write the entity to the listing
final GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue());

// Track the latest lastModified timestamp and keys having that timestamp.
// NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
Expand Down Expand Up @@ -736,7 +738,7 @@ private void publishListing(ProcessContext context, ProcessSession session, List
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue());
}

try {
Expand All @@ -750,7 +752,7 @@ private void publishListing(ProcessContext context, ProcessSession session, List
final AmazonS3Client s3Client = getClient(context);
final GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue());

listCount++;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;


@SupportsBatching
@WritesAttributes({
Expand Down
Loading

0 comments on commit 40d9750

Please sign in to comment.