Support reading Iceberg equality delete files (Design) #8748
Comments
Quick note on equality delete files in intention and practice: the underlying motivation for having a separate equality delete file type, in addition to positional deletes, is that equality deletes are used when the producer of the delete cannot perform a table scan, or doing so is cost-prohibitive. There is a tradeoff associated with using equality deletes: in exchange for not having to do a table scan to find the position of the deleted row, you must do extra work during merge-on-read. This is why batch and interactive systems such as Spark, Trino, and Presto generate only positional delete files, whereas streaming systems like Flink and Iceberg tools may generate equality delete files, especially for change data capture (CDC) use cases. In practice, this means one does not typically encounter equality delete files except when using Iceberg for CDC, and this is still considered an emergent feature of Iceberg.
@tdcmeehan Thanks for the explanation. So from the point of view of an execution engine that can be used in both batch and streaming cases, equality deletes are used more frequently in row-oriented formats with keys, e.g. Avro? And since Avro only has a single-column key, the priority of supporting multi-column key deletion is not high. However, as a general execution engine, I think we need to get this right (at least at the design level) in the first place, so that if someone comes along with a row-oriented format with multi-column keys, we can cope with it without major changes. In this sense I still think investing in proper support for multi-column keys is worth doing. Another point is that in a big company, the producer of the data is sometimes not aware of the consumers. It's not uncommon for streaming data to be stored in a data warehouse and later read by a different tool, so on the read side we need to be as flexible as possible.
@Yuhta I don't think it's necessarily correlated with the underlying file format; it's really a question of whether the system generating the deletes wants to, or is capable of, performing a table scan. This is because positional deletes require the position, which requires a table scan to determine the row to be deleted, and this applies equally to any file format. To be clear, I believe it is beneficial and useful for Velox to support reading equality deletes, as it is consistent with Velox's mission to be a pervasive execution layer in query engines. Given Iceberg's popularity and momentum, it's important for Velox's Iceberg integration to fully support both equality and positional delete files. I am simply adding some context on when one typically sees equality delete files in real-world settings.
@Yuhta The remaining filter approach already supports multi-column expressions, e.g. a <> 1 OR b <> 1. My draft PR #8728 already works for that case, and there is a test for multiple-column delete values. As I explained in the "Build Hash Tables or Filters/FilterFunctions?" section, I believe the remaining filter approach beats building additional mutation hash tables on all dimensions: performance, ease of implementation, and code cleanliness. I don't think you need to add any new interface for a multi-key hash table if you just go the remaining filter way. Also, correct me if I understood this wrong: I think the "disjunctive (OR) predicate pushdown" I mentioned is different from your "multi-column ID pushdown". The essential point is to push the OR predicates down to the ColumnReaders and decoders as domain filters, so the benefit of filter pushdown can be honored. E.g. in a predicate like (a = 1 OR b = 1) AND (c = 1), today we can only push down c = 1, but my idea is that in the future we could also push (a = 1 OR b = 1) down to the ColumnReader level. Of course this needs fundamental changes in the current reader implementations. It's an idea that emerged while I was working on Iceberg, and I don't think any other engine has it. I want to try it in the future, but not the near future. Whereas the "multi-column ID pushdown" you mentioned seems to happen AFTER all data is read, since you mentioned you wanted to build hash tables in the mutation object, and thus won't have the benefit of filter pushdown.
Thanks @tdcmeehan for the background introduction. While not being able to do a scan WAS the reason why some engines produce equality delete files, the performance benefit will be another major reason why users want to use equality deletes in the future. I believe equality deletes WILL outperform positional deletes after we implement them in Prestissimo, because 1) we save a big scan and semi join in the delete query, and 2) we can push down some of the equality deletes as domain filters to the Velox column readers, plus some other optimizations that are specific to equality delete files. Given that many engines' scans are not as efficient, making the "merge" happen in the Velox/Prestissimo scan will perform better. Also, dynamic filter support is limited nowadays, but for equality deletes we can definitely push down domain filters without worrying about the data size, etc. So we will use equality deletes in the TPC-DS publication and implement equality deletes in Prestissimo as the next step.
@yingsu00 For a single-column key we can push it down to the decoders, but I don't think you can do the same for multi-column keys, due to the correlation between columns. Putting a huge list of ORs in the remaining filter would just destroy the performance. So
@Yuhta Thanks for your questions. This is a very preliminary idea for now. It's essentially to push down the expression evaluation into the ColumnReaders for some special expressions, like logical expressions with non-overlapping columns, e.g. for expression …. Now, if we can create a …
This has multiple benefits:
So I think it would generally benefit performance, especially after we push the filters down to ENCODED data. Even if we don't push the filters to ENCODED data, it may still be faster in many cases. You said "putting a huge list of ORs in the remaining filter would just destroy the performance"; I think you meant the case where the number of values in the expression is much smaller than the number of rows to be read, such that the bitmap may be larger than the hash table. But remember, you have to extract and copy all rows first, which is usually larger than the bitmap itself. For each batch we read at most 10,000 rows, and the bitmap for it would be around 1KB per column (10,000 bits is 1,250 bytes). Actually, the RowSet itself for each ColumnReader nowadays may be larger than that, and even your hash table may be several KB itself. Most importantly, calculating hash values on all relevant columns for all rows may be much more expensive. But I agree that in some cases building a hash table or evaluating the remaining filter afterwards may be faster, e.g. when the equality delete file contains very many columns. So I think the execution strategy should be self-adapting at run time, and the criteria and policy should be determined by extensive performance testing. Anyways, this idea is preliminary and needs a lot of refinement and generalization. I may try prototyping something next year, but not in this Iceberg work. Your feedback and discussion are very welcome!
At this point you already go through all the key data and pay almost the full price of reading them (decompressing and decoding; the only saving is that you do not copy the data out but use 1 bit, which is not a lot of difference really, unless you keep these bitmaps encoded as well, which would be even more complex and spend more time when we do the OR later). The saving from selective reading comes mainly from skipping, and in this case you cannot skip reading …

Also, I don't see how we can use this to speed up mutability. The method does not work well with a tree of logical expressions. How do you push down …?

For the first implementation I would suggest we do the single-column case only, since that is the thing everyone agrees on, and it covers most of the real-world use cases.
Not quite. The GroupColumnReader does need to decompress the whole ColumnChunk, but it does NOT necessarily need to decode all the data, if we can evaluate the filter on ENCODED data.
I think there will be a difference, but I don't have time to try it now, so we can forget it. Whether to try it also depends on the portion of the whole query's cost spent in the remaining filter or hash table mutation; if that's already fast, there's no need. We will send a PR for IcebergReadBenchmark that covers the equality delete case; then we will have more insight into where the time is spent.
The naive way is to have two GroupColumnReaders, one for (b = 1 OR c = 1) and the other for (b = 2 OR c = 2). We'd have to read b and c twice, but we can skip some rows for (b = 2 OR c = 2), and also use the filter on ENCODED data to avoid decoding everything. Then the improved version is to
In this approach, each row uses 4 comparisons and 3 logical ops. This approach can also be applied to the LogicalExpressionEvaluator. The current general ExprSet evaluation can recognize common expressions. If the expression is …

I agree this approach has big limitations: it only works for logical expressions, and doesn't work for complex functions or expressions that involve multiple columns like …

Now let's consider the hash table approach for … But when there are very many values, e.g. …
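To make the comparison-count argument concrete, here is a minimal sketch (illustrative C++ only, not Velox code) of evaluating (b = 1 OR c = 1) AND (b = 2 OR c = 2) over a batch; each row costs at most 4 comparisons and 3 logical ops, which vectorize well:

```cpp
#include <cstdint>
#include <vector>

// Evaluate (b = 1 OR c = 1) AND (b = 2 OR c = 2) for a batch of rows.
std::vector<bool> evalBatch(
    const std::vector<int64_t>& b,
    const std::vector<int64_t>& c) {
  std::vector<bool> passed(b.size());
  for (size_t i = 0; i < b.size(); ++i) {
    // At most 4 comparisons and 3 logical ops per row
    // (short-circuiting may do fewer).
    passed[i] = (b[i] == 1 || c[i] == 1) && (b[i] == 2 || c[i] == 2);
  }
  return passed;
}
```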
Yes, this is exactly what was done in PR #8728. It only pushes down single-column filters, and the rest are evaluated as remaining filters. All the other optimizations are not included in that PR. Your review is much appreciated; I'll ping you when the tests pass. Actually, this particular optimization (pushing down disjunctions) is the last thing I may want to try, since it requires lots of code change and thus carries bigger risk. The other optimizations mentioned, e.g. logical expression simplification and caching, will be tried first, if necessary. So we agree on this.
That's the only way to achieve it. However, I am not sure the gain is worth it; it might still be slower than just copying the key columns out and probing a hash table.
Sorry, I made a mistake; the relevant expression should be …
Description
In https://github.com/facebookincubator/velox/pull/7847 we introduced the IcebergSplitReader and support for reading positional delete files. In this doc we will discuss the implementation of reading equality delete files.
Iceberg Equality Deletes Overview
A general introduction of equality delete files can be found at https://iceberg.apache.org/spec/#equality-delete-files. Some key takeaways:
Design considerations
Build Hash Tables or Filters/FilterFunctions?
The equality delete files can be interpreted as logical expressions and become remaining filters that are evaluated after all rows in a batch are read out into the result vectors. Alternatively, they can be used to construct a number of hash tables that are probed after all rows are read into the result vectors. Suppose the equality delete file contains the following information:
It means rows satisfying (category = 'Bear' AND name = 'Grizzly') OR (category = 'Bear' AND name = 'Brown') are deleted; equivalently, a row survives if (category <> 'Bear' OR name <> 'Grizzly') AND (category <> 'Bear' OR name <> 'Brown').
To build the hash tables, we would need to concatenate the hash values of columns 2 and 3, so the hash table would contain two hash values, for 'Bear##Grizzly' and 'Bear##Brown'. Then, in the matching phase, the hash values of columns 2 and 3 for all rows in the output RowVectors would be calculated and concatenated before probing the hash table. If there is no match, the row was definitely not deleted; if there is a match, the row still needs to be compared with the original delete values to confirm that it was really deleted. Only when all the values are the same is the row confirmed to have been removed. Note that this final comparison is necessary because there is still a small possibility of hash collisions, especially when many columns are involved.
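A minimal sketch of this build, probe, and verify flow, in illustrative C++ rather than Velox's actual types (the key layout and the 'Bear##Grizzly' separator mirror the description above):

```cpp
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

using DeleteKey = std::pair<std::string, std::string>; // (category, name)

struct KeyHash {
  size_t operator()(const DeleteKey& k) const {
    // Concatenate with a separator before hashing, as in 'Bear##Grizzly'.
    return std::hash<std::string>()(k.first + "##" + k.second);
  }
};

// Build the table from the delete file's rows.
std::unordered_set<DeleteKey, KeyHash> buildDeleteTable(
    const std::vector<DeleteKey>& deleteRows) {
  std::unordered_set<DeleteKey, KeyHash> table(
      deleteRows.begin(), deleteRows.end());
  return table;
}

bool isDeleted(
    const std::unordered_set<DeleteKey, KeyHash>& deleted,
    const std::string& category,
    const std::string& name) {
  // count() compares the full key after the hash lookup, which is the
  // collision check described above.
  return deleted.count({category, name}) > 0;
}
```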
Note that creating hash tables on single columns is not correct without additional processing. For example, suppose the base file is as follows:
If we build one hash table on the second column "category" containing {'Bear'}, and another hash table on "name" containing {'Grizzly', 'Brown'}, then probing the category hash table to exclude rows with category = 'Bear' would incorrectly remove row 3, and probing the name hash table to exclude 'Grizzly' and 'Brown' would incorrectly remove row 4. Taking the logical AND or OR of the two probe results is also incorrect.
Now let's take one step back and build the hash tables on single values. So we have hash table A on "category" with the single value 'Bear', hash table B on "name" with the value 'Grizzly', and hash table C on "name" with the value 'Brown'. A row would then pass if (category <> 'Bear' OR name <> 'Grizzly') AND (category <> 'Bear' OR name <> 'Brown'), computed by probing hash table A twice and hash tables B and C once each, then combining with logical ORs and ANDs. However, this is no different from simply comparing the values, and no hash tables are actually needed.
The other way is to compile these delete values into logical expressions that can be executed as remaining filter functions, or even into domain filters that can be pushed down to the base file reader. This can be more efficient than the hash table approach. First, filter pushdown can eliminate a lot of decoding cost. Second, computing and concatenating the hash values for all rows is very expensive; in fact, it is much slower than performing simple comparisons on single-column values, which can be done efficiently with SIMD operations, while hash value computation cannot. And lastly, the values need to be compared anyway, even in the hash table approach.
We should also note that if we convert the deletes into logical expressions as remaining filters, the existing Velox expression evaluation can automatically choose the best evaluation strategy, e.g. whether to build hash tables or to do efficient SIMD logical comparisons, when it sees fit. This is much more flexible than building fixed hash tables in the connector DataSources; in many cases the ExpressionEvaluator can choose a more efficient way to evaluate the equivalent expressions. Today it is already very easy to construct the logical expressions, and for equality deletes to work, no additional code is needed beyond constructing the expressions: the existing Velox data source and readers can already handle them, so the implementation would be fairly simple. Plus, we can additionally improve the existing filter function and expression evaluation implementations, which can benefit other components of the driver pipeline in the future. So we propose to take the remaining filter path and simply convert the equality delete files into filters and filter functions.
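As a hedged illustration of this proposal (the helper below is hypothetical, not a Velox API), each delete row becomes a disjunction of <> comparisons and the rows are AND-ed together into the remaining filter:

```cpp
#include <string>
#include <vector>

// Illustrative only: compile equality delete rows into the remaining
// filter text described above. Values are assumed to need no escaping.
std::string compileDeleteRowsToFilter(
    const std::vector<std::string>& columns,           // e.g. {"category", "name"}
    const std::vector<std::vector<std::string>>& rows  // e.g. {{"Bear", "Grizzly"}, {"Bear", "Brown"}}
) {
  std::string filter;
  for (const auto& row : rows) {
    std::string disjunction;
    for (size_t i = 0; i < columns.size(); ++i) {
      if (!disjunction.empty()) {
        disjunction += " OR ";
      }
      disjunction += columns[i] + " <> '" + row[i] + "'";
    }
    if (!filter.empty()) {
      filter += " AND ";
    }
    filter += "(" + disjunction + ")";
  }
  // For the example above this yields:
  // (category <> 'Bear' OR name <> 'Grizzly') AND
  // (category <> 'Bear' OR name <> 'Brown')
  return filter;
}
```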
Where and How to open the equality delete files
Query engines like Presto or Spark usually have some central coordinator, where the distributed plan and splits are created. The splits are then sent to the workers and executed there using the Velox stack. A query may issue many splits for each table, and each split may include many (possibly hundreds of) delete files. We have the choice of opening the delete files on the coordinator, on the workers, or both. There are some basic design considerations here:
Native workers running Velox need to have the ability to read both equality delete and positional delete files.
Although it's possible for Prestissimo to open the (equality) delete files on the coordinator(s), we cannot assume other engines can process the equality delete files internally by themselves. Iceberg splits come with a list of delete file paths, which could be positional, equality, or both. With the normal scheduling implementations in most engines, the splits are sent directly to the workers for local execution. An engine would need a fairly large amount of special handling to break this procedure: opening the delete files, updating the distributed query plan, creating filters that can be pushed down, or even changing the scan into a join, etc., before sending out the splits. This makes the engine's logic much more complex and the integration with Velox much harder.
Opening files is a very expensive operation, and opening all delete files on the coordinator may become a bottleneck for the system. Even if we cache the parsed expressions from each distinct delete file, opening hundreds of them on a single coordinator node, or a small number of them, is still not practical.
Based on these considerations, I think we need to implement reading equality deletes in Velox. This doesn't mean we cannot open some of the equality delete files on the coordinator for optimization purposes, but that optimization should not be mandatory for the engines built on top of Velox.
Performance Considerations
We want to push down the filters as much, and as deep, as possible.
By pushing the filters down to the readers, or even to the decoder level, we can avoid the cost of decoding skipped rows and even save some decompression cost. These savings can be huge when the selectivity rate is very small. Note that some of the equality delete files, and all positional delete files, can be converted to TupleDomain filters or initial row numbers that can be pushed down to the readers. To achieve this, we need to extract the parts that can be pushed down and guarantee that the remaining parts are evaluated or tested correctly.
We want to avoid opening the delete files as much as possible
A split may include hundreds of delete files, and a worker may receive many splits with the same set of delete files. Ideally, each delete file should be opened only once on each worker, because 1) opening files is expensive, and 2) compiling expressions, or building hash tables to be probed later, is also not cheap. There are a couple of ways to achieve this:
Building a hash table for each HiveDataSource, or a long-living cache of the compiled expressions on each node.
Converting the scan with equality delete files into a broadcast join, with the delete files becoming one data source and the base file becoming another. This shows good improvements in Presto, but it also misses the opportunities of
We want to reduce the amount of expression evaluation as much as possible.
We have shown that the equality delete files can be interpreted as conjunctive logical expressions. However, logical expression evaluation is also expensive when the expression contains many terms. We note that Velox can already extract common sub-expressions and flatten adjacent logical expressions, but more general logical expression simplification is not yet implemented. Nonetheless, there are ways to simplify the expressions in some simple cases for Iceberg; we will discuss them later.
Design
EqualityDeleteFileReader
We will introduce an `EqualityDeleteFileReader` class; each reader is responsible for opening one equality delete file. The content will be read in batches, and for each batch the logical expressions will be built and merged into the existing remaining filter in the HiveDataSource.

The equality delete file schema is not fixed and is only known at query run time. The equality IDs and the base file schema are used together to derive the output row type and build the ScanSpec for the equality delete file. The equality IDs are the same as the IDs in Velox's `TypeWithId`, and therefore we can directly use `dwio::common::typeutils::buildSelectedType()` to get the delete file schema. Note that such an ID does not necessarily refer to a primitive-type column; it can also refer to a sub-field of a complex-type column. For example, deleting from an ARRAY[INTEGER] column c where c[i] = 5 can also be expressed as an equality delete file: the column and its element each carry a field ID, the equality ID for this predicate is 2 (the element's ID), and the content of the equality delete file is the value 5.
Once we read the delete values, we can build the `ExprSet` and add it to the existing `remainingFilterExprSet_` in HiveDataSource. Then the `expressionEvaluator_` in `HiveDataSource` will evaluate them after all relevant vectors are loaded. There are two ways to add the newly created ExprSet to the existing `remainingFilterExprSet_`:

1. Merge the new Expr into the single existing Expr in `remainingFilterExprSet_`.
2. Add the new Expr to `remainingFilterExprSet_` as an additional array element.

Note that the current HiveDataSource assumes `remainingFilterExprSet_` has only one Expr, and the owned SimpleExpressionEvaluator only evaluates the first `Expr` in an `ExprSet`. There are a couple of facts we discovered: `SimpleExpressionEvaluator` is only used in TableScan and HiveConnector, and `remainingFilterExprSet_` is always a special kind of `ExprSet` that contains only logical expressions.

While I think `SimpleExpressionEvaluator` should indeed evaluate all Exprs in the passed-in `ExprSet`, we can alternatively create a new `LogicalExpressionEvaluator`, in which we can make special logical expression evaluation improvements in the future. Then adding the new Expr to `remainingFilterExprSet_` as an array element seems to be the cleanest and simplest way.

Extraction of domain filters
When the equality delete file has only one field, we can extract it as a domain filter. Such a filter can be pushed down to the readers and decoders, where the performance savings happen. In this case we will create a NOT IN filter, via `connector::hive::iceberg::FilterUtil::createNotInFilter()`, which in turn calls into the utility functions in common/Filter.h/cpp. The values are de-duplicated and nulls are treated separately. Velox can optimize the result into different kinds of filters, e.g. a range filter when there is only one value.

Note that we need to verify the field is not a sub-field, since Velox currently doesn't support pushing filters down to sub-fields. This restriction will be removed once Velox supports sub-field filter pushdown.
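A minimal sketch of what the resulting NOT IN filter amounts to (an illustrative class, not the actual Velox filter hierarchy; the null semantics here follow Iceberg equality deletes, where a null value does not match a non-null delete value):

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

class NotInFilter {
 public:
  explicit NotInFilter(const std::vector<int64_t>& deleteValues)
      : rejected_(deleteValues.begin(), deleteValues.end()) {} // de-dup

  // A row passes (survives) if its value was not deleted.
  bool testInt64(int64_t value) const {
    return rejected_.count(value) == 0;
  }

  // A null never equals a non-null delete value, so the row survives.
  bool testNull() const {
    return true;
  }

 private:
  std::unordered_set<int64_t> rejected_;
};
```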
An equality delete file with multiple fields cannot be pushed down as domain filters at this moment, no matter whether it has a single row or multiple rows. E.g. this delete file can be interpreted as `id <> 3 || name <> 'Grizzly'`. Currently Velox does not support pushing down disjunctions, but we may do so in the future.

Domain Filter Merge
A split may come with multiple equality delete files, some of which may have the same schema. If they all have the same single field, the extracted domain filters will be deduped and merged with the existing one. E.g.
Equality delete file 1
Equality delete file 2
The domain filter built from these two files will be `category NOT IN {'bear', 'mouse'}`. This uses the `mergeWith` API in the Filter class.
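Sketched in plain C++ (Velox's `Filter::mergeWith` handles this for real filter types), merging two single-column NOT IN filters under AND semantics — a row must pass both — is just the union of the rejected value sets:

```cpp
#include <string>
#include <unordered_set>

// NOT IN {'bear'} AND NOT IN {'mouse'} == NOT IN {'bear', 'mouse'}.
std::unordered_set<std::string> mergeNotInFilters(
    const std::unordered_set<std::string>& rejectedA,
    const std::unordered_set<std::string>& rejectedB) {
  std::unordered_set<std::string> merged = rejectedA;
  merged.insert(rejectedB.begin(), rejectedB.end()); // union of rejected sets
  return merged;
}
```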
Remaining Filter Function Merge

If the equality delete files have the same schema but more than one field, for example
Equality delete file 1
Equality delete file 2
This will create 2 Exprs in the final ExprSet:

Today Velox supports common sub-expression recognition in the ExpressionEvaluator, so such an expression is evaluated only once. In this example, the evaluation result of `id <> 3 || name <> 'Winnie'` would be cached internally and does not need to be computed twice.
Logical Expression Simplification
As far as I understand, Velox can do logical expression flattening but still can't automatically simplify logical expressions. For example, the expression `a AND (b AND (c AND d))` would be flattened to `AND(a, b, c, d)`, but `a AND (a OR b)` cannot be automatically simplified to `a`; therefore, to evaluate `a AND (a OR b)`, both a and b are evaluated, and one AND and one OR operation are performed. While we hope to improve logical expression simplification in the future, we can still make some simple improvements for Iceberg now.

An Iceberg split can come with multiple equality delete files, and their schemas can overlap. For example
Equality delete file 1
Equality delete file 2
Equality delete file 3
We see that equality delete file 2 is on the `category` column and would remove all tuples with the value `mouse`. This means the first two rows in equality delete file 1 are already covered and don't need to be read or compiled. Similarly, the single row in file 3 covers row 3 in file 1, so row 3 in file 1 doesn't need to be read or compiled either. The simplified delete files look as follows:

and

and

With this simplification, the resulting expression is simpler and the evaluation cost is reduced.
When a delete file has only one field, the domain filter built from it can be used as a filter when reading other equality delete files whose fields include that one. In the above example, `category <> 'mouse'` can be pushed to file 1, whose rows 1 and 2 would then be filtered out. This not only helps the final expression evaluation, but also improves the read performance for file 1.

If the delete file has more than one field, the situation is more complex. In the above example, file 3 would be compiled to `category <> 'bear' OR name <> 'Winnie'`, but it cannot be pushed to file 1 nor to the base file directly, because it is a disjunctive expression and so far Velox only supports domain filters in conjunctive expressions. So for now we will only use single-field equality delete files for the simplification. For this, we will go over the equality IDs of all equality delete files and pick the single-field ones to read first; the resulting filters are then pushed to the other equality delete file readers.

In the future, we could even implement disjunctive expression pushdown. For example, `category <> 'bear' OR name <> 'Winnie'` could be pushed to the SelectiveColumnReaders, with the category and name columns forming a ColumnGroup. This saves the cost of reading all values out before applying the filter function as a remaining filter, and the selectivity vector can be reused between them. Moreover, the row reduction from applying this filter directly on the ColumnGroup would benefit the reading of the other columns later.
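A hedged sketch of that future ColumnGroup idea (types and names are hypothetical): evaluate the disjunction over just the two key columns to get a row selection before any other column is materialized:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical: the two key columns of the group, decoded for one batch.
struct ColumnGroupBatch {
  std::vector<std::string> category;
  std::vector<std::string> name;
};

// Returns the indices of rows passing category <> 'bear' OR name <> 'Winnie';
// only these rows need to be read from the remaining columns.
std::vector<int32_t> filterColumnGroup(const ColumnGroupBatch& batch) {
  std::vector<int32_t> selected;
  for (int32_t i = 0; i < static_cast<int32_t>(batch.category.size()); ++i) {
    if (batch.category[i] != "bear" || batch.name[i] != "Winnie") {
      selected.push_back(i);
    }
  }
  return selected;
}
```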
Expression Caching

We know that a unique `HiveDataSource` object is created for each `TableScan` operator, and the splits received by a `HiveDataSource` instance belong to the same query and the same table. Additionally, Iceberg splits must all read the same snapshot of an Iceberg table. When the `HiveDataSource` receives a new Iceberg split with equality delete files, it creates a new `IcebergSplitReader`, which opens the delete files. If an equality delete file can be interpreted into domain filters or filter functions, the `scanSpec_` and `remainingFilterExprSet_` in `HiveDataSource` may need to be updated.

Currently, the Iceberg library selects the qualified data and delete files based on partitions and snapshot IDs or transaction sequence numbers. For a single transaction the snapshot is fixed, and all delete files from a partition accompany its base data files when the splits are enumerated. So we can assume for now that, for a single `HiveDataSource`, all splits received from the same partition carry the same delete files. However, the delete files for different partitions can differ, and splits from multiple partitions can arrive out of order. If we updated the `scanSpec_` and `remainingFilterExprSet_` for a previous partition, we need to restore them to the originals before applying the current set of delete files. As the first implementation, we will make a copy of these objects in the IcebergSplitReader and restore them when the IcebergSplitReader is destructed.

In some users' workloads deletions are quite frequent, and the number of delete files coming with a split for a subsequent SELECT query can be large. For all splits in a partition, the delete files may be the same, and we don't want to repeatedly read such equality delete files for every split a `HiveDataSource` needs to handle. One way of overcoming this is to build an expression cache. There are two levels of caching ideas:

In 1, the key of the hash table is <partition, snapshotId> and the values are the compiled filters and expressions.
In 2, the key of the cache is <table, partition, snapshotId> and the values are the compiled filters and expressions. To avoid excessive contention, we can divide the cache into multiple levels. The implementation will be adjusted with more experiments and observation of customer workloads in the future.
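A hedged sketch of the level-2 cache (all names here are illustrative, not existing Velox types):

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>

// Illustrative placeholder for the compiled domain filters and
// remaining filter expressions of one partition/snapshot.
struct CompiledDeletes {};

class DeleteExpressionCache {
 public:
  using Key = std::tuple<
      std::string /*table*/,
      std::string /*partition*/,
      int64_t /*snapshotId*/>;

  std::shared_ptr<CompiledDeletes> get(const Key& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = cache_.find(key);
    return it == cache_.end() ? nullptr : it->second;
  }

  void put(const Key& key, std::shared_ptr<CompiledDeletes> value) {
    std::lock_guard<std::mutex> lock(mutex_);
    cache_.emplace(key, std::move(value));
  }

 private:
  std::mutex mutex_; // a sharded map would reduce contention, as noted above
  std::map<Key, std::shared_ptr<CompiledDeletes>> cache_;
};
```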
If the Iceberg library changes its TableScan or FileScan in the future and can additionally prune the delete files based on each individual base data files, we will need to change the cache keys and add the information for the base file.
We will work on caching in the future, when we understand the workloads better.
LogicalExpressionEvaluator Improvements
Currently the remaining filter is evaluated in `HiveDataSource::evaluateRemainingFilter()`. This code evaluates `remainingFilterExprSet_` as a general expression rather than as a special logical expression, and puts the results of the Exprs in `remainingFilterExprSet_` into bool-typed `FlatVector`s, after which `processFilterResults()` performs logical AND/OR on these vectors. This incurs additional memory copies. Moreover, it contains special handling of nulls, while the logical expressions would NOT produce NULLs at all, so that cost can be saved as well. Our newly introduced LogicalExpressionEvaluator will have its own evaluate() implementation that is more performant for logical expressions.
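A hedged sketch of the planned fast path (illustrative code, not the actual evaluator): because the delete expressions are null-free, their per-row boolean results can be combined with plain bitwise ops, with no null masks or extra result copies:

```cpp
#include <cstdint>
#include <vector>

// Combine per-expression result bitmaps (1 bit per row, equal sizes)
// with AND; 64 rows are processed per operation, and no null handling
// is needed because the inputs cannot contain NULLs.
void andBitmapsInPlace(
    std::vector<uint64_t>& accumulator,
    const std::vector<uint64_t>& next) {
  for (size_t i = 0; i < accumulator.size(); ++i) {
    accumulator[i] &= next[i];
  }
}
```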
Testing

Prestissimo End To End Tests
In addition to unit tests we will have
Microbenchmarks
We will build microbenchmarks in Velox and Presto covering both types of delete files.