Skip to content

Commit

Permalink
fix: failed to renew transport before transport closed on peer instan…
Browse files Browse the repository at this point in the history
…ce (#4)
  • Loading branch information
PeerXu authored Apr 9, 2021
1 parent 27b6c1a commit 847a686
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 51 deletions.
5 changes: 0 additions & 5 deletions pkg/meepo/close_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ func (mp *Meepo) closeTransport(peerID string) error {
return err
}

mp.removeTransport(peerID)
logger.Tracef("remove transport")
mp.removeTeleportationsByPeerID(peerID)
logger.Tracef("remove teleportations")

logger.Infof("transport closed")

return nil
Expand Down
17 changes: 3 additions & 14 deletions pkg/meepo/meepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,10 @@ func (mp *Meepo) removeTransportNL(id string) {
delete(mp.transports, id)
}

func (mp *Meepo) removeTeleportationsByPeerID(id string) {
mp.teleportationsMtx.Lock()
defer mp.teleportationsMtx.Unlock()
mp.removeTeleportationsByPeerIDNL(id)
}

func (mp *Meepo) removeTeleportationsByPeerIDNL(id string) {
ts, _ := mp.listTeleportationsByPeerIDNL(id)
func (mp *Meepo) closeTeleportationsByPeerID(id string) {
ts, _ := mp.listTeleportationsByPeerID(id)
for _, t := range ts {
switch t.Portal() {
case teleportation.PortalSink:
mp.removeTeleportationSinkNL(t.Name())
case teleportation.PortalSource:
mp.removeTeleportationSourceNL(t.Name())
}
t.Close()
}
}

Expand Down
34 changes: 12 additions & 22 deletions pkg/meepo/new_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,15 @@ func (mp *Meepo) NewTransport(peerID string) (transport.Transport, error) {
tp.OnDataChannelCreate("sys", mp.onTransportSysDataChannelCreate)
logger.Tracef("register on data channel create handler")

tp.OnTransportState(transport.TransportStateFailed, func(transport.HandleID) {
mp.removeTeleportationsByPeerID(peerID)
logger.Tracef("remove teleportations")
h := func(transport.HandleID) {
mp.closeTeleportationsByPeerID(peerID)
logger.Tracef("close teleportations")

mp.removeTransport(peerID)
logger.Tracef("remove transport")
})
tp.OnTransportState(transport.TransportStateClosed, func(transport.HandleID) {
mp.removeTeleportationsByPeerID(peerID)
logger.Tracef("remove teleportations")

mp.removeTransport(peerID)
logger.Tracef("remove transport")
})
}
tp.OnTransportState(transport.TransportStateFailed, h)
tp.OnTransportState(transport.TransportStateClosed, h)
logger.Tracef("register on transport state change handler")

mp.addTransport(peerID, tp)
Expand Down Expand Up @@ -182,20 +177,15 @@ func (mp *Meepo) onNewTransport(src *signaling.Descriptor) (*signaling.Descripto
tp.OnDataChannelCreate("sys", mp.onTransportSysDataChannelCreate)
logger.Tracef("register on data channel create handler")

tp.OnTransportState(transport.TransportStateFailed, func(transport.HandleID) {
mp.removeTeleportationsByPeerID(peerID)
logger.Tracef("remove teleportations")
h := func(transport.HandleID) {
mp.closeTeleportationsByPeerID(peerID)
logger.Tracef("close teleportations")

mp.removeTransport(peerID)
logger.Tracef("remove transport")
})
tp.OnTransportState(transport.TransportStateClosed, func(transport.HandleID) {
mp.removeTeleportationsByPeerID(peerID)
logger.Tracef("remove teleportations")

mp.removeTransport(peerID)
logger.Tracef("remove transport")
})
}
tp.OnTransportState(transport.TransportStateFailed, h)
tp.OnTransportState(transport.TransportStateClosed, h)
logger.Tracef("register on transport state change handler")

wg.Wait()
Expand Down
11 changes: 8 additions & 3 deletions pkg/meepo/teleport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ func (mp *Meepo) Teleport(peerID string, remote net.Addr, opts ...TeleportOption
return nil, err
}
wg.Add(1)
tp.OnTransportState(transport.TransportStateConnected, func(hid transport.HandleID) {
fn := func(transport.HandleID) {
wg.Done()
tp.UnsetOnTransportState(transport.TransportStateConnected, hid)
})
}
h1 := tp.OnTransportState(transport.TransportStateConnected, fn)
defer tp.UnsetOnTransportState(transport.TransportStateConnected, h1)
h2 := tp.OnTransportState(transport.TransportStateFailed, fn)
defer tp.UnsetOnTransportState(transport.TransportStateFailed, h2)
h3 := tp.OnTransportState(transport.TransportStateClosed, fn)
defer tp.UnsetOnTransportState(transport.TransportStateClosed, h3)

wg.Wait()
}
Expand Down
24 changes: 19 additions & 5 deletions pkg/signaling/redis/signaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,11 @@ func (e *RedisEngine) getContext() context.Context {
}

func (e *RedisEngine) sendEvent(id string, evt *Event) error {
logger := e.getLogger().WithField("#method", "sendEvent")
logger := e.getLogger().WithFields(logrus.Fields{
"#method": "sendEvent",
"name": evt.Name,
"session": evt.Session,
})

msg, err := encode(evt)
if err != nil {
Expand Down Expand Up @@ -452,16 +456,26 @@ func (e *RedisEngine) Wire(dst, src *signaling.Descriptor) (*signaling.Descripto
}()
logger.Tracef("acquire session channel")

var wiredEvt *Event
var wiredErr error
var wiredWg sync.WaitGroup

wiredWg.Add(1)
go func() {
wiredEvt, wiredErr = e.waitWiredEvent(wireEvt.Session)
wiredWg.Done()
}()

if err := e.sendEvent(dst.ID, wireEvt); err != nil {
logger.WithError(err).Debugf("failed to send event")
return nil, err
}

wiredEvt, err := e.waitWiredEvent(wireEvt.Session)
if err != nil {
return nil, err
wiredWg.Wait()
if wiredErr != nil {
return nil, wiredErr
}
logger.Tracef("receive event from sesion channel")
logger.Tracef("receive event from session channel")

return wiredEvt.Descriptor, nil
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/sync/channel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sync

import "sync"
import (
"sync"
)

type ChannelLocker interface {
Acquire(id int32) error
Expand All @@ -11,7 +13,7 @@ type ChannelLocker interface {

type channelLocker struct {
chs map[int32]chan interface{}
mtx sync.Mutex
mtx sync.Locker
}

func (t *channelLocker) Acquire(id int32) error {
Expand Down Expand Up @@ -74,5 +76,6 @@ func (t *channelLocker) getNL(id int32) (chan interface{}, error) {
func NewChannelLocker() ChannelLocker {
return &channelLocker{
chs: make(map[int32]chan interface{}),
mtx: new(sync.Mutex),
}
}

0 comments on commit 847a686

Please sign in to comment.