Skip to content

Commit

Permalink
[ISSUE apache#251]Support avg function in WindowStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Jargon9 committed Jan 27, 2023
1 parent d6b934a commit 61b081c
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.core.function.accumulator;

import java.util.Properties;

public class AvgAccumulator<V> implements Accumulator<V, Double> {
private Double avg;
private Integer num;

@Override
public void addValue(V value) {
if (value instanceof Number) {
Number number = (Number) value;
Double valueToDouble = number.doubleValue();

if (avg == null) {
avg = valueToDouble;
num = 1;
} else {
avg = avg + (valueToDouble - avg) / (num + 1);
num++;
}
} else {
throw new IllegalArgumentException("Calculate avg, input is not a number. value=" + value);
}
}

@Override
public void merge(Accumulator<V, Double> other) {
if (other instanceof AvgAccumulator) {
AvgAccumulator<V> otherAvgAccumulator = (AvgAccumulator) other;
Integer numOther = otherAvgAccumulator.getNum();
Double avgOther = otherAvgAccumulator.getAvg();

avg = avg + numOther / (num + numOther) * (avgOther - avg);
num = num + numOther;
} else {
throw new IllegalArgumentException("Merge avg, input is not a AvgAccumulator.");
}
}

@Override
public Double result(Properties context) {
return avg;
}

@Override
public Accumulator<V, Double> clone() {
return new AvgAccumulator<>();
}

public Double getAvg() {
return avg;
}

public void setAvg(Double avg) {
this.avg = avg;
}

public Integer getNum() {
return num;
}

public void setNum(Integer num) {
this.num = num;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public interface WindowStream<K, V> {
WindowStream<K, Integer> count();

WindowStream<K, Double> avg();

WindowStream<K, V> filter(FilterAction<V> predictor);

<OUT> WindowStream<K, OUT> map(ValueMapperAction<V, OUT> mapperAction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.rocketmq.streams.core.function.FilterAction;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.function.accumulator.AvgAccumulator;
import org.apache.rocketmq.streams.core.function.accumulator.CountAccumulator;
import org.apache.rocketmq.streams.core.function.supplier.FilterSupplier;
import org.apache.rocketmq.streams.core.function.supplier.SinkSupplier;
Expand All @@ -41,6 +42,7 @@
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.SINK_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.WINDOW_AGGREGATE_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.WINDOW_COUNT_PREFIX;
import static org.apache.rocketmq.streams.core.util.OperatorNameMaker.WINDOW_AVG_PREFIX;

public class WindowStreamImpl<K, V> implements WindowStream<K, V> {
private final Pipeline pipeline;
Expand Down Expand Up @@ -69,6 +71,22 @@ public WindowStream<K, Integer> count() {
return this.pipeline.addWindowStreamVirtualNode(node, parent, windowInfo);
}

@Override
public WindowStream<K, Double> avg() {
String name = OperatorNameMaker.makeName(WINDOW_AVG_PREFIX, pipeline.getJobId());
Supplier<Processor<V>> supplier = new WindowAccumulatorSupplier<>(name, windowInfo, value -> value, new AvgAccumulator<>());

//是否需要分组计算
ProcessorNode<V> node;
if (this.parent.shuffleNode()) {
node = new ShuffleProcessorNode<>(name, parent.getName(), supplier);
} else {
node = new ProcessorNode<>(name, parent.getName(), supplier);
}

return this.pipeline.addWindowStreamVirtualNode(node, parent, windowInfo);
}

@Override
public WindowStream<K, V> filter(FilterAction<V> predictor) {
String name = OperatorNameMaker.makeName(FILTER_PREFIX, pipeline.getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public <K> void forward(Data<K, V> data) throws Throwable {
for (Processor<V> processor : childList) {

try {
// todo : 这里无法理解:为何后续还要add进去
processor.preProcess(this);
processor.process(data.getValue());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class OperatorNameMaker {
public static final String WINDOW_ADD_TAG = "ROCKETMQ-WINDOW-ADD-TAG";
public static final String ADD_TAG = "ROCKETMQ-ADD-TAG";
public static final String WINDOW_COUNT_PREFIX = "ROCKETMQ-WINDOW-COUNT";
public static final String WINDOW_AVG_PREFIX = "ROCKETMQ-WINDOW-AVG";
public static final String WINDOW_AGGREGATE_PREFIX = "ROCKETMQ-WINDOW-AGGREGATE";
public static final String RSTREAM_AGGREGATE_PREFIX = "ROCKETMQ-RSTREAM-AGGREGATE";
public static final String GROUPED_STREAM_AGGREGATE_PREFIX = "ROCKETMQ-GROUPED-STREAM-AGGREGATE";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.examples.window;

import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.streams.core.RocketMQStream;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.rstream.StreamBuilder;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.window.Time;
import org.apache.rocketmq.streams.core.window.TimeType;
import org.apache.rocketmq.streams.core.window.WindowBuilder;

import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
import java.util.Properties;

/**
* 1、启动RocketMQ
* 2、创建topic
* 3、启动本例子运行
* 4、向topic中写入数据
* 5、观察输出结果
*/
public class WindowAvg {
public static void main(String[] args) {
StreamBuilder builder = new StreamBuilder("WindowAvg");
builder.source("avgSource", source -> {
String value = new String(source, StandardCharsets.UTF_8);
Integer num = Integer.parseInt(value);
return new Pair<>(null, num);
}).foreach(value -> System.out.println(String.format("time:%s, input:%d", LocalTime.now(), value)))
.filter(value -> value > 0)
.keyBy(value -> "key")
.window(WindowBuilder.tumblingWindow(Time.seconds(15)))
.avg()
.toRStream()
.print();

TopologyBuilder topologyBuilder = builder.build();

Properties properties = new Properties();
properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
properties.put(Constant.TIME_TYPE, TimeType.PROCESS_TIME);
properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 5000);

RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);

rocketMQStream.start();
}
}

0 comments on commit 61b081c

Please sign in to comment.