-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-14214][SQL] Update state to provide get/put interface #12013
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3646878 to
ccb323f
Compare
| store = StateStore.get( | ||
| storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value) | ||
| val inputIter = dataRDD.iterator(partition, ctxt) | ||
| storeUpdateFunction(store, inputIter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The finally was removed to allow commits to be done lazily.
|
Test build #54360 has finished for PR 12013 at commit
|
|
Not sure about what this mima test failure is about. Thats anyways not on the latest commit, so I will wait for the tests on the latest commit to completed (build # 2703). |
| /** Commit all the updates that have been made to the store, and return the new version. */ | ||
| override def commit(): Long = { | ||
| verify(state == UPDATING, "Cannot commit again after already committed or cancelled") | ||
| verify(state == UPDATING, "Cannot commit after already committed or cancelled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: cancelled -> aborted
|
Test build #2703 has finished for PR 12013 at commit
|
|
Test build #54361 has finished for PR 12013 at commit
|
| } | ||
|
|
||
| override def put(key: UnsafeRow, value: UnsafeRow): Unit = { | ||
| verify(state == UPDATING, "Cannot remove after already committed or cancelled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove -> put
|
LGTM except one nit |
|
Test build #54369 has finished for PR 12013 at commit
|
|
Can we close this now? |
This PR adds the ability to perform aggregations inside of a `ContinuousQuery`. In order to implement this feature, the planning of aggregation has augmented with a new `StatefulAggregationStrategy`. Unlike batch aggregation, stateful-aggregation uses the `StateStore` (introduced in #11645) to persist the results of partial aggregation across different invocations. The resulting physical plan performs the aggregation using the following progression: - Partial Aggregation - Shuffle - Partial Merge (now there is at most 1 tuple per group) - StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous) - Partial Merge (now there is at most 1 tuple per group) - StateStoreSave (saves the tuple for the next batch) - Complete (output the current result of the aggregation) The following refactoring was also performed to allow us to plug into existing code: - The get/put implementation is taken from #12013 - The logic for breaking down and de-duping the physical execution of aggregation has been move into a new pattern `PhysicalAggregation` - The `AttributeReference` used to identify the result of an `AggregateFunction` as been moved into the `AggregateExpression` container. This change moves the reference into the same object as the other intermediate references used in aggregation and eliminates the need to pass around a `Map[(AggregateFunction, Boolean), Attribute]`. Further clean up (using a different aggregation container for logical/physical plans) is deferred to a followup. - Some planning logic is moved from the `SessionState` into the `QueryExecution` to make it easier to override in the streaming case. - The ability to write a `StreamTest` that checks only the output of the last batch has been added to simulate the future addition of output modes. Author: Michael Armbrust <michael@databricks.com> Closes #12048 from marmbrus/statefulAgg.
|
closing this as #12048 already absorbed this change. |
What changes were proposed in this pull request?
The goal is to make the state store more flexible by giving it a more key-value store like interface of gets and puts. This is to allow streaming aggregates physical plan to easily do only-gets instead of only-updates that the current is designed for.
How was this patch tested?
Unit tests.