Skip to content

Commit

Permalink
Merge pull request #48 from suborbital/connor/cache
Browse files Browse the repository at this point in the history
Add cache to Ctx
  • Loading branch information
cohix authored Jan 4, 2021
2 parents b87ca7a + b202891 commit 1008b3c
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 12 deletions.
92 changes: 92 additions & 0 deletions hive/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package hive

import (
"sync"
"time"

"github.com/pkg/errors"
)

// ErrCacheKeyNotFound is returned when a non-existent cache key is requested
var ErrCacheKeyNotFound = errors.New("key not found")

// Cache represents access to a persistent cache
type Cache interface {
Set(key string, val []byte, ttl int) error
Get(key string) ([]byte, error)
Delete(key string) error
}

// memoryCache is a "default" cache implementation for Hive
type memoryCache struct {
values map[string]*uniqueVal

lock sync.RWMutex
}

// this is used to 1) allow pointers and 2) ensure checks for unique values are cheaper (pointer equality)
type uniqueVal struct {
val []byte
}

func newMemoryCache() *memoryCache {
m := &memoryCache{
values: make(map[string]*uniqueVal),
lock: sync.RWMutex{},
}

return m
}

func (m *memoryCache) Set(key string, val []byte, ttl int) error {
m.lock.Lock()
defer m.lock.Unlock()

uVal := &uniqueVal{
val: val,
}

m.values[key] = uVal

if ttl > 0 {
go func() {
<-time.After(time.Second * time.Duration(ttl))

m.lock.Lock()
defer m.lock.Unlock()

currentVal := m.values[key]
if currentVal == uVal {
delete(m.values, key)
}
}()
}

return nil
}

func (m *memoryCache) Get(key string) ([]byte, error) {
m.lock.RLock()
defer m.lock.RUnlock()

uVal, exists := m.values[key]
if !exists {
return nil, ErrCacheKeyNotFound
}

return uVal.val, nil
}

func (m *memoryCache) Delete(key string) error {
m.lock.Lock()
defer m.lock.Unlock()

_, exists := m.values[key]
if !exists {
return nil
}

delete(m.values, key)

return nil
}
85 changes: 85 additions & 0 deletions hive/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package hive

import (
"testing"
"time"

"github.com/pkg/errors"
)

type setTester struct{}

func (c *setTester) Run(job Job, ctx *Ctx) (interface{}, error) {
data := job.Bytes()

if err := ctx.Cache.Set("important", data, 1); err != nil {
return nil, err
}

return nil, nil
}

// OnChange runs on worker changes
func (c *setTester) OnChange(_ ChangeEvent) error {
return nil
}

type getTester struct{}

func (c *getTester) Run(job Job, ctx *Ctx) (interface{}, error) {
key := job.String()

val, err := ctx.Cache.Get(key)
if err != nil {
return nil, err
}

return string(val), nil
}

// OnChange runs on worker changes
func (c *getTester) OnChange(_ ChangeEvent) error {
return nil
}

func TestCacheGetSet(t *testing.T) {
h := New()
h.Handle("set", &setTester{})
h.Handle("get", &getTester{})

_, err := h.Do(NewJob("set", "very important information")).Then()
if err != nil {
t.Error(errors.Wrap(err, "failed to set"))
return
}

val, err := h.Do(NewJob("get", "important")).Then()
if err != nil {
t.Error(errors.Wrap(err, "get job failed"))
return
}

if val.(string) != "very important information" {
t.Error("result did not match expected 'very important information': ", val.(string))
}
}

func TestCacheGetSetWithTTL(t *testing.T) {
h := New()
h.Handle("set", &setTester{})
h.Handle("get", &getTester{})

_, err := h.Do(NewJob("set", "very important information")).Then()
if err != nil {
t.Error(errors.Wrap(err, "failed to set"))
return
}

<-time.After(time.Second * 2)

_, err = h.Do(NewJob("get", "important")).Then()
if err == nil {
t.Error("should have errored, did not")
return
}
}
10 changes: 10 additions & 0 deletions hive/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,19 @@ var errDoFuncNotSet = errors.New("do func has not been set")

// Ctx is a Job context
type Ctx struct {
Cache Cache
doFunc DoFunc
}

func newCtx(cache Cache, doFunc DoFunc) *Ctx {
c := &Ctx{
Cache: cache,
doFunc: doFunc,
}

return c
}

// Do runs a new job
func (c *Ctx) Do(job Job) *Result {
if c.doFunc == nil {
Expand Down
4 changes: 3 additions & 1 deletion hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type Hive struct {
// New returns a Hive ready to accept Jobs
func New() *Hive {
logger := vlog.Default()
cache := newMemoryCache()

h := &Hive{
scheduler: newScheduler(logger),
scheduler: newScheduler(logger, cache),
log: logger,
}

Expand Down
2 changes: 2 additions & 0 deletions hive/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func (j Job) String() string {
func (j Job) Bytes() []byte {
if v, ok := j.data.([]byte); ok {
return v
} else if s, ok := j.data.(string); ok {
return []byte(s)
}

return nil
Expand Down
6 changes: 4 additions & 2 deletions hive/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import (
type scheduler struct {
workers map[string]*worker
store Storage
cache Cache
logger *vlog.Logger
sync.Mutex
}

func newScheduler(logger *vlog.Logger) *scheduler {
func newScheduler(logger *vlog.Logger, cache Cache) *scheduler {
s := &scheduler{
workers: map[string]*worker{},
store: newMemoryStorage(),
cache: cache,
logger: logger,
Mutex: sync.Mutex{},
}
Expand Down Expand Up @@ -68,7 +70,7 @@ func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option)
opts = o(opts)
}

w := newWorker(runnable, s.store, opts)
w := newWorker(runnable, s.store, s.cache, opts)
if s.workers == nil {
s.workers = map[string]*worker{jobType: w}
} else {
Expand Down
20 changes: 11 additions & 9 deletions hive/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type worker struct {
runner Runnable
workChan chan JobReference
store Storage
cache Cache
options workerOpts

threads []*workThread
Expand All @@ -32,11 +33,12 @@ type worker struct {
}

// newWorker creates a new goWorker
func newWorker(runner Runnable, store Storage, opts workerOpts) *worker {
func newWorker(runner Runnable, store Storage, cache Cache, opts workerOpts) *worker {
w := &worker{
runner: runner,
workChan: make(chan JobReference, defaultChanSize),
store: store,
cache: cache,
options: opts,
threads: make([]*workThread, opts.poolSize),
threadLock: sync.Mutex{},
Expand Down Expand Up @@ -68,7 +70,7 @@ func (w *worker) start(doFunc DoFunc) error {
for {
// fill the "pool" with workThreads
for i := started; i < w.options.poolSize; i++ {
wt := newWorkThread(w.runner, w.workChan, w.store, w.options.jobTimeoutSeconds)
wt := newWorkThread(w.runner, w.workChan, w.store, w.cache, w.options.jobTimeoutSeconds)

// give the runner opportunity to provision resources if needed
if err := w.runner.OnChange(ChangeTypeStart); err != nil {
Expand Down Expand Up @@ -106,20 +108,22 @@ type workThread struct {
runner Runnable
workChan chan JobReference
store Storage
cache Cache
timeoutSeconds int
ctx context.Context
context context.Context
cancelFunc context.CancelFunc
}

func newWorkThread(runner Runnable, workChan chan JobReference, store Storage, timeoutSeconds int) *workThread {
func newWorkThread(runner Runnable, workChan chan JobReference, store Storage, cache Cache, timeoutSeconds int) *workThread {
ctx, cancelFunc := context.WithCancel(context.Background())

wt := &workThread{
runner: runner,
workChan: workChan,
store: store,
cache: cache,
timeoutSeconds: timeoutSeconds,
ctx: ctx,
context: ctx,
cancelFunc: cancelFunc,
}

Expand All @@ -130,7 +134,7 @@ func (wt *workThread) run(doFunc DoFunc) {
go func() {
for {
// die if the context has been cancelled
if wt.ctx.Err() != nil {
if wt.context.Err() != nil {
break
}

Expand All @@ -144,9 +148,7 @@ func (wt *workThread) run(doFunc DoFunc) {
continue
}

ctx := &Ctx{
doFunc: doFunc,
}
ctx := newCtx(wt.cache, doFunc)

var result interface{}

Expand Down

0 comments on commit 1008b3c

Please sign in to comment.