forked from couchbase/go-couchbase
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tap.go
143 lines (127 loc) · 3.39 KB
/
tap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package couchbase
import (
"github.com/couchbase/gomemcached/client"
"github.com/couchbase/goutils/logging"
"sync"
"time"
)
const initialRetryInterval = 1 * time.Second
const maximumRetryInterval = 30 * time.Second
// A TapFeed streams mutation events from a bucket.
//
// Events from the bucket can be read from the channel 'C'. Remember
// to call Close() on it when you're done, unless its channel has
// closed itself already.
type TapFeed struct {
C <-chan memcached.TapEvent
bucket *Bucket
args *memcached.TapArguments
nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes
output chan memcached.TapEvent // Same as C but writeably-typed
wg sync.WaitGroup
quit chan bool
}
// StartTapFeed creates and starts a new Tap feed
func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) {
if args == nil {
defaultArgs := memcached.DefaultTapArguments()
args = &defaultArgs
}
feed := &TapFeed{
bucket: b,
args: args,
output: make(chan memcached.TapEvent, 10),
quit: make(chan bool),
}
go feed.run()
feed.C = feed.output
return feed, nil
}
// Goroutine that runs the feed
func (feed *TapFeed) run() {
retryInterval := initialRetryInterval
bucketOK := true
for {
// Connect to the TAP feed of each server node:
if bucketOK {
killSwitch, err := feed.connectToNodes()
if err == nil {
// Run until one of the sub-feeds fails:
select {
case <-killSwitch:
case <-feed.quit:
return
}
feed.closeNodeFeeds()
retryInterval = initialRetryInterval
}
}
// On error, try to refresh the bucket in case the list of nodes changed:
logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
feed.bucket.Name, retryInterval)
err := feed.bucket.Refresh()
bucketOK = err == nil
select {
case <-time.After(retryInterval):
case <-feed.quit:
return
}
if retryInterval *= 2; retryInterval > maximumRetryInterval {
retryInterval = maximumRetryInterval
}
}
}
func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
killSwitch = make(chan bool)
for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
var singleFeed *memcached.TapFeed
singleFeed, err = serverConn.StartTapFeed(feed.args)
if err != nil {
logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
feed.closeNodeFeeds()
return
}
feed.nodeFeeds = append(feed.nodeFeeds, singleFeed)
go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host)
feed.wg.Add(1)
}
return
}
// Goroutine that forwards Tap events from a single node's feed to the aggregate feed.
func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) {
defer feed.wg.Done()
for {
select {
case event, ok := <-singleFeed.C:
if !ok {
if singleFeed.Error != nil {
logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
}
killSwitch <- true
return
}
feed.output <- event
case <-feed.quit:
return
}
}
}
func (feed *TapFeed) closeNodeFeeds() {
for _, f := range feed.nodeFeeds {
f.Close()
}
feed.nodeFeeds = nil
}
// Close a Tap feed.
func (feed *TapFeed) Close() error {
select {
case <-feed.quit:
return nil
default:
}
feed.closeNodeFeeds()
close(feed.quit)
feed.wg.Wait()
close(feed.output)
return nil
}