-
Notifications
You must be signed in to change notification settings - Fork 0
/
scorekeeper.go
219 lines (183 loc) · 5.07 KB
/
scorekeeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package scorekeeper
import (
"encoding/json"
"errors"
"fmt"
"github.com/bdharris08/scorekeeper/score"
"github.com/bdharris08/scorekeeper/stat"
"github.com/bdharris08/scorekeeper/store"
)
// ScoreKeeper keeps scores using some store.
// It is the top level object for the ScoreKeeper library.
// It fulfills the requested AddAction and GetStats methods.
//
// A ScoreKeeper returned by New is not yet running: Start launches the worker
// goroutine that serves AddAction and GetStats, and Stop signals it to exit.
type ScoreKeeper struct {
// f is the factory used to construct registered score types.
f score.ScoreFactory
// s is the store that scores are written to and retrieved from.
s store.ScoreStore
// scores carries new scores from clients (through AddAction) to the worker.
// The field is typed send-only (chan<-), so code holding the ScoreKeeper can
// only send on it; the worker keeps the bidirectional end created in work and
// is the only reader.
// Errors are returned on the channel carried inside each envelope, like an
// addressed envelope in an envelope.
scores chan<- scoreEnvelope
// requests carries stats requests from clients (through GetStats) to the worker.
// It is send-only for the same reason as scores: only the worker reads.
requests chan<- requestEnvelope
// quit is closed (see Stop) to tell the worker to exit.
quit chan bool
}
// New builds a ScoreKeeper from the store st and the score factory sf.
//
// A nil st is replaced by an empty in-memory store.MemoryStore. An empty sf is
// rejected with an error, since no score type could then be validated.
//
// New does not launch the worker goroutine; call Start before using AddAction
// or GetStats.
func New(st store.ScoreStore, sf score.ScoreFactory) (*ScoreKeeper, error) {
	// Validate the factory first: nothing below is needed if it is empty.
	if len(sf) == 0 {
		return nil, fmt.Errorf("scoreTypes must be provided")
	}

	// Fall back to the in-memory store when the caller supplied none.
	backing := st
	if backing == nil {
		backing = &store.MemoryStore{S: map[string]map[string][]score.Score{}}
	}

	return &ScoreKeeper{f: sf, s: backing}, nil
}
// Start launches the worker goroutine that listens on the scores and requests
// channels.
//
// Start is a no-op when a worker is already running. Previously a second call
// replaced quit, scores and requests while the first worker was still alive,
// orphaning that goroutine and its channels. Calling Start after Stop launches
// a fresh worker.
func (sk *ScoreKeeper) Start() {
	if sk.quit != nil {
		select {
		case <-sk.quit:
			// Nothing is ever sent on quit, so a successful receive means the
			// channel was closed by Stop: fall through and restart.
		default:
			// quit is still open: a worker is already running.
			return
		}
	}

	// quit must be set before work() so the worker observes the new channel.
	sk.quit = make(chan bool)
	sk.scores, sk.requests = sk.work()
}
// ErrNotRunning is returned by Stop, AddAction and GetStats when the worker
// has not been started with Start.
var ErrNotRunning = errors.New("scorekeeper not running. Use Start()")
// Stop signals the worker goroutine to exit by closing the quit channel.
//
// It returns ErrNotRunning when the ScoreKeeper was never started, and also
// when it has already been stopped. Previously a second Stop closed the
// already-closed channel and panicked.
func (sk *ScoreKeeper) Stop() error {
	if sk.quit == nil {
		return ErrNotRunning
	}

	select {
	case <-sk.quit:
		// Nothing is ever sent on quit, so a successful receive means it is
		// already closed; closing it again would panic.
		return ErrNotRunning
	default:
	}

	close(sk.quit)
	return nil
}
// ValidScoreType reports whether scoreType has an entry in sk's score factory.
// sk must be non-nil.
func ValidScoreType(sk *ScoreKeeper, scoreType string) bool {
	_, registered := sk.f[scoreType]
	return registered
}
// result is the worker's reply to a stats request: either the stats string
// produced by get, or the error that prevented computing it.
type result struct {
// result holds the string returned by get; it is empty when err is non-nil.
result string
// err is the error returned by get, or nil on success.
err error
}
// scoreEnvelope allows the caller to send a Score and receive an error from the worker.
type scoreEnvelope struct {
// score is the score the worker passes to the store.
score score.Score
// err is the reply channel: the worker sends the value returned by the
// store's Store call on it, once per envelope.
err chan error
}
// requestEnvelope encapsulates a request for a type of score and a channel to receive the result.
type requestEnvelope struct {
// scoreType names the score type whose stats are requested.
scoreType string
// r is the reply channel: the worker sends one result on it per request.
r chan result
}
// work launches the worker goroutine and returns the send-only ends of the
// channels it serves: one for new scores (used by AddAction) and one for stats
// requests (used by GetStats).
//
// The worker handles one message at a time, so every access to the store made
// through these channels is serialized. It exits when the quit channel that
// was current at the time of the call is closed.
func (sk *ScoreKeeper) work() (chan<- scoreEnvelope, chan<- requestEnvelope) {
	scores := make(chan scoreEnvelope)
	requests := make(chan requestEnvelope)

	// Capture quit once so the goroutine does not re-read the struct field
	// while other code may be assigning to it.
	quit := sk.quit

	go func() {
		for {
			// No default case: the select must block until there is work or
			// quit is closed. An empty default turned this loop into a busy
			// wait that consumed a full CPU core while idle.
			select {
			case <-quit:
				return
			case s := <-scores:
				s.err <- sk.s.Store(s.score)
			case re := <-requests:
				res, err := sk.get(re.scoreType)
				re.r <- result{
					result: res,
					err:    err,
				}
			}
		}
	}()

	return scores, requests
}
// ErrNoKeeper is returned by AddAction and GetStats when the ScoreKeeper has
// no store set, i.e. it was not created through New.
var ErrNoKeeper = errors.New("scorekeeper uninitialized. Use New()")
// AddAction takes a json-encoded string action of the given scoreType and
// hands it to the worker to be stored.
//
// It returns:
//   - ErrNoKeeper if the ScoreKeeper has no store (not created with New);
//   - ErrNotRunning if the worker was never started or has been stopped;
//   - score.ErrBadScoreType if scoreType is not registered in the factory;
//   - a wrapped error if the score cannot be created, the error from Read if
//     action cannot be parsed, or the error reported by the store.
func (sk *ScoreKeeper) AddAction(scoreType, action string) error {
	if sk.s == nil {
		return ErrNoKeeper
	}
	if sk.scores == nil || sk.quit == nil {
		return ErrNotRunning
	}
	if !ValidScoreType(sk, scoreType) {
		return score.ErrBadScoreType
	}

	s, err := score.Create(sk.f, scoreType)
	if err != nil {
		return fmt.Errorf("failed to AddAction of type %s: %w", scoreType, err)
	}
	if err := s.Read(action); err != nil {
		return err
	}

	// Buffered so the worker's reply never blocks on this caller.
	errCh := make(chan error, 1)

	// After Stop the worker no longer receives, so a bare send on the
	// unbuffered scores channel would block forever. Selecting on quit turns
	// that deadlock into ErrNotRunning.
	select {
	case sk.scores <- scoreEnvelope{score: s, err: errCh}:
	case <-sk.quit:
		return ErrNotRunning
	}

	// Once the worker has accepted the envelope it always replies before it
	// looks at quit again, so this receive cannot hang.
	return <-errCh
}
// GetStats computes some statistics about the actions of the given scoreType
// stored in the ScoreKeeper and returns them as a json-encoded string.
//
// It returns:
//   - ErrNoKeeper if the ScoreKeeper has no store (not created with New);
//   - ErrNotRunning if the worker was never started or has been stopped;
//   - score.ErrBadScoreType if scoreType is not registered in the factory;
//   - otherwise whatever error the worker reports while computing the stats.
func (sk *ScoreKeeper) GetStats(scoreType string) (string, error) {
	if sk.s == nil {
		return "", ErrNoKeeper
	}
	// Guard the channel this method actually sends on. Start assigns scores
	// and requests together, so this matches the previous check on scores.
	if sk.requests == nil || sk.quit == nil {
		return "", ErrNotRunning
	}
	if !ValidScoreType(sk, scoreType) {
		return "", score.ErrBadScoreType
	}

	// Pass a reply channel to the worker and wait for the result. It is
	// buffered so the worker's reply never blocks on this caller.
	replyCh := make(chan result, 1)

	// After Stop the worker no longer receives, so a bare send on the
	// unbuffered requests channel would block forever. Selecting on quit
	// turns that deadlock into ErrNotRunning.
	select {
	case sk.requests <- requestEnvelope{scoreType: scoreType, r: replyCh}:
	case <-sk.quit:
		return "", ErrNotRunning
	}

	// Once the worker has accepted the request it always replies before it
	// looks at quit again, so this receive cannot hang.
	res := <-replyCh
	return res.result, res.err
}
// get retrieves the scores of the given scoreType from the store, computes one
// average per action name and returns the averages as a json-encoded array.
//
// It returns store.ErrNoStore when no store is set, stat.ErrNoData when the
// store holds nothing for scoreType, stat.ErrTypeInvalid when a computed
// average is not a float64, and otherwise any error from the store, the
// average computation or JSON encoding.
//
// NOTE(review): entries are appended in range order over the retrieved
// collection; if that is a map (presumably), their order in the output is
// unspecified.
func (sk *ScoreKeeper) get(scoreType string) (string, error) {
	if sk.s == nil {
		return "", store.ErrNoStore
	}

	byAction, err := sk.s.Retrieve(sk.f, scoreType)
	switch {
	case err != nil:
		return "", err
	case len(byAction) == 0:
		return "", stat.ErrNoData
	}

	averages := make([]score.AverageTime, 0, len(byAction))
	for action, recorded := range byAction {
		averager := stat.Average{}
		computed, err := averager.Compute(recorded)
		if err != nil {
			return "", err
		}

		// Compute returns an untyped value; only a float64 is acceptable here.
		mean, isFloat := computed.(float64)
		if !isFloat {
			return "", stat.ErrTypeInvalid
		}

		entry := score.AverageTime{Action: action, Average: mean}
		averages = append(averages, entry)
	}

	encoded, err := json.Marshal(averages)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}