-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathchandy_lamport.go
114 lines (99 loc) · 3.76 KB
/
chandy_lamport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main
import (
"encoding/json"
"fmt"
"github.com/opentracing/opentracing-go"
)
const (
	// MtCLSnapshot is the message type carried by chandy-lamport snapshot
	// (marker) messages.
	MtCLSnapshot MsgType = "cl_snapshot"

	// clSnapshots is the peer-state key under which the map of
	// snapshot token -> recorded local snapshot is kept.
	clSnapshots = "cl_snapshots"
)
// InitiateCLSnapshot starts a chandy-lamport snapshot with p as the initiator.
//
// It generates a fresh snapshot token, opens a "cl_snapshot" tracing span,
// stores that span in p's state under clSpanStateKey(token) so it can be looked
// up again later, and then calls TakeCLSnapshot to record p's own state and
// send the token to the other peers.
//
// Background, the algorithm in outline:
//   - The initiating process saves its own local state and sends a snapshot
//     request bearing a snapshot token to all other processes.
//   - A process receiving the token for the first time records its own state
//     and propagates the token onward.
//   - A message that arrives without the token at a process that has already
//     seen the token was sent before the snapshot "cut off" and belongs in the
//     snapshot.
func InitiateCLSnapshot(p *Peer) {
	log.Infof("%s initiating CL snapshot", p.ID)

	token := NewMessageID()
	rootSpan := opentracing.StartSpan("cl_snapshot")

	// Keep the initiating span in state, keyed by token.
	p.SetState(clSpanStateKey(token), rootSpan)

	TakeCLSnapshot(p, token, rootSpan)
}
// TakeCLSnapshot records a local snapshot for snapshotToken the first time the
// token is seen, and forwards the token to every other peer in the peerstore.
// If a snapshot is already stored for the token it does nothing.
//
// The recorded snapshot is also attached to span as the "state" tag, and span
// is handed to NewMessage for each forwarded marker.
func TakeCLSnapshot(p *Peer, snapshotToken string, span opentracing.Span) {
	// Load the token -> snapshot map from peer state, starting a fresh one when
	// nothing usable is stored yet.
	taken, ok := p.GetState(clSnapshots).(map[string]interface{})
	if !ok {
		taken = map[string]interface{}{}
	}

	// A non-nil entry means this token was already handled.
	if taken[snapshotToken] != nil {
		return
	}

	log.Infof("%s recording snapshot. forwarding to %d peers", p.ID, len(p.Peerstore.Peers())-1)

	// Record the local snapshot before sending any markers.
	local := LocalCLSnapshot(p)
	taken[snapshotToken] = local
	p.SetState(clSnapshots, taken)
	span.SetTag("state", local)

	// Send the marker to every peer except ourselves.
	for _, other := range p.Peerstore.Peers() {
		if other == p.ID {
			continue
		}
		p.SendMessage(other, NewMessage(string(p.ID), MtCLSnapshot, snapshotToken, span))
	}
}
// LocalCLSnapshot builds a JSON-encoded snapshot of p's local state.
//
// The encoded object has three keys: "peerID" (p's ID in pretty form), "pet"
// (whatever is stored in p's state under "pet"), and "conns" (the pretty-form
// remote peer ID of each current network connection).
//
// A marshaling failure is logged rather than returned; the function then
// returns whatever bytes json.Marshal produced, as a string.
func LocalCLSnapshot(p *Peer) string {
	open := p.Host.Network().Conns()
	remotes := make([]string, 0, len(open))
	for _, conn := range open {
		remotes = append(remotes, conn.RemotePeer().Pretty())
	}

	encoded, err := json.Marshal(map[string]interface{}{
		"peerID": p.ID.Pretty(),
		"pet":    p.GetState("pet"),
		"conns":  remotes,
	})
	if err != nil {
		log.Errorf("error marshaling state data to json: %s", err.Error())
	}
	return string(encoded)
}
// ChandyLamportHandler handles messages of type MtCLSnapshot.
//
// The message payload is expected to be the snapshot token as a string;
// messages with any other payload type are ignored. For a valid token the
// handler starts a "snapshot" span, calls TakeCLSnapshot with it, and finishes
// the span. The span is a child of the sender's span when a tracing context can
// be extracted from msg.Tracing, and a new root span otherwise.
//
// If this peer initiated the snapshot (a span is stored in state under
// clSpanStateKey(token)), that span is finished and its state entry is cleared.
// Clearing matters because further markers for the same token would otherwise
// call Finish on the same span again, and OpenTracing requires Finish to be the
// last call made on a span.
//
// ws is not used. The handler always returns true for hangup.
func ChandyLamportHandler(p *Peer, ws *WrappedStream, msg Message) (hangup bool) {
	snapshotToken, ok := msg.Payload.(string)
	if !ok {
		// Not a snapshot token; nothing to do.
		return true
	}

	var span opentracing.Span
	wireContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, msg.Tracing)
	if err != nil {
		// If for whatever reason a span isn't found, go ahead and start a new root span.
		span = opentracing.StartSpan("snapshot")
	} else {
		span = opentracing.StartSpan("snapshot", opentracing.ChildOf(wireContext))
	}

	TakeCLSnapshot(p, snapshotToken, span)
	span.Finish()

	// If we initiated this snapshot, close off the initiating span exactly once:
	// finish it, then clear the state entry so later markers for the same token
	// no longer find it.
	key := clSpanStateKey(snapshotToken)
	if initspan, ok := p.GetState(key).(opentracing.Span); ok {
		initspan.Finish()
		p.SetState(key, nil)
	}
	return true
}
// clSpanStateKey returns the peer-state key under which the span of a snapshot
// this peer initiated is stored: "initiatedSnapshot." followed by the token.
func clSpanStateKey(snapshotToken string) string {
	const keyFormat = "initiatedSnapshot.%s"
	return fmt.Sprintf(keyFormat, snapshotToken)
}