
DynamoDBLogStore #339

Conversation

@mrk-its (Contributor) commented Mar 3, 2020

This PR addresses issue #41 - Support for AWS S3 (multiple clusters/drivers/JVMs)

It implements a few ideas from the #41 discussion:

  • provides a generic base class BaseExternalLogStore for storing the listing of commit files
    in an external DB; this class can be easily extended for a specific DB backend
  • stores the contents of each commit in a temporary file and links to it from the DB row,
    so that an incomplete write operation can be finished by a reader
  • provides a concrete DynamoDBLogStore implementation extending BaseExternalLogStore
  • other DB backends should be simple to implement
    (a ZooKeeper implementation is almost ready; I can create a separate PR if anyone is interested)
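The write/recovery idea in the bullets above can be sketched roughly as follows (illustrative Python, not the PR's actual classes; plain dicts stand in for the filesystem and the external DB):

```python
# Hypothetical sketch of the commit protocol described above.
# All names are illustrative; dicts model the object store and the external DB.
import uuid

filesystem = {}   # path -> contents
external_db = {}  # (table_path, filename) -> {"tempPath": ..., "complete": bool}

def write_commit(table_path, filename, contents):
    # 1. Write the commit contents to a uniquely named temporary file.
    temp_path = f"{table_path}/_temp/{uuid.uuid4()}"
    filesystem[temp_path] = contents
    # 2. Register the commit in the external DB (put-if-absent gives mutual exclusion).
    key = (table_path, filename)
    if key in external_db:
        raise RuntimeError("concurrent commit detected")
    external_db[key] = {"tempPath": temp_path, "complete": False}
    # 3. Copy the temp file to its final destination and mark the entry complete.
    filesystem[f"{table_path}/{filename}"] = contents
    external_db[key]["complete"] = True

def recover(table_path, filename):
    # A reader that finds an incomplete entry finishes the write itself,
    # using the temp file referenced by the DB row.
    entry = external_db[(table_path, filename)]
    if not entry["complete"]:
        filesystem[f"{table_path}/{filename}"] = filesystem[entry["tempPath"]]
        entry["complete"] = True
```

The key point is step 2: even if the writer crashes between steps 2 and 3, the DB row still points at the temp file, so any later reader can complete the write.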

DynamoDBLogStore requirements:

To enable DynamoDBLogStore, set the following Spark property:
spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore

A single DynamoDB table is required. The default table name is 'delta_log';
it can be changed by setting a Spark property (see below).

Required key schema:

  • tablePath: String, HASH
  • fileName: String, RANGE

The table can be created with the following AWS CLI command:

aws --region us-east-1 dynamodb create-table \
    --table-name delta_log \
    --attribute-definitions AttributeName=tablePath,AttributeType=S \
                            AttributeName=fileName,AttributeType=S \
    --key-schema AttributeName=tablePath,KeyType=HASH \
                AttributeName=fileName,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
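For those provisioning from Python instead, the same table can be created with boto3 (a sketch mirroring the CLI call above; assumes AWS credentials are configured):

```python
# Sketch: create the delta_log table via boto3 instead of the AWS CLI.
# Same key schema and throughput as the CLI command above.
import boto3

client = boto3.client("dynamodb", region_name="us-east-1")
client.create_table(
    TableName="delta_log",
    AttributeDefinitions=[
        {"AttributeName": "tablePath", "AttributeType": "S"},
        {"AttributeName": "fileName", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "tablePath", "KeyType": "HASH"},   # partition key
        {"AttributeName": "fileName", "KeyType": "RANGE"},   # sort key
    ],
    ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
```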

The following Spark properties are recognized:

  • spark.delta.DynamoDBLogStore.tableName - table name (defaults to 'delta_log')
  • spark.delta.DynamoDBLogStore.region - AWS region (defaults to 'us-east-1')
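For illustration, the properties above wired into a PySpark session (a sketch; assumes the log store jar is on the classpath, and shows the documented defaults):

```python
# Sketch: configuring a Spark session to use DynamoDBLogStore.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-dynamodb-logstore-demo")
    .config("spark.delta.logStore.class", "io.delta.storage.DynamoDBLogStore")
    .config("spark.delta.DynamoDBLogStore.tableName", "delta_log")
    .config("spark.delta.DynamoDBLogStore.region", "us-east-1")
    .getOrCreate()
)
```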

Testing

A Python integration test is included: examples/python/dynamodb_logstore.py

This solution has also been stress-tested on an Amazon EMR cluster
(multiple test jobs writing thousands of parallel transactions to a single Delta table),
and no data loss has been observed so far.

@databricks-cla-assistant commented Mar 3, 2020

CLA assistant check
All committers have signed the CLA.

@mrk-its (Contributor, Author) commented Mar 4, 2020

@tdas @marmbrus could you take a look?

@tdas (Contributor) commented Mar 5, 2020

Do you have any testing framework that can be run as unit tests in the PR?

I believe that with appropriate separation of the contention-handling logic and DynamoDB, each of them can be tested separately. For example, the contention-handling logic can be tested with a pluggable in-memory store (that is, independent of DynamoDB). Then we can ensure correctness independent of any specific KV store.
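The suggested separation can be sketched as follows (illustrative names, not the PR's actual classes): the contention-handling logic talks to a minimal key-value interface, and unit tests plug in an in-memory implementation instead of DynamoDB.

```python
# Sketch: an in-memory put-if-absent store standing in for DynamoDB in tests.
import threading

class InMemoryStore:
    """Thread-safe put-if-absent KV store, mimicking DynamoDB's
    conditional put (attribute_not_exists)."""
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def put_if_absent(self, key, value):
        with self._lock:
            if key in self._data:
                return False
            self._data[key] = value
            return True

def try_commit(store, version, payload):
    # Contention handling: only one writer wins a given commit version;
    # losers see False and must retry at a newer version.
    return store.put_if_absent(("delta_log", version), payload)
```

Because `try_commit` only depends on the `put_if_absent` contract, the same logic can be exercised against DynamoDB or any other backend implementing it.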

@mrk-its (Contributor, Author) commented Mar 5, 2020

Thanks for taking a look!

Do you have any testing framework that can be run as unit tests in the PR?

Not yet.

I believe that with appropriate separation of the contention-handling logic and DynamoDB, each of them can be tested separately. For example, the contention-handling logic can be tested with a pluggable in-memory store (that is, independent of DynamoDB). Then we can ensure correctness independent of any specific KV store.

You are right, I thought about that. The BaseExternalLogStore logic can be tested independently of any KV store by subclassing it in tests. I'll work on that.

@mmazek commented Apr 14, 2020

Hi @tdas, we implemented the recommended changes and added tests. Can you please take a look and tag appropriate reviewers?

@mmazek commented May 14, 2020

Hi, can we get some reviews on this PR?

@matthewpick

Is it possible to migrate an existing delta table to this format? Or does it have to be created from scratch with this implementation?

@mrk-its (Contributor, Author) commented May 28, 2020

Is it possible to migrate an existing delta table to this format? Or does it have to be created from scratch with this implementation?

It is perfectly fine to start with an existing Delta table and an empty DynamoDB table; you don't need to do any migration.

@zsxwing (Member) commented May 28, 2020

Sorry for the delay. We are working on adding a contrib directory and a new project to put all future LogStore implementations like this. We would like to keep the core project small because special LogStore implementations are not needed by everyone. Will report back when the project is ready.

@mmazek commented May 29, 2020

Hi @zsxwing, thanks for the reply. Can you share some rough estimates on when you'll be releasing those changes? Is it 1-2 months or rather some time in Q3 or Q4? We're currently running our prod workloads on the forked delta and I'd like to plan for the switch.

@zsxwing (Member) commented Jun 3, 2020

@mmazek I would avoid changing the build script as it might break the release script and block 0.7.0. So it will likely happen in Q3.

@mixam24 commented Jan 14, 2021

Hello, do you have any update on this?
Thanks.

@zsxwing (Member) commented Apr 12, 2021

Apologies for leaving this PR open for so long. We are currently working on refactoring the project to support merging LogStore implementations in a separate module, so that we don't need to pull lots of unnecessary dependencies to the core project. In addition, we are also working on a stable public LogStore API to avoid developers building something on top of private APIs. This should be done soon as we are marking these tasks release blockers for the next release. Will ping you when they are ready. Thanks again for your contribution.

@mrk-its (Contributor, Author) commented Apr 21, 2021

@zsxwing thanks for the update; I'll try to rebase our work on top of #644 and may be able to provide some feedback.

@emanuelh-cloud commented May 27, 2021

Any news regarding this PR? Is it part of 1.0.0?
Thanks

@mrk-its (Contributor, Author) commented May 27, 2021

@emanuelh-cloud I'm going to continue working on this soon, stay tuned!

@mrk-its (Contributor, Author) commented Jun 2, 2021

@zsxwing @tdas

I've moved the code to contrib - could you take a look?

Regarding tests:

  1. The DynamoDBLogStore code is split into two parts: a BaseExternalLogStore class containing the DB-independent logic (for example, support for recovery from failures) and a DynamoDBLogStore class extending BaseExternalLogStore with DynamoDB support. For the tests (ExternalLogStoreSuite) we define a MemoryLogStore extending BaseExternalLogStore - so the DynamoDB part itself is not tested there, but the important parts still are, like after-failure recovery.
  2. The Python script examples/python/extra/dynamodb_logstore.py can be used to stress-test DynamoDBLogStore against a real DynamoDB table: it writes a number of transactions concurrently and verifies the number of rows written.
  3. We also tested it with a Spark stress-test job, running a number of parallel Spark applications (on separate drivers) writing concurrent transactions.
  4. This code has been battle-tested in our production environment for over a year on several multi-TB datasets with concurrent writes - no issues so far.

@scottsand-db scottsand-db changed the base branch from master to dynamodb_logstore_scala_feature_branch March 15, 2022 20:45
@scottsand-db scottsand-db merged commit 3af698d into delta-io:dynamodb_logstore_scala_feature_branch Mar 15, 2022
@soumilshah1995

Any updates? #1498

Thanks, I have been having issues with this 🗡️

@orionmoana commented Oct 3, 2023

@mrk-its

ZooKeeper implementation is almost ready

Do you still have access to this ZooKeeper implementation? We could be interested in progressing this, as a DynamoDB alternative would be very useful. #1441

@mrk-its (Contributor, Author) commented Oct 4, 2023

@mrk-its

ZooKeeper implementation is almost ready

Do you still have access to this ZooKeeper implementation? We could be interested in progressing this, as a DynamoDB alternative would be very useful. #1441

Yes, I have: https://github.com/mrk-its/delta-old/blob/98c34704ca4d9d264e5fad58f88b5ab379abd9c9/src/main/scala/org/apache/spark/sql/delta/storage/ZookeeperLogStore.scala

Of course it is very old (based on Delta 0.5), but adapting it to the current version of Delta should be possible.

vkorukanti pushed a commit that referenced this pull request May 20, 2024
## Description
Taking inspiration from #339, this
PR adds a Commit Owner Client which uses DynamoDB as the backend. Each
Delta table managed by a DynamoDB instance will have one corresponding
entry in a DynamoDB table. The table schema is as follows:

* tableId: String --- The unique identifier for the entry. This is a UUID.
* path: String --- The fully qualified path of the table in the file system, e.g. s3://bucket/path.
* acceptingCommits: Boolean --- Whether the commit owner is accepting new commits. This will only be set to false when the table is converted from managed commits to file system commits.
* tableVersion: Number --- The version of the latest commit.
* tableTimestamp: Number --- The inCommitTimestamp of the latest commit.
* schemaVersion: Number --- The version of the schema used to store the data.
* commits: --- The list of unbackfilled commits.
  - version: Number --- The version of the commit.
  - inCommitTimestamp: Number --- The inCommitTimestamp of the commit.
  - fsName: String --- The name of the unbackfilled file.
  - fsLength: Number --- The length of the unbackfilled file.
  - fsTimestamp: Number --- The modification time of the unbackfilled file.

For a table to be managed by DynamoDB, `registerTable` must be called
for that Delta table. This will create a new entry in the DB for this
Delta table. Every `commit` invocation appends the UUID delta file
status to the `commits` list in the table entry. `commit` is performed
through a conditional write in DynamoDB.
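The conditional-write commit can be modeled roughly as below (illustrative Python, not the actual implementation; in DynamoDB the check is enforced server-side via a ConditionExpression on the single-item update):

```python
# Sketch: a commit succeeds only if tableVersion still equals the version
# the writer read, so exactly one concurrent writer wins each version.
def commit(entry, expected_version, commit_info):
    # `entry` models one DynamoDB item for a managed Delta table.
    if entry["tableVersion"] != expected_version:
        # Corresponds to DynamoDB's ConditionalCheckFailedException.
        raise RuntimeError("conditional check failed: a concurrent commit won")
    entry["tableVersion"] = expected_version + 1
    entry["commits"].append(commit_info)  # record the unbackfilled commit
```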

## How was this patch tested?
Added a new suite called `DynamoDBCommitOwnerClient5BackfillSuite` which
uses a mock DynamoDB client, plus manual testing against a DynamoDB
instance.
Labels: assessment (Assessing issue or PR), enhancement (New feature or request)