-
Notifications
You must be signed in to change notification settings - Fork 1
/
backsync.go
156 lines (132 loc) · 4.25 KB
/
backsync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package backsync
import (
"sort"
"strconv"
"time"
"github.com/go-redis/redis"
)
type Manager struct {
redisClient *redis.Client
//storing the items
itemSetName string
//storing the key-value pairs of items-->createTime
updateTimeHashName string
//storing the items under backsync to mainDb, the values is the start syncTime
inuseTimeHashName string
//for a item returned by Top(), it will be "invisible" to other backSync Worker for <timeLimit> second
//if there is no deletion of the item within <timeLimit> seconds, the item will be pickup by another worker
timeLimit time.Duration
//In Top(), only the item with > <minIdleTime> minIdleTime, can be picked up as candidate
minIdleTime time.Duration
}
func New(r *redis.Client, itemSetName, updateTimeHashName, inuseTimeHashName string, timeLimit, minIdleTime time.Duration) *Manager {
return &Manager{
redisClient: r,
itemSetName: itemSetName,
updateTimeHashName: updateTimeHashName,
inuseTimeHashName: inuseTimeHashName,
timeLimit: timeLimit,
minIdleTime: minIdleTime,
}
}
func (m *Manager) Add(item string) error {
ts := time.Now().Unix()
script := `
redis.call('SADD', KEYS[1], KEYS[3])
redis.call('HSET', KEYS[2], KEYS[3], KEYS[4])
return 0`
return m.redisClient.Eval(script, []string{m.itemSetName, m.updateTimeHashName, item, strconv.FormatInt(ts, 10)}).Err()
}
type Candidate struct {
name string
ts int64
}
type CandidateByScore []Candidate
func (a CandidateByScore) Len() int { return len(a) }
func (a CandidateByScore) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a CandidateByScore) Less(i, j int) bool { return a[i].ts < a[j].ts }
func toInt64(val interface{}) int64 {
if s, ok1 := val.(string); ok1 {
if temp, err := strconv.ParseInt(s, 10, 64); err == nil {
return temp
}
}
return 0
}
/*
to probe for 8*n candidate, and return the atmost least-update n items
thus, the Top will return least updated 12.5% records, it is quite good approx
if minIdleSec > 0, then the library will only return record with idle time > minIdleSec
*/
func (m *Manager) Top(n int) (items []string, err error) {
candidateNames, err0 := m.redisClient.SRandMemberN(m.itemSetName, int64(n*8)).Result()
if err0 != nil {
return nil, err0
}
if len(candidateNames) == 0 {
//no records, thus no need to continue
return []string{}, nil
}
inUseTimes, err1 := m.redisClient.HMGet(m.inuseTimeHashName, candidateNames...).Result()
if err1 != nil {
return nil, err1
}
//get back the candidates updateTime
updateTimes, err2 := m.redisClient.HMGet(m.updateTimeHashName, candidateNames...).Result()
if err1 != nil {
return nil, err2
}
//sort the candidates, and remove the inuse candidates
candidates := []Candidate{}
limitTs := time.Now().Add(-1 * m.timeLimit).Unix()
limitIdleTs := time.Now().Add(-1 * m.minIdleTime).Unix()
for i, _ := range candidateNames {
if inuseTs := toInt64(inUseTimes[i]); inuseTs == 0 || inuseTs < limitTs {
//only include the item not pickup already by some backsync worker
candidate := Candidate{
name: candidateNames[i],
ts: toInt64(updateTimes[i]),
}
if candidate.ts < limitIdleTs {
candidates = append(candidates, candidate)
}
}
}
sort.Sort(CandidateByScore(candidates))
//remove the excess candidates
result := []string{}
for _, c := range candidates {
result = append(result, c.name)
}
if len(result) > n {
result = result[:n]
}
//update the inuseHash
if len(result) > 0 {
temp := map[string]interface{}{}
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
for _, itemName := range result {
temp[itemName] = nowTs
}
if err := m.redisClient.HMSet(m.inuseTimeHashName, temp).Err(); err != nil {
return nil, err
}
}
return result, nil
}
func (m *Manager) Delete(items []string) error {
script := `
for i = 4, #KEYS do
local updateTs = redis.call('HGET', KEYS[2], KEYS[i])
local inuseTs = redis.call('HGET', KEYS[3], KEYS[i])
if (updateTs) and (inuseTs) and tonumber(inuseTs) >= tonumber(updateTs) then
redis.call('HDEL', KEYS[2], KEYS[i])
redis.call('SREM', KEYS[1], KEYS[i])
end
redis.call('HDEL', KEYS[3], KEYS[i])
end
return 0
`
params := append([]string{m.itemSetName, m.updateTimeHashName, m.inuseTimeHashName}, items...)
return m.redisClient.Eval(script, params).Err()
}