This adds updating queries to the SQL frontend. This allows Arroyo to read Debezium sources, write Debezium to sinks, and build new types of pipelines that intelligently compute how a record has changed, if it in fact has. This is a rather large PR, so I'd recommend reviewing it in the following sections:
## UpdatingData

This is the central struct for handling updates inside the dataflow. The `map()` method lets us cleanly apply map functions, potentially eliminating the record entirely if the mapped values are the same. Similarly, `filter()` applies a predicate to the update and determines whether downstream requires an update. We also have DebeziumData, which wraps updates into the Debezium format, similar to what Flink does when `format = 'debezium-json'`.
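As a rough sketch of those semantics (the variant names and method bodies below are illustrative assumptions, not Arroyo's actual definitions):

```rust
// Illustrative sketch of UpdatingData's map()/filter() semantics; the variant
// names and signatures are assumptions, not Arroyo's actual definitions.
#[derive(Debug, Clone, PartialEq)]
pub enum UpdatingData<T> {
    Append(T),
    Retract(T),
    Update { old: T, new: T },
}

impl<T: PartialEq> UpdatingData<T> {
    /// Apply a map function, eliminating the record entirely if an Update's
    /// old and new values map to the same output.
    pub fn map<U: PartialEq>(self, f: impl Fn(T) -> U) -> Option<UpdatingData<U>> {
        match self {
            UpdatingData::Append(t) => Some(UpdatingData::Append(f(t))),
            UpdatingData::Retract(t) => Some(UpdatingData::Retract(f(t))),
            UpdatingData::Update { old, new } => {
                let (old, new) = (f(old), f(new));
                if old == new {
                    None // values are the same: no downstream update needed
                } else {
                    Some(UpdatingData::Update { old, new })
                }
            }
        }
    }

    /// Apply a predicate, deciding whether downstream requires an update: an
    /// Update can degrade to an Append (newly passing), a Retract (no longer
    /// passing), or nothing at all.
    pub fn filter(self, pred: impl Fn(&T) -> bool) -> Option<UpdatingData<T>> {
        match self {
            UpdatingData::Append(t) => pred(&t).then(|| UpdatingData::Append(t)),
            UpdatingData::Retract(t) => pred(&t).then(|| UpdatingData::Retract(t)),
            UpdatingData::Update { old, new } => match (pred(&old), pred(&new)) {
                (true, true) => Some(UpdatingData::Update { old, new }),
                (true, false) => Some(UpdatingData::Retract(old)),
                (false, true) => Some(UpdatingData::Append(new)),
                (false, false) => None,
            },
        }
    }
}
```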
## New Operators

Handling updates requires new operators:
### UpdatingAggregateOperator

This operator handles non-windowed aggregates. Every incoming record is aggregated into a single intermediate value according to its key. There are checks for whether either the outgoing value or the internal state has changed, and work is only done when necessary. It supports both Updating and Append inputs, with the SQL layer providing different methods in each case. Currently SQL only supports aggregates that have compact intermediate forms (everything except count distinct); because DataFusion currently unrolls single `count(distinct field)` computations into two non-distinct aggregates, this restriction doesn't bite very often.

This is the only new operator that requires state. We reuse the KeyTimeMap backend, where the time is the record timestamp. Similar to Flink, this functions as an expiration, with a default expiration of 24 hours. There is not currently any eviction of stale data within the running processor, although expired data will be compacted away and not restored from a checkpoint.
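A minimal sketch of that change-detection check, with a hand-rolled count aggregate standing in for the real per-key intermediate state (all names are hypothetical):

```rust
use std::collections::HashMap;

// Hypothetical sketch of UpdatingAggregateOperator's core check: per-key
// intermediate state is updated on every record, but an output is only
// emitted when the finalized value actually changes.
#[derive(Debug, PartialEq)]
pub enum Emit {
    Append(i64),
    Update { old: i64, new: i64 },
    None,
}

pub struct CountAggregate {
    state: HashMap<String, i64>, // key -> compact intermediate value
}

impl CountAggregate {
    pub fn new() -> Self {
        CountAggregate { state: HashMap::new() }
    }

    pub fn process(&mut self, key: &str, delta: i64) -> Emit {
        let entry = self.state.entry(key.to_string()).or_insert(0);
        let old = *entry;
        *entry += delta;
        let new = *entry;
        if old == 0 {
            Emit::Append(new) // first value for this key
        } else if old == new {
            Emit::None // nothing changed: do no downstream work
        } else {
            Emit::Update { old, new }
        }
    }
}
```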
### KeyMapUpdatingOperator

This operator is necessary to recompute the key on an UpdatingData input. Because the key could differ between the old and new values, we may need to split an UpdatingData::Update into an UpdatingData::Retract and an UpdatingData::Append and collect both of them.
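A sketch of that splitting logic (the `Out` type and `remap_update` function are illustrative, not the operator's actual code):

```rust
// Hypothetical sketch of the re-keying split performed by
// KeyMapUpdatingOperator: when the key function yields different keys for the
// old and new values, a single Update must become two records.
#[derive(Debug, PartialEq)]
pub enum Out<K, T> {
    Append(K, T),
    Retract(K, T),
    Update { key: K, old: T, new: T },
}

pub fn remap_update<K: PartialEq, T>(
    old: T,
    new: T,
    key_fn: impl Fn(&T) -> K,
) -> Vec<Out<K, T>> {
    let (old_key, new_key) = (key_fn(&old), key_fn(&new));
    if old_key == new_key {
        // same key: the update stays a single record
        vec![Out::Update { key: old_key, old, new }]
    } else {
        // keys differ: split into a retraction under the old key and an
        // append under the new key, and collect both
        vec![Out::Retract(old_key, old), Out::Append(new_key, new)]
    }
}
```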
### UpdatingData

This is only an addition to datastream::Operator, as it ends up being compiled to an OptionMapOperator, but it applies an optional function on T to UpdatingData via the `map()` method mentioned above.

### JoinWithExpiration

This operator already supported inner joins with expirations. It is now extended to the four main join types, with everything except inner joins emitting an update stream. In particular, the first record on a side that was allowed to be null by the join type will now retract any previously emitted records.
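To illustrate that retraction behavior for a LEFT JOIN (this is a sketch of the emitted records, not the actual JoinWithExpiration code; all names are hypothetical):

```rust
// Illustrative sketch: under a LEFT JOIN, a left record with no match is
// emitted null-padded, and the first matching right record retracts that
// previously emitted row before appending the joined result.
#[derive(Debug, PartialEq)]
pub enum Joined {
    Append(&'static str, Option<&'static str>),
    Retract(&'static str, Option<&'static str>),
}

pub fn on_right_arrival(
    left: &'static str,
    right: &'static str,
    left_had_match: bool,
) -> Vec<Joined> {
    let mut out = Vec::new();
    if !left_had_match {
        // first match on the nullable side: retract the previously
        // emitted null-padded result
        out.push(Joined::Retract(left, None));
    }
    out.push(Joined::Append(left, Some(right)));
    out
}
```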
## SQL

The majority of the implementation is within arroyo-sql, in particular the portions concerned with SqlOperator and PlanOperators.
### Remove PlanType from PlanEdge

In order to convert a PlanNode to a datastream::Operator, the node generally needs type information. Introducing updates increased this need, as the operators can be quite different depending on whether the node produces updating data. Having this duplicated on both the edges and nodes of the PlanGraph only complicated things, so it was removed; when converting to the Program graph we just look at the source node's type.
### PlanType::UpdatingData

Nodes can now have a return type that is updating data, with an inner PlanType giving more details. These are converted to UpdatingData types in the datastream::Program, and the compiler is, except for a few of the specific operators, unaware of UpdatingData.
## Sources and Sinks

Sources and sinks can support updating data, currently through the "debezium_json" serialization mode.
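For reference, here is roughly how updates map onto the Debezium envelope (`op` of `c`/`u`/`d` with `before`/`after` fields); the enum and function below are an illustrative sketch, not Arroyo's serializer:

```rust
// Illustrative sketch of mapping updates onto the Debezium JSON envelope;
// the actual serializer in Arroyo is more involved.
pub enum UpdatingData {
    Append(String), // value as a JSON object string
    Retract(String),
    Update { old: String, new: String },
}

pub fn to_debezium_json(u: &UpdatingData) -> String {
    match u {
        // create: no prior value
        UpdatingData::Append(v) => format!(r#"{{"before":null,"after":{},"op":"c"}}"#, v),
        // delete: no new value
        UpdatingData::Retract(v) => format!(r#"{{"before":{},"after":null,"op":"d"}}"#, v),
        // update: both sides present
        UpdatingData::Update { old, new } => {
            format!(r#"{{"before":{},"after":{},"op":"u"}}"#, old, new)
        }
    }
}
```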
## Known Issues

### Query Limitations

Updating tables only support a subset of queries. In particular, the following are not supported:

- `row_number()` (and similar) can't have updating data as an input.

### Out of order retractions

For any forward sequence of nodes, a retraction should always occur after the record it is retracting. However, I think there is a sequence of shuffles that could result in a retraction arriving at a downstream node before the record it retracts.
### State Performance

The state works but is not highly optimized. In particular, there are two main performance inefficiencies:

- For a query like `select count(*) from input`, there will be a state entry for every row. Because we only rely on expiration for compaction, recovering from this checkpoint will be slow.
- A query like `SELECT max(bid.price) from input` will write increasingly large records with every incoming record.

Despite these limitations, I still think this is worth merging so we can continue to iterate in this direction.