Skip to content

Commit

Permalink
udpmux: close existing demuxed conns on drain()
Browse files Browse the repository at this point in the history
  • Loading branch information
ignoramous committed Aug 23, 2024
1 parent 1efa7b1 commit e4adcc1
Showing 1 changed file with 8 additions and 24 deletions.
32 changes: 8 additions & 24 deletions intra/udpmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func (x *muxer) stop() error {
x.drain()
err = x.mxconn.Close() // close the muxed conn

x.dxconnWG.Wait() // all conns close / error out
go x.cb() // dissociate
x.dxconnWG.Wait() // all conns close / error out
core.Go("udpmux.cb", x.cb) // dissociate
x.stats.dur = time.Since(x.stats.start)
log.I("udp: mux: %s stopped; stats: %s", x.cid, x.stats)
})
Expand All @@ -157,29 +157,13 @@ func (x *muxer) stop() error {
}

func (x *muxer) drain() {
dc := make([]*demuxconn, 0, len(x.dxconns))
draintimeout := 2 * time.Second
tick := time.NewTicker(draintimeout)

defer func() {
log.I("udp: mux: %s draining... %d", x.cid, len(dc))
go x.unroute(dc...)
for _, c := range dc {
clos(c)
}
}()

for { // close unaccepted connections
select {
case c := <-x.dxconns:
tick.Reset(draintimeout)
// unroute must be called from a different
// goroutine as it blocks on rmu
dc = append(dc, c)
case <-tick.C:
return
}
x.rmu.Lock()
defer x.rmu.Unlock()
log.I("udp: mux: %s drain: closing %d demuxed conns", x.cid, len(x.routes))
for _, c := range x.routes {
clos(c) // will unroute as well
}

}

// readers has to tasks:
Expand Down

0 comments on commit e4adcc1

Please sign in to comment.