Two share requests running in parallel can override each other #941

Closed
zsaltys opened this issue Jan 2, 2024 · 6 comments · Fixed by #1072
Labels
effort: large · effort: medium · priority: high · status: in-progress
Comments

@zsaltys
Contributor

zsaltys commented Jan 2, 2024

Describe the bug

We had a situation where a dataset owner was approving a lot of share requests very quickly. CloudTrail shows that the pivotRole put a new bucket policy on the same bucket twice at exactly the same time. Because both share requests ran in parallel, each one read the existing policy, worked out how it needed to be updated, and applied its own version; in the end one request overrode the other.

From my understanding, share requests run as ECS tasks. There is probably no easy way to ensure that only one such task runs at a time for a given dataset. Therefore we probably need to introduce locking into the sharing mechanism, so that a task has to wait until others are done before it reads the existing bucket policy and makes changes.

How to Reproduce

It is not trivial to reproduce, but we've seen this happen multiple times already. With S3 bucket sharing the result is that the access request is granted and the share appears to work, but the bucket policy is missing permissions.

Expected behavior

Share requests running in parallel should never override one another.

Your project

No response

Screenshots

No response

OS

N/A

Python version

N/A

AWS data.all version

2.2

Additional context

No response

@dlpzx dlpzx added priority: high status: not-picked-yet At the moment we have not picked this item. Anyone can pick it up effort: medium effort: large labels Jan 3, 2024
@dlpzx
Contributor

dlpzx commented Jan 3, 2024

Hi @zsaltys, thanks for the issue. It is an important task and we will look into it. We can work together on the design in this issue.

@anmolsgandhi anmolsgandhi added this to the v2.3.0 milestone Jan 5, 2024
@dlpzx
Contributor

dlpzx commented Jan 15, 2024

Issue: Share processes executed in parallel on the SAME dataset override each other and cause sharing failures.
Solution: We need a way to ensure that shares on the SAME dataset are executed sequentially.

Design

Alternatives

  1. Evaluate ECS status in ECS task and wait in ECS task
  2. Metadata in RDS share_object or task with queue order in APIs and wait in ECS task
  3. Same as 2 but with locking table
  4. Evaluate ECS status in Worker Lambda and Re-queue in Worker Lambda
  5. Metadata in RDS task with queue order in APIs and Re-queue in Worker Lambda

Other, more creative solutions:
6. Batch shares for the same target dataset and process them in the same ECS task -> not sure how to do it without option 9.
7. Embed the ECS task in another ECS sharing-task-manager task -> it works, but we keep an extra ECS task just waiting.
8. Embed ECS tasks in Step Functions -> a bit of an overkill for an infrequent issue.
9. Place an EventBridge Pipe in front of the sharing ECS task -> might be an issue with execution time limitations.
10. [Update] Separate trigger-SQS and task-SQS queues

[Update]: CDK constructs found for ECS-SQS: https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_ecs_patterns/QueueProcessingFargateService.html
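
For reference, a rough sketch (not data.all's actual infrastructure code) of how that construct could be wired up with CDK v2 in Python; the VPC, cluster, image and scaling values below are illustrative assumptions:

```python
from aws_cdk import Stack
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecs as ecs
from aws_cdk import aws_ecs_patterns as ecs_patterns
from aws_cdk import aws_sqs as sqs
from constructs import Construct


class SharingTaskStack(Stack):
    """Illustrative sketch only: not the actual data.all stack layout."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        vpc = ec2.Vpc(self, "Vpc", max_azs=2)
        cluster = ecs.Cluster(self, "SharingCluster", vpc=vpc)
        task_queue = sqs.Queue(self, "SharingTaskQueue")

        ecs_patterns.QueueProcessingFargateService(
            self,
            "SharingQueueService",
            cluster=cluster,
            image=ecs.ContainerImage.from_registry("amazon/amazon-ecs-sample"),  # placeholder image
            queue=task_queue,            # messages on this queue drive task scaling
            max_scaling_capacity=1,      # process at most one queue-driven task at a time
        )
```

Note that a max scaling capacity of 1 only limits overall concurrency, not concurrency per dataset, so per-dataset ordering would still need something like the queue or locking ideas discussed below.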

I will dive deep into the most promising ideas. Let me know if you want to assess any other idea or if you can think of other viable alternatives.

1. Evaluate in ECS task and wait in ECS task

Data.all triggers all sharing tasks. Inside each task we evaluate whether there are other concurrent sharing tasks on the same dataset; if so, the task waits until no concurrent tasks are running.

Here is an example.

APIs approving share requests all targeting the same dataset.
Share1
Share2
Share3

Each API triggers an ECS task - Directly in `dataall/modules/dataset_sharing/tasks/share_manager_task.py` we evaluate:
Task1 -> other ECS share tasks running? ---> YES ---> is thisShare.targetDataset in listAllOtherRunningShares.targetDatasets? ---> NO ---> execute sharing
Task2 -> other ECS share tasks running? ---> YES ---> is thisShare.targetDataset in listAllOtherRunningShares.targetDatasets? ---> YES ---> wait
Task3 -> other ECS share tasks running? ---> YES ---> is thisShare.targetDataset in listAllOtherRunningShares.targetDatasets? ---> YES ---> wait

:atom: We still face issues managing the parallel tasks because our ECS task cannot ensure atomicity. As you can see in the diagram, if task2 is provisioned in between provisioning task1 and task1's check for other tasks, the check might return "no, you cannot run another parallel task" even though task1 started first. Schrodinger's cat :(

(diagram omitted)

Plus, it will have problems choosing between task2 and task3 after task1 is done.

CONS: an ECS task might have started before we check the thisShare.targetDataset in listAllOtherRunningShares.targetDatasets condition, so we could still have issues with parallel tasks.
PROS: everything in one place; no additional metadata.
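
For illustration, a rough sketch of what the in-task check could look like using the boto3 ECS API. The cluster/task environment variables and the DATASET_URI container-override convention are assumptions for this sketch, not data.all's actual code:

```python
import os
import time

import boto3

ecs = boto3.client("ecs")


def other_share_task_running(cluster: str, dataset_uri: str, this_task_arn: str) -> bool:
    """Return True if another RUNNING share task in the cluster targets the same dataset."""
    task_arns = ecs.list_tasks(cluster=cluster, desiredStatus="RUNNING").get("taskArns", [])
    task_arns = [arn for arn in task_arns if arn != this_task_arn]
    if not task_arns:
        return False
    for task in ecs.describe_tasks(cluster=cluster, tasks=task_arns)["tasks"]:
        for override in task.get("overrides", {}).get("containerOverrides", []):
            for env in override.get("environment", []):
                # Assumes every share task is launched with a DATASET_URI environment override
                if env["name"] == "DATASET_URI" and env["value"] == dataset_uri:
                    return True
    return False


# Inside the share task: poll until no other running task targets the same dataset
while other_share_task_running(os.environ["ECS_CLUSTER"], os.environ["DATASET_URI"], os.environ["TASK_ARN"]):
    time.sleep(30)
```

As noted above, this check is still racy: two tasks can pass it at the same moment, which is the main weakness of this alternative.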

2. Metadata with queue order in APIs and wait in ECS task

We introduce a metadata field in RDS to handle the queue order. The queue order is defined in the sharing approve/revoke APIs. All sharing tasks are triggered. Only the queue=1 item is processed. ECS tasks update the queue order information for other tasks.

Example:

APIs approving share requests all targeting the same dataset.
Share1-> queue=1
Share2-> queue=2
Share3-> queue=3

Each API triggers an ECS task
Task1 -> queue=1? ---> YES = execute sharing ---> when it finishes it subtracts 1 from all items.queue field
Task2 -> queue=1? ---> NO = wait
Task3 -> queue=1? ---> NO = wait

After Task1 is complete
Share2-> queue=1
Share3-> queue=2

Task2 -> queue=1? ---> YES = execute sharing ---> when it finishes it subtracts 1 from all items.queue field
Task3 -> queue=1? ---> NO = wait
(diagram omitted)

This is a much rarer situation than the one with ECS tasks, since RDS reads/writes happen very fast.
Nevertheless, we could still face an issue if the check-other-tasks, return-other-tasks and write-task-metadata steps of task1 and task2 overlap in time. Imagine task1's metadata is being written to RDS while, at that exact moment, task2's metadata is being calculated without accounting for task1: both would write queue=1. We need to ensure that the counter is atomic for the same target dataset, similar to DynamoDB atomic counters. I need to check, but since RDS transactions are ACID, instead of updating one row individually we could update multiple rows as a whole.

Looking into the postgres docs now: https://www.postgresql.org/docs/13/explicit-locking.html. Something like this should ensure the transactions are atomic:

(screenshot of the SQL example omitted)
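
As an illustration of the FOR UPDATE idea, here is a minimal sketch of assigning the queue position atomically; the table and column names are assumptions, not the actual data.all schema:

```python
import psycopg2


def enqueue_share(connection, dataset_uri, share_uri):
    """Assign the next queue position for a dataset; concurrent calls serialize on the row lock."""
    with connection:  # one transaction: commit on success, rollback on error
        with connection.cursor() as cursor:
            # Lock the parent dataset row so that concurrent approve/revoke calls for the
            # same dataset block here until we commit (aggregates cannot be combined with
            # FOR UPDATE directly, hence the separate locking SELECT).
            cursor.execute(
                "SELECT datasetUri FROM dataset WHERE datasetUri = %s FOR UPDATE",
                (dataset_uri,),
            )
            cursor.execute(
                "SELECT COALESCE(MAX(queue), 0) + 1 FROM share_object WHERE datasetUri = %s",
                (dataset_uri,),
            )
            next_position = cursor.fetchone()[0]
            cursor.execute(
                "UPDATE share_object SET queue = %s WHERE shareUri = %s",
                (next_position, share_uri),
            )
    return next_position
```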

CONS: additional RDS column.
PROS: Ensures one process happens after the other (API calls happening at the exact same time is extremely rare). Relatively easy to implement. Logic stays in the sharing task and sharing APIs. Easy to debug if a sharing task has been queued but not executed (queue>0)

4 and 5. Re-queue in Worker Lambda

The Worker Lambda evaluates whether there are any existing ECS executions for the target dataset, or whether the metadata queue=1. It either triggers an ECS task or sends the SQS message back to the SQS queue (possibly waiting X minutes before sending the message back, to allow the previous task to complete).

PROS: The ECS task is triggered only when it is needed.
CONS: Architecture changes (additional permissions). Specific module logic in the generic Worker Lambda violates modularization principles, so we would need to write it in a modular, re-usable way. For example, instead of adding the metadata to the share_object table, we could add it to the task table; this way it could be re-used by other ECS tasks if needed.
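
A rough sketch of the re-queue path in the Worker Lambda, assuming boto3; the queue URL, message shape and the helpers dataset_is_locked / trigger_share_ecs_task are hypothetical:

```python
import json

import boto3

sqs = boto3.client("sqs")

# Hypothetical queue that feeds the Worker Lambda
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/dataall-worker-queue"


def handle_share_message(message):
    dataset_uri = message["datasetUri"]
    if dataset_is_locked(dataset_uri):  # hypothetical check against RDS metadata or ECS
        # Put the message back with a delay so the running task gets time to finish
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps(message),
            DelaySeconds=300,  # SQS allows at most 900 seconds of delay
        )
    else:
        trigger_share_ecs_task(message)  # hypothetical: calls ecs.run_task for the sharing task
```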


Updated alternatives

Updated with @anushka-singh and @noah-paige suggestions.

  • We discarded checking the ECS status with ECS API calls = discarded 1 and 4 above
  • We have not discarded implementing some sort of locking logic in RDS (2, 3, 5). There are several ways of implementing locking; in my opinion the best one is the one described below by @anushka-singh. If we implement a lock based on RDS metadata, we need to decide where the check is going to be executed (Worker Lambda or ECS task)
  • Other creative solutions, like Step Functions or an ECS master task, are not quite as good as the locking logic in RDS. I have done some more investigation and I propose another alternative solution below using 2 SQS queues.

Locking in RDS + check/wait in ECS

(architecture diagram omitted)

Locking in RDS + re-queue

(architecture diagram omitted)

SQS trigger queue + SQS task queue

(architecture diagram omitted)

Conclusion

For simplicity, we have agreed on implementing "Locking in RDS + check/wait in ECS".
(architecture diagram omitted)

@kukushking
Contributor

A concurrency constraint on an ECS task group (one group per dataset) would be perfect, if only it existed... aws/containers-roadmap#232

@anmolsgandhi anmolsgandhi added status: in-progress This issue has been picked and is being implemented and removed status: not-picked-yet At the moment we have not picked this item. Anyone can pick it up labels Jan 18, 2024
@anushka-singh
Contributor

anushka-singh commented Jan 18, 2024

We could also address this by introducing a locking mechanism for the resources: a centralized mechanism that multiple tasks can use to coordinate access. We can use a new Amazon RDS table to manage the locks.
Steps involved:

  1. Create a Locking Table:

Create a dedicated table in the RDS database to store information about locks. This table could have columns such as dataset_id and is_locked.

CREATE TABLE dataset_locks (
    dataset_id VARCHAR(255) PRIMARY KEY,
    is_locked BOOLEAN
);
  2. Acquire Lock Function:
    This function checks whether a lock exists for the dataset and sets it if it is not already locked. It returns True only when the lock was actually acquired.

import psycopg2

def acquire_lock(dataset_id, connection):
    # Atomically set is_locked only if it is currently FALSE; RETURNING tells us
    # whether a row was updated, i.e. whether we actually got the lock.
    with connection.cursor() as cursor:
        cursor.execute(
            "UPDATE dataset_locks SET is_locked = TRUE WHERE dataset_id = %s AND NOT is_locked RETURNING *",
            (dataset_id,)
        )
        acquired = cursor.fetchone() is not None
    connection.commit()
    return acquired
  3. Release Lock Function:

Implement a function to release the lock for a specific dataset.

def release_lock(dataset_id, connection):
    with connection.cursor() as cursor:
        cursor.execute(
            "UPDATE dataset_locks SET is_locked = FALSE WHERE dataset_id = %s",
            (dataset_id,)
        )
    # Persist the release so other tasks can acquire the lock
    connection.commit()
  4. Use Lock in Share Request Processing:

Before processing a share request for a specific dataset, we attempt to acquire the lock. If the lock is successfully acquired, we proceed with processing. If not, we wait or handle it accordingly.

dataset_id = "your_dataset_id"

connection = psycopg2.connect("your_database_connection_string")

if acquire_lock(dataset_id, connection):
    try:
        # Process the share request
        # ...
        pass
    finally:
        release_lock(dataset_id, connection)
else:
    # Another task is already processing this dataset: wait or handle accordingly
    pass

We can have this RDS table in the infra accounts.

Solutions for handling tasks when the lock is already taken:

  1. Wait and retry

import logging
import time

max_retries = 3
retry_delay = 5  # seconds

retries = 0
while retries < max_retries:
    if acquire_lock(dataset_id, connection):
        try:
            # Process the share request
            # ...
            break
        finally:
            release_lock(dataset_id, connection)
    else:
        retries += 1
        time.sleep(retry_delay)
else:
    # The while-else branch runs only if the loop ended without a break,
    # i.e. the lock was never acquired after the maximum number of retries
    logging.error("Unable to acquire the lock after multiple retries.")

Pros:

  • The task will attempt to acquire the lock immediately and process the share request as soon as the lock becomes available.
  • Simple to implement.

Cons

  • If the lock is frequently occupied, this approach might introduce latency as tasks wait for their turn to acquire the lock.
  • Frequent retries may contribute to resource contention and increased load on the system.
  2. Queue Share Requests:
    If the lock is occupied, the task can enqueue the share request for later processing. We can use SQS for this (see the sketch after the pros and cons below).

Pros:

  • Share requests are processed sequentially in the order they are enqueued, avoiding contention.
  • The order of share request processing is predictable and determined by the order of enqueueing.

Cons:

  • Share requests may experience a delay in processing, especially during periods of high demand or if the queue is long.
  • Implementing a queue system involves additional components, such as a queuing mechanism and a background process to dequeue and process requests.
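
If we went this way, one option would be an SQS FIFO queue with one MessageGroupId per dataset, which gives per-dataset ordering without extra bookkeeping. A minimal sketch (queue name and message shape are assumptions):

```python
import json

import boto3

sqs = boto3.client("sqs")

# Hypothetical FIFO queue; ordering is guaranteed per MessageGroupId, i.e. per dataset here
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/dataall-share-requests.fifo"


def enqueue_share_request(share_uri, dataset_uri):
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"shareUri": share_uri, "datasetUri": dataset_uri}),
        MessageGroupId=dataset_uri,        # shares for the same dataset are delivered in order
        MessageDeduplicationId=share_uri,  # or enable content-based deduplication on the queue
    )
```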

Let me know what you think of using this technique.

@zsaltys
Contributor Author

zsaltys commented Jan 22, 2024

I'm in favor of the "Locking in RDS + check/wait in ECS" solution. Even though it's not generic and would only work for tasks running in ECS, it's also the simplest, and we know that this problem currently only exists for datasets, where we make manual SDK calls.

I don't think it's a problem that ECS tasks may have to wait: the only concern there is cost, and this problem is such an edge case that the impact of the waiting is very small.

The "Locking in RDS + re-queue" is more flexible, there's no waiting, it would work for non ECS tasks too but it's also more complex, new permissions and there's a worry of what happens if a message cannot be re-queued like an outage?

For the first solution we should just make sure that two tasks cannot both believe they hold the lock at the same time. I propose a separate lock table with a field like resource_uri, which could also be used for resources other than dataset URIs if the need arises in the future. This table could be reused by the Lambda approach if we decide to switch to it later on. You could also add a field like version to the locks table and use it to make sure two tasks cannot acquire the lock at the same time (see the sketch below).
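
A minimal sketch of the version idea (optimistic locking); the resource_locks table and its columns are illustrative assumptions, not an existing data.all table:

```python
import psycopg2


def acquire_lock_with_version(connection, resource_uri):
    """Take the lock only if nobody else changed the row since we read it."""
    with connection:  # one transaction: commit on success, rollback on error
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT version, is_locked FROM resource_locks WHERE resource_uri = %s",
                (resource_uri,),
            )
            row = cursor.fetchone()
            if row is None or row[1]:
                return False
            version = row[0]
            # The version check makes this UPDATE a no-op if another task acquired
            # the lock (and bumped the version) between our SELECT and this UPDATE.
            cursor.execute(
                "UPDATE resource_locks "
                "SET is_locked = TRUE, version = version + 1 "
                "WHERE resource_uri = %s AND version = %s AND NOT is_locked",
                (resource_uri, version),
            )
            return cursor.rowcount == 1
```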

@TejasRGitHub
Contributor

TejasRGitHub commented Jan 23, 2024

Agree with @zsaltys about using the "Locking in RDS + check/wait in ECS" solution.

Although, I think we could use the existing task table to implement locking and unlocking per dataset URI. If we extend the task table with a new column targetItem (this could be a dataset name or any other task target), then whenever a task is created, along with the task payload (targetUri, created, etc.), it would also record the dataset name / dataset URI in the task record in RDS.
When a task is created its status is PENDING (the status currently used in data.all). Once the AWS Worker receives the task it calls ECS, and once the task is submitted to and running in ECS it can set the state to IN_PROGRESS. Any other ECS task running for the same dataset then waits until there is no IN_PROGRESS record in the task table for that dataset. Once the task is completed, it is put into the COMPLETED state.

For "Locking in RDS + check/wait in ECS" or the "Locking in RDS + re-queue", will have to make sure that the reads (SELECTs ) and the UPDATEs are locked ( row-locked) on the postgres level at the time of transaction so that we don't have the same issue in which two shares are parallely working for the same dataset.

Ref - https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE
https://www.postgresql.org/docs/current/explicit-locking.html
https://docs.sqlalchemy.org/en/13/orm/session_transaction.html#managing-transactions
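
A minimal sketch of the row-locking idea with SQLAlchemy 1.4+ (with_for_update() emits SELECT ... FOR UPDATE); the dataset_lock model below is illustrative, not the final schema:

```python
from sqlalchemy import Boolean, Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class DatasetLock(Base):
    __tablename__ = "dataset_lock"
    datasetUri = Column(String, primary_key=True)
    isLocked = Column(Boolean, default=False)


engine = create_engine("postgresql+psycopg2://user:password@host:5432/dataall")
Session = sessionmaker(bind=engine)


def try_acquire(dataset_uri):
    """Attempt to take the lock; the row lock makes concurrent attempts serialize here."""
    with Session.begin() as session:  # commits on success, rolls back on exception
        row = (
            session.query(DatasetLock)
            .filter(DatasetLock.datasetUri == dataset_uri)
            .with_for_update()  # SELECT ... FOR UPDATE: blocks other transactions on this row
            .one_or_none()
        )
        if row is None or row.isLocked:
            return False
        row.isLocked = True
        return True
```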

@noah-paige noah-paige linked a pull request Feb 23, 2024 that will close this issue
noah-paige added a commit that referenced this issue Feb 28, 2024
### Feature or Bugfix
- Feature

### Detail
Share requests running in parallel override one another.

### Relates
- #941

### Testing
- 2 simultaneous shares got processed successfully with locking
mechanism.
- Here is what the dataset_lock DB looks like and the values in it.
<img width="1386" alt="Screenshot 2024-02-14 at 5 12 54 PM"
src="https://github.com/data-dot-all/dataall/assets/26413731/9e36044f-4f3d-4371-a6f2-7dee399da7a7">
- The acquiredBy column shows the share which last acquired the lock for the particular dataset.


### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
- How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use a standard proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Noah Paige <69586985+noah-paige@users.noreply.github.com>
Co-authored-by: dlpzx <71252798+dlpzx@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: jaidisido <jaidisido@gmail.com>
Co-authored-by: dlpzx <dlpzx@amazon.com>
Co-authored-by: mourya-33 <134511711+mourya-33@users.noreply.github.com>
Co-authored-by: nikpodsh <124577300+nikpodsh@users.noreply.github.com>
Co-authored-by: MK <manjula_kasturi@hotmail.com>
Co-authored-by: Manjula <manjula.kasturi@gmail.com>
Co-authored-by: Zilvinas Saltys <zilvinas.saltys@gmail.com>
Co-authored-by: Zilvinas Saltys <zilvinas.saltys@yahooinc.com>
Co-authored-by: Daniel Lorch <98748454+lorchda@users.noreply.github.com>
Co-authored-by: Anushka Singh <anushka.singh@yahooinc.com>
Co-authored-by: Tejas Rajopadhye <71188245+TejasRGitHub@users.noreply.github.com>
Co-authored-by: trajopadhye <tejas.rajopadhye@yahooinc.com>