Interrupted SCP stucks during "Closing channel" #628

Open
omaxx opened this issue Jan 26, 2024 · 22 comments

Comments

@omaxx

omaxx commented Jan 26, 2024

I use asyncssh to download files from remote servers and network devices, and I would like to have an option to safely interrupt the process.

Here is my code:

import asyncio
from pathlib import Path

import asyncssh

async def download(host: str):
    path = Path(f"./download/{host}")
    path.mkdir(parents=True, exist_ok=True)
    try:
        async with asyncssh.connect(host) as conn:
            print(f"{host}: connected")
            await asyncssh.scp(
                (conn, HUGE_FILE), path,
            )
    except Exception as exc:
        print(f"{host}: {exc.__class__}: {exc}")
    except asyncio.exceptions.CancelledError as exc:
        print(f"{host}: {exc.__class__}: {exc}")
        raise
    finally:
        print(f"{host}: done")

async def main():
    await asyncio.gather(
        download(HOST),
    )

try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("Interrupted!!!")

It works fine when I download a file from a Linux server:

[conn=0, chan=0] Received 16384 SCP data bytes
[conn=0, chan=0] Received 16384 SCP data bytes   <=== ctrl-C is pressed here
^C[conn=0, chan=0] Stopping remote SCP 
[conn=0, chan=0] Closing channel
[conn=0, chan=0] Received exit signal PIPE
[conn=0, chan=0]   Core dumped: False
[conn=0, chan=0]   Message:
[conn=0, chan=0] Received channel close
[conn=0, chan=0] Channel closed
[conn=0] Closing connection
[conn=0] Sending disconnect: Disconnected by application (11)
[conn=0] Connection closed
172.31.0.8: <class 'asyncio.exceptions.CancelledError'>:
172.31.0.8: done
Interrupted!!!

But when I download a file from a Juniper router, it gets stuck while closing the channel:

[conn=0, chan=0] Received 16384 SCP data bytes
[conn=0, chan=0] Received 16384 SCP data bytes   <=== ctrl-C is pressed here
^C[conn=0, chan=0] Stopping remote SCP
[conn=0, chan=0] Closing channel                 <=== stuck here and need to press ctrl-C again
^C[conn=0] Closing connection
[conn=0, chan=0] Closing channel
[conn=0] Sending disconnect: Disconnected by application (11)
[conn=0] Connection closed
[conn=0, chan=0] Closing channel due to connection close
[conn=0, chan=0] Channel closed
172.31.0.48: <class 'asyncio.exceptions.CancelledError'>:
172.31.0.48: done
Interrupted!!!

As I understand it, the remote device doesn't close the channel correctly. Is there an option to force the close without waiting for the other side?

@omaxx omaxx changed the title from Interrupted SCP stuck during "Closing channel" to Interrupted SCP stucks during "Closing channel" Jan 26, 2024
@ronf
Owner

ronf commented Jan 27, 2024

I must admit I haven't really looked much at how asyncio deals with KeyboardInterrupt. I have definitely seen the behavior in test programs where sometimes you have to hit Ctrl-C multiple times for it to actually fully exit the program, but I haven't looked closely at why that happens.

I wasn't able to reproduce your failure here, but you might try something like:

async def download(host: str):
    path = Path(f"./download/{host}")
    path.mkdir(parents=True, exist_ok=True)
    try:
        async with asyncssh.connect(host, client_keys='~/.ssh/id_rsa') as conn:
            print(f"{host}: connected")
            try:
                await asyncssh.scp(
                    (conn, HUGE_FILE), path,
                )
            except asyncio.exceptions.CancelledError as exc:
                conn.abort()
    finally:
        print(f"{host}: done")

This adds a check for CancelledError around just the SCP call, and explicitly aborts the SSH connection, which should immediately close that connection without waiting for any in-progress operations to complete. Perhaps this will work better than letting a regular close happen when exiting the "async with" block.

@omaxx
Author

omaxx commented Jan 27, 2024

It doesn't help. It gets stuck somewhere inside the asyncssh.scp function and doesn't propagate the exception to the caller.

[conn=0, chan=0] Received 16384 SCP data bytes
[conn=0, chan=0] Received 16384 SCP data bytes   <=== first time ctrl-C was pressed
^C[conn=0, chan=0] Stopping remote SCP
[conn=0, chan=0] Closing channel  <=== second ctrl-C
^C[conn=0] Aborting connection    <=== and only now Exception was caught and conn.abort() was called
[conn=0] Closing connection
[conn=0, chan=0] Closing channel
[conn=0] Sending disconnect: Disconnected by application (11)
[conn=0] Connection closed
[conn=0, chan=0] Closing channel due to connection close
[conn=0, chan=0] Channel closed
172.31.0.48: done
Interrupted!!!

@ronf
Owner

ronf commented Jan 27, 2024

I sort of expected this to be the case.

When you hit Ctrl-C for the second time, do you get any kind of traceback?

There isn't any handling of CancelledError in the AsyncSSH scp code right now, so whatever is catching that exception and blocking is probably in asyncio or some other system library.

Doing a quick search, I found a number of posts about various issues with KeyboardInterrupt and asyncio. For instance, see python/cpython#93122.

One suggestion I found was to use signal handlers -- check out https://stackoverflow.com/questions/54383346/close-asyncio-loop-on-keyboardinterrupt-run-stop-routine. You might want to see if that works better than trying to do this with "try" blocks. In this case, you'd have the signal handler callback call loop.stop().
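
A minimal sketch of the signal-handler idea, not taken from the linked answer. It assumes a Unix event loop (loop.add_signal_handler is not available on Windows) and cancels the task instead of calling loop.stop(), so that cleanup code still gets a chance to run. The names run_cancellable and main are hypothetical, and asyncio.sleep stands in for the download.

```python
import asyncio
import signal


async def run_cancellable(coro):
    """Await `coro`, cancelling it when SIGINT arrives (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    task = asyncio.ensure_future(coro)
    # On Ctrl-C the loop calls task.cancel() instead of raising
    # KeyboardInterrupt at an arbitrary point in the program.
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    try:
        return await task
    finally:
        loop.remove_signal_handler(signal.SIGINT)


async def main() -> str:
    try:
        # Stand-in for the real download coroutine.
        await run_cancellable(asyncio.sleep(3600))
    except asyncio.CancelledError:
        return "interrupted"
    return "finished"
```

Whether this avoids the hang depends on what the cancelled coroutine waits for during its own cleanup; the handler only changes how the cancellation is delivered.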

@omaxx
Author

omaxx commented Jan 27, 2024

This might give you a clue: everything works fine if there is one more task running in parallel with the SCP process.
So I found this workaround:

async def nothing():
    while True:
        await asyncio.sleep(3600)

async def download(host: str):
    path = Path(f"./download/{host}")
    path.mkdir(parents=True, exist_ok=True)
    try:
        async with asyncssh.connect(host) as conn:
            print(f"{host}: connected")
            task = asyncio.get_running_loop().create_task(nothing())
            async def download():
                await asyncssh.scp(
                    (conn, HUGE_FILE), path,
                )
                task.cancel()
            await asyncio.gather(task, download())

    except Exception as exc:
        print(f"{host}: {exc}")
    except asyncio.exceptions.CancelledError as exc:
        print(f"{host}: {exc.__class__}: {exc}")
        raise
    finally:
        print(f"{host}: done")

In this case I got:

[conn=0, chan=0] Received 16384 SCP data bytes
[conn=0, chan=0] Received 16384 SCP data bytes
^C[conn=0, chan=0] Stopping remote SCP
[conn=0, chan=0] Closing channel
[conn=0] Closing connection
[conn=0, chan=0] Closing channel
[conn=0] Sending disconnect: Disconnected by application (11)
[conn=0] Connection closed
[conn=0, chan=0] Closing channel due to connection close
[conn=0, chan=0] Channel closed
172.31.0.48: <class 'asyncio.exceptions.CancelledError'>:
172.31.0.48: done
Interrupted!!!

@omaxx
Author

omaxx commented Jan 27, 2024

I think the problem is here:

class _SCPSink(_SCPHandler):
    async def run(self, dstpath: _SCPPath) -> None:
        try:
<...>
            else:
                # If the asyncio runner catches KeyboardInterrupt, it cancels all tasks, so CancelledError is raised here
                await self._recv_files(b'', dstpath)
        except (OSError, SFTPError, ValueError) as exc:
            self.handle_error(exc)
        finally:
            # This safety close of the channel always runs, but if the remote side never acknowledges the close, it can get stuck
            await self.close()

Maybe it should look like this:

            else:
                await self._recv_files(b'', dstpath)
                await self.close()
        except (OSError, SFTPError, ValueError) as exc:
            self.handle_error(exc)
            await self.close()
        except asyncio.exceptions.CancelledError:
            self._writer.channel.abort()

@omaxx
Author

omaxx commented Jan 27, 2024

Updated workaround:

async def download(host: str):
    path = Path(f"./download/{host}")
    path.mkdir(parents=True, exist_ok=True)
    try:
        async with asyncssh.connect(host) as conn:
            print(f"{host}: connected")
            done = asyncio.Event()
            async def download():
                try:
                    await asyncssh.scp((conn, HUGE_FILE), path)
                finally:
                    done.set()
            await asyncio.gather(download(), done.wait())

    except Exception as exc:
        print(f"{host}: {exc}")
    except asyncio.exceptions.CancelledError as exc:
        print(f"{host}: {exc.__class__}: {exc}")
        raise
    finally:
        print(f"{host}: done")

@ronf
Owner

ronf commented Jan 27, 2024

Do you actually need the "done" variable in the latest workaround? It looks like that version has removed the self.close() call when exiting the try block, so I would expect that to be enough to avoid the hang. However, it also doesn't clean up the channel it opened. In your code, this doesn't really matter, as exiting the "async with" block will close the entire connection, including the channel opened for SCP. However, that may not be the case for other code using SCP.

I like the idea of switching to doing an abort on the channel when handling CancelledError. That will flush any outstanding data in flight and force a close on the sending side. I'm thinking the best way to do this is to modify close() on _SCPHandler to add a "cancelled" argument which can be set when getting CancelledError, and then changing _SCPHandler.close() to do the abort call instead of the close() or exit() it does now. This would need to go in three places (affecting four calls to close()). I'm thinking it would look something like:

diff --git a/asyncssh/scp.py b/asyncssh/scp.py
index 85b5f22..40b1543 100644
--- a/asyncssh/scp.py
+++ b/asyncssh/scp.py
@@ -388,12 +388,14 @@ class _SCPHandler:
         elif self._error_handler:
             self._error_handler(exc)

-    async def close(self) -> None:
+    async def close(self, cancelled: bool = False) -> None:
         """Close an SCP session"""

         self.logger.info('Stopping remote SCP')

-        if self._server:
+        if cancelled:
+            self._writer.channel.abort()
+        elif self._server:
             cast('SSHServerChannel', self._writer.channel).exit(0)
         else:
             self._writer.close()
@@ -535,6 +537,8 @@ class _SCPSource(_SCPHandler):
     async def run(self, srcpath: _SCPPath) -> None:
         """Start SCP transfer"""

+        cancelled = False
+
         try:
             if isinstance(srcpath, PurePath):
                 srcpath = str(srcpath)
@@ -550,10 +554,12 @@ class _SCPSource(_SCPHandler):
             for name in await SFTPGlob(self._fs).match(srcpath):
                 await self._send_files(cast(bytes, name.filename),
                                             b'', name.attrs)
+        except asyncio.CancelledError:
+            cancelled = True
         except (OSError, SFTPError) as exc:
             self.handle_error(exc)
         finally:
-            await self.close()
+            await self.close(cancelled)


 class _SCPSink(_SCPHandler):
@@ -699,6 +705,8 @@ class _SCPSink(_SCPHandler):
     async def run(self, dstpath: _SCPPath) -> None:
         """Start SCP file receive"""

+        cancelled = False
+
         try:
             if isinstance(dstpath, PurePath):
                 dstpath = str(dstpath)
@@ -711,10 +719,12 @@ class _SCPSink(_SCPHandler):
                                              dstpath))
             else:
                 await self._recv_files(b'', dstpath)
+        except asyncio.CancelledError as exc:
+            cancelled = True
         except (OSError, SFTPError, ValueError) as exc:
             self.handle_error(exc)
         finally:
-            await self.close()
+            await self.close(cancelled)


 class _SCPCopier:
@@ -870,13 +880,17 @@ class _SCPCopier:
     async def run(self) -> None:
         """Start SCP remote-to-remote transfer"""

+        cancelled = False
+
         try:
             await self._copy_files()
+        except asyncio.CancelledError:
+            cancelled = True
         except (OSError, SFTPError) as exc:
             self._handle_error(exc)
         finally:
-            await self._source.close()
-            await self._sink.close()
+            await self._source.close(cancelled)
+            await self._sink.close(cancelled)


 async def scp(srcpaths: Union[_SCPConnPath, Sequence[_SCPConnPath]],

@omaxx
Author

omaxx commented Jan 27, 2024

The only purpose of the done.wait() hack is to have a coroutine that receives the CancelledError and propagates it out to the async with asyncssh.connect(host) block. Exiting the connect() context closes the channel and unblocks asyncssh.scp().
If asyncssh.scp finishes successfully, it sets done, so done.wait() returns.
But this is only a workaround for use in client code.

I like your solution. This should work, but I have a question: is it necessary to call await self._writer.channel.wait_closed() after self._writer.channel.abort() in _SCPHandler.close()?
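
The done.wait() workaround described above can be illustrated with plain asyncio, without AsyncSSH. This is only a sketch under stated assumptions: transfer() is a hypothetical stand-in for asyncssh.scp() whose cleanup waits on the peer, and setting peer_closed models what leaving the connect() context does. When gather() is cancelled and one child finishes cancelled, gather() raises CancelledError to its caller at once, even though the other child is still stuck in its cleanup.

```python
import asyncio


async def transfer(peer_closed: asyncio.Event) -> None:
    """Stand-in for asyncssh.scp(): its cleanup waits on the peer."""
    try:
        await asyncio.sleep(3600)      # the transfer itself
    finally:
        await peer_closed.wait()       # cleanup blocks until the peer closes


async def guarded(peer_closed: asyncio.Event) -> None:
    """Run transfer() next to a sibling that reacts to cancellation at once."""
    done = asyncio.Event()

    async def wrapped() -> None:
        try:
            await transfer(peer_closed)
        finally:
            done.set()

    try:
        await asyncio.gather(wrapped(), done.wait())
    finally:
        # Models leaving `async with asyncssh.connect(...)`: closing the
        # connection is what releases the stuck cleanup.
        peer_closed.set()


async def cancel_once(coro) -> bool:
    """Cancel a coroutine once; report whether it stopped within 0.2 s."""
    task = asyncio.ensure_future(coro)
    await asyncio.sleep(0.01)          # let it start
    task.cancel()
    done, _ = await asyncio.wait({task}, timeout=0.2)
    if not done:
        task.cancel()                  # the "second Ctrl-C"
        await asyncio.wait({task})
    return bool(done)


async def demo() -> tuple:
    direct = await cancel_once(transfer(asyncio.Event()))
    with_sibling = await cancel_once(guarded(asyncio.Event()))
    return direct, with_sibling
```

In this model a single cancel is not enough for the direct call, while the version with the sibling task stops after one cancel, which matches the logs earlier in the thread.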

@ronf
Owner

ronf commented Feb 9, 2024

Sorry for the slow response - I missed your last edit here.

Even in the abort() case, there could be some activities related to cleaning up the channel that aren't completed by the synchronous call to abort(). The same goes for the exit(0) and close() cases. So, having all of these block on wait_closed() makes sense here. It shouldn't wait for long, as abort() discards the receive buffer and forcibly closes the sending side, so the channel will quickly end up in the closed state.

I'll go ahead and check this in shortly. If you have any problems with it, let me know.

@ronf
Owner

ronf commented Feb 9, 2024

Ok - this should now be available as commit 676534b in the "develop" branch.

@omaxx
Author

omaxx commented Feb 23, 2024

Unfortunately it didn't help:

[conn=0, chan=0] Received 8192 SCP data bytes
[conn=0, chan=0] Received 8192 SCP data bytes
[conn=0, chan=0] Received 8192 SCP data bytes   <=== first time ctrl-C was pressed
^C[conn=0, chan=0] Stopping remote SCP
[conn=0, chan=0] Aborting channel               <=== stuck here and need to press ctrl-C again
^C[conn=0] Closing connection
[conn=0, chan=0] Closing channel
[conn=0] Sending disconnect: Disconnected by application (11)
[conn=0] Connection closed
[conn=0, chan=0] Closing channel due to connection close
[conn=0, chan=0] Channel closed

@omaxx
Author

omaxx commented Feb 23, 2024

I see that the process is stuck here:

    async def wait_closed(self) -> None:
        await self._close_event.wait()

The close waits until _close_event is set, but the only place where it can be set is here:

    def _cleanup(self, exc: Optional[Exception] = None) -> None:
<...>
        self._close_event.set()

But I can't find a place where SSHClientChannel.abort() calls the _cleanup function.

@ronf
Owner

ronf commented Feb 24, 2024

The _cleanup() function is scheduled to run by _discard_recv(), which is called from abort() when the receive state is not yet 'closed'. Since _close_send() is called before that, the sending state should definitely already be 'closed' by then.

Perhaps the issue is that the channel send state is already 'close_pending', but not yet done sending data. In that case, the normal close() call would wait for that data to go out before actually closing the channel, but in the case of abort() it probably should close sending even in the 'close_pending' state.

Does it help if you remove the following line in abort()?

        if self._send_state not in {'close_pending', 'closed'}:

...and just do the _close_send() unconditionally? You might also try making the line:

        if self._send_state != 'closed':

@omaxx
Copy link
Author

omaxx commented Feb 24, 2024

self._recv_state is 'open' during the abort() call, so the following condition is never true:

    def _discard_recv(self) -> None:
<...>
        if self._recv_state == 'close_pending':
            self._recv_state = 'closed'
            self._loop.call_soon(self._cleanup)

@ronf
Owner

ronf commented Feb 25, 2024

Thanks - I'll need to look a little more closely at what happens in this case with a regular close() to get from the "open" state to "closed". The only difference with abort() should be that any outstanding data (for both send and receive) should be discarded, allowing the channel to be closed right away without waiting for any queued-up data to be delivered.

The call to _close_send() looks like it should do the right thing to discard data queued for sending and immediately send a MSG_CHANNEL_CLOSE and move to the "closed" send state if the channel wasn't already "closed". I think the close() code is counting on this MSG_CHANNEL_CLOSE to trigger the other side to send back a MSG_CHANNEL_CLOSE, and that's what would trigger the move from "open" to "close_pending", which would move to "closed" once any queued receive data was delivered. Is that not happening in this case?

The other code path to trigger this would be through process_connection_close(), if the whole SSH connection went away.

@ronf
Owner

ronf commented Mar 2, 2024

Looking more carefully at the logs here, the issue seems to be that the remote system doesn't send back a MSG_CHANNEL_CLOSE after we send one. So, the recv_state stays as 'open' and cleanup is not performed. In the case where this is working with a Linux server, you can see the log "Received channel close" show up. However, nothing like that shows up in the failing case.
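
The diagnosis above can be reproduced with a toy model of the close handshake. This is a simplified sketch, not AsyncSSH's channel code: ToyChannel, on_peer_close and closes_in_time are hypothetical names, and the two-state model leaves out 'close_pending' and data buffering. It only shows why wait_closed() never returns when the peer does not answer with its own close message.

```python
import asyncio


class ToyChannel:
    """Toy model of the close handshake; names are illustrative only."""

    def __init__(self) -> None:
        self.send_state = "open"
        self.recv_state = "open"
        self._close_event = asyncio.Event()

    def abort(self) -> None:
        # Locally we can always finish the sending side ("send" our close).
        self.send_state = "closed"
        self._maybe_cleanup()

    def on_peer_close(self) -> None:
        # Only the peer's close message moves the receive side forward.
        self.recv_state = "closed"
        self._maybe_cleanup()

    def _maybe_cleanup(self) -> None:
        # Waiters are woken only once both directions are closed.
        if self.send_state == "closed" and self.recv_state == "closed":
            self._close_event.set()

    async def wait_closed(self) -> None:
        await self._close_event.wait()


async def closes_in_time(peer_replies: bool, timeout: float = 0.1) -> bool:
    """Abort a channel and report whether wait_closed() returns in time."""
    chan = ToyChannel()
    chan.abort()
    if peer_replies:
        chan.on_peer_close()
    try:
        await asyncio.wait_for(chan.wait_closed(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
```

With peer_replies=True the wait completes, as with the Linux server; with peer_replies=False it times out, as with the router in the failing log.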

Perhaps the thing to do here is to not do the wait_closed() on the channel in the "cancelled" case. If we end up closing the connection completely, things will get cleaned up then, if not before. If you can, try changing the end of the close() method in _SCPHandler from:

        await self._writer.channel.wait_closed()

to:

        if not cancelled:
            await self._writer.channel.wait_closed()

@ronf
Owner

ronf commented Jul 2, 2024

I finally got a chance to look at this again today, and I think a small change in the handling of the recv state when the other end doesn't send a MSG_CHANNEL_CLOSE could help there:

index 0323034..f99f493 100644
--- a/asyncssh/channel.py
+++ b/asyncssh/channel.py
@@ -229,7 +229,7 @@ class SSHChannel(Generic[AnyStr], SSHPacketHandler):

         self._close_event.set()

-        if self._conn: # pragma: no branch
+        if self._conn and self._recv_state == 'closed': # pragma: no branch
             self.logger.info('Channel closed%s',
                              ': ' + str(exc) if exc else '')

@@ -263,7 +263,8 @@ class SSHChannel(Generic[AnyStr], SSHPacketHandler):
         # If recv is close_pending, we know send is already closed
         if self._recv_state == 'close_pending':
             self._recv_state = 'closed'
-            self._loop.call_soon(self._cleanup)
+
+        self._loop.call_soon(self._cleanup)

     async def _start_reading(self) -> None:
         """Start processing data on a new connection"""

Basically, this calls _cleanup() when the sender closes the channel, even when the receive direction is still open. This wakes up any blocked tasks. However, until a MSG_CHANNEL_CLOSE is received or the entire SSH connection is torn down, AsyncSSH will keep around the channel object to handle future messages referencing that channel number. Any buffered data is discarded, though, and no new data will be buffered. This avoids potentially sending ProtocolError responses back for data in flight, which could cause the whole connection to be torn down prematurely.
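
The behavior described in this comment can be sketched with a toy model. It is an illustration under stated assumptions, not the patched AsyncSSH code: ToyChannel and on_peer_close are hypothetical names, a plain dict stands in for the connection's channel table, and cleanup runs synchronously here whereas the patch schedules it with call_soon().

```python
import asyncio


class ToyChannel:
    """Toy model: wake waiters on local close, keep the channel registered."""

    def __init__(self, registry: dict, num: int) -> None:
        self.send_state = "open"
        self.recv_state = "open"
        self._registry = registry
        self._num = num
        self._close_event = asyncio.Event()
        registry[num] = self

    def _cleanup(self) -> None:
        # Always wake blocked waiters...
        self._close_event.set()
        # ...but forget the channel number only after the peer has closed.
        if self.recv_state == "closed":
            self._registry.pop(self._num, None)

    def abort(self) -> None:
        self.send_state = "closed"
        self._cleanup()

    def on_peer_close(self) -> None:
        self.recv_state = "closed"
        self._cleanup()

    async def wait_closed(self) -> None:
        await self._close_event.wait()


async def demo() -> tuple:
    channels: dict = {}
    chan = ToyChannel(channels, 0)
    chan.abort()
    # Returns promptly even though the peer has not sent its close.
    await asyncio.wait_for(chan.wait_closed(), timeout=0.1)
    still_registered = 0 in channels
    chan.on_peer_close()
    return still_registered, 0 in channels
```

In this model the channel stays in the table after abort() so that late messages for that channel number still have a target, and it is removed only when the peer's close arrives.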

@ronf
Owner

ronf commented Jul 2, 2024

The fix above is now commit 220b9d4 in the "develop" branch. I'm still having some trouble reproducing the original problem, but I did confirm that a single Ctrl-C was enough to trigger KeyboardInterrupt and exit the SCP client in the middle of a transfer when I forced the server side to not issue a close. Without this fix, two Ctrl-Cs were necessary to fully stop the SCP.

@ronf
Owner

ronf commented Jul 3, 2024

This fix is now available in AsyncSSH 2.15.0. Let me know if you still see any problems around this.

@omaxx
Author

omaxx commented Jul 4, 2024

It works now:

[conn=0, chan=0] Received 16384 SCP data bytes
[conn=0, chan=0] Received 16384 SCP data bytes
[conn=0, chan=0] Received 16384 SCP data bytes
^C[conn=0, chan=0] Stopping remote SCP
[conn=0, chan=0] Aborting channel
[conn=0] Closing connection
[conn=0, chan=0] Closing channel
[conn=0] Sending disconnect: Disconnected by application (11)
[conn=0] Connection closed
[conn=0, chan=0] Closing channel due to connection close
192.168.1.1: done
Interrupted!!!

@ronf
Owner

ronf commented Jul 4, 2024

Terrific - thanks for confirming, and for all your help getting to the root of the issue!

@ronf ronf closed this as completed Jul 4, 2024
@ronf
Owner

ronf commented Sep 19, 2024

The change I made here has ended up causing problems in other cases. I thought I had a fix for that, but even with the fix I was still seeing instability, so I finally decided to back out both changes.

Commit cb87de9 in the "develop" branch is a new attempt to fix this issue. It's working well for me, even when I change the server side to never send a channel close. I also confirmed that after cancelling a transfer in progress, the SSH connection remains open and I'm able to create additional sessions on it. I also tested having multiple transfers in progress simultaneously and all of them were properly cancelled when hitting Ctrl-C, after which the client exited with the "Interrupted" message.

@omaxx, if you get time could you give this version a try and see if it still solves the problem you were seeing?

@ronf ronf reopened this Sep 19, 2024