From 36f691e7ab05b014588b4611a4dd4c766b838374 Mon Sep 17 00:00:00 2001 From: Emil Krastev Date: Wed, 21 Dec 2022 11:01:38 +0200 Subject: [PATCH 1/6] KAFKA-12558: Do not prematurely mutate partiton state --- .../connect/mirror/MirrorSourceTask.java | 34 +++-- .../connect/mirror/MirrorSourceTaskTest.java | 141 +++++++++++++++--- 2 files changed, 138 insertions(+), 37 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index da4697ddf2545..91a712971cd4c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -197,17 +197,19 @@ private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset long downstreamOffset) { PartitionState partitionState = partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); - if (partitionState.update(upstreamOffset, downstreamOffset)) { - sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset); + if (partitionState.shouldUpdate(upstreamOffset, downstreamOffset)) { + if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) { + partitionState.update(upstreamOffset, downstreamOffset); + } } } // sends OffsetSync record upstream to internal offsets topic - private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset, + private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { if (!outstandingOffsetSyncs.tryAcquire()) { // Too many outstanding offset syncs. - return; + return false; } OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); ProducerRecord record = new ProducerRecord<>(offsetSyncsTopic, 0, @@ -221,6 +223,7 @@ private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset, } outstandingOffsetSyncs.release(); }); + return true; } private Map loadOffsets(Set topicPartitions) { @@ -277,22 +280,23 @@ static class PartitionState { this.maxOffsetLag = maxOffsetLag; } - // true if we should emit an offset sync - boolean update(long upstreamOffset, long downstreamOffset) { - boolean shouldSyncOffsets = false; - long upstreamStep = upstreamOffset - lastSyncUpstreamOffset; - long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep; - if (lastSyncDownstreamOffset == -1L - || downstreamOffset - downstreamTargetOffset >= maxOffsetLag - || upstreamOffset - previousUpstreamOffset != 1L - || downstreamOffset < previousDownstreamOffset) { + void update(long upstreamOffset, long downstreamOffset) { + if (shouldUpdate(upstreamOffset, downstreamOffset)) { lastSyncUpstreamOffset = upstreamOffset; lastSyncDownstreamOffset = downstreamOffset; - shouldSyncOffsets = true; } previousUpstreamOffset = upstreamOffset; previousDownstreamOffset = downstreamOffset; - return shouldSyncOffsets; + } + + // true if we should emit an offset sync + boolean shouldUpdate(long upstreamOffset, long downstreamOffset) { + long upstreamStep = upstreamOffset - lastSyncUpstreamOffset; + long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep; + return lastSyncDownstreamOffset == -1L + || downstreamOffset - downstreamTargetOffset >= maxOffsetLag + || upstreamOffset - previousUpstreamOffset != 1L + || downstreamOffset < previousDownstreamOffset; } } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 14cf4143c9042..9cfc852c8da17 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -35,8 +35,7 @@ import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -80,16 +79,65 @@ public void testSerde() { public void testOffsetSync() { MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50); - assertTrue(partitionState.update(0, 100), "always emit offset sync on first update"); - assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync"); - assertFalse(partitionState.update(3, 152), "no sync"); - assertFalse(partitionState.update(4, 153), "no sync"); - assertFalse(partitionState.update(5, 154), "no sync"); - assertTrue(partitionState.update(6, 205), "one past target offset"); - assertTrue(partitionState.update(2, 206), "upstream reset"); - assertFalse(partitionState.update(3, 207), "no sync"); - assertTrue(partitionState.update(4, 3), "downstream reset"); - assertFalse(partitionState.update(5, 4), "no sync"); + long upstreamOffset = 0; + long downstreamOffset = 100; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "always emit offset sync on first update"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "always emit offset sync on first update"); + + upstreamOffset = 2; + downstreamOffset = 102; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "upstream offset skipped -> resync"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "upstream offset skipped -> resync"); + + upstreamOffset = 3; + downstreamOffset = 152; + partitionState.update(upstreamOffset, downstreamOffset); + assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); + assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); + + upstreamOffset = 4; + downstreamOffset = 153; + partitionState.update(upstreamOffset, downstreamOffset); + assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); + assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); + + upstreamOffset = 5; + downstreamOffset = 154; + partitionState.update(upstreamOffset, downstreamOffset); + assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); + assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); + + upstreamOffset = 6; + downstreamOffset = 205; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "one past target offset"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "one past target offset"); + + upstreamOffset = 2; + downstreamOffset = 206; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "upstream reset"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "upstream reset"); + + upstreamOffset = 3; + downstreamOffset = 207; + partitionState.update(upstreamOffset, downstreamOffset); + assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); + assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); + + upstreamOffset = 4; + downstreamOffset = 3; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "downstream reset"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "downstream reset"); + + upstreamOffset = 5; + downstreamOffset = 4; + partitionState.update(upstreamOffset, downstreamOffset); + assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); + assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); } @Test @@ -97,16 +145,65 @@ public void testZeroOffsetSync() { MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0); // if max offset lag is zero, should always emit offset syncs - assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect"); - assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect"); - assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect"); - assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect"); - assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect"); - assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect"); - assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect"); - assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect"); - assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect"); - assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect"); + long upstreamOffset = 0; + long downstreamOffset = 100; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 100 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 100 is incorrect"); + + upstreamOffset = 2; + downstreamOffset = 102; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 102 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 102 is incorrect"); + + upstreamOffset = 3; + downstreamOffset = 153; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 153 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 153 is incorrect"); + + upstreamOffset = 4; + downstreamOffset = 154; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 154 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 154 is incorrect"); + + upstreamOffset = 5; + downstreamOffset = 155; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 155 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 155 is incorrect"); + + upstreamOffset = 6; + downstreamOffset = 207; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 207 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 207 is incorrect"); + + upstreamOffset = 2; + downstreamOffset = 208; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 208 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 208 is incorrect"); + + upstreamOffset = 3; + downstreamOffset = 209; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 209 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 209 is incorrect"); + + upstreamOffset = 4; + downstreamOffset = 3; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 3 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 3 is incorrect"); + + upstreamOffset = 5; + downstreamOffset = 4; + partitionState.update(upstreamOffset, downstreamOffset); + assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 4 is incorrect"); + assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 4 is incorrect"); } @Test From 673b7928c213ebd78caa4fc0305f80444e77d2d4 Mon Sep 17 00:00:00 2001 From: Emil Krastev Date: Fri, 23 Dec 2022 17:34:57 +0200 Subject: [PATCH 2/6] KAFKA-12558: Added unit test and improve partition mutation logic --- .../connect/mirror/MirrorSourceTask.java | 24 ++-- .../connect/mirror/MirrorSourceTaskTest.java | 118 ++++++++++++++---- .../MirrorConnectorsIntegrationBaseTest.java | 6 +- 3 files changed, 114 insertions(+), 34 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 91a712971cd4c..28db477c60b7a 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -69,7 +69,9 @@ public MirrorSourceTask() {} // for testing MirrorSourceTask(KafkaConsumer consumer, MirrorSourceMetrics metrics, String sourceClusterAlias, - ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer producer) { + ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer producer, + Semaphore outstandingOffsetSyncs, Map partitionStates, + String offsetSyncsTopic) { this.consumer = consumer; this.metrics = metrics; this.sourceClusterAlias = sourceClusterAlias; @@ -77,6 +79,9 @@ public MirrorSourceTask() {} this.maxOffsetLag = maxOffsetLag; consumerAccess = new Semaphore(1); this.offsetProducer = producer; + this.outstandingOffsetSyncs = outstandingOffsetSyncs; + this.partitionStates = partitionStates; + this.offsetSyncsTopic = offsetSyncsTopic; } @Override @@ -197,9 +202,9 @@ private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset long downstreamOffset) { PartitionState partitionState = partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); - if (partitionState.shouldUpdate(upstreamOffset, downstreamOffset)) { + if (partitionState.shouldSyncOffsets(upstreamOffset, downstreamOffset)) { if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) { - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); } } } @@ -280,23 +285,24 @@ static class PartitionState { this.maxOffsetLag = maxOffsetLag; } - void update(long upstreamOffset, long downstreamOffset) { - if (shouldUpdate(upstreamOffset, downstreamOffset)) { + void syncOffsets(long upstreamOffset, long downstreamOffset) { + if (shouldSyncOffsets(upstreamOffset, downstreamOffset)) { lastSyncUpstreamOffset = upstreamOffset; lastSyncDownstreamOffset = downstreamOffset; } - previousUpstreamOffset = upstreamOffset; - previousDownstreamOffset = downstreamOffset; } // true if we should emit an offset sync - boolean shouldUpdate(long upstreamOffset, long downstreamOffset) { + boolean shouldSyncOffsets(long upstreamOffset, long downstreamOffset) { long upstreamStep = upstreamOffset - lastSyncUpstreamOffset; long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep; - return lastSyncDownstreamOffset == -1L + boolean shouldSyncOffsets = lastSyncDownstreamOffset == -1L || downstreamOffset - downstreamTargetOffset >= maxOffsetLag || upstreamOffset - previousUpstreamOffset != 1L || downstreamOffset < previousDownstreamOffset; + previousUpstreamOffset = upstreamOffset; + previousDownstreamOffset = downstreamOffset; + return shouldSyncOffsets; } } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 9cfc852c8da17..75858c84f029f 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -20,24 +20,31 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.Semaphore; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verifyNoInteractions; @@ -55,7 +62,7 @@ public void testSerde() { @SuppressWarnings("unchecked") KafkaProducer producer = mock(KafkaProducer.class); MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7", - new DefaultReplicationPolicy(), 50, producer); + new DefaultReplicationPolicy(), 50, producer, null, null, null); SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord); assertEquals("cluster7.topic1", sourceRecord.topic(), "Failure on cluster7.topic1 consumerRecord serde"); @@ -81,61 +88,61 @@ public void testOffsetSync() { long upstreamOffset = 0; long downstreamOffset = 100; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "always emit offset sync on first update"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "always emit offset sync on first update"); upstreamOffset = 2; downstreamOffset = 102; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "upstream offset skipped -> resync"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "upstream offset skipped -> resync"); upstreamOffset = 3; downstreamOffset = 152; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); upstreamOffset = 4; downstreamOffset = 153; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); upstreamOffset = 5; downstreamOffset = 154; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); upstreamOffset = 6; downstreamOffset = 205; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "one past target offset"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "one past target offset"); upstreamOffset = 2; downstreamOffset = 206; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "upstream reset"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "upstream reset"); upstreamOffset = 3; downstreamOffset = 207; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); upstreamOffset = 4; downstreamOffset = 3; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "downstream reset"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "downstream reset"); upstreamOffset = 5; downstreamOffset = 4; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); } @@ -147,61 +154,61 @@ public void testZeroOffsetSync() { // if max offset lag is zero, should always emit offset syncs long upstreamOffset = 0; long downstreamOffset = 100; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 100 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 100 is incorrect"); upstreamOffset = 2; downstreamOffset = 102; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 102 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 102 is incorrect"); upstreamOffset = 3; downstreamOffset = 153; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 153 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 153 is incorrect"); upstreamOffset = 4; downstreamOffset = 154; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 154 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 154 is incorrect"); upstreamOffset = 5; downstreamOffset = 155; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 155 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 155 is incorrect"); upstreamOffset = 6; downstreamOffset = 207; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 207 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 207 is incorrect"); upstreamOffset = 2; downstreamOffset = 208; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 208 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 208 is incorrect"); upstreamOffset = 3; downstreamOffset = 209; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 209 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 209 is incorrect"); upstreamOffset = 4; downstreamOffset = 3; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 3 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 3 is incorrect"); upstreamOffset = 5; downstreamOffset = 4; - partitionState.update(upstreamOffset, downstreamOffset); + partitionState.syncOffsets(upstreamOffset, downstreamOffset); assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 4 is incorrect"); assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 4 is incorrect"); } @@ -237,7 +244,7 @@ public void testPoll() { String sourceClusterName = "cluster1"; ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, - replicationPolicy, 50, producer); + replicationPolicy, 50, producer, null, null, null); List sourceRecords = mirrorSourceTask.poll(); assertEquals(2, sourceRecords.size()); @@ -283,7 +290,7 @@ public void testCommitRecordWithNullMetadata() { String sourceClusterName = "cluster1"; ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, - replicationPolicy, 50, producer); + replicationPolicy, 50, producer, null, null, null); SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(), TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty())); @@ -293,6 +300,70 @@ public void testCommitRecordWithNullMetadata() { verifyNoInteractions(producer); } + @Test + public void testPartitionStateMutation() { + byte[] recordKey = "key".getBytes(); + byte[] recordValue = "value".getBytes(); + int maxOffsetLag = 50; + int recordPartition = 0; + int recordOffset = 0; + int metadataOffset = 100; + String topicName = "topic"; + String sourceClusterName = "sourceCluster"; + + RecordHeaders headers = new RecordHeaders(); + ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); + + @SuppressWarnings("unchecked") + KafkaConsumer consumer = mock(KafkaConsumer.class); + @SuppressWarnings("unchecked") + KafkaProducer producer = mock(KafkaProducer.class); + MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class); + Semaphore outstandingOffsetSyncs = mock(Semaphore.class); + PartitionState partitionState = new PartitionState(maxOffsetLag); + Map partitionStates = new HashMap<>(); + + MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, + replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName); + + SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + + TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()); + partitionStates.put(sourceTopicPartition, partitionState); + RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + + when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); + assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); + assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); + assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); + + int newRecordOffset = 2; + int newMetadataOffset = 102; + recordMetadata = new RecordMetadata(sourceTopicPartition, newMetadataOffset, 0, 0, 0, recordPartition); + sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + newRecordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + + when(outstandingOffsetSyncs.tryAcquire()).thenReturn(false); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "no sync"); + assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "no sync"); + assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "update previuos upstream offset"); + assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "update previuos upstream offset"); + + when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); + assertEquals(newMetadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); + assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "sync offsets"); + assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); + verify(producer, times(2)).send(any(), any()); + } + private void compareHeaders(List
expectedHeaders, List taskHeaders) { assertEquals(expectedHeaders.size(), taskHeaders.size()); for (int i = 0; i < expectedHeaders.size(); i++) { @@ -304,4 +375,5 @@ private void compareHeaders(List
expectedHeaders, List backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofMillis(CHECKPOINT_DURATION_MS)); - assertTrue(backupOffsets.containsKey( - new TopicPartition("primary.test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets); + for (int i = 0; i < NUM_PARTITIONS; i++) { + assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)), + "Offsets not translated downstream to backup cluster. Found: " + backupOffsets); + } // Failover consumer group to backup cluster. try (Consumer primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) { From 27bb18f31769aa4d7c93077bf667df2877ab0d42 Mon Sep 17 00:00:00 2001 From: Emil Krastev Date: Wed, 28 Dec 2022 09:49:53 +0200 Subject: [PATCH 3/6] KAFKA-12558: Fix typo and remove new line in MirrorSourceTaskTest --- .../apache/kafka/connect/mirror/MirrorSourceTaskTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 75858c84f029f..569836527eef3 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -352,8 +352,8 @@ public void testPartitionStateMutation() { mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "no sync"); assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "no sync"); - assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "update previuos upstream offset"); - assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "update previuos upstream offset"); + assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "update previous upstream offset"); + assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "update previous upstream offset"); when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); @@ -375,5 +375,4 @@ private void compareHeaders(List
expectedHeaders, List Date: Fri, 30 Dec 2022 21:54:31 +0200 Subject: [PATCH 4/6] KAFKA-12558: Update on the partitation state must result in offset sync event --- .../connect/mirror/MirrorSourceTask.java | 26 +-- .../connect/mirror/MirrorSourceTaskTest.java | 179 ++++++------------ 2 files changed, 67 insertions(+), 138 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 28db477c60b7a..5635eb7189d71 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -202,9 +202,9 @@ private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset long downstreamOffset) { PartitionState partitionState = partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); - if (partitionState.shouldSyncOffsets(upstreamOffset, downstreamOffset)) { + if (partitionState.update(upstreamOffset, downstreamOffset)) { if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) { - partitionState.syncOffsets(upstreamOffset, downstreamOffset); + partitionState.reset(); } } } @@ -280,29 +280,31 @@ static class PartitionState { long lastSyncUpstreamOffset = -1L; long lastSyncDownstreamOffset = -1L; long maxOffsetLag; + boolean shouldSyncOffsets; PartitionState(long maxOffsetLag) { this.maxOffsetLag = maxOffsetLag; } - void syncOffsets(long upstreamOffset, long downstreamOffset) { - if (shouldSyncOffsets(upstreamOffset, downstreamOffset)) { - lastSyncUpstreamOffset = upstreamOffset; - lastSyncDownstreamOffset = downstreamOffset; - } - } - // true if we should emit an offset sync - boolean shouldSyncOffsets(long upstreamOffset, long downstreamOffset) { + boolean update(long upstreamOffset, long downstreamOffset) { long upstreamStep = upstreamOffset - lastSyncUpstreamOffset; long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep; - boolean shouldSyncOffsets = lastSyncDownstreamOffset == -1L + if (lastSyncDownstreamOffset == -1L || downstreamOffset - downstreamTargetOffset >= maxOffsetLag || upstreamOffset - previousUpstreamOffset != 1L - || downstreamOffset < previousDownstreamOffset; + || downstreamOffset < previousDownstreamOffset) { + lastSyncUpstreamOffset = upstreamOffset; + lastSyncDownstreamOffset = downstreamOffset; + shouldSyncOffsets = true; + } previousUpstreamOffset = upstreamOffset; previousDownstreamOffset = downstreamOffset; return shouldSyncOffsets; } + + void reset() { + shouldSyncOffsets = false; + } } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 569836527eef3..d300fcec3ec6c 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -40,7 +40,8 @@ import java.util.concurrent.Semaphore; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -86,65 +87,26 @@ public void testSerde() { public void testOffsetSync() { MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50); - long upstreamOffset = 0; - long downstreamOffset = 100; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "always emit offset sync on first update"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "always emit offset sync on first update"); - - upstreamOffset = 2; - downstreamOffset = 102; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "upstream offset skipped -> resync"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "upstream offset skipped -> resync"); - - upstreamOffset = 3; - downstreamOffset = 152; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); - assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); - - upstreamOffset = 4; - downstreamOffset = 153; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); - assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); - - upstreamOffset = 5; - downstreamOffset = 154; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); - assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); - - upstreamOffset = 6; - downstreamOffset = 205; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "one past target offset"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "one past target offset"); - - upstreamOffset = 2; - downstreamOffset = 206; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "upstream reset"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "upstream reset"); - - upstreamOffset = 3; - downstreamOffset = 207; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); - assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); - - upstreamOffset = 4; - downstreamOffset = 3; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "downstream reset"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "downstream reset"); - - upstreamOffset = 5; - downstreamOffset = 4; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertNotEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "no sync"); - assertNotEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "no sync"); + assertTrue(partitionState.update(0, 100), "always emit offset sync on first update"); + partitionState.reset(); + assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync"); + partitionState.reset(); + assertFalse(partitionState.update(3, 152), "no sync"); + partitionState.reset(); + assertFalse(partitionState.update(4, 153), "no sync"); + partitionState.reset(); + assertFalse(partitionState.update(5, 154), "no sync"); + partitionState.reset(); + assertTrue(partitionState.update(6, 205), "one past target offset"); + partitionState.reset(); + assertTrue(partitionState.update(2, 206), "upstream reset"); + partitionState.reset(); + assertFalse(partitionState.update(3, 207), "no sync"); + partitionState.reset(); + assertTrue(partitionState.update(4, 3), "downstream reset"); + partitionState.reset(); + assertFalse(partitionState.update(5, 4), "no sync"); + partitionState.reset(); } @Test @@ -152,65 +114,25 @@ public void testZeroOffsetSync() { MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0); // if max offset lag is zero, should always emit offset syncs - long upstreamOffset = 0; - long downstreamOffset = 100; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 100 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 100 is incorrect"); - - upstreamOffset = 2; - downstreamOffset = 102; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 102 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 102 is incorrect"); - - upstreamOffset = 3; - downstreamOffset = 153; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 153 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 153 is incorrect"); - - upstreamOffset = 4; - downstreamOffset = 154; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 154 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 154 is incorrect"); - - upstreamOffset = 5; - downstreamOffset = 155; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 155 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 155 is incorrect"); - - upstreamOffset = 6; - downstreamOffset = 207; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 207 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 207 is incorrect"); - - upstreamOffset = 2; - downstreamOffset = 208; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 208 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 208 is incorrect"); - - upstreamOffset = 3; - downstreamOffset = 209; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 209 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 209 is incorrect"); - - upstreamOffset = 4; - downstreamOffset = 3; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 3 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 3 is incorrect"); - - upstreamOffset = 5; - downstreamOffset = 4; - partitionState.syncOffsets(upstreamOffset, downstreamOffset); - assertEquals(upstreamOffset, partitionState.lastSyncUpstreamOffset, "zeroOffsetSync downStreamOffset 4 is incorrect"); - assertEquals(downstreamOffset, partitionState.lastSyncDownstreamOffset, "zeroOffsetSync downStreamOffset 4 is incorrect"); + assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect"); } @Test @@ -340,6 +262,7 @@ public void testPartitionStateMutation() { assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); + assertFalse(partitionState.shouldSyncOffsets); int newRecordOffset = 2; int newMetadataOffset = 102; @@ -350,17 +273,21 @@ public void testPartitionStateMutation() { when(outstandingOffsetSyncs.tryAcquire()).thenReturn(false); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); - assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "no sync"); - assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "no sync"); - assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "update previous upstream offset"); - assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "update previous upstream offset"); - - when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); - mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + // Expect partition state to be updated assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); assertEquals(newMetadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "sync offsets"); assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); + assertTrue(partitionState.shouldSyncOffsets); + verify(producer, times(1)).send(any(), any()); + + when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, "partition state is synced"); + assertEquals(newMetadataOffset, partitionState.lastSyncDownstreamOffset, "partition state is synced"); + assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "partition state is synced"); + assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "partition state is synced"); + assertFalse(partitionState.shouldSyncOffsets); verify(producer, times(2)).send(any(), any()); } From b73b226baf498409263c73b7a8bb7899a1cf0510 Mon Sep 17 00:00:00 2001 From: Emil Krastev Date: Fri, 6 Jan 2023 17:55:57 +0200 Subject: [PATCH 5/6] KAFKA-12558: Increase test coverage --- .../connect/mirror/MirrorSourceTaskTest.java | 100 ++++++++++++++---- 1 file changed, 79 insertions(+), 21 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index d300fcec3ec6c..e843769b940b0 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; @@ -31,6 +32,9 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -43,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -88,7 +93,9 @@ public void testOffsetSync() { MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50); assertTrue(partitionState.update(0, 100), "always emit offset sync on first update"); + assertTrue(partitionState.shouldSyncOffsets, "should sync offsets"); partitionState.reset(); + assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false"); assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync"); partitionState.reset(); assertFalse(partitionState.update(3, 152), "no sync"); @@ -106,7 +113,13 @@ public void testOffsetSync() { assertTrue(partitionState.update(4, 3), "downstream reset"); partitionState.reset(); assertFalse(partitionState.update(5, 4), "no sync"); + assertTrue(partitionState.update(7, 6), "sync"); + assertTrue(partitionState.update(7, 6), "sync"); + assertTrue(partitionState.update(8, 7), "sync"); + assertTrue(partitionState.update(10, 57), "sync"); partitionState.reset(); + assertFalse(partitionState.update(11, 58), "sync"); + assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false"); } @Test @@ -115,7 +128,9 @@ public void testZeroOffsetSync() { // if max offset lag is zero, should always emit offset syncs assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect"); + assertTrue(partitionState.shouldSyncOffsets, "should sync offsets"); partitionState.reset(); + assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false"); assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect"); partitionState.reset(); assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect"); @@ -133,6 +148,12 @@ public void testZeroOffsetSync() { assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect"); partitionState.reset(); assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect"); + assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect"); + assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect"); + assertTrue(partitionState.update(8, 7), "zeroOffsetSync downStreamOffset 7 is incorrect"); + assertTrue(partitionState.update(10, 57), "zeroOffsetSync downStreamOffset 57 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(11, 58), "zeroOffsetSync downStreamOffset 58 is incorrect"); } @Test @@ -241,7 +262,7 @@ public void testPartitionStateMutation() { @SuppressWarnings("unchecked") KafkaProducer producer = mock(KafkaProducer.class); MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class); - Semaphore outstandingOffsetSyncs = mock(Semaphore.class); + Semaphore outstandingOffsetSyncs = new Semaphore(1); PartitionState partitionState = new PartitionState(maxOffsetLag); Map partitionStates = new HashMap<>(); @@ -256,39 +277,76 @@ public void testPartitionStateMutation() { partitionStates.put(sourceTopicPartition, partitionState); RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); - when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); + doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) { + final Callback callback = invocation.getArgument(1); + callback.onCompletion(null, null); + return null; + } + }).when(producer).send(any(), any()); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); - assertFalse(partitionState.shouldSyncOffsets); + assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); + verify(producer, times(1)).send(any(), any()); - int newRecordOffset = 2; - int newMetadataOffset = 102; - recordMetadata = new RecordMetadata(sourceTopicPartition, newMetadataOffset, 0, 0, 0, recordPartition); + recordOffset = 2; + metadataOffset = 102; + recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, - newRecordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, headers, Optional.empty())); - when(outstandingOffsetSyncs.tryAcquire()).thenReturn(false); + // Do not release outstanding sync semaphore + doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) { + return null; + } + }).when(producer).send(any(), any()); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); - // Expect partition state to be updated - assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); - assertEquals(newMetadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); - assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "sync offsets"); - assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); - assertTrue(partitionState.shouldSyncOffsets); - verify(producer, times(1)).send(any(), any()); - when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); + assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); + assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); + assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); + assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); + assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); + verify(producer, times(2)).send(any(), any()); + + recordOffset = 4; + metadataOffset = 104; + recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); - assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, "partition state is synced"); - assertEquals(newMetadataOffset, partitionState.lastSyncDownstreamOffset, "partition state is synced"); - assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "partition state is synced"); - assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "partition state is synced"); - assertFalse(partitionState.shouldSyncOffsets); + assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); + assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); + assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); + assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); + assertTrue(partitionState.shouldSyncOffsets, "no partition state reset"); verify(producer, times(2)).send(any(), any()); + + // Should send sync event and should not update last upstream/downstream offset + recordOffset = 5; + metadataOffset = 150; + recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + + outstandingOffsetSyncs.release(); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + assertEquals(4, partitionState.lastSyncUpstreamOffset, "no sync last upstream offset"); + assertEquals(104, partitionState.lastSyncDownstreamOffset, "no sync last downstream offset"); + assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); + assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); + assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); + verify(producer, times(3)).send(any(), any()); } private void compareHeaders(List
expectedHeaders, List taskHeaders) { From 126115f807cdbc34993a5f491f571b4285335679 Mon Sep 17 00:00:00 2001 From: Emil Krastev Date: Tue, 10 Jan 2023 10:47:34 +0200 Subject: [PATCH 6/6] KAFKA-12558: Update test for verifying offset sync events --- .../connect/mirror/MirrorSourceTaskTest.java | 58 ++++++------------- 1 file changed, 18 insertions(+), 40 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index e843769b940b0..9dfcf807ed2f6 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -32,8 +32,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; +import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.Collections; @@ -47,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -244,7 +243,7 @@ public void testCommitRecordWithNullMetadata() { } @Test - public void testPartitionStateMutation() { + public void testSendSyncEvent() { byte[] recordKey = "key".getBytes(); byte[] recordValue = "value".getBytes(); int maxOffsetLag = 50; @@ -277,20 +276,14 @@ public void testPartitionStateMutation() { partitionStates.put(sourceTopicPartition, partitionState); RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); - doAnswer(new Answer() { - @Override - public Object answer(final InvocationOnMock invocation) { - final Callback callback = invocation.getArgument(1); - callback.onCompletion(null, null); - return null; - } - }).when(producer).send(any(), any()); + ArgumentCaptor producerCallback = ArgumentCaptor.forClass(Callback.class); + when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> { + producerCallback.getValue().onCompletion(null, null); + return null; + }); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); - assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); - assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); - assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); - assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); - assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); + verify(producer, times(1)).send(any(), any()); recordOffset = 2; @@ -301,21 +294,13 @@ public Object answer(final InvocationOnMock invocation) { recordValue.length, recordKey, recordValue, headers, Optional.empty())); // Do not release outstanding sync semaphore - doAnswer(new Answer() { - @Override - public Object answer(final InvocationOnMock invocation) { - return null; - } - }).when(producer).send(any(), any()); + doReturn(null).when(producer).send(any(), producerCallback.capture()); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); - assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); - assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); - assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); - assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); - assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); verify(producer, times(2)).send(any(), any()); + // Do not send sync event recordOffset = 4; metadataOffset = 104; recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); @@ -324,14 +309,10 @@ public Object answer(final InvocationOnMock invocation) { recordValue.length, recordKey, recordValue, headers, Optional.empty())); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); - assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); - assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); - assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); - assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); - assertTrue(partitionState.shouldSyncOffsets, "no partition state reset"); + verify(producer, times(2)).send(any(), any()); - // Should send sync event and should not update last upstream/downstream offset + // Should send sync event recordOffset = 5; metadataOffset = 150; recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); @@ -339,13 +320,10 @@ public Object answer(final InvocationOnMock invocation) { recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, headers, Optional.empty())); - outstandingOffsetSyncs.release(); + producerCallback.getValue().onCompletion(null, null); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); - assertEquals(4, partitionState.lastSyncUpstreamOffset, "no sync last upstream offset"); - assertEquals(104, partitionState.lastSyncDownstreamOffset, "no sync last downstream offset"); - assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); - assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); - assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); + verify(producer, times(3)).send(any(), any()); }