refactor relay threads #71
base: main
Conversation
@callebtc can you give this a review if you get a chance?

Also @kdmukai, can you give this a review if you have a chance?
)
self._connection_thread.start()

if not is_reconnect:
Would be more direct here to save this as self._outgoing_message_thread, and check that directly:
def __init__(self):
    ...
    self._outgoing_message_thread = None

def connect(self):
    ...
    if not self._outgoing_message_thread:
        self._outgoing_message_thread = Thread(...)
Then no is_reconnect needed.
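A self-contained sketch of the guard pattern being suggested (the worker body and daemon=True are illustrative assumptions, not code from this PR):

```python
import queue
import threading


class Relay:
    def __init__(self):
        self.outgoing_messages = queue.Queue()
        # Created lazily on first connect(); never re-created on reconnect.
        self._outgoing_message_thread = None

    def connect(self):
        # The guard replaces the is_reconnect flag: the thread is started
        # exactly once, no matter how many times connect() is called.
        if self._outgoing_message_thread is None:
            self._outgoing_message_thread = threading.Thread(
                target=self._outgoing_worker, daemon=True
            )
            self._outgoing_message_thread.start()

    def _outgoing_worker(self):
        while True:
            message = self.outgoing_messages.get()  # blocks until a message arrives
            # ... send message over the websocket (omitted) ...
```

Repeated connect() calls then reuse the same thread object, so no separate reconnect flag has to be threaded through.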
Good call. Will change this.
Couldn't its instantiation also just be moved into __post_init__? Doesn't seem like there would ever be a case where it would have to be re-created.
It could be, but I don't think it makes sense to start the outgoing_message_thread until the connection is made.
Having a separate monitor thread seems much better than triggering reconnect in _on_error. On the suggestion:
-if self.connected:
-    message = self.queue.get()
+if self.is_connected():
+    message = self.outgoing_messages.get()
Add a comment here that Queue.get() blocks and waits by default. Was confusing to read this without that prior knowledge.
Also obv verify that the blocking still allows this thread to terminate when the main thread exits.
I don't think it's necessary since that is the normal way any Queue would work.
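For reference, the blocking behaviour both comments refer to can be shown with the standard library directly (names here are illustrative):

```python
import queue
import threading
import time

q = queue.Queue()
results = []

def worker():
    # Queue.get() blocks until an item is available (block=True is the
    # default), so no busy-wait / time.sleep loop is needed.
    item = q.get()
    results.append(item)

# daemon=True means a thread parked inside q.get() will not keep the
# interpreter alive when the main thread exits.
t = threading.Thread(target=worker, daemon=True)
t.start()

time.sleep(0.2)   # worker is blocked inside q.get() here
q.put("EVENT")    # unblocks the worker immediately
t.join(timeout=1)
```

This also speaks to the termination concern above: a non-daemon thread blocked in Queue.get() would hang interpreter shutdown, so daemon=True (or a sentinel item put on the queue) is what lets the process exit cleanly.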
-self.queue.put(message)
-else:
-    time.sleep(0.1)
+self.outgoing_messages.put(message)
Eventually more robust error-handling is probably needed here. If the message is just undeliverable for some reason, it's pointless to return it to the Queue. Maybe for now at least dump the traceback so we're aware of the failure. Something like:

except Exception:
    import traceback
    traceback.print_exc()

(traceback.print_exc() prints the active exception and its traceback to stderr; print_tb() needs an explicit traceback object.)
Also can't recall if prints from within threads always make it out to the console.
I think I disagree on the return into the Queue. Relays constantly drop connections and allow reconnect. This ensures that a message in the queue will be delivered when the connection is back up.

Since we keep track of how often an error was encountered (with self.error_counter) we should do something with it. Right now it just stops reconnecting after it reaches self.error_threshold, but an exponentially increasing reconnect sleep timer would be more elegant IMO.
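A minimal sketch of the exponential backoff being proposed, assuming error_counter is the consecutive-failure count; the base and cap values are placeholders:

```python
def reconnect_delay_secs(error_counter: int,
                         base_delay: float = 1.0,
                         max_delay: float = 60.0) -> float:
    # Exponential backoff: 1s, 2s, 4s, 8s, ... capped at max_delay
    # so a long-dead relay doesn't push the wait toward infinity.
    return min(base_delay * (2 ** error_counter), max_delay)
```

The reconnect loop would sleep this long before the next attempt and reset error_counter to 0 after a successful connection, replacing the hard stop at error_threshold.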
def connect(self, is_reconnect=False):
    if not self.is_connected():
        with self.lock:
            self._connection_thread = Thread(
Some DEBUG logging is probably going to be necessary to optionally enable monitoring the connect/reconnect cycles of each Relay. Need max visibility into what's going on when in threading hell.
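A sketch of what that could look like (the logger name, Relay shape, and message text are all assumptions for illustration):

```python
import logging

logger = logging.getLogger("nostr.relay")  # assumed logger name


class Relay:
    def __init__(self, url: str):
        self.url = url

    def connect(self, is_reconnect: bool = False):
        # One DEBUG line per (re)connect attempt gives a per-relay
        # timeline of the connect/reconnect cycle when DEBUG is enabled.
        logger.debug("%s: %s", self.url,
                     "reconnecting" if is_reconnect else "connecting")
        # ... spawn the connection thread here ...
```

Callers opt in with logging.basicConfig(level=logging.DEBUG); at the default WARNING level the calls are nearly free.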
Overall, yes, I think the refactor is a step in the right direction. I haven't run this PR yet, but the biggest need here is just a ton of code comments to explain what's going on: an overview of who manages what, what each thread is for, when threads block, and what cleanup guarantees we have.
time.sleep(1)
Why do we sleep here? Shouldn't be necessary with the queue and the connection state anymore.
self.connect()

def is_connected(self) -> bool:
    with self.lock:
        if self._connection_thread is None or not self._connection_thread.is_alive():
Why do you assume that the Websocket is connected because the thread is alive? The connection could've been dropped by the relay. Does the thread necessarily come to a halt in case of an error?
If that's the case, I prefer the reconnect inside the thread instead of spawning a new one (as it was done before).
No, I don't think the thread is killed on an error. However, I am explicitly closing the connection after error_threshold is reached, which does kill the thread.
def _on_message(self, class_obj, message: str):
    self.message_pool.add_message(message, self.url)

def _on_error(self, class_obj, error):
    self.connected = False
I don't understand why this is removed. Errors are thrown when the relay disconnects the client. This is not a nostr error but a WebSocket error. I haven't encountered a case where an error was not a disconnect.
).start()

time.sleep(1)
relay.connect()
This is much better!
if not relay.is_connected():
    relay.connect(True)

time.sleep(self.connection_monitor_interval_secs)
This could be where the exponentially increasing sleep timer goes.
I think error handling is an issue (see my comment) but apart from that LGTM. I agree with @kdmukai that the way threads are distributed across files could be confusing, but I think this way is better than before, where both threads were launched from the relay manager. It makes more sense that a Relay has its own Queue thread instead of running side by side with it. Could not test yet!
@jeremywhelchel I like this idea but if the backoff strategy is per relay, how would the reconnection monitor thread on the relay manager be able to handle that? It seems like we would need a reconnection monitor thread for each relay.
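One hypothetical way a single monitor thread could still honor per-relay backoff is to store a next-attempt timestamp on each relay; the monitor then skips any relay whose backoff window hasn't elapsed (all names and the stubbed methods below are illustrative, not code from this PR):

```python
import time


class Relay:
    def __init__(self, url: str):
        self.url = url
        self.error_counter = 0
        self.next_attempt = 0.0  # monotonic timestamp of the earliest retry

    def is_connected(self) -> bool:
        return False  # stub: always disconnected for illustration

    def connect(self, is_reconnect: bool = False):
        # The real code would bump error_counter on failure; here we
        # just schedule the next attempt with an exponential, capped delay.
        self.error_counter += 1
        self.next_attempt = time.monotonic() + min(2 ** self.error_counter, 60)


def monitor_pass(relays):
    # One iteration of the manager's single monitor thread: each relay
    # is only retried once its own backoff window has elapsed, so one
    # thread can serve many relays with independent backoff schedules.
    now = time.monotonic()
    for relay in relays:
        if not relay.is_connected() and now >= relay.next_attempt:
            relay.connect(is_reconnect=True)
```

The monitor's own loop can keep sleeping a fixed connection_monitor_interval_secs; the per-relay timing lives entirely in the timestamps.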
"http_proxy_port": self.proxy_config.port if self.proxy_config is not None else None, | ||
"proxy_type": self.proxy_config.type if self.proxy_config is not None else None | ||
}, | ||
name=f"{self.url}-connection" |
daemon=True should probably be set
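For illustration, what daemon=True on the named connection thread would look like (the thread target is a stand-in for ws.run_forever):

```python
import threading
import time

url = "wss://example.relay"  # placeholder relay URL

t = threading.Thread(
    target=lambda: time.sleep(3600),  # stand-in for ws.run_forever()
    name=f"{url}-connection",
    daemon=True,  # process can exit even while this thread is blocked
)
t.start()
```

Without daemon=True, a connection thread stuck inside run_forever would keep the interpreter alive after the main thread exits.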
Relay class

- connect method
- is_connected method, which checks if the ws.run_forever thread is alive. I think this is more reliable because this thread will be torn down on disconnect.

Relay Manager

- add_relay (handled in relay.connect now)
- relay_connection_monitor thread, which periodically (connection_monitor_interval_secs) checks if the relays are disconnected and reconnects them if they are.

Note: I felt like reconnection logic belongs in the relay manager as its job is to manage the relays.