forked from probe-lab/zikade
-
Notifications
You must be signed in to change notification settings - Fork 0
/
notifee.go
94 lines (82 loc) · 3.56 KB
/
notifee.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package zikade
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/plprobelab/zikade/kadt"
)
// networkEventsSubscription registers a subscription on the libp2p event bus
// for several events. The DHT uses these events for various tasks like routing
// table or DHT mode updates.
func (d *DHT) networkEventsSubscription() (event.Subscription, error) {
evts := []interface{}{
// register for event bus notifications of when peers successfully
// complete identification in order to update the routing table.
new(event.EvtPeerIdentificationCompleted),
// register for event bus protocol ID changes in order to update the
// routing table. If a peer stops supporting the DHT protocol, we want
// to remove it from the routing table.
new(event.EvtPeerProtocolsUpdated),
// register for event bus notifications for when our local
// address/addresses change, so we can advertise those to the network
new(event.EvtLocalAddressesUpdated),
// we want to know when we are disconnecting from other peers.
new(event.EvtPeerConnectednessChanged),
}
// register for event bus local reachability changes in order to trigger
// switching between client and server modes. We only register for these
// events if the DHT is operating in ModeOptAuto{Server,Client}.
if d.cfg.Mode == ModeOptAutoServer || d.cfg.Mode == ModeOptAutoClient {
evts = append(evts, new(event.EvtLocalReachabilityChanged))
}
return d.host.EventBus().Subscribe(evts)
}
// consumeNetworkEvents takes an event bus subscription and consumes all events
// emitted on that subscription. It calls out to various event handlers.
func (d *DHT) consumeNetworkEvents(sub event.Subscription) {
for evt := range sub.Out() {
switch evt := evt.(type) {
case event.EvtLocalReachabilityChanged:
d.onEvtLocalReachabilityChanged(evt)
case event.EvtLocalAddressesUpdated:
case event.EvtPeerProtocolsUpdated:
case event.EvtPeerIdentificationCompleted:
d.onEvtPeerIdentificationCompleted(evt)
case event.EvtPeerConnectednessChanged:
default:
d.log.Warn("unknown libp2p event", "type", fmt.Sprintf("%T", evt))
}
}
}
// onEvtLocalReachabilityChanged handles reachability change events and sets
// the DHTs mode accordingly. We only subscribe to these events if the DHT
// operates in an automatic mode. This means we can directly change to
// client/server mode based on the reachability event and don't need to check
// if the configuration constrains us to a specific mode.
func (d *DHT) onEvtLocalReachabilityChanged(evt event.EvtLocalReachabilityChanged) {
d.log.With("reachability", evt.Reachability.String()).Debug("handling reachability changed event")
// set DHT mode based on new reachability
switch evt.Reachability {
case network.ReachabilityPrivate:
d.setClientMode()
case network.ReachabilityPublic:
d.setServerMode()
case network.ReachabilityUnknown:
if d.cfg.Mode == ModeOptAutoClient {
d.setClientMode()
} else if d.cfg.Mode == ModeOptAutoServer {
d.setServerMode()
} else {
// we should only be subscribed to EvtLocalReachabilityChanged events
// if the DHT is configured to operate in any auto mode.
d.log.With("mode", d.cfg.Mode).Warn("unexpected mode configuration")
}
default:
d.log.With("reachability", evt.Reachability).Warn("unknown reachability type")
}
}
func (d *DHT) onEvtPeerIdentificationCompleted(evt event.EvtPeerIdentificationCompleted) {
// tell the coordinator about a new candidate for inclusion in the routing table
d.kad.AddNodes(context.Background(), []kadt.PeerID{kadt.PeerID(evt.Peer)})
}