diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java deleted file mode 100644 index 0c64f43005f..00000000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.bolt; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.redis.common.config.JedisClusterConfig; -import org.apache.storm.redis.common.config.JedisPoolConfig; -import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder; -import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer; -import redis.clients.jedis.JedisCommands; - -import java.util.Map; - -/** - * AbstractRedisBolt class is for users to implement custom bolts which makes interaction with Redis. - *
- * Due to environment abstraction, AbstractRedisBolt provides JedisCommands which contains only single key operations. - * - * Custom Bolts may want to follow this pattern: - *
- * JedisCommands jedisCommands = null;
- * try {
- * jedisCommand = getInstance();
- * // do some works
- * } finally {
- * if (jedisCommand != null) {
- * returnInstance(jedisCommand);
- * }
- * }
- *
- *
- */
-// TODO: Separate Jedis / JedisCluster to provide full operations for each environment to users
-public abstract class AbstractRedisBolt extends BaseRichBolt {
- protected OutputCollector collector;
-
- private transient JedisCommandsInstanceContainer container;
-
- private JedisPoolConfig jedisPoolConfig;
- private JedisClusterConfig jedisClusterConfig;
-
- /**
- * Constructor for single Redis environment (JedisPool)
- * @param config configuration for initializing JedisPool
- */
- public AbstractRedisBolt(JedisPoolConfig config) {
- this.jedisPoolConfig = config;
- }
-
- /**
- * Constructor for Redis Cluster environment (JedisCluster)
- * @param config configuration for initializing JedisCluster
- */
- public AbstractRedisBolt(JedisClusterConfig config) {
- this.jedisClusterConfig = config;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
- // FIXME: stores map (stormConf), topologyContext and expose these to derived classes
- this.collector = collector;
-
- if (jedisPoolConfig != null) {
- this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
- } else if (jedisClusterConfig != null) {
- this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
- } else {
- throw new IllegalArgumentException("Jedis configuration not found");
- }
- }
-
- /**
- * Borrow JedisCommands instance from container.
- * JedisCommands is an interface which contains single key operations.
- * @return implementation of JedisCommands
- * @see JedisCommandsInstanceContainer#getInstance()
- */
- protected JedisCommands getInstance() {
- return this.container.getInstance();
- }
-
- /**
- * Return borrowed instance to container.
- * @param instance borrowed object
- */
- protected void returnInstance(JedisCommands instance) {
- this.container.returnInstance(instance);
- }
-}
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/BaseLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/BaseLookupBolt.java
new file mode 100644
index 00000000000..e1fe3d26b89
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/BaseLookupBolt.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import redis.clients.jedis.JedisCommands;
+
+import java.util.List;
+
+public abstract class BaseLookupBolt extends BaseRedisBolt {
+ protected OutputCollector collector;
+
+ protected final RedisLookupMapper mapper;
+ protected final RedisDataTypeDescription.RedisDataType dataType;
+ protected final String additionalKey;
+
+ public BaseLookupBolt(RedisLookupMapper lookupMapper) {
+ this.mapper = lookupMapper;
+ Preconditions.checkNotNull(this.mapper, "Mapper is Null");
+
+ RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String key = mapper.getKeyFromTuple(input);
+ Object lookupValue;
+
+ JedisCommands jedisCommand = getInstance();
+ try {
+ switch (dataType) {
+ case STRING:
+ lookupValue = jedisCommand.get(key);
+ break;
+
+ case LIST:
+ lookupValue = jedisCommand.lpop(key);
+ break;
+
+ case HASH:
+ lookupValue = jedisCommand.hget(additionalKey, key);
+ break;
+
+ case SET:
+ lookupValue = jedisCommand.scard(key);
+ break;
+
+ case SORTED_SET:
+ lookupValue = jedisCommand.zscore(additionalKey, key);
+ break;
+
+ case HYPER_LOG_LOG:
+ lookupValue = jedisCommand.pfcount(key);
+ break;
+
+ case GEO:
+ lookupValue = jedisCommand.geopos(additionalKey, key);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+ }
+
+ List
* Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG
*/
-public class RedisLookupBolt extends AbstractRedisBolt {
- private final RedisLookupMapper lookupMapper;
- private final RedisDataTypeDescription.RedisDataType dataType;
- private final String additionalKey;
-
- /**
- * Constructor for single Redis environment (JedisPool)
- * @param config configuration for initializing JedisPool
- * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
- */
- public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
- super(config);
+public class RedisLookupBolt extends BaseLookupBolt {
- this.lookupMapper = lookupMapper;
+ private final JedisPoolConfig config;
- RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
- this.dataType = dataTypeDescription.getDataType();
- this.additionalKey = dataTypeDescription.getAdditionalKey();
- }
-
- /**
- * Constructor for Redis Cluster environment (JedisCluster)
- * @param config configuration for initializing JedisCluster
- * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
- */
- public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
- super(config);
-
- this.lookupMapper = lookupMapper;
-
- RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
- this.dataType = dataTypeDescription.getDataType();
- this.additionalKey = dataTypeDescription.getAdditionalKey();
+ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
+ super(lookupMapper);
+ this.config = config;
}
- /**
- * {@inheritDoc}
- */
@Override
- public void execute(Tuple input) {
- String key = lookupMapper.getKeyFromTuple(input);
- Object lookupValue;
-
- JedisCommands jedisCommand = null;
- try {
- jedisCommand = getInstance();
-
- switch (dataType) {
- case STRING:
- lookupValue = jedisCommand.get(key);
- break;
-
- case LIST:
- lookupValue = jedisCommand.lpop(key);
- break;
-
- case HASH:
- lookupValue = jedisCommand.hget(additionalKey, key);
- break;
-
- case SET:
- lookupValue = jedisCommand.scard(key);
- break;
-
- case SORTED_SET:
- lookupValue = jedisCommand.zscore(additionalKey, key);
- break;
-
- case HYPER_LOG_LOG:
- lookupValue = jedisCommand.pfcount(key);
- break;
-
- case GEO:
- lookupValue = jedisCommand.geopos(additionalKey, key);
- break;
-
- default:
- throw new IllegalArgumentException("Cannot process such data type: " + dataType);
- }
-
- List
* Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG
*/
-public class RedisStoreBolt extends AbstractRedisBolt {
- private final RedisStoreMapper storeMapper;
- private final RedisDataTypeDescription.RedisDataType dataType;
- private final String additionalKey;
-
- /**
- * Constructor for single Redis environment (JedisPool)
- * @param config configuration for initializing JedisPool
- * @param storeMapper mapper containing which datatype, storing value's key that Bolt uses
- */
- public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
- super(config);
- this.storeMapper = storeMapper;
-
- RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
- this.dataType = dataTypeDescription.getDataType();
- this.additionalKey = dataTypeDescription.getAdditionalKey();
- }
+public class RedisStoreBolt extends BaseStoreBolt {
+ private final JedisPoolConfig config;
- /**
- * Constructor for Redis Cluster environment (JedisCluster)
- * @param config configuration for initializing JedisCluster
- * @param storeMapper mapper containing which datatype, storing value's key that Bolt uses
- */
- public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
- super(config);
- this.storeMapper = storeMapper;
-
- RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
- this.dataType = dataTypeDescription.getDataType();
- this.additionalKey = dataTypeDescription.getAdditionalKey();
+ public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper mapper) {
+ super(mapper);
+ this.config = config;
}
- /**
- * {@inheritDoc}
- */
@Override
- public void execute(Tuple input) {
- String key = storeMapper.getKeyFromTuple(input);
- String value = storeMapper.getValueFromTuple(input);
-
- JedisCommands jedisCommand = null;
- try {
- jedisCommand = getInstance();
-
- switch (dataType) {
- case STRING:
- jedisCommand.set(key, value);
- break;
-
- case LIST:
- jedisCommand.rpush(key, value);
- break;
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
- case HASH:
- jedisCommand.hset(additionalKey, key, value);
- break;
-
- case SET:
- jedisCommand.sadd(key, value);
- break;
-
- case SORTED_SET:
- jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
- break;
-
- case HYPER_LOG_LOG:
- jedisCommand.pfadd(key, value);
- break;
-
- case GEO:
- String[] array = value.split(":");
- if (array.length != 2) {
- throw new IllegalArgumentException("value structure should be longitude:latitude");
- }
-
- double longitude = Double.valueOf(array[0]);
- double latitude = Double.valueOf(array[1]);
- jedisCommand.geoadd(additionalKey, longitude, latitude, key);
- break;
-
- default:
- throw new IllegalArgumentException("Cannot process such data type: " + dataType);
- }
-
- collector.ack(input);
- } catch (Exception e) {
- this.collector.reportError(e);
- this.collector.fail(input);
- } finally {
- returnInstance(jedisCommand);
+ if (this.config != null) {
+ this.container = JedisCommandsContainerBuilder.buildContainer(config);
+ } else {
+ throw new IllegalArgumentException("Jedis configuration not found");
}
}
- /**
- * {@inheritDoc}
- */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
}
}
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
index d8696aaca7a..e9838a597b4 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
@@ -21,7 +21,6 @@
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Protocol;
-import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
@@ -29,7 +28,7 @@
/**
* Configuration for JedisCluster.
*/
-public class JedisClusterConfig implements Serializable {
+public class JedisClusterConfig implements JedisConfig {
private Set