forked from alicebob/miniredis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
138 lines (114 loc) · 2.64 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Basic stream implementation.
package miniredis
import (
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
)
var (
errInvalidStreamValue = errors.New("stream id is not bigger than the top item")
)
type streamKey []StreamEntry
// A StreamEntry is an entry in a stream. The ID is always of the form
// "123-123". Values should have an even length of entries.
type StreamEntry struct {
ID string
Values []string
}
func (ss *streamKey) generateID(now time.Time) string {
ts := uint64(now.UnixNano()) / 1000000
lastID := ss.lastID()
next := fmt.Sprintf("%d-%d", ts, 0)
if streamCmp(lastID, next) == -1 {
return next
}
last := parseStreamID(lastID)
return fmt.Sprintf("%d-%d", last[0], last[1]+1)
}
func (ss *streamKey) lastID() string {
if len(*ss) == 0 {
return "0-0"
}
return (*ss)[len(*ss)-1].ID
}
func parseStreamID(id string) [2]uint64 {
var res [2]uint64
parts := strings.SplitN(id, "-", 2)
res[0], _ = strconv.ParseUint(parts[0], 10, 64)
if len(parts) == 2 {
res[1], _ = strconv.ParseUint(parts[1], 10, 64)
}
return res
}
// compares two stream IDs (of the full format: "123-123"). Returns: -1, 0, 1
func streamCmp(a, b string) int {
ap := parseStreamID(a)
bp := parseStreamID(b)
if ap[0] < bp[0] {
return -1
}
if ap[0] > bp[0] {
return 1
}
if ap[1] < bp[1] {
return -1
}
if ap[1] > bp[1] {
return 1
}
return 0
}
// formatStreamID makes a full id ("42-42") out of a partial one ("42")
func formatStreamID(id string) (string, error) {
var ts [2]uint64
parts := strings.SplitN(id, "-", 2)
if len(parts) > 0 {
p, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return "", errInvalidEntryID
}
ts[0] = p
}
if len(parts) > 1 {
p, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return "", errInvalidEntryID
}
ts[1] = p
}
return fmt.Sprintf("%d-%d", ts[0], ts[1]), nil
}
func formatStreamRangeBound(id string, start bool, reverse bool) (string, error) {
if id == "-" {
return "0-0", nil
}
if id == "+" {
return fmt.Sprintf("%d-%d", uint64(math.MaxUint64), uint64(math.MaxUint64)), nil
}
if id == "0" {
return "0-0", nil
}
parts := strings.Split(id, "-")
if len(parts) == 2 {
return formatStreamID(id)
}
// Incomplete IDs case
ts, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return "", errInvalidEntryID
}
if (!start && !reverse) || (start && reverse) {
return fmt.Sprintf("%d-%d", ts, uint64(math.MaxUint64)), nil
}
return fmt.Sprintf("%d-%d", ts, 0), nil
}
func reversedStreamEntries(o []StreamEntry) []StreamEntry {
newStream := make([]StreamEntry, len(o))
for i, e := range o {
newStream[len(o)-i-1] = e
}
return newStream
}