Skip to content

Commit

Permalink
support Amazon S3 storage class (#13613)
Browse files Browse the repository at this point in the history
  • Loading branch information
sullis authored Jul 27, 2024
1 parent 8603164 commit a6cf197
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.s3.model.StorageClass;


/**
Expand All @@ -49,6 +51,8 @@ public class S3Config {
public static final String ENDPOINT = "endpoint";
public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";

public static final String STORAGE_CLASS = "storageClass";

// Encryption related configurations
public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = "serverSideEncryption";
public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
Expand Down Expand Up @@ -76,6 +80,7 @@ public class S3Config {
private final String _accessKey;
private final String _secretKey;
private final String _region;
private final String _storageClass;
private final boolean _disableAcl;
private final String _endpoint;

Expand All @@ -100,6 +105,14 @@ public S3Config(PinotConfiguration pinotConfig) {
_region = pinotConfig.getProperty(REGION);
_endpoint = pinotConfig.getProperty(ENDPOINT);

_storageClass = pinotConfig.getProperty(STORAGE_CLASS);
if (_storageClass != null) {
if (StorageClass.fromValue(_storageClass) == StorageClass.UNKNOWN_TO_SDK_VERSION) {
throw new IllegalStateException(
"unknown s3 storage class: " + _storageClass + " - Valid storage classes: " + StorageClass.knownValues());
}
}

_serverSideEncryption = pinotConfig.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
_ssekmsKeyId = pinotConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
_ssekmsEncryptionContext = pinotConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
Expand Down Expand Up @@ -247,4 +260,9 @@ public long getMultiPartUploadPartSize() {
public ApacheHttpClient.Builder getHttpClientBuilder() {
return _httpClientBuilder;
}

@Nullable
public String getStorageClass() {
return _storageClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.sts.StsClient;
Expand All @@ -99,6 +100,7 @@ public class S3PinotFS extends BasePinotFS {
private String _ssekmsEncryptionContext;
private long _minObjectSizeToUploadInParts;
private long _multiPartUploadPartSize;
private @Nullable StorageClass _storageClass;

@Override
public void init(PinotConfiguration config) {
Expand Down Expand Up @@ -149,6 +151,12 @@ public void init(PinotConfiguration config) {
if (s3Config.getHttpClientBuilder() != null) {
s3ClientBuilder.httpClientBuilder(s3Config.getHttpClientBuilder());
}

if (s3Config.getStorageClass() != null) {
_storageClass = StorageClass.fromValue(s3Config.getStorageClass());
assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
}

_s3Client = s3ClientBuilder.build();
setMultiPartUploadConfigs(s3Config);
} catch (S3Exception e) {
Expand Down Expand Up @@ -180,6 +188,17 @@ public void init(S3Client s3Client, String serverSideEncryption, PinotConfigurat
setDisableAcl(s3Config);
}

@VisibleForTesting
void setStorageClass(@Nullable StorageClass storageClass) {
_storageClass = storageClass;
}

@VisibleForTesting
@Nullable
StorageClass getStorageClass() {
return _storageClass;
}

private void setServerSideEncryption(@Nullable String serverSideEncryption, S3Config s3Config) {
if (serverSideEncryption != null) {
try {
Expand Down Expand Up @@ -581,8 +600,13 @@ private void uploadFileInParts(File srcFile, URI dstUri)
throws Exception {
String bucket = dstUri.getHost();
String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder();
createMultipartUploadRequestBuilder.bucket(bucket).key(prefix);
if (_storageClass != null) {
createMultipartUploadRequestBuilder.storageClass(_storageClass);
}
CreateMultipartUploadResponse multipartUpload =
_s3Client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(prefix).build());
_s3Client.createMultipartUpload(createMultipartUploadRequestBuilder.build());
String uploadId = multipartUpload.uploadId();
// Upload parts sequentially to overcome the 5GB limit of a single PutObject call.
// TODO: parts can be uploaded in parallel for higher throughput, given a thread pool.
Expand Down Expand Up @@ -699,13 +723,21 @@ private PutObjectRequest generatePutObjectRequest(URI uri, String path) {
putReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext);
}
}

if (_storageClass != null) {
putReqBuilder.storageClass(_storageClass);
}

return putReqBuilder.build();
}

private CopyObjectRequest generateCopyObjectRequest(String copySource, URI dest, String path,
Map<String, String> metadata) {
CopyObjectRequest.Builder copyReqBuilder =
CopyObjectRequest.builder().copySource(copySource).destinationBucket(dest.getHost()).destinationKey(path);
if (_storageClass != null) {
copyReqBuilder.storageClass(_storageClass);
}
if (metadata != null) {
copyReqBuilder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.Assert;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.s3.model.StorageClass;


public class S3ConfigTest {
Expand All @@ -44,4 +45,26 @@ public void testParseDuration() {
Assert.assertEquals(S3Config.parseDuration("P1DT2H30S"), S3Config.parseDuration("1d2h30s"));
S3Config.parseDuration("10");
}

@Test
public void testDefaultStorageClassIsNull() {
PinotConfiguration pinotConfig = new PinotConfiguration();
S3Config cfg = new S3Config(pinotConfig);
Assert.assertNull(cfg.getStorageClass());
}

@Test
public void testIntelligentTieringStorageClass() {
PinotConfiguration pinotConfig = new PinotConfiguration();
pinotConfig.setProperty("storageClass", StorageClass.INTELLIGENT_TIERING.toString());
S3Config cfg = new S3Config(pinotConfig);
Assert.assertEquals(cfg.getStorageClass(), "INTELLIGENT_TIERING");
}

@Test(expectedExceptions = IllegalStateException.class)
public void testInvalidStorageClass() {
PinotConfiguration pinotConfig = new PinotConfiguration();
pinotConfig.setProperty("storageClass", "invalid-storage-class");
S3Config cfg = new S3Config(pinotConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
Expand All @@ -47,6 +49,7 @@
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.StorageClass;


@Test
Expand Down Expand Up @@ -83,9 +86,14 @@ public void tearDown()
FileUtils.deleteQuietly(TEMP_FILE);
}

@BeforeMethod
public void beforeMethod() {
_s3PinotFS.setStorageClass(null);
}

private void createEmptyFile(String folderName, String fileName) {
String fileNameWithFolder = folderName.length() == 0 ? fileName : folderName + DELIMITER + fileName;
_s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder),
_s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder, _s3PinotFS.getStorageClass()),
RequestBody.fromBytes(new byte[0]));
}

Expand Down Expand Up @@ -347,9 +355,12 @@ public void testExists()
Assert.assertFalse(fileNotExists);
}

@Test
public void testCopyFromAndToLocal()
@Test(dataProvider = "storageClasses")
public void testCopyFromAndToLocal(StorageClass storageClass)
throws Exception {

_s3PinotFS.setStorageClass(storageClass);

String fileName = "copyFile.txt";
File fileToCopy = new File(TEMP_FILE, fileName);
File fileToDownload = new File(TEMP_FILE, "copyFile_download.txt").getAbsoluteFile();
Expand All @@ -366,9 +377,12 @@ public void testCopyFromAndToLocal()
}
}

@Test
public void testMultiPartUpload()
@Test(dataProvider = "storageClasses")
public void testMultiPartUpload(StorageClass storageClass)
throws Exception {

_s3PinotFS.setStorageClass(storageClass);

String fileName = "copyFile_for_multipart.txt";
File fileToCopy = new File(TEMP_FILE, fileName);
File fileToDownload = new File(TEMP_FILE, "copyFile_download_multipart.txt").getAbsoluteFile();
Expand Down Expand Up @@ -399,7 +413,9 @@ public void testOpenFile()
String fileName = "sample.txt";
String fileContent = "Hello, World";

_s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileName), RequestBody.fromString(fileContent));
_s3Client.putObject(
S3TestUtils.getPutObjectRequest(BUCKET, fileName, _s3PinotFS.getStorageClass()),
RequestBody.fromString(fileContent));

InputStream is = _s3PinotFS.open(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
String actualContents = IOUtils.toString(is, StandardCharsets.UTF_8);
Expand All @@ -418,25 +434,28 @@ public void testMkdir()
Assert.assertTrue(headObjectResponse.sdkHttpResponse().isSuccessful());
}

@Test
public void testMoveFile()
@Test(dataProvider = "storageClasses")
public void testMoveFile(StorageClass storageClass)
throws Exception {

String fileName = "file-to-move";
_s3PinotFS.setStorageClass(storageClass);

String sourceFilename = "source-file-" + System.currentTimeMillis();
String targetFilename = "target-file-" + System.currentTimeMillis();
int fileSize = 5000;

File file = new File(TEMP_FILE, fileName);
File file = new File(TEMP_FILE, sourceFilename);

try {
createDummyFile(file, fileSize);
URI sourceUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName));
URI sourceUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, sourceFilename));

_s3PinotFS.copyFromLocalFile(file, sourceUri);

HeadObjectResponse sourceHeadObjectResponse =
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, sourceFilename));

URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, "move-target"));
URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, targetFilename));

boolean moveResult = _s3PinotFS.move(sourceUri, targetUri, false);
Assert.assertTrue(moveResult);
Expand All @@ -445,7 +464,7 @@ public void testMoveFile()
Assert.assertTrue(_s3PinotFS.exists(targetUri));

HeadObjectResponse targetHeadObjectResponse =
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, "move-target"));
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, targetFilename));
Assert.assertEquals(targetHeadObjectResponse.contentLength(),
fileSize);
Assert.assertEquals(targetHeadObjectResponse.storageClass(),
Expand All @@ -469,6 +488,15 @@ public void testMoveFile()
}
}

@DataProvider(name = "storageClasses")
public Object[][] createStorageClasses() {
return new Object[][] {
{ null },
{ StorageClass.STANDARD },
{ StorageClass.INTELLIGENT_TIERING }
};
}

private static void createDummyFile(File file, int size)
throws IOException {
FileUtils.deleteQuietly(file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
*/
package org.apache.pinot.plugin.filesystem;

import javax.annotation.Nullable;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.StorageClass;


public class S3TestUtils {
private S3TestUtils() {
}

public static PutObjectRequest getPutObjectRequest(String bucket, String key) {
return PutObjectRequest.builder().bucket(bucket).key(key).build();
public static PutObjectRequest getPutObjectRequest(String bucket, String key, @Nullable StorageClass storageClass) {
PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(key);
if (storageClass != null) {
builder.storageClass(storageClass);
}
return builder.build();
}

public static HeadObjectRequest getHeadObjectRequest(String bucket, String key) {
Expand Down

0 comments on commit a6cf197

Please sign in to comment.