Skip to content

Commit

Permalink
fix private channel resubscribe deadlock (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Sep 18, 2020
1 parent eec5ba2 commit f81c7b0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
12 changes: 6 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ func (c *Client) connectFromScratch(isReconnect bool, reconnectWaitCB func()) er
return
}

err = c.resubscribe()
err = c.resubscribe(c.id)
if err != nil {
// we need just to close the connection and outgoing requests here
// but preserve all subscriptions.
Expand All @@ -913,9 +913,9 @@ func (c *Client) connectFromScratch(isReconnect bool, reconnectWaitCB func()) er
return err
}

func (c *Client) resubscribe() error {
func (c *Client) resubscribe(clientID string) error {
for _, sub := range c.subs {
err := sub.resubscribe(true)
err := sub.resubscribe(true, clientID)
if err != nil {
return err
}
Expand Down Expand Up @@ -1017,7 +1017,7 @@ func (c *Client) sendSubRefresh(channel string, fn func(protocol.SubRefreshResul
clientID := c.id
c.mu.RUnlock()

token, err := c.privateSign(channel)
token, err := c.privateSign(channel, clientID)
if err != nil {
return
}
Expand Down Expand Up @@ -1124,13 +1124,13 @@ func (c *Client) sendConnect(isReconnect bool, fn func(protocol.ConnectResult, e
})
}

func (c *Client) privateSign(channel string) (string, error) {
func (c *Client) privateSign(channel string, clientID string) (string, error) {
var token string
if strings.HasPrefix(channel, c.config.PrivateChannelPrefix) && c.events != nil {
handler := c.events.onPrivateSub
if handler != nil {
ev := PrivateSubEvent{
ClientID: c.clientID(),
ClientID: clientID,
Channel: channel,
}
ps, err := handler.OnPrivateSub(c, ev)
Expand Down
6 changes: 3 additions & 3 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s *Subscription) Subscribe() error {
if !s.centrifuge.connected() {
return nil
}
return s.resubscribe(false)
return s.resubscribe(false, s.centrifuge.clientID())
}

func (s *Subscription) triggerOnUnsubscribe(needResubscribe bool, needRecover bool) {
Expand Down Expand Up @@ -435,7 +435,7 @@ func (s *Subscription) handleUnsub(m protocol.Unsub) {
}
}

func (s *Subscription) resubscribe(isResubscribe bool) error {
func (s *Subscription) resubscribe(isResubscribe bool, clientID string) error {
s.mu.Lock()
if s.status == SUBSCRIBED || s.status == SUBSCRIBING {
s.mu.Unlock()
Expand All @@ -450,7 +450,7 @@ func (s *Subscription) resubscribe(isResubscribe bool) error {
s.status = SUBSCRIBING
s.mu.Unlock()

token, err := s.centrifuge.privateSign(s.channel)
token, err := s.centrifuge.privateSign(s.channel, clientID)
if err != nil {
s.mu.Lock()
s.status = UNSUBSCRIBED
Expand Down

0 comments on commit f81c7b0

Please sign in to comment.