Skip to content

Commit

Permalink
AWS backend reworked (Issue #5004) (#5468)
Browse files Browse the repository at this point in the history
  • Loading branch information
markjschreiber authored Jun 18, 2020
1 parent 390b39c commit 88a35b1
Show file tree
Hide file tree
Showing 32 changed files with 1,015 additions and 886 deletions.
6 changes: 0 additions & 6 deletions .idea/inspectionProfiles/Project_Default.xml

This file was deleted.

60 changes: 44 additions & 16 deletions filesystems/s3/src/main/java/org/lerch/s3fs/AmazonS3Factory.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.lerch.s3fs;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;

import java.net.URI;
import java.util.Properties;
Expand Down Expand Up @@ -40,6 +44,9 @@ public abstract class AmazonS3Factory {
public static final String USER_AGENT = "s3fs_user_agent";
public static final String SIGNER_OVERRIDE = "s3fs_signer_override";
public static final String PATH_STYLE_ACCESS = "s3fs_path_style_access";
public static final String REGION = "s3fs_region";

Logger log = LoggerFactory.getLogger(AmazonS3Factory.class);

/**
* Build a new Amazon S3 instance with the URI and the properties provided
Expand All @@ -48,23 +55,36 @@ public abstract class AmazonS3Factory {
* @return S3Client
*/
public S3Client getS3Client(URI uri, Properties props) {

final SdkHttpClient httpClient = getHttpClient(props);
final S3Configuration s3Configuration = getServiceConfiguration(props);
final ClientOverrideConfiguration clientOverrideConfiguration = getOverrideConfiguration(props);
final Region region = getRegion(props);

log.debug("Creating s3fs client with httpClient:{}\ns3Configuration:{}\nclientOverrideConfiguration{}\nregion:{}",
httpClient, s3Configuration, clientOverrideConfiguration, region);

S3ClientBuilder builder = S3Client.builder();
if (uri != null && uri.getHost() != null)
if (uri != null && uri.getHost() != null) {
log.info("\tOverriding endpoint to {}", uri);
builder.endpointOverride(uri);
}

builder.credentialsProvider(getCredentialsProvider(props))
.httpClient(getSdkHttpClient(props))
.serviceConfiguration(getConfiguration(props))
.overrideConfiguration(getOverrideConfiguration(props));
//.region(getRegion(props));
.httpClient(httpClient)
.serviceConfiguration(s3Configuration)
.overrideConfiguration(clientOverrideConfiguration)
.region(region);


return createS3Client(builder);
}

/**
* should return a new S3Client
* should return a new S3Client given the content of the builder
* @param builder the builder that provides the settings for the client
* @return an s3 client
*
* @return {@link software.amazon.awssdk.services.s3.S3Client}
*/
protected abstract S3Client createS3Client(S3ClientBuilder builder);

Expand All @@ -81,12 +101,12 @@ protected AwsCredentials getAWSCredentials(Properties props) {
return AwsBasicCredentials.create(props.getProperty(ACCESS_KEY), props.getProperty(SECRET_KEY));
}

protected SdkHttpClient getSdkHttpClient(Properties props) {
protected SdkHttpClient getHttpClient(Properties props) {
// TODO: custom http configuration based on properties
return ApacheHttpClient.builder().build();
}

protected S3Configuration getConfiguration(Properties props) {
protected S3Configuration getServiceConfiguration(Properties props) {
// TODO: custom configuration based on properties
return S3Configuration.builder().build();
}
Expand All @@ -95,4 +115,12 @@ protected ClientOverrideConfiguration getOverrideConfiguration(Properties props)
// TODO: custom configuration based on properties
return ClientOverrideConfiguration.builder().build();
}

protected Region getRegion(Properties props) {
if(props == null || props.getProperty(REGION) == null) {
return new DefaultAwsRegionProviderChain().getRegion();
} else {
return Region.of(props.getProperty(REGION));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public int write(ByteBuffer src, long position) throws IOException {
}

@Override
public MappedByteBuffer map(FileChannel.MapMode mode, long position, long size) throws IOException {
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
return filechannel.map(mode, position, size);
}

Expand Down
158 changes: 68 additions & 90 deletions filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystemProvider.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package org.lerch.s3fs;

import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.google.common.annotations.VisibleForTesting;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Bucket;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAclRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand All @@ -14,16 +20,9 @@
import org.lerch.s3fs.attribute.S3BasicFileAttributes;
import org.lerch.s3fs.attribute.S3PosixFileAttributeView;
import org.lerch.s3fs.attribute.S3PosixFileAttributes;
import org.lerch.s3fs.util.AmazonS3ClientProvider;
import org.lerch.s3fs.util.AttributesUtils;
import org.lerch.s3fs.util.Cache;
import org.lerch.s3fs.util.S3Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -36,14 +35,12 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import static com.google.common.collect.Sets.difference;
import static java.lang.String.format;
import static org.lerch.s3fs.AmazonS3Factory.*;
import static java.lang.String.format;

/**
*
* Spec:
* <p>
* URI: s3://[endpoint]/{bucket}/{key} If endpoint is missing, it's assumed to
Expand Down Expand Up @@ -72,14 +69,13 @@
* </p>
*/
public class S3FileSystemProvider extends FileSystemProvider {
public static final String CHARSET_KEY = "s3fs_charset";
public static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory";

private static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory";

private static final ConcurrentMap<String, S3FileSystem> fileSystems = new ConcurrentHashMap<>();
private static final List<String> PROPS_TO_OVERLOAD = Arrays.asList(ACCESS_KEY, SECRET_KEY, REQUEST_METRIC_COLLECTOR_CLASS, CONNECTION_TIMEOUT, MAX_CONNECTIONS, MAX_ERROR_RETRY, PROTOCOL, PROXY_DOMAIN,
PROXY_HOST, PROXY_PASSWORD, PROXY_PORT, PROXY_USERNAME, PROXY_WORKSTATION, SOCKET_SEND_BUFFER_SIZE_HINT, SOCKET_RECEIVE_BUFFER_SIZE_HINT, SOCKET_TIMEOUT,
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS);
private static final Logger log = LoggerFactory.getLogger(S3FileSystemProvider.class);

private S3Utils s3Utils = new S3Utils();
private Cache cache = new Cache();
Expand All @@ -91,46 +87,20 @@ public String getScheme() {

@Override
public FileSystem newFileSystem(URI uri, Map<String, ?> env) {
return newFileSystem(uri, env, props -> createFileSystem(uri, props), true);
}

/**
* Get existing filesystem based on a combination of URI and env settings. Create new filesystem otherwise.
*
* @param uri URI of existing, or to be created filesystem.
* @param env environment settings.
* @return new or existing filesystem.
*/
public FileSystem getFileSystem(URI uri, Map<String, ?> env, S3Client client) {
return newFileSystem(uri, env, props -> createFileSystem(uri, props, client), false);
}

private FileSystem newFileSystem(URI uri, Map<String, ?> env,
Function<Properties, S3FileSystem> createFileSystemFunc,
boolean throwExceptionIfAlreadyExists) {
validateUri(uri);
// get properties for the env or properties or system
Properties props = getProperties(uri, env);
validateProperties(props);
// try to get the filesystem by the key
String key = getFileSystemKey(uri, props);
FileSystemAlreadyExistsException alreadyExistsException = new FileSystemAlreadyExistsException("File system " + uri.getScheme() + ':' + key + " already exists");
if (!fileSystems.containsKey(key)) {
synchronized (fileSystems) {
if (!fileSystems.containsKey(key)) {
// create the filesystem with the final properties, store and return
S3FileSystem fileSystem = createFileSystemFunc.apply(props);
S3FileSystem fileSystem = createFileSystem(uri, props);
fileSystems.put(fileSystem.getKey(), fileSystem);
} else {
if (throwExceptionIfAlreadyExists) {
throw alreadyExistsException;
}
}
}
} else {
if (throwExceptionIfAlreadyExists) {
throw alreadyExistsException;
}
}

return fileSystems.get(key);
Expand Down Expand Up @@ -290,6 +260,22 @@ public String systemGetEnv(String key) {
return System.getenv(key);
}

/**
* Get existing filesystem based on a combination of URI and env settings. Create new filesystem otherwise.
*
* @param uri URI of existing, or to be created filesystem.
* @param env environment settings.
* @return new or existing filesystem.
*/
public FileSystem getFileSystem(URI uri, Map<String, ?> env) {
validateUri(uri);
Properties props = getProperties(uri, env);
String key = this.getFileSystemKey(uri, props); // s3fs_access_key is part of the key here.
if (fileSystems.containsKey(key))
return fileSystems.get(key);
return newFileSystem(uri, env);
}

@Override
public S3FileSystem getFileSystem(URI uri) {
validateUri(uri);
Expand Down Expand Up @@ -348,13 +334,13 @@ public InputStream newInputStream(Path path, OpenOption... options) throws IOExc

try {
ResponseInputStream<GetObjectResponse> res = s3Path
.getFileSystem()
.getClient()
.getObject(GetObjectRequest
.builder()
.bucket(s3Path.getFileStore().name())
.key(key)
.build());
.getFileSystem()
.getClient()
.getObject(GetObjectRequest
.builder()
.bucket(s3Path.getFileStore().name())
.key(key)
.build());

if (res == null)
throw new IOException(String.format("The specified path is a directory: %s", path));
Expand Down Expand Up @@ -401,8 +387,8 @@ public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOExcept
PutObjectRequest.Builder builder = PutObjectRequest.builder();
String directoryKey = s3Path.getKey().endsWith("/") ? s3Path.getKey() : s3Path.getKey() + "/";
builder.bucket(bucketName)
.key(directoryKey)
.contentLength(0L);
.key(directoryKey)
.contentLength(0L);
s3Path.getFileSystem().getClient().putObject(builder.build(), RequestBody.fromBytes(new byte[0]));
}

Expand All @@ -426,8 +412,12 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
if (isSameFile(source, target))
return;

final S3Path s3Source = toS3Path(source);
final S3Path s3Target = toS3Path(target);
S3Path s3Source = toS3Path(source);
S3Path s3Target = toS3Path(target);
// TODO: implements support for copying directories

Preconditions.checkArgument(!Files.isDirectory(source), "copying directories is not yet supported: %s", source);
Preconditions.checkArgument(!Files.isDirectory(target), "copying directories is not yet supported: %s", target);

final ImmutableSet<CopyOption> actualOptions = ImmutableSet.copyOf(options);
verifySupportedOptions(EnumSet.of(StandardCopyOption.REPLACE_EXISTING), actualOptions);
Expand All @@ -436,36 +426,17 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
throw new FileAlreadyExistsException(format("target already exists: %s", target));
}

final String bucketNameOrigin = s3Source.getFileStore().name();
final String keySource = s3Source.getKey();
final String bucketNameTarget = s3Target.getFileStore().name();
final String keyTarget = s3Target.getKey();

final AmazonS3 s3Client = AmazonS3ClientProvider.buildAmazonS3Client();
final TransferManager manager = TransferManagerBuilder.standard().withS3Client(s3Client).build();
final Copy copy = manager.copy(new com.amazonaws.services.s3.model.CopyObjectRequest(
bucketNameOrigin,
keySource,
bucketNameTarget,
keyTarget
));

try {
log.info("Starting file transferring:\nsource: {},\ntarget: {}",
s3Source.toAbsolutePath(),
s3Target.toAbsolutePath());
copy.waitForCompletion();
} catch (InterruptedException e) {
log.error("File transferring was interrupted.\nSource path: "
+ s3Source.toAbsolutePath()
+ ",\ndestination path: "
+ s3Target.toAbsolutePath()
+ ",\nerror: ",
e
);
} finally {
manager.shutdownNow();
}
String sourceBucketName = s3Source.getFileStore().name();
String keySource = s3Source.getKey();
String bucketNameTarget = s3Target.getFileStore().name();
String keyTarget = s3Target.getKey();
s3Source.getFileSystem()
.getClient()
.copyObject(CopyObjectRequest.builder()
.copySource(sourceBucketName + "/" + keySource)
.bucket(bucketNameTarget)
.key(keyTarget)
.build());
}

@Override
Expand Down Expand Up @@ -601,8 +572,16 @@ public S3FileSystem createFileSystem(URI uri, Properties props) {
return new S3FileSystem(this, getFileSystemKey(uri, props), getS3Client(uri, props), uri.getHost());
}

/**
* Create the fileSystem
*
* @param uri URI
* @param props Properties
* @param client
* @return S3FileSystem never null
*/
public S3FileSystem createFileSystem(URI uri, Properties props, S3Client client) {
return new S3FileSystem(this, getFileSystemKey(uri, props), client, uri.getHost());
return new S3FileSystem(this, getFileSystemKey(uri, props), getS3Client(uri, props), uri.getHost());
}

protected S3Client getS3Client(URI uri, Properties props) {
Expand Down Expand Up @@ -675,8 +654,7 @@ public boolean isOpen(S3FileSystem s3FileSystem) {
* only 4 testing
*/

@VisibleForTesting
public static ConcurrentMap<String, S3FileSystem> getFilesystems() {
protected static ConcurrentMap<String, S3FileSystem> getFilesystems() {
return fileSystems;
}

Expand Down
Loading

0 comments on commit 88a35b1

Please sign in to comment.