Skip to content

Commit

Permalink
feat(cron.file): FileJobStore for saving jobs to a backing CBOR file
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed May 9, 2019
1 parent 7b38164 commit 9d4de77
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (c *Cron) Unschedule(name string) error {
// MemJobStore is an in-memory implementation of the JobStore interface
// Jobs stored in MemJobStore can be persisted for the duration of a process
// at the longest.
// MemJobStore is save for concurrent use
// MemJobStore is safe for concurrent use
type MemJobStore struct {
lock sync.Mutex
jobs jobs
Expand Down
5 changes: 3 additions & 2 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func CompareJobs(a, b *Job) error {
if a.Periodicity != b.Periodicity {
return fmt.Errorf("Periodicity mismatch. %s != %s", a.Name, b.Name)
}
if !a.LastRun.Equal(b.LastRun) {
// use unix comparisons to ignore millisecond & nanosecond precision errors
if a.LastRun.Unix() != b.LastRun.Unix() {
return fmt.Errorf("LastRun mismatch. %s != %s", a.LastRun, b.LastRun)
}
if a.Type != b.Type {
Expand Down Expand Up @@ -156,7 +157,7 @@ func RunJobStoreTests(t *testing.T, newStore func() JobStore) {
t.Fatal(err)
}
if len(jobs) != 1 {
t.Errorf("expected default get to return inserted job")
t.Fatal("expected default get to return inserted job")
}
if err := CompareJobs(jobOne, jobs[0]); err != nil {
t.Errorf("stored job mistmatch: %s", err)
Expand Down
161 changes: 161 additions & 0 deletions cron/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package cron

import (
"bytes"
"fmt"
"io/ioutil"
"os"
"sort"
"sync"

"github.com/ugorji/go/codec"
)

// NewFileJobStore creates a job store that persists to a CBOR file
// specified at path
func NewFileJobStore(path string) JobStore {
return &FileJobStore{
path: path,
}
}

// FileJobStore is a jobstore implementation that saves to a CBOR file
// Jobs stored in FileJobStore can be persisted for the duration of a process
// at the longest.
// FileJobStore is safe for concurrent use
type FileJobStore struct {
lock sync.Mutex
path string
}

// Jobs lists jobs currently in the store
func (s *FileJobStore) Jobs(offset, limit int) ([]*Job, error) {
s.lock.Lock()
defer s.lock.Unlock()

js, err := s.loadJobs()
if err != nil {
return nil, err
}

if limit <= 0 {
limit = len(js)
}

ss := make([]*Job, limit)
added := 0
for i, job := range js {
if i < offset {
continue
} else if added == limit {
break
}

ss[added] = job
added++
}
return ss[:added], nil
}

func (s *FileJobStore) handle() codec.Handle {
return &codec.CborHandle{
// Need to use RFC3339 timestamps to preserve as much precision as possible
TimeRFC3339: true,
}
}

func (s *FileJobStore) loadJobs() (js jobs, err error) {
f, err := os.Open(s.path)
if os.IsNotExist(err) {
return jobs{}, nil
}

js = jobs{}
err = codec.NewDecoder(f, s.handle()).Decode(&js)
return
}

func (s *FileJobStore) saveJobs(jobs []*Job) error {
buf := &bytes.Buffer{}
if err := codec.NewEncoder(buf, s.handle()).Encode(jobs); err != nil {
return err
}
return ioutil.WriteFile(s.path, buf.Bytes(), os.ModePerm)
}

// Job gets job details from the store by name
func (s *FileJobStore) Job(name string) (*Job, error) {
s.lock.Lock()
defer s.lock.Unlock()

js, err := s.loadJobs()
if err != nil {
return nil, err
}

for _, job := range js {
if job.Name == name {
return job, nil
}
}
return nil, fmt.Errorf("not found")
}

// PutJob places a job in the store. If the job name matches the name of a job
// that already exists, it will be overwritten with the new job
func (s *FileJobStore) PutJob(job *Job) error {
if err := ValidateJob(job); err != nil {
return err
}

s.lock.Lock()
defer s.lock.Unlock()

js, err := s.loadJobs()
if err != nil {
return err
}

for i, j := range js {
if job.Name == j.Name {
js[i] = job

sort.Sort(js)
return s.saveJobs(js)
}
}

js = append(js, job)
sort.Sort(js)
return s.saveJobs(js)
}

// DeleteJob removes a job from the store by name. deleting a non-existent job
// won't return an error
func (s *FileJobStore) DeleteJob(name string) error {
s.lock.Lock()
defer s.lock.Unlock()

js, err := s.loadJobs()
if err != nil {
return err
}

for i, j := range js {
if j.Name == name {
if i+1 == len(js) {
js = js[:i]
break
}

js = append(js[:i], js[i+1:]...)
break
}
}
return s.saveJobs(js)
}

// Destroy removes the path entirely
func (s *FileJobStore) Destroy() error {
return os.Remove(s.path)
}
20 changes: 20 additions & 0 deletions cron/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cron

import (
"io/ioutil"
"os"
"path/filepath"
"testing"
)

func TestFsJobStore(t *testing.T) {
tmp, err := ioutil.TempDir(os.TempDir(), "TestFsJobStore")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
newStore := func() JobStore {
return NewFileJobStore(filepath.Join(tmp, "jobs.cbor"))
}
RunJobStoreTests(t, newStore)
}

0 comments on commit 9d4de77

Please sign in to comment.