Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ private void fill() {
_lostMessageCount.incrBy(omitted.size());
}

_pending.headMap(offset).clear();

LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
}

Expand Down Expand Up @@ -356,7 +358,7 @@ public void commit() {
}
}

private String committedPath() {
protected String committedPath() {
return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,145 @@
*/
package org.apache.storm.kafka;

import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import kafka.api.TopicMetadata;
import kafka.common.ErrorMapping;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import scala.collection.JavaConversions;

import java.io.File;
import java.io.IOException;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import java.util.concurrent.TimeUnit;

/**
* Date: 11/01/2014
* Time: 13:15
*/
public class KafkaTestBroker {

// Bind services to the loopback address for environments where _localhost_ may resolve to an unreachable host
private static final String LOCALHOST = "127.0.0.1";

private int port;
private KafkaServerStartable kafka;
private TestingServer server;
private CuratorFramework zookeeper;
private File logDir;

// Convenience constructor: start a broker with an empty (all-default) property set.
public KafkaTestBroker() {
this(new Properties());
}

/**
 * Starts an in-process Zookeeper (Curator {@code TestingServer}) and a Kafka
 * broker, both bound to the loopback address.
 *
 * <p>The diff residue lines ({@code server = new TestingServer();} and the
 * old single-argument {@code buildKafkaConfig} call) duplicated their
 * replacements and made the constructor invalid; only the new form is kept.
 *
 * @param brokerProps extra broker properties overlaid on the test defaults
 * @throws RuntimeException wrapping any startup failure
 */
public KafkaTestBroker(Properties brokerProps) {
    try {
        // Bind Zookeeper explicitly to 127.0.0.1; "localhost" may resolve to
        // an unreachable host in some environments (see LOCALHOST constant).
        InstanceSpec spec = new InstanceSpec(
                null,   // temp data directory chosen by Curator
                -1,     // random client port
                -1,     // random election port
                -1,     // random quorum port
                true,   // delete data directory on close
                -1,     // auto-assigned server id
                -1,     // default tick time
                -1,     // default max client connections
                null,   // no extra server config
                LOCALHOST
        );
        server = new TestingServer(spec, true);
        String zookeeperConnectionString = server.getConnectString();
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        zookeeper.start();
        port = InstanceSpec.getRandomPort();
        logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port);
        KafkaConfig config = buildKafkaConfig(brokerProps, zookeeperConnectionString);
        kafka = new KafkaServerStartable(config);
        kafka.startup();
    } catch (Exception ex) {
        // Preserve the root cause for test failure diagnostics.
        throw new RuntimeException("Could not start test broker", ex);
    }
}

private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) {
Properties p = new Properties();
/**
 * Creates a Kafka topic (replication factor 1) and blocks until it exists
 * and all partitions have elected leaders.
 *
 * @param topicName     name of the topic to create
 * @param numPartitions number of partitions
 * @param properties    per-topic configuration overrides
 */
public void createTopic(String topicName, int numPartitions, Properties properties) {
    ZkClient client = new ZkClient(getZookeeperConnectionString());
    client.setZkSerializer(ZKStringSerializer$.MODULE$);
    try {
        AdminUtils.createTopic(client, topicName, numPartitions, 1, properties);
        ensureTopicCreated(client, topicName);
    } finally {
        // Always release the ZK connection, even if creation/waiting fails.
        client.close();
    }
}


/**
 * Waits up to 30 seconds for the topic to be created and for every partition
 * to have an elected leader, polling topic metadata from Zookeeper every
 * 100&nbsp;ms.
 *
 * <p>Uses a wall-clock deadline: the previous implementation only counted
 * sleep time toward the 30s budget, so slow metadata fetches could extend
 * the wait indefinitely.
 *
 * @throws RuntimeException if the deadline passes or the wait is interrupted
 *                          (the interrupt status is restored first)
 */
private void ensureTopicCreated(ZkClient zkClient, String topicName) {
    final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
    boolean partitionsHaveLeaders = false;

    while (!partitionsHaveLeaders && System.nanoTime() < deadline) {
        partitionsHaveLeaders = true;
        TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
        for (PartitionMetadata partitionMetadata : JavaConversions.seqAsJavaList(topicMetadata.partitionsMetadata())) {
            // A partition is ready only when it has a leader and no error code.
            if (partitionMetadata.leader().isEmpty() || partitionMetadata.errorCode() != ErrorMapping.NoError()) {
                partitionsHaveLeaders = false;
            }
        }

        if (!partitionsHaveLeaders) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore interrupt status
                throw new RuntimeException("Interrupted while waiting for topic to be available", ex);
            }
        }
    }

    if (!partitionsHaveLeaders) {
        throw new RuntimeException("Could not create topic: " + topicName);
    }
}

/**
 * Builds the broker configuration: caller-supplied properties first, then
 * the test-harness essentials (loopback host, random port, temp log dir,
 * fixed broker id) layered on top.
 */
private kafka.server.KafkaConfig buildKafkaConfig(Properties brokerProps, String zookeeperConnectionString) {
    // Copy explicitly rather than new Properties(brokerProps): properties
    // passed as constructor "defaults" are invisible to keySet()/entrySet()
    // iteration, so consumers that enumerate the Properties would silently
    // drop the caller's overrides.
    Properties p = new Properties();
    p.putAll(brokerProps);
    p.setProperty("zookeeper.connect", zookeeperConnectionString);
    p.setProperty("broker.id", "0");
    p.setProperty("port", "" + port);
    p.setProperty("host.name", LOCALHOST);
    p.setProperty("log.dirs", logDir.getAbsolutePath());
    return new KafkaConfig(p);
}

/**
 * Connection string ({@code host:port}) for producers/consumers talking to
 * this broker. Uses the loopback constant; the stale pre-diff
 * {@code "localhost:" + port} return line (diff residue) is removed.
 */
public String getBrokerConnectionString() {
    return LOCALHOST + ":" + port;
}

// Connection string (host:port) of the embedded Zookeeper TestingServer.
public String getZookeeperConnectionString() {
return server.getConnectString();
}

// Client port of the embedded Zookeeper TestingServer.
public int getZookeeperPort() {
return server.getPort();
}

public int getPort() {
Expand All @@ -95,3 +179,4 @@ public void shutdown() {
FileUtils.deleteQuietly(logDir);
}
}

Loading