Skip to content

Commit

Permalink
Issue 95: Make Kafka OffsetCommit and Pulsar backlog align (apache#105)
Browse files Browse the repository at this point in the history
Fix apache#95.
Make Kafka OffsetCommit and Pulsar backlog align.

Add backlog tracker in GroupCoordinator.
After fetch message, ack the fetched position.

* pass tests
* add consumer when sync group
* change log level bacck
* ignore testSessionTimeout, and opened new issue to track this
* bin/kop was accidently deleted, add it back
* remove LOOKUP_CACHE when protocol close
  • Loading branch information
jiazhai authored Feb 17, 2020
1 parent 02e5465 commit 4c8651f
Show file tree
Hide file tree
Showing 11 changed files with 478 additions and 43 deletions.
4 changes: 4 additions & 0 deletions kafka-impl/conf/kop.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ saslAllowedMechanisms=
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=false

allowAutoTopicCreation=true

allowAutoTopicCreationType=partitioned

# Name of the cluster to which this broker belongs to
clusterName=kafka-cluster

Expand Down
4 changes: 4 additions & 0 deletions kafka-impl/conf/kop_standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ saslAllowedMechanisms=
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=false

allowAutoTopicCreation=true

allowAutoTopicCreationType=partitioned

### --- General broker settings --- ###

# Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -47,12 +46,7 @@
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ReaderBuilderImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -290,6 +284,7 @@ public void close() {
if (groupCoordinator != null) {
groupCoordinator.shutdown();
}
KafkaTopicManager.LOOKUP_CACHE.clear();
}

public void initGroupCoordinator(BrokerService service) throws Exception {
Expand All @@ -314,19 +309,8 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
// topicName in pulsar format: tenant/ns/topic
createKafkaOffsetsTopic(service);

ProducerBuilder<ByteBuffer> groupCoordinatorTopicProducer = service.pulsar().getClient()
.newProducer(Schema.BYTEBUFFER)
.maxPendingMessages(100000);

// TODO: replace this back to service.pulsar().getClient().newReader after merge pulsar PR:
// https://github.com/apache/pulsar/pull/5923
ReaderBuilder<ByteBuffer> groupCoordinatorTopicReader =
new ReaderBuilderImpl<>((PulsarClientImpl) (service.pulsar().getClient()), Schema.BYTEBUFFER);
groupCoordinatorTopicReader.startMessageId(MessageId.earliest);

this.groupCoordinator = GroupCoordinator.of(
groupCoordinatorTopicProducer,
groupCoordinatorTopicReader,
(PulsarClientImpl) (service.pulsar().getClient()),
groupConfig,
offsetConfig,
SystemTimer.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ private Producer registerInPersistentTopic(PersistentTopic persistentTopic) thro
// call pulsarclient.lookup.getbroker to get and own a topic
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName) {
return LOOKUP_CACHE.computeIfAbsent(topicName, t -> {
if (log.isDebugEnabled()) {
log.debug("topic {} not in Lookup_cache, call lookupBroker",
topicName);
}
CompletableFuture<InetSocketAddress> returnFuture = new CompletableFuture<>();
Backoff backoff = new Backoff(
100, TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -314,10 +315,16 @@ public void readEntriesComplete(List<Entry> list, Object o) {
if (!list.isEmpty()) {
entry = list.get(0);
long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId());
PositionImpl currentPosition = PositionImpl
.get(entry.getLedgerId(), entry.getEntryId());

// commit the offset, so backlog not affect by this cursor.
commitOffset((NonDurableCursorImpl) cursor, currentPosition);

// get next offset
PositionImpl nextPosition = ((NonDurableCursorImpl ) cursor)
.getNextAvailablePosition(PositionImpl
.get(entry.getLedgerId(), entry.getEntryId()));
PositionImpl nextPosition = ((NonDurableCursorImpl) cursor)
.getNextAvailablePosition(currentPosition);

long nextOffset = MessageIdUtils
.getOffset(nextPosition.getLedgerId(), nextPosition.getEntryId());

Expand Down Expand Up @@ -366,4 +373,23 @@ public void readEntriesFailed(ManagedLedgerException e, Object o) {
return readFutures;
}

// commit the offset, so backlog not affect by this cursor.
private static void commitOffset(NonDurableCursorImpl cursor, PositionImpl currentPosition) {
cursor.asyncMarkDelete(currentPosition, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("Mark delete success for position: {}", currentPosition);
}
}

// this is OK, since this is kind of cumulative ack, following commit will come.
@Override
public void markDeleteFailed(ManagedLedgerException e, Object ctx) {
log.warn("Mark delete success for position: {} with error:",
currentPosition, e);
}
}, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -61,10 +62,14 @@
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ReaderBuilderImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.FutureUtil;

Expand All @@ -75,8 +80,7 @@
public class GroupCoordinator {

public static GroupCoordinator of(
ProducerBuilder<ByteBuffer> producer,
ReaderBuilder<ByteBuffer> reader,
PulsarClientImpl pulsarClient,
GroupConfig groupConfig,
OffsetConfig offsetConfig,
Timer timer,
Expand All @@ -86,6 +90,13 @@ public static GroupCoordinator of(
.name("group-coordinator-executor")
.build();

// __offset partitions producers and readers builder.
ProducerBuilder<ByteBuffer> producer = pulsarClient
.newProducer(Schema.BYTEBUFFER)
.maxPendingMessages(100000);
ReaderBuilder<ByteBuffer> reader = new ReaderBuilderImpl<>(pulsarClient, Schema.BYTEBUFFER);

reader.startMessageId(MessageId.earliest);
GroupMetadataManager metadataManager = new GroupMetadataManager(
offsetConfig,
producer,
Expand All @@ -105,12 +116,14 @@ public static GroupCoordinator of(
.timeoutTimer(timer)
.build();

OffsetAcker offsetAcker = new OffsetAcker(pulsarClient);
return new GroupCoordinator(
groupConfig,
metadataManager,
heartbeatPurgatory,
joinPurgatory,
time
time,
offsetAcker
);
}

Expand All @@ -133,6 +146,9 @@ public static GroupCoordinator of(
Collections.emptyList()
);

// for topic backlog tracking.
@Getter
private final OffsetAcker offsetAcker;
private final AtomicBoolean isActive = new AtomicBoolean(false);
private final GroupConfig groupConfig;
private final GroupMetadataManager groupManager;
Expand All @@ -145,12 +161,14 @@ public GroupCoordinator(
GroupMetadataManager groupManager,
DelayedOperationPurgatory<DelayedHeartbeat> heartbeatPurgatory,
DelayedOperationPurgatory<DelayedJoin> joinPurgatory,
Time time) {
Time time,
OffsetAcker offsetAcker) {
this.groupConfig = groupConfig;
this.groupManager = groupManager;
this.heartbeatPurgatory = heartbeatPurgatory;
this.joinPurgatory = joinPurgatory;
this.time = time;
this.offsetAcker = offsetAcker;
}

/**
Expand All @@ -173,6 +191,7 @@ public void shutdown() {
groupManager.shutdown();
heartbeatPurgatory.shutdown();
joinPurgatory.shutdown();
offsetAcker.close();
log.info("Shutdown group coordinator completely.");
}

Expand Down Expand Up @@ -433,6 +452,12 @@ public CompletableFuture<KeyValue<Errors, byte[]>> handleSyncGroup(
(assignment, errors) -> resultFuture.complete(
new KeyValue<>(errors, assignment))
);

resultFuture.whenCompleteAsync((kv, throwable) -> {
if (throwable == null && kv.getKey() == Errors.NONE) {
offsetAcker.addOffsetsTracker(groupId, kv.getValue());
}
});
return resultFuture;
}

Expand Down Expand Up @@ -642,6 +667,7 @@ public Map<String, Errors> handleDeleteGroups(Set<String> groupIds) {
);
}

offsetAcker.close(groupIds);
return groupErrors;
}

Expand Down Expand Up @@ -740,7 +766,7 @@ public CompletableFuture<Map<TopicPartition, Errors>> handleCommitOffsets(
int generationId,
Map<TopicPartition, OffsetAndMetadata> offsetMetadata
) {
return validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT)
CompletableFuture<Map<TopicPartition, Errors>> result = validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT)
.map(error ->
CompletableFuture.completedFuture(
CoreUtils.mapValue(
Expand Down Expand Up @@ -771,6 +797,14 @@ public CompletableFuture<Map<TopicPartition, Errors>> handleCommitOffsets(
}
});
});

result.whenCompleteAsync((ignore, e) ->{
if (e == null){
offsetAcker.ackOffsets(groupId, offsetMetadata);
}
});

return result;
}

public Future<?> scheduleHandleTxnCompletion(
Expand Down
Loading

0 comments on commit 4c8651f

Please sign in to comment.