Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix handler npe #67

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

fix handler npe #67

wants to merge 1 commit into from

Conversation

rovboyko
Copy link

@rovboyko rovboyko commented May 7, 2020

PR corrects the situation when data stream events are assigned to partitions by AddRouteOperator, but control events aren't. This happens because control events can't be distributed on broadcast rule through the DynamicPartitioner.
PR also fixes NPE in testDynamicalStreamSimplePatternMatch().
#64

.broadcast();

// Control events are cut off on the AddRouteOperator stage
DataStream<Tuple2<StreamRoute, Object>> dataWithControlStream = this.toDataStream()
Copy link
Contributor

@pranjal0811 pranjal0811 May 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your way of solving this. I do have also the logic for solving this like -

public ExecutionSiddhiStream cql(DataStream<ControlEvent> controlStream) {
   DataStream<Tuple2<StreamRoute, Object>> unionStream = controlStream
                .map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM))
                .broadcast()
                .union(this.toDataStream())
                .transform("add route transform",
                    SiddhiTypeFactory.getStreamTupleTypeInformation(TypeInformation.of(Object.class)),
                    new AddRouteOperator(getCepEnvironment().getDataStreamSchemas()));
DataStream<Tuple2<StreamRoute,Object>> controlEventStream = unionStream.filter(x ->x.f0.isBroadCastPartitioning()) 
DataStream<Tuple2<StreamRoute,Object>> eventStream = unionStream.filter(x -> !(x.f0.isBroadCastPartitioning()))

DataStream<Tuple2<StreamRoute, Object>> partitionedStream = new DataStream<>(
                    eventStream.getExecutionEnvironment(),
                    new PartitionTransformation<>(eventStream.getTransformation(),
                            new DynamicPartitioner()));

return new ExecutionSiddhiStream(partitionedStream.union(controlEventStream), null, getCepEnvironment());

However, I didn't change the AddRouteOperator logic. This thing will also work, I think my approach will be more overhead. Want your thought to it.

Copy link
Author

@rovboyko rovboyko May 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I compared two solutions if they build the same JobGraphs or not.

The JobGraphs look like the same except:
my solution:
image
your solution:
image

So it really has some overhead - not in additional vertexes, but in additional logic in the route transformation operator.

Nevertheless I prefer to not include control events to the stream before the routing. The main advantage - it won't be ambiguous.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants