-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgcounter.go
101 lines (82 loc) · 1.91 KB
/
gcounter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Package crdt provides Convergent and Commutative Replicated Data Types.
package crdt
import (
"strconv"
"github.com/koding/redis"
)
// GCounter is a grow-only counter (inspired by vector clocks) in which only
// increment (Add) and Merge are possible. Add applies the delta to the same
// key on every session of the owning CRDT, so each session holds a full copy
// of the count. Divergent copies are resolved by taking the maximum count
// found across the sessions, which is the value Merge reports.
type GCounter struct {
// ccrdt is the CRDT this counter was created from; Add and Merge iterate
// over its sessions.
ccrdt *CRDT
// key is the key passed to every session's Get and IncrBy call.
key string
}
// NewGCounter returns a GCounter that is bound to the receiver c and keeps
// its count under key.
func (c *CRDT) NewGCounter(key string) *GCounter {
	counter := new(GCounter)
	counter.ccrdt = c
	counter.key = key
	return counter
}
// Add applies delta to the counter's key on every session of the owning CRDT.
//
// The call is best effort: it returns nil unless the number of failed
// increments equals the session count, in which case the error from the last
// failing session is returned.
func (g *GCounter) Add(delta int64) error {
	var (
		failed  int
		lastErr error
	)

	// TODO: issue the increments from goroutines.
	for _, session := range g.ccrdt.sessions.All() {
		// TODO: IncrBy hands back the latest value stored on that session,
		// which could be used to do read-repair here.
		if _, err := session.IncrBy(g.key, delta); err != nil {
			failed++
			lastErr = err
		}
	}

	// Only report a failure when not a single session took the increment.
	if failed == g.ccrdt.sessions.Count() {
		return lastErr
	}
	return nil
}
// Merge reads the counter from every session of the owning CRDT, returns the
// largest count found, and performs read-repair by raising every session that
// is behind up to that maximum.
//
// A missing key (redis.ErrNil or an empty value) counts as 0. The maximum is
// seeded with 0, so the result is never negative.
//
// Sessions whose value could not be read or parsed are left out of both the
// maximum and the repair: their real count is unknown, and treating it as 0
// would make the repair add the full maximum on top of whatever they actually
// hold. Such failures are otherwise ignored as long as at least one session
// answered; if every session failed, the last error is returned.
//
// Repair writes are best effort and their errors are ignored.
// NOTE(review): the repair is a separate IncrBy issued after the Get, so an
// Add running in between may push a session past the maximum — confirm
// whether callers need this to be atomic.
func (g *GCounter) Merge() (int64, error) {
	var (
		res     int64
		lastErr error
	)
	// values holds the count of every session that was read successfully.
	values := make(map[*redis.RedisSession]int64)

	for _, c := range g.ccrdt.sessions.All() {
		val, err := c.Get(g.key)
		if err != nil && err != redis.ErrNil {
			// The state of this session is unknown; skip it instead of
			// pretending it holds 0.
			lastErr = err
			continue
		}

		// A key that does not exist yet is an empty counter.
		if val == "" {
			val = "0"
		}

		d, err := strconv.ParseInt(val, 10, 64)
		if err != nil {
			// Not an integer; IncrBy could not repair it anyway.
			lastErr = err
			continue
		}

		values[c] = d
		if d > res {
			res = d
		}
	}

	// Nothing could be read at all: report it rather than claiming the
	// counter is 0.
	if len(values) == 0 && lastErr != nil {
		return 0, lastErr
	}

	// Read-repair every session that is behind, wherever it appeared in the
	// iteration order.
	for ses, per := range values {
		if per >= res {
			continue
		}
		// Best effort: a failed repair is retried by the next Merge.
		_, _ = ses.IncrBy(g.key, res-per)
	}

	return res, nil
}