Skip to content

Commit

Permalink
feat(cron.FbStore): store cron jobs as flatbuffers
Browse files Browse the repository at this point in the history
after reviewing benchmarks, I'm going to remove cbor as a JobStore format. Here's the benchmarks from this commit on my machine:

$ go test --bench=. -v --run=XXX -benchmem
goos: darwin
goarch: amd64
pkg: github.com/qri-io/qri/cron
BenchmarkFbJobStore-4     	     500	   2575622 ns/op	  655281 B/op	   14015 allocs/op
BenchmarkFileJobStore-4   	      10	 254254905 ns/op	  448635 B/op	   10082 allocs/op
PASS
ok  	github.com/qri-io/qri/cron	4.308s
  • Loading branch information
b5 committed May 9, 2019
1 parent daad975 commit 755186d
Show file tree
Hide file tree
Showing 14 changed files with 885 additions and 17 deletions.
27 changes: 27 additions & 0 deletions cron/cron.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// IDL file for cron

namespace cron_fbs;

table Job {
name:string;
type:string;
lastRun:string;
lastError:string;
periodicity:string;
secrets:[Secret];
}

table Secret {
key:string;
val:string;
}

// flatbuffers don't (currently) support using a vector as a root type
// in an ideal world we'd just `root_type [Job]`, wrapping in this table
// skips the problem and gives us the option to store "store global" state
// issue: https://github.com/google/flatbuffers/issues/4854
table Jobs {
list:[Job];
}

root_type Jobs;
85 changes: 68 additions & 17 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"sync"
"time"

flatbuffers "github.com/google/flatbuffers/go"
golog "github.com/ipfs/go-log"
"github.com/qri-io/dataset"
"github.com/qri-io/ioes"
"github.com/qri-io/iso8601"
"github.com/qri-io/qfs"
cron "github.com/qri-io/qri/cron/cron_fbs"
)

var (
Expand All @@ -29,23 +31,6 @@ var (
DefaultCheckInterval = time.Minute * 15
)

// Job represents a "cron job" that can be scheduled for repeated execution at
// a specified Periodicity (time interval)
type Job struct {
Name string
Type JobType
LastRun time.Time
LastError string
Periodicity iso8601.RepeatingInterval
Secrets map[string]string
}

// NextExec returns the next time execution horizion. If job periodicity is
// improperly configured, the returned time will be zero
func (job *Job) NextExec() time.Time {
return job.Periodicity.After(job.LastRun)
}

// ReadJobs are functions for fetching a set of jobs. ReadJobs defines canoncial
// behavior for listing & fetching jobs
type ReadJobs interface {
Expand Down Expand Up @@ -107,6 +92,9 @@ const (
type JobStore interface {
// JobStores must implement the ReadJobs interface for fetching stored jobs
ReadJobs
// PutJob places one or more jobs in the store. Putting a job who's name
// already exists must overwrite the previous job, making all job names unique
PutJobs(...*Job) error
// PutJob places a job in the store. Putting a job who's name already exists
// must overwrite the previous job, making all job names unique
PutJob(*Job) error
Expand Down Expand Up @@ -298,6 +286,32 @@ func (s *MemJobStore) Job(name string) (*Job, error) {
return nil, fmt.Errorf("not found")
}

// PutJobs places one or more jobs in the store. Putting a job who's name
// already exists must overwrite the previous job, making all job names unique
func (s *MemJobStore) PutJobs(js ...*Job) error {
s.lock.Lock()
defer func() {
sort.Sort(s.jobs)
s.lock.Unlock()
}()

for _, job := range js {
if err := ValidateJob(job); err != nil {
return err
}

for i, j := range s.jobs {
if job.Name == j.Name {
s.jobs[i] = job
return nil
}
}

s.jobs = append(s.jobs, job)
}
return nil
}

// PutJob places a job in the store. If the job name matches the name of a job
// that already exists, it will be overwritten with the new job
func (s *MemJobStore) PutJob(job *Job) error {
Expand Down Expand Up @@ -354,3 +368,40 @@ func (js jobs) Less(i, j int) bool {
return js[i].LastRun.After(js[j].LastRun)
}
func (js jobs) Swap(i, j int) { js[i], js[j] = js[j], js[i] }

func (js jobs) MarshalFb() []byte {
builder := flatbuffers.NewBuilder(0)
count := len(js)
offsets := make([]flatbuffers.UOffsetT, count)
for i, j := range js {
offsets[i] = j.MarshalFb(builder)
}

cron.JobsStartListVector(builder, count)
for i := count - 1; i >= 0; i-- {
builder.PrependUOffsetT(offsets[i])
}
jsvo := builder.EndVector(count)

cron.JobsStart(builder)
cron.JobsAddList(builder, jsvo)
off := cron.JobsEnd(builder)

builder.Finish(off)
return builder.FinishedBytes()
}

func unmarshalJobsFb(data []byte) (js jobs, err error) {
jsFb := cron.GetRootAsJobs(data, 0)
dec := &cron.Job{}
js = make(jobs, jsFb.ListLength())
for i := 0; i < jsFb.ListLength(); i++ {
jsFb.List(dec, i)
js[i] = &Job{}
if err := js[i].UnmarshalFb(dec); err != nil {
return nil, err
}
}

return js, nil
}
115 changes: 115 additions & 0 deletions cron/cron_fbs/Job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions cron/cron_fbs/Jobs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 56 additions & 0 deletions cron/cron_fbs/Secret.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 755186d

Please sign in to comment.