Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix handler npe #67

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,30 @@ public ExecutionSiddhiStream cql(String executionPlan) {
* @return ExecutionSiddhiStream context
*/
public ExecutionSiddhiStream cql(DataStream<ControlEvent> controlStream) {
DataStream<Tuple2<StreamRoute, Object>> unionStream = controlStream
.map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM))
.broadcast()
.union(this.toDataStream())

// first of all we gather control events to the broadcasted stream
DataStream<Tuple2<StreamRoute, Object>> broadcastedControlStream = controlStream
.map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM))
.broadcast();

// Control events are cut off on the AddRouteOperator stage
DataStream<Tuple2<StreamRoute, Object>> dataWithControlStream = this.toDataStream()
Copy link
Contributor

@pranjal0811 pranjal0811 May 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your way of solving this. I do have also the logic for solving this like -

public ExecutionSiddhiStream cql(DataStream<ControlEvent> controlStream) {
   DataStream<Tuple2<StreamRoute, Object>> unionStream = controlStream
                .map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM))
                .broadcast()
                .union(this.toDataStream())
                .transform("add route transform",
                    SiddhiTypeFactory.getStreamTupleTypeInformation(TypeInformation.of(Object.class)),
                    new AddRouteOperator(getCepEnvironment().getDataStreamSchemas()));
DataStream<Tuple2<StreamRoute,Object>> controlEventStream = unionStream.filter(x ->x.f0.isBroadCastPartitioning()) 
DataStream<Tuple2<StreamRoute,Object>> eventStream = unionStream.filter(x -> !(x.f0.isBroadCastPartitioning()))

DataStream<Tuple2<StreamRoute, Object>> partitionedStream = new DataStream<>(
                    eventStream.getExecutionEnvironment(),
                    new PartitionTransformation<>(eventStream.getTransformation(),
                            new DynamicPartitioner()));

return new ExecutionSiddhiStream(partitionedStream.union(controlEventStream), null, getCepEnvironment());

However, I didn't change the AddRouteOperator logic. This thing will also work, I think my approach will be more overhead. Want your thought to it.

Copy link
Author

@rovboyko rovboyko May 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I compared two solutions if they build the same JobGraphs or not.

The JobGraphs look like the same except:
my solution:
image
your solution:
image

So it really has some overhead - not in additional vertexes, but in additional logic in the route transformation operator.

Nevertheless I prefer to not include control events to the stream before the routing. The main advantage - it won't be ambiguous.

.union(broadcastedControlStream)
.transform("add route transform",
SiddhiTypeFactory.getStreamTupleTypeInformation(TypeInformation.of(Object.class)),
new AddRouteOperator(getCepEnvironment().getDataStreamSchemas()));

// create partitioned stream based only on data events
DataStream<Tuple2<StreamRoute, Object>> partitionedStream = new DataStream<>(
unionStream.getExecutionEnvironment(),
new PartitionTransformation<>(unionStream.getTransformation(),
dataWithControlStream.getExecutionEnvironment(),
new PartitionTransformation<>(dataWithControlStream.getTransformation(),
new DynamicPartitioner()));
return new ExecutionSiddhiStream(partitionedStream, null, getCepEnvironment());

// union partitioned data stream with broadcasted control stream
DataStream<Tuple2<StreamRoute, Object>> unionStream = partitionedStream
.union(broadcastedControlStream);

return new ExecutionSiddhiStream(unionStream, null, getCepEnvironment());
}

private static class NamedControlStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public void processElement(StreamRecord<Tuple2<StreamRoute, Object>> element) th
} else if (value instanceof MetadataControlEvent) {
handleMetadataControlEvent((MetadataControlEvent)value);
}

output.collect(element);
} else {
String inputStreamId = streamRoute.getInputStreamId();
if (!inputStreamToExecutionPlans.containsKey(inputStreamId)) {
Expand Down