diff --git a/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java b/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java index 843da57f63b..03488f81d12 100644 --- a/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java +++ b/examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java @@ -144,13 +144,15 @@ public static void main(String[] args) throws Exception { * Elements in the first cloned streamlet go to the database sink. */ splitGameScoreStreamlet.get(0) - .toSink(new DatabaseSink()); + .toSink(new DatabaseSink()) + .setName("sink0"); /** * Elements in the second cloned streamlet go to the logging sink. */ splitGameScoreStreamlet.get(1) - .toSink(new FormattedLogSink()); + .toSink(new FormattedLogSink()) + .setName("sink1"); Config config = Config.defaultConfig(); diff --git a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala index f155bcce35c..7bc20caceaa 100644 --- a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala +++ b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala @@ -92,6 +92,7 @@ object ScalaClassicalMusicTopology { ) .setName("joined-classical-musics-by-year") .log() + .setName("log") val config = Config.defaultConfig() diff --git a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala index b0663bc4120..6f618dfd7c4 100644 --- a/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala +++ b/examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala @@ -58,6 +58,8 @@ object ScalaIntegerProcessingTopology { .union(zeroes) .setName("union-of-numbers") .log() + .setName("log") + .setNumPartitions(1) val config = Config.newBuilder .setNumContainers(NUM_CONTAINERS) diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java index 1a108a4475f..1c387603416 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java @@ -45,7 +45,7 @@ * Streamlet before doing the transformation. */ @InterfaceStability.Evolving -public interface Streamlet { +public interface Streamlet extends StreamletBase { /** * Sets the name of the BaseStreamlet. @@ -299,7 +299,7 @@ KVStreamlet, Long> countByKeyAndWindow( * Logs every element of the streamlet using String.valueOf function * This is one of the sink functions in the sense that this operation returns void */ - void log(); + StreamletBase log(); /** * Applies the consumer function to every element of the stream @@ -307,7 +307,7 @@ KVStreamlet, Long> countByKeyAndWindow( * @param consumer The user supplied consumer function that is invoked for each element * of this streamlet. */ - void consume(SerializableConsumer consumer); + StreamletBase consume(SerializableConsumer consumer); /** * Applies the sink's put function to every element of the stream @@ -315,5 +315,5 @@ KVStreamlet, Long> countByKeyAndWindow( * @param sink The Sink whose put method consumes each element * of this streamlet. */ - void toSink(Sink sink); + StreamletBase toSink(Sink sink); } diff --git a/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java b/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java new file mode 100644 index 00000000000..f52e7f2d4fd --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.streamlet; + +/** + * A Streamlet is a (potentially unbounded) ordered collection of tuples. + * The StreamletBase class contains basic information of a Streamlet + * such as name and partition count without the connection functions + * such as map() and filter(). + */ +public interface StreamletBase { + + /** + * Sets the name of the BaseStreamlet. + * @param sName The name given by the user for this BaseStreamlet + * @return Returns back the Streamlet with changed name + */ + StreamletBase setName(String sName); + + /** + * Gets the name of the Streamlet. + * @return Returns the name of the Streamlet + */ + String getName(); + + /** + * Sets the number of partitions of the streamlet + * @param numPartitions The user assigned number of partitions + * @return Returns back the Streamlet with changed number of partitions + */ + StreamletBase setNumPartitions(int numPartitions); + + /** + * Gets the number of partitions of this Streamlet. + * @return the number of partitions of this Streamlet + */ + int getNumPartitions(); + + // This is the main interface that every Streamlet implementation should implement + // The main tasks are generally to make sure that appropriate names/partitions are + // computed and add a spout/bolt to the TopologyBuilder + // void build(TopologyBuilder bldr, Set stageNames); +} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java index 853bb972f71..2dca00e867b 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java @@ -91,7 +91,7 @@ public TopologyBuilder build(TopologyBuilder builder) { streamlet.build(builder, stageNames); } for (StreamletImpl streamlet : sources) { - if (!streamlet.allBuilt()) { + if (!streamlet.isFullyBuilt()) { throw new RuntimeException("Topology cannot be fully built! Are all sources added?"); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java new file mode 100644 index 00000000000..6b3530cbc17 --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletBaseImpl.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.streamlet.impl; + +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.logging.Logger; + +import org.apache.heron.api.topology.TopologyBuilder; +import org.apache.heron.streamlet.StreamletBase; + +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank; +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require; + +/** + * A Streamlet is a (potentially unbounded) ordered collection of tuples. + * Streamlets originate from pub/sub systems(such Pulsar/Kafka), or from + * static data(such as csv files, HDFS files), or for that matter any other + * source. They are also created by transforming existing Streamlets using + * operations such as map/flatMap, etc. + * Besides the tuples, a Streamlet has the following properties associated with it + * a) name. User assigned or system generated name to refer the streamlet + * b) nPartitions. Number of partitions that the streamlet is composed of. Thus the + * ordering of the tuples in a Streamlet is wrt the tuples within a partition. + * This allows the system to distribute each partition to different nodes across the cluster. + * A bunch of transformations can be done on Streamlets(like map/flatMap, etc.). Each + * of these transformations operate on every tuple of the Streamlet and produce a new + * Streamlet. One can think of a transformation attaching itself to the stream and processing + * each tuple as they go by. Thus the parallelism of any operator is implicitly determined + * by the number of partitions of the stream that it is operating on. If a particular + * transformation wants to operate at a different parallelism, one can repartition the + * Streamlet before doing the transformation. + */ +public abstract class StreamletBaseImpl implements StreamletBase { + private static final Logger LOG = Logger.getLogger(StreamletBaseImpl.class.getName()); + protected String name; + protected int nPartitions; + private List> children; + private boolean built; + + /** + * Only used by the implementors + */ + protected StreamletBaseImpl() { + this.name = null; + this.nPartitions = -1; + this.children = new LinkedList<>(); + this.built = false; + } + + protected enum StreamletNamePrefix { + CONSUMER("consumer"), + COUNT("count"), + CUSTOM("custom"), + CUSTOM_BASIC("customBasic"), + CUSTOM_WINDOW("customWindow"), + FILTER("filter"), + FLATMAP("flatmap"), + JOIN("join"), + KEYBY("keyBy"), + LOGGER("logger"), + MAP("map"), + SOURCE("generator"), + REDUCE("reduce"), + REMAP("remap"), + SINK("sink"), + SPLIT("split"), + SPOUT("spout"), + SUPPLIER("supplier"), + TRANSFORM("transform"), + UNION("union"); + + private final String prefix; + + StreamletNamePrefix(final String prefix) { + this.prefix = prefix; + } + + @Override + public String toString() { + return prefix; + } + } + + /** + * Sets the name of the Streamlet. + * @param sName The name given by the user for this streamlet + * @return Returns back the Streamlet with changed name + */ + @Override + public StreamletBase setName(String sName) { + checkNotBlank(sName, "Streamlet name cannot be null/blank"); + + this.name = sName; + return this; + } + + /** + * Gets the name of the Streamlet. + * @return Returns the name of the Streamlet + */ + @Override + public String getName() { + return name; + } + + private String defaultNameCalculator(StreamletNamePrefix prefix, Set stageNames) { + int index = 1; + String calculatedName; + while (true) { + calculatedName = new StringBuilder(prefix.toString()).append(index).toString(); + if (!stageNames.contains(calculatedName)) { + break; + } + index++; + } + LOG.info("Calculated stage Name as " + calculatedName); + return calculatedName; + } + + /** + * Sets a default unique name to the Streamlet by type if it is not set. + * Otherwise, just checks its uniqueness. + * @param prefix The name prefix of this streamlet + * @param stageNames The collections of created streamlet/stage names + */ + protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set stageNames) { + if (getName() == null) { + setName(defaultNameCalculator(prefix, stageNames)); + } + if (stageNames.contains(getName())) { + throw new RuntimeException(String.format( + "The stage name %s is used multiple times in the same topology", getName())); + } + stageNames.add(getName()); + } + + /** + * Sets the number of partitions of the streamlet + * @param numPartitions The user assigned number of partitions + * @return Returns back the Streamlet with changed number of partitions + */ + @Override + public StreamletBase setNumPartitions(int numPartitions) { + require(numPartitions > 0, "Streamlet's partitions number should be > 0"); + + this.nPartitions = numPartitions; + return this; + } + + /** + * Gets the number of partitions of this Streamlet. + * @return the number of partitions of this Streamlet + */ + @Override + public int getNumPartitions() { + return nPartitions; + } + + public void addChild(StreamletBaseImpl child) { + children.add(child); + } + + /** + * Gets all the children of this streamlet. + * Children of a streamlet are streamlets that are resulting from transformations of elements of + * this and potentially other streamlets. + * @return The kid streamlets + */ + public List> getChildren() { + return children; + } + + public void build(TopologyBuilder bldr, Set stageNames) { + if (built) { + throw new RuntimeException("Logic Error While building " + getName()); + } + + if (doBuild(bldr, stageNames)) { + built = true; + for (StreamletBaseImpl streamlet : getChildren()) { + streamlet.build(bldr, stageNames); + } + } + } + + public boolean isBuilt() { + return built; + } + + public boolean isFullyBuilt() { + if (!isBuilt()) { + return false; + } + for (StreamletBaseImpl child : children) { + if (!child.isFullyBuilt()) { + return false; + } + } + return true; + } + + // This is the main interface that every Streamlet implementation should implement + // The main tasks are generally to make sure that appropriate names/partitions are + // computed and add a spout/bolt to the TopologyBuilder + protected abstract boolean doBuild(TopologyBuilder bldr, Set stageNames); +} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java index 81a7d7827e0..65d7534f5ce 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,7 +28,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.heron.api.grouping.NoneStreamGrouping; import org.apache.heron.api.grouping.StreamGrouping; -import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.api.utils.Utils; import org.apache.heron.streamlet.IStreamletOperator; import org.apache.heron.streamlet.JoinType; @@ -43,6 +41,7 @@ import org.apache.heron.streamlet.SerializableTransformer; import org.apache.heron.streamlet.Sink; import org.apache.heron.streamlet.Streamlet; +import org.apache.heron.streamlet.StreamletBase; import org.apache.heron.streamlet.WindowConfig; import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet; import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet; @@ -89,72 +88,9 @@ * transformation wants to operate at a different parallelism, one can repartition the * Streamlet before doing the transformation. */ -public abstract class StreamletImpl implements Streamlet { +public abstract class StreamletImpl + extends StreamletBaseImpl implements Streamlet { private static final Logger LOG = Logger.getLogger(StreamletImpl.class.getName()); - protected String name; - protected int nPartitions; - private List> children; - private boolean built; - - public boolean isBuilt() { - return built; - } - - public boolean allBuilt() { - if (!built) { - return false; - } - for (StreamletImpl child : children) { - if (!child.allBuilt()) { - return false; - } - } - return true; - } - - protected enum StreamletNamePrefix { - CONSUMER("consumer"), - COUNT("count"), - CUSTOM("custom"), - CUSTOM_BASIC("customBasic"), - CUSTOM_WINDOW("customWindow"), - FILTER("filter"), - FLATMAP("flatmap"), - JOIN("join"), - KEYBY("keyBy"), - LOGGER("logger"), - MAP("map"), - SOURCE("generator"), - REDUCE("reduce"), - REMAP("remap"), - SINK("sink"), - SPLIT("split"), - SPOUT("spout"), - SUPPLIER("supplier"), - TRANSFORM("transform"), - UNION("union"); - - private final String prefix; - - StreamletNamePrefix(final String prefix) { - this.prefix = prefix; - } - - @Override - public String toString() { - return prefix; - } - } - - /** - * Gets all the children of this streamlet. - * Children of a streamlet are streamlets that are resulting from transformations of elements of - * this and potentially other streamlets. - * @return The kid streamlets - */ - public List> getChildren() { - return children; - } /** * Sets the name of the Streamlet. @@ -163,38 +99,10 @@ public List> getChildren() { */ @Override public Streamlet setName(String sName) { - checkNotBlank(sName, "Streamlet name cannot be null/blank"); - - this.name = sName; + super.setName(sName); return this; } - /** - * Gets the name of the Streamlet. - * @return Returns the name of the Streamlet - */ - @Override - public String getName() { - return name; - } - - /** - * Sets a default unique name to the Streamlet by type if it is not set. - * Otherwise, just checks its uniqueness. - * @param prefix The name prefix of this streamlet - * @param stageNames The collections of created streamlet/stage names - */ - protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set stageNames) { - if (getName() == null) { - setName(defaultNameCalculator(prefix, stageNames)); - } - if (stageNames.contains(getName())) { - throw new RuntimeException(String.format( - "The stage name %s is used multiple times in the same topology", getName())); - } - stageNames.add(getName()); - } - /** * Sets the number of partitions of the streamlet * @param numPartitions The user assigned number of partitions @@ -202,21 +110,10 @@ protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set stag */ @Override public Streamlet setNumPartitions(int numPartitions) { - require(numPartitions > 0, "Streamlet's partitions number should be > 0"); - - this.nPartitions = numPartitions; + super.setNumPartitions(numPartitions); return this; } - /** - * Gets the number of partitions of this Streamlet. - * @return the number of partitions of this Streamlet - */ - @Override - public int getNumPartitions() { - return nPartitions; - } - /** * Set the id of the stream to be used by the children nodes. * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2): @@ -271,45 +168,7 @@ public String getStreamId() { * Only used by the implementors */ protected StreamletImpl() { - this.nPartitions = -1; - this.children = new LinkedList<>(); - this.built = false; - } - - public void build(TopologyBuilder bldr, Set stageNames) { - if (built) { - throw new RuntimeException("Logic Error While building " + getName()); - } - - if (doBuild(bldr, stageNames)) { - built = true; - for (StreamletImpl streamlet : children) { - streamlet.build(bldr, stageNames); - } - } - } - - // This is the main interface that every Streamlet implementation should implement - // The main tasks are generally to make sure that appropriate names/partitions are - // computed and add a spout/bolt to the TopologyBuilder - protected abstract boolean doBuild(TopologyBuilder bldr, Set stageNames); - - public void addChild(StreamletImpl child) { - children.add(child); - } - - private String defaultNameCalculator(StreamletNamePrefix prefix, Set stageNames) { - int index = 1; - String calculatedName; - while (true) { - calculatedName = new StringBuilder(prefix.toString()).append(index).toString(); - if (!stageNames.contains(calculatedName)) { - break; - } - index++; - } - LOG.info("Calculated stage Name as " + calculatedName); - return calculatedName; + super(); } /** @@ -571,9 +430,10 @@ public Streamlet union(Streamlet otherStreamlet) { * that does not contain any tuple. Thus this function returns void. */ @Override - public void log() { + public StreamletBase log() { LogStreamlet logger = new LogStreamlet<>(this); addChild(logger); + return logger; } /** @@ -581,11 +441,12 @@ public void log() { * @param consumer The user supplied consumer function that is invoked for each element */ @Override - public void consume(SerializableConsumer consumer) { + public StreamletBase consume(SerializableConsumer consumer) { checkNotNull(consumer, "consumer cannot be null"); ConsumerStreamlet consumerStreamlet = new ConsumerStreamlet<>(this, consumer); addChild(consumerStreamlet); + return consumerStreamlet; } /** @@ -593,11 +454,12 @@ public void consume(SerializableConsumer consumer) { * @param sink The Sink that consumes */ @Override - public void toSink(Sink sink) { + public StreamletBase toSink(Sink sink) { checkNotNull(sink, "sink cannot be null"); SinkStreamlet sinkStreamlet = new SinkStreamlet<>(this, sink); addChild(sinkStreamlet); + return sinkStreamlet; } /** diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java index 633c4c68a84..eb5d3be150a 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.heron.api.topology.TopologyBuilder; +import org.apache.heron.streamlet.impl.StreamletBaseImpl; import org.apache.heron.streamlet.impl.StreamletImpl; /** @@ -100,8 +101,7 @@ public int getNumPartitions() { /* * Functions related to topology building need to be overriden. */ - @Override - public void addChild(StreamletImpl child) { + public void addChild(StreamletBaseImpl child) { real.addChild(child); } @@ -112,7 +112,7 @@ public void addChild(StreamletImpl child) { * @return The kid streamlets */ @Override - public List> getChildren() { + public List> getChildren() { return real.getChildren(); } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java index f71b71fe5ac..92c592815e8 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java @@ -24,6 +24,7 @@ import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.streamlet.SerializableConsumer; +import org.apache.heron.streamlet.impl.StreamletBaseImpl; import org.apache.heron.streamlet.impl.StreamletImpl; import org.apache.heron.streamlet.impl.sinks.ConsumerSink; @@ -32,7 +33,7 @@ * streamlet after consuming every element. Since elements of the parents are just consumed * by the user passed consumer function, nothing is emitted, thus this streamlet is empty. */ -public class ConsumerStreamlet extends StreamletImpl { +public class ConsumerStreamlet extends StreamletBaseImpl { private StreamletImpl parent; private SerializableConsumer consumer; diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java index 9f68ff9a690..db9cefaaab2 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.heron.api.topology.TopologyBuilder; +import org.apache.heron.streamlet.impl.StreamletBaseImpl; import org.apache.heron.streamlet.impl.StreamletImpl; import org.apache.heron.streamlet.impl.sinks.LogSink; @@ -30,7 +31,7 @@ * streamlet after logging each element. Since elements of the parents are just logged * nothing is emitted, thus this streamlet is empty. */ -public class LogStreamlet extends StreamletImpl { +public class LogStreamlet extends StreamletBaseImpl { private StreamletImpl parent; public LogStreamlet(StreamletImpl parent) { diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java index 42e5d031f7c..9625b6d4f93 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SinkStreamlet.java @@ -24,6 +24,7 @@ import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.streamlet.Sink; +import org.apache.heron.streamlet.impl.StreamletBaseImpl; import org.apache.heron.streamlet.impl.StreamletImpl; import org.apache.heron.streamlet.impl.sinks.ComplexSink; @@ -32,7 +33,7 @@ * streamlet after consuming every element. Since elements of the parents are just consumed * by the user passed consumer function, nothing is emitted, thus this streamlet is empty. */ -public class SinkStreamlet extends StreamletImpl { +public class SinkStreamlet extends StreamletBaseImpl { private StreamletImpl parent; private Sink sink; diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala index 8151941c1f5..073b74a5815 100644 --- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala +++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala @@ -46,7 +46,7 @@ import org.apache.heron.streamlet.{ * transformation wants to operate at a different parallelism, one can repartition the * Streamlet before doing the transformation. */ -trait Streamlet[R] { +trait Streamlet[R] extends StreamletBase[R] { /** * Sets the name of the Streamlet. @@ -56,13 +56,6 @@ trait Streamlet[R] { */ def setName(sName: String): Streamlet[R] - /** - * Gets the name of the Streamlet. - * - * @return Returns the name of the Streamlet - */ - def getName: String - /** * Sets the number of partitions of the streamlet * @@ -71,13 +64,6 @@ trait Streamlet[R] { */ def setNumPartitions(numPartitions: Int): Streamlet[R] - /** - * Gets the number of partitions of this Streamlet. - * - * @return the number of partitions of this Streamlet - */ - def getNumPartitions: Int - /** * Set the id of the stream to be used by the children nodes. * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2): @@ -320,7 +306,7 @@ trait Streamlet[R] { * Logs every element of the streamlet using String.valueOf function * This is one of the sink functions in the sense that this operation returns void */ - def log(): Unit + def log(): StreamletBase[R] /** * Applies the consumer function to every element of the stream @@ -329,7 +315,7 @@ trait Streamlet[R] { * @param consumer The user supplied consumer function that is invoked for each element * of this streamlet. */ - def consume(consumer: R => Unit): Unit + def consume(consumer: R => Unit): StreamletBase[R] /** * Applies the sink's put function to every element of the stream @@ -338,6 +324,6 @@ trait Streamlet[R] { * @param sink The Sink whose put method consumes each element * of this streamlet. */ - def toSink(sink: Sink[R]): Unit + def toSink(sink: Sink[R]): StreamletBase[R] } diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala new file mode 100644 index 00000000000..64468d1e0dd --- /dev/null +++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/StreamletBase.scala @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.streamlet.scala + +/** + * A Streamlet is a (potentially unbounded) ordered collection of tuples. + * The StreamletBase class contains basic information of a Streamlet + * such as name and partition count without the connection functions + * such as map() and filter(). + */ +trait StreamletBase[R] { + + /** + * Sets the name of the Streamlet. + * + * @param sName The name given by the user for this Streamlet + * @return Returns back the Streamlet with changed name + */ + def setName(sName: String): StreamletBase[R] + + /** + * Gets the name of the Streamlet. + * + * @return Returns the name of the Streamlet + */ + def getName: String + + /** + * Sets the number of partitions of the streamlet + * + * @param numPartitions The user assigned number of partitions + * @return Returns back the Streamlet with changed number of partitions + */ + def setNumPartitions(numPartitions: Int): StreamletBase[R] + + /** + * Gets the number of partitions of this Streamlet. + * + * @return the number of partitions of this Streamlet + */ + def getNumPartitions: Int +} diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala new file mode 100644 index 00000000000..d5f336e3b0b --- /dev/null +++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletBaseImpl.scala @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.streamlet.scala.impl + +import org.apache.heron.streamlet.{StreamletBase => JavaStreamletBase} +import org.apache.heron.streamlet.scala.StreamletBase + +object StreamletBaseImpl { + def fromJavaStreamletBase[R](javaStreamletBase: JavaStreamletBase[R]): StreamletBase[R] = + new StreamletBaseImpl[R](javaStreamletBase) + + def toJavaStreamletBase[R](streamlet: StreamletBase[R]): JavaStreamletBase[R] = + streamlet.asInstanceOf[StreamletBaseImpl[R]].javaStreamletBase +} + +/** + * This class provides Scala Streamlet Implementation by wrapping Java Streamlet API. + * Passed User defined Scala Functions are transformed to related FunctionalInterface versions and + * related Java Streamlet is transformed to Scala version again. + */ +class StreamletBaseImpl[R](val javaStreamletBase: JavaStreamletBase[R]) extends StreamletBase[R] { + import StreamletBaseImpl._ + + /** + * Sets the name of the Streamlet. + * + * @param sName The name given by the user for this Streamlet + * @return Returns back the Streamlet with changed name + */ + override def setName(sName: String): StreamletBase[R] = + fromJavaStreamletBase[R](javaStreamletBase.setName(sName)) + + /** + * Gets the name of the Streamlet. + * + * @return Returns the name of the Streamlet + */ + override def getName(): String = javaStreamletBase.getName + + /** + * Sets the number of partitions of the streamlet + * + * @param numPartitions The user assigned number of partitions + * @return Returns back the Streamlet with changed number of partitions + */ + override def setNumPartitions(numPartitions: Int): StreamletBase[R] = + fromJavaStreamletBase[R](javaStreamletBase.setNumPartitions(numPartitions)) + + /** + * Gets the number of partitions of this Streamlet. + * + * @return the number of partitions of this Streamlet + */ + override def getNumPartitions(): Int = javaStreamletBase.getNumPartitions +} diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala index fe6b7df8435..bee12981f90 100644 --- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala +++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala @@ -32,9 +32,11 @@ import org.apache.heron.streamlet.{ SerializablePredicate, KVStreamlet => JavaKVStreamlet, Streamlet => JavaStreamlet, + StreamletBase => JavaStreamletBase, WindowConfig } import org.apache.heron.streamlet.impl.{ + StreamletBaseImpl => JavaStreamletBaseImpl, StreamletImpl => JavaStreamletImpl } import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet @@ -43,7 +45,8 @@ import org.apache.heron.streamlet.scala.{ SerializableTransformer, Sink, KVStreamlet, - Streamlet + Streamlet, + StreamletBase } import org.apache.heron.streamlet.scala.converter.ScalaToJavaConverter._ @@ -62,7 +65,7 @@ object StreamletImpl { * related Java Streamlet is transformed to Scala version again. */ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R]) - extends Streamlet[R] { + extends StreamletBaseImpl[R](javaStreamlet) with Streamlet[R] { import StreamletImpl._ @@ -75,13 +78,6 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R]) override def setName(sName: String): Streamlet[R] = fromJavaStreamlet[R](javaStreamlet.setName(sName)) - /** - * Gets the name of the Streamlet. - * - * @return Returns the name of the Streamlet - */ - override def getName(): String = javaStreamlet.getName - /** * Sets the number of partitions of the streamlet * @@ -91,13 +87,6 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R]) override def setNumPartitions(numPartitions: Int): Streamlet[R] = fromJavaStreamlet[R](javaStreamlet.setNumPartitions(numPartitions)) - /** - * Gets the number of partitions of this Streamlet. - * - * @return the number of partitions of this Streamlet - */ - override def getNumPartitions(): Int = javaStreamlet.getNumPartitions - /** * Set the id of the stream to be used by the children nodes. * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2): @@ -480,7 +469,10 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R]) * Logs every element of the streamlet using String.valueOf function * This is one of the sink functions in the sense that this operation returns void */ - override def log(): Unit = javaStreamlet.log() + override def log(): StreamletBase[R] = { + val newJavaStreamletBase = javaStreamlet.log() + StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase) + } /** * Applies the consumer function to every element of the stream @@ -489,9 +481,10 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R]) * @param consumer The user supplied consumer function that is invoked for each element * of this streamlet. */ - override def consume(consumer: R => Unit): Unit = { + override def consume(consumer: R => Unit): StreamletBase[R] = { val serializableConsumer = toSerializableConsumer[R](consumer) - javaStreamlet.consume(serializableConsumer) + val newJavaStreamletBase = javaStreamlet.consume(serializableConsumer) + StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase) } /** @@ -501,9 +494,10 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R]) * @param sink The Sink whose put method consumes each element * of this streamlet. */ - override def toSink(sink: Sink[R]): Unit = { + override def toSink(sink: Sink[R]): StreamletBase[R] = { val javaSink = toJavaSink[R](sink) - javaStreamlet.toSink(javaSink) + val newJavaStreamletBase = javaStreamlet.toSink(javaSink) + StreamletBaseImpl.fromJavaStreamletBase(newJavaStreamletBase) } /** @@ -513,11 +507,11 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R]) * * @return The kid streamlets */ - private[impl] def getChildren: List[JavaStreamletImpl[_]] = { + private[impl] def getChildren: List[JavaStreamletBaseImpl[_]] = { import _root_.scala.collection.JavaConversions._ val children = javaStreamlet - .asInstanceOf[JavaStreamletImpl[_]] + .asInstanceOf[JavaStreamletBaseImpl[_]] .getChildren children.toList } diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java index ea40f012c96..43924171ea1 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java @@ -461,7 +461,7 @@ public void testSimpleBuild() throws Exception { TopologyBuilder topologyBuilder = new TopologyBuilder(); Set stageNames = new HashSet<>(); supplierStreamlet.build(topologyBuilder, stageNames); - assertTrue(supplierStreamlet.allBuilt()); + assertTrue(supplierStreamlet.isFullyBuilt()); assertEquals(supplierStreamlet.getChildren().size(), 1); assertTrue(supplierStreamlet.getChildren().get(0) instanceof FlatMapStreamlet); FlatMapStreamlet fStreamlet = @@ -499,11 +499,11 @@ public void testComplexBuild() { Set stageNames = new HashSet<>(); supplierStreamlet1.build(topologyBuilder, stageNames); assertTrue(supplierStreamlet1.isBuilt()); - assertFalse(supplierStreamlet1.allBuilt()); + assertFalse(supplierStreamlet1.isFullyBuilt()); supplierStreamlet2.build(topologyBuilder, stageNames); - assertTrue(supplierStreamlet1.allBuilt()); - assertTrue(supplierStreamlet2.allBuilt()); + assertTrue(supplierStreamlet1.isFullyBuilt()); + assertTrue(supplierStreamlet2.isFullyBuilt()); // go over all stuff assertEquals(supplierStreamlet1.getChildren().size(), 1); @@ -546,7 +546,7 @@ public void testCalculatedDefaultStageNames() { supplierStreamlet.build(topologyBuilder, stageNames); // verify SupplierStreamlet - assertTrue(supplierStreamlet.allBuilt()); + assertTrue(supplierStreamlet.isFullyBuilt()); assertEquals(1, supplierStreamlet.getChildren().size()); assertTrue(supplierStreamlet.getChildren().get(0) instanceof ConsumerStreamlet); assertEquals("consumer1", supplierStreamlet.getChildren().get(0).getName());