diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index adc87e802fa6f..d96cf7faacd8a 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -713,11 +713,11 @@ def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None: async def _aread_frame(self): len_bytes = await self._async_reader.readexactly(4) - len = int.from_bytes(len_bytes, byteorder="big") - if len >= 2**32: - raise OverflowError(f"Refusing to receive messages larger than 4GiB {len=}") + length = int.from_bytes(len_bytes, byteorder="big") + if length >= 2**32: + raise OverflowError(f"Refusing to receive messages larger than 4GiB {length=}") - buffer = await self._async_reader.readexactly(len) + buffer = await self._async_reader.readexactly(length) return self.resp_decoder.decode(buffer) async def _aget_response(self, expect_id: int) -> ToTriggerRunner | None: diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py index 0589f12f866e3..97ed1761ad8eb 100644 --- a/task-sdk/src/airflow/sdk/execution_time/comms.py +++ b/task-sdk/src/airflow/sdk/execution_time/comms.py @@ -183,9 +183,9 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]): def send(self, msg: SendMsgType) -> ReceiveMsgType | None: """Send a request to the parent and block until the response is received.""" frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump()) - bytes = frame.as_bytes() + frame_bytes = frame.as_bytes() - self.socket.sendall(bytes) + self.socket.sendall(frame_bytes) if isinstance(msg, ResendLoggingFD): if recv_fds is None: return None @@ -225,13 +225,13 @@ def _read_frame(self, maxfds: int | None = None) -> tuple[_ResponseFrame, list[i if len_bytes == b"": raise EOFError("Request socket closed before length") - len = int.from_bytes(len_bytes, byteorder="big") + length = int.from_bytes(len_bytes, byteorder="big") - buffer = bytearray(len) + buffer = bytearray(length) nread = self.socket.recv_into(buffer) - if nread != len: + if nread != length: raise RuntimeError( - f"unable to read full response in child. (We read {nread}, but expected {len})" + f"unable to read full response in child. (We read {nread}, but expected {length})" ) if nread == 0: raise EOFError(f"Request socket closed before response was complete ({self.id_counter=})")