CXP-2809: Extend and improve integration tests of Kafka Connect S3
ramanenka committed Sep 20, 2023
1 parent 991d061 commit e0b9ea7
Showing 2 changed files with 230 additions and 11 deletions.
@@ -282,12 +282,12 @@ public boolean shouldFlush() {
long timeSinceFirstRecordProduced = now - firstRecord.timestamp();
long timeSinceLastRecordReceived = now - lastRecordReceiveTime;

boolean doWallClockFlush =
boolean doWallTimeFlush =
flushIntervalMs != -1
&& timeSinceFirstRecordProduced >= (flushIntervalMs + gracePeriodMs)
&& timeSinceLastRecordReceived > gracePeriodMs;
if (doWallClockFlush) {
log.debug("{} performing a wall clock flush on {}", name(), tp);
if (doWallTimeFlush) {
log.debug("{} performing a wall time flush on {}", name(), tp);
return true;
}

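For context, the renamed flag guards the wall-time flush path that the new integration tests below exercise. The following is a minimal standalone sketch of that check, assuming -1 disables interval-based flushing; the class and field names here are illustrative, not the connector's actual members:

// Illustrative sketch of the wall-time flush condition shown in the hunk above.
// Names are assumptions for this example, not the connector's real API.
final class WallTimeFlushCheck {
    private final long flushIntervalMs; // -1 disables interval-based flushing
    private final long gracePeriodMs;   // slack added to the interval; also the required quiet time

    WallTimeFlushCheck(long flushIntervalMs, long gracePeriodMs) {
        this.flushIntervalMs = flushIntervalMs;
        this.gracePeriodMs = gracePeriodMs;
    }

    boolean shouldFlush(long now, long firstRecordTimestamp, long lastRecordReceiveTime) {
        long timeSinceFirstRecordProduced = now - firstRecordTimestamp;
        long timeSinceLastRecordReceived = now - lastRecordReceiveTime;
        return flushIntervalMs != -1
            && timeSinceFirstRecordProduced >= (flushIntervalMs + gracePeriodMs)
            && timeSinceLastRecordReceived > gracePeriodMs;
    }
}

The new testSinkWithWallTimeFlushingAndRewind test below drives this path with a 30-second flush interval and waits up to three intervals of wall time for the objects to appear in S3.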
@@ -2,13 +2,15 @@

import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.S3Object;
import io.debezium.testing.testcontainers.Connector.State;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
@@ -23,6 +25,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -34,6 +38,14 @@
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Before;
@@ -86,6 +98,9 @@ public class S3SinkConnectorIT {
private ExternalKafkaCluster kafkaCluster =
ExternalKafkaCluster.at(kafkaContainer.getBootstrapServers());

private Admin kafkaAdmin;
private KafkaProducer<String, String> kafkaProducer;

@BeforeClass
public static void startContainers() {
Startables.deepStart(Stream.of(kafkaContainer, localStackContainer, kafkaConnectContainer))
@@ -107,6 +122,23 @@ public void setup() {
.build();

kafkaCluster = ExternalKafkaCluster.at(kafkaContainer.getBootstrapServers());
kafkaAdmin =
Admin.create(
Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()));
kafkaProducer =
new KafkaProducer<>(
Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class,
ProducerConfig.ACKS_CONFIG,
"all",
ProducerConfig.RETRIES_CONFIG,
"1"));
}

@AfterClass
@@ -134,6 +166,8 @@ public void testSinkWithGzipFileThreshold() throws Exception {
kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);

assertEquals(-1, getConsumerOffset("connect-" + connectorName, topicName, 0));

// Produce messages to the topic (uncompressed: 3190, compressed: ~340)
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 95; i++) {
@@ -176,6 +210,10 @@ public void testSinkWithGzipFileThreshold() throws Exception {
objectContents = getS3FileOutput(bucketName, indexObjectKey);
assertEquals(expectedIndexObjectContents, objectContents);

Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.until(() -> getConsumerOffset("connect-" + connectorName, topicName, 0) == 95);

// It's not enough to trigger another flush
for (int i = 100; i < 150; i++) {
kafkaCluster.send(
@@ -191,6 +229,9 @@ public void testSinkWithGzipFileThreshold() throws Exception {
.atMost(30, TimeUnit.SECONDS)
.until(() -> !objectKeyExists(bucketName, indexObjectKey2));

// Make sure the committed offset didn't change as there were no new file uploads
assertEquals(95, getConsumerOffset("connect-" + connectorName, topicName, 0));

// Delete the connector
deleteConnector(connectorName);
}
@@ -207,6 +248,7 @@ public void testSinkWithBigFlushInterval() throws Exception {
kafkaCluster.createTopic(TopicConfig.withName(topicName).withNumberOfPartitions(1));

// Define, register, and start the connector
final long flushIntervalMs = 12 * 3600 * 1000;
ConnectorConfiguration connectorConfiguration =
getConnectorConfiguration(
connectorName,
@@ -215,31 +257,60 @@
topicName,
67108864,
100 * 1024 * 1024,
12 * 3600 * 1000);
flushIntervalMs);
kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);

// Produce messages to the topic
StringBuilder sb = new StringBuilder();
long ts = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
kafkaCluster.send(SendValues.to(topicName, json));
kafkaProducer.send(new ProducerRecord<>(topicName, 0, ts, "", json));
sb.append(json).append("\n");
}
LocalDate today = LocalDate.now(ZoneOffset.UTC);

// Define the expected files
List<String> expectedObjects =
List.of(
String.format("%s/%s/%s-00000-000000000000.index.json", prefix, today, topicName),
String.format("%s/%s/%s-00000-000000000000.gz", prefix, today, topicName),
String.format("%s/last_chunk_index.%s-00000.txt", prefix, topicName));
String dataObjectKey =
String.format("%s/%s/%s-00000-000000000000.gz", prefix, today, topicName);
String indexObjectKey =
String.format("%s/%s/%s-00000-000000000000.index.json", prefix, today, topicName);
String lastChunkIndexObjectKey =
String.format("%s/last_chunk_index.%s-00000.txt", prefix, topicName);
List<String> expectedObjects = List.of(indexObjectKey, dataObjectKey, lastChunkIndexObjectKey);

// Await for all the files to be produced by all tasks
// Make sure no files are produced yet, as the timestamp-based flushing threshold has not been reached
Awaitility.await()
.pollDelay(5, TimeUnit.SECONDS)
.during(20, TimeUnit.SECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> expectedObjects.stream().noneMatch(key -> objectKeyExists(bucketName, key)));

assertEquals(-1, getConsumerOffset("connect-" + connectorName, topicName, 0));

ts += flushIntervalMs + 60 * 1000;
for (int i = 100; i < 200; i++) {
String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
kafkaProducer.send(new ProducerRecord<>(topicName, 0, ts, "", json));
}

// Await for all the files to be produced by all tasks
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.until(() -> expectedObjects.stream().allMatch(key -> objectKeyExists(bucketName, key)));

// Check that the data object contains the expected records;
// in particular, it should not contain the record that triggered the flush
String objectContents = getS3FileOutput(bucketName, dataObjectKey);
assertEquals(sb.toString(), objectContents);

// Check that the committed offset corresponds to the message offset that triggered
// timestamp-based flushing
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.until(() -> getConsumerOffset("connect-" + connectorName, topicName, 0) == 100);

// Delete the connector
deleteConnector(connectorName);
}
@@ -404,6 +475,143 @@ public void testSinkWithRestart() throws Exception {
deleteConnector(connectorName);
}

@Test
public void testSinkWithWallTimeFlushingAndRewind() throws Exception {
String bucketName = "connect-system-test-wall-time";
String prefix = "systest";
String topicName = "system_test_wall_time";
String connectorName = "s3-sink-wall-time";
s3.createBucket(bucketName);

// Create the test topic
kafkaCluster.createTopic(TopicConfig.withName(topicName).withNumberOfPartitions(1));

// Define, register, and start the connector
final long flushIntervalMs = 30 * 1000;
ConnectorConfiguration connectorConfiguration =
getConnectorConfiguration(
connectorName,
bucketName,
prefix,
topicName,
67108864,
100 * 1024 * 1024,
flushIntervalMs);
kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);

assertEquals(-1, getConsumerOffset("connect-" + connectorName, topicName, 0));

// Produce messages to the topic
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 100; i++) {
String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
kafkaCluster.send(SendValues.to(topicName, json));
sb.append(json).append("\n");
}
String firstObjectExpectedContent = sb.toString();
LocalDate today = LocalDate.now(ZoneOffset.UTC);

// Define the expected files
String dataObjectKey =
String.format("%s/%s/%s-00000-000000000000.gz", prefix, today, topicName);
String indexObjectKey =
String.format("%s/%s/%s-00000-000000000000.index.json", prefix, today, topicName);
String lastChunkIndexObjectKey =
String.format("%s/last_chunk_index.%s-00000.txt", prefix, topicName);
List<String> expectedObjects = List.of(indexObjectKey, dataObjectKey, lastChunkIndexObjectKey);

// Make sure files are produced using wall time flushing
Awaitility.await()
.atMost(flushIntervalMs * 3, TimeUnit.MILLISECONDS)
.until(() -> expectedObjects.stream().allMatch(key -> objectKeyExists(bucketName, key)));

assertEquals(100, getConsumerOffset("connect-" + connectorName, topicName, 0));

String objectContents = getS3FileOutput(bucketName, dataObjectKey);
assertEquals(firstObjectExpectedContent, objectContents);

sb = new StringBuilder();
for (int i = 100; i < 200; i++) {
String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
kafkaCluster.send(SendValues.to(topicName, json));
sb.append(json).append("\n");
}
String secondObjectExpectedContent = sb.toString();

String dataObjectKey2 =
String.format("%s/%s/%s-00000-000000000100.gz", prefix, today, topicName);
String indexObjectKey2 =
String.format("%s/%s/%s-00000-000000000100.index.json", prefix, today, topicName);
List<String> expectedObjects2 =
List.of(indexObjectKey2, dataObjectKey2, lastChunkIndexObjectKey);

// Make sure files are produced using wall time flushing
Awaitility.await()
.atMost(flushIntervalMs * 3, TimeUnit.MILLISECONDS)
.until(() -> expectedObjects2.stream().allMatch(key -> objectKeyExists(bucketName, key)));

assertEquals(200, getConsumerOffset("connect-" + connectorName, topicName, 0));

objectContents = getS3FileOutput(bucketName, dataObjectKey2);
assertEquals(secondObjectExpectedContent, objectContents);

deleteConnector(connectorName);

for (int i = 200; i < 300; i++) {
String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
kafkaCluster.send(SendValues.to(topicName, json));
}

s3.deleteObjects(
new DeleteObjectsRequest(bucketName)
.withKeys(
lastChunkIndexObjectKey,
dataObjectKey,
indexObjectKey,
dataObjectKey2,
indexObjectKey2));
s3.deleteBucket(bucketName);
s3.createBucket(bucketName);

assertTrue(expectedObjects.stream().noneMatch(key -> objectKeyExists(bucketName, key)));
assertTrue(expectedObjects2.stream().noneMatch(key -> objectKeyExists(bucketName, key)));

// Rewind the consumer to the first record
kafkaAdmin
.alterConsumerGroupOffsets(
"connect-" + connectorName,
Map.of(new TopicPartition(topicName, 0), new OffsetAndMetadata(0)))
.all()
.get();

assertEquals(0, getConsumerOffset("connect-" + connectorName, topicName, 0));

kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);

// Check files are produced again shortly after the connector restart
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(
() ->
Stream.concat(expectedObjects.stream(), expectedObjects2.stream())
.allMatch(key -> objectKeyExists(bucketName, key)));

Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.until(() -> getConsumerOffset("connect-" + connectorName, topicName, 0) == 200);

// Check that the same files with the same content are produced again
objectContents = getS3FileOutput(bucketName, dataObjectKey);
assertEquals(firstObjectExpectedContent, objectContents);

objectContents = getS3FileOutput(bucketName, dataObjectKey2);
assertEquals(secondObjectExpectedContent, objectContents);

deleteConnector(connectorName);
}

@Test
public void testSinkWithBinaryFormat() throws Exception {
String bucketName = "connect-system-test";
@@ -548,6 +756,17 @@ private boolean objectKeyExists(String bucketName, String objectKey) {
return returnValue;
}

private long getConsumerOffset(String groupId, String topic, int partition)
throws ExecutionException, InterruptedException {
var offset =
kafkaAdmin
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get()
.get(new TopicPartition(topic, partition));
return offset == null ? -1 : offset.offset();
}

private String getS3FileOutput(String bucketName, String objectKey) throws IOException {
StringBuilder sb = new StringBuilder();
S3Object s3Object = s3.getObject(bucketName, objectKey);