diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index eb98eef12ca46f..2656e9b7c986f5 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -256,7 +256,8 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p
 
     @SuppressWarnings("rawtypes")
     private SinkRecord toSinkRecord(Record sourceRecord) {
-        final int partition = 0;
+        final int partition = sourceRecord.getPartitionIndex().orElse(0);
+        final String topic = sourceRecord.getTopicName().orElse(topicName);
         final Object key;
         final Object value;
         final Schema keySchema;
@@ -297,11 +298,11 @@ private SinkRecord toSinkRecord(Record sourceRecord) {
         long offset = sourceRecord.getRecordSequence()
                 .orElse(-1L);
         if (offset < 0) {
-            offset = taskContext.currentOffset(topicName, partition)
+            offset = taskContext.currentOffset(topic, partition)
                     .incrementAndGet();
         } else {
             final long curr = offset;
-            taskContext.currentOffset(topicName, partition)
+            taskContext.currentOffset(topic, partition)
                     .updateAndGet(curMax -> Math.max(curr, curMax));
         }
 
@@ -314,7 +315,7 @@ private SinkRecord toSinkRecord(Record sourceRecord) {
             timestamp = sourceRecord.getMessage().get().getPublishTime();
             timestampType = TimestampType.LOG_APPEND_TIME;
         }
-        SinkRecord sinkRecord = new SinkRecord(topicName,
+        SinkRecord sinkRecord = new SinkRecord(topic,
                 partition,
                 keySchema,
                 key,
@@ -327,8 +328,8 @@ private SinkRecord toSinkRecord(Record sourceRecord) {
 
     }
 
     @VisibleForTesting
-    protected long currentOffset() {
-        return taskContext.currentOffset(topicName, 0).get();
+    protected long currentOffset(String topic, int partition) {
+        return taskContext.currentOffset(topic, partition).get();
     }
 }
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 72a121959a03d7..1d5e3f085970fb 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -46,8 +46,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -245,14 +243,17 @@ public void KeyValueSchemaTest() throws Exception {
 
     @Test
     public void offsetTest() throws Exception {
-        AtomicLong entryId = new AtomicLong(0L);
+        final AtomicLong entryId = new AtomicLong(0L);
         Message msg = mock(MessageImpl.class);
         when(msg.getValue()).thenReturn("value");
         when(msg.getMessageId()).then(x -> new MessageIdImpl(0, entryId.getAndIncrement(), 0));
+        final String topicName = "testTopic";
+        final int partition = 1;
 
         final AtomicInteger status = new AtomicInteger(0);
         Record record = PulsarRecord.builder()
-            .topicName("fake-topic")
+            .topicName(topicName)
+            .partition(partition)
             .message(msg)
             .ackFunction(() -> status.incrementAndGet())
             .failFunction(() -> status.decrementAndGet())
@@ -263,18 +264,18 @@ public void offsetTest() throws Exception {
         sink.open(props, null);
 
         // offset is -1 before any data is written
-        assertEquals(-1L, sink.currentOffset());
+        assertEquals(-1L, sink.currentOffset(topicName, partition));
 
         sink.write(record);
         sink.flush();
 
         // offset is 0 for the first written record
-        assertEquals(0L, sink.currentOffset());
+        assertEquals(0, sink.currentOffset(topicName, partition));
 
         sink.write(record);
         sink.flush();
 
         // offset is 1 for the second written record
-        assertEquals(1L, sink.currentOffset());
+        assertEquals(1, sink.currentOffset(topicName, partition));
 
         sink.close();
@@ -282,13 +283,13 @@ public void offsetTest() throws Exception {
         sink = new KafkaConnectSink();
         sink.open(props, null);
 
-        // offset is 1 after reopening teh producer
-        assertEquals(1L, sink.currentOffset());
+        // offset is 1 after reopening the producer
+        assertEquals(1, sink.currentOffset(topicName, partition));
 
         sink.write(record);
         sink.flush();
         // offset is 2 for the next written record
-        assertEquals(2L, sink.currentOffset());
+        assertEquals(2, sink.currentOffset(topicName, partition));
 
         sink.close();
     }