diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java index 4c858bcfdf2..e0e9da67a4e 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java @@ -17,17 +17,20 @@ */ package org.apache.storm.redis.bolt; +import java.util.List; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.BasicStreamMapper; +import org.apache.storm.redis.common.mapper.DefaultStreamMapper; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisFilterMapper; +import org.apache.storm.redis.common.mapper.StreamMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import redis.clients.jedis.GeoCoordinate; import redis.clients.jedis.JedisCommands; -import java.util.List; - /** * Basic bolt for querying from Redis and filters out if key/field doesn't exist. * If key/field exists on Redis, this bolt just forwards input tuple to default stream. @@ -45,18 +48,41 @@ */ public class RedisFilterBolt extends AbstractRedisBolt { private final RedisFilterMapper filterMapper; + private final StreamMapper streamMapper; private final RedisDataTypeDescription.RedisDataType dataType; private final String additionalKey; /** - * Constructor for single Redis environment (JedisPool) + * Constructor for single Redis environment (JedisPool). + * Tuples will be emitted to Storm's default streamId. * @param config configuration for initializing JedisPool * @param filterMapper mapper containing which datatype, query key that Bolt uses */ public RedisFilterBolt(JedisPoolConfig config, RedisFilterMapper filterMapper) { + this(config, filterMapper, new DefaultStreamMapper()); + } + + /** + * Constructor for single Redis environment (JedisPool). + * @param config configuration for initializing JedisCluster + * @param filterMapper mapper containing which datatype, query key that Bolt uses + * @param streamId the stream to which tuples that make it through the filter should be emitted. + */ + public RedisFilterBolt(JedisPoolConfig config, RedisFilterMapper filterMapper, String streamId) { + this(config, filterMapper, new BasicStreamMapper(streamId)); + } + + /** + * Constructor for single Redis environment (JedisPool). + * @param config configuration for initializing JedisPool + * @param filterMapper mapper containing which datatype, query key that Bolt uses + * @param streamMapper mapper to which stream a given Tuple/Values pair should be emitted. + */ + public RedisFilterBolt(JedisPoolConfig config, RedisFilterMapper filterMapper, StreamMapper streamMapper) { super(config); this.filterMapper = filterMapper; + this.streamMapper = streamMapper; RedisDataTypeDescription dataTypeDescription = filterMapper.getDataTypeDescription(); this.dataType = dataTypeDescription.getDataType(); @@ -69,14 +95,36 @@ public RedisFilterBolt(JedisPoolConfig config, RedisFilterMapper filterMapper) { } /** - * Constructor for Redis Cluster environment (JedisCluster) + * Constructor for Redis Cluster environment (JedisCluster). + * Tuples will be emitted to Storm's default streamId. * @param config configuration for initializing JedisCluster * @param filterMapper mapper containing which datatype, query key that Bolt uses */ public RedisFilterBolt(JedisClusterConfig config, RedisFilterMapper filterMapper) { + this(config, filterMapper, new DefaultStreamMapper()); + } + + /** + * Constructor for Redis Cluster environment (JedisCluster). + * @param config configuration for initializing JedisCluster + * @param filterMapper mapper containing which datatype, query key that Bolt uses + * @param streamId the stream to which tuples that make it through the filter should be emitted. + */ + public RedisFilterBolt(JedisClusterConfig config, RedisFilterMapper filterMapper, String streamId) { + this(config, filterMapper, new BasicStreamMapper(streamId)); + } + + /** + * Constructor for Redis Cluster environment (JedisCluster). + * @param config configuration for initializing JedisCluster + * @param filterMapper mapper containing which datatype, query key that Bolt uses + * @param streamMapper mapper to which stream a given Tuple/Values pair should be emitted. + */ + public RedisFilterBolt(JedisClusterConfig config, RedisFilterMapper filterMapper, StreamMapper streamMapper) { super(config); this.filterMapper = filterMapper; + this.streamMapper = streamMapper; RedisDataTypeDescription dataTypeDescription = filterMapper.getDataTypeDescription(); this.dataType = dataTypeDescription.getDataType(); @@ -126,7 +174,9 @@ public void execute(Tuple input) { } if (found) { - collector.emit(input, input.getValues()); + Values values = new Values(input.getValues().toArray()); + String streamId = streamMapper.getStreamId(input, values); + collector.emit(streamId, input, values); } collector.ack(input); diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java index 06529232ee3..13b66c7d407 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java @@ -17,17 +17,19 @@ */ package org.apache.storm.redis.bolt; +import java.util.List; +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.BasicStreamMapper; +import org.apache.storm.redis.common.mapper.DefaultStreamMapper; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; +import org.apache.storm.redis.common.mapper.StreamMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; -import org.apache.storm.redis.common.mapper.RedisLookupMapper; -import org.apache.storm.redis.common.config.JedisClusterConfig; -import org.apache.storm.redis.common.config.JedisPoolConfig; import redis.clients.jedis.JedisCommands; -import java.util.List; - /** * Basic bolt for querying from Redis and emits response as tuple. *

@@ -35,18 +37,41 @@ */ public class RedisLookupBolt extends AbstractRedisBolt { private final RedisLookupMapper lookupMapper; + private final StreamMapper streamMapper; private final RedisDataTypeDescription.RedisDataType dataType; private final String additionalKey; /** - * Constructor for single Redis environment (JedisPool) + * Constructor for single Redis environment (JedisPool). + * Emits tuples to Storm's default stream. * @param config configuration for initializing JedisPool * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses */ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) { + this(config, lookupMapper, new DefaultStreamMapper()); + } + + /** + * Constructor for single Redis environment (JedisPool). + * @param config configuration for initializing JedisPool + * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses + * @param streamId the streamId to which this bolt should emit tuples + */ + public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper, String streamId) { + this(config, lookupMapper, new BasicStreamMapper(streamId)); + } + + /** + * Constructor for single Redis environment (JedisPool). + * @param config configuration for initializing JedisPool + * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses + * @param streamMapper mapper to which stream a given Tuple/Values pair should be emitted. + */ + public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper, StreamMapper streamMapper) { super(config); this.lookupMapper = lookupMapper; + this.streamMapper = streamMapper; RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription(); this.dataType = dataTypeDescription.getDataType(); @@ -54,14 +79,36 @@ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) { } /** - * Constructor for Redis Cluster environment (JedisCluster) + * Constructor for Redis Cluster environment (JedisCluster). + * Emits tuples to Storm's default stream. * @param config configuration for initializing JedisCluster * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses */ public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) { + this(config, lookupMapper, new DefaultStreamMapper()); + } + + /** + * Constructor for single Redis environment (JedisPool). + * @param config configuration for initializing JedisPool + * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses + * @param streamId the streamId to which this bolt should emit tuples + */ + public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper, String streamId) { + this(config, lookupMapper, new BasicStreamMapper(streamId)); + } + + /** + * Constructor for Redis Cluster environment (JedisCluster). + * @param config configuration for initializing JedisCluster + * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses + * @param streamMapper mapper to which stream a given Tuple/Values pair should be emitted. + */ + public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper, StreamMapper streamMapper) { super(config); this.lookupMapper = lookupMapper; + this.streamMapper = streamMapper; RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription(); this.dataType = dataTypeDescription.getDataType(); @@ -115,7 +162,8 @@ public void execute(Tuple input) { List values = lookupMapper.toTuple(input, lookupValue); for (Values value : values) { - collector.emit(input, value); + String streamId = streamMapper.getStreamId(input, value); + collector.emit(streamId, input, value); } collector.ack(input); diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/BasicStreamMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/BasicStreamMapper.java new file mode 100644 index 00000000000..04b51056256 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/BasicStreamMapper.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.mapper; + +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +/** + * A StreamMapper implementation which always returns the streamId with + * which it was constructed. + */ +public class BasicStreamMapper implements StreamMapper { + + private final String streamId; + + public BasicStreamMapper(String streamId) { + this.streamId = streamId; + } + + @Override + public String getStreamId(Tuple input, Values values) { + return streamId; + } + +} diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/DefaultStreamMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/DefaultStreamMapper.java new file mode 100644 index 00000000000..8b95776526f --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/DefaultStreamMapper.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.mapper; + +import org.apache.storm.utils.Utils; + +/** + * A RedisStreamMapper implementation which always returns Storm's default streamId. + */ +public final class DefaultStreamMapper extends BasicStreamMapper { + + public DefaultStreamMapper() { + super(Utils.DEFAULT_STREAM_ID); + } + +} diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/InputSourceStreamMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/InputSourceStreamMapper.java new file mode 100644 index 00000000000..b3d63bf0071 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/InputSourceStreamMapper.java @@ -0,0 +1,32 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.mapper; + +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +/** + * Returns the streamId of the input tuple's source stream. In other words, + * tuples will be emitted to the same streamId from which they came. + */ +public class InputSourceStreamMapper implements StreamMapper { + + @Override + public String getStreamId(Tuple input, Values values) { + return input.getSourceStreamId(); + } + +} diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/StreamMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/StreamMapper.java new file mode 100644 index 00000000000..8a6b9a8a28b --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/StreamMapper.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.mapper; + +import java.io.Serializable; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +/** + * StreamMapper is for specifying the stream to which Values should be + * emitted, based on the input tuple and/or the already mapped output values + * (about to be emitted). + */ +public interface StreamMapper extends Serializable { + + /** + * Gets the streamId based on the input Tuple and/or the values to be + * emitted. + * @param input the original source input tuple + * @param values the Values which were generated by a bolt, based on the input tuple. + * @return the stream id to use for emitting tuples + */ + String getStreamId(Tuple input, Values values); + +}