From 5ab40f20b87419ace9aa780e51184338888d0358 Mon Sep 17 00:00:00 2001 From: Abhinav Singh <126065+abhinavsingh@users.noreply.github.com> Date: Sat, 13 Apr 2024 14:47:35 +0530 Subject: [PATCH] Wait until buffer flush (#1385) * Wait until all data in buffer is flushed to client when upstream server finishes. (cherry picked from commit d7765067b0c7d4a8b0bf5548bcd3b9a77b73d0b1) * Wait until buffer flush * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Avoid shadowing * _teared not _teardown * Refactor logic * Do not try `read_from_descriptors` if reads have previously teared down * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: yk <1876421041@qq.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- helper/monitor_open_files.sh | 21 +++++++++++++-------- proxy/core/connection/connection.py | 6 +++++- proxy/http/handler.py | 29 +++++++++++++++++------------ proxy/http/proxy/server.py | 12 ++++++++---- 4 files changed, 43 insertions(+), 25 deletions(-) diff --git a/helper/monitor_open_files.sh b/helper/monitor_open_files.sh index f353e0db15..d4214e8bb6 100755 --- a/helper/monitor_open_files.sh +++ b/helper/monitor_open_files.sh @@ -20,15 +20,20 @@ if [[ -z "$PROXY_PY_PID" ]]; then exit 1 fi -OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l) -echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN" +while true; +do + OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l) + echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN" -pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do - OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l) - echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR" + pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do + OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l) + echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR" - pgrep -P "$acceptorPid" | while read -r childPid; do - OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l) - echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC" + pgrep -P "$acceptorPid" | while read -r childPid; do + OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l) + echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC" + done done + + sleep 1 done diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index d0bebe26db..63cb62e316 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -87,7 +87,11 @@ def flush(self, max_send_size: Optional[int] = None) -> int: # TODO: Assemble multiple packets if total # size remains below max send size. max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE - sent: int = self.send(mv[:max_send_size]) + try: + sent: int = self.send(mv[:max_send_size]) + except BlockingIOError: + logger.warning('BlockingIOError when trying send to {0}'.format(self.tag)) + return 0 if sent == len(mv): self.buffer.pop(0) self._num_buffer -= 1 diff --git a/proxy/http/handler.py b/proxy/http/handler.py index b8d207d561..581e911a33 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -49,6 +49,8 @@ def __init__(self, *args: Any, **kwargs: Any): if not self.flags.threadless: self.selector = selectors.DefaultSelector() self.plugin: Optional[HttpProtocolHandlerPlugin] = None + self.writes_teared: bool = False + self.reads_teared: bool = False ## # initialize, is_inactive, shutdown, get_events, handle_events @@ -137,23 +139,26 @@ async def handle_events( ) -> bool: """Returns True if proxy must tear down.""" # Flush buffer for ready to write sockets - teardown = await self.handle_writables(writables) - if teardown: + self.writes_teared = await self.handle_writables(writables) + if self.writes_teared: return True # Invoke plugin.write_to_descriptors if self.plugin: - teardown = await self.plugin.write_to_descriptors(writables) - if teardown: + self.writes_teared = await self.plugin.write_to_descriptors(writables) + if self.writes_teared: return True - # Read from ready to read sockets - teardown = await self.handle_readables(readables) - if teardown: + # Read from ready to read sockets if reads have not already teared down + if not self.reads_teared: + self.reads_teared = await self.handle_readables(readables) + if not self.reads_teared: + # Invoke plugin.read_from_descriptors + if self.plugin: + self.reads_teared = await self.plugin.read_from_descriptors( + readables, + ) + # Wait until client buffer has flushed when reads has teared down but we can still write + if self.reads_teared and not self.work.has_buffer(): return True - # Invoke plugin.read_from_descriptors - if self.plugin: - teardown = await self.plugin.read_from_descriptors(readables) - if teardown: - return True return False def handle_data(self, data: memoryview) -> Optional[bool]: diff --git a/proxy/http/proxy/server.py b/proxy/http/proxy/server.py index f18f45fc55..70d3369ec4 100644 --- a/proxy/http/proxy/server.py +++ b/proxy/http/proxy/server.py @@ -175,7 +175,11 @@ async def get_descriptors(self) -> Descriptors: return r, w async def write_to_descriptors(self, w: Writables) -> bool: - if (self.upstream and self.upstream.connection.fileno() not in w) or not self.upstream: + if ( + self.upstream + and not self.upstream.closed + and self.upstream.connection.fileno() not in w + ) or not self.upstream: # Currently, we just call write/read block of each plugins. It is # plugins responsibility to ignore this callback, if passed descriptors # doesn't contain the descriptor they registered. @@ -208,9 +212,9 @@ async def write_to_descriptors(self, w: Writables) -> bool: async def read_from_descriptors(self, r: Readables) -> bool: if ( - self.upstream and not - self.upstream.closed and - self.upstream.connection.fileno() not in r + self.upstream + and not self.upstream.closed + and self.upstream.connection.fileno() not in r ) or not self.upstream: # Currently, we just call write/read block of each plugins. It is # plugins responsibility to ignore this callback, if passed descriptors