Skip to content

Commit

Permalink
udpmux: vend new routes into netstack only on ingress
Browse files Browse the repository at this point in the history
egress flows, ie flows to be handled from netstack, already come in socksified
with routes setup as needed. as such, they need not be vended again. However,
netstack won't necessairly know about ingress flows, ie new flows appearing
on our packet conn (dialed out via some ipn.Proxy), and so netstack must be
informed about those (ie, a new conn dialed into netstack to handle a new dest)
  • Loading branch information
ignoramous committed Aug 31, 2024
1 parent 55df768 commit 8b57b43
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions intra/udpmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,22 @@ import (

// from: github.com/pion/transport/blob/03c807b/udp/conn.go

const (
maxtimeouterrors = 3
const maxtimeouterrors = 3

type flowkind int32

var (
ingress flowkind = 0
egress flowkind = 1
)

func (f flowkind) String() string {
if f == ingress {
return "ingress"
}
return "egress"
}

type sender interface {
id() string
sendto([]byte, net.Addr) (int, error)
Expand Down Expand Up @@ -192,6 +204,7 @@ func (x *muxer) readers() {
n, who, err := x.mxconn.ReadFrom(b)

x.stats.tx.Add(uint32(n)) // upload

if timedout(err) {
timeouterrors++
if timeouterrors < maxtimeouterrors {
Expand All @@ -207,13 +220,15 @@ func (x *muxer) readers() {
log.W("udp: mux: %s read done n(%d): nil remote addr; skip", x.cid, n)
continue
}
// may be existing route or a new route
if dst := x.route(addr2netip(who)); dst != nil {

// may be an existing route or a new route
if dst := x.route(addr2netip(who), ingress); dst != nil {
select {
case dst.incomingCh <- &slice{v: b[:n], free: free}: // incomingCh is never closed
default: // dst probably closed, but not yet unrouted
log.W("udp: mux: %s read: drop(sz: %d); route to %s", x.cid, n, dst.raddr)
}
log.V("udp: mux: %s read: n(%d) from %v <= %v; err %v", x.cid, n, dst, who, err)
} // else: ignore (who is invalid or x is closed)
}
}
Expand All @@ -224,9 +239,9 @@ func (x *muxer) findRoute(to netip.AddrPort) *demuxconn {
return x.routes[to]
}

func (x *muxer) route(to netip.AddrPort) *demuxconn {
func (x *muxer) route(to netip.AddrPort, f flowkind) *demuxconn {
if !to.IsValid() {
log.W("udp: mux: %s route: invalid addr %s", x.cid, to)
log.W("udp: mux: %s route: %s invalid addr %s", x.cid, f, to)
return nil
}

Expand All @@ -237,32 +252,33 @@ func (x *muxer) route(to netip.AddrPort) *demuxconn {
x.rmu.Lock()
defer x.rmu.Unlock()

conn := x.routes[to]
if conn == nil {
conn, ok := x.routes[to]
if conn == nil || !ok {
// new routes created here won't really exist in netstack if
// settings.EndpointIndependentMapping or settings.EndpointIndependentFiltering
// is set to false.
conn = x.newLocked(to)
select {
case <-x.doneCh:
clos(conn)
log.W("udp: mux: %s route: for %s; muxer closed", x.cid, to)
log.W("udp: mux: %s route: %s for %s; muxer closed", x.cid, f, to)
return nil
case x.dxconns <- conn:
n := x.stats.dxcount.Add(1)
x.routes[to] = conn
firstEverDemux := n == 1
// the first demux conn is already vended/sockisifed via netstack
// (see: udpHandler:ProxyMux) so it should not be vended again.
if !firstEverDemux {
// if egress, a demuxed conn is already vended/sockisifed via netstack
// (see: udpHandler:ProxyMux) and so it need not be vended again. Even
// if it were to be, it'd fail with "port/addr already in use"
// ex: route: egress vend failure 1.1.1.1:53; err connect udp 10.111.222.1:42182: port is in use
if f == ingress {
core.Go("udpmux.vend", func() { // a fork in the road
if verr := x.vnd(to); verr != nil {
clos(conn)
log.E("udp: mux: %s route: vend failure %s; err %v", x.cid, to, verr)
log.E("udp: mux: %s route: %s vend failure %s; err %v", x.cid, f, to, verr)
}
})
}
log.I("udp: mux: %s route: new for %s; stats: %d", x.cid, to, x.stats)
log.I("udp: mux: %s route: %s #%d new for %s; stats: %d", x.cid, f, n, to, x.stats)
}
}
return conn
Expand Down Expand Up @@ -469,7 +485,7 @@ func newMuxTable() *muxTable {
return &muxTable{t: make(map[netip.AddrPort]*muxer)}
}

func (e *muxTable) associate(cid, pid string, src, dst netip.AddrPort, mk assocFn, v vendor) (c net.Conn, err error) {
func (e *muxTable) associate(cid, pid string, src, dst netip.AddrPort, mk assocFn, v vendor) (_ net.Conn, err error) {
e.Lock() // lock

var mxr *muxer
Expand Down Expand Up @@ -499,7 +515,7 @@ func (e *muxTable) associate(cid, pid string, src, dst netip.AddrPort, mk assocF

e.Unlock() // unlock
// do not hold e.lock on calls into mxr
c = mxr.route(dst)
c := mxr.route(dst, egress)
if c == nil {
log.E("udp: mux: %s vend: no conn for %s", mxr.cid, dst)
return nil, errUdpSetupConn
Expand Down

1 comment on commit 8b57b43

@ignoramous
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#77

Please sign in to comment.