Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Dataset Lock Mechanism to Generic Resource Lock #1338

Merged
merged 20 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3170dae
Split APIs and types between S3-shares and shares-base
dlpzx Jun 7, 2024
05fd497
Merge remote-tracking branch 'refs/remotes/origin/main' into feat/gen…
dlpzx Jun 14, 2024
17070cf
Fix issues with tables select worksheet and with unused items in sear…
dlpzx Jun 14, 2024
274cbee
Make resource lock generic pt 1
noah-paige Jun 17, 2024
e61bebe
Create and delete resource locks on env group or CR create or delete
noah-paige Jun 17, 2024
d1ced2a
Add migration script and debug
noah-paige Jun 18, 2024
0c5c0cb
Merge latest from os
noah-paige Jun 18, 2024
07a2e3d
add back resources
noah-paige Jun 18, 2024
d1ce3ba
Fix revision IDs
noah-paige Jun 18, 2024
670e12a
Merge branch 'os-main' into generic-resource-locking
noah-paige Jun 19, 2024
3fe45e0
merge latest from main and update alembic revision ID
noah-paige Jun 19, 2024
53ae0a1
Merge latest from main + resolve conflicts + update db migration revi…
noah-paige Jun 24, 2024
3ff6734
Merge latest from main + resolve conflicts + update db migration revi…
noah-paige Jun 24, 2024
74555b1
Merge branch 'os-main' into generic-resource-locking
noah-paige Jun 26, 2024
2b4909a
Merge branch 'os-main' into generic-resource-locking
noah-paige Jun 26, 2024
8867991
Add context manager to acquire locks with retry and move function to …
noah-paige Jun 26, 2024
ccde783
Remove isLocked and create/delete + fix migration script
noah-paige Jun 26, 2024
d41e8e4
Fix tests
noah-paige Jun 26, 2024
59752d7
make methods private acquire and release
noah-paige Jun 27, 2024
9e2e569
make methods private acquire and release
noah-paige Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions backend/dataall/base/db/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,15 @@ def __init__(self, action, message):

def __str__(self):
return f'{self.message}'


class ResourceLockTimeout(Exception):
def __init__(self, action, message):
self.action = action
self.message = f"""
An error occurred (ResourceLockTimeout) when calling {self.action} operation:
{message}
"""

def __str__(self):
return f'{self.message}'
10 changes: 1 addition & 9 deletions backend/dataall/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
"""The package contains the core functionality that is required by data.all to work correctly"""

from dataall.core import (
permissions,
stacks,
groups,
environment,
organizations,
tasks,
vpc,
)
from dataall.core import permissions, stacks, groups, environment, organizations, tasks, vpc, resource_lock
Empty file.
Empty file.
25 changes: 25 additions & 0 deletions backend/dataall/core/resource_lock/db/resource_lock_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Optional
from sqlalchemy import Column, String, Boolean

from dataall.base.db import Base


class ResourceLock(Base):
__tablename__ = 'resource_lock'

resourceUri = Column(String, nullable=False, primary_key=True)
resourceType = Column(String, nullable=False, primary_key=True)
acquiredByUri = Column(String, nullable=True)
acquiredByType = Column(String, nullable=True)

def __init__(
self,
resourceUri: str,
resourceType: str,
acquiredByUri: Optional[str] = None,
acquiredByType: Optional[str] = None,
):
self.resourceUri = resourceUri
self.resourceType = resourceType
self.acquiredByUri = acquiredByUri
self.acquiredByType = acquiredByType
Comment on lines +15 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be a @dataclass and avoid the boilerplate ctor?

137 changes: 137 additions & 0 deletions backend/dataall/core/resource_lock/db/resource_lock_repositories.py
petrkalos marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import logging

from dataall.core.resource_lock.db.resource_lock_models import ResourceLock
from sqlalchemy import and_, or_
from sqlalchemy.orm import Session
from time import sleep
from typing import List, Tuple
from contextlib import contextmanager
from dataall.base.db.exceptions import ResourceLockTimeout

log = logging.getLogger(__name__)

MAX_RETRIES = 10
RETRY_INTERVAL = 60


class ResourceLockRepository:
@staticmethod
def _acquire_locks(resources, session, acquired_by_uri, acquired_by_type):
"""
Attempts to acquire/create one or more locks on the resources identified by resourceUri and resourceType.

Args:
resources: List of resource tuples (resourceUri, resourceType) to acquire locks for.
session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database.
acquired_by_uri: The ID of the resource that is attempting to acquire the lock.
acquired_by_type: The resource type that is attempting to acquire the lock.qu

Returns:
bool: True if the lock is successfully acquired, False otherwise.
"""
try:
# Execute the query to get the ResourceLock object
filter_conditions = [
and_(
ResourceLock.resourceUri == resource[0],
ResourceLock.resourceType == resource[1],
)
for resource in resources
]

if not session.query(ResourceLock).filter(or_(*filter_conditions)).first():
records = []
for resource in resources:
records.append(
ResourceLock(
resourceUri=resource[0],
resourceType=resource[1],
acquiredByUri=acquired_by_uri,
acquiredByType=acquired_by_type,
)
)
session.add_all(records)
session.commit()
return True
else:
log.info(
'Not all ResourceLocks were found. One or more ResourceLocks may be acquired by another resource...'
)
return False
except Exception as e:
session.expunge_all()
session.rollback()
log.error('Error occurred while acquiring lock:', e)
return False

@staticmethod
def _release_lock(session, resource_uri, resource_type, share_uri):
"""
Releases/delete the lock on the resource identified by resource_uri, resource_type.

Args:
session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database.
resource_uri: The ID of the resource that owns the lock.
resource_type: The type of the resource that owns the lock.
share_uri: The ID of the share that is attempting to release the lock.

Returns:
bool: True if the lock is successfully released, False otherwise.
"""
try:
log.info(f'Releasing lock for resource: {resource_uri=}, {resource_type=}')

resource_lock = (
session.query(ResourceLock)
.filter(
and_(
ResourceLock.resourceUri == resource_uri,
ResourceLock.resourceType == resource_type,
ResourceLock.acquiredByUri == share_uri,
)
)
.with_for_update()
.first()
)

if resource_lock:
session.delete(resource_lock)
session.commit()
return True
else:
log.info(f'ResourceLock not found for resource: {resource_uri=}, {resource_type=}')
return False

except Exception as e:
session.expunge_all()
session.rollback()
log.error('Error occurred while releasing lock:', e)
return False

@staticmethod
@contextmanager
def acquire_lock_with_retry(
resources: List[Tuple[str, str]], session: Session, acquired_by_uri: str, acquired_by_type: str
):
retries_remaining = MAX_RETRIES
log.info(f'Attempting to acquire lock for resources {resources} by share {acquired_by_uri}...')
while not (
lock_acquired := ResourceLockRepository._acquire_locks(
resources, session, acquired_by_uri, acquired_by_type
)
):
log.info(
f'Lock for one or more resources {resources} already acquired. Retrying in {RETRY_INTERVAL} seconds...'
)
sleep(RETRY_INTERVAL)
retries_remaining -= 1
if retries_remaining <= 0:
raise ResourceLockTimeout(
'process shares',
f'Failed to acquire lock for one or more of {resources=}',
)
try:
yield lock_acquired
finally:
for resource in resources:
ResourceLockRepository._release_lock(session, resource[0], resource[1], acquired_by_uri)
12 changes: 0 additions & 12 deletions backend/dataall/modules/datasets_base/db/dataset_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,3 @@ def uri(cls):


DatasetBase.__name__ = 'Dataset'


class DatasetLock(Base):
__tablename__ = 'dataset_lock'
datasetUri = Column(String, ForeignKey('dataset.datasetUri'), nullable=False, primary_key=True)
isLocked = Column(Boolean, default=False)
acquiredBy = Column(String, nullable=True)

def __init__(self, datasetUri, isLocked=False, acquiredBy=None):
self.datasetUri = datasetUri
self.isLocked = isLocked
self.acquiredBy = acquiredBy
14 changes: 1 addition & 13 deletions backend/dataall/modules/datasets_base/db/dataset_repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,14 @@
from dataall.base.db import paginate
from dataall.base.db.exceptions import ObjectNotFound
from dataall.core.activity.db.activity_models import Activity
from dataall.modules.datasets_base.db.dataset_models import DatasetBase, DatasetLock
from dataall.modules.datasets_base.db.dataset_models import DatasetBase

logger = logging.getLogger(__name__)


class DatasetBaseRepository:
"""DAO layer for GENERIC Datasets"""

@staticmethod
def create_dataset_lock(session, dataset: DatasetBase):
dataset_lock = DatasetLock(datasetUri=dataset.datasetUri, isLocked=False, acquiredBy='')
session.add(dataset_lock)
session.commit()

@staticmethod
def delete_dataset_lock(session, dataset: DatasetBase):
dataset_lock = session.query(DatasetLock).filter(DatasetLock.datasetUri == dataset.datasetUri).first()
session.delete(dataset_lock)
session.commit()

@staticmethod
def update_dataset_activity(session, dataset: DatasetBase, username):
activity = Activity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
from typing import List
from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository
from dataall.base.aws.quicksight import QuicksightClient
from dataall.base.db import exceptions
from dataall.base.utils.naming_convention import NamingConventionPattern
Expand Down Expand Up @@ -164,8 +165,6 @@ def create_dataset(uri, admin_group, data: dict):
DatasetService.check_imported_resources(dataset)

dataset = DatasetRepository.create_dataset(session=session, env=environment, dataset=dataset, data=data)
DatasetBaseRepository.create_dataset_lock(session=session, dataset=dataset)

DatasetBucketRepository.create_dataset_bucket(session, dataset, data)

ResourcePolicyService.attach_resource_policy(
Expand Down Expand Up @@ -411,7 +410,6 @@ def delete_dataset(uri: str, delete_from_aws: bool = False):
ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=env.SamlGroupName)
if dataset.stewards:
ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=dataset.stewards)
DatasetBaseRepository.delete_dataset_lock(session=session, dataset=dataset)
DatasetRepository.delete_dataset(session, dataset)

if delete_from_aws:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,3 @@ def __init__(self, action, message):
class PrincipalRoleNotFound(BaseShareException):
def __init__(self, action, message):
super().__init__('PrincipalRoleNotFound', action, message)


class DatasetLockTimeout(BaseShareException):
def __init__(self, action, message):
super().__init__('DatasetLockTimeout', action, message)
Loading
Loading