[Feature] Support "delta format sharing" and release delta-sharing-spark 3.1 #2291

Closed
2 of 8 tasks
linzhou-db opened this issue Nov 14, 2023 · 0 comments

linzhou-db commented Nov 14, 2023

Feature request

Support "delta format sharing" and release delta-sharing-spark 3.1

Which Delta project/connector is this regarding?

delta-sharing-spark

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (delta-sharing-spark)

Context: Advanced Delta Features

With advanced delta features such as DeletionVectors and ColumnMapping, delta is no longer a parquet-only protocol. To keep up with these features, we propose upgrading the delta sharing protocol to support "delta format sharing", in which the shared table is returned in delta format and the existing delta spark library is used to read the data. This avoids duplicating code in delta sharing spark for each newly created advanced delta feature, and makes it easier to keep up with future delta features.

Please refer to delta-io/delta-sharing#341 for the original proposal.

Delta Format Sharing

The idea is to transfer the delta log from the provider to the recipient via the delta sharing http requests, construct a local delta log, and leverage delta spark library to read the data out of the delta log.

Protocol Changes

In the delta sharing protocol, a new http request header, delta-sharing-capabilities, will be introduced. Its value is a comma separated list of capabilities, each of the form capability_key=capability_value. Example: delta-sharing-capabilities:responseFormat=delta,readerfeatures=deletionVectors,columnMapping.
An upgraded delta sharing server that can handle the new header will parse it and prepare the response accordingly, ignoring any capability that it cannot handle or whose value it does not recognize. However, it will return an error if the shared table requires capabilities that are not specified in the header (the header indicates what the client supports).
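
As an illustration of the header handling described above, here is a minimal Python sketch. It is not the server implementation: the function names are hypothetical, keys are lowercased as an assumption, and because the example header uses commas both between capabilities and between values, a token without "=" is assumed to be an extra value of the preceding key.

```python
from typing import Dict, List, Optional


def parse_capabilities(header_value: str) -> Dict[str, List[str]]:
    """Parse a delta-sharing-capabilities value into {key: [values]}.

    Assumption: tokens are comma separated, and a token without '='
    belongs to the key that precedes it.
    """
    capabilities: Dict[str, List[str]] = {}
    current_key: Optional[str] = None
    for token in header_value.split(","):
        token = token.strip()
        if not token:
            continue
        if "=" in token:
            key, value = token.split("=", 1)
            current_key = key.strip().lower()  # assumption: keys are case-insensitive
            capabilities.setdefault(current_key, []).append(value.strip())
        elif current_key is not None:
            capabilities[current_key].append(token)
    return capabilities


def check_table_features(table_features: List[str],
                         capabilities: Dict[str, List[str]]) -> None:
    """Raise if the table needs a reader feature the client did not declare."""
    declared = {v.lower() for v in capabilities.get("readerfeatures", [])}
    missing = [f for f in table_features if f.lower() not in declared]
    if missing:
        raise ValueError(f"client does not support table features: {missing}")


caps = parse_capabilities(
    "responseFormat=delta,readerfeatures=deletionVectors,columnMapping")
check_table_features(["deletionVectors"], caps)  # passes silently
```

Unknown keys simply end up in the dictionary and are never consulted, which mirrors the "ignore what cannot be handled" behaviour; only an undeclared table feature causes an error.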

If responseFormat=delta is in the request header and the delta sharing server can handle it, the server will add a similar header to the response to indicate that it was handled: delta-sharing-capabilities:responseFormat=delta. Each line in the response is then a json object that can be parsed as a delta action and used to construct a delta log on the client side. The only change is that the path will be a pre-signed url, so the client side needs to read data from the pre-signed url.
The goal is to support delta format sharing in the delta-io/delta oss repo and release delta-sharing-spark 3.1.
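
The response handling described above could look roughly like the following Python sketch. The helper names and the sample response body are made up for illustration; only the header name and the idea of one json action per line come from the proposal.

```python
import json
from typing import Dict, List


def is_delta_response(headers: Dict[str, str]) -> bool:
    """True if the server confirmed responseFormat=delta in its response header."""
    normalized = {k.lower(): v for k, v in headers.items()}
    value = normalized.get("delta-sharing-capabilities", "")
    tokens = [t.strip().lower() for t in value.split(",")]
    return "responseformat=delta" in tokens


def parse_actions(body: str) -> List[dict]:
    """Parse a newline-delimited json body, one delta action per line."""
    return [json.loads(line) for line in body.splitlines() if line.strip()]


# Made-up response body: the add action's path is a pre-signed url.
sample_body = "\n".join([
    json.dumps({"protocol": {"minReaderVersion": 3, "minWriterVersion": 7}}),
    json.dumps({"metaData": {"id": "table-id", "partitionColumns": []}}),
    json.dumps({"add": {"path": "https://bucket.example.com/part-0.parquet?sig=abc",
                        "size": 1024}}),
])

if is_delta_response({"delta-sharing-capabilities": "responseFormat=delta"}):
    actions = parse_actions(sample_body)
```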

Library Changes

To support this, we need to restructure the delta sharing libraries. We'll launch a delta-sharing-client library containing two core pieces of functionality: 1) the delta sharing client and related utils, which handle http requests/responses to the delta sharing server, and 2) the delta sharing file system and related utils, which handle reading data from pre-signed urls and refreshing them. With responseFormat=delta, the delta sharing client won't parse the json lines; it will let the delta spark library parse and handle them.

We'll continue to release the delta-sharing-spark library with the rest of the functionality, including the data source, the streaming source, options, etc. However, all of its code will be moved from delta-io/delta-sharing to delta-io/delta so that it can leverage the delta classes and libraries to construct a delta log, read data, and finally serve the DataFrame to the query.

High Level Implementation Details

Snapshot Queries

Snapshot queries need to push filters down to the delta sharing server, so:

  • createRelation requires a BaseRelation to be returned, so we will return a basic HadoopFsRelation, where a FileIndex is required.
  • The FileIndex requires the schema and partitionSchema of the table, so we will first fetch the table schema with client.getMetadata.
  • Then a DeltaSharingFileIndex class is built with the fetched metadata, and used in a HadoopFsRelation, to return for createRelation.
  • Then within DeltaSharingFileIndex.listFiles, both partitionFilters and dataFilters are ready, which will be converted to jsonPredicateHints in the rpc query to the delta sharing server.
  • Once the files are fetched in delta format from the server, each line is reprocessed to 1) prepare the idToUrl mapping, and 2) prepare the delta-sharing path for each file action, to be put in the AddFile.
  • Once all lines are processed, a 0.json delta log file containing all the lines (including the Protocol/Metadata/AddFiles) is put in a block of the BlockManager.
  • DeltaSharingLogFileSystem: a file system with the scheme delta-sharing-log://. The local DeltaLog will have a path starting with delta-sharing-log://, and DeltaSharingLogFileSystem will translate each file path to the blockId that holds its content in the BlockManager, and serve that content.
  • Then, a DeltaLog class is constructed with a delta-sharing-log path pointing to the block manager.
  • Finally, a TahoeLogFileIndex is constructed with the DeltaLog and the result from this file index’s listFiles will be returned.
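
The line reprocessing and local log construction in the steps above can be sketched in Python as follows. This is a simplified model, not the delta-sharing-spark code: the file id derivation, the delta-sharing:/// path layout, and the dictionary standing in for the BlockManager are all assumptions made for illustration. The log file name uses the zero-padded 20-digit form that delta log files use on disk.

```python
import hashlib
import json
from typing import Dict, List, Tuple
from urllib.parse import urlsplit


def rewrite_add_files(table: str, lines: List[str]) -> Tuple[Dict[str, str], List[str]]:
    """Build the id -> url mapping and swap each pre-signed url for a delta-sharing path."""
    id_to_url: Dict[str, str] = {}
    rewritten: List[str] = []
    for line in lines:
        action = json.loads(line)
        add = action.get("add")
        if add is not None:
            url = add["path"]
            # Hypothetical id: hash of the url without its (expiring) query string.
            stable = urlsplit(url)._replace(query="").geturl()
            file_id = hashlib.sha256(stable.encode("utf-8")).hexdigest()[:16]
            id_to_url[file_id] = url
            # Hypothetical path layout: table / file id / file size.
            add["path"] = f"delta-sharing:///{table}/{file_id}/{add['size']}"
        rewritten.append(json.dumps(action))
    return id_to_url, rewritten


def log_path(table: str, version: int) -> str:
    """Path of a commit file in the locally constructed delta log."""
    return f"delta-sharing-log:///{table}/_delta_log/{version:020d}.json"


# A plain dict stands in for the BlockManager: log path -> file content.
block_store: Dict[str, str] = {}

lines = [
    json.dumps({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
    json.dumps({"metaData": {"id": "table-id", "partitionColumns": []}}),
    json.dumps({"add": {"path": "https://bucket.example.com/part-0.parquet?sig=abc",
                        "size": 1024}}),
]
id_to_url, rewritten = rewrite_add_files("share.schema.table", lines)
block_store[log_path("share.schema.table", 0)] = "\n".join(rewritten)
```

A file system for the delta-sharing-log scheme would then only need to look up the requested path in the store and return the stored content.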

CDF Queries

For cdf queries, as we are not applying filtering pushdown, we’ll directly fetch delta files from the delta sharing server, construct the delta log, and leverage delta spark to read cdf out of it.

  • When createRelation is called with readChangeFeed=true, we'll start to prepare a DeltaCDFRelation.
  • Firstly, DeltaSharingClient will send rpc with the cdf options to fetch the needed files in delta format.
  • Once the files are fetched in delta format from the server, each line is reprocessed to 1) prepare the idToUrl mapping, 2) prepare the delta-sharing path for each file action, to be put in the FileAction, and 3) prepare a fake checkpoint with minVersion-1 to avoid going back to version 0.
  • Protocol and Metadata will be put in minVersion.json as required by the CDCReader.
  • All other FileActions will be put in corresponding version.json since version is returned for each of them from the server.
  • All the json file content will be put in BlockManager and served through DeltaSharingLogFileSystem.
  • Finally, DeltaTableV2.toBaseRelation with a delta-sharing-log path pointing to the locally constructed table is used to return the BaseRelation needed by createRelation.
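
The way the fetched actions are laid out into per-version log files for cdf queries could be sketched as below. The function name, the input shape (a list of version/action pairs), and the empty placeholder used for the fake checkpoint are assumptions for illustration only; the real checkpoint content is not specified in this proposal.

```python
import json
from collections import defaultdict
from typing import Dict, List, Tuple


def layout_cdf_log(protocol: dict, metadata: dict,
                   file_actions: List[Tuple[int, dict]]) -> Dict[str, str]:
    """Return {log file name: content} for a locally constructed cdf delta log."""
    if not file_actions:
        raise ValueError("no file actions returned for the requested range")
    min_version = min(version for version, _ in file_actions)

    per_version: Dict[int, List[dict]] = defaultdict(list)
    # Protocol and Metadata go first in minVersion.json.
    per_version[min_version].extend([{"protocol": protocol}, {"metaData": metadata}])
    # Every other action goes into the json file of the version the server returned.
    for version, action in file_actions:
        per_version[version].append(action)

    files = {
        f"{version:020d}.json": "\n".join(json.dumps(a) for a in actions)
        for version, actions in sorted(per_version.items())
    }
    if min_version > 0:
        # Placeholder for the fake checkpoint at minVersion-1 (content omitted here).
        files[f"{min_version - 1:020d}.checkpoint.parquet"] = ""
    return files


files = layout_cdf_log(
    {"minReaderVersion": 1, "minWriterVersion": 4},
    {"id": "table-id"},
    [(5, {"add": {"path": "delta-sharing:///t/a/10"}}),
     (6, {"cdc": {"path": "delta-sharing:///t/b/20"}}),
     (5, {"remove": {"path": "delta-sharing:///t/c/30"}})],
)
```

Each entry of the returned dictionary would then be stored in the BlockManager and served through DeltaSharingLogFileSystem, as in the snapshot case.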

Streaming Queries

For streaming queries, we will create a new class, DeltaFormatSharingSource, which wraps an instance of DeltaSource. For the two main APIs, latestOffset and getBatch, it will first perform the delta sharing related tasks (using DeltaSharingClient to request the files from the server and storing them in the local BlockManager) and then leverage DeltaSource to perform the concrete operations. Specifically:

  • From DeltaSharingDataSource, when receiving a streaming query to create a Source, if it’s asking for delta format sharing, we’ll return the DeltaFormatSharingSource, otherwise, the DeltaSharingSource for parquet format sharing.
  • Within the DeltaFormatSharingSource, we wrap an instance of DeltaSource, i.e., deltaSource, which is built on top of the path of the local delta log.
  • Upon initialization, we will fetch the latest protocol and metadata from the server and store them in the local delta log for DeltaSource to use.
  • When latestOffset is called, we'll first check whether there are unprocessed data files in the recipient's local delta log.
    If all local data is processed, we use getTableVersion to check whether there is new data on the provider side; if so, we fetch the new data with queryTable and store it locally (in the meantime, we also need to prepare the file id to url mapping, set up url refresh, etc).
  • If there is unprocessed data, deltaSource.latestOffset is called and the return value is returned.
  • When getBatch is called, we’ll directly call deltaSource.getBatch with the same parameters to return the requested data.
  • Upon every commit call, we’ll check the table version from the endOffset, and clean data from BlockManager before version-1.
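
The control flow of latestOffset and commit described above can be modelled with the following Python sketch. It is a toy model under stated assumptions: the fake client, the integer offsets, and the dictionary used as the local log are all hypothetical stand-ins, and the wrapped DeltaSource is reduced to "return the newest locally stored version".

```python
from typing import Dict, List, Optional


class FakeSharingClient:
    """Hypothetical stand-in for DeltaSharingClient, backed by an in-memory table."""

    def __init__(self, versions: Dict[int, List[str]]):
        self.versions = versions

    def get_table_version(self) -> int:
        return max(self.versions)

    def query_table(self, start_version: int, end_version: int) -> Dict[int, List[str]]:
        return {v: a for v, a in self.versions.items()
                if start_version <= v <= end_version}


class FormatSharingSourceSketch:
    def __init__(self, client: FakeSharingClient):
        self.client = client
        self.local_log: Dict[int, List[str]] = {}  # stands in for the BlockManager
        self.latest_fetched = -1
        self.latest_processed = -1

    def latest_offset(self) -> Optional[int]:
        # Only ask the server when all locally stored data has been processed.
        if self.latest_processed >= self.latest_fetched:
            server_version = self.client.get_table_version()
            if server_version > self.latest_fetched:
                self.local_log.update(
                    self.client.query_table(self.latest_fetched + 1, server_version))
                self.latest_fetched = server_version
        if self.latest_processed >= self.latest_fetched:
            return None  # nothing new locally or on the provider side
        # Stand-in for delegating to deltaSource.latestOffset.
        self.latest_processed = self.latest_fetched
        return self.latest_processed

    def commit(self, end_version: int) -> None:
        # Clean locally stored data before version-1.
        for version in [v for v in self.local_log if v < end_version - 1]:
            del self.local_log[version]


client = FakeSharingClient({0: ["a0"], 1: ["a1"], 2: ["a2"]})
source = FormatSharingSourceSketch(client)
first_offset = source.latest_offset()
source.commit(first_offset)
```

Keeping the fetch step separate from the delegation step means the wrapped source only ever sees a local delta log, which is the point of the wrapper design.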

Project Plan

ID  Task description           PR     Status  Author
1   Introduce Util Functions   #2403  Closed  @linzhou-db
2   Support Snapshot Queries   #2440  Closed  @linzhou-db
3   Support CDF Queries        #2457  Closed  @linzhou-db
4   Support Streaming Queries  #2472  Closed  @linzhou-db
5   Support DeletionVectors    #2480  Closed  @linzhou-db

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • Yes. I can contribute this feature independently.
  • Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • No. I cannot contribute this feature at this time.
@linzhou-db linzhou-db added the enhancement New feature or request label Nov 14, 2023
@tdas tdas added this to the 3.1.0 milestone Nov 14, 2023
vkorukanti pushed a commit that referenced this issue Jan 9, 2024
Adds snapshot support for "delta format sharing", this is the second PR of issue #2291
- DeltaSharingDataSource with snapshot query support
- DeltaSharingDataSourceDeltaSuite
- DeltaSharingDataSourceDeltaTestUtils/TestClientForDeltaFormatSharing/TestDeltaSharingFileSystem

Closes #2440

GitOrigin-RevId: a095445b6da809ee9a5b4ece7c38d04a172ff70f
vkorukanti pushed a commit that referenced this issue Jan 10, 2024
Adds cdf support for "delta format sharing", this is the third PR of issue #2291
- DeltaSharingDataSource with cdf query support
- DeltaSharingDataSourceDeltaSuite

Closes #2457

GitOrigin-RevId: 2482f48c1344db14ecc79137ba1ea675820c83eb
vkorukanti pushed a commit that referenced this issue Jan 12, 2024
…n 3.1

## Description
(Cherry-pick of #2472 to branch-3.1)

Fourth PR of #2291: Adds Streaming support for "delta format sharing", and add column mapping test
- DeltaSharingDataSource with streaming query support
- DeltaFormatSharingSource
- DeltaFormatSharingSourceSuite/DeltaSharingDataSourceCMSuite

## How was this patch tested?
Unit Tests
vkorukanti pushed a commit that referenced this issue Jan 13, 2024
…ring

## Description
(Cherry-pick of #2480 to branch-3.1)

Fifth PR of #2291: Adds deletion vector support for "delta format sharing"
- Extends PrepareDeltaScan to PrepareDeltaSharingScan, to convert DeltaSharingFileIndex to TahoeLogFileIndex.
- Update DeltaSparkSessionExtension to add the rule of PrepareDeltaSharingScan
- Added unit test in DeltaSharingDataSourceDeltaSuite

## How was this patch tested?
Unit Tests