From f9912d9bc46c1dd76b80f14ff111379cb616882f Mon Sep 17 00:00:00 2001
From: idantepper
Date: Sun, 21 Apr 2024 10:04:35 +0300
Subject: [PATCH 1/6] (S3DeleteObjectsOperator): Added ability to delete keys
 by last modified time

---
 airflow/providers/amazon/aws/operators/s3.py  | 34 +++++--
 .../providers/amazon/aws/operators/test_s3.py | 94 ++++++++++++++++---
 2 files changed, 109 insertions(+), 19 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index f256196ee96cd..db38681c65b4e 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -28,11 +28,11 @@
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.utils.helpers import exactly_one
+from airflow.utils.timezone import datetime

 if TYPE_CHECKING:
     from airflow.utils.context import Context

-
 BUCKET_DOES_NOT_EXIST_MSG = "Bucket with name: %s doesn't exist"

@@ -473,6 +473,10 @@ class S3DeleteObjectsOperator(BaseOperator):
     :param prefix: Prefix of objects to delete. (templated)
         All objects matching this prefix in the bucket will be deleted.
+    :param from_datetime: Lower bound on the LastModified date of objects to delete. (templated)
+        All objects in the bucket whose LastModified date is greater than this datetime will be deleted.
+    :param to_datetime: Upper bound on the LastModified date of objects to delete. (templated)
+        All objects in the bucket whose LastModified date is less than this datetime will be deleted.
     :param aws_conn_id: Connection id of the S3 connection to use
     :param verify: Whether or not to verify SSL certificates for S3 connection.
         By default SSL certificates are verified.
@@ -487,7 +491,7 @@ class S3DeleteObjectsOperator(BaseOperator):
         CA cert bundle than the one used by botocore.
     """

-    template_fields: Sequence[str] = ("keys", "bucket", "prefix")
+    template_fields: Sequence[str] = ("keys", "bucket", "prefix", "from_datetime", "to_datetime")

     def __init__(
         self,
@@ -495,6 +499,8 @@ def __init__(
         bucket: str,
         keys: str | list | None = None,
         prefix: str | None = None,
+        from_datetime: datetime | None = None,
+        to_datetime: datetime | None = None,
         aws_conn_id: str | None = "aws_default",
         verify: str | bool | None = None,
         **kwargs,
@@ -503,23 +509,37 @@ def __init__(
         self.bucket = bucket
         self.keys = keys
         self.prefix = prefix
+        self.from_datetime = from_datetime
+        self.to_datetime = to_datetime
         self.aws_conn_id = aws_conn_id
         self.verify = verify

         self._keys: str | list[str] = ""

-        if not exactly_one(prefix is None, keys is None):
-            raise AirflowException("Either keys or prefix should be set.")
+        if not exactly_one(keys is None, all([prefix is None, from_datetime is None, to_datetime is None])):
+            raise AirflowException(
+                "Either keys or at least one of prefix, from_datetime, to_datetime should be set."
+            )

     def execute(self, context: Context):
-        if not exactly_one(self.keys is None, self.prefix is None):
-            raise AirflowException("Either keys or prefix should be set.")
+        if not exactly_one(
+            self.keys is None,
+            all([self.prefix is None, self.from_datetime is None, self.to_datetime is None]),
+        ):
+            raise AirflowException(
+                "Either keys or at least one of prefix, from_datetime, to_datetime should be set."
+            )

         if isinstance(self.keys, (list, str)) and not self.keys:
             return
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

-        keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
+        keys = self.keys or s3_hook.list_keys(
+            bucket_name=self.bucket,
+            prefix=self.prefix,
+            from_datetime=self.from_datetime,
+            to_datetime=self.to_datetime,
+        )
         if keys:
             s3_hook.delete_objects(bucket=self.bucket, keys=keys)
         self._keys = keys
diff --git a/tests/providers/amazon/aws/operators/test_s3.py b/tests/providers/amazon/aws/operators/test_s3.py
index a2b381e50bec4..e2cf5d3543eaa 100644
--- a/tests/providers/amazon/aws/operators/test_s3.py
+++ b/tests/providers/amazon/aws/operators/test_s3.py
@@ -51,6 +51,7 @@
     S3PutBucketTaggingOperator,
 )
 from airflow.providers.openlineage.extractors import OperatorLineage
+from airflow.utils.timezone import datetime, utcnow

 BUCKET_NAME = os.environ.get("BUCKET_NAME", "test-airflow-bucket")
 S3_KEY = "test-airflow-key"
@@ -531,6 +532,37 @@ def test_s3_delete_multiple_objects(self):
         # There should be no object found in the bucket created earlier
         assert "Contents" not in conn.list_objects(Bucket=bucket, Prefix=key_pattern)

+    def test_s3_delete_from_to_datetime(self):
+        bucket = "testbucket"
+        key_pattern = "path/data"
+        n_keys = 3
+        keys = [key_pattern + str(i) for i in range(n_keys)]
+
+        conn = boto3.client("s3")
+        conn.create_bucket(Bucket=bucket)
+        for k in keys:
+            conn.upload_fileobj(Bucket=bucket, Key=k, Fileobj=BytesIO(b"input"))
+
+        # The objects should be detected before the DELETE action is taken
+        objects_in_dest_bucket = conn.list_objects(Bucket=bucket)
+        assert len(objects_in_dest_bucket["Contents"]) == n_keys
+        assert sorted(x["Key"] for x in objects_in_dest_bucket["Contents"]) == sorted(keys)
+
+        now = utcnow()
+        from_datetime = now.replace(year=now.year - 1)
+        to_datetime = now.replace(year=now.year + 1)
+
+        op = S3DeleteObjectsOperator(
+            task_id="test_task_s3_delete_from_to_datetime",
+            bucket=bucket,
+            from_datetime=from_datetime,
+            to_datetime=to_datetime,
+        )
+        op.execute(None)
+
+        # There should be no object found in the bucket created earlier
+        assert "Contents" not in conn.list_objects(Bucket=bucket)
+
     def test_s3_delete_prefix(self):
         bucket = "testbucket"
         key_pattern = "path/data"
@@ -598,31 +630,64 @@ def test_s3_delete_empty_string(self):
         assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test

     @pytest.mark.parametrize(
-        "keys, prefix",
+        "keys, prefix, from_datetime, to_datetime",
         [
-            pytest.param("path/data.txt", "path/data", id="single-key-and-prefix"),
-            pytest.param(["path/data.txt"], "path/data", id="multiple-keys-and-prefix"),
-            pytest.param(None, None, id="both-none"),
+            pytest.param("path/data.txt", "path/data", None, None, id="single-key-and-prefix"),
+            pytest.param(["path/data.txt"], "path/data", None, None, id="multiple-keys-and-prefix"),
+            pytest.param(
+                ["path/data.txt"],
+                "path/data",
+                datetime(1992, 3, 8, 18, 52, 51),
+                None,
+                id="keys-prefix-and-from_datetime",
+            ),
+            pytest.param(
+                ["path/data.txt"],
+                "path/data",
+                datetime(1992, 3, 8, 18, 52, 51),
+                datetime(1993, 3, 8, 18, 52, 51),
+                id="keys-prefix-and-from-to_datetime",
+            ),
+            pytest.param(None, None, None, None, id="all-none"),
         ],
     )
-    def test_validate_keys_and_prefix_in_constructor(self, keys, prefix):
-        with pytest.raises(AirflowException, match=r"Either keys or prefix should be set\."):
+    def test_validate_keys_and_filters_in_constructor(self, keys, prefix, from_datetime, to_datetime):
+        with pytest.raises(
+            AirflowException,
+            match=r"Either keys or at least one of prefix, from_datetime, to_datetime should be set.",
+        ):
             S3DeleteObjectsOperator(
                 task_id="test_validate_keys_and_prefix_in_constructor",
                 bucket="foo-bar-bucket",
                 keys=keys,
                 prefix=prefix,
+                from_datetime=from_datetime,
+                to_datetime=to_datetime,
             )

     @pytest.mark.parametrize(
-        "keys, prefix",
+        "keys, prefix, from_datetime, to_datetime",
         [
-            pytest.param("path/data.txt", "path/data", id="single-key-and-prefix"),
-            pytest.param(["path/data.txt"], "path/data", id="multiple-keys-and-prefix"),
-            pytest.param(None, None, id="both-none"),
+            pytest.param("path/data.txt", "path/data", None, None, id="single-key-and-prefix"),
+            pytest.param(["path/data.txt"], "path/data", None, None, id="multiple-keys-and-prefix"),
+            pytest.param(
+                ["path/data.txt"],
+                "path/data",
+                datetime(1992, 3, 8, 18, 52, 51),
+                None,
+                id="keys-prefix-and-from_datetime",
+            ),
+            pytest.param(
+                ["path/data.txt"],
+                "path/data",
+                datetime(1992, 3, 8, 18, 52, 51),
+                datetime(1993, 3, 8, 18, 52, 51),
+                id="keys-prefix-and-from-to_datetime",
+            ),
+            pytest.param(None, None, None, None, id="all-none"),
         ],
     )
-    def test_validate_keys_and_prefix_in_execute(self, keys, prefix):
+    def test_validate_keys_and_filters_in_execute(self, keys, prefix, from_datetime, to_datetime):
         bucket = "testbucket"
         key_of_test = "path/data.txt"

@@ -639,13 +704,18 @@ def test_validate_keys_and_prefix_in_execute(self, keys, prefix):
         )
         op.keys = keys
         op.prefix = prefix
+        op.from_datetime = from_datetime
+        op.to_datetime = to_datetime

         # The object should be detected before the DELETE action is tested
         objects_in_dest_bucket = conn.list_objects(Bucket=bucket, Prefix=key_of_test)
         assert len(objects_in_dest_bucket["Contents"]) == 1
         assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test

-        with pytest.raises(AirflowException, match=r"Either keys or prefix should be set\."):
+        with pytest.raises(
+            AirflowException,
+            match=r"Either keys or at least one of prefix, from_datetime, to_datetime should be set.",
+        ):
             op.execute(None)

         # The object found in the bucket created earlier should still be there

From 7912e552bc6be02e78816f05c32fd7dd1ef4d20d Mon Sep 17 00:00:00 2001
From: idantepper
Date: Tue, 23 Apr 2024 00:21:18 +0300
Subject: [PATCH 2/6] (operators/s3.py): import datetime from the standard
 library and move it into the TYPE_CHECKING block

---
 airflow/providers/amazon/aws/operators/s3.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index db38681c65b4e..a8915f07a2cba 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -28,9 +28,10 @@
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.utils.helpers import exactly_one
-from airflow.utils.timezone import datetime

 if TYPE_CHECKING:
+    from datetime import datetime
+
     from airflow.utils.context import Context

 BUCKET_DOES_NOT_EXIST_MSG = "Bucket with name: %s doesn't exist"

From f280b36ee52ab7bc6623698e54c702c0dd8cd81d Mon Sep 17 00:00:00 2001
From: idantepper
Date: Tue, 23 Apr 2024 01:47:56 +0300
Subject: [PATCH 3/6] (utils/helpers.py): added at_least_one function

---
 airflow/utils/helpers.py    | 15 +++++++++++++++
 tests/utils/test_helpers.py | 24 ++++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 3ad87bd104035..885f9768806cf 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -294,6 +294,21 @@ def render_template_as_native(template: jinja2.Template, context: Context) -> An
     return render_template(template, cast(MutableMapping[str, Any], context), native=True)


+def at_least_one(*args) -> bool:
+    """
+    Return True if at least one of the *args is "truthy", and False otherwise.
+
+    If user supplies an iterable, we raise ValueError and force them to unpack.
+    """
+    if not args:
+        raise ValueError("Not supported argument. Need to get at least one argument")
+    if is_container(args[0]):
+        raise ValueError(
+            "Not supported for iterable args. Use `*` to unpack your iterable in the function call."
+        )
+    return any(args)
+
+
 def exactly_one(*args) -> bool:
     """
     Return True if exactly one of *args is "truthy", and False otherwise.
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 75b04b14c7b50..9e77a80ae9b61 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -27,6 +27,7 @@
 from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.utils import helpers, timezone
 from airflow.utils.helpers import (
+    at_least_one,
     at_most_one,
     build_airflow_url_with_query,
     exactly_one,
@@ -250,6 +251,29 @@ def test_validate_group_key(self, key_id, message, exception):
         else:
             validate_group_key(key_id)

+    def test_empty_at_least_one(self):
+        with pytest.raises(
+            ValueError,
+            match=r"Not supported argument. Need to get at least one argument",
+        ):
+            at_least_one()
+
+    def test_at_least_one_should_fail(self):
+        with pytest.raises(ValueError):
+            at_least_one([True, False])
+
+    @pytest.mark.parametrize(
+        "input_args, expected_output",
+        [
+            pytest.param((True, True, True), True, id="all-values-true"),
+            pytest.param((True, False, False), True, id="only-one-true"),
+            pytest.param((True, True, False), True, id="multiple-true-and-false"),
+            pytest.param((False, False, False), False, id="all-false"),
+        ],
+    )
+    def test_at_least_one_true(self, input_args, expected_output):
+        assert at_least_one(*input_args) == expected_output
+
     def test_exactly_one(self):
         """
         Checks that when we set ``true_count`` elements to "truthy", and others to "falsy",

From 23058bb0d94c9bb129c71a0fa5aa75458dd4b7c0 Mon Sep 17 00:00:00 2001
From: idantepper
Date: Tue, 23 Apr 2024 02:20:13 +0300
Subject: [PATCH 4/6] (operators/s3.py): changed validations to use the
 at_least_one helper

---
 airflow/providers/amazon/aws/operators/s3.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index a8915f07a2cba..dd1f6941ae882 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -27,7 +27,7 @@
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.utils.helpers import exactly_one
+from airflow.utils.helpers import at_least_one, exactly_one

 if TYPE_CHECKING:
     from datetime import datetime
@@ -517,15 +517,14 @@ def __init__(

         self._keys: str | list[str] = ""

-        if not exactly_one(keys is None, all([prefix is None, from_datetime is None, to_datetime is None])):
+        if not exactly_one(keys is not None, at_least_one(prefix, from_datetime, to_datetime)):
             raise AirflowException(
                 "Either keys or at least one of prefix, from_datetime, to_datetime should be set."
             )

     def execute(self, context: Context):
         if not exactly_one(
-            self.keys is None,
-            all([self.prefix is None, self.from_datetime is None, self.to_datetime is None]),
+            self.keys is not None, at_least_one(self.prefix, self.from_datetime, self.to_datetime)
         ):
             raise AirflowException(
                 "Either keys or at least one of prefix, from_datetime, to_datetime should be set."

From 552374cc97fe0bab80710e6a878b2b596053b65b Mon Sep 17 00:00:00 2001
From: idantepper
Date: Sat, 4 May 2024 18:11:21 +0300
Subject: [PATCH 5/6] (operators/s3.py): simplified validation of the given
 arguments in __init__ and execute

---
 airflow/providers/amazon/aws/operators/s3.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index dd1f6941ae882..a3f9c9245e041 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -27,7 +27,7 @@
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.utils.helpers import at_least_one, exactly_one
+from airflow.utils.helpers import exactly_one

 if TYPE_CHECKING:
     from datetime import datetime
@@ -517,14 +517,14 @@ def __init__(

         self._keys: str | list[str] = ""

-        if not exactly_one(keys is not None, at_least_one(prefix, from_datetime, to_datetime)):
+        if not exactly_one(keys is None, all(var is None for var in [prefix, from_datetime, to_datetime])):
             raise AirflowException(
                 "Either keys or at least one of prefix, from_datetime, to_datetime should be set."
             )

     def execute(self, context: Context):
         if not exactly_one(
-            self.keys is not None, at_least_one(self.prefix, self.from_datetime, self.to_datetime)
+            self.keys is None, all(var is None for var in [self.prefix, self.from_datetime, self.to_datetime])
         ):
             raise AirflowException(
                 "Either keys or at least one of prefix, from_datetime, to_datetime should be set."

From bdf22754f2a4064943d4ba28a5787e7ea1cea8e0 Mon Sep 17 00:00:00 2001
From: idantepper
Date: Sat, 4 May 2024 18:27:17 +0300
Subject: [PATCH 6/6] (utils/helpers.py): removed the now-unused at_least_one
 helper and its tests

---
 airflow/utils/helpers.py    | 15 ---------------
 tests/utils/test_helpers.py | 24 ------------------------
 2 files changed, 39 deletions(-)

diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 885f9768806cf..3ad87bd104035 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -294,21 +294,6 @@ def render_template_as_native(template: jinja2.Template, context: Context) -> An
     return render_template(template, cast(MutableMapping[str, Any], context), native=True)


-def at_least_one(*args) -> bool:
-    """
-    Return True if at least one of the *args is "truthy", and False otherwise.
-
-    If user supplies an iterable, we raise ValueError and force them to unpack.
-    """
-    if not args:
-        raise ValueError("Not supported argument. Need to get at least one argument")
-    if is_container(args[0]):
-        raise ValueError(
-            "Not supported for iterable args. Use `*` to unpack your iterable in the function call."
-        )
-    return any(args)
-
-
 def exactly_one(*args) -> bool:
     """
     Return True if exactly one of *args is "truthy", and False otherwise.
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index 9e77a80ae9b61..75b04b14c7b50 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -27,7 +27,6 @@ from airflow.jobs.base_job_runner import BaseJobRunner from airflow.utils import helpers, timezone from airflow.utils.helpers import ( - at_least_one, at_most_one, build_airflow_url_with_query, exactly_one, @@ -251,29 +250,6 @@ def test_validate_group_key(self, key_id, message, exception): else: validate_group_key(key_id) - def test_empty_at_least_one(self): - with pytest.raises( - ValueError, - match=r"Not supported argument. Need to get at least one argument", - ): - at_least_one() - - def test_at_least_one_should_fail(self): - with pytest.raises(ValueError): - at_least_one([True, False]) - - @pytest.mark.parametrize( - "input_args, expected_output", - [ - pytest.param((True, True, True), True, id="all-values-true"), - pytest.param((True, False, False), True, id="only-one-true"), - pytest.param((True, True, False), True, id="multiple-true-and-false"), - pytest.param((False, False, False), False, id="all-false"), - ], - ) - def test_at_least_one_true(self, input_args, expected_output): - assert at_least_one(*input_args) == expected_output - def test_exactly_one(self): """ Checks that when we set ``true_count`` elements to "truthy", and others to "falsy",
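
For reference, here is a minimal usage sketch of the filters this series adds, assuming the final state of the series (where keys is mutually exclusive with the prefix/from_datetime/to_datetime filters). The dag_id, bucket name, prefix, and datetime window below are hypothetical placeholders for illustration, not values taken from the patches.

from datetime import datetime, timezone

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator

# Hypothetical DAG: dag_id, bucket, prefix, and the datetime window are
# illustrative placeholders, not values from the patch series above.
with DAG(dag_id="s3_cleanup_example", start_date=datetime(2024, 1, 1), schedule=None):
    # Delete every object under "logs/" whose LastModified date falls inside
    # the window. Per the validation introduced in this series, passing keys=
    # together with any of prefix/from_datetime/to_datetime raises
    # AirflowException.
    delete_stale_logs = S3DeleteObjectsOperator(
        task_id="delete_stale_logs",
        bucket="my-example-bucket",
        prefix="logs/",
        from_datetime=datetime(2023, 1, 1, tzinfo=timezone.utc),
        to_datetime=datetime(2024, 1, 1, tzinfo=timezone.utc),
    )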