From 7650b48ab133e08a8dbea896966ae25eddf91f71 Mon Sep 17 00:00:00 2001 From: nsarras Date: Fri, 3 Feb 2023 23:23:40 -0800 Subject: [PATCH 1/4] Fixing payload misalignment/corruption issue and cleaning up logic to process header and payload together --- dss_datamover/socket_communication.py | 94 +++++++++++++-------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/dss_datamover/socket_communication.py b/dss_datamover/socket_communication.py index ee72ba9..4dc5542 100644 --- a/dss_datamover/socket_communication.py +++ b/dss_datamover/socket_communication.py @@ -60,6 +60,9 @@ def __init__(self, logger=None, ip_address_family="IPv4"): else: self.logger.error("Wrong ip_address_family - {}, Supported {}".format(ip_address_family, IP_ADDRESS_FAMILY)) raise ConnectionError("Socket initialization failed! ") + # set socket to send data as a burst rather than keeping data in the buffer + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + def connect(self, host, port): """ @@ -82,7 +85,6 @@ def connect(self, host, port): self.socket.connect((host, int(port))) except ConnectionRefusedError as e: self.logger.warn(f"ConnectionRefusedError - retrying to connect {time_to_sleep}") - self.logger.warn(f"{e} on host: {host} port: {port}") is_connection_refused = True except ConnectionError as e: self.logger.excep(f"{host}:{port}-ConnectionError - {e}") @@ -138,67 +140,62 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): :timeout: default 60 seconds :return: Return received data in json format. """ - msg_len = None + self.socket.settimeout(timeout) msg = "{}" + time_started = datetime.now() try: msg_len_in_bytes = b'' received_msg_len_size = 0 - time_started = datetime.now() - # Iterate till we receive 10 bytes or timeout. - while received_msg_len_size < MESSAGE_LENGTH: - received_msg_len_in_bytes = self.socket.recv(MESSAGE_LENGTH - received_msg_len_size) - received_msg_len_size += len(received_msg_len_in_bytes) - msg_len_in_bytes += received_msg_len_in_bytes - time_spent_in_seconds = (datetime.now() - time_started).seconds - if time_spent_in_seconds >= timeout: - raise socket.timeout("ClientSocket: Timeout ({} seconds) from recv function".format( - time_spent_in_seconds)) - if msg_len_in_bytes != b'': - msg_len = int(msg_len_in_bytes) - except socket.timeout as e: - raise e - except socket.error as e: - raise socket.error("ClientSocket: Incorrect message length - {}".format(e)) - except ValueError as e: - raise ValueError("ClientSocket: ValueError - {}".format(e)) - if msg_len: + # first receive message length from payload + msg_len_in_bytes = self.socket.recv(MESSAGE_LENGTH) + msg_len = int(msg_len_in_bytes) + + # now retrieve rest of payload from buffer based on msg_len + # TODO: handle rare edge case where only partial bytes were read before socket termination msg_body = b'' - # Iterate till we receive desired number of bytes. received_data_size = 0 while received_data_size < msg_len: data_size = msg_len - received_data_size - try: - received_data = self.socket.recv(data_size) - received_data_size += len(received_data) - msg_body += received_data - except socket.timeout as e: - self.logger.error("ClientSocket: Timeout-{}".format(e)) - break - except socket.error as e: - self.logger.error("ClientSocket: {}".format(e)) + received_data = self.socket.recv(data_size) + received_data_size += len(received_data) + msg_body += received_data + # process received payload if msg_body == b'': raise RuntimeError("ClientSocket: Empty message for message length -{}".format(msg_len)) - - if msg_body: + elif msg_body: if len(msg_body) == msg_len: msg = msg_body.decode("utf8", "ignore") else: self.logger.error("ClientSocket: Received incomplete message.") - if format.upper() == "JSON": - json_data = {} - try: - json_data = json.loads(msg) - except json.JSONDecodeError as e: - self.logger.error("ClientSocket: Bad JSON data - {},{}, {}".format(msg_len, msg, e)) - except Exception as e: - raise Exception("Bad formed message - {}{}, error- {}".format(msg_len, msg, e)) - return json_data - else: - return msg + if format.upper() == "JSON": + json_data = {} + try: + json_data = json.loads(msg) + except json.JSONDecodeError as e: + self.logger.error("ClientSocket: Bad JSON data - {},{}, {}".format(msg_len, msg, e)) + except Exception as e: + raise Exception("Bad formed message - {}{}, error- {}".format(msg_len, msg, e)) + + return json_data + else: + return msg + except socket.timeout as e: + self.logger.error("ClientSocket: Timeout ({} seconds) from recv function".format((datetime.now() - time_started).seconds)) + # return empty status and let the next iteration or worker process the status left on the buffer + return json.loads(msg) + except socket.error as e: + self.logger.error(f"ClientSocket SocketError: {e}") + raise socket.error(f"ClientSocket SocketError: {e}") + except ValueError as e: + raise ValueError(f"ClientSocket: ValueError - {e}") + except RuntimeError as e: + raise RuntimeError(f"ClientSocket: RuntimeError {e}") + except Exception as e: + raise Exception(f"ClientSocket: Exception {e}") def close(self): """ @@ -292,7 +289,7 @@ def send_json(self, message=None, format="JSON"): except RuntimeError as e: self.logger.error("RuntimeError - {}".format(e)) except socket.error as e: - self.logger.error("Message Send Failed - {}".fromat(e)) + self.logger.error("Message Send Failed - {}".format(e)) return False def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): @@ -325,6 +322,9 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): msg_len = int(msg_len_in_bytes.decode('utf8')) except socket.timeout as e: raise e + except BlockingIOError as e: + time.sleep(0.001) + raise e except socket.error as e: self.logger.error("ServerSocket: Determine msg length - {}".format(e)) except ValueError as e: @@ -358,8 +358,8 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): json_data = {} try: json_data = json.loads(msg) - except json.JSONDecodeError as e: - raise json.JSONDecodeError("ClientSocket: Bad JSON data - {}".format(e)) + # except json.JSONDecodeError as e: + # raise json.JSONDecodeError("ClientSocket: Bad JSON data - {}".format(e)) except MemoryError as e: raise MemoryError("MemoryError: JSON load failed - {}".format(e)) except Exception as e: From cffc3e2377271ad3d4c4d4755d031ee29144651a Mon Sep 17 00:00:00 2001 From: nsarras Date: Fri, 3 Feb 2023 23:38:21 -0800 Subject: [PATCH 2/4] Fixing pycodestyle issues --- dss_datamover/socket_communication.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dss_datamover/socket_communication.py b/dss_datamover/socket_communication.py index 4dc5542..fe902f0 100644 --- a/dss_datamover/socket_communication.py +++ b/dss_datamover/socket_communication.py @@ -63,7 +63,6 @@ def __init__(self, logger=None, ip_address_family="IPv4"): # set socket to send data as a burst rather than keeping data in the buffer self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - def connect(self, host, port): """ Connect to a socket with the specified host and port. @@ -324,7 +323,7 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): raise e except BlockingIOError as e: time.sleep(0.001) - raise e + raise e except socket.error as e: self.logger.error("ServerSocket: Determine msg length - {}".format(e)) except ValueError as e: From 65cd51388eb056f0b26fa421fd7a12c56b39e4e2 Mon Sep 17 00:00:00 2001 From: nsarras Date: Tue, 7 Feb 2023 09:29:24 -0800 Subject: [PATCH 3/4] Adding header+payload processing logic to ServerSocket and addressing PR comments --- dss_datamover/socket_communication.py | 126 +++++++++++++------------- 1 file changed, 65 insertions(+), 61 deletions(-) diff --git a/dss_datamover/socket_communication.py b/dss_datamover/socket_communication.py index fe902f0..00b82fc 100644 --- a/dss_datamover/socket_communication.py +++ b/dss_datamover/socket_communication.py @@ -60,7 +60,7 @@ def __init__(self, logger=None, ip_address_family="IPv4"): else: self.logger.error("Wrong ip_address_family - {}, Supported {}".format(ip_address_family, IP_ADDRESS_FAMILY)) raise ConnectionError("Socket initialization failed! ") - # set socket to send data as a burst rather than keeping data in the buffer + # configures socket to send data as soon as it is available, regardless of packet size self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) def connect(self, host, port): @@ -141,15 +141,16 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): """ self.socket.settimeout(timeout) msg = "{}" + msg_len = 0 time_started = datetime.now() try: - msg_len_in_bytes = b'' - received_msg_len_size = 0 - # first receive message length from payload + msg_len_in_bytes = b'' msg_len_in_bytes = self.socket.recv(MESSAGE_LENGTH) msg_len = int(msg_len_in_bytes) + if len(msg_len_in_bytes) != MESSAGE_LENGTH: + raise RuntimeError(f"ClientSocket: Received incorrect message length header {msg_len} bytes") # now retrieve rest of payload from buffer based on msg_len # TODO: handle rare edge case where only partial bytes were read before socket termination @@ -165,11 +166,12 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): if msg_body == b'': raise RuntimeError("ClientSocket: Empty message for message length -{}".format(msg_len)) elif msg_body: - if len(msg_body) == msg_len: + if received_data_size == msg_len: msg = msg_body.decode("utf8", "ignore") else: - self.logger.error("ClientSocket: Received incomplete message.") + raise RuntimeError("ClientSocket: Received incomplete message.") + # return response as JSON if format.upper() == "JSON": json_data = {} try: @@ -184,14 +186,18 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): return msg except socket.timeout as e: self.logger.error("ClientSocket: Timeout ({} seconds) from recv function".format((datetime.now() - time_started).seconds)) - # return empty status and let the next iteration or worker process the status left on the buffer - return json.loads(msg) + # return status depending on received message size, if incomplete a Runtime Error should have been raised + if len(msg_len_in_bytes) == MESSAGE_LENGTH and received_data_size == msg_len: + return json.loads(msg) + else: + return json.loads("{}") except socket.error as e: self.logger.error(f"ClientSocket SocketError: {e}") raise socket.error(f"ClientSocket SocketError: {e}") except ValueError as e: raise ValueError(f"ClientSocket: ValueError - {e}") except RuntimeError as e: + self.logger.error(f"ClientSocket: RuntimeError {e}") raise RuntimeError(f"ClientSocket: RuntimeError {e}") except Exception as e: raise Exception(f"ClientSocket: Exception {e}") @@ -302,71 +308,69 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): :timeout: default 60 seconds :return: Return received data in json format. """ + self.client_socket.settimeout(timeout) msg_len = None msg = "{}" - try: - msg_len_in_bytes = b'' - received_msg_len_size = 0 - time_started = datetime.now() - # Iterate till we receive 10 bytes or timeout. - while received_msg_len_size < MESSAGE_LENGTH: - received_msg_len_in_bytes = self.client_socket.recv(MESSAGE_LENGTH - received_msg_len_size) - received_msg_len_size += len(received_msg_len_in_bytes) - msg_len_in_bytes += received_msg_len_in_bytes - time_spent_in_seconds = (datetime.now() - time_started).seconds - if time_spent_in_seconds >= timeout: - raise socket.timeout("ServerSocket: Timeout ({} seconds) from recv function".format(time_spent_in_seconds)) + time_started = datetime.now() - if msg_len_in_bytes != b'': - msg_len = int(msg_len_in_bytes.decode('utf8')) - except socket.timeout as e: - raise e - except BlockingIOError as e: - time.sleep(0.001) - raise e - except socket.error as e: - self.logger.error("ServerSocket: Determine msg length - {}".format(e)) - except ValueError as e: - raise socket.error("ServerSocket: ValueError - {}".format(e)) - except Exception as e: - self.logger.error("ServerSocket: {}".format(e)) + try: + # first receive message length from payload + msg_len_in_bytes =b'' + msg_len_in_bytes = self.client_socket.recv(MESSAGE_LENGTH) + msg_len = int(msg_len_in_bytes.decode('utf8')) + if len(msg_len_in_bytes) != MESSAGE_LENGTH: + raise RuntimeError(f"ServerSocket: Received incorrect message length header {msg_len} bytes") - if msg_len: + # now retrieve rest of payload from buffer based on msg_len + # TODO: handle rare edge case where only partial bytes were read before socket termination msg_body = b'' received_data_size = 0 - # Iterate till we receive desired number of bytes. while received_data_size < msg_len: data_size = msg_len - received_data_size - try: - received_data = self.client_socket.recv(data_size) - received_data_size += len(received_data) - msg_body += received_data - except socket.timeout as e: - self.logger.error("ServerSocket: Timeout - {}".format(e)) - except socket.error as e: - self.logger.excep("ServerSocket receive bytes - {}".format(e)) - if msg_body == b'': - raise RuntimeError("Empty message for message length -{}".format(msg_len)) + received_data = self.client_socket.recv(data_size) + received_data_size += len(received_data) + msg_body += received_data - if msg_body: - if len(msg_body) == msg_len: + # process received payload + if msg_body == b'': + raise RuntimeError("ServerSocket: Empty message for message length -{}".format(msg_len)) + elif msg_body: + if received_data_size == msg_len: msg = msg_body.decode("utf8", "ignore") else: - self.logger.error("ServerSocket: Received incomplete message.") - if format.upper() == "JSON": - json_data = {} - try: - json_data = json.loads(msg) - # except json.JSONDecodeError as e: - # raise json.JSONDecodeError("ClientSocket: Bad JSON data - {}".format(e)) - except MemoryError as e: - raise MemoryError("MemoryError: JSON load failed - {}".format(e)) - except Exception as e: - raise Exception("ClientSocket: Bad JSON data - {}, error- {}".format(msg, e)) + raise RuntimeError("ServerSocket: Received incomplete message.") + + # return reponse as JSON + if format.upper() == "JSON": + json_data = {} + try: + json_data = json.loads(msg) + except json.JSONDecodeError as e: + self.logger.error("ServerSocket: Bad JSON data - {},{}, {}".format(msg_len, msg, e)) + except Exception as e: + raise Exception("Bad formed message - {}{}, error- {}".format(msg_len, msg, e)) - return json_data - else: - return msg + return json_data + else: + return msg + + except socket.timeout as e: + self.logger.error("ServerSocket: Timeout ({} seconds) from recv function".format((datetime.now() - time_started).seconds)) + # return status depending on received message size, if incomplete a Runtime Error should have been raised + if len(msg_len_in_bytes) == MESSAGE_LENGTH and received_data_size == msg_len: + return json.loads(msg) + else: + return json.loads("{}") + except socket.error as e: + self.logger.error(f"ServerSocket SocketError: {e}") + raise socket.error(f"ServerSocket SocketError: {e}") + except ValueError as e: + raise ValueError(f"ServerSocket: ValueError - {e}") + except RuntimeError as e: + self.logger.error(f"ServerSocket: RuntimeError {e}") + raise RuntimeError(f"ServerSocket: RuntimeError {e}") + except Exception as e: + raise Exception(f"ServerSocket: Exception {e}") def close(self): """ From 62cd48a0a980ab80b0d59197b29eca7c43d8b432 Mon Sep 17 00:00:00 2001 From: nsarras Date: Tue, 7 Feb 2023 09:36:06 -0800 Subject: [PATCH 4/4] Fixing pycodestyle issues --- dss_datamover/socket_communication.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dss_datamover/socket_communication.py b/dss_datamover/socket_communication.py index 00b82fc..314fd81 100644 --- a/dss_datamover/socket_communication.py +++ b/dss_datamover/socket_communication.py @@ -60,7 +60,7 @@ def __init__(self, logger=None, ip_address_family="IPv4"): else: self.logger.error("Wrong ip_address_family - {}, Supported {}".format(ip_address_family, IP_ADDRESS_FAMILY)) raise ConnectionError("Socket initialization failed! ") - # configures socket to send data as soon as it is available, regardless of packet size + # configures socket to send data as soon as it is available, regardless of packet size self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) def connect(self, host, port): @@ -315,7 +315,7 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): try: # first receive message length from payload - msg_len_in_bytes =b'' + msg_len_in_bytes = b'' msg_len_in_bytes = self.client_socket.recv(MESSAGE_LENGTH) msg_len = int(msg_len_in_bytes.decode('utf8')) if len(msg_len_in_bytes) != MESSAGE_LENGTH: @@ -339,7 +339,7 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT): msg = msg_body.decode("utf8", "ignore") else: raise RuntimeError("ServerSocket: Received incomplete message.") - + # return reponse as JSON if format.upper() == "JSON": json_data = {}