From 5b741a0ab5f14ff60c99c72690d16c5b55a37574 Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Thu, 29 Jun 2017 16:53:46 -0400
Subject: [PATCH] STORM-2608 Remove any pending offsets that are no longer valid

---
 .../apache/storm/kafka/PartitionManager.java  |   4 +-
 .../apache/storm/kafka/KafkaTestBroker.java   | 101 +++++++-
 .../storm/kafka/PartitionManagerTest.java     | 243 ++++++++++++++++++
 integration-test/pom.xml                      |   2 +-
 4 files changed, 340 insertions(+), 10 deletions(-)
 create mode 100644 external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java

diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 928a5630e1d..5420887fd27 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -253,6 +253,8 @@ private void fill() {
                 _lostMessageCount.incrBy(omitted.size());
             }
 
+            _pending.headMap(offset).clear();
+
             LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
         }
 
@@ -356,7 +358,7 @@ public void commit() {
         }
     }
 
-    private String committedPath() {
+    protected String committedPath() {
         return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
     }
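Some context on the two-line fix above: `_pending` is the partition manager's sorted tree of offsets that have been emitted but not yet acked. When a fetch fails because Kafka has already deleted the requested messages, the spout restarts from the earliest offset that is still valid, but any `_pending` entries below that offset can never be acked, so `commit()` stays pinned at a deleted offset forever. `SortedMap.headMap(offset)` returns a live view of the entries strictly below `offset`, so clearing it removes exactly those unreachable entries from the underlying tree. A minimal, self-contained sketch of the semantics the fix relies on (the class and values here are illustrative, not part of the patch):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class HeadMapDemo {
        public static void main(String[] args) {
            // Emitted-but-unacked offsets, standing in for the _pending tree
            SortedMap<Long, String> pending = new TreeMap<>();
            for (long offset = 100; offset < 105; offset++) {
                pending.put(offset, "msg-" + offset);
            }

            // Kafka now reports 103 as the earliest valid offset; everything
            // below it was deleted by retention and can never be acked
            long newValidOffset = 103;

            // headMap(k) is a view, backed by the map, of the keys strictly
            // less than k, so clearing it mutates pending in place
            pending.headMap(newValidOffset).clear();

            System.out.println(pending.keySet()); // prints [103, 104]
        }
    }

Because `headMap` is a backed view rather than a copy, the fix removes the stale entries with no extra allocation.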
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
index fed615554fa..0952764206b 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
@@ -17,6 +17,14 @@
  */
 package org.apache.storm.kafka;
 
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.api.TopicMetadata;
+import kafka.common.ErrorMapping;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -24,13 +32,12 @@
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
+import scala.collection.JavaConversions;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Date: 11/01/2014
@@ -38,6 +45,9 @@
  */
 public class KafkaTestBroker {
 
+    // Bind services to the loopback address for environments where localhost may resolve to an unreachable host
+    private static final String LOCALHOST = "127.0.0.1";
+
     private int port;
     private KafkaServerStartable kafka;
     private TestingServer server;
@@ -45,15 +55,31 @@ public class KafkaTestBroker {
     private File logDir;
 
     public KafkaTestBroker() {
+        this(new Properties());
+    }
+
+    public KafkaTestBroker(Properties brokerProps) {
         try {
-            server = new TestingServer();
+            InstanceSpec spec = new InstanceSpec(
+                    null,
+                    -1,
+                    -1,
+                    -1,
+                    true,
+                    -1,
+                    -1,
+                    -1,
+                    null,
+                    LOCALHOST
+            );
+            server = new TestingServer(spec, true);
             String zookeeperConnectionString = server.getConnectString();
             ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
             zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString,
                     retryPolicy);
             zookeeper.start();
             port = InstanceSpec.getRandomPort();
             logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port);
-            KafkaConfig config = buildKafkaConfig(zookeeperConnectionString);
+            KafkaConfig config = buildKafkaConfig(brokerProps, zookeeperConnectionString);
             kafka = new KafkaServerStartable(config);
             kafka.startup();
         } catch (Exception ex) {
@@ -61,17 +87,75 @@ public KafkaTestBroker() {
         }
     }
 
-    private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) {
-        Properties p = new Properties();
+    public void createTopic(String topicName, int numPartitions, Properties properties) {
+        ZkClient zkClient = new ZkClient(getZookeeperConnectionString());
+        zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
+
+        try {
+            AdminUtils.createTopic(zkClient, topicName, numPartitions, 1, properties);
+
+            ensureTopicCreated(zkClient, topicName);
+        } finally {
+            zkClient.close();
+        }
+    }
+
+
+    /**
+     * Wait up to 30 seconds for the topic to be created and for all of its partitions to be assigned leaders
+     */
+    private void ensureTopicCreated(ZkClient zkClient, String topicName) {
+        long maxWaitTime = TimeUnit.SECONDS.toNanos(30);
+        long waitTime = 0;
+        boolean partitionsHaveLeaders = false;
+
+        while (!partitionsHaveLeaders && waitTime < maxWaitTime) {
+            partitionsHaveLeaders = true;
+            TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
+            for (PartitionMetadata partitionMetadata : JavaConversions.seqAsJavaList(topicMetadata.partitionsMetadata())) {
+                if (partitionMetadata.leader().isEmpty() || partitionMetadata.errorCode() != ErrorMapping.NoError()) {
+                    partitionsHaveLeaders = false;
+                }
+            }
+
+            if (!partitionsHaveLeaders) {
+                long start = System.nanoTime();
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while waiting for topic to be available");
+                }
+
+                waitTime += (System.nanoTime() - start);
+            }
+        }
+
+        if (!partitionsHaveLeaders) {
+            throw new RuntimeException("Could not create topic: " + topicName);
+        }
+    }
+
+    private kafka.server.KafkaConfig buildKafkaConfig(Properties brokerProps, String zookeeperConnectionString) {
+        Properties p = new Properties(brokerProps);
         p.setProperty("zookeeper.connect", zookeeperConnectionString);
         p.setProperty("broker.id", "0");
         p.setProperty("port", "" + port);
+        p.setProperty("host.name", LOCALHOST);
         p.setProperty("log.dirs", logDir.getAbsolutePath());
         return new KafkaConfig(p);
     }
 
     public String getBrokerConnectionString() {
-        return "localhost:" + port;
+        return LOCALHOST + ":" + port;
+    }
+
+    public String getZookeeperConnectionString() {
+        return server.getConnectString();
+    }
+
+    public int getZookeeperPort() {
+        return server.getPort();
     }
 
     public int getPort() {
@@ -95,3 +179,4 @@ public void shutdown() {
         FileUtils.deleteQuietly(logDir);
     }
 }
+
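The broker changes above are what make retention-driven deletion reachable from a test: broker-level properties can now be overridden through the new constructor, and `createTopic(...)` blocks until every partition has a leader. A hypothetical usage sketch (class name, topic name, and property values are illustrative, mirroring what the new test does):

    import java.util.Properties;

    public class KafkaTestBrokerUsage {
        public static void main(String[] args) {
            // Run the log-retention checker every second instead of the default
            Properties brokerProps = new Properties();
            brokerProps.setProperty("log.retention.check.interval.ms", "1000");

            KafkaTestBroker broker = new KafkaTestBroker(brokerProps);
            try {
                // Messages on this topic expire after roughly two seconds
                Properties topicProps = new Properties();
                topicProps.put("retention.ms", "2000");

                // Blocks until all partitions have leaders; throws after 30s otherwise
                broker.createTopic("my-topic", 1, topicProps);

                // ... produce and consume against broker.getBrokerConnectionString() ...
            } finally {
                broker.shutdown();
            }
        }
    }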
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
new file mode 100644
index 00000000000..888ecde4a9f
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Config;
+import org.apache.storm.kafka.KafkaSpout.EmitState;
+import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
+import org.apache.storm.kafka.trident.ZkBrokerReader;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class PartitionManagerTest {
+
+    private static final String TOPIC_NAME = "testTopic";
+
+    private KafkaTestBroker broker;
+    private TestingSpoutOutputCollector outputCollector;
+    private ZkState zkState;
+    private ZkCoordinator coordinator;
+    private KafkaProducer<String, String> producer;
+
+    @Before
+    public void setup() {
+        outputCollector = new TestingSpoutOutputCollector();
+
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("log.retention.check.interval.ms", "1000");
+
+        broker = new KafkaTestBroker(brokerProps);
+
+        // Configure Kafka to remove messages after 2 seconds
+        Properties topicProperties = new Properties();
+        topicProperties.put("delete.retention.ms", "2000");
+        topicProperties.put("retention.ms", "2000");
+
+        broker.createTopic(TOPIC_NAME, 1, topicProperties);
+
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, broker.getZookeeperPort());
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Collections.singletonList("127.0.0.1"));
+        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
+        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
+        conf.put(Config.TOPOLOGY_NAME, "test");
+
+        zkState = new ZkState(conf);
+
+        ZkHosts zkHosts = new ZkHosts(broker.getZookeeperConnectionString());
+
+        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC_NAME, "/test", "id");
+
+        coordinator = new ZkCoordinator(
+                new DynamicPartitionConnections(spoutConfig, new ZkBrokerReader(conf, TOPIC_NAME, zkHosts)),
+                conf,
+                spoutConfig,
+                zkState,
+                0,
+                1,
+                1,
+                "topo"
+        );
+
+        Properties producerProps = new Properties();
+        producerProps.put("acks", "1");
+        producerProps.put("bootstrap.servers", broker.getBrokerConnectionString());
+        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put("metadata.fetch.timeout.ms", 1000);
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    @After
+    public void shutdown() {
+        producer.close();
+        broker.shutdown();
+    }
+
+    /**
+     * Test for STORM-2608:
+     *
+     * - Send a few messages to the topic
+     * - Emit those messages from the partition manager
+     * - Fail those tuples so that they are added to the failedMsgRetryManager
+     * - Commit the partition state to ZooKeeper
+     * - Wait for Kafka to roll its logs and remove those messages
+     * - Send a new message to the topic
+     * - On the next fetch request, a TopicOffsetOutOfRangeException is thrown and the new valid offset is past
+     *   the offsets that are still sitting in both the pending tree and the failedMsgRetryManager
+     * - Ack the latest message with the partition manager
+     * - Commit the partition state to ZooKeeper
+     * - The committed offset should be the next offset after the last message that was acked
+     */
+    @Test
+    public void test2608() throws Exception {
+        SpoutOutputCollector spoutOutputCollector = new SpoutOutputCollector(outputCollector);
+        List<PartitionManager> partitionManagers = coordinator.getMyManagedPartitions();
+        Assert.assertEquals(1, partitionManagers.size());
+
+        PartitionManager partitionManager = partitionManagers.get(0);
+
+        for (int i = 0; i < 5; i++) {
+            sendMessage("message-" + i);
+        }
+
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_END);
+
+        partitionManager.commit();
+
+        Map<KafkaMessageId, List<Object>> emitted = outputCollector.getEmitted();
+
+        Assert.assertEquals(5, emitted.size());
+
+        for (KafkaMessageId messageId : emitted.keySet()) {
+            partitionManager.fail(messageId.offset);
+        }
+
+        // The Kafka log roller task has an initial delay of 30 seconds, so we need to wait it out
+        Thread.sleep(TimeUnit.SECONDS.toMillis(35));
+
+        outputCollector.clearEmittedMessages();
+
+        sendMessage("new message");
+
+        // The first fetch request will fail because the requested offset is out of range
+        Assert.assertEquals(EmitState.NO_EMITTED, partitionManager.next(spoutOutputCollector));
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_END);
+
+        emitted = outputCollector.getEmitted();
+
+        Assert.assertEquals(1, emitted.size());
+        KafkaMessageId messageId = emitted.keySet().iterator().next();
+
+        partitionManager.ack(messageId.offset);
+        partitionManager.commit();
+
+        Map<Object, Object> json = zkState.readJSON(partitionManager.committedPath());
+        Assert.assertNotNull(json);
+        long committedOffset = (long) json.get("offset");
+
+        Assert.assertEquals(messageId.offset + 1, committedOffset);
+    }
+
+    private void waitForEmitState(PartitionManager partitionManager, SpoutOutputCollector outputCollector, EmitState expectedState) {
+        int maxRetries = 5;
+        EmitState state = null;
+
+        for (int retryCount = 0; retryCount < maxRetries; retryCount++) {
+            state = partitionManager.next(outputCollector);
+
+            if (state == EmitState.NO_EMITTED) {
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while waiting for message");
+                }
+            } else {
+                break;
+            }
+        }
+
+        Assert.assertEquals(expectedState, state);
+    }
+
+    private void sendMessage(String value) {
+        try {
+            producer.send(new ProducerRecord<>(TOPIC_NAME, (String) null, value)).get();
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private static class TestingSpoutOutputCollector implements ISpoutOutputCollector {
+
+        private final Map<KafkaMessageId, List<Object>> emitted = new HashMap<>();
+
+        Map<KafkaMessageId, List<Object>> getEmitted() {
+            return emitted;
+        }
+
+        void clearEmittedMessages() {
+            emitted.clear();
+        }
+
+        @Override
+        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+            emitted.put((KafkaMessageId) messageId, tuple);
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void reportError(Throwable error) {
+            throw new RuntimeException("Spout error", error);
+        }
+
+
+        @Override
+        public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getPendingCount() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
\ No newline at end of file
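The final assertion in `test2608` encodes the behavior the fix restores: the partition manager can only commit up to the earliest offset still in flight. On a fresh topic the first five messages occupy offsets 0-4 and the new message lands at offset 5, so once the stale entries are cleared and offset 5 is acked, the committed offset advances to 6 (`messageId.offset + 1`). A simplified model of that commit rule, patterned after `PartitionManager.lastCompletedOffset()` but not the upstream code:

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class CommitModel {
        // The earliest in-flight offset wins; only when nothing is in
        // flight can the commit reach the high-water mark
        static long lastCompletedOffset(SortedMap<Long, Object> pending, long emittedToOffset) {
            return pending.isEmpty() ? emittedToOffset : pending.firstKey();
        }

        public static void main(String[] args) {
            SortedMap<Long, Object> pending = new TreeMap<>();
            for (long offset = 0; offset < 5; offset++) {
                pending.put(offset, "in-flight"); // the five failed messages
            }

            // Without the fix: offsets 0-4 linger after the out-of-range
            // reset, so the commit stays pinned at 0 forever
            System.out.println(lastCompletedOffset(pending, 6)); // 0

            // With the fix: the stale head is cleared; after the new message
            // at offset 5 is emitted and acked, the commit advances
            pending.headMap(5L).clear();
            System.out.println(lastCompletedOffset(pending, 6)); // 6
        }
    }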
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 50e9bb76d1d..03512353f5b 100755
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>1.1.0-SNAPSHOT</version>
+        <version>1.2.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
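For reviewers who want to reproduce the failure, the new test should be runnable on its own with standard Maven/Surefire options (the module path and test name come from this patch; adjust the invocation for your local setup):

    mvn test -pl external/storm-kafka -Dtest=PartitionManagerTest

Note that the test deliberately sleeps about 35 seconds to let Kafka's log roller delete the failed messages, so it is slow by design.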