Skip to content

Commit

Permalink
refactor notifySubscriptions and add submux lock
Browse files Browse the repository at this point in the history
  • Loading branch information
magiconair committed Apr 7, 2022
1 parent f50785f commit 52042d1
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,19 +253,25 @@ func (c *Client) updatePublishTimeout_NeedsSubMuxRLock() {
c.setPublishTimeout(maxTimeout)
}

func (c *Client) notifySubscriptionsOfError(ctx context.Context, subID uint32, err error) {
// we need to hold the subMux lock already
func (c *Client) notifySubscriptionOfError(ctx context.Context, subID uint32, err error) {
c.subMux.RLock()
s := c.subs[subID]
c.subMux.RUnlock()

subsToNotify := c.subs
if subID != 0 {
subsToNotify = map[uint32]*Subscription{
subID: c.subs[subID],
}
if s == nil {
return
}
for _, sub := range subsToNotify {
go s.notify(ctx, &PublishNotificationData{Error: err})
}

func (c *Client) notifyAllSubscriptionsOfError(ctx context.Context, err error) {
c.subMux.RLock()
defer c.subMux.RUnlock()

for _, s := range c.subs {
go func(s *Subscription) {
s.notify(ctx, &PublishNotificationData{Error: err})
}(sub)
}(s)
}
}

Expand Down Expand Up @@ -434,7 +440,11 @@ func (c *Client) publish(ctx context.Context) error {
case err != nil && res != nil:
// irrecoverable error
// todo(fs): do we need to stop and forget the subscription?
c.notifySubscriptionsOfError(ctx, res.SubscriptionID, err)
if res.SubscriptionID == 0 {
c.notifyAllSubscriptionsOfError(ctx, err)
} else {
c.notifySubscriptionOfError(ctx, res.SubscriptionID, err)
}
dlog.Printf("error: %s", err)
return err

Expand Down

0 comments on commit 52042d1

Please sign in to comment.