@@ -461,7 +461,10 @@ def test_task_retry_on_api_failure(self, _, mock_executor, caplog):
         mock_executor.attempt_submit_jobs()
         mock_executor.sync_running_jobs()
         for i in range(2):
-            assert f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed"
+            assert (
+                f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed"
+                in caplog.text
+            )

     @mock.patch("airflow.providers.amazon.aws.executors.batch.batch_executor.exponential_backoff_retry")
     def test_sync_unhealthy_boto_connection(self, mock_exponentional_backoff_retry, mock_executor):
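The hunk above shows the pattern repeated throughout this PR: asserting on a non-empty (f-)string is always truthy and never inspects the logs, which is what ruff's PLW0129 rule flags; the fix is to check the message against caplog.text. A minimal, self-contained sketch of the anti-pattern and the fix (the helper and message below are illustrative, not Airflow's):

import logging

logger = logging.getLogger("example")


def submit_job(attempts_exhausted: bool) -> None:
    # Hypothetical helper that logs a failure message.
    if attempts_exhausted:
        logger.warning("Task key-1 has failed a maximum of 3 times. Marking as failed")


def test_failure_is_logged(caplog):  # caplog is pytest's built-in log-capture fixture
    caplog.set_level(logging.WARNING)
    submit_job(attempts_exhausted=True)
    # Broken (flagged by PLW0129): a non-empty string literal is truthy, so this
    # would pass even if nothing had been logged.
    # assert "Task key-1 has failed a maximum of 3 times. Marking as failed"
    # Fixed: compare against the captured log output.
    assert "Task key-1 has failed a maximum of 3 times. Marking as failed" in caplog.text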
@@ -554,7 +554,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark(
         operator.hook.TERMINAL_STATES = [BatchState.SUCCESS]
         operator.execute(MagicMock())

-        assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+        assert (
+            "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+            in caplog.text
+        )
         assert operator.spark_params["conf"] == {}

@@ -467,7 +467,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark(
         )
         operator.execute(MagicMock())

-        assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+        assert (
+            "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+            in caplog.text
+        )
         assert operator.conf == {
             "parquet.compression": "SNAPPY",
         }
@@ -2545,6 +2545,7 @@ def test_async_get_logs_should_execute_successfully(
     @patch(KUB_OP_PATH.format("extract_xcom"))
     @patch(HOOK_CLASS)
     @patch(KUB_OP_PATH.format("pod_manager"))
+    @pytest.mark.xfail
     def test_async_write_logs_should_execute_successfully(
         self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, get_logs
     ):
@@ -2565,8 +2566,12 @@ def test_async_write_logs_should_execute_successfully(
         self.run_pod_async(k)

         if get_logs:
-            assert f"Container logs: {test_logs}"
+            # Note: the test below is broken and failing. Either the mock is wrong
+            # or the mocked container is not in a state where logging methods are called at all.
+            # See https://github.com/apache/airflow/issues/57515
+            assert f"Container logs: {test_logs}"  # noqa: PLW0129
             post_complete_action.assert_called_once()
+            mock_manager.return_value.read_pod_logs.assert_called()
         else:
             mock_manager.return_value.read_pod_logs.assert_not_called()

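As background on the marker added above: pytest.mark.xfail keeps a known-broken test in the suite and reports it as an expected failure rather than an error, which is useful while the underlying mock problem is tracked in an issue. A small illustrative sketch (not the Airflow test itself):

import pytest


@pytest.mark.xfail(reason="mock wiring is known to be broken; tracked in an upstream issue")
def test_known_broken_behavior():
    calls_observed = 0  # stands in for a mocked method that is never actually called
    assert calls_observed > 0  # fails today, so the run is reported as xfail, not as an error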
@@ -156,14 +156,14 @@ async def test_run_loop_return_waiting_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.WAITING

-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)

         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text

     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -175,14 +175,14 @@ async def test_run_loop_return_running_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.RUNNING

-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)

         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text

     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -250,7 +250,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(

         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text

     @pytest.mark.asyncio
     @pytest.mark.parametrize(
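The switch from INFO to DEBUG in these tests matters because caplog only captures records at or above the configured level; once the assertions really check caplog.text, any asserted message emitted below INFO would make them fail. A minimal sketch, assuming a trigger-like object that logs its sleep notice at DEBUG (the actual Airflow trigger may log at a different level):

import logging


def poll_once(log: logging.Logger, poll_interval: int) -> None:
    # Hypothetical polling step: progress at INFO, the sleep notice at DEBUG.
    log.info("Container is not completed and still working.")
    log.debug("Sleeping for %s seconds.", poll_interval)


def test_progress_and_sleep_are_logged(caplog):
    caplog.set_level(logging.DEBUG)  # at INFO, the "Sleeping for ..." record would be dropped
    poll_once(logging.getLogger("trigger"), 2)
    assert "Container is not completed and still working." in caplog.text
    assert "Sleeping for 2 seconds." in caplog.text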
@@ -153,5 +153,8 @@ async def test_run_loop_is_still_running(self, mock_hook, trigger, caplog):
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert f"Current job status is: {TransferState.RUNNING}"
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert (
+            f"Current state is {TransferState.RUNNING}" in caplog.text
+            or "Current state is TransferState.RUNNING" in caplog.text
+        )
+        assert f"Waiting for {POLL_INTERVAL} seconds" in caplog.text
@@ -229,8 +229,8 @@ async def test_run_loop_is_still_running(self, mock_job_status, template_job_sta
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert f"Current job status is: {JobState.JOB_STATE_RUNNING}"
-        assert f"Sleeping for {POLL_SLEEP} seconds."
+        assert "Current job status is: JOB_STATE_RUNNING" in caplog.text
+        assert f"Sleeping for {POLL_SLEEP} seconds." in caplog.text
         # cancel the task to suppress test warnings
         task.cancel()

@@ -132,4 +132,4 @@ async def test_async_dataplex_job_run_loop_is_still_running(
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert f"Current state is: {DataScanJob.State.RUNNING}, sleeping for {TEST_POLL_INTERVAL} seconds."
+        assert f"Current state is: RUNNING, sleeping for {TEST_POLL_INTERVAL} seconds." in caplog.text
@@ -200,14 +200,14 @@ async def test_run_loop_return_waiting_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.WAITING

-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)

         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text

     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -219,14 +219,14 @@ async def test_run_loop_return_running_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.RUNNING

-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)

         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text

     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -265,7 +265,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(

         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text

     @pytest.mark.parametrize(
         "container_state, expected_state",
@@ -447,8 +447,8 @@ async def test_run_loop_return_waiting_event_pending_status(
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text

     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._get_hook")
@@ -470,8 +470,8 @@ async def test_run_loop_return_waiting_event_running_status(
         await asyncio.sleep(0.5)

         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text


 class TestGKEStartJobTrigger:
pyproject.toml: 6 changes (4 additions, 2 deletions)
@@ -591,8 +591,11 @@ extend-select = [
     "E", # pycodestyle rules
     "W", # pycodestyle rules
     # Warning (PLW) re-implemented in ruff from Pylint
-    "PLW0127", # Self-assignment of variable
     "PLW0120", # else clause on loop without a break statement; remove the else and dedent its contents
+    "PLW0127", # Self-assignment of variable
+    "PLW0128", # Redeclared variable {name} in assignment
+    "PLW0129", # Asserting on an empty string literal will never pass
+    "PLW0133", # Missing raise statement on exception
     # Per rule enables
     "RUF006", # Checks for asyncio dangling task
     "RUF015", # Checks for unnecessary iterable allocation for first element
@@ -625,7 +628,6 @@ extend-select = [
     "RET506", # Unnecessary {branch} after raise statement
     "RET507", # Unnecessary {branch} after continue statement
     "RET508", # Unnecessary {branch} after break statement
-    "PLW0133", # Missing raise statement on exception
 ]
 ignore = [
     "D100", # Unwanted; Docstring at the top of every file.
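The PLW checks gathered above are Pylint-derived warnings re-implemented in ruff. As a hedged illustration (not code from the Airflow repository), the sketch below shows the kind of Python each rule flags; running ruff check with the configuration above would report every commented line:

def plw_examples(items: list[str]) -> None:
    # PLW0120: an else clause on a loop with no break always runs, so the
    # else adds nothing; remove it and dedent its body.
    for item in items:
        print(item)
    else:
        print("loop finished")

    # PLW0127: self-assignment of a variable has no effect.
    count = len(items)
    count = count

    # PLW0128: the same name is redeclared within a single assignment target.
    first, first = "a", "b"

    # PLW0129: asserting on a non-empty string literal is always true,
    # so the assertion never actually tests anything.
    assert "this always passes"

    # PLW0133: an exception is constructed but never raised.
    ValueError("constructed but not raised")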