From eef886468b46a131139595b705799b3a95ba8b11 Mon Sep 17 00:00:00 2001 From: rboyko Date: Thu, 7 May 2020 16:22:15 +0700 Subject: [PATCH] fix npe in testDynamicalStreamSimplePatternMatch() --- .../flink/streaming/siddhi/SiddhiStream.java | 25 +++++++++++++------ .../siddhi/router/AddRouteOperator.java | 2 -- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/core/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java index f5ad77f..0c7dbe6 100644 --- a/core/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java +++ b/core/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java @@ -124,19 +124,30 @@ public ExecutionSiddhiStream cql(String executionPlan) { * @return ExecutionSiddhiStream context */ public ExecutionSiddhiStream cql(DataStream controlStream) { - DataStream> unionStream = controlStream - .map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM)) - .broadcast() - .union(this.toDataStream()) + + // first of all we gather control events to the broadcasted stream + DataStream> broadcastedControlStream = controlStream + .map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM)) + .broadcast(); + + // Control events are cut off on the AddRouteOperator stage + DataStream> dataWithControlStream = this.toDataStream() + .union(broadcastedControlStream) .transform("add route transform", SiddhiTypeFactory.getStreamTupleTypeInformation(TypeInformation.of(Object.class)), new AddRouteOperator(getCepEnvironment().getDataStreamSchemas())); + // create partitioned stream based only on data events DataStream> partitionedStream = new DataStream<>( - unionStream.getExecutionEnvironment(), - new PartitionTransformation<>(unionStream.getTransformation(), + dataWithControlStream.getExecutionEnvironment(), + new PartitionTransformation<>(dataWithControlStream.getTransformation(), new DynamicPartitioner())); - return new ExecutionSiddhiStream(partitionedStream, null, getCepEnvironment()); + + // union partitioned data stream with broadcasted control stream + DataStream> unionStream = partitionedStream + .union(broadcastedControlStream); + + return new ExecutionSiddhiStream(unionStream, null, getCepEnvironment()); } private static class NamedControlStream diff --git a/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java b/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java index ff0e236..7fc4cfc 100644 --- a/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java +++ b/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java @@ -61,8 +61,6 @@ public void processElement(StreamRecord> element) th } else if (value instanceof MetadataControlEvent) { handleMetadataControlEvent((MetadataControlEvent)value); } - - output.collect(element); } else { String inputStreamId = streamRoute.getInputStreamId(); if (!inputStreamToExecutionPlans.containsKey(inputStreamId)) {