CXP-3043: Update kafka-connect-s3 to use AWS Java SDK 2 #44

Merged 1 commit on Mar 29, 2024

8 changes: 5 additions & 3 deletions build.gradle
@@ -21,18 +21,20 @@ allprojects {

dependencies {
constraints {
api("com.amazonaws:aws-java-sdk-s3:1.12.351")
api("software.amazon.awssdk:s3:2.24.12")
api("org.apache.kafka:connect-api:3.5.1")
api("org.slf4j:slf4j-api:2.0.7")
api("com.fasterxml.jackson.core:jackson-databind:2.14.3")

testImplementation("junit:junit:4.13.2")
testImplementation("com.google.guava:guava:31.1-jre")
testImplementation("javax.xml.bind:jaxb-api:2.4.0-b180830.0359")
testImplementation("org.mockito:mockito-core:4.9.0")

testImplementation("io.debezium:debezium-testing-testcontainers:2.0.0.Final")
testImplementation("io.debezium:debezium-testing-testcontainers:2.5.4.Final")
testImplementation("net.mguenther.kafka:kafka-junit:3.3.0")
testImplementation("org.testcontainers:kafka:1.17.6")
testImplementation("org.testcontainers:localstack:1.17.6")
testImplementation("org.testcontainers:localstack:1.19.7")
}
}

3 changes: 2 additions & 1 deletion common/build.gradle
@@ -3,8 +3,9 @@ description = ""
dependencies {
api project(':api')
api("org.apache.kafka:connect-api")
api("com.amazonaws:aws-java-sdk-s3")
api("software.amazon.awssdk:s3")
api("org.slf4j:slf4j-api")
api("com.fasterxml.jackson.core:jackson-databind")

testImplementation("junit:junit")
testImplementation("org.mockito:mockito-core:4.9.0")
22 changes: 12 additions & 10 deletions common/src/main/java/com/spredfast/kafka/connect/s3/S3.java
@@ -1,15 +1,16 @@
package com.spredfast.kafka.connect.s3;

import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.net.URI;
import java.util.Map;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

public class S3 {

public static AmazonS3 s3client(Map<String, String> config) {
public static S3Client s3client(Map<String, String> config) {

AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
S3ClientBuilder builder = S3Client.builder();

String s3Region = config.get("s3.region");
String s3Endpoint = config.get("s3.endpoint");
@@ -19,14 +20,15 @@ public static AmazonS3 s3client(Map<String, String> config) {
if (s3Region == null) {
throw new IllegalArgumentException("s3.region must be set if s3.endpoint is set");
}
builder = builder.endpointOverride(URI.create(s3Endpoint));
}

builder = builder.withEndpointConfiguration(new EndpointConfiguration(s3Endpoint, s3Region));
} else if (s3Region != null) {
builder = builder.withRegion(s3Region);
if (s3Region != null) {
builder = builder.region(Region.of(s3Region));
}

boolean s3PathStyle = Boolean.parseBoolean(config.get("s3.path_style"));
builder = builder.forcePathStyle(Boolean.parseBoolean(config.get("s3.path_style")));

return builder.withPathStyleAccessEnabled(s3PathStyle).build();
return builder.build();
}
}
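
A minimal usage sketch of the rewritten factory, for readers new to SDK 2. This is not code from the PR: the region, endpoint, and listBuckets() call are illustrative stand-ins (the endpoint value mimics a LocalStack setup like the localstack testcontainer in the build), and credentials still come from the SDK's default provider chain because the factory does not configure any.

import com.spredfast.kafka.connect.s3.S3;
import java.util.HashMap;
import java.util.Map;
import software.amazon.awssdk.services.s3.S3Client;

public class S3ClientSketch {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("s3.region", "us-east-1");                // illustrative region
    config.put("s3.endpoint", "http://localhost:4566");  // illustrative local endpoint
    config.put("s3.path_style", "true");                 // maps to forcePathStyle(true) in the new builder

    // SDK 2 clients are AutoCloseable, so close them when the connector shuts down.
    try (S3Client s3 = S3.s3client(config)) {
      System.out.println(s3.listBuckets().buckets());
    }
  }
}
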
2 changes: 1 addition & 1 deletion sink/build.gradle
@@ -27,7 +27,7 @@ dependencies {
implementation project(':common')
testImplementation("junit:junit")
testImplementation("org.awaitility:awaitility:4.2.0")
testImplementation("org.mockito:mockito-core:4.9.0")
testImplementation("org.mockito:mockito-core")
testImplementation("io.debezium:debezium-testing-testcontainers")
testImplementation("net.mguenther.kafka:kafka-junit")
testImplementation("org.testcontainers:kafka")
sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3SinkTask.java
@@ -4,7 +4,6 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import com.amazonaws.services.s3.AmazonS3;
import com.spredfast.kafka.connect.s3.AlreadyBytesConverter;
import com.spredfast.kafka.connect.s3.BlockMetadata;
import com.spredfast.kafka.connect.s3.Configure;
@@ -33,6 +32,7 @@
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;

public class S3SinkTask extends SinkTask {

@@ -102,7 +102,7 @@ public void start(Map<String, String> props) throws ConnectException {
.filter(s -> !s.isEmpty())
.orElseThrow(() -> new ConnectException("S3 bucket must be configured"));
String prefix = configGet("s3.prefix").orElse("");
AmazonS3 s3Client = S3.s3client(config);
S3Client s3Client = S3.s3client(config);

Layout layout = Configure.createLayout(props);

104 changes: 34 additions & 70 deletions sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3Writer.java
@@ -1,28 +1,22 @@
package com.spredfast.kafka.connect.s3.sink;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.spredfast.kafka.connect.s3.BlockMetadata;
import com.spredfast.kafka.connect.s3.Layout;
import com.spredfast.kafka.connect.s3.json.ChunkDescriptor;
import com.spredfast.kafka.connect.s3.json.ChunksIndex;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.TimeZone;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

/**
* S3Writer provides necessary operations over S3 to store files and retrieve Last commit offsets
@@ -33,38 +27,21 @@
*/
public class S3Writer {
private static final Logger log = LoggerFactory.getLogger(S3SinkTask.class);
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
private final ObjectReader reader = new ObjectMapper().readerFor(ChunksIndex.class);
private String keyPrefix;
private String bucket;
private final String keyPrefix;
private final String bucket;
private final Layout.Builder layoutBuilder;
private AmazonS3 s3Client;
private TransferManager tm;
private final S3Client s3Client;

public S3Writer(
String bucket, String keyPrefix, Layout.Builder layoutBuilder, AmazonS3 s3Client) {
this(
bucket,
keyPrefix,
layoutBuilder,
s3Client,
TransferManagerBuilder.standard().withS3Client(s3Client).build());
}

public S3Writer(
String bucket,
String keyPrefix,
Layout.Builder layoutBuilder,
AmazonS3 s3Client,
TransferManager tm) {
if (keyPrefix.length() > 0 && !keyPrefix.endsWith("/")) {
String bucket, String keyPrefix, Layout.Builder layoutBuilder, S3Client s3Client) {
if (!keyPrefix.isEmpty() && !keyPrefix.endsWith("/")) {
keyPrefix += "/";
}
this.keyPrefix = keyPrefix;
this.bucket = bucket;
this.layoutBuilder = layoutBuilder;
this.s3Client = s3Client;
this.tm = tm;
}

public void putChunk(File dataFile, File indexFile, BlockMetadata metadata) throws IOException {
@@ -78,11 +55,9 @@ public void putChunk(File dataFile, File indexFile, BlockMetadata metadata) throws IOException {
final String indexObjectKey = baseKey + ".index.json";

try {
Upload upload = tm.upload(this.bucket, dataObjectKey, dataFile);
upload.waitForCompletion();
s3Client.putObject(putKey(dataObjectKey), dataFile.toPath());
log.debug("uploaded {} object to s3", dataObjectKey);
upload = tm.upload(this.bucket, indexObjectKey, indexFile);
upload.waitForCompletion();
s3Client.putObject(putKey(indexObjectKey), indexFile.toPath());
log.debug("uploaded {} object to s3", indexObjectKey);
} catch (Exception e) {
throw new IOException("Failed to upload to S3", e);
@@ -92,63 +67,52 @@ public void putChunk(File dataFile, File indexFile, BlockMetadata metadata) throws IOException {
}

public long fetchOffset(TopicPartition tp) throws IOException {

// See if cursor file exists
String indexFileKey;
String cursorFileKey = getCursorFileKey(tp);

try (S3Object cursorObj =
s3Client.getObject(this.bucket, this.getTopicPartitionLastIndexFileKey(tp));
InputStreamReader input = new InputStreamReader(cursorObj.getObjectContent(), "UTF-8"); ) {
StringBuilder sb = new StringBuilder(1024);
final char[] buffer = new char[1024];

for (int read = input.read(buffer, 0, buffer.length);
read != -1;
read = input.read(buffer, 0, buffer.length)) {
sb.append(buffer, 0, read);
}
indexFileKey = sb.toString();
} catch (AmazonS3Exception ase) {
if (ase.getStatusCode() == 404) {
// Topic partition has no data in S3, start from beginning
return 0;
} else {
throw new IOException("Failed to fetch cursor file", ase);
}
try {
indexFileKey = s3Client.getObjectAsBytes(getKey(cursorFileKey)).asUtf8String();
} catch (NoSuchKeyException e) {
// Topic partition has no data in S3, start from beginning
return 0;
} catch (Exception e) {
throw new IOException("Failed to fetch or read cursor file", e);
}

// Now fetch last written index file...
try (S3Object indexObj = s3Client.getObject(this.bucket, indexFileKey);
InputStreamReader isr = new InputStreamReader(indexObj.getObjectContent(), "UTF-8"); ) {
return getNextOffsetFromIndexFileContents(isr);
try {
byte[] indexJSON = s3Client.getObjectAsBytes(getKey(indexFileKey)).asByteArray();
return getNextOffsetFromIndexFileContents(indexJSON);
} catch (Exception e) {
throw new IOException("Failed to fetch or parse last index file", e);
}
}

private long getNextOffsetFromIndexFileContents(Reader indexJSON) throws IOException {
private long getNextOffsetFromIndexFileContents(byte[] indexJSON) throws IOException {
ChunksIndex index = reader.readValue(indexJSON);
ChunkDescriptor lastChunk = index.chunks.get(index.chunks.size() - 1);
return lastChunk.first_record_offset + lastChunk.num_records;
}

private String getTopicPartitionLastIndexFileKey(TopicPartition tp) {
private String getCursorFileKey(TopicPartition tp) {
return keyPrefix + layoutBuilder.buildIndexPath(tp);
}

private void updateCursorFile(String lastIndexFileKey, TopicPartition tp) throws IOException {
String cursorFileKey = getCursorFileKey(tp);
try {
byte[] contentAsBytes = lastIndexFileKey.getBytes("UTF-8");
ByteArrayInputStream contentsAsStream = new ByteArrayInputStream(contentAsBytes);
ObjectMetadata md = new ObjectMetadata();
md.setContentLength(contentAsBytes.length);
s3Client.putObject(
new PutObjectRequest(
this.bucket, this.getTopicPartitionLastIndexFileKey(tp), contentsAsStream, md));
s3Client.putObject(putKey(cursorFileKey), RequestBody.fromString(lastIndexFileKey));
} catch (Exception ex) {
throw new IOException("Failed to update cursor file", ex);
}
}

private Consumer<PutObjectRequest.Builder> putKey(String key) {
return builder -> builder.bucket(bucket).key(key);
}

private Consumer<GetObjectRequest.Builder> getKey(String key) {
return builder -> builder.bucket(bucket).key(key);
}
}
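
The rewritten writer relies on SDK 2's consumer-builder overloads, putObject(Consumer<PutObjectRequest.Builder>, ...) and getObjectAsBytes(Consumer<GetObjectRequest.Builder>), which is why the new putKey/getKey helpers return Consumer lambdas. Below is a self-contained sketch of that pattern, assuming a made-up bucket and key; it is illustrative only and not code from the PR.

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

public class ConsumerBuilderSketch {
  public static void main(String[] args) {
    String bucket = "example-bucket";   // made-up bucket name
    String key = "some/prefix/cursor";  // made-up object key

    try (S3Client s3 = S3Client.create()) {
      // Upload: the Consumer<PutObjectRequest.Builder> overload replaces the v1
      // PutObjectRequest + ObjectMetadata construction seen in the old updateCursorFile().
      s3.putObject(b -> b.bucket(bucket).key(key), RequestBody.fromString("last-index-file-key"));

      // Download as a string: getObjectAsBytes(...).asUtf8String() replaces the
      // v1 S3Object + InputStreamReader read loop in the old fetchOffset().
      try {
        String contents = s3.getObjectAsBytes(b -> b.bucket(bucket).key(key)).asUtf8String();
        System.out.println(contents);
      } catch (NoSuchKeyException e) {
        // SDK 2 throws a typed NoSuchKeyException where the v1 code had to check for HTTP 404.
        System.out.println("no cursor yet; start from offset 0");
      }
    }
  }
}

Note the PR also drops the v1 TransferManager: plain putObject calls from a Path or RequestBody are used instead, and SDK 2's transfer manager ships as a separate software.amazon.awssdk:s3-transfer-manager artifact that this change does not add.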