Pull request status: Closed

Changes from all commits (19 commits)
- 30ac6ec  Added new option to allow Kafka spout to save offset and other state … (cm-cnnxty, Aug 28, 2015)
- eea001c  Fixed a jdk8 syntax error in KafkaDataStore (cm-cnnxty, Aug 30, 2015)
- 16fb3c5  STORM-1015: coding style fixes based on feedback from pull-705 (cm-cnnxty, Sep 12, 2015)
- c6a3c26  A few more style changes based on comments from pull/705 (cm-cnnxty, Sep 16, 2015)
- 032187e  Renamed state store config name for legacy storm mechanism to zookeep… (cm-cnnxty, Sep 22, 2015)
- 458ba4c  STORM-1015: refactored the partition state store implementation based… (cm-cnnxty, Oct 5, 2015)
- 7e73dd2  STORM-1015: merged code with latest from master (cm-cnnxty, Oct 5, 2015)
- 75e2598  STORM-1015: deprecated KafkaConfig.ignoreZkOffsets (cm-cnnxty, Oct 5, 2015)
- 95c08c1  STORM-1015: refactored state store implementation, renamed classes/va… (cm-cnnxty, Oct 6, 2015)
- 346b85d  STORM-1015: simplified kafka state store implementation so that it ca… (cm-cnnxty, Oct 6, 2015)
- e0c4e82  STORM-1015: allow user to define their own implementation of state store (cm-cnnxty, Oct 6, 2015)
- 1d27c6d  STORM-1015: improved logging and documentation on state store impleme… (cm-cnnxty, Oct 6, 2015)
- 1e41cc8  STORM-1015: created specific store config classes for kafka and ZK store (cm-cnnxty, Nov 12, 2015)
- 91822d5  STORM-1015: merged with latest code from master (cm-cnnxty, Nov 13, 2015)
- bb43db8  Merge remote-tracking branch 'apache/master' into kafka-storage (cm-cnnxty, Nov 19, 2015)
- a39273a  STORM-1015: merged with apache/master (cm-cnnxty, Jan 6, 2016)
- b6a8f65  STORM-1015: merged with apache/master (cm-cnnxty, Jan 20, 2016)
- 9e481e7  Merge remote-tracking branch 'apache/master' into kafka-storage (cm-cnnxty, Jan 26, 2016)
- cc873e0  STORM-1015: merged with latest apache master branch (cm-cnnxty, Feb 24, 2016)
1 change: 1 addition & 0 deletions .gitignore
@@ -35,6 +35,7 @@ _site
dependency-reduced-pom.xml
derby.log
metastore_db
+*.versionsBackup
.settings/
.project
.classpath
46 changes: 31 additions & 15 deletions external/storm-kafka/README.md
@@ -52,17 +52,24 @@ The optional ClientId is used as a part of the ZooKeeper path where the spout's
There are 2 extensions of KafkaConfig currently in use.

Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling
-behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely
-identify your spout.
+behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset if you choose ZooKeeper as the storage.
+The id should uniquely identify your spout.
```java
-public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
+public SpoutConfig(BrokerHosts hosts, String topic, String id);
+public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
```
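For example, a minimal spout configuration using the ZooKeeper-rooted constructor might look like this (a sketch; the connect string, topic, root, and id below are placeholder values):
```java
BrokerHosts hosts = new ZkHosts("zk1:2181");  // placeholder ZooKeeper connect string
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-spout-id");
```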
In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
```java
-// setting for how often to save the current Kafka offset to ZooKeeper
+// setting for how often to save the current kafka offset
public long stateUpdateIntervalMs = 2000;

+// offset state information storage. valid options are zookeeper and kafka
+public String stateStore = "zookeeper";
+// timeout in millis for state read/write operations
+public int stateOpTimeout = 5000;
+// max retries allowed for state read/write operations
+public int stateOpMaxRetry = 3;

// Exponential back-off retry settings. These are used when retrying messages after a bolt
// calls OutputCollector.fail().
// Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
@@ -84,6 +91,8 @@ The KafkaConfig class also has bunch of public variables that controls your appl
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
+public boolean ignoreStoredOffsets = false;
Review comment on the added `ignoreStoredOffsets` line:
> is there a mechanism/section in the release notes to call out this change? changing this var name would break existing code.
+// ignoreZkOffsets is now deprecated. Although it is still honored now, you should change to use ignoreStoredOffsets.
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
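As a usage sketch (not taken from this diff; `hosts` and the topic/id strings are placeholders, while the field names come from the listing above), the new state-store settings can be wired together like this:
```java
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "my-spout-id");
spoutConfig.stateStore = "kafka";          // store offsets via Kafka's offset management API instead of ZooKeeper
spoutConfig.stateUpdateIntervalMs = 2000;  // persist the current offset every 2 seconds
spoutConfig.stateOpTimeout = 5000;         // timeout in millis for state read/write operations
spoutConfig.stateOpMaxRetry = 3;           // max retries for failed state read/write operations
```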
Expand Down Expand Up @@ -148,24 +157,31 @@ As shown in the above KafkaConfig properties, you can control from where in the
setting `KafkaConfig.startOffsetTime` as follows:

1. `kafka.api.OffsetRequest.EarliestTime()`: read from the beginning of the topic (i.e. from the oldest messages onwards)
-2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messsages that are being written to the topic)
+2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messages that are being written to the topic)
3. A Unix timestamp aka seconds since the epoch (e.g. via `System.currentTimeMillis()`):
see [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?) in the Kafka FAQ
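For example (a sketch, reusing the `spoutConfig` built earlier), to start from the oldest available message:
```java
// start from the beginning of the topic; use LatestTime() to read only new messages
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
```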

-As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information
-under the ZooKeeper path `SpoutConfig.zkRoot+ "/" + SpoutConfig.id`. In the case of failures it recovers from the last
-written offset in ZooKeeper.
+As the topology runs the Kafka spout keeps track of the offsets it has read and emitted. The Kafka spout offers two built-in options for offset storage, which
+can be configured by setting `SpoutConfig.stateStore`. By default, the `zookeeper` option is chosen, which stores offset state information
+under the ZooKeeper path `SpoutConfig.zkRoot + "/" + SpoutConfig.id`. The second option is `kafka`, which stores offset state information using
+Kafka's built-in offset management API. In addition, you may supply your own custom state store implementation by providing the full class name of your
+implementation. The custom state store must implement the storm.kafka.StateStore interface and must have a public constructor that takes two arguments, as follows:
+```java
+public MyStateStore(Map stormConf, SpoutConfig spoutConfig)
+```

+In the case of failures the Kafka spout recovers from the last written offset.
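A custom store could be skeletoned roughly as follows. This is a hedged sketch: the package, class name, and fields are hypothetical, and only what the README text above states is assumed (the StateStore interface and the public two-argument constructor); the interface's method set is defined elsewhere in this PR and is left as a TODO rather than guessed at.
```java
package com.example.storm;  // hypothetical package for this sketch

import java.util.Map;
import org.apache.storm.kafka.SpoutConfig;

// The README refers to the interface as storm.kafka.StateStore; the code in
// this PR lives under org.apache.storm.kafka. Uncomment the implements clause
// once the StateStore operations below are filled in.
public class MyStateStore /* implements StateStore */ {

    private final Map stormConf;
    private final SpoutConfig spoutConfig;

    public MyStateStore(Map stormConf, SpoutConfig spoutConfig) {
        // required public two-argument constructor, per the README above
        this.stormConf = stormConf;
        this.spoutConfig = spoutConfig;
    }

    // TODO: implement the StateStore operations (reading, writing, and
    // closing per-partition offset state) against your backing system.
}
```
Per the README text above, the store would then be selected by assigning the full class name, e.g. `spoutConfig.stateStore = "com.example.storm.MyStateStore";`.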

-> **Important:** When re-deploying a topology make sure that the settings for `SpoutConfig.zkRoot` and `SpoutConfig.id`
-> were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the
-> offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.
+> **Important:** When re-deploying a topology make sure that the settings for `SpoutConfig.zkRoot` (if `zookeeper` is chosen as storage option)
+> and `SpoutConfig.id` were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the
+> offsets) from storage -- which may lead to unexpected behavior and/or to data loss, depending on your use case.

This means that when a topology has run once the setting `KafkaConfig.startOffsetTime` will not have an effect for
subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in
-ZooKeeper to determine from where it should begin (more precisely: resume) reading.
-If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should
-set the parameter `KafkaConfig.ignoreZkOffsets` to `true`. If `true`, the spout will always begin reading from the
-offset defined by `KafkaConfig.startOffsetTime` as described above.
+storage to determine from where it should begin (more precisely: resume) reading.
+If you want to force the spout to ignore any consumer state information in the configured storage, then you should
+set the parameter `KafkaConfig.ignoreStoredOffsets` to `true` (`KafkaConfig.ignoreZkOffsets` can be used as an alias for backward compatibility).
+If `true`, the spout will always begin reading from the offset defined by `KafkaConfig.startOffsetTime` as described above.
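For example (a sketch, reusing `spoutConfig` from above), to force a fresh start that ignores any stored offsets:
```java
spoutConfig.ignoreStoredOffsets = true;  // skip any stored consumer state on startup
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();  // and read only new messages
```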


## Using storm-kafka with different versions of Scala
29 changes: 29 additions & 0 deletions external/storm-kafka/pom.xml
@@ -153,5 +153,34 @@
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>

+<dependency>
+    <groupId>com.yammer.metrics</groupId>
+    <artifactId>metrics-core</artifactId>
+    <version>2.2.0</version>
+    <scope>test</scope>
+</dependency>
+
+<dependency>
+    <groupId>com.101tec</groupId>
+    <artifactId>zkclient</artifactId>
+    <version>0.5</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </exclusion>
+    </exclusions>
+    <scope>test</scope>
+</dependency>

</dependencies>
</project>
external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
@@ -37,7 +37,7 @@

public class DynamicBrokersReader {

-public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
+private static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);

private CuratorFramework _curator;
private String _zkPath;
external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
@@ -30,7 +30,7 @@

public class DynamicPartitionConnections {

-public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
+private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);

static class ConnectionInfo {
SimpleConsumer consumer;
external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -34,12 +34,22 @@ public class KafkaConfig implements Serializable {
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
-public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

+/**
+ * Whether the spout should ignore the previously stored offsets when it starts.
+ */
+public boolean ignoreStoredOffsets = false;
+
+/**
+ * @deprecated
+ * This parameter is deprecated now. Please use {@link KafkaConfig#ignoreStoredOffsets} instead.
+ */
+public boolean ignoreZkOffsets = false;

public KafkaConfig(BrokerHosts hosts, String topic) {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
}
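A side note, not part of this diff: with both flags kept on the config for backward compatibility, one would expect the spout to honor either flag being set. A plausible, purely hypothetical resolution is:
```java
// hypothetical sketch: treat the deprecated ignoreZkOffsets as an alias
boolean ignoreOffsets = config.ignoreStoredOffsets || config.ignoreZkOffsets;
```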
55 changes: 27 additions & 28 deletions external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -18,8 +18,7 @@
package org.apache.storm.kafka;

import com.google.common.base.Strings;

-import org.apache.storm.Config;
+import kafka.message.Message;
import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -29,27 +28,41 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;

// TODO: need to add blacklisting
// TODO: need to make a best effort to not re-emit messages if don't have to
public class KafkaSpout extends BaseRichSpout {
-static enum EmitState {
+
+public static class MessageAndRealOffset {
+    public Message msg;
+    public long offset;
+
+    public MessageAndRealOffset(Message msg, long offset) {
+        this.msg = msg;
+        this.offset = offset;
+    }
+}
+
+enum EmitState {
EMITTED_MORE_LEFT,
EMITTED_END,
NO_EMITTED
}

-public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
PartitionCoordinator _coordinator;
DynamicPartitionConnections _connections;
-ZkState _state;
+PartitionStateManagerFactory _partitionStateManagerFactory;

long _lastUpdateMs = 0;

int _currPartitionIndex = 0;

public KafkaSpout(SpoutConfig spoutConf) {
@@ -60,32 +73,18 @@ public KafkaSpout(SpoutConfig spoutConf) {
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
String topologyInstanceId = context.getStormId();
-Map stateConf = new HashMap(conf);
-List<String> zkServers = _spoutConfig.zkServers;
-if (zkServers == null) {
-    zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-}
-Integer zkPort = _spoutConfig.zkPort;
-if (zkPort == null) {
-    zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
-}
-stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
-stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
-stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
-_state = new ZkState(stateConf);

_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
+_partitionStateManagerFactory = new PartitionStateManagerFactory(conf, _spoutConfig);

-// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
-_coordinator = new StaticCoordinator(_connections, conf,
-        _spoutConfig, _state, context.getThisTaskIndex(),
-        totalTasks, topologyInstanceId);
+_coordinator = new StaticCoordinator(_connections, _partitionStateManagerFactory, conf, _spoutConfig,
+        context.getThisTaskIndex(), totalTasks, topologyInstanceId);
} else {
-_coordinator = new ZkCoordinator(_connections, conf,
-        _spoutConfig, _state, context.getThisTaskIndex(),
-        totalTasks, topologyInstanceId);
+_coordinator = new ZkCoordinator(_connections, _partitionStateManagerFactory, conf, _spoutConfig,
+        context.getThisTaskIndex(), totalTasks, topologyInstanceId);
}

context.registerMetric("kafkaOffset", new IMetric() {
@@ -94,7 +93,7 @@ public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
-Set<Partition> latestPartitions = new HashSet();
+Set<Partition> latestPartitions = new HashSet<>();
for (PartitionManager pm : pms) {
latestPartitions.add(pm.getPartition());
}
@@ -121,7 +120,7 @@ public Object getValueAndReset() {

@Override
public void close() {
-_state.close();
+_partitionStateManagerFactory.close();
}

@Override