diff --git a/PROTOCOL.md b/PROTOCOL.md index 5d01b2b19f5..f03d4a57ecb 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -95,6 +95,7 @@ Here is an example of a Delta table with three entries in the commit log, stored /mytable/_delta_log/_last_checkpoint /mytable/_change_data/cdc-00000-924d9ac7-21a9-4121-b067-a0a6517aa8ed.c000.snappy.parquet /mytable/part-00000-3935a07c-416b-4344-ad97-2a38342ee2fc.c000.snappy.parquet +/mytable/deletion_vector-0c6cbaaf-5e04-4c9d-8959-1088814f58ef.bin ``` ### Data Files @@ -103,6 +104,11 @@ By default, the reference implementation stores data files in directories that a This directory format is only used to follow existing conventions and is not required by the protocol. Actual partition values for a file must be read from the transaction log. +### Deletion Vector Files +Deletion Vector (DV) files are stored root directory of the table alongside the data files. A DV file contains one or more serialised DV, each describing the set of *invalidated* (or "soft deleted") rows for a particular data file it is associated with. +For data with partition values, DV files are *not* kept in the same directory hierarchy as data files, as each one can contain DVs for files from multiple partitions. +DV files store DVs in a [binary format](#deletion-vector-format). + ### Change Data Files Change data files are stored in a directory at the root of the table named `_change_data`, and represent the changes for the table version they are in. For data with partition values, it is recommended that the change data files are stored within the `_change_data` directory in their respective partitions (i.e. `_change_data/part1=value1/...`). Writers can _optionally_ produce these change data files as a consequence of operations that change underlying data, like `UPDATE`, `DELETE`, and `MERGE` operations to a Delta Lake table. If an operation only adds new data or removes existing data without updating any existing rows, a writer can write only data files and commit them in `add` or `remove` actions without duplicating the data into change data files. When available, change data readers should use the change data files instead of computing changes from the underlying data files. @@ -279,19 +285,25 @@ The following is an example `metaData` action: ### Add File and Remove File -The `add` and `remove` actions are used to modify the data in a table by adding or removing individual data files respectively. +The `add` and `remove` actions are used to modify the data in a table by adding or removing individual _logical files_ respectively. + +Every _logical file_ of the table is represented by a path to a data file, combined with an optional Deletion Vector (DV) that indicates which rows of the data file are no longer in the table. Deletion Vectors are an optional feature, see their [reader requirements](#deletion-vectors) for details. -The path of a file acts as the primary key for the entry in the set of files. -When an `add` action is encountered for a path that is already present in the table, statistics and other information from the latest version should replace that from any previous version. -As such, additional statistics can be added for a path already present in the table by adding it again. +When an `add` action is encountered for a logical file that is already present in the table, statistics and other information from the latest version should replace that from any previous version. +The primary key for the entry of a logical file in the set of files is a tuple of the data file's `path` and a unique id describing the DV. If no DV is part of this logical file, then its primary key is `(path, NULL)` instead. The `remove` action includes a timestamp that indicates when the removal occurred. -Physical deletion of the file can happen lazily after some user-specified expiration time threshold. +Physical deletion of physical files can happen lazily after some user-specified expiration time threshold. This delay allows concurrent readers to continue to execute against a stale snapshot of the data. A `remove` action should remain in the state of the table as a _tombstone_ until it has expired. -A tombstone expires when the creation timestamp of the delta file exceeds the expiration threshold added to the `remove` action timestamp. +A tombstone expires when *current time* (according to the node performing the cleanup) exceeds the expiration threshold added to the `remove` action timestamp. -Since actions within a given Delta file are not guaranteed to be applied in order, it is not valid for multiple file operations with the same path to exist in a single version. +In the following statements, `dvId` can refer to either the unique id of a specific Deletion Vector (`deletionVector.uniqueId`) or to `NULL`, indicating that no rows are invalidated. Since actions within a given Delta commit are not guaranteed to be applied in order, a **valid** version is restricted to contain at most one file action *of the same type* (i.e. `add`/`remove`) for any one combination of `path` and `dvId`. Moreover, for simplicity it is required that there is at most one file action of the same type for any `path` (regardless of `dvId`). +That means specifically that for any commit… + + - it is **legal** for the same `path` to occur in an `add` action and a `remove` action, but with two different `dvId`s. + - it is **legal** for the same `path` to be added and/or removed and also occur in a `cdc` action. + - it is **illegal** for the same `path` to be occur twice with different `dvId`s within each set of `add` or `remove` actions. The `dataChange` flag on either an `add` or a `remove` can be set to `false` to indicate that an action when combined with other actions in the same atomic version only rearranges existing data or adds new statistics. For example, streaming queries that are tailing the transaction log can use this flag to skip actions that would not affect the final results. @@ -300,13 +312,14 @@ The schema of the `add` action is as follows: Field Name | Data Type | Description -|-|- -path| String | A relative path to a file from the root of the table or an absolute path to a file that should be added to the table. The path is a URI as specified by [RFC 2396 URI Generic Syntax](https://www.ietf.org/rfc/rfc2396.txt), which needs to be decoded to get the file path. -partitionValues| Map[String, String] | A map from partition column to value for this file. See also [Partition Value Serialization](#Partition-Value-Serialization) -size| Long | The size of this file in bytes -modificationTime | Long | The time this file was created, as milliseconds since the epoch -dataChange | Boolean | When `false` the file must already be present in the table or the records in the added file must be contained in one or more `remove` actions in the same version -stats | [Statistics Struct](#Per-file-Statistics) | Contains statistics (e.g., count, min/max values for columns) about the data in this file -tags | Map[String, String] | Map containing metadata about this file +path| String | A relative path to a data file from the root of the table or an absolute path to a file that should be added to the table. The path is a URI as specified by [RFC 2396 URI Generic Syntax](https://www.ietf.org/rfc/rfc2396.txt), which needs to be decoded to get the data file path. +partitionValues| Map[String, String] | A map from partition column to value for this logical file. See also [Partition Value Serialization](#Partition-Value-Serialization) +size| Long | The size of this data file in bytes +modificationTime | Long | The time this logical file was created, as milliseconds since the epoch +dataChange | Boolean | When `false` the logical file must already be present in the table or the records in the added file must be contained in one or more `remove` actions in the same version +stats | [Statistics Struct](#Per-file-Statistics) | Contains statistics (e.g., count, min/max values for columns) about the data in this logical file +tags | Map[String, String] | Map containing metadata about this logical file +deletionVector | [DeletionVectorDescriptor Struct](#Deletion-Vectors) | Either null (or absent in JSON) when no DV is associated with this data file, or a struct (described below) that contains necessary information about the DV that is part of this logical file. The following is an example `add` action: ``` @@ -326,13 +339,14 @@ The schema of the `remove` action is as follows: Field Name | Data Type | Description -|-|- -path| String | A relative path to a file from the root of the table or an absolute path to a file that should be removed from the table. The path is a URI as specified by [RFC 2396 URI Generic Syntax](https://www.ietf.org/rfc/rfc2396.txt), which needs to be decoded to get the file path. +path| String | A relative path to a file from the root of the table or an absolute path to a file that should be removed from the table. The path is a URI as specified by [RFC 2396 URI Generic Syntax](https://www.ietf.org/rfc/rfc2396.txt), which needs to be decoded to get the data file path. deletionTimestamp | Option[Long] | The time the deletion occurred, represented as milliseconds since the epoch dataChange | Boolean | When `false` the records in the removed file must be contained in one or more `add` file actions in the same version extendedFileMetadata | Boolean | When `true` the fields `partitionValues`, `size`, and `tags` are present partitionValues| Map[String, String] | A map from partition column to value for this file. See also [Partition Value Serialization](#Partition-Value-Serialization) -size| Long | The size of this file in bytes +size| Long | The size of this data file in bytes tags | Map[String, String] | Map containing metadata about this file +deletionVector | [DeletionVectorDescriptor Struct](#Deletion-Vectors) | Either null (or absent in JSON) when no DV is associated with this data file, or a struct (described below) that contains necessary information about the DV that is part of this logical file. The following is an example `remove` action. ``` @@ -472,19 +486,21 @@ An example of storing provenance information related to an `INSERT` operation: # Action Reconciliation A given snapshot of the table can be computed by replaying the events committed to the table in ascending order by commit version. A given snapshot of a Delta table consists of: + - A single `protocol` action - A single `metaData` action - A map from `appId` to transaction `version` - - A collection of `add` actions with unique `path`s - - A collection of `remove` actions with unique `path`s. The intersection of the paths in the `add` collection and `remove` collection must be empty. That means a file cannot exist in both the `remove` and `add` collections. The `remove` actions act as _tombstones_. + - A collection of `add` actions with unique `path`s. + - A collection of `remove` actions with unique `(path, deletionVector.uniqueId)` keys. The intersection of the primary keys in the `add` collection and `remove` collection must be empty. That means a logical file cannot exist in both the `remove` and `add` collections at the same time; however, the same *data file* can exist with *different* DVs in the `remove` collection, as logically they represent different content. The `remove` actions act as _tombstones_, and only exist for the benefit of the VACUUM command. Snapshot reads only return `add` actions on the read path. To achieve the requirements above, related actions from different delta files need to be reconciled with each other: + - The latest `protocol` action seen wins - The latest `metaData` action seen wins - For transaction identifiers, the latest `version` seen for a given `appId` wins - - All `add` actions for different paths need to be accumulated as a list. The latest `add` action (from a more recent delta file) observed for a given path wins. - - All `remove` actions for different paths need to be accumulated as a list. If a `remove` action is received **later** (from a more recent delta file) for the same path as an `add` operation, the corresponding `add` action should be removed from the `add` collection and the file needs to be tracked as part of the `remove` collection. - - If an `add` action is received **later** (from a more recent delta file) for the same path as a `remove` operation, the corresponding `remove` action should be removed from the `remove` collection and the file needs to be tracked as part of the `add` collection. + - Logical files in a table are identified by their `(path, deletionVector.uniqueId)` primary key. File actions (`add` or `remove`) reference logical files, and a log can contain any number of references to a single file. + - To replay the log, scan all file actions and keep only the newest reference for each logical file. + - `add` actions in the result identify logical files currently present in the table (for queries). `remove` actions in the result identify tombstones of logical files no longer present in the table (for VACUUM). # Column Mapping Delta can use column mapping to avoid any column naming restrictions, and to support the renaming and dropping of columns without having to rewrite all the data. There are two modes of column mapping, by `name` and by `id`. In both modes, every column - nested or leaf - is assigned a unique _physical_ name, and a unique 32 bit integer as an id. The physical name is stored as part of the column metadata with the key `delta.columnMapping.physicalName`. The column id is stored within the metadata with the key `delta.columnMapping.id`. The column mapping is governed by the table property `delta.columnMapping.mode` and can be one of `none`, `id`, and `name`. @@ -531,6 +547,73 @@ In `id ` mode, readers must resolve columns by using the `field_id` in the parqu In `name` mode, readers must resolve columns in the data files by their physical names. Partition values and column level statistics will also be resolved by their physical names. For columns that are not found in the files, `null`s need to be returned. Column ids are not used in this mode for resolution purposes. +# Deletion Vectors + +`add` and `remove` actions can optionally include a Deletion Vector (DV) that provides information about logically deleted rows, that are however still physically present in the underlying data file and must thus be skipped during processing. + +DVs can be stored and accessed in different ways, indicated by the `storageType` field. The Delta protocol currently supports inline or on-disk storage, where the latter can be accessed either by a relative path derived from a UUID or an absolute path. + +## Deletion Vector Descriptor Schema + +The schema of the `DeletionVectorDescriptor` struct is as follows: + +Field Name | Data Type | Description +-|-|- +storageType | String | A single character to indicate how to access the DV. Legal options are: `['u', 'i', 'p']`. +pathOrInlineDv | String | Three format options are currently proposed: +offset | Option[Int] | Start of the data for this DV in number of bytes from the beginning of the file it is stored in. Always `None` (absent in JSON) when `storageType = 'i'`. +sizeInBytes | Int | Size of the serialized DV in bytes (raw data size, i.e. before base85 encoding, if inline). +cardinality | Long | Number of rows the given DV logically removes from the file. + +The concrete Base85 variant used is [Z85](https://rfc.zeromq.org/spec/32/), because it is JSON-friendly. + +### Derived Fields + +Some fields that are necessary to use the DV are not stored explicitly but can be derived in code from the stored fields. + +Field Name | Data Type | Description | Computed As +-|-|-|- +uniqueId | String | Uniquely identifies a DV for a given file. This is used for snapshot reconstruction to differentiate the same file with different DVs in successive versions. | If `offset` is `None` then ``.
Otherwise `@`. +absolutePath | String/URI/Path | The absolute path of the DV file. Can be calculated for relative path DVs by providing a parent directory path. | If `storageType='p'`, just use the already absolute path. If `storageType='u'`, the DV is stored at `//deletion_vector_.bin`. This is not a legal field if `storageType='i'`, as an inline DV has no absolute path. + +### JSON Example 1 — On Disk with Relative Path (with Random Prefix) +```json +{ + "storageType" : "u", + "pathOrInlineDv" : "ab^-aqEH.-t@S}K{vb[*k^", + "offset" : 4, + "sizeInBytes" : 40, + "cardinality" : 6 +} +``` +Assuming that this DV is stored relative to an `s3://mytable/` directory, the absolute path to be resolved here would be: `s3://mytable/ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin`. + +### JSON Example 2 — On Disk with Absolute Path +```json +{ + "storageType" : "p", + "pathOrInlineDv" : "s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin", + "offset" : 4, + "sizeInBytes" : 40, + "cardinality" : 6 +} +``` + +### JSON Example 3 — Inline +```json +{ + "storageType" : "i", + "pathOrInlineDv" : "wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", + "sizeInBytes" : 40, + "cardinality" : 6 +} +``` +The row indexes encoded in this DV are: 3, 4, 7, 11, 18, 29. + +## Reader Requirements for Deletion Vectors + +If a snapshot contains logical files with records that are invalidated by a DV, then these records *must not* be returned in the output. + # Requirements for Writers This section documents additional requirements that writers must follow in order to preserve some of the higher level guarantees that Delta provides. @@ -544,7 +627,7 @@ This section documents additional requirements that writers must follow in order ## Delta Log Entries - A single log entry MUST NOT include more than one action that reconciles with each other. - - Add / Remove actions with the same `path` + - Add / Remove actions with the same `(path, DV)` tuple. - More than one Metadata action - More than one protocol action - More than one SetTransaction with the same `appId` @@ -649,17 +732,55 @@ Writer Version 3 | Enforce:
- `delta.checkpoint.writeStatsAsJson`
- `delta Writer Version 4 | - Support [Change Data Feed](#add-cdc-file)
- Support [Generated Columns](#generated-columns) Writer Version 5 | Respect [Column Mapping](#column-mapping) Writer Version 6 | Support [Identity Columns](#identity-columns) +Writer Version 7 | Respect [Deletion Vectors](#deletion-vectors) # Requirements for Readers +This section documents additional requirements that readers must respect in order to produce correct scans of a Delta table. + +## Reader Version Requirements + The requirements of the readers according to the protocol versions are summarized in the table below. Each row inherits the requirements from the preceding row.
| Requirements -|- Reader Version 2 | Respect [Column Mapping](#column-mapping) +Reader Version 3 | Respect [Deletion Vectors](#deletion-vectors) # Appendix +## Deletion Vector Format + +Deletion Vectors are basically sets of row indexes, that is 64-bit integers that describe the position (index) of a row in a parquet file starting from zero. We store these sets in a compressed format. The fundamental building block for this is the open source [RoaringBitmap](https://roaringbitmap.org/) library. RoaringBitmap is a flexible format for storing 32-bit integers that automatically switches between three different encodings at the granularity of a 16-bit block (64K values): + +- Simple integer array, when the number of values in the block is small. +- Bitmap-compressed, when the number of values in the block is large and scattered. +- Run-length encoded, when the number of values in the block is large, but clustered. + +The serialization format is standardized, and both [Java](https://github.com/lemire/RoaringBitmap/) and [C/C++](https://github.com/RoaringBitmap/CRoaring) implementations are available (among others). + +Since RoaringBitmap only covers 32-bit integers, we extend the format in a simple manner by keeping an array of 32-bit RoaringBitmaps and using the upper 32-bits to index into the array. + +The serialization format for such a `RoaringBitmapArray` is as follows (all numerical values are written in little endian byte order): + +Bytes | Name | Description +-|-|- +0 — 3 | magicNumber | 1681511376; Indicates that the following bytes are serialised in this exact format. Future alternative—but related—formats must have a different magic number, for example by incrementing this one. +4 — 7 | numBitmaps | The number of 32-bit bitmaps in the array +`repeat for i in 0 to length` | | For each 32-bit RoaringBitmap +`` — ` + 3` | bitmapDataSize | Number of bytes of this bitmap’s serialized representation. +` + 4` — ` + 4 + bitmapDataSize` | bitmapData | Serialized bytes in the RoaringBitmap standard serialization format. + +The format for storing DVs in file storage is one (or more) of these `RoaringBitmapArray`s per file, together with a checksum for each DV: + +Bytes | Name | Description +-|-|- +0 — 1 | version | The format version of this file: `1` for the format described here. +`repeat for each DV i` | | For each DV +`` — ` + 3` | dataSize | Size of this DV’s data (without the checksum) +` + 4` — ` + 4 + dataSize - 1` | bitmapData | One `RoaringBitmapArray` serialised as described above. +` + 4 + dataSize` — ` + 4 + dataSize + 3` | checksum | CRC-32 checksum of `bitmapData` + ## Per-file Statistics `add` actions can optionally contain statistics about the data in the file being added to the table. These statistics can be used for eliminating files based on query predicates or as inputs to query optimization. @@ -670,6 +791,9 @@ The following global statistic is currently supported: Name | Description -|- numRecords | The number of records in this file. +tightBounds | Whether per-column statistics are currently **tight** or **wide** (see below). + +In the presence of [Deletion Vectors](#Deletion-Vectors) the statistics may be somewhat outdated, i.e. not reflecting deleted rows yet. The flag `stats.tightBounds` indicates whether we have **tight bounds** (i.e. the min/maxValue exists[^1] in the valid state of the file) or **wide bounds** (i.e. the minValue is <= all valid values in the file, and the maxValue >= all valid values in the file). These upper/lower bounds are sufficient information for data skipping. Per-column statistics record information for each column in the file and they are encoded, mirroring the schema of the actual data. For example, given the following data schema: @@ -683,6 +807,7 @@ Statistics could be stored with the following schema: ``` |-- stats: struct | |-- numRecords: long +| |-- tightBounds: boolean | |-- minValues: struct | | |-- a: struct | | | |-- b: struct @@ -695,11 +820,13 @@ Statistics could be stored with the following schema: The following per-column statistics are currently supported: -Name | Description --|- -nullCount | The number of null values for this column -minValues | A value smaller than all values present in the file for this column -maxValues | A value larger than all values present in the file for this column +Name | Description (`stats.tightBounds=true`) | Description (`stats.tightBounds=false`) +-|-|- +nullCount | The number of `null` values for this column |

If the `nullCount` for a column equals the physical number of records (`stats.numRecords`) then **all** valid rows for this column must have `null` values (the reverse is not necessarily true).

If the `nullCount` for a column equals 0 then **all** valid rows are non-`null` in this column (the reverse is not necessarily true).

If the `nullCount` for a column is any value other than these two special cases, the value carries no information and should be treated as if absent.

+minValues | A value that is equal to the smallest valid value[^1] present in the file for this column. If all valid rows are null, this carries no information. | A value that is less than or equal to all valid values[^1] present in this file for this column. If all valid rows are null, this carries no information. +maxValues | A value that is equal to the largest valid value[^1] present in the file for this column. If all valid rows are null, this carries no information. | A value that is greater than or equal to all valid values[^1] present in this file for this column. If all valid rows are null, this carries no information. + +[^1]: String columns are cut off at a fixed prefix length. Timestamp columns are truncated down to milliseconds. ## Partition Value Serialization