diff --git a/pkg/sslnet/multicast_receiver.go b/pkg/sslnet/multicast_receiver.go index 626269c..01dbb04 100644 --- a/pkg/sslnet/multicast_receiver.go +++ b/pkg/sslnet/multicast_receiver.go @@ -11,6 +11,8 @@ const maxDatagramSize = 8192 type MulticastReceiver struct { activeIfis map[string]bool + connections []*net.UDPConn + running bool consumer func([]byte) mutex sync.Mutex SkipInterfaces []string @@ -24,11 +26,23 @@ func NewMulticastReceiver(consumer func([]byte)) (r *MulticastReceiver) { } func (r *MulticastReceiver) Start(multicastAddress string) { - go r.Receive(multicastAddress) + r.running = true + go r.receive(multicastAddress) } -func (r *MulticastReceiver) Receive(multicastAddress string) { - for { +func (r *MulticastReceiver) Stop() { + r.mutex.Lock() + defer r.mutex.Unlock() + r.running = false + for _, c := range r.connections { + if err := c.Close(); err != nil { + log.Println("Could not close connection: ", err) + } + } +} + +func (r *MulticastReceiver) receive(multicastAddress string) { + for r.isRunning() { ifis, _ := net.Interfaces() for _, ifi := range ifis { if ifi.Flags&net.FlagMulticast == 0 || // No multicast support @@ -46,6 +60,12 @@ func (r *MulticastReceiver) Receive(multicastAddress string) { } } +func (r *MulticastReceiver) isRunning() bool { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.running +} + func (r *MulticastReceiver) skipInterface(ifiName string) bool { for _, skipIfi := range r.SkipInterfaces { if skipIfi == ifiName { @@ -67,13 +87,14 @@ func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net. log.Printf("Could not listen at %v: %v", multicastAddress, err) return } + if err := listener.SetReadBuffer(maxDatagramSize); err != nil { log.Println("Could not set read buffer: ", err) } r.mutex.Lock() + r.connections = append(r.connections, listener) r.activeIfis[ifi.Name] = true - defer delete(r.activeIfis, ifi.Name) r.mutex.Unlock() log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name) @@ -94,4 +115,12 @@ func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net. if err := listener.Close(); err != nil { log.Println("Could not close listener: ", err) } + r.mutex.Lock() + delete(r.activeIfis, ifi.Name) + for i, c := range r.connections { + if c == listener { + r.connections = append(r.connections[:i], r.connections[i+1:]...) + } + } + r.mutex.Unlock() }