Adjust Operators to be Pausable #13694
@@ -72,13 +85,14 @@
   @Override
   public int numRows()
   {
-    return pointers.length;
+    return end - start;
Check failure (Code scanning / CodeQL): User-controlled data in arithmetic expression
There's a validation that this is positive, so this check should be able to be ignored.
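As a rough illustration of why an up-front check makes the subtraction safe, here is a minimal sketch. The class name RangeView and its constructor arguments are hypothetical and are not the code in this PR; the only point is that once 0 <= start <= end has been validated, end - start can neither go negative nor overflow.

```java
// Hypothetical stand-in for a view over a slice of rows; not the class from this PR.
final class RangeView
{
  private final int start;
  private final int end;

  RangeView(int start, int end, int totalRows)
  {
    // Validate once so that later arithmetic on the bounds is safe.
    if (start < 0 || end < start || end > totalRows) {
      throw new IllegalArgumentException(
          "invalid range [" + start + ", " + end + ") for " + totalRows + " rows"
      );
    }
    this.start = start;
    this.end = end;
  }

  int numRows()
  {
    // With 0 <= start <= end, this difference is non-negative and cannot overflow.
    return end - start;
  }
}
```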
5d17add
to
c86941a
This enables "merge" style operations that combine multiple streams. This change includes a naive implementation of one such merge operator just to provide concrete evidence that the refactoring is effective.
c86941a
to
01047ca
did not review join operator as anything other than a proof of concept for the pattern (since missing some bits if viewed from a 'real' perspective like conditions other than equality, etc)
no blockers on my end and this code is all pretty well isolated and behind flags so going to go ahead and approve
@@ -126,4 +74,122 @@ public <T> T as(Class<? extends T> clazz)
}
return null;
}
private class MyColumnAccessor implements BinarySearchableAccessor
should this only be a binary searchable accessor if the column is sorted?
Generically speaking, yes it would be best to validate that first before returning the thing.
As a confounding factor, even if the whole column isn't sorted, it could be sorted in the context of another column. For example, if a set of rows is sorted by (col1, col2), then col2 is not actually sorted if you look at the data on its own, but when you access it in conjunction with col1, it is. I agree that we should do more to figure this sort of thing out and fail if it's violated, but, for now, the code is counting on the query planning to do things correctly.
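To make the (col1, col2) point concrete, here is a small sketch. The class, the data, and the method name findInRange are all made up for illustration and are not part of the PR. The rows are sorted by (col1, col2), so col2 is unsorted as a whole but sorted inside each run of equal col1 values, and a binary search is only valid when restricted to such a run.

```java
import java.util.Arrays;

// Illustrative only: the data and names here are assumptions, not code from the PR.
final class SortedWithinPrefix
{
  // Rows sorted by (col1, col2). col2 taken alone is NOT sorted: 1, 3, 5, 2, 4.
  static final int[] COL1 = {10, 10, 10, 20, 20};
  static final int[] COL2 = {1, 3, 5, 2, 4};

  // Binary search for val in COL2, restricted to rows [startIndex, endIndex).
  // Returns the row index, or a negative value if val is absent from that range.
  static int findInRange(int startIndex, int endIndex, int val)
  {
    return Arrays.binarySearch(COL2, startIndex, endIndex, val);
  }

  public static void main(String[] args)
  {
    // Rows [3, 5) are the run where col1 == 20; inside it col2 is sorted,
    // so the search is well defined.
    System.out.println(findInRange(3, 5, 4));
    // Rows [0, 3) are the run where col1 == 10.
    System.out.println(findInRange(0, 3, 3));
    // Searching [0, 5) would not be valid: col2 is unsorted across runs, and
    // Arrays.binarySearch leaves the result undefined for unsorted input.
  }
}
```

Nothing in this sketch checks that the caller picked a range that really is sorted, which mirrors the reliance on query planning described above.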
FindResult findString(int startIndex, int endIndex, String val);
FindResult findComplex(int startIndex, int endIndex, Object val);
should this be findObject instead of findComplex, that is if it should also handle other object types such as ARRAY, ARRAY, ARRAY<ARRAY<...>> etc? Or do you imagine array types will have some other method?
I imagine the Array types will have their own methods.
package org.apache.druid.query.rowsandcols.util;
public class FindResult
i know this makes stuff nicer, but wondering if this is maybe expensive if we need to find a lot of things compared to just dealing in int?
Just thinking out loud, probably don't need to worry about it right now
"dealing in int" would actually mean dealing in int[] because I am often returning a start/end range. Once we are dealing in int[], we have the overhead of a reference to an object anyway, so this seemed okay.
If we really wanted to fix this, we'd likely need to make the finder thingie itself stateful and have getters on that. That would probably be better, but can be an activity for a later day.
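For what the "stateful finder with getters" idea could look like, here is a minimal sketch. StatefulIntFinder and every method on it are hypothetical; nothing like it exists in this PR, and the miss semantics (start == end == insertion point) are an assumption made for the example. The finder keeps the last start/end range in its own fields, so repeated lookups allocate neither a result object nor an int[].

```java
// Hypothetical sketch of a stateful finder; not part of this PR.
final class StatefulIntFinder
{
  private final int[] sortedValues;
  private boolean found;
  private int start;
  private int end;

  StatefulIntFinder(int[] sortedValues)
  {
    this.sortedValues = sortedValues;
  }

  // Locates the run of rows equal to val within [startIndex, endIndex).
  // On a miss, start == end == the position where val would be inserted.
  void find(int startIndex, int endIndex, int val)
  {
    start = bound(startIndex, endIndex, val, false);
    end = bound(start, endIndex, val, true);
    found = end > start;
  }

  // Binary search for the first index whose value is >= val (strict == false)
  // or > val (strict == true).
  private int bound(int lo, int hi, int val, boolean strict)
  {
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      boolean goRight = strict ? sortedValues[mid] <= val : sortedValues[mid] < val;
      if (goRight) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  }

  boolean wasFound()
  {
    return found;
  }

  int getStart()
  {
    return start;
  }

  int getEnd()
  {
    return end;
  }
}
```

The trade-off is that the finder is no longer safe to share between callers, since each find call overwrites the previous result.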
This enables "merge" style operations that combine multiple streams.
This change includes a naive implementation of one such merge operator just to provide concrete evidence that the refactoring is effective.
The primary intent of the change can be seen by looking at the Operator interface. The change amounts to the addition of a Signal enumeration that allows the Receiver to signal back to the Operator that it needs to pause things. This then returns control to the caller to do something else (like call another Operator). The vast majority of operators should never have to deal with this, but some that do merges will.
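The pause-and-resume idea described above can be sketched roughly as follows. This is a simplified illustration and not the actual interfaces in the PR: the Signal values, the method signatures, the integer position used for resuming, and ListOperator are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical shapes; the real interfaces in the PR differ.
final class PausableSketch
{
  enum Signal { GO, PAUSE, STOP }

  interface Receiver<T>
  {
    // The return value tells the operator whether to keep pushing.
    Signal push(T batch);

    void completed();
  }

  interface Operator<T>
  {
    // Pushes batches starting at position. Returns the position to resume
    // from if the receiver paused, or -1 once finished or stopped.
    int go(Receiver<T> receiver, int position);
  }

  static final class ListOperator<T> implements Operator<T>
  {
    private final List<T> batches;

    ListOperator(List<T> batches)
    {
      this.batches = batches;
    }

    @Override
    public int go(Receiver<T> receiver, int position)
    {
      for (int i = position; i < batches.size(); i++) {
        Signal signal = receiver.push(batches.get(i));
        if (signal == Signal.STOP) {
          return -1;
        }
        if (signal == Signal.PAUSE) {
          // Hand control back to the caller, remembering where to resume.
          return i + 1;
        }
      }
      receiver.completed();
      return -1;
    }
  }

  public static void main(String[] args)
  {
    Operator<String> op = new ListOperator<>(Arrays.asList("a", "b", "c"));
    Receiver<String> pauseEveryBatch = new Receiver<String>()
    {
      @Override
      public Signal push(String batch)
      {
        System.out.println("got " + batch);
        return Signal.PAUSE;
      }

      @Override
      public void completed()
      {
        System.out.println("done");
      }
    };
    int position = 0;
    while (position >= 0) {
      // Between calls the caller is free to drive some other operator.
      position = op.go(pauseEveryBatch, position);
    }
  }
}
```

A receiver that always returns GO never sees the pause path, which matches the note that most operators should not have to deal with it.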
This interface change has made it possible to implement the Yielder interface in terms of an Operator again, which is a proof-point that this works. On top of that, there is a concrete implementation of such a merge operator in SortedInnerJoinOperator, which implements a multi-way inner join across operators. The class is almost entirely business logic for conducting the join rather than interactions with the Operator interface, so it is relatively large, but the intent was to prove that it is possible to create a meaningful "merging" operator on top of the interface changes, which it achieves. I would recommend starting with the SortedInnerJoinOperatorTest when reviewing, just because that will show the intended usage of the operator.
Note, however, that this operator was created merely to exercise the interface change; as such, it is not wired up into planning, nor is there a query path that can exercise it yet. Now that I am certain the interface can handle all of the needs of the data processing pipeline, I intend to go back to fleshing out the full test suite for window functions and then can come back to this.
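For readers unfamiliar with the join style involved, the core of a sorted inner join on equality can be sketched as below. This is a generic two-input sort-merge join over in-memory int keys, written only for illustration; it is not the SortedInnerJoinOperator implementation, which is multi-way and works across operators, and the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Generic two-way sort-merge inner join; illustrative, not the PR's operator.
final class SortedInnerJoinSketch
{
  // Joins two ascending key arrays on equality.
  // Each result element is a {leftRow, rightRow} pair of matching row indexes.
  static List<int[]> join(int[] left, int[] right)
  {
    List<int[]> out = new ArrayList<>();
    int l = 0;
    int r = 0;
    while (l < left.length && r < right.length) {
      if (left[l] < right[r]) {
        l++;
      } else if (left[l] > right[r]) {
        r++;
      } else {
        // Find the run of equal keys on each side, then emit the cross product.
        int key = left[l];
        int lEnd = l;
        while (lEnd < left.length && left[lEnd] == key) {
          lEnd++;
        }
        int rEnd = r;
        while (rEnd < right.length && right[rEnd] == key) {
          rEnd++;
        }
        for (int i = l; i < lEnd; i++) {
          for (int j = r; j < rEnd; j++) {
            out.add(new int[]{i, j});
          }
        }
        l = lEnd;
        r = rEnd;
      }
    }
    return out;
  }
}
```

In a streaming setting each side arrives in batches, so the join has to stop consuming one input while it advances the other, which is where a pausable operator becomes useful.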
In terms of validation of the interface, there is likely one more validation to conduct, though I am fairly certain that it will be relatively simple and will leave it as a future exercise: the implementation of a FrameProcessor based on Operator.
This PR has: