@@ -18,6 +18,7 @@ package discoverymanager
1818import (
1919 "fmt"
2020 "sync"
21+ "time"
2122
2223 "github.com/arduino/arduino-cli/arduino/discovery"
2324 "github.com/arduino/arduino-cli/i18n"
@@ -83,7 +84,12 @@ func (dm *DiscoveryManager) Start() {
8384 return
8485 }
8586
86- go dm .feeder ()
87+ go func () {
88+ // Feed all watchers with data coming from the discoveries
89+ for ev := range dm .feed {
90+ dm .feedEvent (ev )
91+ }
92+ }()
8793
8894 var wg sync.WaitGroup
8995 for _ , d := range dm .discoveries {
@@ -136,13 +142,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
136142 dm .Start ()
137143
138144 watcher := & PortWatcher {
139- feed : make (chan * discovery.Event ),
145+ feed : make (chan * discovery.Event , 10 ),
140146 }
141147 watcher .closeCB = func () {
142148 dm .watchersMutex .Lock ()
143149 delete (dm .watchers , watcher )
144- dm .watchersMutex .Unlock ()
145150 close (watcher .feed )
151+ dm .watchersMutex .Unlock ()
146152 }
147153 go func () {
148154 dm .watchersMutex .Lock ()
@@ -182,44 +188,43 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis
182188 return nil
183189}
184190
185- func (dm * DiscoveryManager ) feeder () {
186- // Feed all watchers with data coming from the discoveries
187- for ev := range dm .feed {
188- dm .watchersMutex .Lock ()
189- for watcher := range dm .watchers {
190- select {
191- case watcher .feed <- ev :
192- // OK
193- default :
194- // If the watcher is not able to process event fast enough
195- // remove the watcher from the list of watchers
196- go watcher .Close ()
197- }
191+ func (dm * DiscoveryManager ) feedEvent (ev * discovery.Event ) {
192+ dm .watchersMutex .Lock ()
193+ defer dm .watchersMutex .Unlock ()
194+
195+ if ev .Type == "stop" {
196+ // Remove all the cached events for the terminating discovery
197+ delete (dm .watchersCache , ev .DiscoveryID )
198+ return
199+ }
200+
201+ // Send the event to all watchers
202+ for watcher := range dm .watchers {
203+ select {
204+ case watcher .feed <- ev :
205+ // OK
206+ case <- time .After (time .Millisecond * 500 ):
207+ // If the watcher is not able to process event fast enough
208+ // remove the watcher from the list of watchers
209+ logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
210+ delete (dm .watchers , watcher )
198211 }
199- dm .cacheEvent (ev )
200- dm .watchersMutex .Unlock ()
201212 }
202- }
203213
204- func ( dm * DiscoveryManager ) cacheEvent ( ev * discovery. Event ) {
214+ // Cache the event for the discovery
205215 cache := dm .watchersCache [ev .DiscoveryID ]
206216 if cache == nil {
207217 cache = map [string ]* discovery.Event {}
208218 dm .watchersCache [ev .DiscoveryID ] = cache
209219 }
210-
211220 eventID := ev .Port .Address + "|" + ev .Port .Protocol
212221 switch ev .Type {
213222 case "add" :
214223 cache [eventID ] = ev
215224 case "remove" :
216225 delete (cache , eventID )
217- case "quit" :
218- // Remove all the events for this discovery
219- delete (dm .watchersCache , ev .DiscoveryID )
220226 default :
221227 logrus .Errorf ("Unhandled event from discovery: %s" , ev .Type )
222- return
223228 }
224229}
225230
0 commit comments