CR feedback. Still to do: handle subscriptionType check; PR is pending
dlg99 committed May 4, 2021
1 parent 61b2aea commit f7232b4
Showing 6 changed files with 61 additions and 67 deletions.
41 changes: 7 additions & 34 deletions pulsar-io/kafka-connect-adaptor-nar/pom.xml
@@ -30,40 +30,13 @@
<artifactId>pulsar-io-kafka-connect-adaptor-nar</artifactId>
<name>Pulsar IO :: Kafka Connect Adaptor NAR</name>

<profiles>
<profile>
<id>main</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>packageKafkaConnect</id>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
<version>${project.version}</version>
</dependency>
<!--
as an example, package connect-file is included when nar is built as
mvn clean package -DskipTests -P packageKafkaConnect
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-file</artifactId>
<version>${kafka-client.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>


<build>
@@ -34,6 +34,7 @@
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.SchemaType;
@@ -50,6 +51,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
@@ -92,7 +94,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
private SinkConnector connector;
private SinkTask task;

private int batchSize;
private long maxBatchSize;
private final AtomicLong currentBatchSize = new AtomicLong(0L);

private long lingerMs;
private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
@@ -113,11 +117,15 @@ public void write(Record<GenericObject> sourceRecord) {
}

if (!isRunning) {
log.error("Sink is stopped. Cannot send the record {}", sourceRecord);
log.warn("Sink is stopped. Cannot send the record {}", sourceRecord);
sourceRecord.fail();
return;
}

// while sourceRecord.getMessage() is Optional<>,
// it should always be present in a Sink that gets an instance of PulsarRecord;
// let's avoid checks for .isPresent() in the rest of the code
Preconditions.checkArgument(sourceRecord.getMessage().isPresent());
try {
SinkRecord record = toSinkRecord(sourceRecord);
task.put(Lists.newArrayList(record));
@@ -127,6 +135,7 @@ public void write(Record<GenericObject> sourceRecord) {
return;
}
pendingFlushQueue.add(sourceRecord);
currentBatchSize.addAndGet(sourceRecord.getMessage().get().size());
flushIfNeeded(false);
}

@@ -178,7 +187,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
task.initialize(taskContext);
task.start(configs.get(0));

batchSize = kafkaSinkConfig.getBatchSize();
maxBatchSize = kafkaSinkConfig.getBatchSize();
lingerMs = kafkaSinkConfig.getLingerTimeMs();
scheduledExecutor.scheduleAtFixedRate(() ->
this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS);
@@ -189,7 +198,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
}

private void flushIfNeeded(boolean force) {
if (force || pendingFlushQueue.stream().limit(batchSize).count() >= batchSize) {
if (force || currentBatchSize.get() >= maxBatchSize) {
scheduledExecutor.submit(this::flush);
}
}
@@ -198,7 +207,7 @@ private void flushIfNeeded(boolean force) {
public void flush() {
if (log.isDebugEnabled()) {
log.debug("flush requested, pending: {}, batchSize: {}",
pendingFlushQueue.size(), batchSize);
currentBatchSize.get(), maxBatchSize);
}

if (pendingFlushQueue.isEmpty()) {
@@ -222,6 +231,7 @@ private void ackUntil(Record<GenericObject> lastNotFlushed, java.util.function.C
while (!pendingFlushQueue.isEmpty()) {
Record<GenericObject> r = pendingFlushQueue.pollFirst();
cb.accept(r);
currentBatchSize.addAndGet(-1 * r.getMessage().get().size());
if (r == lastNotFlushed) {
break;
}
@@ -267,7 +277,7 @@ private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final Object value;
final Schema keySchema;
final Schema valueSchema;
sourceRecord.getMessage().get().getData().length

// sourceRecord is never instanceof KVRecord
// https://github.com/apache/pulsar/pull/10113
if (unwrapKeyValueIfAvailable && sourceRecord.getSchema() != null
@@ -280,9 +290,14 @@ private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
keySchema = getKafkaConnectSchema(kvSchema.getKeySchema(), key);
valueSchema = getKafkaConnectSchema(kvSchema.getValueSchema(), value);
} else {
key = sourceRecord.getKey().orElse(null);
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
keySchema = Schema.BYTES_SCHEMA;
} else {
key = sourceRecord.getKey().orElse(null);
keySchema = Schema.STRING_SCHEMA;
}
value = sourceRecord.getValue().getNativeObject();
keySchema = Schema.STRING_SCHEMA;
valueSchema = getKafkaConnectSchema(sourceRecord.getSchema(), value);
}
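The branch above chooses the Kafka Connect key and key schema based on whether the Pulsar message carries a base64-encoded key. A minimal standalone sketch of that decision, using only the Pulsar and Kafka Connect calls visible in the diff (the holder type and class name are illustrative):

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;

// Sketch only: mirrors the key selection in toSinkRecord().
public final class KeySelectionSketch {

    // Returns (key, keySchema) for one Pulsar record.
    static Map.Entry<Object, Schema> keyAndSchema(Record<GenericObject> sourceRecord) {
        // getMessage() presence is already guaranteed by the check in write()
        Message<GenericObject> msg = sourceRecord.getMessage().get();
        if (msg.hasBase64EncodedKey()) {
            // binary key: pass the raw bytes through with a BYTES schema
            return new SimpleImmutableEntry<>(msg.getKeyBytes(), Schema.BYTES_SCHEMA);
        }
        // textual (or absent) key: keep the string key with a STRING schema
        return new SimpleImmutableEntry<>(sourceRecord.getKey().orElse(null), Schema.STRING_SCHEMA);
    }
}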

@@ -292,15 +307,17 @@ private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
log.error("Message without sequenceId. Key: {} Value: {}", key, value);
throw new IllegalStateException("Message without sequenceId");
}
taskContext.offset(new TopicPartition(topic, partition), offset);

Long timestamp = null;
TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
if (sourceRecord.getEventTime().isPresent()) {
timestamp = sourceRecord.getEventTime().get();
timestampType = TimestampType.CREATE_TIME;
} else if (sourceRecord.getMessage().isPresent()) {
} else {
// publishTime is not a log append time.
// keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
timestamp = sourceRecord.getMessage().get().getPublishTime();
timestampType = TimestampType.LOG_APPEND_TIME;
}
return new SinkRecord(topic,
partition,
@@ -315,7 +332,7 @@ private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {

@VisibleForTesting
protected long currentOffset(String topic, int partition) {
return taskContext.currentOffset(topic, partition).get();
return taskContext.currentOffset(topic, partition);
}

}
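Taken together, the KafkaConnectSink changes switch batching from a message-count threshold to a byte budget: write() adds each message's size to currentBatchSize, flushIfNeeded() compares the running total against maxBatchSize, and ackUntil() subtracts the size of every record it drains. A stripped-down sketch of that accounting, with records reduced to their payload sizes and the executor, Kafka task, and acknowledgement details omitted (class and member names are illustrative):

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Sketch of the byte-based batch accounting introduced in this commit.
public final class ByteBatchSketch {
    private final long maxBatchSize;                      // from kafkaSinkConfig.getBatchSize()
    private final AtomicLong currentBatchSize = new AtomicLong(0L);
    private final ConcurrentLinkedDeque<Long> pendingSizes = new ConcurrentLinkedDeque<>();
    private final Runnable flushAction;                   // stands in for scheduledExecutor.submit(this::flush)

    public ByteBatchSketch(long maxBatchSize, Runnable flushAction) {
        this.maxBatchSize = maxBatchSize;
        this.flushAction = flushAction;
    }

    // write(): enqueue the record and add its size to the running total
    public void write(long messageSizeBytes) {
        pendingSizes.add(messageSizeBytes);
        currentBatchSize.addAndGet(messageSizeBytes);
        flushIfNeeded(false);
    }

    // flushIfNeeded(): flush when forced by the linger timer or when the byte total hits the limit
    private void flushIfNeeded(boolean force) {
        if (force || currentBatchSize.get() >= maxBatchSize) {
            flushAction.run();
        }
    }

    // ackUntil(): drain the queue and subtract each flushed record's size from the total
    // (the real code stops at the last record that was successfully flushed)
    public void ackUntil(LongConsumer ack) {
        Long size;
        while ((size = pendingSizes.pollFirst()) != null) {
            ack.accept(size);
            currentBatchSize.addAndGet(-size);
        }
    }
}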
@@ -37,13 +37,13 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;

@FieldDoc(
defaultValue = "1",
help = "Number of messages the sink processes before flush.")
private int batchSize = 1;
defaultValue = "16384",
help = "Size of messages in bytes the sink will attempt to batch messages together before flush.")
private int batchSize = 16384;

@FieldDoc(
defaultValue = "2147483647L",
help = "The batch size that Kafka producer will attempt to batch records together.")
help = "Time interval in milliseconds the sink will attempt to batch messages together before flush.")
private long lingerTimeMs = 2147483647L;

@FieldDoc(
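With the defaults above, batchSize is now a byte budget (16384 by default) rather than a message count, and lingerTimeMs remains the flush interval. A hedged illustration of how the two knobs could look in a sink's config map (keys mirror the field names here; the values, the helper class, and the omitted required properties such as the topic and connector class are illustrative):

import java.util.HashMap;
import java.util.Map;

// Illustrative only: batch up to ~1 MiB of message bytes, or flush once per second,
// whichever comes first.
public final class SinkConfigSketch {
    public static Map<String, Object> batchingProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("batchSize", 1_048_576);   // byte budget, not a message count, after this change
        props.put("lingerTimeMs", 1000L);    // flush interval in milliseconds
        return props;
    }
}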
@@ -32,7 +32,6 @@ public void requestTaskReconfiguration() {

@Override
public void raiseError(Exception e) {
log.error("raiseError requested", e);
throw new UnsupportedOperationException("not implemented", e);
}
}
@@ -19,6 +19,7 @@

package org.apache.pulsar.io.kafka.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
@@ -55,7 +56,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
private final Consumer<Collection<TopicPartition>> onPartitionChange;
private final AtomicBoolean runRepartition = new AtomicBoolean(false);

private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets = new ConcurrentHashMap<>();
private final ConcurrentHashMap<TopicPartition, Long> currentOffsets = new ConcurrentHashMap<>();

public PulsarKafkaSinkTaskContext(Map<String, String> config,
SinkContext ctx,
@@ -80,12 +81,15 @@ public Map<String, String> configs() {
return config;
}

public AtomicLong currentOffset(String topic, int partition) {
// for tests
@VisibleForTesting
protected Long currentOffset(String topic, int partition) {
return currentOffset(new TopicPartition(topic, partition));
}

public AtomicLong currentOffset(TopicPartition topicPartition) {
AtomicLong offset = currentOffsets.computeIfAbsent(topicPartition, kv -> {
// for tests
private Long currentOffset(TopicPartition topicPartition) {
Long offset = currentOffsets.computeIfAbsent(topicPartition, kv -> {
List<ByteBuffer> req = Lists.newLinkedList();
ByteBuffer key = topicPartitionAsKey(topicPartition);
req.add(key);
@@ -111,9 +115,8 @@ public AtomicLong currentOffset(TopicPartition topicPartition) {
}
});

runRepartition.set(true);
try {
return new AtomicLong(offsetFuture.get());
return offsetFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("error getting initial state of {}", topicPartition, e);
@@ -122,18 +125,15 @@ public AtomicLong currentOffset(TopicPartition topicPartition) {
log.error("error getting initial state of {}", topicPartition, e);
throw new RuntimeException("error getting initial state of " + topicPartition, e); }
});
if (runRepartition.compareAndSet(true, false)) {
onPartitionChange.accept(currentOffsets.keySet());
}
return offset;
}

public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
Map<TopicPartition, OffsetAndMetadata> snapshot = Maps.newHashMapWithExpectedSize(currentOffsets.size());
currentOffsets.forEach((topicPartition, offset) -> {
if (offset.get() > 0) {
if (offset > 0) {
snapshot.put(topicPartition,
new OffsetAndMetadata(offset.get(), Optional.empty(), null));
new OffsetAndMetadata(offset, Optional.empty(), null));
}
});
return snapshot;
@@ -159,7 +159,7 @@ public void offset(Map<TopicPartition, Long> map) {
if (!currentOffsets.containsKey(key)) {
runRepartition.set(true);
}
currentOffsets.put(key, new AtomicLong(value));
currentOffsets.put(key, value);
});

if (runRepartition.compareAndSet(true, false)) {
@@ -169,9 +169,14 @@ public void offset(Map<TopicPartition, Long> map) {

@Override
public void offset(TopicPartition topicPartition, long l) {
Map<TopicPartition, Long> map = Maps.newHashMap();
map.put(topicPartition, l);
this.offset(map);
if (!currentOffsets.containsKey(topicPartition)) {
runRepartition.set(true);
}
currentOffsets.put(topicPartition, l);

if (runRepartition.compareAndSet(true, false)) {
onPartitionChange.accept(currentOffsets.keySet());
}
}

@Override
@@ -203,7 +208,7 @@ public void flushOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws
Map<ByteBuffer, ByteBuffer> offsetMap = Maps.newHashMapWithExpectedSize(offsets.size());

offsets.forEach((tp, om) -> fillOffsetMap(offsetMap, tp, om.offset()));
CompletableFuture<Void> result = new CompletableFuture();
CompletableFuture<Void> result = new CompletableFuture<>();
offsetStore.set(offsetMap, (ex, ignore) -> {
if (ex == null) {
result.complete(null);
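The PulsarKafkaSinkTaskContext changes store plain Long offsets instead of AtomicLong and invoke the partition-change callback only when a previously unseen TopicPartition is recorded, rather than on every offset lookup. A condensed sketch of that path, with the offset-store lookup and flushing omitted (the class name and snapshot helper are illustrative):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.kafka.common.TopicPartition;

// Sketch of the simplified offset bookkeeping: plain Long values in the map,
// repartition callback fired only when a partition shows up for the first time.
public final class OffsetTrackingSketch {
    private final ConcurrentHashMap<TopicPartition, Long> currentOffsets = new ConcurrentHashMap<>();
    private final AtomicBoolean runRepartition = new AtomicBoolean(false);
    private final Consumer<Collection<TopicPartition>> onPartitionChange;

    public OffsetTrackingSketch(Consumer<Collection<TopicPartition>> onPartitionChange) {
        this.onPartitionChange = onPartitionChange;
    }

    // offset(): record the latest offset; notify the task only when the partition set grows
    public void offset(TopicPartition topicPartition, long offset) {
        if (!currentOffsets.containsKey(topicPartition)) {
            runRepartition.set(true);
        }
        currentOffsets.put(topicPartition, offset);
        if (runRepartition.compareAndSet(true, false)) {
            onPartitionChange.accept(currentOffsets.keySet());
        }
    }

    // snapshot of offsets, e.g. for building OffsetAndMetadata entries at flush time
    public Map<TopicPartition, Long> snapshot() {
        return new HashMap<>(currentOffsets);
    }
}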
@@ -293,7 +293,7 @@ public void offsetTest() throws Exception {
KafkaConnectSink sink = new KafkaConnectSink();
sink.open(props, null);

// offset is -1 before any data is written
// offset is -1 before any data is written (aka no offset)
assertEquals(-1L, sink.currentOffset(topicName, partition));

sink.write(record);