-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: avoid deadlocks in query and broadcast behaviours #63
Conversation
// The sender may attempt to drain any pending notifications before closing the other channels. | ||
// The NotifyFinished channel will be closed once the sender has attempted to send the Finished notification. | ||
NotifyFinished() chan<- CtxEvent[E] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the current usage of the QueryMonitor
I think it would be a nicer API if these were just regular methods that accepted CtxEvent[*EventQueryProgressed]
and CtxEvent[E]
events. I can only see it used down below as:
func (w *queryNotifier[E]) TryNotifyProgressed(ctx context.Context, ev *EventQueryProgressed) bool {
if w.stopping {
return false
}
ce := CtxEvent[*EventQueryProgressed]{Ctx: ctx, Event: ev}
select {
case w.monitor.NotifyProgressed() <- ce:
return true
default:
w.pending = append(w.pending, ce)
return false
}
}
func (w *queryNotifier[E]) NotifyFinished(ctx context.Context, ev E) {
w.stopping = true
w.DrainPending()
close(w.monitor.NotifyProgressed())
select {
case w.monitor.NotifyFinished() <- CtxEvent[E]{Ctx: ctx, Event: ev}:
default:
}
close(w.monitor.NotifyFinished())
}
This requires users of the types that implement this interface to deal with quite some internal details.
Alternative suggestion:
// A QueryMonitor receives event notifications on the progress of a query
type QueryMonitor[E TerminalQueryEvent] interface {
NotifyProgressed(e CtxEvent[*EventQueryProgressed]) bool // indicating successful notification
NotifyFinished(e CtxEvent[E])
}
closing the specific channels could happen inside the type that implements that interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This approach doesn't give the caller of QueryMonitor, which is a behaviiour, any control over the blocking behaviour. A channel allows the behaviour to detect and avoid blocking. A method call could do anything and moves the slow consumer problem into the monitor implementation.
close(w.monitor.NotifyProgressed()) | ||
|
||
select { | ||
case w.monitor.NotifyFinished() <- CtxEvent[E]{Ctx: ctx, Event: ev}: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a good overview but could it be a problem if we enter the default case here? Other parts might rely on receiving the finished event (not an event due to closing the channel)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe, but the channel is closed so the consumer will always know the query has finished. The alternative is to block, possibly forever if the consumer has gone away or is blocked themselves. And we would also not have a good place to clean up monitors for finished queries. The QueryMonitor documentation states it's the responsibility of the implementation to have capacity to accept one single finished notification.
This is similar to how https://pkg.go.dev/os/signal#Notify handles notification. The owner of the channel must ensure sufficient capacity, in our case a capacity of 1.
The query and broadcast behaviours notify query initiators of the ongoing progress of a query or broadcast. They also notify when the query or broadcast has finished.
This set of changes fixes two types of deadlock:
QueryMonitor
type that buffers progress events that cannot be notified. TheQueryMonitor
also uses a separate channel for notifying the completion of a query or broadcast and has better defined semantics for when notifications will be sent and when they will stop being sent.Notify
andPerform
methods of the behaviours were guarded by a single mutex since both could advance the state of the embedded state machine. However, notifying a query initiator could cause the intiator to call theNotify
method to stop the query. Since the notification was made while the mutex was held this would deadlock on the call toNotify
. This has been fixed by separating the locking behaviour betweenNotify
andPerform
and refactoring the logic to ensure that the state machines are advanced byPerform
only.Notify
now only queues the inbound event.