fix: protocol publisher fails to report error #25
add missing `component_name` to pub/sub for trickle health state
protocol: track background publisher task
…op callback to stream_processor
pytrickle/client.py
Outdated
    logger.info("Stopping protocol due to client loops ending")

    # Call the optional on_stream_stop callback before stopping protocol
    if hasattr(self.frame_processor, 'on_stream_stop') and self.frame_processor.on_stream_stop:
frame_processor does not have on_stream_stop, right?
Correct, it's not an abstract method of frame_processor. It's appended by StreamProcessor on initialization like other callbacks registered to StreamProcessor.
StreamProcessor accepts an on_stream_stop callback as a parameter, similar to param_updater. _InternalFrameProcessor extends the abstract FrameProcessor class and stores the on_stream_stop callback as an attribute
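The wiring described here could look roughly like the sketch below. It is a simplified, standalone illustration and not the actual pytrickle source: the class bodies, the `process_frame` method, and the `demo` helper are assumptions made only to show how a callback passed to a higher-level object ends up as an attribute on the internal frame processor.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Optional

StopCallback = Callable[[], Awaitable[None]]


class FrameProcessor(ABC):
    """Simplified stand-in for the abstract base class (assumed shape)."""

    @abstractmethod
    async def process_frame(self, frame):
        ...


class _InternalFrameProcessor(FrameProcessor):
    """Stores user callbacks as attributes, as described in the comment."""

    def __init__(self, on_stream_stop: Optional[StopCallback] = None):
        self.on_stream_stop = on_stream_stop

    async def process_frame(self, frame):
        return frame  # pass-through, for illustration only


class StreamProcessor:
    """Accepts the callback and hands it to the internal processor."""

    def __init__(self, on_stream_stop: Optional[StopCallback] = None):
        self.frame_processor = _InternalFrameProcessor(on_stream_stop=on_stream_stop)


async def demo():
    calls = []

    async def on_stop():
        calls.append("stopped")

    sp = StreamProcessor(on_stream_stop=on_stop)
    # A client holding only the frame processor can still find the callback.
    callback = getattr(sp.frame_processor, "on_stream_stop", None)
    if callback:
        await callback()
    return calls
```

Running `asyncio.run(demo())` invokes the callback once through the frame processor attribute.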
If we want to include it, I think we should add it to FrameProcessor, similar to the error_callback. It seems strange to call something from TrickleClient that is set up in a higher-level abstraction.
Let me know if I am missing something here, though.
In this case, on_stream_stop is more of an event triggered by the client than a method called by the FrameProcessor. I have now added it as an abstract method so that frame processors can implement it (f347e41).
Client still needs to call on_stream_stop at this point in the protocol shutdown sequence. There is no other coordination with FrameProcessors currently afaik.
Lines 117 to 124 in f347e41
    if self.frame_processor.on_stream_stop:
        try:
            await self.frame_processor.on_stream_stop()
            logger.info("Stream stop callback executed successfully")
        except Exception as e:
            logger.error(f"Error in stream stop callback: {e}")
    await self.protocol.stop()
> It seems strange to call something from TrickleClient that is set up in a higher-level abstraction.
Yeah, it sounds off because TrickleClient was originally intended as a class for interacting with the trickle protocol directly in a multimedia context. In a consumer context it's an internal component, so the term "client" is a bit misleading.
I think this will be clearer once we reorganize classes into package namespaces.
pytrickle/client.py
Outdated
    try:
        await asyncio.wait_for(self.stop_event.wait(), timeout=self.send_data_interval)
        break  # Stop event was set, exit loop
        done, pending = await asyncio.wait(
WDYT about just waiting for stop_event here?
Will error_event cause channels to close down before this could execute one last time?
Good catch!
fixed in 2703cff
I moved the wait for stop/error into a function. Can you test to confirm the behavior is still the same?
I tested and it works great; I found no issues when stopping the publisher. I ran a high-rate publisher test and got ~120 msg/s, which was expected. One message did fail. I did not catch why, but it was likely due to the message size limit from batching.
We could possibly improve this by allowing _wait_for_interval to raise its own error.
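For illustration, a helper along these lines might wait on both events with a timeout and raise when the error event fires. This is only a sketch under assumptions: the `PublisherError` class, the function signature, and the return convention are hypothetical and are not taken from the pytrickle code.

```python
import asyncio


class PublisherError(RuntimeError):
    """Hypothetical exception type, used only in this sketch."""


async def wait_for_interval(stop_event: asyncio.Event,
                            error_event: asyncio.Event,
                            interval: float) -> bool:
    """Wait up to `interval` seconds for either event.

    Returns True if the loop should exit (stop requested), False if the
    interval simply elapsed. Raises PublisherError if the error event is set.
    """
    waiters = [
        asyncio.create_task(stop_event.wait()),
        asyncio.create_task(error_event.wait()),
    ]
    try:
        await asyncio.wait(waiters, timeout=interval,
                           return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Cancel whichever waiter is still pending so no task is leaked.
        for waiter in waiters:
            if not waiter.done():
                waiter.cancel()
    if error_event.is_set():
        raise PublisherError("error event set while waiting for interval")
    return stop_event.is_set()


async def demo():
    stop, err = asyncio.Event(), asyncio.Event()
    timed_out = await wait_for_interval(stop, err, 0.01)
    stop.set()
    stopped = await wait_for_interval(stop, err, 0.01)
    err.set()
    try:
        await wait_for_interval(stop, err, 0.01)
        raised = False
    except PublisherError:
        raised = True
    return timed_out, stopped, raised
```

Raising from the helper lets the calling loop separate a clean stop from an error without inspecting the events itself. Whether the publisher should still send one last time before the channels close is a separate decision left to the caller.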
ConnectionResetError in publisher during client teardown…l_shutdown events to client
This pull request introduces significant improvements to error handling, background task management, and stream lifecycle events in the pytrickle library and its example usage. The changes focus on making error callbacks consistently asynchronous, improving background task cleanup, and ensuring that stream stop events are handled gracefully. Additionally, the protocol and client classes now propagate errors and shutdown events more reliably, and global exception handling for asyncio is improved to suppress expected errors during shutdown.

Error handling and callback improvements:
(pytrickle/__init__.py [1], pytrickle/client.py [2] [3] [4] [5])
(pytrickle/client.py [1] [2] [3])

Background task management and cleanup:
… TrickleComponent, with automatic removal and exception handling on task completion. (pytrickle/base.py [1] [2])
… on_stream_stop callback. (examples/process_video_example.py [1] [2] [3])

Asyncio and shutdown robustness:
(pytrickle/base.py [1], pytrickle/protocol.py [2] [3])
(pytrickle/client.py [1] [2])

Protocol task error handling:
(pytrickle/protocol.py [1] [2] [3])

These changes make the library more robust, easier to debug, and safer to use in production environments by improving error visibility and resource cleanup.
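As a rough illustration of background task tracking with automatic removal and exception handling on completion, the sketch below uses the standard asyncio done-callback pattern. The `TaskTracker` class and its method names are hypothetical and do not reflect the actual TrickleComponent implementation.

```python
import asyncio
import logging

logger = logging.getLogger("task_tracker_example")


class TaskTracker:
    """Hypothetical helper: keeps strong references to background tasks."""

    def __init__(self):
        self._tasks = set()
        self.errors = []

    def track(self, coro, name=None):
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)                  # keep a reference until done
        task.add_done_callback(self._on_done)  # clean up on completion
        return task

    def _on_done(self, task):
        self._tasks.discard(task)
        if task.cancelled():
            return  # cancellation during shutdown is expected
        exc = task.exception()  # retrieving it avoids "never retrieved" warnings
        if exc is not None:
            self.errors.append(exc)
            logger.error("Background task %s failed: %r", task.get_name(), exc)

    async def shutdown(self):
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    tracker = TaskTracker()

    async def failing_publisher():
        raise ValueError("publish failed")

    async def idle_loop():
        await asyncio.sleep(3600)

    tracker.track(failing_publisher(), name="publisher")
    tracker.track(idle_loop(), name="idle")
    await asyncio.sleep(0.01)  # let the failing task finish
    pending_before = len(tracker._tasks)
    await tracker.shutdown()
    await asyncio.sleep(0)  # give done callbacks a chance to run
    return [type(e).__name__ for e in tracker.errors], pending_before, len(tracker._tasks)
```

In this sketch, a failure in a background publisher is recorded and logged as soon as the task finishes instead of being silently dropped, and `shutdown()` cancels whatever is still running.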