Skip to content

Commit

Permalink
block on full
Browse files Browse the repository at this point in the history
  • Loading branch information
calebdoxsey committed Dec 12, 2024
1 parent 276e7dd commit a116bf5
Showing 1 changed file with 17 additions and 72 deletions.
89 changes: 17 additions & 72 deletions tunnel/tunnel_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"net/netip"
"strings"
"sync/atomic"
"time"

"github.com/quic-go/quic-go/http3"
Expand Down Expand Up @@ -129,9 +128,6 @@ type udpSessionManager struct {
handler udpSessionHandler
in chan UDPPacket
out chan UDPPacket

droppedInboundPackets atomic.Int64
droppedOutboundPackets atomic.Int64
}

func newUDPSessionManager(conn *net.UDPConn, handler udpSessionHandler) *udpSessionManager {
Expand All @@ -149,7 +145,6 @@ func (mgr *udpSessionManager) run(ctx context.Context) error {
eg.Go(func() error { return mgr.read(ectx) })
eg.Go(func() error { return mgr.dispatch(ectx) })
eg.Go(func() error { return mgr.write(ectx) })
eg.Go(func() error { return mgr.report(ectx) })
err := eg.Wait()
log.Ctx(ctx).Error().Err(err).Msg("stopped udp session manager")
return err
Expand Down Expand Up @@ -188,9 +183,6 @@ func (mgr *udpSessionManager) dispatch(ctx context.Context) error {
for {
select {
case <-ctx.Done():
if dropped := dropAll(mgr.in); dropped > 0 {
log.Ctx(ctx).Error().Int("count", dropped).Msg("dropped inbound packets")
}
return context.Cause(ctx)
case packet := <-mgr.in:
s, ok := sessions[packet.Addr]
Expand All @@ -205,14 +197,9 @@ func (mgr *udpSessionManager) dispatch(ctx context.Context) error {
}()
sessions[packet.Addr] = s
}
if dropped := sendOrDrop(s.in, packet); dropped > 0 {
mgr.droppedInboundPackets.Add(int64(dropped))
}
s.HandlePacket(ctx, packet)
case s := <-stopped:
delete(sessions, s.addr)
if dropped := dropAll(s.in); dropped > 0 {
mgr.droppedInboundPackets.Add(int64(dropped))
}
}
}
}
Expand All @@ -225,9 +212,6 @@ func (mgr *udpSessionManager) write(ctx context.Context) error {
var packet UDPPacket
select {
case <-ctx.Done():
if dropped := dropAll(mgr.out); dropped > 0 {
mgr.droppedOutboundPackets.Add(int64(dropped))
}
return context.Cause(ctx)
case packet = <-mgr.out:
}
Expand All @@ -245,38 +229,30 @@ func (mgr *udpSessionManager) write(ctx context.Context) error {
}
}

func (mgr *udpSessionManager) report(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case <-ticker.C:
}

inbound, outbound := mgr.droppedInboundPackets.Swap(0), mgr.droppedOutboundPackets.Swap(0)
if inbound > 0 || outbound > 0 {
log.Ctx(ctx).Error().
Int64("inbound", inbound).
Int64("outbound", outbound).
Msg("dropped packets")
}
}
}

type udpSession struct {
mgr *udpSessionManager
addr netip.AddrPort
in chan UDPPacket

cancel context.CancelCauseFunc
cancelCtx context.Context
}

func newUDPSession(mgr *udpSessionManager, addr netip.AddrPort) *udpSession {
return &udpSession{
s := &udpSession{
mgr: mgr,
addr: addr,
in: make(chan UDPPacket, 128),
in: make(chan UDPPacket, 1),
}
s.cancelCtx, s.cancel = context.WithCancelCause(context.Background())
return s
}

func (s *udpSession) HandlePacket(ctx context.Context, packet UDPPacket) {
select {
case <-ctx.Done():
case <-s.cancelCtx.Done():
case s.in <- packet:
}
}

Expand Down Expand Up @@ -306,41 +282,10 @@ func (s *udpSession) run(ctx context.Context) error {
log.Ctx(ctx).Info().Msg("starting udp session")
err := s.mgr.handler(ctx, s)
log.Ctx(ctx).Error().Err(err).Msg("stopped udp session")
s.cancel(err)
return err
}

func dropAll[T any](ch chan T) (dropped int) {
for {
select {
case <-ch:
dropped++
default:
return dropped
}
}
}

func sendOrDrop[T any](ch chan T, packet T) (dropped int) {
for {
select {
case ch <- packet:
return dropped
default:
}

if cap(ch) == 0 {
dropped++
return dropped
}

select {
case <-ch:
dropped++
default:
}
}
}

func readUDPCapsuleDatagram(
src quicvarint.Reader,
) ([]byte, error) {
Expand Down

0 comments on commit a116bf5

Please sign in to comment.