diff --git a/providers/docker/src/airflow/providers/docker/operators/docker.py b/providers/docker/src/airflow/providers/docker/operators/docker.py index 2efbdd2690cdd..f2d67a6189eda 100644 --- a/providers/docker/src/airflow/providers/docker/operators/docker.py +++ b/providers/docker/src/airflow/providers/docker/operators/docker.py @@ -58,12 +58,24 @@ def stringify(line: str | bytes): def fetch_logs(log_stream, log: Logger): - log_lines = [] + log_lines: list[str] = [] + buffer = "" + for log_chunk_raw in log_stream: - log_chunk = stringify(log_chunk_raw).rstrip() - log_lines.append(log_chunk) - for log_chunk_line in log_chunk.split("\n"): - log.info("%s", log_chunk_line) + buffer += stringify(log_chunk_raw) + lines = buffer.split("\n") + buffer = lines.pop() # Keep incomplete line for next iteration + + for line in lines: + stripped_line = line.rstrip() + log.info("%s", stripped_line) + log_lines.append(stripped_line) + + if buffer: + buffer = buffer.rstrip() + log.info("%s", buffer) + log_lines.append(buffer) + return log_lines diff --git a/providers/docker/tests/unit/docker/operators/test_docker.py b/providers/docker/tests/unit/docker/operators/test_docker.py index 668af6c7125f2..fef721558facb 100644 --- a/providers/docker/tests/unit/docker/operators/test_docker.py +++ b/providers/docker/tests/unit/docker/operators/test_docker.py @@ -149,7 +149,7 @@ def setup_patchers(self, docker_api_client_patcher): self.client_mock.pull.return_value = {"status": "pull log"} self.client_mock.wait.return_value = {"StatusCode": 0} self.client_mock.create_host_config.return_value = mock.Mock() - self.log_messages = ["container log 😁 ", b"byte string container log"] + self.log_messages = ["container log 😁 \n", b"byte string container log\n"] self.client_mock.attach.return_value = self.log_messages # If logs() is called with tail then only return the last value, otherwise return the whole log. @@ -566,7 +566,7 @@ def test_skip(self, kwargs, actual_exit_code, expected_exc): def test_execute_container_fails(self): failed_msg = {"StatusCode": 1} - log_line = ["unicode container log 😁 ", b"byte string container log"] + log_line = ["unicode container log 😁 \n", b"byte string container log\n"] expected_message = "Docker container failed: {failed_msg}" self.client_mock.attach.return_value = log_line self.client_mock.wait.return_value = failed_msg @@ -579,7 +579,7 @@ def test_execute_container_fails(self): assert str(raised_exception.value) == expected_message.format( failed_msg=failed_msg, ) - assert raised_exception.value.logs == [log_line[0].strip(), log_line[1].decode("utf-8")] + assert raised_exception.value.logs == [log_line[0].rstrip(), log_line[1].decode("utf-8").rstrip()] def test_auto_remove_container_fails(self): self.client_mock.wait.return_value = {"StatusCode": 1} @@ -622,9 +622,9 @@ def test_execute_xcom_behavior(self): assert no_xcom_push_result is None def test_execute_xcom_behavior_bytes(self): - self.log_messages = [b"container log 1 ", b"container log 2"] + self.log_messages = [b"container log 1 \n", b"container log 2\n"] self.client_mock.pull.return_value = [b'{"status":"pull log"}'] - self.client_mock.attach.return_value = iter([b"container log 1 ", b"container log 2"]) + self.client_mock.attach.return_value = iter([b"container log 1 \n", b"container log 2\n"]) # Make sure the logs side effect is updated after the change self.client_mock.attach.side_effect = ( lambda **kwargs: iter(self.log_messages[-kwargs["tail"] :]) @@ -762,8 +762,8 @@ def test_docker_host_env_unset(self, monkeypatch): [ pytest.param( [ - "return self.main(*args, **kwargs)", - " ^^^^^^^^^^^^^^^^", + "return self.main(*args, **kwargs)\n", + " ^^^^^^^^^^^^^^^^\n", ], [ "return self.main(*args, **kwargs)", @@ -773,13 +773,25 @@ def test_docker_host_env_unset(self, monkeypatch): ), pytest.param( [ - " ^^^^^^^^^^^^^^^^ ", + " ^^^^^^^^^^^^^^^^ \n", ], [ " ^^^^^^^^^^^^^^^^", ], id="should-remove-trailing-spaces", ), + # Test case for issue #58911: Docker stream may chunks lines mid-way + pytest.param( + [ + b" rv = self.invoke(ctx)\n ^^^", + b"^^^^^^^^^^^^^\n", + ], + [ + " rv = self.invoke(ctx)", + " ^^^^^^^^^^^^^^^^", + ], + id="should-handle-mid-line-chunk-splits", + ), ], ) @mock.patch("logging.Logger") diff --git a/providers/docker/tests/unit/docker/test_exceptions.py b/providers/docker/tests/unit/docker/test_exceptions.py index 4362ead42f78c..e304fa5159f73 100644 --- a/providers/docker/tests/unit/docker/test_exceptions.py +++ b/providers/docker/tests/unit/docker/test_exceptions.py @@ -31,7 +31,7 @@ from airflow.providers.docker.operators.docker import DockerOperator FAILED_MESSAGE = {"StatusCode": 1} -FAILED_LOGS = ["unicode container log 😁 ", b"byte string container log"] +FAILED_LOGS = ["unicode container log 😁 \n", b"byte string container log\n"] EXPECTED_MESSAGE = f"Docker container failed: {FAILED_MESSAGE}" FAILED_SKIP_MESSAGE = {"StatusCode": 2} SKIP_ON_EXIT_CODE = 2 @@ -77,4 +77,4 @@ def test_docker_failed_exception(self, failed_msg, log_line, expected_message, s operator.execute(None) assert str(raised_exception.value) == expected_message - assert raised_exception.value.logs == [log_line[0].strip(), log_line[1].decode("utf-8")] + assert raised_exception.value.logs == [log_line[0].rstrip(), log_line[1].decode("utf-8").rstrip()]