Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
*/
package org.apache.storm.redis.bolt;

import org.apache.storm.redis.common.config.JedisConfig;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;

import java.util.Map;
Expand All @@ -49,27 +52,20 @@
*/
// TODO: Separate Jedis / JedisCluster to provide full operations for each environment to users
public abstract class AbstractRedisBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRedisBolt.class);

protected OutputCollector collector;

private transient JedisCommandsInstanceContainer container;

private JedisPoolConfig jedisPoolConfig;
private JedisClusterConfig jedisClusterConfig;

/**
* Constructor for single Redis environment (JedisPool)
* @param config configuration for initializing JedisPool
*/
public AbstractRedisBolt(JedisPoolConfig config) {
this.jedisPoolConfig = config;
}
private JedisConfig jedisConfig;

/**
* Constructor for Redis Cluster environment (JedisCluster)
* @param config configuration for initializing JedisCluster
* Constructor for single Redis or Redis cluster environment
* @param config configuration for initializing JedisPool or JedisCluster
*/
public AbstractRedisBolt(JedisClusterConfig config) {
this.jedisClusterConfig = config;
public AbstractRedisBolt(JedisConfig config) {
this.jedisConfig = config;
}

/**
Expand All @@ -80,10 +76,12 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector co
// FIXME: stores map (stormConf), topologyContext and expose these to derived classes
this.collector = collector;

if (jedisPoolConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
} else if (jedisClusterConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
if (jedisConfig instanceof JedisPoolConfig) {
LOG.info("Using Jedis Pool Config");
this.container = JedisCommandsContainerBuilder.build((JedisPoolConfig) jedisConfig);
} else if (jedisConfig instanceof JedisClusterConfig) {
LOG.info("Using Jedis Cluster Config");
this.container = JedisCommandsContainerBuilder.build((JedisClusterConfig) jedisConfig);
} else {
throw new IllegalArgumentException("Jedis configuration not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
*/
package org.apache.storm.redis.bolt;

import org.apache.storm.redis.common.config.JedisConfig;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import redis.clients.jedis.JedisCommands;

import java.util.List;
Expand All @@ -39,11 +38,11 @@ public class RedisLookupBolt extends AbstractRedisBolt {
private final String additionalKey;

/**
* Constructor for single Redis environment (JedisPool)
* @param config configuration for initializing JedisPool
* Constructor for single Redis or Redis cluster environment
* @param config configuration for initializing JedisPool or JedisCluster
* @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
*/
public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
public RedisLookupBolt(JedisConfig config, RedisLookupMapper lookupMapper) {
super(config);

this.lookupMapper = lookupMapper;
Expand All @@ -53,20 +52,6 @@ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
this.additionalKey = dataTypeDescription.getAdditionalKey();
}

/**
* Constructor for Redis Cluster environment (JedisCluster)
* @param config configuration for initializing JedisCluster
* @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
*/
public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
super(config);

this.lookupMapper = lookupMapper;

RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}

/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
*/
package org.apache.storm.redis.bolt;

import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.config.JedisConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -36,25 +35,11 @@ public class RedisStoreBolt extends AbstractRedisBolt {
private final String additionalKey;

/**
* Constructor for single Redis environment (JedisPool)
* @param config configuration for initializing JedisPool
* Constructor for single Redis or Redis cluster environment
* @param config configuration for initializing JedisPool or JedisCluster
* @param storeMapper mapper containing which datatype, storing value's key that Bolt uses
*/
public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;

RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}

/**
* Constructor for Redis Cluster environment (JedisCluster)
* @param config configuration for initializing JedisCluster
* @param storeMapper mapper containing which datatype, storing value's key that Bolt uses
*/
public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
public RedisStoreBolt(JedisConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Configuration for JedisCluster.
*/
public class JedisClusterConfig implements Serializable {
public class JedisClusterConfig implements Serializable, JedisConfig {
private Set<InetSocketAddress> nodes;
private int timeout;
private int maxRedirections;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.apache.storm.redis.common.config;

public interface JedisConfig {
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* Configuration for JedisPool.
*/
public class JedisPoolConfig implements Serializable {
public class JedisPoolConfig implements Serializable, JedisConfig {

private String host;
private int port;
Expand Down