From 52042d1963be32daf219d6996e44884a714b10e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frank=20Schr=C3=B6der?= Date: Thu, 7 Apr 2022 12:25:50 +0200 Subject: [PATCH] refactor notifySubscriptions and add submux lock --- client_sub.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/client_sub.go b/client_sub.go index fab620ea..2f311e3a 100644 --- a/client_sub.go +++ b/client_sub.go @@ -253,19 +253,25 @@ func (c *Client) updatePublishTimeout_NeedsSubMuxRLock() { c.setPublishTimeout(maxTimeout) } -func (c *Client) notifySubscriptionsOfError(ctx context.Context, subID uint32, err error) { - // we need to hold the subMux lock already +func (c *Client) notifySubscriptionOfError(ctx context.Context, subID uint32, err error) { + c.subMux.RLock() + s := c.subs[subID] + c.subMux.RUnlock() - subsToNotify := c.subs - if subID != 0 { - subsToNotify = map[uint32]*Subscription{ - subID: c.subs[subID], - } + if s == nil { + return } - for _, sub := range subsToNotify { + go s.notify(ctx, &PublishNotificationData{Error: err}) +} + +func (c *Client) notifyAllSubscriptionsOfError(ctx context.Context, err error) { + c.subMux.RLock() + defer c.subMux.RUnlock() + + for _, s := range c.subs { go func(s *Subscription) { s.notify(ctx, &PublishNotificationData{Error: err}) - }(sub) + }(s) } } @@ -434,7 +440,11 @@ func (c *Client) publish(ctx context.Context) error { case err != nil && res != nil: // irrecoverable error // todo(fs): do we need to stop and forget the subscription? - c.notifySubscriptionsOfError(ctx, res.SubscriptionID, err) + if res.SubscriptionID == 0 { + c.notifyAllSubscriptionsOfError(ctx, err) + } else { + c.notifySubscriptionOfError(ctx, res.SubscriptionID, err) + } dlog.Printf("error: %s", err) return err