Skip to content

Commit

Permalink
CXP-3043: Update kafka-connect-s3 to use AWS Java SDK 2
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Mar 28, 2024
1 parent baa5e2c commit fd8f344
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 388 deletions.
8 changes: 5 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ allprojects {

dependencies {
constraints {
api("com.amazonaws:aws-java-sdk-s3:1.12.351")
api("software.amazon.awssdk:s3:2.24.12")
api("org.apache.kafka:connect-api:3.5.1")
api("org.slf4j:slf4j-api:2.0.7")
api("com.fasterxml.jackson.core:jackson-databind:2.14.3")

testImplementation("junit:junit:4.13.2")
testImplementation("com.google.guava:guava:31.1-jre")
testImplementation("javax.xml.bind:jaxb-api:2.4.0-b180830.0359")
testImplementation("org.mockito:mockito-core:4.9.0")

testImplementation("io.debezium:debezium-testing-testcontainers:2.0.0.Final")
testImplementation("io.debezium:debezium-testing-testcontainers:2.5.4.Final")
testImplementation("net.mguenther.kafka:kafka-junit:3.3.0")
testImplementation("org.testcontainers:kafka:1.17.6")
testImplementation("org.testcontainers:localstack:1.17.6")
testImplementation("org.testcontainers:localstack:1.19.7")
}
}

Expand Down
3 changes: 2 additions & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ description = ""
dependencies {
api project(':api')
api("org.apache.kafka:connect-api")
api("com.amazonaws:aws-java-sdk-s3")
api("software.amazon.awssdk:s3")
api("org.slf4j:slf4j-api")
api("com.fasterxml.jackson.core:jackson-databind")

testImplementation("junit:junit")
testImplementation("org.mockito:mockito-core:4.9.0")
Expand Down
25 changes: 12 additions & 13 deletions common/src/main/java/com/spredfast/kafka/connect/s3/S3.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
package com.spredfast.kafka.connect.s3;

import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.net.URI;
import java.util.Map;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

public class S3 {

public static AmazonS3 s3client(Map<String, String> config) {
public static S3Client s3client(Map<String, String> config) {

AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
S3ClientBuilder builder = S3Client.builder();

String s3Region = config.get("s3.region");
String s3Endpoint = config.get("s3.endpoint");

// If worker config sets explicit endpoint override (e.g. for testing) use that
if (s3Endpoint != null) {
if (s3Region == null) {
throw new IllegalArgumentException("s3.region must be set if s3.endpoint is set");
}
builder = builder.endpointOverride(URI.create(s3Endpoint));
}

builder = builder.withEndpointConfiguration(new EndpointConfiguration(s3Endpoint, s3Region));
} else if (s3Region != null) {
builder = builder.withRegion(s3Region);
if (s3Region != null) {
builder = builder.region(Region.of(s3Region));
}

boolean s3PathStyle = Boolean.parseBoolean(config.get("s3.path_style"));
builder = builder.forcePathStyle(Boolean.parseBoolean(config.get("s3.path_style")));

return builder.withPathStyleAccessEnabled(s3PathStyle).build();
return builder.build();
}
}
2 changes: 1 addition & 1 deletion sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies {
implementation project(':common')
testImplementation("junit:junit")
testImplementation("org.awaitility:awaitility:4.2.0")
testImplementation("org.mockito:mockito-core:4.9.0")
testImplementation("org.mockito:mockito-core")
testImplementation("io.debezium:debezium-testing-testcontainers")
testImplementation("net.mguenther.kafka:kafka-junit")
testImplementation("org.testcontainers:kafka")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import com.amazonaws.services.s3.AmazonS3;
import com.spredfast.kafka.connect.s3.AlreadyBytesConverter;
import com.spredfast.kafka.connect.s3.BlockMetadata;
import com.spredfast.kafka.connect.s3.Configure;
Expand Down Expand Up @@ -33,6 +32,7 @@
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;

public class S3SinkTask extends SinkTask {

Expand Down Expand Up @@ -102,7 +102,7 @@ public void start(Map<String, String> props) throws ConnectException {
.filter(s -> !s.isEmpty())
.orElseThrow(() -> new ConnectException("S3 bucket must be configured"));
String prefix = configGet("s3.prefix").orElse("");
AmazonS3 s3Client = S3.s3client(config);
S3Client s3Client = S3.s3client(config);

Layout layout = Configure.createLayout(props);

Expand Down
104 changes: 34 additions & 70 deletions sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3Writer.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
package com.spredfast.kafka.connect.s3.sink;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.spredfast.kafka.connect.s3.BlockMetadata;
import com.spredfast.kafka.connect.s3.Layout;
import com.spredfast.kafka.connect.s3.json.ChunkDescriptor;
import com.spredfast.kafka.connect.s3.json.ChunksIndex;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.TimeZone;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

/**
* S3Writer provides necessary operations over S3 to store files and retrieve Last commit offsets
Expand All @@ -33,38 +27,21 @@
*/
public class S3Writer {
private static final Logger log = LoggerFactory.getLogger(S3SinkTask.class);
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
private final ObjectReader reader = new ObjectMapper().readerFor(ChunksIndex.class);
private String keyPrefix;
private String bucket;
private final String keyPrefix;
private final String bucket;
private final Layout.Builder layoutBuilder;
private AmazonS3 s3Client;
private TransferManager tm;
private final S3Client s3Client;

public S3Writer(
String bucket, String keyPrefix, Layout.Builder layoutBuilder, AmazonS3 s3Client) {
this(
bucket,
keyPrefix,
layoutBuilder,
s3Client,
TransferManagerBuilder.standard().withS3Client(s3Client).build());
}

public S3Writer(
String bucket,
String keyPrefix,
Layout.Builder layoutBuilder,
AmazonS3 s3Client,
TransferManager tm) {
if (keyPrefix.length() > 0 && !keyPrefix.endsWith("/")) {
String bucket, String keyPrefix, Layout.Builder layoutBuilder, S3Client s3Client) {
if (!keyPrefix.isEmpty() && !keyPrefix.endsWith("/")) {
keyPrefix += "/";
}
this.keyPrefix = keyPrefix;
this.bucket = bucket;
this.layoutBuilder = layoutBuilder;
this.s3Client = s3Client;
this.tm = tm;
}

public void putChunk(File dataFile, File indexFile, BlockMetadata metadata) throws IOException {
Expand All @@ -78,11 +55,9 @@ public void putChunk(File dataFile, File indexFile, BlockMetadata metadata) thro
final String indexObjectKey = baseKey + ".index.json";

try {
Upload upload = tm.upload(this.bucket, dataObjectKey, dataFile);
upload.waitForCompletion();
s3Client.putObject(putKey(dataObjectKey), dataFile.toPath());
log.debug("uploaded {} object to s3", dataObjectKey);
upload = tm.upload(this.bucket, indexObjectKey, indexFile);
upload.waitForCompletion();
s3Client.putObject(putKey(indexObjectKey), indexFile.toPath());
log.debug("uploaded {} object to s3", indexObjectKey);
} catch (Exception e) {
throw new IOException("Failed to upload to S3", e);
Expand All @@ -92,63 +67,52 @@ public void putChunk(File dataFile, File indexFile, BlockMetadata metadata) thro
}

public long fetchOffset(TopicPartition tp) throws IOException {

// See if cursor file exists
String indexFileKey;
String cursorFileKey = getCursorFileKey(tp);

try (S3Object cursorObj =
s3Client.getObject(this.bucket, this.getTopicPartitionLastIndexFileKey(tp));
InputStreamReader input = new InputStreamReader(cursorObj.getObjectContent(), "UTF-8"); ) {
StringBuilder sb = new StringBuilder(1024);
final char[] buffer = new char[1024];

for (int read = input.read(buffer, 0, buffer.length);
read != -1;
read = input.read(buffer, 0, buffer.length)) {
sb.append(buffer, 0, read);
}
indexFileKey = sb.toString();
} catch (AmazonS3Exception ase) {
if (ase.getStatusCode() == 404) {
// Topic partition has no data in S3, start from beginning
return 0;
} else {
throw new IOException("Failed to fetch cursor file", ase);
}
try {
indexFileKey = s3Client.getObjectAsBytes(getKey(cursorFileKey)).asUtf8String();
} catch (NoSuchKeyException e) {
// Topic partition has no data in S3, start from beginning
return 0;
} catch (Exception e) {
throw new IOException("Failed to fetch or read cursor file", e);
}

// Now fetch last written index file...
try (S3Object indexObj = s3Client.getObject(this.bucket, indexFileKey);
InputStreamReader isr = new InputStreamReader(indexObj.getObjectContent(), "UTF-8"); ) {
return getNextOffsetFromIndexFileContents(isr);
try {
byte[] indexJSON = s3Client.getObjectAsBytes(getKey(indexFileKey)).asByteArray();
return getNextOffsetFromIndexFileContents(indexJSON);
} catch (Exception e) {
throw new IOException("Failed to fetch or parse last index file", e);
}
}

private long getNextOffsetFromIndexFileContents(Reader indexJSON) throws IOException {
private long getNextOffsetFromIndexFileContents(byte[] indexJSON) throws IOException {
ChunksIndex index = reader.readValue(indexJSON);
ChunkDescriptor lastChunk = index.chunks.get(index.chunks.size() - 1);
return lastChunk.first_record_offset + lastChunk.num_records;
}

private String getTopicPartitionLastIndexFileKey(TopicPartition tp) {
private String getCursorFileKey(TopicPartition tp) {
return keyPrefix + layoutBuilder.buildIndexPath(tp);
}

private void updateCursorFile(String lastIndexFileKey, TopicPartition tp) throws IOException {
String cursorFileKey = getCursorFileKey(tp);
try {
byte[] contentAsBytes = lastIndexFileKey.getBytes("UTF-8");
ByteArrayInputStream contentsAsStream = new ByteArrayInputStream(contentAsBytes);
ObjectMetadata md = new ObjectMetadata();
md.setContentLength(contentAsBytes.length);
s3Client.putObject(
new PutObjectRequest(
this.bucket, this.getTopicPartitionLastIndexFileKey(tp), contentsAsStream, md));
s3Client.putObject(putKey(cursorFileKey), RequestBody.fromString(lastIndexFileKey));
} catch (Exception ex) {
throw new IOException("Failed to update cursor file", ex);
}
}

private Consumer<PutObjectRequest.Builder> putKey(String key) {
return builder -> builder.bucket(bucket).key(key);
}

private Consumer<GetObjectRequest.Builder> getKey(String key) {
return builder -> builder.bucket(bucket).key(key);
}
}
Loading

0 comments on commit fd8f344

Please sign in to comment.