diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java index 0c64f43005f..83bd4fb8726 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java @@ -17,6 +17,7 @@ */ package org.apache.storm.redis.bolt; +import org.apache.storm.redis.common.config.JedisConfig; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichBolt; @@ -24,6 +25,8 @@ import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder; import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCommands; import java.util.Map; @@ -49,27 +52,20 @@ */ // TODO: Separate Jedis / JedisCluster to provide full operations for each environment to users public abstract class AbstractRedisBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRedisBolt.class); + protected OutputCollector collector; private transient JedisCommandsInstanceContainer container; - private JedisPoolConfig jedisPoolConfig; - private JedisClusterConfig jedisClusterConfig; - - /** - * Constructor for single Redis environment (JedisPool) - * @param config configuration for initializing JedisPool - */ - public AbstractRedisBolt(JedisPoolConfig config) { - this.jedisPoolConfig = config; - } + private JedisConfig jedisConfig; /** - * Constructor for Redis Cluster environment (JedisCluster) - * @param config configuration for initializing JedisCluster + * Constructor for single Redis or Redis cluster environment + * @param config configuration for initializing JedisPool or JedisCluster */ - public AbstractRedisBolt(JedisClusterConfig config) { - this.jedisClusterConfig = config; + public AbstractRedisBolt(JedisConfig config) { + this.jedisConfig = config; } /** @@ -80,10 +76,12 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector co // FIXME: stores map (stormConf), topologyContext and expose these to derived classes this.collector = collector; - if (jedisPoolConfig != null) { - this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig); - } else if (jedisClusterConfig != null) { - this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig); + if (jedisConfig instanceof JedisPoolConfig) { + LOG.info("Using Jedis Pool Config"); + this.container = JedisCommandsContainerBuilder.build((JedisPoolConfig) jedisConfig); + } else if (jedisConfig instanceof JedisClusterConfig) { + LOG.info("Using Jedis Cluster Config"); + this.container = JedisCommandsContainerBuilder.build((JedisClusterConfig) jedisConfig); } else { throw new IllegalArgumentException("Jedis configuration not found"); } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java index 968ade0daf4..f454a80b5bd 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java @@ -17,13 +17,12 @@ */ package org.apache.storm.redis.bolt; +import org.apache.storm.redis.common.config.JedisConfig; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisLookupMapper; -import org.apache.storm.redis.common.config.JedisClusterConfig; -import org.apache.storm.redis.common.config.JedisPoolConfig; import redis.clients.jedis.JedisCommands; import java.util.List; @@ -39,11 +38,11 @@ public class RedisLookupBolt extends AbstractRedisBolt { private final String additionalKey; /** - * Constructor for single Redis environment (JedisPool) - * @param config configuration for initializing JedisPool + * Constructor for single Redis or Redis cluster environment + * @param config configuration for initializing JedisPool or JedisCluster * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses */ - public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) { + public RedisLookupBolt(JedisConfig config, RedisLookupMapper lookupMapper) { super(config); this.lookupMapper = lookupMapper; @@ -53,20 +52,6 @@ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) { this.additionalKey = dataTypeDescription.getAdditionalKey(); } - /** - * Constructor for Redis Cluster environment (JedisCluster) - * @param config configuration for initializing JedisCluster - * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses - */ - public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) { - super(config); - - this.lookupMapper = lookupMapper; - - RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription(); - this.dataType = dataTypeDescription.getDataType(); - this.additionalKey = dataTypeDescription.getAdditionalKey(); - } /** * {@inheritDoc} diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java index 00ff2186dad..81609d67049 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java @@ -17,8 +17,7 @@ */ package org.apache.storm.redis.bolt; -import org.apache.storm.redis.common.config.JedisClusterConfig; -import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.config.JedisConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -36,25 +35,11 @@ public class RedisStoreBolt extends AbstractRedisBolt { private final String additionalKey; /** - * Constructor for single Redis environment (JedisPool) - * @param config configuration for initializing JedisPool + * Constructor for single Redis or Redis cluster environment + * @param config configuration for initializing JedisPool or JedisCluster * @param storeMapper mapper containing which datatype, storing value's key that Bolt uses */ - public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) { - super(config); - this.storeMapper = storeMapper; - - RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); - this.dataType = dataTypeDescription.getDataType(); - this.additionalKey = dataTypeDescription.getAdditionalKey(); - } - - /** - * Constructor for Redis Cluster environment (JedisCluster) - * @param config configuration for initializing JedisCluster - * @param storeMapper mapper containing which datatype, storing value's key that Bolt uses - */ - public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) { + public RedisStoreBolt(JedisConfig config, RedisStoreMapper storeMapper) { super(config); this.storeMapper = storeMapper; diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java index d8696aaca7a..a4e62814ee3 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java @@ -29,7 +29,7 @@ /** * Configuration for JedisCluster. */ -public class JedisClusterConfig implements Serializable { +public class JedisClusterConfig implements Serializable, JedisConfig { private Set nodes; private int timeout; private int maxRedirections; diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisConfig.java new file mode 100644 index 00000000000..20ad95fddb1 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisConfig.java @@ -0,0 +1,4 @@ +package org.apache.storm.redis.common.config; + +public interface JedisConfig { +} diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java index 149bdad078f..f6bf5188eae 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java @@ -24,7 +24,7 @@ /** * Configuration for JedisPool. */ -public class JedisPoolConfig implements Serializable { +public class JedisPoolConfig implements Serializable, JedisConfig { private String host; private int port;