Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing payload misalignment/corruption issue #14

Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 46 additions & 47 deletions dss_datamover/socket_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def __init__(self, logger=None, ip_address_family="IPv4"):
else:
self.logger.error("Wrong ip_address_family - {}, Supported {}".format(ip_address_family, IP_ADDRESS_FAMILY))
raise ConnectionError("Socket initialization failed! ")
# set socket to send data as a burst rather than keeping data in the buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is wrong.

self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

def connect(self, host, port):
"""
Expand All @@ -82,7 +84,6 @@ def connect(self, host, port):
self.socket.connect((host, int(port)))
except ConnectionRefusedError as e:
self.logger.warn(f"ConnectionRefusedError - retrying to connect {time_to_sleep}")
self.logger.warn(f"{e} on host: {host} port: {port}")
is_connection_refused = True
except ConnectionError as e:
self.logger.excep(f"{host}:{port}-ConnectionError - {e}")
Expand Down Expand Up @@ -138,67 +139,62 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT):
:timeout: default 60 seconds
:return: Return received data in json format.
"""
msg_len = None
self.socket.settimeout(timeout)
msg = "{}"
time_started = datetime.now()

try:
msg_len_in_bytes = b''
received_msg_len_size = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused variable; please remove it.

time_started = datetime.now()
# Iterate till we receive 10 bytes or timeout.
while received_msg_len_size < MESSAGE_LENGTH:
received_msg_len_in_bytes = self.socket.recv(MESSAGE_LENGTH - received_msg_len_size)
received_msg_len_size += len(received_msg_len_in_bytes)
msg_len_in_bytes += received_msg_len_in_bytes
time_spent_in_seconds = (datetime.now() - time_started).seconds
if time_spent_in_seconds >= timeout:
raise socket.timeout("ClientSocket: Timeout ({} seconds) from recv function".format(
time_spent_in_seconds))
if msg_len_in_bytes != b'':
msg_len = int(msg_len_in_bytes)
except socket.timeout as e:
raise e
except socket.error as e:
raise socket.error("ClientSocket: Incorrect message length - {}".format(e))
except ValueError as e:
raise ValueError("ClientSocket: ValueError - {}".format(e))

if msg_len:
# first receive message length from payload
msg_len_in_bytes = self.socket.recv(MESSAGE_LENGTH)
msg_len = int(msg_len_in_bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure the length of the received data is equal to MESSAGE_LENGTH.


# now retrieve rest of payload from buffer based on msg_len
# TODO: handle rare edge case where only partial bytes were read before socket termination
msg_body = b''
# Iterate till we receive desired number of bytes.
received_data_size = 0
while received_data_size < msg_len:
data_size = msg_len - received_data_size
try:
received_data = self.socket.recv(data_size)
received_data_size += len(received_data)
msg_body += received_data
except socket.timeout as e:
self.logger.error("ClientSocket: Timeout-{}".format(e))
break
except socket.error as e:
self.logger.error("ClientSocket: {}".format(e))
received_data = self.socket.recv(data_size)
received_data_size += len(received_data)
msg_body += received_data

# process received payload
if msg_body == b'':
raise RuntimeError("ClientSocket: Empty message for message length -{}".format(msg_len))

if msg_body:
elif msg_body:
if len(msg_body) == msg_len:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need len(msg_body) when received_data_size is already available above?

msg = msg_body.decode("utf8", "ignore")
else:
self.logger.error("ClientSocket: Received incomplete message.")
if format.upper() == "JSON":
json_data = {}
try:
json_data = json.loads(msg)
except json.JSONDecodeError as e:
self.logger.error("ClientSocket: Bad JSON data - {},{}, {}".format(msg_len, msg, e))
except Exception as e:
raise Exception("Bad formed message - {}{}, error- {}".format(msg_len, msg, e))

return json_data
else:
return msg
if format.upper() == "JSON":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes wrong with the code landing on line 171. Check it

json_data = {}
try:
json_data = json.loads(msg)
except json.JSONDecodeError as e:
self.logger.error("ClientSocket: Bad JSON data - {},{}, {}".format(msg_len, msg, e))
except Exception as e:
raise Exception("Bad formed message - {}{}, error- {}".format(msg_len, msg, e))

return json_data
else:
return msg
except socket.timeout as e:
self.logger.error("ClientSocket: Timeout ({} seconds) from recv function".format((datetime.now() - time_started).seconds))
# return empty status and let the next iteration or worker process the status left on the buffer
return json.loads(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since msg may already have been filled in before the exception was raised, you could be sending wrong data to the caller. Please check again.

except socket.error as e:
self.logger.error(f"ClientSocket SocketError: {e}")
raise socket.error(f"ClientSocket SocketError: {e}")
except ValueError as e:
raise ValueError(f"ClientSocket: ValueError - {e}")
except RuntimeError as e:
raise RuntimeError(f"ClientSocket: RuntimeError {e}")
except Exception as e:
raise Exception(f"ClientSocket: Exception {e}")

def close(self):
"""
Expand Down Expand Up @@ -292,7 +288,7 @@ def send_json(self, message=None, format="JSON"):
except RuntimeError as e:
self.logger.error("RuntimeError - {}".format(e))
except socket.error as e:
self.logger.error("Message Send Failed - {}".fromat(e))
self.logger.error("Message Send Failed - {}".format(e))
return False

def recv_json(self, format="JSON", timeout=RECV_TIMEOUT):
Expand Down Expand Up @@ -325,6 +321,9 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT):
msg_len = int(msg_len_in_bytes.decode('utf8'))
except socket.timeout as e:
raise e
except BlockingIOError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, too, we need to add the above logic, where the header and payload are received together, similar to ClientSocket.

time.sleep(0.001)
raise e
except socket.error as e:
self.logger.error("ServerSocket: Determine msg length - {}".format(e))
except ValueError as e:
Expand Down Expand Up @@ -358,8 +357,8 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT):
json_data = {}
try:
json_data = json.loads(msg)
except json.JSONDecodeError as e:
raise json.JSONDecodeError("ClientSocket: Bad JSON data - {}".format(e))
# except json.JSONDecodeError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for commenting this out?

# raise json.JSONDecodeError("ClientSocket: Bad JSON data - {}".format(e))
except MemoryError as e:
raise MemoryError("MemoryError: JSON load failed - {}".format(e))
except Exception as e:
Expand Down