Skip to content

Commit

Permalink
Get topic/partiton from the record
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Apr 1, 2021
1 parent fdd54fc commit 0b44bbd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p

@SuppressWarnings("rawtypes")
private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
final int partition = 0;
final int partition = sourceRecord.getPartitionIndex().orElse(0);
final String topic = sourceRecord.getTopicName().orElse(topicName);
final Object key;
final Object value;
final Schema keySchema;
Expand Down Expand Up @@ -297,11 +298,11 @@ private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
long offset = sourceRecord.getRecordSequence()
.orElse(-1L);
if (offset < 0) {
offset = taskContext.currentOffset(topicName, partition)
offset = taskContext.currentOffset(topic, partition)
.incrementAndGet();
} else {
final long curr = offset;
taskContext.currentOffset(topicName, partition)
taskContext.currentOffset(topic, partition)
.updateAndGet(curMax -> Math.max(curr, curMax));
}

Expand All @@ -314,7 +315,7 @@ private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
timestamp = sourceRecord.getMessage().get().getPublishTime();
timestampType = TimestampType.LOG_APPEND_TIME;
}
SinkRecord sinkRecord = new SinkRecord(topicName,
SinkRecord sinkRecord = new SinkRecord(topic,
partition,
keySchema,
key,
Expand All @@ -327,8 +328,8 @@ private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
}

@VisibleForTesting
protected long currentOffset() {
return taskContext.currentOffset(topicName, 0).get();
protected long currentOffset(String topic, int partition) {
return taskContext.currentOffset(topic, partition).get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -245,14 +243,17 @@ public void KeyValueSchemaTest() throws Exception {

@Test
public void offsetTest() throws Exception {
AtomicLong entryId = new AtomicLong(0L);
final AtomicLong entryId = new AtomicLong(0L);
Message msg = mock(MessageImpl.class);
when(msg.getValue()).thenReturn("value");
when(msg.getMessageId()).then(x -> new MessageIdImpl(0, entryId.getAndIncrement(), 0));

final String topicName = "testTopic";
final int partition = 1;
final AtomicInteger status = new AtomicInteger(0);
Record<Object> record = PulsarRecord.<String>builder()
.topicName("fake-topic")
.topicName(topicName)
.partition(partition)
.message(msg)
.ackFunction(() -> status.incrementAndGet())
.failFunction(() -> status.decrementAndGet())
Expand All @@ -263,32 +264,32 @@ public void offsetTest() throws Exception {
sink.open(props, null);

// offset is -1 before any data is written
assertEquals(-1L, sink.currentOffset());
assertEquals(-1L, sink.currentOffset(topicName, partition));

sink.write(record);
sink.flush();

// offset is 0 for the first written record
assertEquals(0L, sink.currentOffset());
assertEquals(0, sink.currentOffset(topicName, partition));

sink.write(record);
sink.flush();
// offset is 1 for the second written record
assertEquals(1L, sink.currentOffset());
assertEquals(1, sink.currentOffset(topicName, partition));

sink.close();

// close the producer, open again
sink = new KafkaConnectSink();
sink.open(props, null);

// offset is 1 after reopening teh producer
assertEquals(1L, sink.currentOffset());
// offset is 1 after reopening the producer
assertEquals(1, sink.currentOffset(topicName, partition));

sink.write(record);
sink.flush();
// offset is 2 for the next written record
assertEquals(2L, sink.currentOffset());
assertEquals(2, sink.currentOffset(topicName, partition));

sink.close();
}
Expand Down

0 comments on commit 0b44bbd

Please sign in to comment.