-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathwatch.go
177 lines (146 loc) · 4.41 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package state
import (
"fmt"
"sync"
"github.com/armon/go-radix"
)
// Watch is the external interface shared by every flavor of watch in this
// file. A caller registers a channel to be signaled and can later withdraw it.
type Watch interface {
	// Wait registers notifyCh so that it is signaled when the watch fires.
	Wait(notifyCh chan struct{})

	// Clear deregisters notifyCh from the watch.
	Clear(notifyCh chan struct{})
}
// FullTableWatch is a watch covering an entire table, backed by a single
// notify group.
type FullTableWatch struct {
	// group is the one notify group used for every watcher of the table.
	group NotifyGroup
}
// NewFullTableWatch allocates a FullTableWatch with all of its fields at
// their zero values.
func NewFullTableWatch() *FullTableWatch {
	return new(FullTableWatch)
}
// Wait hands notifyCh to the table's notify group. See Watch.
func (w *FullTableWatch) Wait(notifyCh chan struct{}) {
	g := &w.group
	g.Wait(notifyCh)
}
// Clear asks the table's notify group to drop notifyCh. See Watch.
func (w *FullTableWatch) Clear(notifyCh chan struct{}) {
	g := &w.group
	g.Clear(notifyCh)
}
// Notify triggers the table's notify group, waking up the watchers that are
// registered for this table.
func (w *FullTableWatch) Notify() {
	g := &w.group
	g.Notify()
}
// DumbWatchManager lets nested code arm full table watches any number of
// times while firing each of them only once. It has no way to reset its
// state and it is not thread-safe, so create one, use it within a single
// thread, and then throw it away.
type DumbWatchManager struct {
	// tableWatches maps a table name to that table's full table watch.
	tableWatches map[string]*FullTableWatch

	// armed records which tables should be notified.
	armed map[string]bool
}
// NewDumbWatchManager builds a dumb watch manager over the given table
// watches. The map is stored as passed in (not copied) and the manager starts
// with no tables armed.
func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager {
	manager := new(DumbWatchManager)
	manager.tableWatches = tableWatches
	manager.armed = make(map[string]bool)
	return manager
}
// Arm marks the given table so that its watch is fired by Notify. Arming the
// same table more than once has no additional effect.
//
// Arm panics with "unknown table: <name>" when the table has no entry in the
// manager's table watches.
func (d *DumbWatchManager) Arm(table string) {
	if _, known := d.tableWatches[table]; !known {
		panic(fmt.Sprintf("unknown table: %s", table))
	}

	// The write is idempotent, so there is no need to look for an existing
	// entry first.
	d.armed[table] = true
}
// Notify fires the watch of every armed table, once per table per call. The
// set of armed tables is left unchanged.
func (d *DumbWatchManager) Notify() {
	for table := range d.armed {
		d.tableWatches[table].Notify()
	}
}
// PrefixWatch keeps a separate notify group per prefix, which allows for
// much more fine-grained watches than a full table watch.
type PrefixWatch struct {
	// watches is the tree of notify groups, organized by prefix.
	watches *radix.Tree

	// lock guards the watches tree.
	lock sync.Mutex
}
// NewPrefixWatch builds a prefix watch backed by a newly created radix tree.
func NewPrefixWatch() *PrefixWatch {
	w := new(PrefixWatch)
	w.watches = radix.New()
	return w
}
// GetSubwatch returns the notify group stored under the given prefix. When
// the tree has no group for that prefix yet, a new one is created, inserted,
// and returned. The lookup and the insert both happen under the watch's lock.
func (w *PrefixWatch) GetSubwatch(prefix string) *NotifyGroup {
	w.lock.Lock()
	defer w.lock.Unlock()

	existing, found := w.watches.Get(prefix)
	if !found {
		created := &NotifyGroup{}
		w.watches.Insert(prefix, created)
		return created
	}
	return existing.(*NotifyGroup)
}
// Notify wakes up all the watchers associated with the given prefix, meaning
// every notify group found on the path down to it. When subtree is true, the
// whole tree under the prefix is notified as well, such as when a key is
// being deleted. Groups that were notified under a non-empty key are removed
// from the tree afterwards.
func (w *PrefixWatch) Notify(prefix string, subtree bool) {
	w.lock.Lock()
	defer w.lock.Unlock()

	// fired collects the keys of the notified groups so that they can be
	// dropped from the tree once the walks are done. A group stored under
	// the empty key is notified but never queued for removal.
	var fired []string
	notifyAndRecord := func(key string, value interface{}) bool {
		value.(*NotifyGroup).Notify()
		if key != "" {
			fired = append(fired, key)
		}
		// NOTE(review): false is understood to mean "keep walking" for a
		// go-radix walk callback — confirm against radix.WalkFn.
		return false
	}

	// Invoke any watcher on the path downward to the key.
	w.watches.WalkPath(prefix, notifyAndRecord)

	// If the entire prefix may be affected (e.g. delete tree), invoke
	// everything under the prefix too.
	if subtree {
		w.watches.WalkPrefix(prefix, notifyAndRecord)
	}

	// Delete the old notify groups, working backwards through the list.
	for len(fired) > 0 {
		last := len(fired) - 1
		w.watches.Delete(fired[last])
		fired = fired[:last]
	}

	// TODO (slackpad) If a watch never fires then we will never clear it
	// out of the tree. The old state store had the same behavior, so this
	// has been around for a while. We should probably add a prefix scan
	// with a function that clears out any notify groups that are empty.
}
// MultiWatch bundles several watches together so that any one of them can
// trigger the caller.
type MultiWatch struct {
	// watches is the list of subordinate watches that calls are forwarded to.
	watches []Watch
}
// NewMultiWatch returns a new multi watch over the given set of watches.
func NewMultiWatch(watches ...Watch) *MultiWatch {
	multi := new(MultiWatch)
	multi.watches = watches
	return multi
}
// Wait registers notifyCh with every subordinate watch, in list order. See
// Watch.
func (w *MultiWatch) Wait(notifyCh chan struct{}) {
	subs := w.watches
	for i := 0; i < len(subs); i++ {
		subs[i].Wait(notifyCh)
	}
}
// Clear deregisters notifyCh from every subordinate watch, in list order.
// See Watch.
func (w *MultiWatch) Clear(notifyCh chan struct{}) {
	subs := w.watches
	for i := 0; i < len(subs); i++ {
		subs[i].Clear(notifyCh)
	}
}