[Issue apache#32] Consumer group cluster (apache#52)
Fix issue apache#32: make the consumer group coordinator work distributed across brokers.
Add unit tests for multiple brokers.
Tests pass for 2 brokers started with `bin/kop kafka-broker`.
jiazhai authored and sijie committed Dec 25, 2019
1 parent 38a95aa commit a96d0a4
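In effect, group coordination now follows ownership of the group-metadata (offsets) topic: whichever broker owns a given offsets partition serves the consumer groups hashed to it, so a standard Kafka consumer group works against any broker in the cluster. A minimal sketch of what the multi-broker test exercises (the broker addresses, topic name, and group id below are hypothetical, not taken from the test code):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TwoBrokerGroupSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Both KoP listeners; either broker can answer FindCoordinator.
        props.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093");
        props.put("group.id", "test-group"); // hypothetical group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // hypothetical topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("%s-%d@%d: %s%n",
                    r.topic(), r.partition(), r.offset(), r.value()));
        }
    }
}
```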
Showing 24 changed files with 2,424 additions and 1,595 deletions.
21 changes: 16 additions & 5 deletions conf/kop.conf
@@ -15,10 +15,24 @@

### --- Kafka broker settings --- ###

kafkaServicePort=9092

enableGroupCoordinator=true

messagingProtocols=kafka

listeners=PLAINTEXT://127.0.0.1:9092

offsetsTopicNumPartitions=8

advertisedAddress=127.0.0.1

### --- Changed for KoP --- ###

# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=false

# Name of the cluster to which this broker belongs to
clusterName=kafka-cluster

### --- General broker settings --- ###

# Zookeeper quorum connection string
@@ -54,9 +68,6 @@ numHttpServerThreads=
# Flag to control features that are meant to be used when running in standalone mode
isRunningStandalone=

# Name of the cluster to which this broker belongs to
clusterName=kafka-cluster

# Enable cluster's failure-domain which can distribute brokers into logical region
failureDomainsEnabled=false

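Why the new `offsetsTopicNumPartitions=8` matters for distribution: in Kafka's coordinator design, which this commit follows, a group id maps to offsets-topic partition `abs(hash(group.id)) % offsetsTopicNumPartitions`, and the broker that owns that partition coordinates the group. A sketch of the mapping, assuming Kafka's usual scheme (the bit mask avoids `Math.abs(Integer.MIN_VALUE)` being negative):

```java
public final class CoordinatorPartition {

    // Maps a consumer group id to an offsets-topic partition, following the
    // abs(hashCode) % numPartitions scheme Kafka's group coordinator uses.
    static int partitionFor(String groupId, int offsetsTopicNumPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicNumPartitions;
    }

    public static void main(String[] args) {
        // With offsetsTopicNumPartitions=8, each group lands on one of 8
        // partitions, and hence on whichever broker owns that partition.
        System.out.println(partitionFor("test-group", 8));
    }
}
```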
1 change: 1 addition & 0 deletions conf/kop_standalone.conf
@@ -21,6 +21,7 @@ messagingProtocols=kafka

listeners=PLAINTEXT://127.0.0.1:9092

offsetsTopicNumPartitions=8

### --- Changed for KoP --- ###

7 changes: 0 additions & 7 deletions pom.xml
@@ -38,7 +38,6 @@
<log4j2.version>2.10.0</log4j2.version>
<lombok.version>1.18.4</lombok.version>
<mockito.version>2.22.0</mockito.version>
<netty.version>4.1.32.Final</netty.version>
<pulsar.version>2.5.0-9728d5429</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
<spotbugs-annotations.version>3.1.8</spotbugs-annotations.version>
@@ -123,12 +122,6 @@
<version>${pulsar.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
25 changes: 25 additions & 0 deletions src/main/java/io/streamnative/kop/KafkaCommandDecoder.java
@@ -38,6 +38,7 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;


/**
* A decoder that decodes kafka requests and responses.
*/
@@ -65,6 +66,30 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
log.info("Closed connection from {}", remoteAddress);
close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("[{}] Got exception: {}", remoteAddress, cause.getMessage(), cause);
close();
}

protected void close() {
ctx.close();
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Channel writability has changed to: {}", ctx.channel().isWritable());
}
}

// turn the input ByteBuf msg, sent from the client side, into a KafkaHeaderAndRequest
protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg) {
return byteBufToRequest(msg, null);
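For context on where these lifecycle overrides run: a handler like `KafkaCommandDecoder` sits behind a length-field frame decoder in the Netty pipeline, since Kafka wire frames are 4-byte-length-prefixed. A self-contained sketch — the initializer and the inline handler are an assumption for illustration, not the exact KoP wiring:

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class KafkaChannelInitializerSketch extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // Kafka frames are length-prefixed: a 4-byte size field, then the
        // payload; strip the size field before handing the frame on.
        ch.pipeline().addLast("frameDecoder",
                new LengthFieldBasedFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 4));
        // Stand-in for the real decoder: reacts to the same lifecycle
        // events the patch above overrides.
        ch.pipeline().addLast("handler", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ((ByteBuf) msg).release(); // a real handler decodes the request here
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                ctx.close(); // mirror the decoder's close-on-error behavior
            }
        });
    }
}
```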
168 changes: 148 additions & 20 deletions src/main/java/io/streamnative/kop/KafkaProtocolHandler.java
@@ -14,8 +14,11 @@
package io.streamnative.kop;

import static com.google.common.base.Preconditions.checkState;
import static io.streamnative.kop.utils.TopicNameUtils.getKafkaTopicNameFromPulsarTopicname;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
@@ -26,8 +29,10 @@
import io.streamnative.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -37,19 +42,25 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.ReaderBuilderImpl2;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;

/**
* Kafka Protocol Handler, loaded and run by the Pulsar Service.
Expand All @@ -64,6 +75,88 @@ public class KafkaProtocolHandler implements ProtocolHandler {
public static final String TLS_HANDLER = "tls";
public static final String LISTENER_PATTEN = "^(PLAINTEXT?|SSL)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

/**
* Listener for load/unload of the topic that stores consumer group offsets.
*/
public static class OffsetTopicListener implements NamespaceBundleOwnershipListener {

final BrokerService service;
final NamespaceName kafkaMetaNs;
final GroupCoordinator groupCoordinator;
public OffsetTopicListener(BrokerService service,
KafkaServiceConfiguration kafkaConfig,
GroupCoordinator groupCoordinator) {
this.service = service;
this.kafkaMetaNs = NamespaceName
.get(kafkaConfig.getKafkaMetadataTenant(), kafkaConfig.getKafkaMetadataNamespace());
this.groupCoordinator = groupCoordinator;
}

@Override
public void onLoad(NamespaceBundle bundle) {
// 1. get new partitions owned by this pulsar service.
// 2. load partitions by GroupCoordinator.handleGroupImmigration.
service.pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
.whenComplete((topics, ex) -> {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);
// already filtered namespace, check the local name without partition
if (Topic.GROUP_METADATA_TOPIC_NAME.equals(getKafkaTopicNameFromPulsarTopicname(name))) {
checkState(name.isPartitioned(),
"OffsetTopic should be partitioned in onLoad, but get " + name);
if (log.isDebugEnabled()) {
log.debug("New offset partition load: {}, broker: {}",
name, service.pulsar().getBrokerServiceUrl());
}
groupCoordinator.handleGroupImmigration(name.getPartitionIndex());
}
}
} else {
log.error("Failed to get owned topic list for "
+ "OffsetTopicListener when triggering on-loading bundle {}.",
bundle, ex);
}
});
}

@Override
public void unLoad(NamespaceBundle bundle) {
// 1. get partitions owned by this pulsar service.
// 2. remove partitions by groupCoordinator.handleGroupEmigration.
service.pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
.whenComplete((topics, ex) -> {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);

// already filtered namespace, check the local name without partition
if (Topic.GROUP_METADATA_TOPIC_NAME.equals(getKafkaTopicNameFromPulsarTopicname(name))) {
checkState(name.isPartitioned(),
"OffsetTopic should be partitioned in unLoad, but get " + name);
if (log.isDebugEnabled()) {
log.debug("Offset partition unload: {}, broker: {}",
name, service.pulsar().getBrokerServiceUrl());
}
groupCoordinator.handleGroupEmigration(name.getPartitionIndex());
}
}
} else {
log.error("Failed to get owned topic list for "
+ "OffsetTopicListener when triggering un-loading bundle {}.",
bundle, ex);
}
});
}

// verify that this bundle is served by this broker,
// and namespace is related to kafka metadata namespace
@Override
public boolean test(NamespaceBundle namespaceBundle) {
return namespaceBundle.getNamespaceObject().equals(kafkaMetaNs);
}

}
/**
* Kafka Listener Type.
*/
@@ -128,6 +221,11 @@ public void start(BrokerService service) {
try {
initGroupCoordinator(brokerService);
startGroupCoordinator();
// and add a listener for offset topic load/unload
brokerService.pulsar()
.getNamespaceService()
.addNamespaceBundleOwnershipListener(
new OffsetTopicListener(brokerService, kafkaConfig, groupCoordinator));
} catch (Exception e) {
log.error("initGroupCoordinator failed with", e);
}
@@ -199,27 +297,30 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
);

OffsetConfig offsetConfig = OffsetConfig.builder()
.offsetsTopicName(kafkaConfig.getKafkaMetadataTenant() + "/"
+ kafkaConfig.getKafkaMetadataNamespace()
+ "/" + Topic.GROUP_METADATA_TOPIC_NAME)
.offsetsTopicNumPartitions(kafkaConfig.getOffsetsTopicNumPartitions())
.offsetsTopicCompressionType(CompressionType.valueOf(kafkaConfig.getOffsetsTopicCompressionCodec()))
.maxMetadataSize(kafkaConfig.getOffsetMetadataMaxSize())
.offsetsRetentionCheckIntervalMs(kafkaConfig.getOffsetsRetentionCheckIntervalMs())
.offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
.build();

createKafkaMetadataNamespaceIfNeeded(service);
String offsetsTopic = createKafkaOffsetsTopic(service);

TopicName offsetsTopicName = TopicName.get(offsetsTopic);
String offsetsTopicPtn0 = offsetsTopicName.getPartition(0).toString();

Producer<ByteBuffer> groupCoordinatorTopicProducer = service.pulsar().getClient().newProducer(Schema.BYTEBUFFER)
.topic(offsetsTopicPtn0)
// TODO: make it configurable
.maxPendingMessages(100000)
.create();
Reader<ByteBuffer> groupCoordinatorTopicReader = service.pulsar().getClient().newReader(Schema.BYTEBUFFER)
.topic(offsetsTopicPtn0)
.startMessageId(MessageId.earliest)
.create();
// topicName in pulsar format: tenant/ns/topic
createKafkaOffsetsTopic(service);

ProducerBuilder<ByteBuffer> groupCoordinatorTopicProducer = service.pulsar().getClient()
.newProducer(Schema.BYTEBUFFER)
.maxPendingMessages(100000);

// TODO: replace this back to service.pulsar().getClient().newReader after merge pulsar PR:
// https://github.com/apache/pulsar/pull/5923
ReaderBuilder<ByteBuffer> groupCoordinatorTopicReader =
new ReaderBuilderImpl2<>((PulsarClientImpl) (service.pulsar().getClient()), Schema.BYTEBUFFER);
groupCoordinatorTopicReader.startMessageId(MessageId.earliest);

this.groupCoordinator = GroupCoordinator.of(
groupCoordinatorTopicProducer,
groupCoordinatorTopicReader,
Expand All @@ -230,10 +331,10 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
.build(),
Time.SYSTEM
);

loadOffsetTopics(groupCoordinator);
}

// TODO: make group coordinator running in a distributed mode
// https://github.com/streamnative/kop/issues/32
public void startGroupCoordinator() throws Exception {
if (this.groupCoordinator != null) {
this.groupCoordinator.startup(false);
@@ -289,14 +390,41 @@ private String createKafkaOffsetsTopic(BrokerService service) throws PulsarServe
offsetsTopic);
service.pulsar().getAdminClient().topics().createPartitionedTopic(
offsetsTopic,
KafkaServiceConfiguration.DefaultOffsetsTopicNumPartitions
kafkaConfig.getOffsetsTopicNumPartitions()
);
log.info("Successfully created group metadata topic {}.", offsetsTopic);
for (int i = 0; i < kafkaConfig.getOffsetsTopicNumPartitions(); i++) {
service.pulsar().getAdminClient().topics()
.createNonPartitionedTopic(offsetsTopic + PARTITIONED_TOPIC_SUFFIX + i);
}
log.info("Successfully created group metadata topic {} with {} partitions.",
offsetsTopic, kafkaConfig.getOffsetsTopicNumPartitions());
}

return offsetsTopic;
}

private void loadOffsetTopics(GroupCoordinator groupCoordinator) throws Exception {
String offsetsTopic = kafkaConfig.getKafkaMetadataTenant() + "/" + kafkaConfig.getKafkaMetadataNamespace()
+ "/" + Topic.GROUP_METADATA_TOPIC_NAME;
int numPartitions = kafkaConfig.getOffsetsTopicNumPartitions();
List<CompletableFuture<Void>> lists = Lists.newArrayListWithExpectedSize(numPartitions);
for (int i = 0; i < numPartitions; i++) {
String partition = offsetsTopic + PARTITIONED_TOPIC_SUFFIX + i;
String broker = brokerService.pulsar().getAdminClient().lookups()
.lookupTopic(partition);

if (log.isDebugEnabled()) {
log.debug("found broker {} for offset topic partition {}. current broker: {}",
broker, partition, brokerService.pulsar().getBrokerServiceUrl());
}

if (broker.equalsIgnoreCase(brokerService.pulsar().getBrokerServiceUrl())) {
lists.add(groupCoordinator.handleGroupImmigration(i));
}
}
FutureUtil.waitForAll(lists).get();
}

public static int getListenerPort(String listener) {
checkState(listener.matches(LISTENER_PATTEN), "listener not match patten");

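The distribution mechanism reduces to one rule, applied both in `loadOffsetTopics` above and in `OffsetTopicListener`: a broker calls `handleGroupImmigration(i)` exactly for the offsets-topic partitions whose lookup resolves to itself. A condensed sketch of that decision (class, method names, and broker URLs here are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Condensed sketch of the ownership check in loadOffsetTopics: partition i
// is loaded locally only when its lookup result matches this broker's URL.
public final class OffsetPartitionOwnership {

    static List<Integer> locallyOwnedPartitions(String thisBrokerUrl,
                                                List<String> lookupResults) {
        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < lookupResults.size(); i++) {
            if (lookupResults.get(i).equalsIgnoreCase(thisBrokerUrl)) {
                owned.add(i); // handleGroupImmigration(i) would run for these
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Hypothetical lookup answers for 4 offsets-topic partitions.
        List<String> lookups = Arrays.asList(
                "pulsar://broker-1:6650", "pulsar://broker-2:6650",
                "pulsar://broker-1:6650", "pulsar://broker-2:6650");
        System.out.println(locallyOwnedPartitions("pulsar://broker-1:6650", lookups));
        // prints [0, 2]
    }
}
```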
(19 more changed files not shown)
