Rework python river client and server #7
Conversation
…esntGetResentStaleMessagesFromServer
    except Exception as e:
        # Log the error and return an appropriate error response
        logging.exception("Error during RPC communication")
        raise e
we also need to have some sort of pending-request tracking so we can throw UNEXPECTED_DISCONNECT when the session ends while this is still waiting for a response
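For illustration only, a minimal sketch of the kind of pending-request tracking being asked for here, so a session close can fail every outstanding call instead of leaving it hanging. The class, method names, and error-code constant are hypothetical, not the actual client API:

```python
import asyncio

# Illustrative constant; the real error code lives in the river protocol definitions.
UNEXPECTED_DISCONNECT = "UNEXPECTED_DISCONNECT"


class PendingRequests:
    """Track in-flight requests so they can be failed when the session ends."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def register(self, stream_id: str) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        self._pending[stream_id] = fut
        return fut

    def resolve(self, stream_id: str, payload: object) -> None:
        fut = self._pending.pop(stream_id, None)
        if fut is not None and not fut.done():
            fut.set_result(payload)

    def fail_all(self, exc: Exception) -> None:
        # Called on session close: everything still waiting gets the error
        # (e.g. a RiverException with UNEXPECTED_DISCONNECT) instead of hanging.
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(exc)
        self._pending.clear()
```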
            raise RiverException(error.code, error.message)
        return response_deserializer(response["payload"])
    except RiverException as e:
        raise e
dumb q: why don't we log RiverException
IMHO log-and-throw is a mild anti-pattern. If every layer does this, you get a lot of noise in the logs. The final layer which does not re-throw should log. Every other layer should not catch or should throw with additional context. Just my 2c.
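A sketch of the "throw with additional context" alternative, assuming RiverException exposes the code and message it was constructed with; the function signature and stream_id name are illustrative:

```python
async def call_rpc(output, stream_id: str):
    try:
        response = await output.get()
    except RiverException as e:
        # No logging here: wrap with context and let the outermost caller log once.
        raise RiverException(e.code, f"rpc on stream {stream_id} failed: {e.message}") from e
    return response
```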
    if msg.controlFlags & ACK_BIT != 0:
        continue
Suggested change:

    -    if msg.controlFlags & ACK_BIT != 0:
    -        continue
    +    if msg.controlFlags & ACK_BIT == 0:
    +        break
I think the original logic is correct - we should read the next message if it is an ack
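A sketch of that read loop, to make the continue vs break semantics concrete; read_next_message is a hypothetical helper, and ACK_BIT is the flag from the quoted code:

```python
async def read_until_payload(ws):
    # Ack-only messages carry no payload for the stream, so skip them and keep
    # reading: `continue` moves on to the next incoming message, whereas `break`
    # would stop reading as soon as the first ack arrived.
    while True:
        msg = await read_next_message(ws)
        if msg.controlFlags & ACK_BIT != 0:
            continue
        return msg
```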
didn't spot any bugs, just nits and thoughts
    Expects the input and output to be messages that will be msgpacked.
    """
    stream_id = nanoid.generate()
This is fine, but I wonder if there's an advantage to using the nanoid package vs the built-in secrets module, https://docs.python.org/3/library/secrets.html#secrets.token_urlsafe .
the main reason is to keep our IDs consistent with the ones we are using in JavaScript
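For comparison, a quick sketch of the two ID styles discussed above; both are URL-safe random strings, but nanoid matches the default format the JavaScript client already emits:

```python
import secrets

import nanoid

js_compatible_id = nanoid.generate()    # 21-char nanoid, same default alphabet as the JS library
stdlib_id = secrets.token_urlsafe(16)   # stdlib alternative, no extra dependency
print(js_compatible_id, stdlib_id)
```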
replit_river/client_session.py (outdated)
        response = await output.get()
    except (RuntimeError, ChannelClosed) as e:
        # if the stream is closed before we get a response, we will get a
        # RuntimeError: RuntimeError: Event loop is closed
Oh weird, why would closing a stream result in "Event loop is closed"? Does every stream have its own event loop? I wonder if this behavior should be documented in the send_message docstring, since it's a public API.
I actually now think this RuntimeError should not happen; I removed the check for it. If it does happen, we should investigate.
replit_river/task_manager.py (outdated)
    try:
        await task_to_remove
        if task_to_remove in background_tasks:
            background_tasks.remove(task_to_remove)
        exception = task_to_remove.exception()
    except asyncio.CancelledError:
A couple of issues here, potentially. I don't think you want/need to await the task and call exception(). If there was an exception, the first await would throw. The second issue is that if the task manager itself is cancelled, you'll end up in this same exception handler. If you want to handle that case differently, you need to test asyncio.current_task().cancelling() > 0.
Good point, I was mixing up the done callback and the cancellation here. Reworked a bit, PTAL.
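A sketch of the pattern being suggested, assuming Python 3.11+ where Task.cancelling() exists; the function name and the background_tasks set mirror the quoted code, everything else is illustrative:

```python
import asyncio
import logging


async def reap_task(task_to_remove: asyncio.Task, background_tasks: set) -> None:
    # Awaiting the task is enough: if it raised, the await re-raises it,
    # so a separate .exception() call is unnecessary.
    try:
        await task_to_remove
    except asyncio.CancelledError:
        if asyncio.current_task().cancelling() > 0:
            # The task manager itself is being cancelled; propagate.
            raise
        # Otherwise only the child task was cancelled; treat it as a normal exit.
    except Exception:
        logging.exception("background task failed")
    finally:
        background_tasks.discard(task_to_remove)
```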
    try:
        await send_transport_message(
            TransportMessage(
                from_=transport_id,
Oh, of course. I wonder if from_id and to_id might be better names, since their types don't make it obvious what the expected values of from_ and to are. Optional idea; this might be a lot of search and replace.
    logging.debug("Ignoring transport message : %r", e)
    continue
Since there's no feedback sent to the client in this case, I'd have a general concern that the server could get stuck retrying in this while True loop. In practice, maybe we'd eventually see timeouts and ConnectionClosed? I don't know the details enough to say.
Good point, technically we should see timeouts and ConnectionClosed, but I added a timeout to getting the websocket, which gives us more confidence.
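A sketch of that mitigation, bounding how long we wait for a usable websocket so the loop can't stall silently; get_or_create_ws and the 30-second value are illustrative, not the actual session API:

```python
import asyncio

WS_CONNECT_TIMEOUT_SECS = 30


async def get_ws_with_timeout(session):
    try:
        return await asyncio.wait_for(
            session.get_or_create_ws(), timeout=WS_CONNECT_TIMEOUT_SECS
        )
    except asyncio.TimeoutError:
        # Surface the stall to the caller instead of retrying forever.
        raise ConnectionError("timed out waiting for a usable websocket")
```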
Why
The python river impl is leaking resources and also does not work well with retry/disconnecting.
river protocol: https://github.com/replit/river/blob/main/PROTOCOL.md#transports-sessions-and-connections
What changed
Test plan
we have some basic unit tests here
the golden test suite is https://github.com/replit/river-babel, a cross-language test suite for river
current status:

| client | server | tests passed |
| --- | --- | --- |
| bun | python | 25/25 |
| python | bun | 21/25 (error notifs are not working well now) |
| python | python | 21/25 (error notifs are not working well now) |