-
Notifications
You must be signed in to change notification settings - Fork 1
/
sked.go
211 lines (196 loc) · 4.99 KB
/
sked.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package sked
import (
"bytes"
"errors"
"sync"
"time"
"github.com/boltdb/bolt"
)
// Package-level defaults. New substitutes these for any Options field that is
// left at its zero value; Add substitutes DefaultExpiresAfter when it is
// called with expiresAfter == 0.
const (
// DefaultBucket is the bolt bucket name used when Options.Bucket is empty.
DefaultBucket = "sked"
// DefaultCheckIntervalMS is the default interval, in milliseconds, at which
// the db is polled for new events.
DefaultCheckIntervalMS = 500
// DefaultDBPath is the database file path used when Options.DBPath is empty.
DefaultDBPath = "sked.db"
// DefaultOpenTimeoutMS is the default time, in milliseconds, to wait when
// opening the db.
DefaultOpenTimeoutMS = 1000
// DefaultEventChannelSize is the default number of events buffered on the
// events channel.
DefaultEventChannelSize = 1
// DefaultExpiresAfter is the default number of seconds before an event is
// expired.
DefaultExpiresAfter = 1800
)
var (
// ErrNoMatch is the sentinel error for cases where no match was found.
// NOTE(review): nothing in this part of the file returns it; presumably it
// is used by helpers defined elsewhere in the package — confirm.
ErrNoMatch = errors.New("no match found")
)
// Sked is the scheduler handle returned by New. It bundles the bolt database
// that stores scheduled events with the channels, logger and options used by
// the methods on this type.
type Sked struct {
// DB is the bolt database opened by New and closed by Close.
DB *bolt.DB
// done is an unbuffered channel; Close sends one value on it before closing
// the DB. Presumably the poll goroutine started by New receives it — confirm.
done chan struct{}
// events is a buffered channel created by New with capacity
// Options.EventChannelSize.
events chan []byte
// logger receives the configuration dump printed by New.
logger Logger
// options holds the (defaulted) Options passed to New.
options *Options
// rw is held by Add and Delete for the duration of their Update call.
rw *sync.Mutex // boltdb only allows 1 read-write transaction at a time
}
// Options are used to initialize a new Sked. Every field is optional: New
// replaces a zero value with the matching Default* constant, writing the
// default back into the Options value it was given.
type Options struct {
// Bucket is the bucket name to use to find events. Defaults to DefaultBucket.
Bucket string
// CheckIntervalMS is the interval in milliseconds that the poller
// should check for scheduled events. Defaults to DefaultCheckIntervalMS.
CheckIntervalMS int
// OpenTimeoutMS is the timeout a new Sked will take opening a DB in ms.
// Defaults to DefaultOpenTimeoutMS.
OpenTimeoutMS int
// DBPath is the path to store the database file. Defaults to DefaultDBPath.
DBPath string
// EventChannelSize is the size of the event channel to buffer before blocking.
// Defaults to DefaultEventChannelSize.
EventChannelSize int
}
// Result is a single stored event as returned by Get and GetAll.
type Result struct {
// Key is the stored key, parsed with ParseKey.
Key *Key `json:"key"`
// Event is the value stored under that key.
Event []byte `json:"event"`
}
// New initializes a new Sked scheduler and starts a poller for events.
//
// A nil o is treated as an empty Options. Zero-valued fields of o are replaced
// with the package defaults; the defaults are written back into the Options
// value the caller passed in. A negative EventChannelSize is also replaced by
// the default, because creating a channel with a negative capacity panics.
//
// New opens the bolt database at o.DBPath (waiting at most o.OpenTimeoutMS
// milliseconds) and makes sure the bucket o.Bucket exists. If the bucket
// cannot be created, the database is closed again before the error is
// returned so that a failed New does not leak the open database.
//
// l must be usable: New logs the effective configuration through it.
func New(o *Options, l Logger) (*Sked, error) {
	if o == nil {
		o = &Options{}
	}
	if o.Bucket == "" {
		o.Bucket = DefaultBucket
	}
	if o.CheckIntervalMS == 0 {
		o.CheckIntervalMS = DefaultCheckIntervalMS
	}
	if o.OpenTimeoutMS == 0 {
		o.OpenTimeoutMS = DefaultOpenTimeoutMS
	}
	if o.DBPath == "" {
		o.DBPath = DefaultDBPath
	}
	if o.EventChannelSize <= 0 {
		// Zero means "not set"; a negative size would make the make() below panic.
		o.EventChannelSize = DefaultEventChannelSize
	}

	s := &Sked{
		done:    make(chan struct{}),
		events:  make(chan []byte, o.EventChannelSize),
		logger:  l,
		options: o,
		rw:      &sync.Mutex{},
	}
	s.logger.Printf("config: %+v", o)

	timeout := time.Duration(o.OpenTimeoutMS) * time.Millisecond
	db, err := bolt.Open(o.DBPath, 0600, &bolt.Options{Timeout: timeout})
	if err != nil {
		return nil, err
	}

	err = db.Update(func(tx *bolt.Tx) error {
		_, cerr := tx.CreateBucketIfNotExists([]byte(o.Bucket))
		return cerr
	})
	if err != nil {
		// Do not leak the open database; the bucket error is the one worth
		// reporting, so a secondary Close error is deliberately dropped.
		_ = db.Close()
		return nil, err
	}

	s.DB = db
	// Start polling only once the database is fully usable.
	go s.poll()
	return s, nil
}
// Add will add a scheduled event for time t and return the id assigned to it.
//
// The id is taken from the bucket's sequence. The storage key is built by
// NewKeyString from t, the id, tag and expiresAfter. If expiresAfter is not
// provided (0), DefaultExpiresAfter is used.
//
// On any failure (sequence allocation, Put, or commit) the returned id is 0
// and the error is non-nil.
func (s *Sked) Add(t time.Time, event []byte, tag string, expiresAfter int) (uint64, error) {
	if expiresAfter == 0 {
		expiresAfter = DefaultExpiresAfter
	}

	s.rw.Lock()
	defer s.rw.Unlock()

	var id uint64
	err := s.DB.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(s.options.Bucket))
		seq, err := b.NextSequence()
		if err != nil {
			// Without a valid sequence number the key would be built from id 0.
			return err
		}
		keystr := NewKeyString(t, seq, tag, expiresAfter)
		if err := b.Put([]byte(keystr), event); err != nil {
			return err
		}
		id = seq
		return nil
	})
	if err != nil {
		return 0, err
	}
	return id, nil
}
// Delete will remove a scheduled event. You can use id or tag, or both.
//
// Every key in the bucket that contains the byte pattern produced by
// Key.MatchKey for {ID: id, Tag: tag} is removed. All removals happen in one
// read-write transaction: if MatchKey or any single delete fails, the error is
// returned and the transaction is rolled back, so nothing is removed.
func (s *Sked) Delete(id uint64, tag string) error {
	s.rw.Lock()
	defer s.rw.Unlock()

	return s.DB.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(s.options.Bucket))
		key := &Key{ID: id, Tag: tag}
		matchBytes, err := key.MatchKey()
		if err != nil {
			return err
		}

		// Collect first, delete afterwards: bolt warns that changing data while
		// traversing with a cursor may invalidate the cursor. Keys are copied so
		// they stay independent of the pages being modified.
		var doomed [][]byte
		cur := b.Cursor()
		for k, _ := cur.First(); k != nil; k, _ = cur.Next() {
			if bytes.Contains(k, matchBytes) {
				doomed = append(doomed, append([]byte(nil), k...))
			}
		}

		for _, k := range doomed {
			if err := b.Delete(k); err != nil {
				return err
			}
		}
		return nil
	})
}
// Get returns the first stored event whose key contains the byte pattern
// produced by Key.MatchKey for the given id.
//
// It returns (nil, nil) when no key matches. On error (MatchKey, ParseKey or
// the transaction itself) the result is nil.
//
// The returned Event is a private copy of the stored value: slices handed out
// by bolt are only valid for the life of the transaction, which has ended by
// the time Get returns.
func (s *Sked) Get(id uint64) (*Result, error) {
	var result *Result
	err := s.DB.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(s.options.Bucket))
		key := &Key{ID: id}
		matchBytes, err := key.MatchKey()
		if err != nil {
			return err
		}

		cur := b.Cursor()
		for k, v := cur.First(); k != nil; k, v = cur.Next() {
			if !bytes.Contains(k, matchBytes) {
				continue
			}
			parsed, err := ParseKey(string(k))
			if err != nil {
				return err
			}
			// Copy the value out of bolt's memory before the transaction ends.
			event := make([]byte, len(v))
			copy(event, v)
			result = &Result{Key: parsed, Event: event}
			return nil
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}
// GetAll will get all events by tag: every stored event whose key contains the
// byte pattern produced by Key.MatchKey for the given tag, in cursor order.
//
// When nothing matches, an empty (non-nil) slice is returned. On error
// (MatchKey, ParseKey or the transaction itself) the results are nil.
//
// Each returned Event is a private copy of the stored value: slices handed out
// by bolt are only valid for the life of the transaction, which has ended by
// the time GetAll returns.
func (s *Sked) GetAll(tag string) ([]*Result, error) {
	results := []*Result{}
	err := s.DB.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(s.options.Bucket))
		key := &Key{Tag: tag}
		matchBytes, err := key.MatchKey()
		if err != nil {
			return err
		}

		cur := b.Cursor()
		for k, v := cur.First(); k != nil; k, v = cur.Next() {
			if !bytes.Contains(k, matchBytes) {
				continue
			}
			parsed, err := ParseKey(string(k))
			if err != nil {
				return err
			}
			// Copy the value out of bolt's memory before the transaction ends.
			event := make([]byte, len(v))
			copy(event, v)
			results = append(results, &Result{Key: parsed, Event: event})
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return results, nil
}
// Close sends a single value on the done channel and then closes the DB,
// returning whatever error the DB reports.
//
// The done channel is unbuffered, so the send blocks until something receives
// it (presumably the poll goroutine started by New — confirm).
func (s *Sked) Close() error {
	var stop struct{}
	s.done <- stop

	err := s.DB.Close()
	return err
}