Skip to content

Commit

Permalink
Make log/sink/consume Streamlet component support setName and setNumP…
Browse files Browse the repository at this point in the history
…artitions (apache#3459)
  • Loading branch information
nwangtw authored and sreev committed Apr 9, 2020
1 parent 5305d22 commit 0ad5d24
Show file tree
Hide file tree
Showing 17 changed files with 472 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ public static void main(String[] args) throws Exception {
* Elements in the first cloned streamlet go to the database sink.
*/
splitGameScoreStreamlet.get(0)
.toSink(new DatabaseSink());
.toSink(new DatabaseSink())
.setName("sink0");

/**
* Elements in the second cloned streamlet go to the logging sink.
*/
splitGameScoreStreamlet.get(1)
.toSink(new FormattedLogSink());
.toSink(new FormattedLogSink())
.setName("sink1");

Config config = Config.defaultConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ object ScalaClassicalMusicTopology {
)
.setName("joined-classical-musics-by-year")
.log()
.setName("log")

val config = Config.defaultConfig()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ object ScalaIntegerProcessingTopology {
.union(zeroes)
.setName("union-of-numbers")
.log()
.setName("log")
.setNumPartitions(1)

val config = Config.newBuilder
.setNumContainers(NUM_CONTAINERS)
Expand Down
8 changes: 4 additions & 4 deletions heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* Streamlet before doing the transformation.
*/
@InterfaceStability.Evolving
public interface Streamlet<R> {
public interface Streamlet<R> extends StreamletBase<R> {

/**
* Sets the name of the BaseStreamlet.
Expand Down Expand Up @@ -299,21 +299,21 @@ <K> KVStreamlet<KeyedWindow<K>, Long> countByKeyAndWindow(
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
void log();
StreamletBase<R> log();

/**
* Applies the consumer function to every element of the stream
* This function does not return anything.
* @param consumer The user supplied consumer function that is invoked for each element
* of this streamlet.
*/
void consume(SerializableConsumer<R> consumer);
StreamletBase<R> consume(SerializableConsumer<R> consumer);

/**
* Applies the sink's put function to every element of the stream
* This function does not return anything.
* @param sink The Sink whose put method consumes each element
* of this streamlet.
*/
void toSink(Sink<R> sink);
StreamletBase<R> toSink(Sink<R> sink);
}
60 changes: 60 additions & 0 deletions heron/api/src/java/org/apache/heron/streamlet/StreamletBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.heron.streamlet;

/**
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
* The StreamletBase interface exposes only the basic properties of a Streamlet,
* namely its name and its partition count. It deliberately declares none of the
* connection functions such as map() and filter().
*
* @param <R> type parameter of the Streamlet (presumably the element/tuple type;
* this interface itself never uses it other than in its own return types)
*/
public interface StreamletBase<R> {

/**
* Sets the name of the Streamlet.
* @param sName The name given by the user for this Streamlet
* @return Returns back the Streamlet with changed name
*/
StreamletBase<R> setName(String sName);

/**
* Gets the name of the Streamlet.
* @return Returns the name of the Streamlet
*/
String getName();

/**
* Sets the number of partitions of the Streamlet.
* @param numPartitions The user assigned number of partitions
* @return Returns back the Streamlet with changed number of partitions
*/
StreamletBase<R> setNumPartitions(int numPartitions);

/**
* Gets the number of partitions of this Streamlet.
* @return the number of partitions of this Streamlet
*/
int getNumPartitions();

// NOTE: a build method is NOT declared by this interface; the declaration below
// is left commented out. The original note attached to it reads: this is the main
// interface that every Streamlet implementation should implement; the main tasks
// are generally to make sure that appropriate names/partitions are computed and
// to add a spout/bolt to the TopologyBuilder.
// void build(TopologyBuilder bldr, Set<String> stageNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public TopologyBuilder build(TopologyBuilder builder) {
streamlet.build(builder, stageNames);
}
for (StreamletImpl<?> streamlet : sources) {
if (!streamlet.allBuilt()) {
if (!streamlet.isFullyBuilt()) {
throw new RuntimeException("Topology cannot be fully built! Are all sources added?");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.streamlet.impl;

import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.StreamletBase;

import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;

/**
 * Skeletal implementation of {@link StreamletBase}.
 *
 * <p>A Streamlet is a (potentially unbounded) ordered collection of tuples. This base
 * class keeps the bookkeeping that is common to every stage of a streamlet graph:
 * <ul>
 *   <li>the stage {@code name} (user assigned, or generated from a
 *       {@link StreamletNamePrefix} when none was given),</li>
 *   <li>the number of partitions ({@code -1} until one is assigned),</li>
 *   <li>the list of child streamlets hanging off this one, and</li>
 *   <li>whether this streamlet has already been built.</li>
 * </ul>
 *
 * <p>Subclasses supply the actual topology wiring through
 * {@link #doBuild(TopologyBuilder, Set)}; {@link #build(TopologyBuilder, Set)} takes care
 * of the built flag and of recursing into the children.
 */
public abstract class StreamletBaseImpl<R> implements StreamletBase<R> {
  private static final Logger LOG = Logger.getLogger(StreamletBaseImpl.class.getName());

  /** Stage name; {@code null} until set explicitly or defaulted. */
  protected String name = null;
  /** Partition count; {@code -1} until set. */
  protected int nPartitions = -1;
  /** Streamlets registered through {@link #addChild(StreamletBaseImpl)}. */
  private final List<StreamletBaseImpl<?>> children = new LinkedList<>();
  /** Flipped to {@code true} once {@link #doBuild(TopologyBuilder, Set)} reports success. */
  private boolean built = false;

  /**
   * Only used by the implementors.
   */
  protected StreamletBaseImpl() {
    // All state is set up by the field initializers above.
  }

  /**
   * Prefixes used when a default stage name has to be generated.
   * {@link #toString()} yields the textual prefix of the constant.
   */
  protected enum StreamletNamePrefix {
    CONSUMER("consumer"),
    COUNT("count"),
    CUSTOM("custom"),
    CUSTOM_BASIC("customBasic"),
    CUSTOM_WINDOW("customWindow"),
    FILTER("filter"),
    FLATMAP("flatmap"),
    JOIN("join"),
    KEYBY("keyBy"),
    LOGGER("logger"),
    MAP("map"),
    SOURCE("generator"),
    REDUCE("reduce"),
    REMAP("remap"),
    SINK("sink"),
    SPLIT("split"),
    SPOUT("spout"),
    SUPPLIER("supplier"),
    TRANSFORM("transform"),
    UNION("union");

    private final String label;

    StreamletNamePrefix(final String label) {
      this.label = label;
    }

    @Override
    public String toString() {
      return label;
    }
  }

  /**
   * Assigns the stage name of this streamlet.
   * The name is first passed to {@code checkNotBlank} together with the message
   * "Streamlet name cannot be null/blank".
   * @param sName The name given by the user for this streamlet
   * @return this streamlet, so that calls can be chained
   */
  @Override
  public StreamletBase<R> setName(String sName) {
    checkNotBlank(sName, "Streamlet name cannot be null/blank");

    name = sName;
    return this;
  }

  /**
   * Reports the stage name of this streamlet.
   * @return the current name, or {@code null} when none has been assigned yet
   */
  @Override
  public String getName() {
    return this.name;
  }

  /**
   * Finds the first name of the form {@code <prefix><n>}, n = 1, 2, 3, ..., that is
   * not yet contained in {@code stageNames}, and logs it.
   */
  private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
    final String stem = prefix.toString();
    int suffix = 1;
    String candidate = stem + suffix;
    while (stageNames.contains(candidate)) {
      suffix++;
      candidate = stem + suffix;
    }
    LOG.info("Calculated stage Name as " + candidate);
    return candidate;
  }

  /**
   * Makes sure this streamlet has a name that is unique within the topology.
   * When no name has been set, one is generated from {@code prefix}; in either case the
   * resulting name is registered in {@code stageNames}.
   * @param prefix The name prefix of this streamlet
   * @param stageNames The collections of created streamlet/stage names
   * @throws RuntimeException if the name is already present in {@code stageNames}
   */
  protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
    if (getName() == null) {
      setName(defaultNameCalculator(prefix, stageNames));
    }
    final String stageName = getName();
    // Set.add leaves the set untouched and returns false when the element already exists.
    if (!stageNames.add(stageName)) {
      throw new RuntimeException(String.format(
          "The stage name %s is used multiple times in the same topology", stageName));
    }
  }

  /**
   * Assigns the number of partitions of this streamlet.
   * The condition {@code numPartitions > 0} is handed to {@code require} together with
   * the message "Streamlet's partitions number should be > 0".
   * @param numPartitions The user assigned number of partitions
   * @return this streamlet, so that calls can be chained
   */
  @Override
  public StreamletBase<R> setNumPartitions(int numPartitions) {
    require(numPartitions > 0, "Streamlet's partitions number should be > 0");

    nPartitions = numPartitions;
    return this;
  }

  /**
   * Reports the number of partitions of this streamlet.
   * @return the partition count, or {@code -1} when it has not been set
   */
  @Override
  public int getNumPartitions() {
    return this.nPartitions;
  }

  /**
   * Registers {@code child} as a child of this streamlet.
   * @param child the streamlet to append to the list of children
   */
  public <T> void addChild(StreamletBaseImpl<T> child) {
    this.children.add(child);
  }

  /**
   * Gets all the children of this streamlet.
   * Children of a streamlet are streamlets that are resulting from transformations of
   * elements of this and potentially other streamlets.
   * @return the internal list of child streamlets (not a copy)
   */
  public List<StreamletBaseImpl<?>> getChildren() {
    return this.children;
  }

  /**
   * Builds this streamlet and, when that succeeds, every child in turn.
   * If {@link #doBuild(TopologyBuilder, Set)} returns {@code false} nothing else happens:
   * the streamlet stays unbuilt and the children are not visited.
   * @param bldr the topology builder handed through to {@code doBuild}
   * @param stageNames the stage names handed through to {@code doBuild}
   * @throws RuntimeException if this streamlet has already been built
   */
  public void build(TopologyBuilder bldr, Set<String> stageNames) {
    if (built) {
      throw new RuntimeException("Logic Error While building " + getName());
    }

    if (!doBuild(bldr, stageNames)) {
      return;
    }

    built = true;
    for (StreamletBaseImpl<?> child : getChildren()) {
      child.build(bldr, stageNames);
    }
  }

  /**
   * Tells whether this streamlet itself has been built.
   * @return {@code true} once a call to {@code build} has succeeded
   */
  public boolean isBuilt() {
    return this.built;
  }

  /**
   * Tells whether this streamlet and all of its descendants have been built.
   * @return {@code true} only if {@link #isBuilt()} holds here and every child is
   *     fully built as well
   */
  public boolean isFullyBuilt() {
    return isBuilt() && children.stream().allMatch(child -> child.isFullyBuilt());
  }

  /**
   * The main hook that every Streamlet implementation has to provide.
   * The main tasks are generally to make sure that appropriate names/partitions are
   * computed and to add a spout/bolt to the TopologyBuilder.
   * @param bldr the topology builder
   * @param stageNames the stage names created so far
   * @return {@code true} when the streamlet was built, in which case
   *     {@link #build(TopologyBuilder, Set)} marks it as built and continues with the
   *     children; {@code false} otherwise
   */
  protected abstract boolean doBuild(TopologyBuilder bldr, Set<String> stageNames);
}
Loading

0 comments on commit 0ad5d24

Please sign in to comment.