
Feature 941 #1072

Merged — 76 commits, Feb 28, 2024

Changes from 66 commits
ad4ab1f
Add Additional Error Messages for KMS Key lookup on imported dataset …
noah-paige Sep 15, 2023
dbbef3c
Get Latest in main to v2m1m0 (#771)
noah-paige Sep 19, 2023
d096160
Handle Environment Import of IAM service roles (#749)
noah-paige Sep 26, 2023
a53434f
Build Compliant Names for Opensearch Resources (#750)
noah-paige Oct 5, 2023
16c7026
Merge branch 'main' into v2m1m0
dlpzx Oct 10, 2023
c61ba15
Update Lambda runtime (#782)
nikpodsh Oct 10, 2023
f84250e
Feat: limit pivot role S3 permissions (#780)
dlpzx Oct 12, 2023
7d9122d
Fix: ensure valid environments for share request and other objects cr…
dlpzx Oct 12, 2023
1801cf1
Adding configurable session timeout to IDP (#786)
manjulaK Oct 13, 2023
599fc1a
Fix: shell true semgrep (#760)
dlpzx Oct 16, 2023
b356bf2
Fix: allow to submit a share when you are both and approver and a req…
zsaltys Oct 16, 2023
793a078
feat: redirect upon creating a share request (#799)
zsaltys Oct 16, 2023
f448613
Fix: condition when there are no public subnets (#794)
lorchda Oct 18, 2023
66b9a08
feat: removing unused variable (#815)
zsaltys Oct 18, 2023
c833c26
feat: Handle Pre-filtering of tables (#811)
anushka-singh Oct 18, 2023
6cc564e
Fix Check other share exists before clean up (#769)
noah-paige Oct 18, 2023
8b7b82e
Email Notification on Share Workflow - Issue - 734 (#818)
TejasRGitHub Oct 20, 2023
48c32e5
feat: adding frontend and backend feature flags (#817)
zsaltys Oct 25, 2023
6d727e9
Feat: Refactor notifications from core to modules (#822)
dlpzx Oct 26, 2023
8ad760b
Merge branch 'main' into v2m1m0
dlpzx Oct 27, 2023
3f100b4
Feat: pivot role limit kms (#830)
dlpzx Oct 27, 2023
fb7b61b
Make hosted_zone_id optional, code update (#812)
lorchda Oct 27, 2023
b51da2c
Clean-up for v2.1 (#843)
dlpzx Oct 30, 2023
6d3c016
Merge branch 'main' into v2m1m0
dlpzx Oct 27, 2023
7912a24
Feat: pivot role limit kms (#830)
dlpzx Oct 27, 2023
55c579b
Make hosted_zone_id optional, code update (#812)
lorchda Oct 27, 2023
92d4324
Clean-up for v2.1 (#843)
dlpzx Oct 30, 2023
5fb7cf8
feat: Enabling S3 bucket share
anushka-singh Oct 31, 2023
cf9afc1
feat: Enabling S3 bucket share
anushka-singh Oct 31, 2023
ddf8623
Merge branch 'v2m1m0' of https://github.com/anushka-singh/aws-dataall…
anushka-singh Oct 31, 2023
b54860d
fix: adding missing pivot role permission to get key policy (#845)
zsaltys Oct 31, 2023
a05e548
Merge branch 'v2m1m0' into anu-s3-copy
dlpzx Oct 31, 2023
1365e92
Revert overwrites 2.
dlpzx Oct 31, 2023
bbcfbd5
Revert overwrites 3.
dlpzx Oct 31, 2023
9e8cdf1
Revert overwrites 4.
dlpzx Oct 31, 2023
5d90797
Revert overwrites 4.
dlpzx Oct 31, 2023
94be491
Revert overwrites 5.
dlpzx Oct 31, 2023
cff577f
Revert overwrites 6.
dlpzx Oct 31, 2023
5ff80fb
Revert overwrites 7.
dlpzx Oct 31, 2023
3383166
Revert overwrites 7.
dlpzx Oct 31, 2023
7ed96af
Revert overwrites 8.
dlpzx Oct 31, 2023
c051896
Revert overwrites 9.
dlpzx Oct 31, 2023
f5d62d7
Revert overwrites 10.
dlpzx Oct 31, 2023
3783a95
Revert overwrites 11.
dlpzx Oct 31, 2023
dacba14
Revert overwrites 12.
dlpzx Oct 31, 2023
3b404cd
Revert overwrites 13.
dlpzx Oct 31, 2023
5d0fe68
Fix down revision for migration script
dlpzx Oct 31, 2023
158925a
feat: Enabling S3 bucket share
anushka-singh Nov 2, 2023
d112a21
bugfix: Enabling S3 bucket share
anushka-singh Nov 3, 2023
06edb53
feat: Enabling S3 bucket share - Addressing comments on PR
anushka-singh Nov 8, 2023
f43003c
feat: Enabling S3 bucket share
anushka-singh Nov 10, 2023
4516f4d
feat: Enabling S3 bucket share - Addressing comments on PR
anushka-singh Nov 15, 2023
0f2faf7
feat: Enabling S3 bucket share - Addressing comments on PR
anushka-singh Nov 16, 2023
9b0ab34
Merge branch 'main' into bucket_share_anushka
anushka-singh Nov 16, 2023
7ab6427
feat: Enabling S3 bucket share
anushka-singh Nov 10, 2023
e251fcc
feat: Enabling S3 bucket share - Addressing comments on PR
anushka-singh Nov 15, 2023
e8bfb4b
feat: Enabling S3 bucket share - Addressing comments on PR
anushka-singh Nov 15, 2023
2ff67bc
Merge branch 'main' of https://github.com/anushka-singh/aws-dataall i…
anushka-singh Nov 16, 2023
3254260
Update share.js
anushka-singh Nov 16, 2023
eb8bf3d
Update index.js
anushka-singh Nov 16, 2023
a06838f
Merge branch 'main' of https://github.com/anushka-singh/aws-dataall
anushka-singh Dec 18, 2023
bed3d51
Bugfix#932: Investigate why some shares did not go to failed state, b…
anushka-singh Dec 19, 2023
a38c27d
Merge branch 'main' of https://github.com/anushka-singh/aws-dataall
anushka-singh Feb 22, 2024
053070d
Merge branch 'main' of https://github.com/anushka-singh/aws-dataall
anushka-singh Feb 22, 2024
5df1401
Feature:941 - Two share requests running in parallel can override eac…
anushka-singh Feb 22, 2024
5b387c7
Feature:941 - Two share requests running in parallel can override eac…
anushka-singh Feb 22, 2024
60e5427
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 23, 2024
89e2df8
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 26, 2024
8e1a5b5
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 26, 2024
3c006aa
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 27, 2024
ed5d1dd
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 27, 2024
0976c07
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 27, 2024
225bf84
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 27, 2024
2aae8ac
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 27, 2024
3d0e732
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 27, 2024
be5706b
Data-405: Multiple shares override each other if they run at the same…
anushka-singh Feb 28, 2024
239 changes: 182 additions & 57 deletions backend/dataall/modules/dataset_sharing/services/data_sharing_service.py
@@ -1,5 +1,8 @@
import logging

from sqlalchemy import and_
from time import sleep

from dataall.base.db import Engine
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectSM, ShareObjectRepository, \
ShareItemSM
@@ -10,6 +13,8 @@
from dataall.modules.dataset_sharing.services.share_processors.s3_bucket_process_share import ProcessS3BucketShare

from dataall.modules.dataset_sharing.services.dataset_sharing_enums import (ShareObjectActions, ShareItemStatus, ShareableType)
from dataall.modules.datasets_base.db.dataset_models import DatasetLock


log = logging.getLogger(__name__)

@@ -38,71 +43,86 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
True if sharing succeeds,
False if folder or table sharing failed
"""
with engine.scoped_session() as session:
(
try:
with engine.scoped_session() as session:
(
source_env_group,
env_group,
dataset,
share,
source_environment,
target_environment,
) = ShareObjectRepository.get_share_data(session, share_uri)

lock_acquired = cls.acquire_lock_with_retry(dataset.datasetUri, session, share.shareUri)
if not lock_acquired:
log.error(f"Failed to acquire lock for dataset {dataset.datasetUri}. Exiting...")
return False

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareObjectActions.Start.value)
share_sm.update_state(session, share, new_share_state)

(
shared_tables,
shared_folders,
shared_buckets
) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Share_Approved.value)

log.info(f'Granting permissions to folders: {shared_folders}')

approved_folders_succeed = ProcessS3AccessPointShare.process_approved_shares(
session,
dataset,
share,
shared_folders,
source_environment,
target_environment,
source_env_group,
env_group,
env_group
)
log.info(f'sharing folders succeeded = {approved_folders_succeed}')

log.info('Granting permissions to S3 buckets')

approved_s3_buckets_succeed = ProcessS3BucketShare.process_approved_shares(
session,
dataset,
share,
shared_buckets,
source_environment,
target_environment,
) = ShareObjectRepository.get_share_data(session, share_uri)
source_env_group,
env_group
)
log.info(f'sharing s3 buckets succeeded = {approved_s3_buckets_succeed}')

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareObjectActions.Start.value)
log.info(f'Granting permissions to tables: {shared_tables}')
approved_tables_succeed = ProcessLakeFormationShare(
session,
dataset,
share,
shared_tables,
[],
source_environment,
target_environment,
env_group,
).process_approved_shares()
log.info(f'sharing tables succeeded = {approved_tables_succeed}')

new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value)
share_sm.update_state(session, share, new_share_state)

(
shared_tables,
shared_folders,
shared_buckets
) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Share_Approved.value)

log.info(f'Granting permissions to folders: {shared_folders}')

approved_folders_succeed = ProcessS3AccessPointShare.process_approved_shares(
session,
dataset,
share,
shared_folders,
source_environment,
target_environment,
source_env_group,
env_group
)
log.info(f'sharing folders succeeded = {approved_folders_succeed}')

log.info('Granting permissions to S3 buckets')

approved_s3_buckets_succeed = ProcessS3BucketShare.process_approved_shares(
session,
dataset,
share,
shared_buckets,
source_environment,
target_environment,
source_env_group,
env_group
)
log.info(f'sharing s3 buckets succeeded = {approved_s3_buckets_succeed}')

log.info(f'Granting permissions to tables: {shared_tables}')
approved_tables_succeed = ProcessLakeFormationShare(
session,
dataset,
share,
shared_tables,
[],
source_environment,
target_environment,
env_group,
).process_approved_shares()
log.info(f'sharing tables succeeded = {approved_tables_succeed}')

new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value)
share_sm.update_state(session, share, new_share_state)

return approved_folders_succeed and approved_s3_buckets_succeed and approved_tables_succeed
return approved_folders_succeed and approved_s3_buckets_succeed and approved_tables_succeed

except Exception as e:
log.error(f"Error occurred during share approval: {e}")
return False

finally:
lock_released = cls.release_lock(dataset.datasetUri, session, share.shareUri)
if not lock_released:
log.error(f"Failed to release lock for dataset {dataset.datasetUri}.")

@classmethod
def revoke_share(cls, engine: Engine, share_uri: str):
@@ -226,3 +246,108 @@ def revoke_share(cls, engine: Engine, share_uri: str):
share_sm.update_state(session, share, new_share_state)

return revoked_folders_succeed and revoked_s3_buckets_succeed and revoked_tables_succeed

@staticmethod
def acquire_lock(dataset_uri, session, share_uri):
"""
Attempts to acquire a lock on the dataset identified by dataset_id.

Args:
dataset_uri: The ID of the dataset for which the lock is being acquired.
session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database.
share_uri: The ID of the share that is attempting to acquire the lock.

Returns:
bool: True if the lock is successfully acquired, False otherwise.
"""
try:
# Execute the query to get the DatasetLock object
dataset_lock = (
session.query(DatasetLock)
.filter(
and_(
DatasetLock.datasetUri == dataset_uri,
~DatasetLock.isLocked
)
)
.first()
)

# Check if dataset_lock is not None before attempting to update
if dataset_lock:
# Update the attributes of the DatasetLock object
dataset_lock.isLocked = True
dataset_lock.acquiredBy = share_uri

session.commit()
return True
else:
log.info("DatasetLock not found for the given criteria.")
return False

except Exception as e:
session.rollback()
log.error("Error occurred while acquiring lock: %s", e)
return False

@staticmethod
def acquire_lock_with_retry(dataset_uri, session, share_uri):
max_retries = 10
retry_interval = 60
for attempt in range(max_retries):
try:
log.info(f"Attempting to acquire lock for dataset {dataset_uri} by share {share_uri}...")
lock_acquired = DataSharingService.acquire_lock(dataset_uri, session, share_uri)
if lock_acquired:
return True

log.info(
f"Lock for dataset {dataset_uri} already acquired. Retrying in {retry_interval} seconds...")
sleep(retry_interval)

except Exception as e:
log.error("Error occurred while retrying lock acquisition: %s", e)
return False

log.info(f"Max retries reached. Failed to acquire lock for dataset {dataset_uri}")
return False

@staticmethod
def release_lock(dataset_uri, session, share_uri):
"""
Releases the lock on the dataset identified by dataset_uri.

Args:
dataset_uri: The ID of the dataset for which the lock is being released.
session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database.
share_uri: The ID of the share that is attempting to release the lock.

Returns:
bool: True if the lock is successfully released, False otherwise.
"""
try:
log.info(f"Releasing lock for dataset: {dataset_uri} last acquired by share: {share_uri}")
query = (
session.query(DatasetLock)
.filter(
and_(
DatasetLock.datasetUri == dataset_uri,
DatasetLock.isLocked == True,
DatasetLock.acquiredBy == share_uri
)
)
)
Review comment (Contributor):

@anushka-singh - sorry for all the back and forth ...

with the latest changes to add .with_for_update().one() I am now receiving error: "Arguments: (AttributeError("'DatasetLock' object has no attribute 'update'"),)"

I think we have 2 options:

  • (1) For def release_lock() we can handle the update the same as for acquire lock
            dataset_lock = (
                session.query(DatasetLock)
                .filter(
                    and_(
                        DatasetLock.datasetUri == dataset_uri,
                        DatasetLock.isLocked == True,
                        DatasetLock.acquiredBy == share_uri
                    )
                )
                .with_for_update().one()
            )
            if dataset_lock:
                dataset_lock.isLocked = False
                dataset_lock.acquiredBy = ''
...
  • (2) OR perform query and update all in 1:
            query = (
                session.query(DatasetLock)
                .filter(
                    and_(
                        DatasetLock.datasetUri == dataset_uri,
                        DatasetLock.isLocked == True,
                        DatasetLock.acquiredBy == share_uri
                    )
                )
                .update(
                {
                    "isLocked": False,
                    "acquiredBy": ''
                },
                )
            )
(and likely can handle `acquire_lock()` similarly)

Reply (Contributor, Author):

Hmm, that is really weird. I did not come across this issue in my testing!

But I have pushed out another revision with solution (1). Please LMK if it's fixed now. I will also try it out again on my end.

Reply (Contributor):

will do - trying again now


query.update(
{
"isLocked": False,
"acquiredBy": share_uri
},
synchronize_session=False
)

session.commit()
return True
except Exception as e:
log.error("Error occurred while releasing lock: %s", e)
return False
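The acquire/retry/release flow in the diff above can be summarized as a self-contained sketch. This is an illustration of the same pessimistic-locking pattern, not the project's actual code: it uses stdlib `sqlite3` in place of the SQLAlchemy session, the helper functions are hypothetical, and only the `dataset_lock` table shape (`datasetUri`, `isLocked`, `acquiredBy`) mirrors the PR. The key idea is that both acquire and release are single conditional `UPDATE` statements, so claiming the lock is atomic.

```python
import sqlite3
from time import sleep

def acquire_lock(conn, dataset_uri, share_uri):
    # Atomically claim the row only if it is currently unlocked.
    cur = conn.execute(
        "UPDATE dataset_lock SET isLocked = 1, acquiredBy = ? "
        "WHERE datasetUri = ? AND isLocked = 0",
        (share_uri, dataset_uri),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows updated -> another share holds the lock

def acquire_lock_with_retry(conn, dataset_uri, share_uri,
                            max_retries=10, retry_interval=0.01):
    # Poll until the lock is free or retries are exhausted
    # (the PR uses 10 retries at 60-second intervals).
    for _ in range(max_retries):
        if acquire_lock(conn, dataset_uri, share_uri):
            return True
        sleep(retry_interval)
    return False

def release_lock(conn, dataset_uri, share_uri):
    # Release only if this share is the current holder.
    cur = conn.execute(
        "UPDATE dataset_lock SET isLocked = 0, acquiredBy = '' "
        "WHERE datasetUri = ? AND isLocked = 1 AND acquiredBy = ?",
        (dataset_uri, share_uri),
    )
    conn.commit()
    return cur.rowcount == 1

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dataset_lock (datasetUri TEXT PRIMARY KEY, "
    "isLocked INTEGER DEFAULT 0, acquiredBy TEXT NOT NULL DEFAULT '')"
)
conn.execute("INSERT INTO dataset_lock (datasetUri) VALUES ('ds-1')")
conn.commit()

assert acquire_lock(conn, "ds-1", "share-A")      # first share gets the lock
assert not acquire_lock(conn, "ds-1", "share-B")  # second share is blocked
assert release_lock(conn, "ds-1", "share-A")      # holder releases
assert acquire_lock_with_retry(conn, "ds-1", "share-B")  # share-B succeeds on retry
```

The conditional-`UPDATE` form sidesteps the read-then-write race that the review thread discusses: there is no window between checking `isLocked` and setting it.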
11 changes: 11 additions & 0 deletions backend/dataall/modules/datasets_base/db/dataset_models.py
@@ -164,3 +164,14 @@ class DatasetBucket(Resource, Base):
@classmethod
def uri(cls):
return cls.bucketUri


class DatasetLock(Base):
__tablename__ = 'dataset_lock'
datasetUri = Column(String, nullable=False, primary_key=True)
isLocked = Column(Boolean, default=False)
acquiredBy = Column(String, nullable=False)

@classmethod
def uri(cls):
return cls.datasetUri
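The `DatasetLock` model above keys one lock row per dataset (`datasetUri` is the primary key), which implies a lock row has to be seeded when a dataset is created. A minimal sketch of that invariant — again with stdlib `sqlite3` standing in for the project's SQLAlchemy models, and with a hypothetical `create_dataset_lock` helper:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dataset_lock ("
    "datasetUri TEXT NOT NULL PRIMARY KEY, "
    "isLocked INTEGER DEFAULT 0, "
    "acquiredBy TEXT NOT NULL DEFAULT '')"
)

def create_dataset_lock(conn, dataset_uri):
    # One lock row per dataset; the primary key rejects duplicates.
    try:
        conn.execute(
            "INSERT INTO dataset_lock (datasetUri) VALUES (?)",
            (dataset_uri,),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        conn.rollback()
        return False

assert create_dataset_lock(conn, "ds-1")
assert not create_dataset_lock(conn, "ds-1")  # duplicate rejected
row = conn.execute(
    "SELECT isLocked, acquiredBy FROM dataset_lock WHERE datasetUri = 'ds-1'"
).fetchone()
assert row == (0, "")  # new rows start unlocked with no holder
```

Defaulting `isLocked` to false and `acquiredBy` to empty means a freshly created dataset is immediately shareable without any extra initialization step.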