Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing payload misalignment/corruption issue #14

Conversation

nsarras
Copy link
Contributor

@nsarras nsarras commented Feb 4, 2023

  • Root cause of the issue was socket timeouts while not having read all of the msg length header bytes, therefore leading to misalignment of the consecutive payload (thanks Suresh for finding this)
  • Added fix to gracefully handle socket timeouts
  • Configured socket timeout in order to ensure that monitor process does not hanging waiting for a response from client applications
  • Cleaned up logic to process header and payload together, dramatically improving readability and efficiency
  • Added socket configuration option to ensure that the ClientSocket sends data in bursts and does not leave data hanging in the buffer

Verified and tested on 4 node dist. cluster with 80GB of data

capture1

@nsarras nsarras requested a review from a team as a code owner February 4, 2023 07:30
@nsarras nsarras linked an issue Feb 4, 2023 that may be closed by this pull request
@nsarras nsarras self-assigned this Feb 4, 2023
@@ -60,6 +60,8 @@ def __init__(self, logger=None, ip_address_family="IPv4"):
else:
self.logger.error("Wrong ip_address_family - {}, Supported {}".format(ip_address_family, IP_ADDRESS_FAMILY))
raise ConnectionError("Socket initialization failed! ")
# set socket to send data as a burst rather than keeping data in the buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is wrong.

msg = "{}"
time_started = datetime.now()

try:
msg_len_in_bytes = b''
received_msg_len_size = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused variable. Remove it

if msg_len:
# first receive message length from payload
msg_len_in_bytes = self.socket.recv(MESSAGE_LENGTH)
msg_len = int(msg_len_in_bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure the received data is equal to MESSAGE_LENGTH

if msg_body == b'':
raise RuntimeError("ClientSocket: Empty message for message length -{}".format(msg_len))

if msg_body:
elif msg_body:
if len(msg_body) == msg_len:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to do len(msg_body) when the received_data_size is available on the top.

return json_data
else:
return msg
if format.upper() == "JSON":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes wrong with the code landing on line 171. Check it

except socket.timeout as e:
self.logger.error("ClientSocket: Timeout ({} seconds) from recv function".format((datetime.now() - time_started).seconds))
# return empty status and let the next iteration or worker process the status left on the buffer
return json.loads(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the msg is filled up before and got any exception, you are sending wrong data to the caller. Check again

@@ -358,8 +357,8 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT):
json_data = {}
try:
json_data = json.loads(msg)
except json.JSONDecodeError as e:
raise json.JSONDecodeError("ClientSocket: Bad JSON data - {}".format(e))
# except json.JSONDecodeError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reason for commenting out

@@ -325,6 +321,9 @@ def recv_json(self, format="JSON", timeout=RECV_TIMEOUT):
msg_len = int(msg_len_in_bytes.decode('utf8'))
except socket.timeout as e:
raise e
except BlockingIOError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also we need to add the above logic where the header and payload are received together similar to clientsocket

Copy link
Contributor

@grandsuri grandsuri left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to fix

@nsarras
Copy link
Contributor Author

nsarras commented Feb 7, 2023

Verified latest changes on client and server side with 80GB of data.
image

@sonarqubecloud
Copy link

sonarqubecloud bot commented Feb 7, 2023

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 5 Code Smells

0.0% 0.0% Coverage
0.0% 0.0% Duplication

Copy link
Contributor

@grandsuri grandsuri left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good

@grandsuri grandsuri merged commit 71cb74b into master Feb 7, 2023
@grandsuri grandsuri deleted the 13-data-mover-payload-misalignment-corruption-on-larger-data-migrations branch February 7, 2023 18:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Data Mover] Payload misalignment / corruption on larger data migrations
2 participants