Skip to content

Commit

Permalink
webrtc: correctly report incoming packet address on muxed connection (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt authored Oct 12, 2023
1 parent 1195bf5 commit c2124f7
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 96 deletions.
47 changes: 32 additions & 15 deletions p2p/transport/webrtc/udpmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -42,9 +43,15 @@ type UDPMux struct {

queue chan Candidate

mx sync.Mutex
mx sync.Mutex
// ufragMap allows us to multiplex incoming STUN packets based on ufrag
ufragMap map[ufragConnKey]*muxedConnection
addrMap map[string]*muxedConnection
// addrMap allows us to correctly direct incoming packets after the connection
// is established and ufrag isn't available on all packets
addrMap map[string]*muxedConnection
// ufragAddrMap allows cleaning up all addresses from the addrMap once the connection is closed
// During the ICE connectivity checks, the same ufrag might be used on multiple addresses.
ufragAddrMap map[ufragConnKey][]net.Addr

// the context controls the lifecycle of the mux
wg sync.WaitGroup
Expand All @@ -57,12 +64,13 @@ var _ ice.UDPMux = &UDPMux{}
func NewUDPMux(socket net.PacketConn) *UDPMux {
ctx, cancel := context.WithCancel(context.Background())
mux := &UDPMux{
ctx: ctx,
cancel: cancel,
socket: socket,
ufragMap: make(map[ufragConnKey]*muxedConnection),
addrMap: make(map[string]*muxedConnection),
queue: make(chan Candidate, 32),
ctx: ctx,
cancel: cancel,
socket: socket,
ufragMap: make(map[ufragConnKey]*muxedConnection),
addrMap: make(map[string]*muxedConnection),
ufragAddrMap: make(map[ufragConnKey][]net.Addr),
queue: make(chan Candidate, 32),
}

return mux
Expand Down Expand Up @@ -130,7 +138,11 @@ func (mux *UDPMux) readLoop() {

n, addr, err := mux.socket.ReadFrom(buf)
if err != nil {
log.Errorf("error reading from socket: %v", err)
if strings.Contains(err.Error(), "use of closed network connection") {
log.Debugf("readLoop exiting: socket %s closed", mux.socket.LocalAddr())
} else {
log.Errorf("error reading from socket %s: %v", mux.socket.LocalAddr(), err)
}
pool.Put(buf)
return
}
Expand All @@ -157,7 +169,7 @@ func (mux *UDPMux) processPacket(buf []byte, addr net.Addr) (processed bool) {
conn, ok := mux.addrMap[addr.String()]
mux.mx.Unlock()
if ok {
if err := conn.Push(buf); err != nil {
if err := conn.Push(buf, addr); err != nil {
log.Debugf("could not push packet: %v", err)
return false
}
Expand Down Expand Up @@ -196,7 +208,7 @@ func (mux *UDPMux) processPacket(buf []byte, addr net.Addr) (processed bool) {
}
}

if err := conn.Push(buf); err != nil {
if err := conn.Push(buf, addr); err != nil {
log.Debugf("could not push packet: %v", err)
return false
}
Expand Down Expand Up @@ -250,9 +262,12 @@ func (mux *UDPMux) RemoveConnByUfrag(ufrag string) {

for _, isIPv6 := range [...]bool{true, false} {
key := ufragConnKey{ufrag: ufrag, isIPv6: isIPv6}
if conn, ok := mux.ufragMap[key]; ok {
if _, ok := mux.ufragMap[key]; ok {
delete(mux.ufragMap, key)
delete(mux.addrMap, conn.RemoteAddr().String())
for _, addr := range mux.ufragAddrMap[key] {
delete(mux.addrMap, addr.String())
}
delete(mux.ufragAddrMap, key)
}
}
}
Expand All @@ -264,12 +279,14 @@ func (mux *UDPMux) getOrCreateConn(ufrag string, isIPv6 bool, _ *UDPMux, addr ne
defer mux.mx.Unlock()

if conn, ok := mux.ufragMap[key]; ok {
mux.addrMap[addr.String()] = conn
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
return false, conn
}

conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) }, addr)
conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) })
mux.ufragMap[key] = conn
mux.addrMap[addr.String()] = conn

mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
return true, conn
}
Loading

0 comments on commit c2124f7

Please sign in to comment.