Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev/breeze/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,6 @@ PLEASE DO NOT MODIFY THE HASH BELOW! IT IS AUTOMATICALLY UPDATED BY PRE-COMMIT.

---------------------------------------------------------------------------------------------------------

Package config hash: fcc9ad87e1172ef881f3584a066215dde4f617fca6220e81c84f16f94d061231868a754ad9992630e81202a8972df18f64cabf943236c68512c2d99c18085ae7
Package config hash: 19b7a69c4b7ef23d1c665286fd7ca1a1d8c28fa9ba8523da6c3e215d8cd7c4bc0406186898a90c92d8e9f527bc8fa8d5c6407f914d7674e59e4981bb3c795e8c

---------------------------------------------------------------------------------------------------------
7 changes: 5 additions & 2 deletions dev/breeze/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ dependencies = [
"psutil>=5.9.6",
"pygithub>=2.1.1",
"pytest-xdist>=3.3.1",
"pytest>=8.2,<9",
"pytest>=8.3.3",
"pyyaml>=6.0.2",
"requests>=2.31.0",
"restructuredtext-lint>=1.4.0",
Expand All @@ -74,7 +74,10 @@ dependencies = [
"tabulate>=0.9.0",
"tomli>=2.0.1; python_version < '3.11'",
"twine>=4.0.2",
"tqdm>=4.67.1"
"tqdm>=4.67.1",
"boto3>=1.34.90",
"awswrangler>=3.11.0",
"semver>=3.0.4"
]

[project.scripts]
Expand Down
2 changes: 1 addition & 1 deletion devel-common/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ dependencies = [
"pytest-timeouts>=1.2.1",
"pytest-unordered>=0.6.1",
"pytest-xdist>=3.5.0",
"pytest>=8.3.3,<9",
"pytest>=8.3.3",
]
"sentry" = [
"blinker>=1.7.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def aws_region():
return AWS_REGION


@mock_aws
@pytest.fixture
def patch_hook(monkeypatch, aws_region):
"""Patch hook object by dummy boto3 Batch client."""
Expand All @@ -59,6 +58,7 @@ def test_batch_waiters(aws_region):
assert isinstance(batch_waiters, BatchWaitersHook)


@mock_aws
class TestBatchWaiters:
@pytest.fixture(autouse=True)
def setup_tests(self, patch_hook):
Expand Down Expand Up @@ -215,6 +215,7 @@ def test_wait_for_job_raises_for_waiter_error(self):
assert mock_waiter.wait.call_count == 1


@mock_aws
class TestBatchJobWaiters:
"""Test default waiters."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,43 +181,43 @@ def test_event_to_str(self):

@pytest.mark.db_test
class TestCloudwatchTaskHandler:
@conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
@pytest.fixture(autouse=True)
def setup_tests(self, create_log_template, tmp_path_factory, session):
self.remote_log_group = "log_group_name"
self.region_name = "us-west-2"
self.local_log_location = str(tmp_path_factory.mktemp("local-cloudwatch-log-location"))
if AIRFLOW_V_3_0_PLUS:
create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
else:
create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
self.cloudwatch_task_handler = CloudwatchTaskHandler(
self.local_log_location,
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
)

date = datetime(2020, 1, 1)
dag_id = "dag_for_testing_cloudwatch_task_handler"
task_id = "task_for_testing_cloudwatch_log_handler"
self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date)
task = EmptyOperator(task_id=task_id, dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="scheduled",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="scheduled",
def setup(self, create_log_template, tmp_path_factory, session):
with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
self.remote_log_group = "log_group_name"
self.region_name = "us-west-2"
self.local_log_location = str(tmp_path_factory.mktemp("local-cloudwatch-log-location"))
if AIRFLOW_V_3_0_PLUS:
create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
else:
create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
self.cloudwatch_task_handler = CloudwatchTaskHandler(
self.local_log_location,
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)

date = datetime(2020, 1, 1)
dag_id = "dag_for_testing_cloudwatch_task_handler"
task_id = "task_for_testing_cloudwatch_log_handler"
self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date)
task = EmptyOperator(task_id=task_id, dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="scheduled",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="scheduled",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)

self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
self.ti.dag_run = dag_run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,39 +46,39 @@ def s3mock():

@pytest.mark.db_test
class TestS3RemoteLogIO:
@conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
@pytest.fixture(autouse=True)
def setup_tests(self, create_log_template, tmp_path_factory, session):
self.remote_log_base = "s3://bucket/remote/log/location"
self.remote_log_location = "s3://bucket/remote/log/location/1.log"
self.remote_log_key = "remote/log/location/1.log"
self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
create_log_template("{try_number}.log")
self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
# Verify the hook now with the config override
self.subject = self.s3_task_handler.io
assert self.subject.hook is not None

date = datetime(2016, 1, 1)
self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="manual",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="manual",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)
with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
self.remote_log_base = "s3://bucket/remote/log/location"
self.remote_log_location = "s3://bucket/remote/log/location/1.log"
self.remote_log_key = "remote/log/location/1.log"
self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
create_log_template("{try_number}.log")
self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
# Verify the hook now with the config override
self.subject = self.s3_task_handler.io
assert self.subject.hook is not None

date = datetime(2016, 1, 1)
self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
dag_id=self.dag.dag_id,
logical_date=date,
run_id="test",
run_type="manual",
)
else:
dag_run = DagRun(
dag_id=self.dag.dag_id,
execution_date=date,
run_id="test",
run_type="manual",
)
session.add(dag_run)
session.commit()
session.refresh(dag_run)

self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
self.ti.dag_run = dag_run
Expand Down