From f81c7b02a56f693bdebb247570eaa6fab1d22f4a Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Fri, 18 Sep 2020 23:15:12 +0300 Subject: [PATCH] fix private channel resubscribe deadlock (#38) --- client.go | 12 ++++++------ subscription.go | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index d02b3e6..79cb35e 100644 --- a/client.go +++ b/client.go @@ -892,7 +892,7 @@ func (c *Client) connectFromScratch(isReconnect bool, reconnectWaitCB func()) er return } - err = c.resubscribe() + err = c.resubscribe(c.id) if err != nil { // we need just to close the connection and outgoing requests here // but preserve all subscriptions. @@ -913,9 +913,9 @@ func (c *Client) connectFromScratch(isReconnect bool, reconnectWaitCB func()) er return err } -func (c *Client) resubscribe() error { +func (c *Client) resubscribe(clientID string) error { for _, sub := range c.subs { - err := sub.resubscribe(true) + err := sub.resubscribe(true, clientID) if err != nil { return err } @@ -1017,7 +1017,7 @@ func (c *Client) sendSubRefresh(channel string, fn func(protocol.SubRefreshResul clientID := c.id c.mu.RUnlock() - token, err := c.privateSign(channel) + token, err := c.privateSign(channel, clientID) if err != nil { return } @@ -1124,13 +1124,13 @@ func (c *Client) sendConnect(isReconnect bool, fn func(protocol.ConnectResult, e }) } -func (c *Client) privateSign(channel string) (string, error) { +func (c *Client) privateSign(channel string, clientID string) (string, error) { var token string if strings.HasPrefix(channel, c.config.PrivateChannelPrefix) && c.events != nil { handler := c.events.onPrivateSub if handler != nil { ev := PrivateSubEvent{ - ClientID: c.clientID(), + ClientID: clientID, Channel: channel, } ps, err := handler.OnPrivateSub(c, ev) diff --git a/subscription.go b/subscription.go index 889d12e..9c0839b 100644 --- a/subscription.go +++ b/subscription.go @@ -316,7 +316,7 @@ func (s *Subscription) Subscribe() error { if !s.centrifuge.connected() { return nil } - return s.resubscribe(false) + return s.resubscribe(false, s.centrifuge.clientID()) } func (s *Subscription) triggerOnUnsubscribe(needResubscribe bool, needRecover bool) { @@ -435,7 +435,7 @@ func (s *Subscription) handleUnsub(m protocol.Unsub) { } } -func (s *Subscription) resubscribe(isResubscribe bool) error { +func (s *Subscription) resubscribe(isResubscribe bool, clientID string) error { s.mu.Lock() if s.status == SUBSCRIBED || s.status == SUBSCRIBING { s.mu.Unlock() @@ -450,7 +450,7 @@ func (s *Subscription) resubscribe(isResubscribe bool) error { s.status = SUBSCRIBING s.mu.Unlock() - token, err := s.centrifuge.privateSign(s.channel) + token, err := s.centrifuge.privateSign(s.channel, clientID) if err != nil { s.mu.Lock() s.status = UNSUBSCRIBED