diff --git a/heron/examples/src/java/com/twitter/heron/examples/PrinterBolt.java b/heron/examples/src/java/com/twitter/heron/examples/PrinterBolt.java new file mode 100644 index 00000000000..9a78a32a761 --- /dev/null +++ b/heron/examples/src/java/com/twitter/heron/examples/PrinterBolt.java @@ -0,0 +1,34 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.examples; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +@SuppressWarnings("serial") +public class PrinterBolt extends BaseBasicBolt { + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + System.out.println(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer ofd) { + } + +} diff --git a/heron/examples/src/java/com/twitter/heron/examples/RandomIntegerSpout.java b/heron/examples/src/java/com/twitter/heron/examples/RandomIntegerSpout.java new file mode 100644 index 00000000000..1be7cc4ed6c --- /dev/null +++ b/heron/examples/src/java/com/twitter/heron/examples/RandomIntegerSpout.java @@ -0,0 +1,69 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.examples; + +import java.util.Map; +import java.util.Random; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +/** + * Emits a random integer and a timestamp value (offset by one day), + * every 100 ms. The ts field can be used in tuple time based windowing. 
+ */ +@SuppressWarnings("serial") +public class RandomIntegerSpout extends BaseRichSpout { + private static final Logger LOG = Logger.getLogger(RandomIntegerSpout.class.getName()); + private SpoutOutputCollector collector; + private Random rand; + private long msgId = 0; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("value", "ts", "msgid")); + } + + @Override + public void open( + Map conf, TopologyContext context, SpoutOutputCollector newCollector) { + collector = newCollector; + rand = new Random(); + } + + @Override + public void nextTuple() { + Utils.sleep(100); + collector.emit(new Values(rand.nextInt(1000), + System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId), msgId); + } + + @Override + public void ack(Object newMsgId) { + LOG.log(Level.FINE, "Got ACK for msgId : " + newMsgId); + } + + @Override + public void fail(Object newMsgId) { + LOG.log(Level.FINE, "Got FAIL for msgId : " + newMsgId); + } +} diff --git a/heron/examples/src/java/com/twitter/heron/examples/SlidingTupleTsTopology.java b/heron/examples/src/java/com/twitter/heron/examples/SlidingTupleTsTopology.java new file mode 100644 index 00000000000..f7345ea1f8b --- /dev/null +++ b/heron/examples/src/java/com/twitter/heron/examples/SlidingTupleTsTopology.java @@ -0,0 +1,60 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.twitter.heron.examples; + +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.utils.Utils; + +import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * Windowing based on tuple timestamp (e.g. the time when tuple is generated + * rather than when its processed). + */ +// SUPPRESS CHECKSTYLE HideUtilityClassConstructor +public class SlidingTupleTsTopology { + public static void main(String[] args) throws Exception { + Logger.getGlobal().setLevel(Level.FINE); + TopologyBuilder builder = new TopologyBuilder(); + BaseWindowedBolt bolt = new SlidingWindowSumBolt() + .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)) + .withTimestampField("ts") + .withLag(new Duration(5, TimeUnit.SECONDS)); + builder.setSpout("integer", new RandomIntegerSpout(), 1); + builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer"); + builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum"); + Config conf = new Config(); + conf.setDebug(true); + + if (args != null && args.length > 0) { + conf.setNumWorkers(1); + StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, builder.createTopology()); + Utils.sleep(40000); + cluster.killTopology("test"); + cluster.shutdown(); + } + } +} diff --git a/heron/examples/src/java/com/twitter/heron/examples/SlidingWindowSumBolt.java b/heron/examples/src/java/com/twitter/heron/examples/SlidingWindowSumBolt.java new file mode 100644 index 00000000000..a34f76c2e45 --- /dev/null +++ b/heron/examples/src/java/com/twitter/heron/examples/SlidingWindowSumBolt.java 
@@ -0,0 +1,79 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.examples; + +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; + +/** + * Computes sliding window sum + */ +@SuppressWarnings("serial") +public class SlidingWindowSumBolt extends BaseWindowedBolt { + private static final Logger LOG = Logger.getLogger(SlidingWindowSumBolt.class.getName()); + + private int sum = 0; + private OutputCollector collector; + + @Override + @SuppressWarnings("rawtypes") + public void prepare(Map stormConf, TopologyContext context, OutputCollector newCollector) { + collector = newCollector; + } + + @Override + public void execute(TupleWindow inputWindow) { + /* + * The inputWindow gives a view of + * (a) all the events in the window + * (b) events that expired since last activation of the window + * (c) events that newly arrived since last activation of the window + */ + List tuplesInWindow = inputWindow.get(); + List newTuples = inputWindow.getNew(); + List 
expiredTuples = inputWindow.getExpired(); + + LOG.log(Level.FINE, "Events in current window: " + tuplesInWindow.size()); + /* + * Instead of iterating over all the tuples in the window to compute + * the sum, the values for the new events are added and old events are + * subtracted. Similar optimizations might be possible in other + * windowing computations. + */ + for (Tuple tuple : newTuples) { + sum += (int) tuple.getValue(0); + } + for (Tuple tuple : expiredTuples) { + sum -= (int) tuple.getValue(0); + } + collector.emit(new Values(sum)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sum")); + } +} diff --git a/heron/examples/src/java/com/twitter/heron/examples/SlidingWindowTopology.java b/heron/examples/src/java/com/twitter/heron/examples/SlidingWindowTopology.java new file mode 100644 index 00000000000..40440b4f459 --- /dev/null +++ b/heron/examples/src/java/com/twitter/heron/examples/SlidingWindowTopology.java @@ -0,0 +1,106 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.twitter.heron.examples; + +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.apache.storm.windowing.TupleWindow; + +import static org.apache.storm.topology.base.BaseWindowedBolt.Count; + +/** + * A sample topology that demonstrates the usage of {@link org.apache.storm.topology.IWindowedBolt} + * to calculate sliding window sum. + */ +// SUPPRESS CHECKSTYLE HideUtilityClassConstructor +public class SlidingWindowTopology { + + private static final Logger LOG = Logger.getLogger(SlidingWindowTopology.class.getName()); + + /** + * Computes tumbling window average + */ + @SuppressWarnings("serial") + private static class TumblingWindowAvgBolt extends BaseWindowedBolt { + private OutputCollector collector; + + @Override + @SuppressWarnings("rawtypes") + public void prepare(Map stormConf, TopologyContext context, OutputCollector newCollector) { + collector = newCollector; + } + + @Override + public void execute(TupleWindow inputWindow) { + int sum = 0; + List tuplesInWindow = inputWindow.get(); + LOG.log(Level.FINE, "Events in current window: " + tuplesInWindow.size()); + if (tuplesInWindow.size() > 0) { + /* + * Since this is a tumbling window calculation, + * we use all the tuples in the window to compute the avg. 
+ */ + for (Tuple tuple : tuplesInWindow) { + sum += (int) tuple.getValue(0); + } + collector.emit(new Values(sum / tuplesInWindow.size())); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("avg")); + } + } + + + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("integer", new RandomIntegerSpout(), 1); + builder.setBolt("slidingsum", + new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1) + .shuffleGrouping("integer"); + builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1) + .shuffleGrouping("slidingsum"); + builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg"); + Config conf = new Config(); + conf.setDebug(true); + if (args != null && args.length > 0) { + conf.setNumWorkers(1); + StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, builder.createTopology()); + Utils.sleep(40000); + cluster.killTopology("test"); + cluster.shutdown(); + } + } +} diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD index 93f6526a67c..53548075101 100644 --- a/heron/storm/src/java/BUILD +++ b/heron/storm/src/java/BUILD @@ -2,6 +2,7 @@ package(default_visibility = ["//visibility:public"]) storm_deps_files = [ "//heron/api/src/java:api-java", + "//heron/proto:proto_topology_java", "//heron/common/src/java:basics-java", "//heron/simulator/src/java:simulator-java", "@com_googlecode_json_simple_json_simple//jar", diff --git a/heron/storm/src/java/org/apache/storm/Config.java b/heron/storm/src/java/org/apache/storm/Config.java index 4cfa52ff83d..73ff8153452 100644 --- a/heron/storm/src/java/org/apache/storm/Config.java +++ b/heron/storm/src/java/org/apache/storm/Config.java @@ -125,6 +125,56 @@ public class Config extends 
  /**
   * Bolt-specific configuration for windowed bolts to specify the window length as a count of
   * tuples in the window. {@code WindowedBoltExecutor} parses the value as an {@code int} and
   * checks this key before {@link #TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS}.
   */
  public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT =
      "topology.bolts.window.length.count";

  /**
   * Bolt-specific configuration for windowed bolts to specify the window length as a time
   * duration in milliseconds. Only consulted when no count-based window length is configured.
   */
  public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS =
      "topology.bolts.window.length.duration.ms";

  /**
   * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of
   * tuples. When neither this key nor {@link #TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS} is
   * set, {@code WindowedBoltExecutor} falls back to a sliding interval of one tuple.
   */
  public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT =
      "topology.bolts.window.sliding.interval.count";

  /**
   * Bolt-specific configuration for windowed bolts to specify the sliding interval as a time
   * duration in milliseconds. Only consulted when no count-based sliding interval is configured.
   */
  public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS =
      "topology.bolts.window.sliding.interval.duration.ms";

  /**
   * Bolt-specific configuration for windowed bolts to specify the name of the field in the tuple that holds
   * the timestamp (e.g. the ts when the tuple was actually generated). If this config is specified and the
   * field is not present in the incoming tuple, a java.lang.IllegalArgumentException will be thrown.
   */
  public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME =
      "topology.bolts.tuple.timestamp.field.name";

  /**
   * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
   * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
   * This config will be effective only if the TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified.
   * {@code WindowedBoltExecutor} uses 0 (no lag) when the key is absent.
   */
  public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS =
      "topology.bolts.tuple.timestamp.max.lag.ms";

  /**
   * Bolt-specific configuration for windowed bolts to specify the time interval, in milliseconds,
   * for generating watermark events. A watermark event tracks the progress of time when the tuple
   * timestamp is used. This config is effective only if TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME
   * is also specified. {@code WindowedBoltExecutor} uses 1000 ms when the key is absent.
   */
  public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS =
      "topology.bolts.watermark.event.interval.ms";
+ */ + public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS = + "topology.bolts.tuple.timestamp.max.lag.ms"; + + /** + * Bolt-specific configuration for windowed bolts to specify the time interval for generating + * watermark events. Watermark event tracks the progress of time when tuple timestamp is used. + * This config is effective only if TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME is also specified. + */ + public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = + "topology.bolts.watermark.event.interval.ms"; + /** * This config is available for TransactionalSpouts, and contains the id ( a String) for * the transactional topology. This id is used to store the state of the transactional diff --git a/heron/storm/src/java/org/apache/storm/task/TopologyContext.java b/heron/storm/src/java/org/apache/storm/task/TopologyContext.java index 8a990362445..04cde10170c 100644 --- a/heron/storm/src/java/org/apache/storm/task/TopologyContext.java +++ b/heron/storm/src/java/org/apache/storm/task/TopologyContext.java @@ -22,10 +22,12 @@ // import org.apache.storm.generated.Grouping; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.StormTopology; import org.apache.storm.hooks.ITaskHook; import org.apache.storm.hooks.ITaskHookDelegate; @@ -37,6 +39,8 @@ import org.apache.storm.metric.api.ReducedMetric; import org.apache.storm.tuple.Fields; +import com.twitter.heron.api.generated.TopologyAPI; + // import org.apache.storm.state.ISubscribedState; // import org.apache.storm.state.ISubscribedState; @@ -186,6 +190,14 @@ public Map getThisSources() { return getSources(getThisComponentId()); } */ + public Set getThisSourceIds() { + Set streamIds = delegate.getThisSources().keySet(); + Set newStreamIds = new HashSet<>(); + for (TopologyAPI.StreamId streamId: streamIds) { + newStreamIds.add(new 
/**
 * A bolt abstraction for supporting time and count based sliding and tumbling windows.
 */
public interface IWindowedBolt extends IComponent {
  /**
   * This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except
   * that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
   *
   * @param stormConf the topology/component configuration
   * @param context the topology context of this task
   * @param collector the collector to emit with
   */
  @SuppressWarnings("rawtypes")
  void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

  /**
   * Process the tuple window and optionally emit new tuples based on the tuples in the input window.
   *
   * @param inputWindow the window that was just activated
   */
  void execute(TupleWindow inputWindow);

  /**
   * Releases any resources held by the bolt. {@code WindowedBoltExecutor} calls this from its
   * own {@code cleanup()} after shutting down the window manager.
   */
  void cleanup();
}
+ */ + @SuppressWarnings("rawtypes") + void prepare(Map stormConf, TopologyContext context, OutputCollector collector); + + /** + * Process the tuple window and optionally emit new tuples based on the tuples in the input window. + */ + void execute(TupleWindow inputWindow); + + void cleanup(); +} diff --git a/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java b/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java index dd9ec5b05d9..faaec9d78da 100644 --- a/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java +++ b/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java @@ -22,6 +22,7 @@ // import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.generated.StormTopology; +import org.apache.storm.windowing.TupleWindow; import com.twitter.heron.api.HeronTopology; @@ -63,4 +64,34 @@ public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHin delegate.setSpout(id, spoutImpl, parallelismHint); return new SpoutDeclarerImpl(declarer); } + + /** + * Define a new bolt in this topology. This defines a windowed bolt, intended + * for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method + * is triggered for each window interval with the list of current events in the window. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. + * @param bolt the windowed bolt + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException { + return setBolt(id, bolt, null); + } + + /** + * Define a new bolt in this topology. This defines a windowed bolt, intended + * for windowing operations. 
The {@link IWindowedBolt#execute(TupleWindow)} method + * is triggered for each window interval with the list of current events in the window. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. + * @param bolt the windowed bolt + * @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster. + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public BoltDeclarer setBolt( + String id, IWindowedBolt bolt, Number parallelismHint) throws IllegalArgumentException { + return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint); + } } diff --git a/heron/storm/src/java/org/apache/storm/topology/WindowedBoltExecutor.java b/heron/storm/src/java/org/apache/storm/topology/WindowedBoltExecutor.java new file mode 100644 index 00000000000..a1f7f18c39e --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/topology/WindowedBoltExecutor.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.topology; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.Config; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.CountEvictionPolicy; +import org.apache.storm.windowing.CountTriggerPolicy; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TimeEvictionPolicy; +import org.apache.storm.windowing.TimeTriggerPolicy; +import org.apache.storm.windowing.TriggerPolicy; +import org.apache.storm.windowing.TupleWindowImpl; +import org.apache.storm.windowing.WaterMarkEventGenerator; +import org.apache.storm.windowing.WatermarkCountEvictionPolicy; +import org.apache.storm.windowing.WatermarkCountTriggerPolicy; +import org.apache.storm.windowing.WatermarkTimeEvictionPolicy; +import org.apache.storm.windowing.WatermarkTimeTriggerPolicy; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.apache.storm.windowing.WindowManager; + +import static org.apache.storm.topology.base.BaseWindowedBolt.Count; +import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * An {@link IWindowedBolt} wrapper that does the windowing of tuples. 
+ */ +public class WindowedBoltExecutor implements IRichBolt { + private static final long serialVersionUID = 8110332014773492905L; + private static final Logger LOG = Logger.getLogger(WindowedBoltExecutor.class.getName()); + private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s + private static final int DEFAULT_MAX_LAG_MS = 0; // no lag + private final IWindowedBolt bolt; + private transient WindowedOutputCollector windowedOutputCollector; + private transient WindowLifecycleListener listener; + private transient WindowManager windowManager; + private transient int maxLagMs; + private transient String tupleTsFieldName; + private transient TriggerPolicy triggerPolicy; + private transient EvictionPolicy evictionPolicy; + // package level for unit tests + // SUPPRESS CHECKSTYLE VisibilityModifier + transient WaterMarkEventGenerator waterMarkEventGenerator; + + public WindowedBoltExecutor(IWindowedBolt bolt) { + this.bolt = bolt; + } + + @SuppressWarnings("rawtypes") + private int getTopologyTimeoutMillis(Map stormConf) { + if (stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) != null) { + boolean timeOutsEnabled = + Boolean.parseBoolean(stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString()); + if (!timeOutsEnabled) { + return Integer.MAX_VALUE; + } + } + int timeout = 0; + if (stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) != null) { + timeout = Integer.parseInt(stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); + } + return timeout * 1000; + } + + @SuppressWarnings("rawtypes") + private int getMaxSpoutPending(Map stormConf) { + int maxPending = Integer.MAX_VALUE; + if (stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING) != null) { + maxPending = Integer.parseInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString()); + } + return maxPending; + } + + private void ensureDurationLessThanTimeout(int duration, int timeout) { + if (duration > timeout) { + throw new IllegalArgumentException("Window duration 
(length + sliding interval) value " + + duration + " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS + " value " + + timeout); + } + } + + private void ensureCountLessThanMaxPending(int count, int maxPending) { + if (count > maxPending) { + throw new IllegalArgumentException("Window count (length + sliding interval) value " + count + + " is more than " + Config.TOPOLOGY_MAX_SPOUT_PENDING + " value " + maxPending); + } + } + + @SuppressWarnings("rawtypes") + private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration, + Count slidingIntervalCount, Duration slidingIntervalDuration) { + + int topologyTimeout = getTopologyTimeoutMillis(stormConf); + int maxSpoutPending = getMaxSpoutPending(stormConf); + if (windowLengthCount == null && windowLengthDuration == null) { + throw new IllegalArgumentException("Window length is not specified"); + } + + if (windowLengthDuration != null && slidingIntervalDuration != null) { + ensureDurationLessThanTimeout(windowLengthDuration.value + + slidingIntervalDuration.value, topologyTimeout); + } else if (windowLengthDuration != null) { + ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout); + } else if (slidingIntervalDuration != null) { + ensureDurationLessThanTimeout(slidingIntervalDuration.value, topologyTimeout); + } + + if (windowLengthCount != null && slidingIntervalCount != null) { + ensureCountLessThanMaxPending( + windowLengthCount.value + slidingIntervalCount.value, maxSpoutPending); + } else if (windowLengthCount != null) { + ensureCountLessThanMaxPending(windowLengthCount.value, maxSpoutPending); + } else if (slidingIntervalCount != null) { + ensureCountLessThanMaxPending(slidingIntervalCount.value, maxSpoutPending); + } + } + + @SuppressWarnings("rawtypes") + private WindowManager initWindowManager( + WindowLifecycleListener lifecycleListener, Map stormConf, TopologyContext context) { + WindowManager manager = new WindowManager<>(lifecycleListener); + Duration 
windowLengthDuration = null; + Count windowLengthCount = null; + Duration slidingIntervalDuration = null; + Count slidingIntervalCount = null; + // window length + if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) { + windowLengthCount = new Count(Integer.parseInt(stormConf.get( + Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT).toString())); + } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) { + windowLengthDuration = new Duration( + Integer.parseInt(stormConf.get( + Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS).toString()), TimeUnit.MILLISECONDS); + } + // sliding interval + if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) { + slidingIntervalCount = new Count(Integer.parseInt(stormConf.get( + Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT).toString())); + } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) { + slidingIntervalDuration = new Duration(Integer.parseInt(stormConf.get( + Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS).toString()), TimeUnit.MILLISECONDS); + } else { + // default is a sliding window of count 1 + slidingIntervalCount = new Count(1); + } + // tuple ts + if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) { + tupleTsFieldName = (String) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME); + // max lag + if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) { + maxLagMs = Integer.parseInt( + stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS).toString()); + } else { + maxLagMs = DEFAULT_MAX_LAG_MS; + } + // watermark interval + int watermarkInterval; + if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) { + watermarkInterval = Integer.parseInt( + stormConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS).toString()); + } else { + watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS; + } + waterMarkEventGenerator = new 
WaterMarkEventGenerator<>(manager, watermarkInterval, + maxLagMs, getComponentStreams(context)); + } + // validate + validate(stormConf, windowLengthCount, windowLengthDuration, + slidingIntervalCount, slidingIntervalDuration); + evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration, + manager); + triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, manager); + manager.setEvictionPolicy(evictionPolicy); + manager.setTriggerPolicy(triggerPolicy); + return manager; + } + + private Set getComponentStreams(TopologyContext context) { + Set streams = new HashSet<>(); + for (GlobalStreamId streamId : context.getThisSourceIds()) { + streams.add(streamId); + } + return streams; + } + + /** + * Start the trigger policy and waterMarkEventGenerator if set + */ + protected void start() { + if (waterMarkEventGenerator != null) { + LOG.log(Level.FINE, "Starting waterMarkEventGenerator"); + waterMarkEventGenerator.start(); + } + LOG.log(Level.FINE, "Starting trigger policy"); + triggerPolicy.start(); + } + + private boolean isTupleTs() { + return tupleTsFieldName != null; + } + + private TriggerPolicy getTriggerPolicy( + Count slidingIntervalCount, Duration slidingIntervalDuration, WindowManager manager) { + if (slidingIntervalCount != null) { + if (isTupleTs()) { + return new WatermarkCountTriggerPolicy<>( + slidingIntervalCount.value, manager, evictionPolicy, manager); + } else { + return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy); + } + } else { + if (isTupleTs()) { + return new WatermarkTimeTriggerPolicy<>( + slidingIntervalDuration.value, manager, evictionPolicy, manager); + } else { + return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy); + } + } + } + + private EvictionPolicy getEvictionPolicy( + Count windowLengthCount, Duration windowLengthDuration, WindowManager manager) { + if (windowLengthCount != null) { + if (isTupleTs()) { + return new 
WatermarkCountEvictionPolicy<>(windowLengthCount.value); + } else { + return new CountEvictionPolicy<>(windowLengthCount.value); + } + } else { + if (isTupleTs()) { + return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs); + } else { + return new TimeEvictionPolicy<>(windowLengthDuration.value); + } + } + } + + @Override + @SuppressWarnings("rawtypes") + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.windowedOutputCollector = new WindowedOutputCollector(collector); + bolt.prepare(stormConf, context, windowedOutputCollector); + this.listener = newWindowLifecycleListener(); + this.windowManager = initWindowManager(listener, stormConf, context); + start(); + LOG.log(Level.FINE, "Initialized window manager {} ", this.windowManager); + } + + @Override + public void execute(Tuple input) { + if (isTupleTs()) { + long ts = input.getLongByField(tupleTsFieldName); + GlobalStreamId stream = + new GlobalStreamId(input.getSourceComponent(), input.getSourceStreamId()); + if (waterMarkEventGenerator.track(stream, ts)) { + windowManager.add(input, ts); + } else { + windowedOutputCollector.ack(input); + LOG.log(Level.INFO, String.format( + "Received a late tuple %s with ts %d. 
This will not be processed.", input, ts)); + } + } else { + windowManager.add(input); + } + } + + @Override + public void cleanup() { + windowManager.shutdown(); + bolt.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + bolt.declareOutputFields(declarer); + } + + @Override + public Map getComponentConfiguration() { + return bolt.getComponentConfiguration(); + } + + protected WindowLifecycleListener newWindowLifecycleListener() { + return new WindowLifecycleListener() { + @Override + public void onExpiry(List tuples) { + for (Tuple tuple : tuples) { + windowedOutputCollector.ack(tuple); + } + } + + @Override + public void onActivation( + List tuples, List newTuples, List expiredTuples) { + windowedOutputCollector.setContext(tuples); + bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples)); + } + }; + } + + /** + * Creates an {@link OutputCollector} wrapper that automatically + * anchors the tuples to inputTuples while emitting. + */ + private static class WindowedOutputCollector extends OutputCollector { + private List inputTuples; + + WindowedOutputCollector(IOutputCollector delegate) { + super(delegate); + } + + void setContext(List newInputTuples) { + inputTuples = newInputTuples; + } + + @Override + public List emit(String streamId, List tuple) { + return emit(streamId, inputTuples, tuple); + } + + @Override + public void emitDirect(int taskId, String streamId, List tuple) { + emitDirect(taskId, streamId, inputTuples, tuple); + } + } + +} diff --git a/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java b/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java new file mode 100644 index 00000000000..9930284395d --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.topology.base; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IWindowedBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; + +public abstract class BaseWindowedBolt implements IWindowedBolt { + + private static final long serialVersionUID = 1603573203346490793L; + + protected final transient Map windowConfiguration; + + /** + * Holds a count value for count based windows and sliding intervals. + */ + public static class Count { + public final int value; + + public Count(int value) { + this.value = value; + } + + @Override + public String toString() { + return "Count{" + "value=" + value + '}'; + } + } + + /** + * Holds a Time duration for time based windows and sliding intervals. 
+ */ + public static class Duration { + public final int value; + + public Duration(int value, TimeUnit timeUnit) { + this.value = (int) timeUnit.toMillis(value); + } + + @Override + public String toString() { + return "Duration{" + "value=" + value + '}'; + } + } + + protected BaseWindowedBolt() { + windowConfiguration = new HashMap<>(); + } + + private BaseWindowedBolt withWindowLength(Count count) { + if (count.value <= 0) { + throw new IllegalArgumentException("Window length must be positive [" + count + "]"); + } + windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value); + return this; + } + + private BaseWindowedBolt withWindowLength(Duration duration) { + if (duration.value <= 0) { + throw new IllegalArgumentException("Window length must be positive [" + duration + "]"); + } + + windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value); + return this; + } + + private BaseWindowedBolt withSlidingInterval(Count count) { + if (count.value <= 0) { + throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]"); + } + windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value); + return this; + } + + private BaseWindowedBolt withSlidingInterval(Duration duration) { + if (duration.value <= 0) { + throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]"); + } + windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value); + return this; + } + + /** + * Tuple count based sliding window configuration. + * + * @param windowLength the number of tuples in the window + * @param slidingInterval the number of tuples after which the window slides + */ + public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) { + return withWindowLength(windowLength).withSlidingInterval(slidingInterval); + } + + /** + * Tuple count and time duration based sliding window configuration. 
+ * + * @param windowLength the number of tuples in the window + * @param slidingInterval the time duration after which the window slides + */ + public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) { + return withWindowLength(windowLength).withSlidingInterval(slidingInterval); + } + + /** + * Time duration and count based sliding window configuration. + * + * @param windowLength the time duration of the window + * @param slidingInterval the number of tuples after which the window slides + */ + public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) { + return withWindowLength(windowLength).withSlidingInterval(slidingInterval); + } + + /** + * Time duration based sliding window configuration. + * + * @param windowLength the time duration of the window + * @param slidingInterval the time duration after which the window slides + */ + public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) { + return withWindowLength(windowLength).withSlidingInterval(slidingInterval); + } + + /** + * A tuple count based window that slides with every incoming tuple. + * + * @param windowLength the number of tuples in the window + */ + public BaseWindowedBolt withWindow(Count windowLength) { + return withWindowLength(windowLength).withSlidingInterval(new Count(1)); + } + + /** + * A time duration based window that slides with every incoming tuple. + * + * @param windowLength the time duration of the window + */ + public BaseWindowedBolt withWindow(Duration windowLength) { + return withWindowLength(windowLength).withSlidingInterval(new Count(1)); + } + + /** + * A count based tumbling window. + * + * @param count the number of tuples after which the window tumbles + */ + public BaseWindowedBolt withTumblingWindow(Count count) { + return withWindowLength(count).withSlidingInterval(count); + } + + /** + * A time duration based tumbling window. 
+ * + * @param duration the time duration after which the window tumbles + */ + public BaseWindowedBolt withTumblingWindow(Duration duration) { + return withWindowLength(duration).withSlidingInterval(duration); + } + + /** + * Specify a field in the tuple that represents the timestamp as a long value. If this + * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown. + * + * @param fieldName the name of the field that contains the timestamp + */ + public BaseWindowedBolt withTimestampField(String fieldName) { + windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, fieldName); + return this; + } + + /** + * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps + * cannot be out of order by more than this amount. + * + * @param duration the max lag duration + */ + public BaseWindowedBolt withLag(Duration duration) { + windowConfiguration.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, duration.value); + return this; + } + + /** + * Specify the watermark event generation interval. 
For tuple based timestamps, watermark events + * are used to track the progress of time + * + * @param interval the interval at which watermark events are generated + */ + public BaseWindowedBolt withWatermarkInterval(Duration interval) { + windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value); + return this; + } + + @Override + @SuppressWarnings("rawtypes") + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + // NOOP + } + + @Override + public void cleanup() { + // NOOP + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // NOOP + } + + @Override + public Map getComponentConfiguration() { + return windowConfiguration; + } +} diff --git a/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java b/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java index 9c99768d0ac..e18bbec9468 100644 --- a/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java +++ b/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java @@ -248,4 +248,9 @@ public String getSourceStreamId() { public void resetValues() { delegate.resetValues(); } + + @Override + public String toString() { + return delegate.toString(); + } } diff --git a/heron/storm/src/java/org/apache/storm/windowing/CountEvictionPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/CountEvictionPolicy.java new file mode 100644 index 00000000000..623712a445b --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/CountEvictionPolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * An eviction policy that tracks event counts and can + * evict based on a threshold count. + * + * @param the type of event tracked by this policy. + */ +public class CountEvictionPolicy implements EvictionPolicy { + protected final int threshold; + protected final AtomicLong currentCount; + + public CountEvictionPolicy(int count) { + this.threshold = count; + this.currentCount = new AtomicLong(); + } + + @Override + public Action evict(Event event) { + /* + * atomically decrement the count if its greater than threshold and + * return if the event should be evicted + */ + while (true) { + long curVal = currentCount.get(); + if (curVal > threshold) { + if (currentCount.compareAndSet(curVal, curVal - 1)) { + return Action.EXPIRE; + } + } else { + break; + } + } + return Action.PROCESS; + } + + @Override + public void track(Event event) { + if (!event.isWatermark()) { + currentCount.incrementAndGet(); + } + } + + @Override + public void setContext(EvictionContext context) { + // NOOP + } + + @Override + public String toString() { + return "CountEvictionPolicy{" + "threshold=" + threshold + ", currentCount=" + + currentCount + '}'; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/CountTriggerPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/CountTriggerPolicy.java new file mode 100644 index 00000000000..2e7232756bb --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/CountTriggerPolicy.java @@ -0,0 +1,74 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A trigger that tracks event counts and calls back {@link TriggerHandler#onTrigger()} + * when the count threshold is hit. + * + * @param the type of event tracked by this policy. 
+ */ +public class CountTriggerPolicy implements TriggerPolicy { + private final int count; + private final AtomicInteger currentCount; + private final TriggerHandler handler; + private final EvictionPolicy evictionPolicy; + private boolean started; + + public CountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy evictionPolicy) { + this.count = count; + this.currentCount = new AtomicInteger(); + this.handler = handler; + this.evictionPolicy = evictionPolicy; + this.started = false; + } + + @Override + public void track(Event event) { + if (started && !event.isWatermark()) { + if (currentCount.incrementAndGet() >= count) { + evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis())); + handler.onTrigger(); + } + } + } + + @Override + public void reset() { + currentCount.set(0); + } + + @Override + public void start() { + started = true; + } + + @Override + public void shutdown() { + // NOOP + } + + @Override + public String toString() { + return "CountTriggerPolicy{" + "count=" + count + ", currentCount=" + currentCount + + ", started=" + started + '}'; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/DefaultEvictionContext.java b/heron/storm/src/java/org/apache/storm/windowing/DefaultEvictionContext.java new file mode 100644 index 00000000000..7639b018862 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/DefaultEvictionContext.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +public class DefaultEvictionContext implements EvictionContext { + private final Long referenceTime; + private final Long currentCount; + private final Long slidingCount; + + public DefaultEvictionContext(Long referenceTime) { + this(referenceTime, null); + } + + public DefaultEvictionContext(Long referenceTime, Long currentCount) { + this(referenceTime, currentCount, null); + } + + public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount) { + this.referenceTime = referenceTime; + this.currentCount = currentCount; + this.slidingCount = slidingCount; + } + + @Override + public Long getReferenceTime() { + return referenceTime; + } + + @Override + public Long getCurrentCount() { + return currentCount; + } + + @Override + public Long getSlidingCount() { + return slidingCount; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/Event.java b/heron/storm/src/java/org/apache/storm/windowing/Event.java new file mode 100644 index 00000000000..95d769d33a3 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/Event.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * An event is a wrapper object that gets stored in the window.
 *
 * @param <T> the type of the object that is wrapped. E.g Tuple
 */
interface Event<T> {
  /**
   * The event timestamp in millis. This could be the time
   * when the source generated the tuple or the time
   * when the tuple was received by a bolt.
   *
   * @return the event timestamp in milliseconds.
   */
  long getTimestamp();

  /**
   * Returns the wrapped object, E.g. a tuple
   *
   * @return the wrapped object.
   */
  T get();

  /**
   * If this is a watermark event or not. Watermark events are used
   * for tracking time while processing event based ts.
   *
   * @return true if this is a watermark event
   */
  boolean isWatermark();
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +public class EventImpl implements Event { + private final T event; + private long ts; + + EventImpl(T event, long ts) { + this.event = event; + this.ts = ts; + } + + @Override + public long getTimestamp() { + return ts; + } + + @Override + public T get() { + return event; + } + + @Override + public boolean isWatermark() { + return false; + } + + @Override + public String toString() { + return "EventImpl{" + "event=" + event + ", ts=" + ts + '}'; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/EvictionContext.java b/heron/storm/src/java/org/apache/storm/windowing/EvictionContext.java new file mode 100644 index 00000000000..a402a2966c6 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/EvictionContext.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Context information handed to an eviction policy for use while it evicts events.
 */
public interface EvictionContext {
  /**
   * The reference time an eviction policy could use when evicting events.
   * With event time processing this would be the watermark time.
   *
   * @return the reference time in millis
   */
  Long getReferenceTime();

  /**
   * The current count of events in the queue up to the reference time,
   * based on which count based evictions can be performed.
   *
   * @return the current count
   */
  Long getCurrentCount();

  /**
   * The sliding count for count based windows.
   *
   * @return the sliding count
   */
  Long getSlidingCount();
}
+ */ + +package org.apache.storm.windowing; + +/** + * Eviction policy tracks events and decides whether + * an event should be evicted from the window or not. + * + * @param the type of event that is tracked. + */ +public interface EvictionPolicy { + /** + * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked. + */ + enum Action { + /** + * expire the event and remove it from the queue + */ + EXPIRE, + /** + * process the event in the current window of events + */ + PROCESS, + /** + * don't include in the current window but keep the event + * in the queue for evaluating as a part of future windows + */ + KEEP, + /** + * stop processing the queue, there cannot be anymore events + * satisfying the eviction policy + */ + STOP + } + + /** + * Decides if an event should be expired from the window, processed in the current + * window or kept for later processing. + * + * @param event the input event + * @return the {@link EvictionPolicy.Action} to be taken based on the input event + */ + Action evict(Event event); + + /** + * Tracks the event to later decide whether + * {@link EvictionPolicy#evict(Event)} should evict it or not. + * + * @param event the input event to be tracked + */ + void track(Event event); + + /** + * Sets a context in the eviction policy that can be used while evicting the events. + * E.g. For TimeEvictionPolicy, this could be used to set the reference timestamp. + * + * @param context + */ + void setContext(EvictionContext context); + +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/TimeEvictionPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/TimeEvictionPolicy.java new file mode 100644 index 00000000000..48b86462a9a --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/TimeEvictionPolicy.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +/** + * Eviction policy that evicts events based on time duration. + */ +public class TimeEvictionPolicy implements EvictionPolicy { + private final int windowLength; + /** + * The reference time in millis for window calculations and + * expiring events. If not set it will default to System.currentTimeMillis() + */ + protected Long referenceTime; + + /** + * Constructs a TimeEvictionPolicy that evicts events older + * than the given window length in millis + * + * @param windowLength the duration in milliseconds + */ + public TimeEvictionPolicy(int windowLength) { + this.windowLength = windowLength; + } + + /** + * {@inheritDoc} + */ + @Override + public Action evict(Event event) { + long now = referenceTime == null ? 
System.currentTimeMillis() : referenceTime; + long diff = now - event.getTimestamp(); + if (diff >= windowLength) { + return Action.EXPIRE; + } + return Action.PROCESS; + } + + @Override + public void track(Event event) { + // NOOP + } + + @Override + public void setContext(EvictionContext context) { + referenceTime = context.getReferenceTime(); + } + + @Override + public String toString() { + return "TimeEvictionPolicy{" + "windowLength=" + windowLength + ", referenceTime=" + + referenceTime + '}'; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/TimeTriggerPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/TimeTriggerPolicy.java new file mode 100644 index 00000000000..a614ed5aec7 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/TimeTriggerPolicy.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.windowing; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.topology.FailedException; + +/** + * Invokes {@link TriggerHandler#onTrigger()} after the duration. + */ +public class TimeTriggerPolicy implements TriggerPolicy { + private static final Logger LOG = Logger.getLogger(TimeTriggerPolicy.class.getName()); + + private long duration; + private final TriggerHandler handler; + private final ScheduledExecutorService executor; + private final EvictionPolicy evictionPolicy; + private ScheduledFuture executorFuture; + + public TimeTriggerPolicy(long millis, TriggerHandler handler) { + this(millis, handler, null); + } + + public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy evictionPolicy) { + this.duration = millis; + this.handler = handler; + this.executor = Executors.newSingleThreadScheduledExecutor(); + this.evictionPolicy = evictionPolicy; + } + + @Override + public void track(Event event) { + checkFailures(); + } + + @Override + public void reset() { + checkFailures(); + } + + @Override + public void start() { + executorFuture = + executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() { + executor.shutdown(); + try { + if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + @Override + public String toString() { + return "TimeTriggerPolicy{" + "duration=" + duration + '}'; + } + + /* + * Check for uncaught exceptions during the execution + * of the trigger and fail fast. 
+ * The uncaught exceptions will be wrapped in + * ExecutionException and thrown when future.get() is invoked. + */ + private void checkFailures() { + if (executorFuture != null && executorFuture.isDone()) { + try { + executorFuture.get(); + } catch (InterruptedException ex) { + LOG.log(Level.SEVERE, "Got exception ", ex); + throw new FailedException(ex); + } catch (ExecutionException ex) { + LOG.log(Level.SEVERE, "Got exception ", ex); + throw new FailedException(ex.getCause()); + } + } + } + + private Runnable newTriggerTask() { + return new Runnable() { + @Override + public void run() { + try { + /* + * set the current timestamp as the reference time for the eviction policy + * to evict the events + */ + if (evictionPolicy != null) { + evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis())); + } + handler.onTrigger(); + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Throwable th) { + LOG.log(Level.SEVERE, "handler.onTrigger failed ", th); + /* + * propagate it so that task gets canceled and the exception + * can be retrieved from executorFuture.get() + */ + throw th; + } + } + }; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/TriggerHandler.java b/heron/storm/src/java/org/apache/storm/windowing/TriggerHandler.java new file mode 100644 index 00000000000..42008e90da0 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/TriggerHandler.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Callback through which a {@link TriggerPolicy} reports that its trigger
 * condition has been met.
 */
public interface TriggerHandler {
  /**
   * Runs the work associated with a satisfied {@link TriggerPolicy} condition.
   *
   * @return {@code true} when the window was evaluated and held at least one event,
   *     {@code false} otherwise
   */
  boolean onTrigger();
}
+ */ + +package org.apache.storm.windowing; + +/** + * Triggers the window calculations based on the policy. + * + * @param the type of the event that is tracked + */ +public interface TriggerPolicy { + /** + * Tracks the event and could use this to invoke the trigger. + * + * @param event the input event + */ + void track(Event event); + + /** + * resets the trigger policy + */ + void reset(); + + /** + * Starts the trigger policy. This can be used + * during recovery to start the triggers after + * recovery is complete. + */ + void start(); + + /** + * Any clean up could be handled here. + */ + void shutdown(); +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java b/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java new file mode 100644 index 00000000000..9255e7e0198 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import org.apache.storm.tuple.Tuple; + +/** + * A {@link Window} that contains {@link Tuple} objects. 
+ */ +public interface TupleWindow extends Window { +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java b/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java new file mode 100644 index 00000000000..3b24e587e27 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import java.util.List; + +import org.apache.storm.tuple.Tuple; + +/** + * Holds the expired, new and current tuples in a window. 
+ */ +public class TupleWindowImpl implements TupleWindow { + private final List tuples; + private final List newTuples; + private final List expiredTuples; + + public TupleWindowImpl(List tuples, List newTuples, List expiredTuples) { + this.tuples = tuples; + this.newTuples = newTuples; + this.expiredTuples = expiredTuples; + } + + @Override + public List get() { + return tuples; + } + + @Override + public List getNew() { + return newTuples; + } + + @Override + public List getExpired() { + return expiredTuples; + } + + @Override + public String toString() { + return "TupleWindowImpl{" + "tuples=" + tuples + ", newTuples=" + newTuples + + ", expiredTuples=" + expiredTuples + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TupleWindowImpl that = (TupleWindowImpl) o; + + if (tuples != null ? !tuples.equals(that.tuples) : that.tuples != null) { + return false; + } + if (newTuples != null ? !newTuples.equals(that.newTuples) : that.newTuples != null) { + return false; + } + return expiredTuples != null ? expiredTuples.equals(that.expiredTuples) + : that.expiredTuples == null; + } + + @Override + public int hashCode() { + int result = tuples != null ? tuples.hashCode() : 0; + result = 31 * result + (newTuples != null ? newTuples.hashCode() : 0); + result = 31 * result + (expiredTuples != null ? expiredTuples.hashCode() : 0); + return result; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/WaterMarkEvent.java b/heron/storm/src/java/org/apache/storm/windowing/WaterMarkEvent.java new file mode 100644 index 00000000000..d9656919f45 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/WaterMarkEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +/** + * Watermark event used for tracking progress of time when + * processing event based ts. + */ +public class WaterMarkEvent extends EventImpl { + public WaterMarkEvent(long ts) { + super(null, ts); + } + + @Override + public boolean isWatermark() { + return true; + } + + @Override + public String toString() { + return "WaterMarkEvent{} " + super.toString(); + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/WaterMarkEventGenerator.java b/heron/storm/src/java/org/apache/storm/windowing/WaterMarkEventGenerator.java new file mode 100644 index 00000000000..383dde2e4f5 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/WaterMarkEventGenerator.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.topology.FailedException; + +/** + * Tracks tuples across input streams and periodically emits watermark events. + * Watermark event timestamp is the minimum of the latest tuple timestamps + * across all the input streams (minus the lag). Once a watermark event is emitted + * any tuple coming with an earlier timestamp can be considered as late events. 
+ */ +public class WaterMarkEventGenerator implements Runnable { + private static final Logger LOG = Logger.getLogger(WaterMarkEventGenerator.class.getName()); + private final WindowManager windowManager; + private final int eventTsLag; + private final Set inputStreams; + private final Map streamToTs; + private final ScheduledExecutorService executorService; + private final int interval; + private ScheduledFuture executorFuture; + private long lastWaterMarkTs = 0; + + public WaterMarkEventGenerator(WindowManager windowManager, int interval, + int eventTsLag, Set inputStreams) { + this.windowManager = windowManager; + streamToTs = new ConcurrentHashMap<>(); + executorService = Executors.newSingleThreadScheduledExecutor(); + this.interval = interval; + this.eventTsLag = eventTsLag; + this.inputStreams = new HashSet<>(); + for (GlobalStreamId streamId: inputStreams) { + this.inputStreams.add(streamId.get_componentId() + "-" + streamId.get_streamId()); + } + } + + /** + * Tracks the timestamp of the event in the stream, returns + * true if the event can be considered for processing or + * false if its a late event. + */ + public boolean track(GlobalStreamId stream, long ts) { + String streamId = stream.get_componentId() + "-" + stream.get_streamId(); + Long currentVal = streamToTs.get(streamId); + if (currentVal == null || ts > currentVal) { + streamToTs.put(streamId, ts); + } + checkFailures(); + return ts >= lastWaterMarkTs; + } + + @Override + public void run() { + try { + long waterMarkTs = computeWaterMarkTs(); + if (waterMarkTs > lastWaterMarkTs) { + this.windowManager.add(new WaterMarkEvent(waterMarkTs)); + lastWaterMarkTs = waterMarkTs; + } + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Throwable th) { + LOG.log(Level.SEVERE, "Failed while processing watermark event ", th); + throw th; + } + } + + /** + * Computes the min ts across all streams. 
+ */ + private long computeWaterMarkTs() { + long ts = 0; + // only if some data has arrived on each input stream + if (streamToTs.size() >= inputStreams.size()) { + ts = Long.MAX_VALUE; + for (long value : streamToTs.values()) { + ts = Math.min(ts, value); + } + } + return ts - eventTsLag; + } + + private void checkFailures() { + if (executorFuture != null && executorFuture.isDone()) { + try { + executorFuture.get(); + } catch (InterruptedException ex) { + LOG.log(Level.SEVERE, "Got exception ", ex); + throw new FailedException(ex); + } catch (ExecutionException ex) { + LOG.log(Level.SEVERE, "Got exception ", ex); + throw new FailedException(ex.getCause()); + } + } + } + + public void start() { + this.executorFuture = + executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS); + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java new file mode 100644 index 00000000000..b4d6629c857 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.windowing; + +/** + * An eviction policy that tracks count based on watermark ts and + * evicts events upto the watermark based on a threshold count. + * + * @param the type of event tracked by this policy. + */ +public class WatermarkCountEvictionPolicy extends CountEvictionPolicy { + /* + * The reference time in millis for window calculations and + * expiring events. If not set it will default to System.currentTimeMillis() + */ + private long referenceTime; + private long processed = 0L; + + public WatermarkCountEvictionPolicy(int count) { + super(count); + } + + @Override + public Action evict(Event event) { + Action action; + if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) { + action = super.evict(event); + if (action == Action.PROCESS) { + ++processed; + } + } else { + action = Action.KEEP; + } + return action; + } + + @Override + public void track(Event event) { + // NOOP + } + + @Override + public void setContext(EvictionContext context) { + referenceTime = context.getReferenceTime(); + if (context.getCurrentCount() != null) { + currentCount.set(context.getCurrentCount()); + } else { + currentCount.set(processed + context.getSlidingCount()); + } + processed = 0; + } + + @Override + public String toString() { + return "WatermarkCountEvictionPolicy{" + "referenceTime=" + + referenceTime + "} " + super.toString(); + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java new file mode 100644 index 00000000000..51f074675a1 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import java.util.List; + +/** + * A trigger policy that tracks event counts and sets the context for + * eviction policy to evict based on latest watermark time. + * + * @param the type of event tracked by this policy. + */ +public class WatermarkCountTriggerPolicy implements TriggerPolicy { + private final int count; + private final TriggerHandler handler; + private final EvictionPolicy evictionPolicy; + private final WindowManager windowManager; + private long lastProcessedTs = 0; + private boolean started; + + public WatermarkCountTriggerPolicy( + int count, TriggerHandler handler, EvictionPolicy evictionPolicy, + WindowManager windowManager) { + this.count = count; + this.handler = handler; + this.evictionPolicy = evictionPolicy; + this.windowManager = windowManager; + this.started = false; + } + + @Override + public void track(Event event) { + if (started && event.isWatermark()) { + handleWaterMarkEvent(event); + } + } + + @Override + public void reset() { + // NOOP + } + + @Override + public void start() { + started = true; + } + + @Override + public void shutdown() { + // NOOP + } + + /** + * Triggers all the pending windows up to the waterMarkEvent timestamp + * based on the sliding interval count. 
+ * + * @param waterMarkEvent the watermark event + */ + private void handleWaterMarkEvent(Event waterMarkEvent) { + long watermarkTs = waterMarkEvent.getTimestamp(); + List eventTs = + windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count); + for (long ts : eventTs) { + evictionPolicy.setContext(new DefaultEvictionContext(ts, null, Long.valueOf(count))); + handler.onTrigger(); + lastProcessedTs = ts; + } + } + + @Override + public String toString() { + return "WatermarkCountTriggerPolicy{" + "count=" + count + ", lastProcessedTs=" + + lastProcessedTs + ", started=" + started + '}'; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java new file mode 100644 index 00000000000..68b8e9ae922 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +/** + * An eviction policy that evicts events based on time duration taking + * watermark time and event lag into account. 
+ */ +public class WatermarkTimeEvictionPolicy extends TimeEvictionPolicy { + private final int lag; + + /** + * Constructs a WatermarkTimeEvictionPolicy that evicts events older + * than the given window length in millis. + * + * @param windowLength the window length in milliseconds + */ + public WatermarkTimeEvictionPolicy(int windowLength) { + this(windowLength, Integer.MAX_VALUE); + } + + /** + * Constructs a WatermarkTimeEvictionPolicy that evicts events older + * than the given window length in millis. The lag parameter + * can be used in the case of event based ts to break the queue + * scan early. + * + * @param windowLength the window length in milliseconds + * @param lag the max event lag in milliseconds + */ + public WatermarkTimeEvictionPolicy(int windowLength, int lag) { + super(windowLength); + referenceTime = 0L; + this.lag = lag; + } + + /** + * {@inheritDoc} + *

+ * Keeps events with future ts in the queue for processing in the next + * window. If the ts difference is more than the lag, stops scanning + * the queue for the current window. + */ + @Override + public Action evict(Event event) { + long diff = referenceTime - event.getTimestamp(); + if (diff < -lag) { + return Action.STOP; + } else if (diff < 0) { + return Action.KEEP; + } else { + return super.evict(event); + } + } + + @Override + public String toString() { + return "WatermarkTimeEvictionPolicy{" + "lag=" + lag + "} " + super.toString(); + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java b/heron/storm/src/java/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java new file mode 100644 index 00000000000..e220a8ca97c --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Handles watermark events and triggers {@link TriggerHandler#onTrigger()} for each window + * interval that has events to be processed up to the watermark ts. 
+ */ +public class WatermarkTimeTriggerPolicy implements TriggerPolicy { + private static final Logger LOG = Logger.getLogger(WatermarkTimeTriggerPolicy.class.getName()); + private final long slidingIntervalMs; + private final TriggerHandler handler; + private final EvictionPolicy evictionPolicy; + private final WindowManager windowManager; + private long nextWindowEndTs = 0; + private boolean started; + + public WatermarkTimeTriggerPolicy( + long slidingIntervalMs, TriggerHandler handler, EvictionPolicy evictionPolicy, + WindowManager windowManager) { + this.slidingIntervalMs = slidingIntervalMs; + this.handler = handler; + this.evictionPolicy = evictionPolicy; + this.windowManager = windowManager; + this.started = false; + } + + @Override + public void track(Event event) { + if (started && event.isWatermark()) { + handleWaterMarkEvent(event); + } + } + + @Override + public void reset() { + // NOOP + } + + @Override + public void start() { + started = true; + } + + @Override + public void shutdown() { + // NOOP + } + + /** + * Invokes the trigger all pending windows up to the + * watermark timestamp. The end ts of the window is set + * in the eviction policy context so that the events falling + * within that window can be processed. + */ + private void handleWaterMarkEvent(Event event) { + long watermarkTs = event.getTimestamp(); + long windowEndTs = nextWindowEndTs; + LOG.log(Level.FINE, + String.format("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs)); + while (windowEndTs <= watermarkTs) { + long currentCount = windowManager.getEventCount(windowEndTs); + evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount)); + if (handler.onTrigger()) { + windowEndTs += slidingIntervalMs; + } else { + /* + * No events were found in the previous window interval. + * Scan through the events in the queue to find the next + * window intervals based on event ts. 
+ */ + long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs); + LOG.log(Level.FINE, "Next aligned window end ts {}", ts); + if (ts == Long.MAX_VALUE) { + LOG.log(Level.FINE, String.format( + "No events to process between %d and watermark ts %d", windowEndTs, watermarkTs)); + break; + } + windowEndTs = ts; + } + } + nextWindowEndTs = windowEndTs; + } + + /** + * Computes the next window by scanning the events in the window and + * finds the next aligned window between the startTs and endTs. Return the end ts + * of the next aligned window, i.e. the ts when the window should fire. + * + * @param startTs the start timestamp (excluding) + * @param endTs the end timestamp (including) + * @return the aligned window end ts for the next window or Long.MAX_VALUE if there + * are no more events to be processed. + */ + private long getNextAlignedWindowTs(long startTs, long endTs) { + long nextTs = windowManager.getEarliestEventTs(startTs, endTs); + if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) { + return nextTs; + } + return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs)); + } + + @Override + public String toString() { + return "WatermarkTimeTriggerPolicy{" + "slidingIntervalMs=" + slidingIntervalMs + + ", nextWindowEndTs=" + nextWindowEndTs + ", started=" + started + '}'; + } +} diff --git a/heron/storm/src/java/org/apache/storm/windowing/Window.java b/heron/storm/src/java/org/apache/storm/windowing/Window.java new file mode 100644 index 00000000000..2575b8016b3 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/windowing/Window.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * A view of events in a sliding window.
 *
 * @param <T> the type of event that this window contains. E.g. {@code org.apache.storm.tuple.Tuple}
 */
public interface Window<T> {
  /**
   * Gets the list of events in the window.
   *
   * @return the list of events in the window.
   */
  List<T> get();

  /**
   * Get the list of newly added events in the window since the last time the window was generated.
   *
   * @return the list of newly added events in the window.
   */
  List<T> getNew();

  /**
   * Get the list of events expired from the window since the last time the window was generated.
   *
   * @return the list of events expired from the window.
   */
  List<T> getExpired();
}
/**
 * A callback for expiry, activation of events tracked by the {@link WindowManager}
 *
 * @param <T> The type of Event in the window (e.g. Tuple).
 */
public interface WindowLifecycleListener<T> {
  /**
   * Called on expiry of events from the window due to {@link EvictionPolicy}
   *
   * @param events the expired events
   */
  void onExpiry(List<T> events);

  /**
   * Called on activation of the window due to the {@link TriggerPolicy}
   *
   * @param events the list of current events in the window.
   * @param newEvents the newly added events since last activation.
   * @param expired the expired events since last activation.
   */
  void onActivation(List<T> events, List<T> newEvents, List<T> expired);
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.storm.windowing.EvictionPolicy.Action; +import static org.apache.storm.windowing.EvictionPolicy.Action.EXPIRE; +import static org.apache.storm.windowing.EvictionPolicy.Action.PROCESS; +import static org.apache.storm.windowing.EvictionPolicy.Action.STOP; + +/** + * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks + * on expiry of events or activation of the window due to {@link TriggerPolicy}. + * + * @param the type of event in the window. + */ +public class WindowManager implements TriggerHandler { + private static final Logger LOG = Logger.getLogger(WindowManager.class.getName()); + + /** + * Expire old events every EXPIRE_EVENTS_THRESHOLD to + * keep the window size in check. 
+ */ + public static final int EXPIRE_EVENTS_THRESHOLD = 100; + + private final WindowLifecycleListener windowLifecycleListener; + private final ConcurrentLinkedQueue> queue; + private final List expiredEvents; + private final Set> prevWindowEvents; + private final AtomicInteger eventsSinceLastExpiry; + private final ReentrantLock lock; + private EvictionPolicy evictionPolicy; + private TriggerPolicy triggerPolicy; + + public WindowManager(WindowLifecycleListener lifecycleListener) { + windowLifecycleListener = lifecycleListener; + queue = new ConcurrentLinkedQueue<>(); + expiredEvents = new ArrayList<>(); + prevWindowEvents = new HashSet<>(); + eventsSinceLastExpiry = new AtomicInteger(); + lock = new ReentrantLock(true); + } + + public void setEvictionPolicy(EvictionPolicy evictionPolicy) { + this.evictionPolicy = evictionPolicy; + } + + public void setTriggerPolicy(TriggerPolicy triggerPolicy) { + this.triggerPolicy = triggerPolicy; + } + + /** + * Add an event into the window, with {@link System#currentTimeMillis()} as + * the tracking ts. + * + * @param event the event to add + */ + public void add(T event) { + add(event, System.currentTimeMillis()); + } + + /** + * Add an event into the window, with the given ts as the tracking ts. + * + * @param event the event to track + * @param ts the timestamp + */ + public void add(T event, long ts) { + add(new EventImpl(event, ts)); + } + + /** + * Tracks a window event + * + * @param windowEvent the window event to track + */ + public void add(Event windowEvent) { + // watermark events are not added to the queue. + if (!windowEvent.isWatermark()) { + queue.add(windowEvent); + } else { + LOG.log(Level.FINE, "Got watermark event with ts {}", windowEvent.getTimestamp()); + } + track(windowEvent); + compactWindow(); + } + + /** + * The callback invoked by the trigger policy. 
+ */ + @Override + public boolean onTrigger() { + List> windowEvents = null; + List expired = null; + try { + lock.lock(); + /* + * scan the entire window to handle out of order events in + * the case of time based windows. + */ + windowEvents = scanEvents(true); + expired = new ArrayList<>(expiredEvents); + expiredEvents.clear(); + } finally { + lock.unlock(); + } + List events = new ArrayList<>(); + List newEvents = new ArrayList<>(); + for (Event event : windowEvents) { + events.add(event.get()); + if (!prevWindowEvents.contains(event)) { + newEvents.add(event.get()); + } + } + prevWindowEvents.clear(); + if (!events.isEmpty()) { + prevWindowEvents.addAll(windowEvents); + LOG.log(Level.FINE, + "invoking windowLifecycleListener onActivation, [{}] events in window.", events.size()); + windowLifecycleListener.onActivation(events, newEvents, expired); + } else { + LOG.log(Level.FINE, "No events in the window, skipping onActivation"); + } + triggerPolicy.reset(); + return !events.isEmpty(); + } + + public void shutdown() { + LOG.log(Level.FINE, "Shutting down WindowManager"); + if (triggerPolicy != null) { + triggerPolicy.shutdown(); + } + } + + /** + * expires events that fall out of the window every + * EXPIRE_EVENTS_THRESHOLD so that the window does not grow + * too big. + */ + private void compactWindow() { + if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) { + scanEvents(false); + } + } + + /** + * feed the event to the eviction and trigger policies + * for bookkeeping and optionally firing the trigger. + */ + private void track(Event windowEvent) { + evictionPolicy.track(windowEvent); + triggerPolicy.track(windowEvent); + } + + /** + * Scan events in the queue, using the expiration policy to check + * if the event should be evicted or not. 
+ * + * @param fullScan if set, will scan the entire queue; if not set, will stop + * as soon as an event not satisfying the expiration policy is found + * @return the list of events to be processed as a part of the current window + */ + private List> scanEvents(boolean fullScan) { + LOG.log(Level.FINE, "Scan events, eviction policy {}", evictionPolicy); + List eventsToExpire = new ArrayList<>(); + List> eventsToProcess = new ArrayList<>(); + try { + lock.lock(); + Iterator> it = queue.iterator(); + while (it.hasNext()) { + Event windowEvent = it.next(); + Action action = evictionPolicy.evict(windowEvent); + if (action == EXPIRE) { + eventsToExpire.add(windowEvent.get()); + it.remove(); + } else if (!fullScan || action == STOP) { + break; + } else if (action == PROCESS) { + eventsToProcess.add(windowEvent); + } + } + expiredEvents.addAll(eventsToExpire); + } finally { + lock.unlock(); + } + eventsSinceLastExpiry.set(0); + LOG.log(Level.FINE, "[{}] events expired from window.", eventsToExpire.size()); + if (!eventsToExpire.isEmpty()) { + LOG.log(Level.FINE, "invoking windowLifecycleListener.onExpiry"); + windowLifecycleListener.onExpiry(eventsToExpire); + } + return eventsToProcess; + } + + /** + * Scans the event queue and returns the next earliest event ts + * between the startTs and endTs + * + * @param startTs the start ts (exclusive) + * @param endTs the end ts (inclusive) + * @return the earliest event ts between startTs and endTs + */ + public long getEarliestEventTs(long startTs, long endTs) { + long minTs = Long.MAX_VALUE; + for (Event event : queue) { + if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) { + minTs = Math.min(minTs, event.getTimestamp()); + } + } + return minTs; + } + + /** + * Scans the event queue and returns number of events having + * timestamp less than or equal to the reference time. 
+ * + * @param referenceTime the reference timestamp in millis + * @return the count of events with timestamp less than or equal to referenceTime + */ + public int getEventCount(long referenceTime) { + int count = 0; + for (Event event : queue) { + if (event.getTimestamp() <= referenceTime) { + ++count; + } + } + return count; + } + + /** + * Scans the event queue and returns the list of event ts + * falling between startTs (exclusive) and endTs (inclusive) + * at each sliding interval counts. + * + * @param startTs the start timestamp (exclusive) + * @param endTs the end timestamp (inclusive) + * @param slidingCount the sliding interval count + * @return the list of event ts + */ + public List getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) { + List timestamps = new ArrayList<>(); + if (endTs > startTs) { + int count = 0; + long ts = Long.MIN_VALUE; + for (Event event : queue) { + if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) { + ts = Math.max(ts, event.getTimestamp()); + if (++count % slidingCount == 0) { + timestamps.add(ts); + } + } + } + } + return timestamps; + } + + @Override + public String toString() { + return "WindowManager{" + "evictionPolicy=" + evictionPolicy + ", triggerPolicy=" + + triggerPolicy + '}'; + } +} diff --git a/heron/storm/tests/java/BUILD b/heron/storm/tests/java/BUILD new file mode 100644 index 00000000000..7c007e8190c --- /dev/null +++ b/heron/storm/tests/java/BUILD @@ -0,0 +1,26 @@ +load("/tools/rules/java_tests", "java_tests") +load("/tools/rules/heron_deps", "heron_java_proto_files") + +java_library( + name = "storm-tests", + srcs = glob(["**/*.java"]), + deps = heron_java_proto_files() + [ + "//heron/storm/src/java:storm-compatibility-java", + "//heron/common/src/java:common-java", + "//heron/api/src/java:api-java", + "//third_party/java:junit4", + "//third_party/java:mockito", + ], +) + +java_tests( + test_classes = [ + "org.apache.storm.windowing.WaterMarkEventGeneratorTest", + 
"org.apache.storm.windowing.WindowManagerTest", + "org.apache.storm.topology.WindowedBoltExecutorTest", + ], + runtime_deps = [ + ":storm-tests", + ], + size = "small", +) diff --git a/heron/storm/tests/java/org/apache/storm/topology/WindowedBoltExecutorTest.java b/heron/storm/tests/java/org/apache/storm/topology/WindowedBoltExecutorTest.java new file mode 100644 index 00000000000..c13128d4c3c --- /dev/null +++ b/heron/storm/tests/java/org/apache/storm/topology/WindowedBoltExecutorTest.java @@ -0,0 +1,126 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.apache.storm.topology; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.storm.Config; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.twitter.heron.api.generated.TopologyAPI; + +import static org.junit.Assert.*; + +/** + * Unit tests for {@link WindowedBoltExecutor} + */ +public class WindowedBoltExecutorTest { + + private WindowedBoltExecutor executor; + private TestWindowedBolt testWindowedBolt; + + @SuppressWarnings("serial") + private static class TestWindowedBolt extends BaseWindowedBolt { + private List tupleWindows = new ArrayList<>(); + + @Override + public void execute(TupleWindow input) { + tupleWindows.add(input); + } + } + + private Tuple getTuple(String streamId, final Fields fields, Values values) { + com.twitter.heron.api.topology.TopologyContext context = + Mockito.mock(com.twitter.heron.common.utils.topology.TopologyContextImpl.class); + com.twitter.heron.api.tuple.Fields f = new com.twitter.heron.api.tuple.Fields(fields.toList()); + Mockito.when( + context.getComponentOutputFields(Mockito.anyString(), Mockito.anyString())).thenReturn(f); + TopologyAPI.StreamId stream = + TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName("c1").build(); + return new TupleImpl(new com.twitter.heron.common.utils.tuple.TupleImpl( + context, stream, 1, null, values)); + } + + private OutputCollector getOutputCollector() { + return 
Mockito.mock(OutputCollector.class); + } + + private TopologyContext getTopologyContext() { + TopologyContext context = Mockito.mock(TopologyContext.class); + Set sources = Collections.singleton(new GlobalStreamId("s1", "default")); + Mockito.when(context.getThisSourceIds()).thenReturn(sources); + return context; + } + + @Before + public void setUp() { + testWindowedBolt = new TestWindowedBolt(); + executor = new WindowedBoltExecutor(testWindowedBolt); + Map conf = new HashMap<>(); + conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000); + conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20); + conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10); + conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts"); + conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5); + // trigger manually to avoid timing issues + conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100000); + executor.prepare(conf, getTopologyContext(), getOutputCollector()); + } + + @Test(expected = IllegalArgumentException.class) + public void testExecuteWithoutTs() throws Exception { + executor.execute(getTuple("s1", new Fields("a"), new Values(1))); + } + + @Test + public void testExecuteWithTs() throws Exception { + long[] timstamps = {603, 605, 607, 618, 626, 636}; + for (long ts : timstamps) { + executor.execute(getTuple("s1", new Fields("ts"), new Values(ts))); + } + //Thread.sleep(120); + executor.waterMarkEventGenerator.run(); + assertEquals(3, testWindowedBolt.tupleWindows.size()); + TupleWindow first = testWindowedBolt.tupleWindows.get(0); + assertArrayEquals(new long[]{603, 605, 607}, + new long[]{(long) first.get().get(0).getValue(0), (long) first.get().get(1).getValue(0), + (long) first.get().get(2).getValue(0)}); + + TupleWindow second = testWindowedBolt.tupleWindows.get(1); + assertArrayEquals(new long[]{603, 605, 607, 618}, + new long[]{(long) second.get().get(0).getValue(0), (long) second.get().get(1).getValue(0), + (long) 
second.get().get(2).getValue(0), (long) second.get().get(3).getValue(0)}); + + TupleWindow third = testWindowedBolt.tupleWindows.get(2); + assertArrayEquals(new long[]{618, 626}, + new long[]{(long) third.get().get(0).getValue(0), (long) third.get().get(1).getValue(0)}); + } +} diff --git a/heron/storm/tests/java/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java b/heron/storm/tests/java/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java new file mode 100644 index 00000000000..183bda28e08 --- /dev/null +++ b/heron/storm/tests/java/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java @@ -0,0 +1,114 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.apache.storm.windowing; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.storm.generated.GlobalStreamId; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Unit tests for {@link WaterMarkEventGenerator} + */ +public class WaterMarkEventGeneratorTest { + private WaterMarkEventGenerator waterMarkEventGenerator; + private WindowManager windowManager; + private List> eventList = new ArrayList<>(); + + private GlobalStreamId streamId(String component) { + return new GlobalStreamId(component, "default"); + } + + @Before + public void setUp() { + windowManager = new WindowManager(null) { + @Override + public void add(Event event) { + eventList.add(event); + } + }; + // set watermark interval to a high value and trigger manually to fix timing issues + waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5, + Collections.singleton(streamId("s1"))); + waterMarkEventGenerator.start(); + } + + @Test + public void testTrackSingleStream() throws Exception { + waterMarkEventGenerator.track(streamId("s1"), 100); + waterMarkEventGenerator.track(streamId("s1"), 110); + waterMarkEventGenerator.run(); + assertTrue(eventList.get(0).isWatermark()); + assertEquals(105, eventList.get(0).getTimestamp()); + } + + @Test + public void testTrackSingleStreamOutOfOrder() throws Exception { + waterMarkEventGenerator.track(streamId("s1"), 100); + waterMarkEventGenerator.track(streamId("s1"), 110); + waterMarkEventGenerator.track(streamId("s1"), 104); + waterMarkEventGenerator.run(); + assertTrue(eventList.get(0).isWatermark()); + assertEquals(105, eventList.get(0).getTimestamp()); + } + + @Test + public void testTrackTwoStreams() throws Exception { + Set streams = new HashSet<>(); + streams.add(streamId("s1")); + streams.add(streamId("s2")); + waterMarkEventGenerator = new 
WaterMarkEventGenerator<>(windowManager, 100000, 5, streams); + + waterMarkEventGenerator.track(streamId("s1"), 100); + waterMarkEventGenerator.track(streamId("s1"), 110); + waterMarkEventGenerator.run(); + assertTrue(eventList.isEmpty()); + waterMarkEventGenerator.track(streamId("s2"), 95); + waterMarkEventGenerator.track(streamId("s2"), 98); + waterMarkEventGenerator.run(); + assertTrue(eventList.get(0).isWatermark()); + assertEquals(93, eventList.get(0).getTimestamp()); + } + + @Test + public void testNoEvents() throws Exception { + waterMarkEventGenerator.run(); + assertTrue(eventList.isEmpty()); + } + + @Test + public void testLateEvent() throws Exception { + assertTrue(waterMarkEventGenerator.track(streamId("s1"), 100)); + assertTrue(waterMarkEventGenerator.track(streamId("s1"), 110)); + waterMarkEventGenerator.run(); + assertTrue(eventList.get(0).isWatermark()); + assertEquals(105, eventList.get(0).getTimestamp()); + eventList.clear(); + assertTrue(waterMarkEventGenerator.track(streamId("s1"), 105)); + assertTrue(waterMarkEventGenerator.track(streamId("s1"), 106)); + assertTrue(waterMarkEventGenerator.track(streamId("s1"), 115)); + assertFalse(waterMarkEventGenerator.track(streamId("s1"), 104)); + waterMarkEventGenerator.run(); + assertTrue(eventList.get(0).isWatermark()); + assertEquals(110, eventList.get(0).getTimestamp()); + } +} diff --git a/heron/storm/tests/java/org/apache/storm/windowing/WindowManagerTest.java b/heron/storm/tests/java/org/apache/storm/windowing/WindowManagerTest.java new file mode 100644 index 00000000000..72e889e2059 --- /dev/null +++ b/heron/storm/tests/java/org/apache/storm/windowing/WindowManagerTest.java @@ -0,0 +1,588 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.storm.windowing; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link WindowManager} + */ +public class WindowManagerTest { + private WindowManager windowManager; + private Listener listener; + + private static class Listener implements WindowLifecycleListener { + // SUPPRESS CHECKSTYLE VisibilityModifier + List onExpiryEvents = Collections.emptyList(); + // SUPPRESS CHECKSTYLE VisibilityModifier + List onActivationEvents = Collections.emptyList(); + // SUPPRESS CHECKSTYLE VisibilityModifier + List onActivationNewEvents = Collections.emptyList(); + // SUPPRESS CHECKSTYLE VisibilityModifier + List onActivationExpiredEvents = Collections.emptyList(); + + // all events since last clear + // SUPPRESS CHECKSTYLE VisibilityModifier + List> allOnExpiryEvents = new ArrayList<>(); + // SUPPRESS CHECKSTYLE VisibilityModifier + List> allOnActivationEvents = new ArrayList<>(); + // SUPPRESS CHECKSTYLE VisibilityModifier + List> allOnActivationNewEvents = new ArrayList<>(); + // SUPPRESS CHECKSTYLE VisibilityModifier + List> allOnActivationExpiredEvents = new ArrayList<>(); + + @Override + public void onExpiry(List events) { 
+ onExpiryEvents = events; + allOnExpiryEvents.add(events); + } + + @Override + public void onActivation(List events, List newEvents, List expired) { + onActivationEvents = events; + allOnActivationEvents.add(events); + onActivationNewEvents = newEvents; + allOnActivationNewEvents.add(newEvents); + onActivationExpiredEvents = expired; + allOnActivationExpiredEvents.add(expired); + } + + void clear() { + onExpiryEvents = Collections.emptyList(); + onActivationEvents = Collections.emptyList(); + onActivationNewEvents = Collections.emptyList(); + onActivationExpiredEvents = Collections.emptyList(); + + allOnExpiryEvents.clear(); + allOnActivationEvents.clear(); + allOnActivationNewEvents.clear(); + allOnActivationExpiredEvents.clear(); + } + } + + @Before + public void setUp() { + listener = new Listener(); + windowManager = new WindowManager<>(listener); + } + + @After + public void tearDown() { + windowManager.shutdown(); + } + + @Test + public void testCountBasedWindow() throws Exception { + EvictionPolicy evictionPolicy = new CountEvictionPolicy(5); + TriggerPolicy triggerPolicy = + new CountTriggerPolicy(2, windowManager, evictionPolicy); + triggerPolicy.start(); + windowManager.setEvictionPolicy(evictionPolicy); + windowManager.setTriggerPolicy(triggerPolicy); + windowManager.add(1); + windowManager.add(2); + // nothing expired yet + assertTrue(listener.onExpiryEvents.isEmpty()); + assertEquals(seq(1, 2), listener.onActivationEvents); + assertEquals(seq(1, 2), listener.onActivationNewEvents); + assertTrue(listener.onActivationExpiredEvents.isEmpty()); + windowManager.add(3); + windowManager.add(4); + // nothing expired yet + assertTrue(listener.onExpiryEvents.isEmpty()); + assertEquals(seq(1, 4), listener.onActivationEvents); + assertEquals(seq(3, 4), listener.onActivationNewEvents); + assertTrue(listener.onActivationExpiredEvents.isEmpty()); + windowManager.add(5); + windowManager.add(6); + // 1 expired + assertEquals(seq(1), listener.onExpiryEvents); + 
assertEquals(seq(2, 6), listener.onActivationEvents); + assertEquals(seq(5, 6), listener.onActivationNewEvents); + assertEquals(seq(1), listener.onActivationExpiredEvents); + listener.clear(); + windowManager.add(7); + // nothing expires until threshold is hit + assertTrue(listener.onExpiryEvents.isEmpty()); + windowManager.add(8); + // 1 expired + assertEquals(seq(2, 3), listener.onExpiryEvents); + assertEquals(seq(4, 8), listener.onActivationEvents); + assertEquals(seq(7, 8), listener.onActivationNewEvents); + assertEquals(seq(2, 3), listener.onActivationExpiredEvents); + } + + @Test + public void testExpireThreshold() throws Exception { + int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD; + int windowLength = 5; + windowManager.setEvictionPolicy(new CountEvictionPolicy(5)); + TriggerPolicy triggerPolicy = + new TimeTriggerPolicy(new Duration(1, TimeUnit.HOURS).value, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + for (int i : seq(1, 5)) { + windowManager.add(i); + } + // nothing expired yet + assertTrue(listener.onExpiryEvents.isEmpty()); + for (int i : seq(6, 10)) { + windowManager.add(i); + } + for (int i : seq(11, threshold)) { + windowManager.add(i); + } + // window should be compacted and events should be expired. + assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents); + } + + + @Test + public void testTimeBasedWindow() throws Exception { + EvictionPolicy evictionPolicy = + new TimeEvictionPolicy(new Duration(1, TimeUnit.SECONDS).value); + windowManager.setEvictionPolicy(evictionPolicy); + /* + * Don't wait for Timetrigger to fire since this could lead to timing issues in unit tests. + * Set it to a large value and trigger manually. 
+ */ + TriggerPolicy triggerPolicy = new TimeTriggerPolicy( + new Duration(1, TimeUnit.DAYS).value, windowManager, evictionPolicy); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + long now = System.currentTimeMillis(); + + // add with past ts + for (int i : seq(1, 50)) { + windowManager.add(i, now - 1000); + } + + // add with current ts + for (int i : seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD)) { + windowManager.add(i, now); + } + // first 50 should have expired due to expire events threshold + assertEquals(50, listener.onExpiryEvents.size()); + + // add more events with past ts + for (int i : seq( + WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100)) { + windowManager.add(i, now - 1000); + } + // simulate the time trigger by setting the reference time and invoking onTrigger() manually + evictionPolicy.setContext(new DefaultEvictionContext(now + 100)); + windowManager.onTrigger(); + + // 100 events with past ts should expire + assertEquals(100, listener.onExpiryEvents.size()); + assertEquals(seq( + WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100), + listener.onExpiryEvents); + List activationsEvents = seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD); + assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD), listener.onActivationEvents); + assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD), listener.onActivationNewEvents); + // activation expired list should contain even the ones expired due to EXPIRE_EVENTS_THRESHOLD + List expiredList = seq(1, 50); + expiredList.addAll(seq( + WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100)); + assertEquals(expiredList, listener.onActivationExpiredEvents); + + listener.clear(); + // add more events with current ts + List newEvents = seq( + WindowManager.EXPIRE_EVENTS_THRESHOLD + 101, WindowManager.EXPIRE_EVENTS_THRESHOLD + 200); + for (int i : newEvents) { + 
windowManager.add(i, now); + } + activationsEvents.addAll(newEvents); + // simulate the time trigger by setting the reference time and invoking onTrigger() manually + evictionPolicy.setContext(new DefaultEvictionContext(now + 200)); + windowManager.onTrigger(); + assertTrue(listener.onExpiryEvents.isEmpty()); + assertEquals(activationsEvents, listener.onActivationEvents); + assertEquals(newEvents, listener.onActivationNewEvents); + + } + + + @Test + public void testTimeBasedWindowExpiry() throws Exception { + EvictionPolicy evictionPolicy = + new TimeEvictionPolicy(new Duration(100, TimeUnit.MILLISECONDS).value); + windowManager.setEvictionPolicy(evictionPolicy); + /* + * Don't wait for Timetrigger to fire since this could lead to timing issues in unit tests. + * Set it to a large value and trigger manually. + */ + TriggerPolicy triggerPolicy = + new TimeTriggerPolicy(new Duration(1, TimeUnit.DAYS).value, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + long now = System.currentTimeMillis(); + // add 10 events + for (int i : seq(1, 10)) { + windowManager.add(i); + } + // simulate the time trigger by setting the reference time and invoking onTrigger() manually + evictionPolicy.setContext(new DefaultEvictionContext(now + 60)); + windowManager.onTrigger(); + + assertEquals(seq(1, 10), listener.onActivationEvents); + assertTrue(listener.onActivationExpiredEvents.isEmpty()); + listener.clear(); + // wait so all events expire + evictionPolicy.setContext(new DefaultEvictionContext(now + 120)); + windowManager.onTrigger(); + + assertEquals(seq(1, 10), listener.onExpiryEvents); + assertTrue(listener.onActivationEvents.isEmpty()); + listener.clear(); + evictionPolicy.setContext(new DefaultEvictionContext(now + 180)); + windowManager.onTrigger(); + assertTrue(listener.onActivationExpiredEvents.isEmpty()); + assertTrue(listener.onActivationEvents.isEmpty()); + + } + + @Test + public void testTumblingWindow() throws Exception { + 
EvictionPolicy evictionPolicy = new CountEvictionPolicy(3); + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new CountTriggerPolicy(3, windowManager, evictionPolicy); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + windowManager.add(1); + windowManager.add(2); + // nothing expired yet + assertTrue(listener.onExpiryEvents.isEmpty()); + windowManager.add(3); + assertTrue(listener.onExpiryEvents.isEmpty()); + assertEquals(seq(1, 3), listener.onActivationEvents); + assertTrue(listener.onActivationExpiredEvents.isEmpty()); + assertEquals(seq(1, 3), listener.onActivationNewEvents); + + listener.clear(); + windowManager.add(4); + windowManager.add(5); + windowManager.add(6); + + assertEquals(seq(1, 3), listener.onExpiryEvents); + assertEquals(seq(4, 6), listener.onActivationEvents); + assertEquals(seq(1, 3), listener.onActivationExpiredEvents); + assertEquals(seq(4, 6), listener.onActivationNewEvents); + + } + + + @Test + public void testEventTimeBasedWindow() throws Exception { + EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy<>(20); + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new WatermarkTimeTriggerPolicy(10, windowManager, evictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + + windowManager.add(1, 603); + windowManager.add(2, 605); + windowManager.add(3, 607); + + // This should trigger the scan to find + // the next aligned window end ts, but not produce any activations + windowManager.add(new WaterMarkEvent(609)); + assertEquals(Collections.emptyList(), listener.allOnActivationEvents); + + windowManager.add(4, 618); + windowManager.add(5, 626); + windowManager.add(6, 636); + // send a watermark event, which should trigger three windows. 
+ windowManager.add(new WaterMarkEvent(631)); + + assertEquals(3, listener.allOnActivationEvents.size()); + assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0)); + assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1)); + assertEquals(seq(4, 5), listener.allOnActivationEvents.get(2)); + + assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(0)); + assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(1)); + assertEquals(seq(1, 3), listener.allOnActivationExpiredEvents.get(2)); + + assertEquals(seq(1, 3), listener.allOnActivationNewEvents.get(0)); + assertEquals(seq(4, 4), listener.allOnActivationNewEvents.get(1)); + assertEquals(seq(5, 5), listener.allOnActivationNewEvents.get(2)); + + assertEquals(seq(1, 3), listener.allOnExpiryEvents.get(0)); + + // add more events with a gap in ts + windowManager.add(7, 825); + windowManager.add(8, 826); + windowManager.add(9, 827); + windowManager.add(10, 839); + + listener.clear(); + windowManager.add(new WaterMarkEvent(834)); + + assertEquals(3, listener.allOnActivationEvents.size()); + assertEquals(seq(5, 6), listener.allOnActivationEvents.get(0)); + assertEquals(seq(6, 6), listener.allOnActivationEvents.get(1)); + assertEquals(seq(7, 9), listener.allOnActivationEvents.get(2)); + + assertEquals(seq(4, 4), listener.allOnActivationExpiredEvents.get(0)); + assertEquals(seq(5, 5), listener.allOnActivationExpiredEvents.get(1)); + assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(2)); + + assertEquals(seq(6, 6), listener.allOnActivationNewEvents.get(0)); + assertEquals(Collections.emptyList(), listener.allOnActivationNewEvents.get(1)); + assertEquals(seq(7, 9), listener.allOnActivationNewEvents.get(2)); + + assertEquals(seq(4, 4), listener.allOnExpiryEvents.get(0)); + assertEquals(seq(5, 5), listener.allOnExpiryEvents.get(1)); + assertEquals(seq(6, 6), listener.allOnExpiryEvents.get(2)); + } + + @Test + public void 
testCountBasedWindowWithEventTs() throws Exception { + EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(3); + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new WatermarkTimeTriggerPolicy(10, windowManager, evictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + + windowManager.add(1, 603); + windowManager.add(2, 605); + windowManager.add(3, 607); + windowManager.add(4, 618); + windowManager.add(5, 626); + windowManager.add(6, 636); + // send a watermark event, which should trigger three windows. + windowManager.add(new WaterMarkEvent(631)); + + assertEquals(3, listener.allOnActivationEvents.size()); + assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0)); + assertEquals(seq(2, 4), listener.allOnActivationEvents.get(1)); + assertEquals(seq(3, 5), listener.allOnActivationEvents.get(2)); + + // add more events with a gap in ts + windowManager.add(7, 665); + windowManager.add(8, 666); + windowManager.add(9, 667); + windowManager.add(10, 679); + + listener.clear(); + windowManager.add(new WaterMarkEvent(674)); + assertEquals(4, listener.allOnActivationEvents.size()); + // same set of events part of three windows + assertEquals(seq(4, 6), listener.allOnActivationEvents.get(0)); + assertEquals(seq(4, 6), listener.allOnActivationEvents.get(1)); + assertEquals(seq(4, 6), listener.allOnActivationEvents.get(2)); + assertEquals(seq(7, 9), listener.allOnActivationEvents.get(3)); + } + + @Test + public void testCountBasedTriggerWithEventTs() throws Exception { + EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy(20); + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new WatermarkCountTriggerPolicy(3, windowManager, evictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + + windowManager.add(1, 603); + windowManager.add(2, 605); + windowManager.add(3, 607); + 
windowManager.add(4, 618); + windowManager.add(5, 625); + windowManager.add(6, 626); + windowManager.add(7, 629); + windowManager.add(8, 636); + // send a watermark event, which should trigger two windows. + windowManager.add(new WaterMarkEvent(631)); + + assertEquals(2, listener.allOnActivationEvents.size()); + assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0)); + assertEquals(seq(3, 6), listener.allOnActivationEvents.get(1)); + + // add more events with a gap in ts + windowManager.add(9, 665); + windowManager.add(10, 666); + windowManager.add(11, 667); + windowManager.add(12, 669); + windowManager.add(12, 679); // NOTE(review): value 12 is added a second time (ts 669, then 679) - confirm whether 13 was intended + + listener.clear(); + windowManager.add(new WaterMarkEvent(674)); + assertEquals(2, listener.allOnActivationEvents.size()); + // the first window holds only event 9; the second holds events 9..12 + assertEquals(seq(9), listener.allOnActivationEvents.get(0)); + assertEquals(seq(9, 12), listener.allOnActivationEvents.get(1)); + } + + @Test + public void testCountBasedTumblingWithSameEventTs() throws Exception { /* WatermarkCountEvictionPolicy(2) with WatermarkCountTriggerPolicy(2); several events share a timestamp, and the assertions expect five consecutive, non-overlapping two-event windows. */ + EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(2); + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new WatermarkCountTriggerPolicy(2, windowManager, evictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + + windowManager.add(1, 10); + windowManager.add(2, 10); + windowManager.add(3, 11); + windowManager.add(4, 12); + windowManager.add(5, 12); + windowManager.add(6, 12); + windowManager.add(7, 12); + windowManager.add(8, 13); + windowManager.add(9, 14); + windowManager.add(10, 15); + + windowManager.add(new WaterMarkEvent(20)); + assertEquals(5, listener.allOnActivationEvents.size()); + assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0)); + assertEquals(seq(3, 4), listener.allOnActivationEvents.get(1)); + assertEquals(seq(5, 6), listener.allOnActivationEvents.get(2)); + assertEquals(seq(7, 8), listener.allOnActivationEvents.get(3)); +
assertEquals(seq(9, 10), listener.allOnActivationEvents.get(4)); + } + + @Test + public void testCountBasedSlidingWithSameEventTs() throws Exception { /* WatermarkCountEvictionPolicy(5) with WatermarkCountTriggerPolicy(2); the assertions expect five overlapping windows, none larger than five events. */ + EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(5); + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new WatermarkCountTriggerPolicy(2, windowManager, evictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + + windowManager.add(1, 10); + windowManager.add(2, 10); + windowManager.add(3, 11); + windowManager.add(4, 12); + windowManager.add(5, 12); + windowManager.add(6, 12); + windowManager.add(7, 12); + windowManager.add(8, 13); + windowManager.add(9, 14); + windowManager.add(10, 15); + + windowManager.add(new WaterMarkEvent(20)); + assertEquals(5, listener.allOnActivationEvents.size()); + assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0)); + assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1)); + assertEquals(seq(2, 6), listener.allOnActivationEvents.get(2)); + assertEquals(seq(4, 8), listener.allOnActivationEvents.get(3)); + assertEquals(seq(6, 10), listener.allOnActivationEvents.get(4)); + + } + + @Test + public void testEventTimeLag() throws Exception { /* WatermarkTimeEvictionPolicy(20, 5) with WatermarkTimeTriggerPolicy(10); event 6 (ts 632) is added before event 7 (ts 629), i.e. out of timestamp order. */ + EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5); + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new WatermarkTimeTriggerPolicy(10, windowManager, evictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + + windowManager.add(1, 603); + windowManager.add(2, 605); + windowManager.add(3, 607); + windowManager.add(4, 618); + windowManager.add(5, 626); + windowManager.add(6, 632); + windowManager.add(7, 629); + windowManager.add(8, 636); + // send a watermark event, which should trigger three windows.
+ windowManager.add(new WaterMarkEvent(631)); + assertEquals(3, listener.allOnActivationEvents.size()); + assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0)); + assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1)); + // out of order events should be processed up to the lag + assertEquals(Arrays.asList(4, 5, 7), listener.allOnActivationEvents.get(2)); + } + + @Test + public void testScanStop() throws Exception { /* Records every event handed to evict() through an anonymous WatermarkTimeEvictionPolicy(20, 5) subclass, so the test can assert which events were scanned. */ + final Set eventsScanned = new HashSet<>(); + EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy(20, 5) { + + @Override + public Action evict(Event event) { + eventsScanned.add(event.get()); + return super.evict(event); + } + + }; + windowManager.setEvictionPolicy(evictionPolicy); + TriggerPolicy triggerPolicy = + new WatermarkTimeTriggerPolicy(10, windowManager, evictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + + windowManager.add(1, 603); + windowManager.add(2, 605); + windowManager.add(3, 607); + windowManager.add(4, 618); + windowManager.add(5, 626); + windowManager.add(6, 629); + windowManager.add(7, 636); + windowManager.add(8, 637); + windowManager.add(9, 638); + windowManager.add(10, 639); + + // send a watermark event, which should trigger three windows. + windowManager.add(new WaterMarkEvent(631)); + + assertEquals(3, listener.allOnActivationEvents.size()); + assertEquals(seq(1, 3), listener.allOnActivationEvents.get(0)); + assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1)); + + // out of order events should be processed up to the lag + assertEquals(Arrays.asList(4, 5, 6), listener.allOnActivationEvents.get(2)); + + // events 8, 9, 10 should not be scanned at all since the WatermarkTimeEvictionPolicy lag (5) should break + // the WindowManager scan loop early.
+ assertEquals(new HashSet<>(seq(1, 7)), eventsScanned); + } + + private List seq(int start) { /* Single-element list containing only start. */ + return seq(start, start); + } + + private List seq(int start, int stop) { /* Integers start..stop, both inclusive, in ascending order; empty when start > stop. */ + List ints = new ArrayList<>(); + for (int i = start; i <= stop; i++) { + ints.add(i); + } + return ints; + } +}