[Storage System] Support for AWS S3 (multiple clusters/drivers/JVMs) #41
The Terraform project has a nice, configurable state-locking system: the default locking mechanism for S3-stored state files is based on DynamoDB. A similar mechanism here would be a good first cut, but it would be great to make this something users can customize. I am especially thinking of my own use cases, where we may run against an S3-like system (e.g. MinIO) in environments where DynamoDB is not available but a different state-locking mechanism is (e.g. etcd, especially if I'm running Spark on Kubernetes).
Yeah, it would be good to design a generic database-backed (or KV-store-backed) log store that can be plugged in with any database or key-value store implementation.
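To make the idea concrete, here is a minimal sketch of what such a pluggable contract might look like. This is purely illustrative: the names and signatures below are hypothetical and are not the actual Delta Lake `LogStore` API.

```java
// Hypothetical sketch of a pluggable, KV-store-backed commit coordinator.
// Names and signatures are illustrative, not the real Delta LogStore interface.
public interface CommitCoordinator {

    /** Returns the latest committed version for the given table path, or -1 if none exists. */
    long getLatestVersion(String tablePath);

    /**
     * Atomically claims {@code version} for {@code tablePath} and records where the
     * commit contents live. Must fail if another writer already claimed this version.
     */
    void commit(String tablePath, long version, String commitFileLocation)
            throws CommitConflictException;
}

class CommitConflictException extends Exception {
    CommitConflictException(String message) { super(message); }
}
```

Any backend that can implement the atomic `commit` call (DynamoDB, etcd, ZooKeeper, a relational database) could then be swapped in via configuration.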
An IPC mechanism between drivers, such as a message queue, could also be used to coordinate the transactional changes. This could remove external storage dependencies.
I think it might be good to also consider the cost and performance implications of whichever architecture we choose (https://aws.amazon.com/dynamodb/pricing/on-demand/). If I remember correctly, TD used RocksDB for storing the state of Structured Streaming. We could also use ElastiCache (https://aws.amazon.com/elasticache/pricing/). Let me know in case I can be of any help :)
FoundationDB would be a great choice to solve the two problems listed in #39. It is the only open-source, highly available, and scalable database that provides strictly serializable, arbitrary read-write ACID transactions. It is backed by a team of extremely talented developers at Apple and a few other places, some of whom have been there since nearly the beginning, when it was a commercial product and before it was acquired. FoundationDB has Java bindings, and they are the most mature of all the bindings because they are used by Apple and others in production at petabyte scale. Snowflake uses FoundationDB for the very similar use case of storing the metadata for their cloud data warehouse, which, like Delta, stores data in an object storage system such as S3. FoundationDB could also be used for storing other metadata besides the transaction log, such as zone maps for pruning partition files or directories while executing queries.

FoundationDB provides strict serializability, which is the strongest isolation level possible. It is broadly defined as serializability from database theory combined with linearizability from concurrency theory. This is the same isolation level offered by Spanner from Google and FaunaDB, both of which are closed-source products; Spanner is not even available outside of Google Cloud Platform. In practice, strict serializability means transactions are atomically executed at a single point in time, and those transactions are visible in real-time order. FoundationDB is also causally consistent, which is important for use cases such as the one I'm proposing below, where invariants need to be maintained across transactions.

I think using a database with such strong guarantees is important because metadata corruption caused by a weaker isolation level or asynchronous durability in the metadata store would be an absolute disaster. Metadata corruption is essentially impossible to recover from because, unlike a back-up of a single database, which may be able to provide a consistent snapshot of itself, a system that combines S3 plus a metadata store cannot be backed up in a consistent fashion. Even if you restore your metadata store from a back-up that is consistent unto itself (and not all provide that option), you would have to manually determine the state of the tables and rectify it yourself between the metadata store and the objects in S3.

Systems which will almost certainly be suggested, but which should be rejected because they suffer from weak isolation or asynchronous durability, include:
Systems which do fulfill the requirements, but which suffer other drawbacks that can be worked around in various ways:
The two problems could be solved as such:
I would be happy to talk to anyone on the Delta team who is interested in pursuing this.
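For illustration only, here is a rough sketch of how a per-table version counter and a pointer to the latest commit could be updated in a single strictly serializable FoundationDB transaction using the Java bindings. The key layout, table name, and S3 path are made-up placeholders, not a proposed schema.

```java
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.tuple.Tuple;

public class FdbCommitSketch {
    public static void main(String[] args) {
        FDB fdb = FDB.selectAPIVersion(620);
        try (Database db = fdb.open()) {
            // Hypothetical key layout: ("delta", <table>, "version") and ("delta", <table>, "latest").
            byte[] versionKey = Tuple.from("delta", "my_table", "version").pack();
            byte[] latestKey = Tuple.from("delta", "my_table", "latest").pack();

            // One strictly serializable transaction: bump the version counter and
            // record where the staged commit file lives. Conflicting writers are
            // detected and retried by FoundationDB's transaction engine.
            db.run(tr -> {
                byte[] current = tr.get(versionKey).join();
                long version = (current == null) ? 0L : Tuple.fromBytes(current).getLong(0);
                tr.set(versionKey, Tuple.from(version + 1).pack());
                tr.set(latestKey, Tuple.from("s3://my-bucket/my_table/_delta_log/.tmp/commit.json").pack());
                return null;
            });
        }
    }
}
```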
Thanks for taking the time to write such a complete and thoughtful post @ryanworl! In general, I agree with your analysis, though there is one point I would like to clarify: the current implementation of Delta uses Spark to process the transaction log, which is stored alongside the actual data. As such, I think a system like Dynamo or ZooKeeper could be used to determine the "winner" of any given table version (problem #1) and also as the source of truth for the newest version of the table (problem #2), without needing to store large quantities of data themselves. Now, I think you could also move the entire transaction log itself into something like FoundationDB and build a version of the Delta log protocol that operates as SQL queries against this system. This could possibly improve the latency of querying the latest version of the table. That said, that is a much larger effort than what we were anticipating when we created this ticket :)
@marmbrus Thank you for the clarification. I wasn't sure if I was correct about the relationship between the consistent-directory-listing limitation and the table version. How is problem 2 solved if there is no consistent directory listing in S3? By issuing a single read for each version number? EDIT: I see a proposed solution in this comment in the S3 driver:
Is this adequate to ensure all metadata files are visible to all clients before they attempt to commit? I am not sure. The scenario I envision is that the metadata file one version before the current one is not yet visible to a client when it attempts to read the current version of the table. If it misses that second-to-last metadata file, it would see a corrupt version of the table but think it has successfully read it, and it could potentially commit a new write because the current version did happen to be visible. It could successfully CAS the current version to the new version it committed without ever reading the second-to-last metadata file. I'm sure you've all been thinking about this more than I have since learning about Delta, so I am probably not correct here.
The number of the latest version is all we need to correctly reconstruct the current state of a table. We will list to find the latest checkpoint, but it is not incorrect to start from an earlier one in the case where a later checkpoint is omitted by eventually consistent listing (you'll just do a bit more work to reconstruct the snapshot). Other side notes:
We'll never "miss" metadata files, as they are named with contiguous, monotonically increasing integers. So as long as we know the latest version, we can figure out the others. However, if that cache is missing the latest value (because a write came from another cluster), you might see a stale (but not corrupted) version of the table.
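As a small illustration of that invariant (hypothetical code, not the actual implementation): given the latest version from the transactional store, a reader can verify that the commit files it found form a contiguous sequence before reconstructing a snapshot, and refuse to load the table otherwise.

```java
import java.util.Set;

// Illustrative only: refuse to build a snapshot from a listing with gaps.
// Versions are assumed to start at 0 (or at the chosen checkpoint version).
class ContiguityCheck {
    static void ensureContiguous(Set<Long> versionsListed, long latestKnownVersion) {
        for (long v = 0; v <= latestKnownVersion; v++) {
            if (!versionsListed.contains(v)) {
                throw new IllegalStateException(
                    "Commit file for version " + v + " is missing (or not yet visible); "
                        + "refusing to reconstruct a potentially corrupt snapshot");
            }
        }
    }
}
```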
Let me lay out the scenario I'm talking about more explicitly. Three competing clients A, B, and C are all modifying the table concurrently. They proceed along a timeline represented by T0, T1, etc., and table versions are represented as V0, V1, etc.

At T0, A increments the table from V0 to V1 and issues a Put of the metadata file for V1; subsequent commits bring the table to V3 by T2. If client C has nothing in cache and starts work after T2, it could go to the version mutex server and see that the version is now V3, because the version server is linearizable. If client C then does a directory listing at T2, is it guaranteed to see ALL the files for V1, V2, and V3 in the listing? I don't think that is the case.

If the client does individual reads for each version key between what it has in cache and the latest version, it would see everything, because S3 has read-after-write consistency for newly written keys. But if it relies solely on the contents of the directory listing and doesn't detect the gap (e.g. it receives a listing with the V0, V1, and V3 metadata files), it would miss a file and therefore read a corrupt snapshot of the table. I don't think S3 guarantees that objects become visible in any specific order relative to when they were written. Specifically, the documentation says:
An additional wrinkle is that if a client successfully increments the version number and crashes before committing the metadata file to S3, it would leave a hole. This hole would be difficult to differentiate from an eventual-consistency artifact in the directory listing without attempting to read the key directly.
Another issue worth considering is the scenario where client A increments the version but, before it has written the metadata file (GC pause, network partition, etc.), client B reads the version from the version server and issues a GET for the not-yet-written metadata file. This sequence would invalidate read-after-write consistency from S3. From the documentation:
Thanks for the clarification! I agree that there are a bunch of edge cases to consider here, but I don't think they are insurmountable. Specific thoughts inline below.
You are right: you must always load a contiguous set of versions, based on some available snapshot (the latest is most efficient, but not required for correctness) and the latest version available (from the transactional store). The current implementation will refuse to load a table if there are any missing version files.
Agreed, we would not want to sacrifice liveness of the protocol here by allowing a client that "locks" a version to die without completing the operation. I think we can avoid an issue here by including information about the contents of the commit atomically with the operation that "claims" that version. For example, you could write the contents of the commit to a temporary location first and record that location as part of the claim. In this case, any reader who comes along and sees that the transaction log is behind the lock table (either due to a crash or due to eventual consistency) could finish that operation. If multiple readers do this concurrently, there are no issues with eventual consistency, as they are all writing the same data. As an optimization, if the version is small (which is the common case), you could avoid the indirection and just include the data in the lock request. (This is how the commit service in Databricks for Managed Delta Lake works.)
You can avoid this by confirming the existence of a file using a listing before ever doing a GET, which avoids populating S3's negative cache. I think a solution to the negative cache is to be robust to these errors and retry automatically, or, as suggested above, "complete" the missing operation yourself.
This was my general thinking as well. By committing to a temporary file in S3 first, then writing the temporary file's location simultaneously with the version CAS operation, you avoid ever doing speculative reads of keys that may not exist yet. Thanks for walking me through this. I am confident this will work, with the only requirement being a key-value store that can do CAS on two keys at the same time, as well as two simultaneous linearizable reads on those same two keys: one being the version counter and the other being the last committed metadata file location (or inline value).

Storing the entire history of metadata file locations (or a snapshot plus recent history) in the KV store, as I suggested in my original post, would just be a performance optimization to reduce polling of the S3 directory listing API while waiting for the writes to either achieve consistency or be retried by a non-failing client.

If Databricks is interested in exploring FoundationDB for metadata storage (especially for Managed Delta Lake), I have some availability soon. You can reach me at ryantworl at gmail dot com. My episode on the Data Engineering Podcast explaining FoundationDB was about a month before yours explaining Delta Lake @marmbrus 😄
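As a concrete (but entirely hypothetical) sketch of the "claim a version atomically, together with a pointer to the staged commit file" step, here is what a conditional put against DynamoDB might look like with the AWS SDK for Java v1. The table name, key schema, and attribute names are placeholders, not the schema of any real implementation.

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;

import java.util.HashMap;
import java.util.Map;

public class VersionClaimSketch {
    /**
     * Attempts to claim `version` for `tablePath`, recording where the commit
     * contents were staged. The conditional expression makes the claim atomic:
     * only one writer can create the item for a given (tablePath, version).
     */
    static boolean tryClaim(AmazonDynamoDB ddb, String claimTable,
                            String tablePath, long version, String tempFileLocation) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("tablePath", new AttributeValue(tablePath));
        item.put("version", new AttributeValue().withN(Long.toString(version)));
        item.put("tempFile", new AttributeValue(tempFileLocation));
        try {
            ddb.putItem(new PutItemRequest()
                .withTableName(claimTable)
                .withItem(item)
                // Fail if another writer already claimed this (tablePath, version).
                .withConditionExpression(
                    "attribute_not_exists(tablePath) AND attribute_not_exists(version)"));
            return true;
        } catch (ConditionalCheckFailedException e) {
            return false; // lost the race; re-read the latest version and retry
        }
    }

    public static void main(String[] args) {
        AmazonDynamoDB ddb = AmazonDynamoDBClientBuilder.defaultClient();
        boolean claimed = tryClaim(ddb, "delta_version_claims", "s3://my-bucket/my_table",
            42L, "s3://my-bucket/my_table/_delta_log/.tmp/00042.json");
        System.out.println(claimed ? "claimed" : "conflict");
    }
}
```

The same shape works for any store with a compare-and-swap primitive (etcd, ZooKeeper, FoundationDB); only the conditional-write call changes.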
EMR clusters offer the EMRFS filesystem as an option, which provides a consistent view of S3 backed by DynamoDB tables. EMRFS provides both
Most production-grade EMR clusters are expected to use the EMRFS filesystem, and enabling concurrent writes to a Delta table from different clusters should require very minimal code changes too...
@rogue-one Thanks for the pointer. I don't see anything that guarantees mutual exclusion there, but perhaps I'm missing it. Do you have a pointer to the specific API that provides this for EMRFS? Note that this guarantee is stronger than "consistent listing", as you need to avoid the race condition between two writers who both check and find the file to be missing. The storage system needs to atomically check-and-create the file in question.
I wonder if the approach used in
@marmbrus do you see any potential issues with the approach I described? I'm considering implementing it.
I agree with your idea that we should introduce an abstraction that makes this pluggable. However, distributed locking is probably not the best way forward. See the argument @marmbrus made above regarding the liveness risks of using distributed locks. It's better to invest time and effort in a design that does not suffer from obvious pitfalls.
Hi,
as always, I think that Michael has hit the nail on the head. As a coder I would think about more complex ways of coding that prima facie appear brilliant, but as a designer and architect I would think about the pitfalls and issues of the entire solution, and then as a consumer I would be super critical and wary of storage classes that are either super expensive to maintain (as with DynamoDB, where we get stuck with an AWS-specific implementation and calls to a DB with additional cost overhead and security issues to handle) or are susceptible to failures.
Using EMRFS consistent view (just a checkbox option while starting EMR clusters) should help quite a bit as well, as mentioned above by Charles; I have been using it quite a lot for streaming solutions.
Thanks and Regards,
Gourav Sengupta
@tdas could you elaborate on the liveness risks of distributed locks again? Also, given the current design, can they be taken care of at all in a
For everyone interested, we have a solution for this using an AWS DynamoDB table: #339
With the release of strong read-after-write consistency in S3, is this still an issue? |
Yes. The basis of this issue is that S3 doesn't have a compare-and-swap operation. Consistent listing makes some operations easier, though. The accepted solution using DynamoDB works because DynamoDB has a compare-and-swap operation.
Hi folks, this PR looks like a viable approach for this problem? #339
@SreeramGarlapati the DynamoDBLogStore approach defined in #339 requires that all parties (readers and writers) know about the log store being in DynamoDB. That would mean basically everything has to be configured to use the "non-standard" log store. The approach we're exploring in delta-rs for this problem is to instead use a DynamoDB-based locking mechanism on the writer side only, which would allow readers to continue to look at the JSON files in S3. Meanwhile, writers would need to coordinate around this DynamoDB-based lock.
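delta-rs implements this locking in Rust; purely to illustrate the shape of the idea, here is a hypothetical writer-side lock acquisition against DynamoDB, sketched with the AWS SDK for Java v1. The table name, attribute names, and lease handling are assumptions for illustration, not delta-rs's actual schema.

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class WriterLockSketch {
    /**
     * Tries to acquire a writer lock for the given table path. The conditional
     * expression ensures only one writer holds the lock at a time; an expired
     * lease (leaseUntil in the past) may be taken over by a new owner.
     */
    static boolean tryAcquire(AmazonDynamoDB ddb, String lockTable,
                              String tablePath, long leaseMillis) {
        long now = System.currentTimeMillis();
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("tablePath", new AttributeValue(tablePath));
        item.put("owner", new AttributeValue(UUID.randomUUID().toString()));
        item.put("leaseUntil", new AttributeValue().withN(Long.toString(now + leaseMillis)));

        Map<String, AttributeValue> values = new HashMap<>();
        values.put(":now", new AttributeValue().withN(Long.toString(now)));
        try {
            ddb.putItem(new PutItemRequest()
                .withTableName(lockTable)
                .withItem(item)
                .withConditionExpression("attribute_not_exists(tablePath) OR leaseUntil < :now")
                .withExpressionAttributeValues(values));
            return true; // lock held: write the commit, then delete the item to release
        } catch (ConditionalCheckFailedException e) {
            return false; // another writer holds the lock; back off and retry
        }
    }
}
```

Readers never touch this table; they just read the `_delta_log` JSON files in S3, which is the point of keeping the coordination on the writer side only.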
Amazon S3 consistency semantics have improved over the years, as @zpencerq pointed out. Also, there are S3 dialects that have offered stronger consistency semantics for a long time, for example the S3 API provided by Ceph. Note that even when directory listings do offer strong consistency, they are still paginated in all S3 SDKs: the S3 endpoint sees a series of HTTP GET requests corresponding to the lexicographic ranges of interest.

I would strongly advise against approaches that use the S3 key name to embed associations between databases, tables, and partitions. Apache Iceberg avoids this by individually tracking the data files in a table instead of directories. The data-file-to-table tracking information is maintained in an Avro-serialized metadata file/object. A query planner can then understand which data files correspond to a particular table (and even at a particular point in time) by issuing a single request instead of O(pages) requests.

If data files are always written to new keys and tracked in a per-table metadata file/object, then the remaining challenge is to provide a mechanism for per-table/catalog locking. With the above, the reliance on and volume of data stored in ZooKeeper/etcd/FoundationDB is considerably less, which is good because capacity in those systems often commands a higher premium than in an object store.
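For context on the "O(pages) requests" point, here is a minimal sketch of a paginated listing with the AWS SDK for Java v1; the bucket and prefix are placeholders. Each page is a separate HTTP request against the S3 endpoint, which is the cost the comment above is describing.

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;

public class PaginatedListing {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        // Each loop iteration is one HTTP request; a listing of N keys costs
        // roughly ceil(N / 1000) requests, i.e. O(pages), not O(1).
        ListObjectsV2Request req = new ListObjectsV2Request()
            .withBucketName("my-bucket")          // placeholder
            .withPrefix("warehouse/db/table/");   // placeholder
        ListObjectsV2Result result;
        int pages = 0;
        do {
            result = s3.listObjectsV2(req);
            pages++;
            req.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());
        System.out.println("Listed prefix in " + pages + " page(s)");
    }
}
```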
Has an implementation of Given S3's strong consistency guarantees, the read-related guarantees are trivially satisfied now. The potential advantage of not having to manage a separate storage system for write coordination would
Are the new committers available in hadoop-aws helpful in achieving the desired semantics? |
Can AWS S3 Object Lock (https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html) somehow help us with the "write if not exists" S3 consistency issue?
No. The new S3 committer solves a different problem. It doesn't handle concurrent writes.
No, it doesn't. S3 Object Lock allows users to pin an object to a specific version so that readers always see the pinned version even if the object is overwritten. It's not a lock providing mutual exclusion.
Hi all. I have not been following this thread, but I've come across it while trying to find a solution to the problem "Concurrent writes to the same Delta table from multiple Spark drivers can lead to data loss." Does the solution in #339 solve this? Additionally, are there any constraints on the solution using S3 specifically? Alternative yet similar storage platforms exist, such as IBM Cloud's Cloud Object Storage, which actually provides partial support for the S3 API.
As @zsxwing noted, the key issue of concern is the lack of
The question: the "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" setting is necessary, for example, if one Spark driver hosts many jobs that write concurrently to the same AWS S3 Delta table. But when only one job is running and writing to the AWS S3 Delta table (i.e. NO concurrent writes to the same table), the setting is NOT necessary because of https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency. Am I correct?
Hi,
I think that TD already answered this question here:
#324
Regards,
Gourav Sengupta
Resolves delta-io#41

This PR addresses issue delta-io#41 - Support for AWS S3 (multiple clusters/drivers/JVMs). It implements a few ideas from the delta-io#41 discussion:
- provides a generic base class BaseExternalLogStore for storing a listing of commit files in an external DB; this class can easily be extended for a specific DB backend
- stores the contents of a commit in a temporary file and links to it in the DB's row, so that an uncompleted write operation can be finished while reading
- provides a concrete DynamoDBLogStore implementation extending BaseExternalLogStore
- implementations for other DB backends should be simple to add (a ZooKeeper implementation is almost ready; I can create a separate PR if anyone is interested)
- unit tests in `ExternalLogStoreSuite`, which uses `InMemoryLogStore` to mock `DynamoDBLogStore`
- a Python integration test in `storage-dynamodb/integration_test/dynamodb_logstore.py` which tests concurrent readers and writers
- that integration test can also run using `FailingDynamoDBLogStore`, which injects errors into the runtime execution to test error edge cases
- this solution has also been stress-tested (by SambaTV) on an Amazon EMR cluster (multiple test jobs writing thousands of parallel transactions to a single Delta table) and no data loss has been observed so far

To enable DynamoDBLogStore, set the following Spark property: `spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore`

The following configuration properties are recognized:
- io.delta.storage.DynamoDBLogStore.tableName - table name (defaults to 'delta_log')
- io.delta.storage.DynamoDBLogStore.region - AWS region (defaults to 'us-east-1')

Closes delta-io#1044

Co-authored-by: Scott Sandre <scott.sandre@databricks.com>
Co-authored-by: Allison Portis <allison.portis@databricks.com>
Signed-off-by: Scott Sandre <scott.sandre@databricks.com>
GitOrigin-RevId: 7c276f95be92a0ebf1eaa9038d118112d25ebc21
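For reference, a minimal sketch of wiring up those properties on a SparkSession. The property keys are copied from the PR description above (exact key names may differ between releases), and the table name, region, bucket, and path are placeholders.

```java
import org.apache.spark.sql.SparkSession;

public class DynamoDBLogStoreExample {
    public static void main(String[] args) {
        // Assumes the Delta Lake and storage-dynamodb artifacts are on the classpath.
        SparkSession spark = SparkSession.builder()
            .appName("dynamodb-logstore-example")
            .config("spark.delta.logStore.class", "io.delta.storage.DynamoDBLogStore")
            .config("io.delta.storage.DynamoDBLogStore.tableName", "delta_log")  // placeholder
            .config("io.delta.storage.DynamoDBLogStore.region", "us-east-1")     // placeholder
            .getOrCreate();

        // Concurrent writers on different clusters coordinate commits via DynamoDB.
        spark.range(10).write().format("delta").mode("append")
            .save("s3a://my-bucket/tables/example");  // placeholder path
    }
}
```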
Any updates here? #1498
This is the official issue for discussing support for Delta Lake on S3 while writing from multiple clusters. The challenges of S3 support are explained in #39. While #39 tracks the work for a simpler solution that works only when all write operations go through the same cluster/driver/JVM, this issue tracks the larger problem of making it work with multiple clusters.
Please use this thread to discuss and vote on ideas.
Update 2022-01-13
We have begun working with an open-source contributor on the design and implementation of this feature, using DynamoDB to provide the mutual exclusion that S3 lacks.
Here's the public design doc.
The current status is:
- `storage-dynamodb` SBT project