shard.go

package consumer

import (
    "context"
    "runtime/pprof"
    "sync"
    "time"

    "github.com/pkg/errors"
    log "github.com/sirupsen/logrus"
    clientv3 "go.etcd.io/etcd/client/v3"
    "go.gazette.dev/core/allocator"
    "go.gazette.dev/core/broker/client"
    pb "go.gazette.dev/core/broker/protocol"
    pc "go.gazette.dev/core/consumer/protocol"
    "go.gazette.dev/core/consumer/recoverylog"
    "go.gazette.dev/core/keyspace"
    "go.gazette.dev/core/message"
)

const (
    // Frequency with which current FSM hints are written to Etcd.
    storeHintsInterval = 5 * time.Minute
    // Default size of the channel used between message decode & consumption.
    // This value is conservative, but will tolerate a data delay of up to
    // 82ms @ 100K messages / sec without stalling.
    // May be overridden in the ShardSpec.
    defaultReadChannelSize = 1 << 13 // 8192.
    // Default size of the ring buffer used for exactly-once sequencing of
    // messages. If upstream transactions are large, then larger ring values
    // may reduce the occurrence of replay reads required to re-fetch messages
    // which have fallen out of the ring.
    // May be overridden in the ShardSpec.
    defaultRingBufferSize = 1 << 13 // 8192.
    // Maximum interval between the newest and an older producer within a journal,
    // before the message sequencer will prune the older producer state.
    messageSequencerPruneHorizon = time.Hour * 24
)

type shard struct {
    svc          *Service                  // Service which owns the shard.
    ctx          context.Context           // Context tied to shard Assignment lifetime.
    cancel       context.CancelFunc        // Invoked when Assignment is lost.
    ajc          client.AsyncJournalClient // Async client using shard context.
    store        Store                     // Store of the shard.
    storeReadyCh chan struct{}             // Closed when |store| is ready.
    sequencer    *message.Sequencer        // Sequencer of uncommitted messages.
    publisher    *message.Publisher        // Publisher of messages from this shard.
    clock        message.Clock             // Clock which sequences messages from this shard.
    wg           sync.WaitGroup            // Synchronizes over references to the shard.
    primary      *client.AsyncOperation    // Status of servePrimary.

    // recovery of the shard from its log (if applicable).
    recovery struct {
        log      pb.Journal            // Point-in-time snapshot of ShardSpec.RecoveryLog().
        hints    *pc.GetHintsResponse  // Fetched hints used for recovery of the shard.
        player   *recoverylog.Player   // Player of shard's recovery log, if used.
        recorder *recoverylog.Recorder // Recorder of shard's recovery log.
    }
    // resolved state as-of the most recent shard transition().
    resolved struct {
        fqn        string            // Fully qualified Etcd key of this ShardSpec.
        spec       *pc.ShardSpec     // Last transitioned ShardSpec.
        assignment keyspace.KeyValue // Last transitioned shard Assignment.
        *sync.RWMutex                // Guards |spec| and |assignment| (is actually KS.Mu).
    }
    // progress as-of the most recent completed transaction.
    progress struct {
        readThrough pb.Offsets    // Offsets read through.
        publishAt   pb.Offsets    // ACKs started to each journal.
        signalCh    chan struct{} // Signalled on update to progress.
        sync.Mutex                // Guards |progress|.
    }
}

func newShard(svc *Service, item keyspace.KeyValue) *shard {
    var spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec)

    var ctx, cancel = context.WithCancel(
        pprof.WithLabels(context.Background(), pprof.Labels(
            "shard", spec.Id.String(),
        )))

    var s = &shard{
        svc:          svc,
        ctx:          ctx,
        cancel:       cancel,
        ajc:          client.NewAppendService(ctx, svc.Journals),
        storeReadyCh: make(chan struct{}),
        primary:      client.NewAsyncOperation(),
    }
    s.resolved.fqn = string(item.Raw.Key)
    s.resolved.spec = spec
    s.resolved.RWMutex = &svc.State.KS.Mu

    // We grab this value only once (since RecoveryLog()'s value may change in
    // the future). Elsewhere we use |s.recovery.log|.
    if rl := s.resolved.spec.RecoveryLog(); rl != "" {
        s.recovery.log = rl
        s.recovery.player = recoverylog.NewPlayer()
    }

    // Initialize |progress|. After completeRecovery(), Resolve() may begin
    // returning this shard and/or test against |progress.readThrough|.
    // |progress| is updated from synchronous calls to completeRecovery()
    // and then runTransactions() made by servePrimary().
    s.progress.signalCh = make(chan struct{})
    s.progress.readThrough = make(pb.Offsets)
    s.progress.publishAt = make(pb.Offsets)

    // During completeRecovery() we'll initialize with offsets of the recovered
    // checkpoint. However it may be missing journals which are included in Sources.
    // Initialize them now with zeros, which may be updated later.
    for _, src := range s.resolved.spec.Sources {
        s.progress.readThrough[src.Journal] = 0
    }
    return s
}

func (s *shard) Context() context.Context                 { return s.ctx }
func (s *shard) FQN() string                              { return s.resolved.fqn }
func (s *shard) JournalClient() client.AsyncJournalClient { return s.ajc }
func (s *shard) RecoveredHints() *pc.GetHintsResponse     { return s.recovery.hints }
func (s *shard) PrimaryLoop() client.OpFuture             { return s.primary }

func (s *shard) Spec() *pc.ShardSpec {
    s.resolved.RLock()
    defer s.resolved.RUnlock()
    return s.resolved.spec
}

// Assignment of the ShardSpec to the local ConsumerSpec, which motivates this Replica.
func (s *shard) Assignment() keyspace.KeyValue {
    s.resolved.RLock()
    defer s.resolved.RUnlock()
    return s.resolved.assignment
}

// Progress of this Shard in processing consumer transactions.
func (s *shard) Progress() (readThrough, publishAt pb.Offsets) {
    s.progress.Lock()
    defer s.progress.Unlock()
    return s.progress.readThrough.Copy(), s.progress.publishAt.Copy()
}

// transition is called by Resolver with the current ShardSpec and allocator
// Assignment of the replica, and transitions the Replica from its initial
// state to a standby or primary state. |spec| and |assignment| must always be
// non-zero-valued, and r.Mu.Lock must be held.
var transition = func(s *shard, item, assignment keyspace.KeyValue) {
    var (
        isInit   = s.resolved.assignment.Raw.CreateRevision == 0
        isSlot0  = assignment.Decoded.(allocator.Assignment).Slot == 0
        wasSlot0 = !isInit && s.resolved.assignment.Decoded.(allocator.Assignment).Slot == 0
    )

    if isInit && !isSlot0 {
        s.wg.Add(1) // Transition initial => standby.
        go serveStandby(s)
    } else if isInit && isSlot0 {
        s.wg.Add(2) // Transition initial => primary.
        go serveStandby(s)
        go servePrimary(s)
    } else if !isInit && isSlot0 && !wasSlot0 {
        s.wg.Add(1) // Transition standby => primary.
        go servePrimary(s)
    }

    // Record the most-recently transitioned spec & assignment, so that Spec()
    // and Assignment() reflect this transition.
    s.resolved.spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec)
    s.resolved.assignment = assignment
}

// serveStandby recovers and tails the shard recovery log, until the Replica is
// cancelled or promoted to primary.
func serveStandby(s *shard) (err error) {
    pprof.SetGoroutineLabels(s.ctx)

    // Defer a trap which logs and updates Etcd status based on exit error.
    defer func() {
        if err != nil && s.ctx.Err() == nil {
            log.WithFields(log.Fields{"err": err, "shard": s.FQN()}).Error("serveStandby failed")

            updateStatusWithRetry(s, pc.ReplicaStatus{
                Code:   pc.ReplicaStatus_FAILED,
                Errors: []string{err.Error()},
            })
        }
        s.wg.Done()
    }()

    // If there is no recovery log, serving as standby is a no-op. Advertise
    // immediate ability to transition to PRIMARY.
    if s.recovery.log == "" {
        updateStatusWithRetry(s, pc.ReplicaStatus{Code: pc.ReplicaStatus_STANDBY})
        return nil
    }

    go func() {
        updateStatusWithRetry(s, pc.ReplicaStatus{Code: pc.ReplicaStatus_BACKFILL})

        // When the player completes back-fill, advertise that we're tailing the log
        // and ready to transition to PRIMARY.
        select {
        case <-s.Context().Done():
            return
        case <-s.recovery.player.Tailing():
            log.WithFields(log.Fields{
                "log": s.recovery.log,
                "id":  s.Spec().Id,
            }).Info("now tailing live log")

            updateStatusWithRetry(s, pc.ReplicaStatus{Code: pc.ReplicaStatus_STANDBY})
        }
    }()

    return errors.WithMessage(beginRecovery(s), "beginRecovery")
}

// servePrimary completes playback of the recovery log,
// performs initialization, and runs consumer transactions.
func servePrimary(s *shard) (err error) {
    pprof.SetGoroutineLabels(s.ctx)

    // Defer a trap which logs and updates Etcd status based on exit error.
    defer func() {
        s.primary.Resolve(err)

        if err != nil && s.ctx.Err() == nil {
            log.WithFields(log.Fields{"err": err, "shard": s.FQN()}).Error("servePrimary failed")

            var statusErr = err.Error()
            if len(statusErr) > MAX_ETCD_ERR_LEN {
                statusErr = statusErr[:MAX_ETCD_ERR_LEN] + " ...[truncated]"
            }
            updateStatusWithRetry(s, pc.ReplicaStatus{
                Code:   pc.ReplicaStatus_FAILED,
                Errors: []string{statusErr},
            })
        }
        s.wg.Done()
    }()

    log.WithFields(log.Fields{
        "id":  s.Spec().Id,
        "log": s.recovery.log,
    }).Info("promoted to primary")

    // Complete recovery log playback (if applicable) and restore the last
    // transaction checkpoint.
    var cp pc.Checkpoint
    if cp, err = completeRecovery(s); err != nil {
        return errors.WithMessage(err, "completeRecovery")
    }
    updateStatusWithRetry(s, pc.ReplicaStatus{Code: pc.ReplicaStatus_PRIMARY})

    // If the shard store records to a log, arrange to periodically write FSMHints.
    var hintsCh <-chan time.Time
    if s.recovery.log != "" {
        var t = time.NewTicker(storeHintsInterval)
        defer t.Stop()
        hintsCh = t.C
    }

    // Run consumer transactions until an error occurs (such as context.Canceled).
    s.publisher = message.NewPublisher(s.ajc, &s.clock)

    for {
        var chanSize = s.Spec().ReadChannelSize
        if chanSize == 0 {
            chanSize = defaultReadChannelSize
        }
        var msgCh = make(chan EnvelopeOrError, chanSize)

        if mp, ok := s.svc.App.(MessageProducer); ok {
            mp.StartReadingMessages(s, s.store, cp, msgCh)
        } else {
            startReadingMessages(s, cp, msgCh)
        }

        var ringSize = s.Spec().RingBufferSize
        if ringSize == 0 {
            ringSize = defaultRingBufferSize
        }
        s.sequencer = message.NewSequencer(
            pc.FlattenReadThrough(cp),
            pc.FlattenProducerStates(cp),
            int(ringSize),
        )

        if err = runTransactions(s, cp, msgCh, hintsCh); err != nil {
            return errors.WithMessage(err, "runTransactions")
        }

        // Restore a checkpoint from the shard Store and re-start processing.
        // This may be the same Checkpoint we last committed, but it may not be:
        // that's up to the application.
        cp, err = s.store.RestoreCheckpoint(s)
        if err != nil {
            return errors.WithMessage(err, "restart store.RestoreCheckpoint")
        }
    }
}

// waitAndTearDown waits for all outstanding goroutines which are accessing
// the shard, and for all pending Appends to complete, and then tears down
// the shard Store.
func waitAndTearDown(s *shard, done func()) {
    s.wg.Wait()

    if s.store != nil {
        s.store.Destroy()
    }
    done()
}

// updateStatus publishes |status| under the Shard Assignment key in a checked
// transaction. An existing ReplicaStatus is reduced into |status| prior to update.
func updateStatus(s *shard, status pc.ReplicaStatus) error {
    var asn = s.Assignment()
    status.Reduce(asn.Decoded.(allocator.Assignment).AssignmentValue.(*pc.ReplicaStatus))

    var key = string(asn.Raw.Key)
    var val = status.MarshalString()

    var resp, err = s.svc.Etcd.Txn(s.ctx).
        If(clientv3.Compare(clientv3.ModRevision(key), "=", asn.Raw.ModRevision)).
        Then(clientv3.OpPut(key, val, clientv3.WithIgnoreLease())).
        Commit()

    if err == nil && !resp.Succeeded {
        err = errors.Errorf("transaction failed")
    }
    if err == nil {
        // Block until the update is observed in the KeySpace.
        s.svc.State.KS.Mu.RLock()
        _ = s.svc.State.KS.WaitForRevision(s.ctx, resp.Header.Revision)
        s.svc.State.KS.Mu.RUnlock()
    }
    return err
}

// updateStatusWithRetry wraps updateStatus with retry behavior.
func updateStatusWithRetry(s *shard, status pc.ReplicaStatus) {
    for attempt := 0; true; attempt++ {
        if s.ctx.Err() != nil {
            return // Already cancelled.
        }
        select {
        case <-s.ctx.Done():
            return // Cancelled while waiting to retry.
        case <-time.After(backoff(attempt)):
            // Pass.
        }

        var err = updateStatus(s, status)
        if err == nil {
            return
        }
        if attempt != 0 {
            log.WithFields(log.Fields{"err": err, "attempt": attempt}).
                Warn("failed to advertise Etcd shard status (will retry)")
        }
    }
}

// EnvelopeOrError composes an Envelope with its read error.
type EnvelopeOrError struct {
    message.Envelope
    Error error
}

// startReadingMessages from source journals into the provided channel.
func startReadingMessages(s *shard, cp pc.Checkpoint, ch chan<- EnvelopeOrError) {
    for _, src := range s.Spec().Sources {
        // Lower-bound checkpoint offset to the ShardSpec.Source.MinOffset.
        var offset = cp.Sources[src.Journal].ReadThrough
        if offset < src.MinOffset {
            offset = src.MinOffset
        }

        var it = message.NewReadUncommittedIter(
            client.NewRetryReader(s.ctx, s.ajc, pb.ReadRequest{
                Journal:    src.Journal,
                Offset:     offset,
                Block:      true,
                DoNotProxy: !s.ajc.IsNoopRouter(),
            }), s.svc.App.NewMessage)

        s.wg.Add(1)
        go func(it message.Iterator) {
            defer s.wg.Done()

            var v EnvelopeOrError
            for v.Error == nil {
                v.Envelope, v.Error = it.Next()

                // Attempt to place |v| even if context is cancelled,
                // but don't hang if we're cancelled and buffer is full.
                select {
                case ch <- v:
                default:
                    select {
                    case ch <- v:
                    case <-s.ctx.Done():
                        return
                    }
                }
            }
        }(it)
    }
}

func backoff(attempt int) time.Duration {
    // The choices of backoff time reflect that we're usually waiting for the
    // cluster to converge on a shared understanding of ownership, and that
    // involves a couple of Nagle-like read delays (~30ms) as Etcd watch
    // updates are applied by participants.
    switch attempt {
    case 0, 1:
        return time.Millisecond * 50
    case 2, 3:
        return time.Millisecond * 100
    case 4, 5:
        return time.Second
    default:
        return 5 * time.Second
    }
}

// Etcd values should generally be small.
// We don't want to push arbitrary length error strings.
const MAX_ETCD_ERR_LEN = 2048