diff --git a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py
index 1098efac777c4..548e82a1e9e41 100644
--- a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py
+++ b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py
@@ -461,7 +461,10 @@ def test_task_retry_on_api_failure(self, _, mock_executor, caplog):
         mock_executor.attempt_submit_jobs()
         mock_executor.sync_running_jobs()
         for i in range(2):
-            assert f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed"
+            assert (
+                f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed"
+                in caplog.text
+            )
 
     @mock.patch("airflow.providers.amazon.aws.executors.batch.batch_executor.exponential_backoff_retry")
     def test_sync_unhealthy_boto_connection(self, mock_exponentional_backoff_retry, mock_executor):
diff --git a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py
index 2987796dcd362..da69f22cfa34a 100644
--- a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py
+++ b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py
@@ -554,7 +554,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark(
         operator.hook.TERMINAL_STATES = [BatchState.SUCCESS]
         operator.execute(MagicMock())
 
-        assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+        assert (
+            "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+            in caplog.text
+        )
         assert operator.spark_params["conf"] == {}
diff --git a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
index 7acffb447d07c..c7010a0fb835f 100644
--- a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
+++ b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
@@ -467,7 +467,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark(
         )
         operator.execute(MagicMock())
 
-        assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+        assert (
+            "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+            in caplog.text
+        )
         assert operator.conf == {
             "parquet.compression": "SNAPPY",
         }
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index abb503c8aa477..3a64fe1f9b193 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2545,6 +2545,7 @@ def test_async_get_logs_should_execute_successfully(
     @patch(KUB_OP_PATH.format("extract_xcom"))
     @patch(HOOK_CLASS)
     @patch(KUB_OP_PATH.format("pod_manager"))
+    @pytest.mark.xfail
     def test_async_write_logs_should_execute_successfully(
         self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, get_logs
     ):
@@ -2565,8 +2566,12 @@ def test_async_write_logs_should_execute_successfully(
         self.run_pod_async(k)
 
         if get_logs:
-            assert f"Container logs: {test_logs}"
+            # Note: the test below is broken and failing. Either the mock is wrong
+            # or the mocked container is not in a state that logging methods are called at-all.
+            # See https://github.com/apache/airflow/issues/57515
+            assert f"Container logs: {test_logs}"  # noqa: PLW0129
             post_complete_action.assert_called_once()
+            mock_manager.return_value.read_pod_logs.assert_called()
         else:
             mock_manager.return_value.read_pod_logs.assert_not_called()
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index fc477fe0cf763..ce57d0f5e96d9 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -156,14 +156,14 @@ async def test_run_loop_return_waiting_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.WAITING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -175,14 +175,14 @@ async def test_run_loop_return_running_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.RUNNING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -250,7 +250,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
 
         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text
 
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py b/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py
index 9ce383c53c901..cba7cb58993b4 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py
@@ -153,5 +153,8 @@ async def test_run_loop_is_still_running(self, mock_hook, trigger, caplog):
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current job status is: {TransferState.RUNNING}"
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert (
+            f"Current state is {TransferState.RUNNING}" in caplog.text
+            or "Current state is TransferState.RUNNING" in caplog.text
+        )
+        assert f"Waiting for {POLL_INTERVAL} seconds" in caplog.text
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py b/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py
index 1312a1dfb2aca..bf8cfd05d6041 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py
@@ -229,8 +229,8 @@ async def test_run_loop_is_still_running(self, mock_job_status, template_job_sta
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current job status is: {JobState.JOB_STATE_RUNNING}"
-        assert f"Sleeping for {POLL_SLEEP} seconds."
+        assert "Current job status is: JOB_STATE_RUNNING" in caplog.text
+        assert f"Sleeping for {POLL_SLEEP} seconds." in caplog.text
 
         # cancel the task to suppress test warnings
         task.cancel()
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py b/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py
index 27ee12bb43629..084d1175f8d49 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py
@@ -132,4 +132,4 @@ async def test_async_dataplex_job_run_loop_is_still_running(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current state is: {DataScanJob.State.RUNNING}, sleeping for {TEST_POLL_INTERVAL} seconds."
+        assert f"Current state is: RUNNING, sleeping for {TEST_POLL_INTERVAL} seconds." in caplog.text
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
index 3704c5c320624..05e194a3f9735 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
@@ -200,14 +200,14 @@ async def test_run_loop_return_waiting_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.WAITING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -219,14 +219,14 @@ async def test_run_loop_return_running_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.RUNNING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -265,7 +265,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
 
         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text
 
     @pytest.mark.parametrize(
         "container_state, expected_state",
@@ -447,8 +447,8 @@ async def test_run_loop_return_waiting_event_pending_status(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._get_hook")
@@ -470,8 +470,8 @@ async def test_run_loop_return_waiting_event_running_status(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text
 
 
 class TestGKEStartJobTrigger:
diff --git a/pyproject.toml b/pyproject.toml
index 8a134826de0b8..f6417fa4c3291 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -591,8 +591,11 @@ extend-select = [
     "E", # pycodestyle rules
     "W", # pycodestyle rules
     # Warning (PLW) re-implemented in ruff from Pylint
-    "PLW0127", # Self-assignment of variable
     "PLW0120", # else clause on loop without a break statement; remove the else and dedent its contents
+    "PLW0127", # Self-assignment of variable
+    "PLW0128", # Redeclared variable {name} in assignment
+    "PLW0129", # Asserting on an empty string literal will never pass
+    "PLW0133", # Missing raise statement on exception
     # Per rule enables
     "RUF006", # Checks for asyncio dangling task
     "RUF015", # Checks for unnecessary iterable allocation for first element
@@ -625,7 +628,6 @@ extend-select = [
     "RET506", # Unnecessary {branch} after raise statement
     "RET507", # Unnecessary {branch} after continue statement
     "RET508", # Unnecessary {branch} after break statement
-    "PLW0133", # Missing raise statement on exception
 ]
 ignore = [
     "D100", # Unwanted; Docstring at the top of every file.