Skip to content

Commit

Permalink
simplify and wait for all IO threads
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed Jul 12, 2023
1 parent fc2767f commit 209dace
Showing 1 changed file with 56 additions and 62 deletions.
118 changes: 56 additions & 62 deletions xpra/net/protocol/socket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from enum import Enum
from time import monotonic
from socket import error as socket_error
from threading import Lock, RLock, Event, Thread
from threading import Lock, RLock, Event, Thread, current_thread
from queue import Queue
from typing import Dict, List, Tuple, Any, ByteString, Callable, Optional, Iterable

Expand Down Expand Up @@ -62,6 +62,10 @@
SEND_INVALID_PACKET_DATA = strtobytes(os.environ.get("XPRA_SEND_INVALID_PACKET_DATA", b"ZZinvalid-packetZZ"))


def noop(): # pragma: no cover
pass


def exit_queue() -> Queue:
queue = Queue()
for _ in range(10): #just 2 should be enough!
Expand Down Expand Up @@ -183,14 +187,15 @@ def is_sending_encrypted(self) -> bool:
return bool(self.cipher_out) or self._conn.socktype in ("ssl", "wss", "ssh")

def wait_for_io_threads_exit(self, timeout=None) -> bool:
io_threads = (self._read_thread, self._write_thread)
io_threads = (self._read_thread, self._write_thread, self._read_parser_thread, self._read_parser_thread)
current = current_thread()
for t in io_threads:
if t and t.is_alive():
if t and t!=current and t.is_alive():
t.join(timeout)
exited = True
cinfo = self._conn or "cleared connection"
for t in io_threads:
if t and t.is_alive():
if t and t!=current and t.is_alive():
log.warn("Warning: %s thread of %s is still alive (timeout=%s)", t.name, cinfo, timeout)
exited = False
return exited
Expand Down Expand Up @@ -305,7 +310,7 @@ def start_network_read_thread():
self.timeout_add(SEND_INVALID_PACKET*1000, self.raw_write, SEND_INVALID_PACKET_DATA)


def send_disconnect(self, reasons, done_callback=None) -> None:
def send_disconnect(self, reasons, done_callback=noop) -> None:
packet = ["disconnect"]+[str(x) for x in reasons]
self.flush_then_close(self.encode, packet, done_callback=done_callback)

Expand Down Expand Up @@ -1108,7 +1113,7 @@ def debug_str(s):

def flush_then_close(self, encoder:Optional[Callable]=None,
last_packet=None,
done_callback:Optional[Callable]=None) -> None: #pylint: disable=method-hidden
done_callback:Callable=noop) -> None: #pylint: disable=method-hidden
""" Note: this is best-effort only
the packet may not get sent.
Expand All @@ -1118,18 +1123,14 @@ def flush_then_close(self, encoder:Optional[Callable]=None,
we wait again for the queue to flush,
then no matter what, we close the connection and stop the threads.
"""
def closing_already(encoder, last_packet, done_callback=None):
def closing_already(encoder, last_packet, done_callback=noop):
log("flush_then_close%s had already been called, this new request has been ignored",
(encoder, last_packet, done_callback))
self.flush_then_close = closing_already
log("flush_then_close%s closed=%s", (encoder, last_packet, done_callback), self._closed)
def done() -> None:
log("flush_then_close: done, callback=%s", done_callback)
if done_callback:
done_callback()
if self._closed:
log("flush_then_close: already closed")
done()
done_callback()
return
def writelockrelease() -> None:
wl = self._write_lock
Expand All @@ -1138,70 +1139,65 @@ def writelockrelease() -> None:
wl.release()
except Exception as e:
log(f"error releasing the write lock: {e}")
def close_and_release():
log("close_and_release()")
self.close()
writelockrelease()
done_callback()
def wait_for_queue(timeout:int=10) -> None:
#IMPORTANT: if we are here, we have the write lock held!
if not self._write_queue.empty():
#write queue still has stuff in it..
if timeout<=0:
log("flush_then_close: queue still busy, closing without sending the last packet")
writelockrelease()
self.close()
done()
else:
log("flush_then_close: still waiting for queue to flush")
self.timeout_add(100, wait_for_queue, timeout-1)
else:
if not last_packet:
self.close()
done()
close_and_release()
return
log("flush_then_close: queue is now empty, sending the last packet and closing")
def close_and_release():
log("flush_then_close: wait_for_packet_sent() close_and_release()")
self.close()
writelockrelease()
done()
def wait_for_packet_sent():
log("flush_then_close: wait_for_packet_sent() queue.empty()=%s, closed=%s",
self._write_queue.empty(), self._closed)
if self._write_queue.empty() or self._closed:
#it got sent, we're done!
close_and_release()
return False
return not self._closed #run until we manage to close (here or via the timeout)
def packet_queued(*_args):
#if we're here, we have the lock and the packet is in the write queue
log("flush_then_close: packet_queued() closed=%s", self._closed)
if wait_for_packet_sent():
#check again every 100ms
self.timeout_add(100, wait_for_packet_sent)
if encoder:
chunks = encoder(last_packet)
self._add_chunks_to_queue(last_packet[0], chunks,
start_cb=None, end_cb=packet_queued,
synchronous=False, more=False)
else:
self.raw_write((last_packet, ), "flush-then-close")
#just in case wait_for_packet_sent never fires:
self.timeout_add(5*1000, close_and_release)
#retry later:
log("flush_then_close: still waiting for queue to flush")
self.timeout_add(100, wait_for_queue, timeout-1)
return
if not last_packet:
close_and_release()
return
log("flush_then_close: queue is now empty, sending the last packet and closing")
def wait_for_packet_sent():
closed = self._closed
log("flush_then_close: wait_for_packet_sent() queue.empty()=%s, closed=%s",
self._write_queue.empty(), closed)
if self._write_queue.empty() or closed:
#it got sent, we're done!
close_and_release()
return False
return not closed #run until we manage to close (here or via the timeout)
def packet_queued(*_args):
#if we're here, we have the lock and the packet is in the write queue
log("flush_then_close: packet_queued() closed=%s", self._closed)
if wait_for_packet_sent():
#check again every 100ms
self.timeout_add(100, wait_for_packet_sent)
if encoder:
chunks = encoder(last_packet)
self._add_chunks_to_queue(last_packet[0], chunks,
start_cb=None, end_cb=packet_queued,
synchronous=False, more=False)
else:
self.raw_write((last_packet, ), "flush-then-close")
#just in case wait_for_packet_sent never fires:
self.timeout_add(5*1000, close_and_release)

def wait_for_write_lock(timeout:int=100) -> None:
wl = self._write_lock
if not wl:
#cleaned up already
return
if not wl.acquire(False):
if timeout<=0:
log("flush_then_close: timeout waiting for the write lock")
self.close()
done()
else:
log("flush_then_close: write lock is busy, will retry %s more times", timeout)
self.timeout_add(10, wait_for_write_lock, timeout-1)
else:
if wl.acquire(timeout=timeout/1000):
log("flush_then_close: acquired the write lock")
#we have the write lock - we MUST free it!
wait_for_queue()
else:
log("flush_then_close: timeout waiting for the write lock")
self.close()
done_callback()
#normal codepath:
# -> wait_for_write_lock
# -> wait_for_queue
Expand Down Expand Up @@ -1278,8 +1274,6 @@ def clean(self) -> None:
self._write_lock = None
self._source_has_more = None
self._conn = None #should be redundant
def noop(): # pragma: no cover
pass
self.source_has_more = noop


Expand Down

0 comments on commit 209dace

Please sign in to comment.