diff --git a/sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3SinkTask.java b/sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3SinkTask.java
index b2c8cd1..8de9ad2 100644
--- a/sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3SinkTask.java
+++ b/sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3SinkTask.java
@@ -282,12 +282,12 @@ public boolean shouldFlush() {
     long timeSinceFirstRecordProduced = now - firstRecord.timestamp();
     long timeSinceLastRecordReceived = now - lastRecordReceiveTime;
 
-    boolean doWallClockFlush =
+    boolean doWallTimeFlush =
         flushIntervalMs != -1
             && timeSinceFirstRecordProduced >= (flushIntervalMs + gracePeriodMs)
             && timeSinceLastRecordReceived > gracePeriodMs;
 
-    if (doWallClockFlush) {
-      log.debug("{} performing a wall clock flush on {}", name(), tp);
+    if (doWallTimeFlush) {
+      log.debug("{} performing a wall time flush on {}", name(), tp);
       return true;
     }
diff --git a/sink/src/test/java/com/spredfast/kafka/connect/s3/S3SinkConnectorIT.java b/sink/src/test/java/com/spredfast/kafka/connect/s3/S3SinkConnectorIT.java
index 8baf1f8..8935b24 100644
--- a/sink/src/test/java/com/spredfast/kafka/connect/s3/S3SinkConnectorIT.java
+++ b/sink/src/test/java/com/spredfast/kafka/connect/s3/S3SinkConnectorIT.java
@@ -2,6 +2,7 @@
 import static net.mguenther.kafka.junit.ObserveKeyValues.on;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
@@ -9,6 +10,7 @@ import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import io.debezium.testing.testcontainers.Connector.State;
 import io.debezium.testing.testcontainers.ConnectorConfiguration;
@@ -23,6 +25,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -34,6 +38,14 @@ import net.mguenther.kafka.junit.TopicConfig;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.awaitility.Awaitility;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -86,6 +98,9 @@ public class S3SinkConnectorIT {
   private ExternalKafkaCluster kafkaCluster =
       ExternalKafkaCluster.at(kafkaContainer.getBootstrapServers());
 
+  private Admin kafkaAdmin;
+  private KafkaProducer<String, String> kafkaProducer;
+
   @BeforeClass
   public static void startContainers() {
     Startables.deepStart(Stream.of(kafkaContainer, localStackContainer, kafkaConnectContainer))
@@ -107,6 +122,23 @@ public void setup() {
         .build();
     kafkaCluster = ExternalKafkaCluster.at(kafkaContainer.getBootstrapServers());
+    kafkaAdmin =
+        Admin.create(
+            Map.of(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()));
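+    // String key/value producer that lets the tests send records with explicit timestamps.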
+    kafkaProducer =
+        new KafkaProducer<>(
+            Map.of(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                kafkaContainer.getBootstrapServers(),
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class,
+                ProducerConfig.ACKS_CONFIG,
+                "all",
+                ProducerConfig.RETRIES_CONFIG,
+                "1"));
   }
 
   @AfterClass
@@ -134,6 +166,8 @@ public void testSinkWithGzipFileThreshold() throws Exception {
     kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
     kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);
 
+    assertEquals(-1, getConsumerOffset("connect-" + connectorName, topicName, 0));
+
     // Produce messages to the topic (uncompressed: 3190, compressed: ~340)
     StringBuilder sb = new StringBuilder();
     for (int i = 0; i < 95; i++) {
@@ -176,6 +210,10 @@ public void testSinkWithGzipFileThreshold() throws Exception {
     objectContents = getS3FileOutput(bucketName, indexObjectKey);
     assertEquals(expectedIndexObjectContents, objectContents);
 
+    Awaitility.await()
+        .atMost(60, TimeUnit.SECONDS)
+        .until(() -> getConsumerOffset("connect-" + connectorName, topicName, 0) == 95);
+
     // It's not enough to trigger another flush
     for (int i = 100; i < 150; i++) {
       kafkaCluster.send(
@@ -191,6 +229,9 @@ public void testSinkWithGzipFileThreshold() throws Exception {
         .atMost(30, TimeUnit.SECONDS)
         .until(() -> !objectKeyExists(bucketName, indexObjectKey2));
 
+    // Make sure the committed offset didn't change as there were no new file uploads
+    assertEquals(95, getConsumerOffset("connect-" + connectorName, topicName, 0));
+
     // Delete the connector
     deleteConnector(connectorName);
   }
@@ -207,6 +248,7 @@ public void testSinkWithBigFlushInterval() throws Exception {
     kafkaCluster.createTopic(TopicConfig.withName(topicName).withNumberOfPartitions(1));
 
     // Define, register, and start the connector
+    final long flushIntervalMs = 12 * 3600 * 1000;
     ConnectorConfiguration connectorConfiguration =
         getConnectorConfiguration(
             connectorName,
@@ -215,31 +257,60 @@ public void testSinkWithBigFlushInterval() throws Exception {
             topicName,
             67108864,
             100 * 1024 * 1024,
-            12 * 3600 * 1000);
+            flushIntervalMs);
     kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
     kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);
 
     // Produce messages to the topic
+    StringBuilder sb = new StringBuilder();
+    long ts = System.currentTimeMillis();
     for (int i = 0; i < 100; i++) {
       String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
-      kafkaCluster.send(SendValues.to(topicName, json));
+      kafkaProducer.send(new ProducerRecord<>(topicName, 0, ts, "", json));
+      sb.append(json).append("\n");
     }
     LocalDate today = LocalDate.now(ZoneOffset.UTC);
 
     // Define the expected files
-    List<String> expectedObjects =
-        List.of(
-            String.format("%s/%s/%s-00000-000000000000.index.json", prefix, today, topicName),
-            String.format("%s/%s/%s-00000-000000000000.gz", prefix, today, topicName),
-            String.format("%s/last_chunk_index.%s-00000.txt", prefix, topicName));
+    String dataObjectKey =
+        String.format("%s/%s/%s-00000-000000000000.gz", prefix, today, topicName);
+    String indexObjectKey =
+        String.format("%s/%s/%s-00000-000000000000.index.json", prefix, today, topicName);
+    String lastChunkIndexObjectKey =
+        String.format("%s/last_chunk_index.%s-00000.txt", prefix, topicName);
+    List<String> expectedObjects = List.of(indexObjectKey, dataObjectKey, lastChunkIndexObjectKey);
 
-    // Await for all the files to be produced by all tasks
+    // Make sure no files are produced yet as the timestamp-based flushing threshold is not reached
     Awaitility.await()
         .pollDelay(5, TimeUnit.SECONDS)
         .during(20, TimeUnit.SECONDS)
         .atMost(30, TimeUnit.SECONDS)
         .until(() -> expectedObjects.stream().noneMatch(key -> objectKeyExists(bucketName, key)));
 
+    assertEquals(-1, getConsumerOffset("connect-" + connectorName, topicName, 0));
+
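+    // Advance the record timestamp past the flush interval so the next batch triggers a
+    // timestamp-based flush of the first 100 records.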
+    ts += flushIntervalMs + 60 * 1000;
+    for (int i = 100; i < 200; i++) {
+      String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
+      kafkaProducer.send(new ProducerRecord<>(topicName, 0, ts, "", json));
+    }
+
+    // Wait for all the files to be produced by all tasks
+    Awaitility.await()
+        .atMost(30, TimeUnit.SECONDS)
+        .until(() -> expectedObjects.stream().allMatch(key -> objectKeyExists(bucketName, key)));
+
+    // Check that the data object contains the expected records;
+    // in particular, it should not contain the record that triggered flushing
+    String objectContents = getS3FileOutput(bucketName, dataObjectKey);
+    assertEquals(sb.toString(), objectContents);
+
+    // Check that the committed offset corresponds to the message offset that triggered
+    // timestamp-based flushing
+    Awaitility.await()
+        .atMost(30, TimeUnit.SECONDS)
+        .until(() -> getConsumerOffset("connect-" + connectorName, topicName, 0) == 100);
+
     // Delete the connector
     deleteConnector(connectorName);
   }
@@ -404,6 +475,143 @@ public void testSinkWithRestart() throws Exception {
     deleteConnector(connectorName);
   }
 
+  @Test
+  public void testSinkWithWallTimeFlushingAndRewind() throws Exception {
+    String bucketName = "connect-system-test-wall-time";
+    String prefix = "systest";
+    String topicName = "system_test_wall_time";
+    String connectorName = "s3-sink-wall-time";
+    s3.createBucket(bucketName);
+
+    // Create the test topic
+    kafkaCluster.createTopic(TopicConfig.withName(topicName).withNumberOfPartitions(1));
+
+    // Define, register, and start the connector
+    final long flushIntervalMs = 30 * 1000;
+    ConnectorConfiguration connectorConfiguration =
+        getConnectorConfiguration(
+            connectorName,
+            bucketName,
+            prefix,
+            topicName,
+            67108864,
+            100 * 1024 * 1024,
+            flushIntervalMs);
+    kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
+    kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);
+
+    assertEquals(-1, getConsumerOffset("connect-" + connectorName, topicName, 0));
+
+    // Produce messages to the topic
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 100; i++) {
+      String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
+      kafkaCluster.send(SendValues.to(topicName, json));
+      sb.append(json).append("\n");
+    }
+    String firstObjectExpectedContent = sb.toString();
+    LocalDate today = LocalDate.now(ZoneOffset.UTC);
+
+    // Define the expected files
+    String dataObjectKey =
+        String.format("%s/%s/%s-00000-000000000000.gz", prefix, today, topicName);
+    String indexObjectKey =
+        String.format("%s/%s/%s-00000-000000000000.index.json", prefix, today, topicName);
+    String lastChunkIndexObjectKey =
+        String.format("%s/last_chunk_index.%s-00000.txt", prefix, topicName);
+    List<String> expectedObjects = List.of(indexObjectKey, dataObjectKey, lastChunkIndexObjectKey);
+
+    // Make sure files are produced using wall time flushing
+    Awaitility.await()
+        .atMost(flushIntervalMs * 3, TimeUnit.MILLISECONDS)
+        .until(() -> expectedObjects.stream().allMatch(key -> objectKeyExists(bucketName, key)));
+
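+    // After the wall time flush, the committed offset should point just past the flushed records.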
+    assertEquals(100, getConsumerOffset("connect-" + connectorName, topicName, 0));
+
+    String objectContents = getS3FileOutput(bucketName, dataObjectKey);
+    assertEquals(firstObjectExpectedContent, objectContents);
+
+    sb = new StringBuilder();
+    for (int i = 100; i < 200; i++) {
+      String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
+      kafkaCluster.send(SendValues.to(topicName, json));
+      sb.append(json).append("\n");
+    }
+    String secondObjectExpectedContent = sb.toString();
+
+    String dataObjectKey2 =
+        String.format("%s/%s/%s-00000-000000000100.gz", prefix, today, topicName);
+    String indexObjectKey2 =
+        String.format("%s/%s/%s-00000-000000000100.index.json", prefix, today, topicName);
+    List<String> expectedObjects2 =
+        List.of(indexObjectKey2, dataObjectKey2, lastChunkIndexObjectKey);
+
+    // Make sure files are produced using wall time flushing
+    Awaitility.await()
+        .atMost(flushIntervalMs * 3, TimeUnit.MILLISECONDS)
+        .until(() -> expectedObjects2.stream().allMatch(key -> objectKeyExists(bucketName, key)));
+
+    assertEquals(200, getConsumerOffset("connect-" + connectorName, topicName, 0));
+
+    objectContents = getS3FileOutput(bucketName, dataObjectKey2);
+    assertEquals(secondObjectExpectedContent, objectContents);
+
+    deleteConnector(connectorName);
+
+    for (int i = 200; i < 300; i++) {
+      String json = String.format("{{\"foo\": \"bar\", \"counter\": %d}}", i);
+      kafkaCluster.send(SendValues.to(topicName, json));
+    }
+
+    s3.deleteObjects(
+        new DeleteObjectsRequest(bucketName)
+            .withKeys(
+                lastChunkIndexObjectKey,
+                dataObjectKey,
+                indexObjectKey,
+                dataObjectKey2,
+                indexObjectKey2));
+    s3.deleteBucket(bucketName);
+    s3.createBucket(bucketName);
+
+    assertTrue(expectedObjects.stream().noneMatch(key -> objectKeyExists(bucketName, key)));
+    assertTrue(expectedObjects2.stream().noneMatch(key -> objectKeyExists(bucketName, key)));
+
+    // Rewind the connector's consumer group to the first record
+    kafkaAdmin
+        .alterConsumerGroupOffsets(
+            "connect-" + connectorName,
+            Map.of(new TopicPartition(topicName, 0), new OffsetAndMetadata(0)))
+        .all()
+        .get();
+
+    assertEquals(0, getConsumerOffset("connect-" + connectorName, topicName, 0));
+
+    kafkaConnectContainer.registerConnector(connectorName, connectorConfiguration);
+    kafkaConnectContainer.ensureConnectorTaskState(connectorName, 0, State.RUNNING);
+
+    // Check files are produced again shortly after the connector restart
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .until(
+            () ->
+                Stream.concat(expectedObjects.stream(), expectedObjects2.stream())
+                    .allMatch(key -> objectKeyExists(bucketName, key)));
+
+    Awaitility.await()
+        .atMost(30, TimeUnit.SECONDS)
+        .until(() -> getConsumerOffset("connect-" + connectorName, topicName, 0) == 200);
+
+    // Check that the same files with the same content are produced again
+    objectContents = getS3FileOutput(bucketName, dataObjectKey);
+    assertEquals(firstObjectExpectedContent, objectContents);
+
+    objectContents = getS3FileOutput(bucketName, dataObjectKey2);
+    assertEquals(secondObjectExpectedContent, objectContents);
+
+    deleteConnector(connectorName);
+  }
+
   @Test
   public void testSinkWithBinaryFormat() throws Exception {
     String bucketName = "connect-system-test";
@@ -548,6 +756,17 @@ private boolean objectKeyExists(String bucketName, String objectKey) {
     return returnValue;
   }
 
+  private long getConsumerOffset(String groupId, String topic, int partition)
+      throws ExecutionException, InterruptedException {
+    var offset =
+        kafkaAdmin
+            .listConsumerGroupOffsets(groupId)
+            .partitionsToOffsetAndMetadata()
+            .get()
+            .get(new TopicPartition(topic, partition));
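+    // A null OffsetAndMetadata means the group has no committed offset for this partition yet.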
+    return offset == null ? -1 : offset.offset();
+  }
+
   private String getS3FileOutput(String bucketName, String objectKey) throws IOException {
     StringBuilder sb = new StringBuilder();
     S3Object s3Object = s3.getObject(bucketName, objectKey);