-
Notifications
You must be signed in to change notification settings - Fork 0
/
snapshot_sync.go
54 lines (47 loc) · 1.19 KB
/
snapshot_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Copyright (c) 2019 Meng Huang (mhboy@outlook.com)
// This package is licensed under a MIT license that can be found in the LICENSE file.
package raft
import (
"sync/atomic"
"time"
)
// SyncType represents a sync type: a snapshot schedule that pairs a check
// interval with a change threshold.
type SyncType struct {
// Seconds is the period, in seconds, of the ticker created by newSnapshotSync.
Seconds int
// Changes is the threshold checked by snapshotSync.run on every tick: a
// snapshot is saved only when the state machine's lastApplied exceeds the
// snapshot's lastIncludedIndex by at least this many.
Changes int
}
// snapshotSync periodically checks a state machine against one SyncType and
// saves a snapshot when the SyncType's change threshold has been reached.
// It is created by newSnapshotSync, driven by run and terminated by Stop.
type snapshotSync struct {
// stateMachine is the state machine that run inspects and asks to save snapshots.
stateMachine *stateMachine
// ticker triggers each check in run; its period is syncType.Seconds seconds.
ticker *time.Ticker
// syncType supplies the ticker period and the change threshold.
syncType *SyncType
// done is closed by Stop to make run stop the ticker and return.
done chan struct{}
// closed is flipped from 0 to 1 atomically by the first Stop call so that
// done is closed only once.
closed int32
}
// newSnapshotSync creates a snapshotSync that checks s once every
// syncType.Seconds seconds. Nothing is checked until run is called; the ticker
// created here is stopped by run once Stop has been called.
//
// syncType must be non-nil. time.NewTicker panics when given a non-positive
// duration, so a syncType.Seconds of zero or less falls back to a one-second
// period instead of crashing the caller.
func newSnapshotSync(s *stateMachine, syncType *SyncType) *snapshotSync {
	period := time.Second
	if syncType.Seconds > 0 {
		period = time.Second * time.Duration(syncType.Seconds)
	}
	return &snapshotSync{
		stateMachine: s,
		ticker:       time.NewTicker(period),
		syncType:     syncType,
		done:         make(chan struct{}, 1),
	}
}
// run blocks until Stop is called. On every tick it compares the state
// machine's lastApplied with the snapshot's lastIncludedIndex and calls
// SaveSnapshot when the difference is at least syncType.Changes. When the done
// channel is closed it stops the ticker and returns.
func (s *snapshotSync) run() {
	for {
		select {
		case <-s.ticker.C:
			applied := s.stateMachine.lastApplied
			included := s.stateMachine.snapshotReadWriter.lastIncludedIndex.ID()
			// Both indexes are unsigned, so applied-included would wrap around
			// to a huge value when the snapshot is ahead of lastApplied and
			// force a snapshot on every tick. There is nothing new to save in
			// that state, so skip this tick.
			if applied < included {
				continue
			}
			if applied-included >= uint64(s.syncType.Changes) {
				s.stateMachine.SaveSnapshot()
			}
		case <-s.done:
			s.ticker.Stop()
			//logger.Tracef("snapshotSync.run Seconds-%d, Changes-%d", s.syncType.Seconds, s.syncType.Changes)
			return
		}
	}
}
// Stop signals run to return by closing the done channel. Only the call that
// atomically flips closed from 0 to 1 closes the channel; every other call
// returns without doing anything, so Stop may be called more than once.
func (s *snapshotSync) Stop() {
	if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
		return
	}
	close(s.done)
}