diff --git a/cron/cron.fbs b/cron/cron.fbs new file mode 100644 index 000000000..f1313e107 --- /dev/null +++ b/cron/cron.fbs @@ -0,0 +1,27 @@ +// IDL file for cron + +namespace cron_fbs; + +table Job { + name:string; + type:string; + lastRun:string; + lastError:string; + periodicity:string; + secrets:[Secret]; +} + +table Secret { + key:string; + val:string; +} + +// flatbuffers don't (currently) support using a vector as a root type +// in an ideal world we'd just `root_type [Job]`, wrapping in this table +// skips the problem and gives us the option to store "store global" state +// issue: https://github.com/google/flatbuffers/issues/4854 +table Jobs { + list:[Job]; +} + +root_type Jobs; \ No newline at end of file diff --git a/cron/cron.go b/cron/cron.go index 060fa4b7a..18dc3ecb2 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -11,11 +11,13 @@ import ( "sync" "time" + flatbuffers "github.com/google/flatbuffers/go" golog "github.com/ipfs/go-log" "github.com/qri-io/dataset" "github.com/qri-io/ioes" "github.com/qri-io/iso8601" "github.com/qri-io/qfs" + cron "github.com/qri-io/qri/cron/cron_fbs" ) var ( @@ -29,23 +31,6 @@ var ( DefaultCheckInterval = time.Minute * 15 ) -// Job represents a "cron job" that can be scheduled for repeated execution at -// a specified Periodicity (time interval) -type Job struct { - Name string - Type JobType - LastRun time.Time - LastError string - Periodicity iso8601.RepeatingInterval - Secrets map[string]string -} - -// NextExec returns the next time execution horizion. If job periodicity is -// improperly configured, the returned time will be zero -func (job *Job) NextExec() time.Time { - return job.Periodicity.After(job.LastRun) -} - // ReadJobs are functions for fetching a set of jobs. ReadJobs defines canoncial // behavior for listing & fetching jobs type ReadJobs interface { @@ -107,6 +92,9 @@ const ( type JobStore interface { // JobStores must implement the ReadJobs interface for fetching stored jobs ReadJobs + // PutJob places one or more jobs in the store. Putting a job who's name + // already exists must overwrite the previous job, making all job names unique + PutJobs(...*Job) error // PutJob places a job in the store. Putting a job who's name already exists // must overwrite the previous job, making all job names unique PutJob(*Job) error @@ -298,6 +286,32 @@ func (s *MemJobStore) Job(name string) (*Job, error) { return nil, fmt.Errorf("not found") } +// PutJobs places one or more jobs in the store. Putting a job who's name +// already exists must overwrite the previous job, making all job names unique +func (s *MemJobStore) PutJobs(js ...*Job) error { + s.lock.Lock() + defer func() { + sort.Sort(s.jobs) + s.lock.Unlock() + }() + + for _, job := range js { + if err := ValidateJob(job); err != nil { + return err + } + + for i, j := range s.jobs { + if job.Name == j.Name { + s.jobs[i] = job + return nil + } + } + + s.jobs = append(s.jobs, job) + } + return nil +} + // PutJob places a job in the store. If the job name matches the name of a job // that already exists, it will be overwritten with the new job func (s *MemJobStore) PutJob(job *Job) error { @@ -354,3 +368,40 @@ func (js jobs) Less(i, j int) bool { return js[i].LastRun.After(js[j].LastRun) } func (js jobs) Swap(i, j int) { js[i], js[j] = js[j], js[i] } + +func (js jobs) MarshalFb() []byte { + builder := flatbuffers.NewBuilder(0) + count := len(js) + offsets := make([]flatbuffers.UOffsetT, count) + for i, j := range js { + offsets[i] = j.MarshalFb(builder) + } + + cron.JobsStartListVector(builder, count) + for i := count - 1; i >= 0; i-- { + builder.PrependUOffsetT(offsets[i]) + } + jsvo := builder.EndVector(count) + + cron.JobsStart(builder) + cron.JobsAddList(builder, jsvo) + off := cron.JobsEnd(builder) + + builder.Finish(off) + return builder.FinishedBytes() +} + +func unmarshalJobsFb(data []byte) (js jobs, err error) { + jsFb := cron.GetRootAsJobs(data, 0) + dec := &cron.Job{} + js = make(jobs, jsFb.ListLength()) + for i := 0; i < jsFb.ListLength(); i++ { + jsFb.List(dec, i) + js[i] = &Job{} + if err := js[i].UnmarshalFb(dec); err != nil { + return nil, err + } + } + + return js, nil +} diff --git a/cron/cron_fbs/Job.go b/cron/cron_fbs/Job.go new file mode 100644 index 000000000..f21fa0586 --- /dev/null +++ b/cron/cron_fbs/Job.go @@ -0,0 +1,115 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package cron_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Job struct { + _tab flatbuffers.Table +} + +func GetRootAsJob(buf []byte, offset flatbuffers.UOffsetT) *Job { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Job{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Job) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Job) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Job) Name() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) Type() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) LastRun() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) LastError() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) Periodicity() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) Secrets(obj *Secret, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Job) SecretsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func JobStart(builder *flatbuffers.Builder) { + builder.StartObject(6) +} +func JobAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0) +} +func JobAddType(builder *flatbuffers.Builder, type_ flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(type_), 0) +} +func JobAddLastRun(builder *flatbuffers.Builder, lastRun flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(lastRun), 0) +} +func JobAddLastError(builder *flatbuffers.Builder, lastError flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(lastError), 0) +} +func JobAddPeriodicity(builder *flatbuffers.Builder, periodicity flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(periodicity), 0) +} +func JobAddSecrets(builder *flatbuffers.Builder, secrets flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(5, flatbuffers.UOffsetT(secrets), 0) +} +func JobStartSecretsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func JobEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/cron/cron_fbs/Jobs.go b/cron/cron_fbs/Jobs.go new file mode 100644 index 000000000..0747490af --- /dev/null +++ b/cron/cron_fbs/Jobs.go @@ -0,0 +1,60 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package cron_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Jobs struct { + _tab flatbuffers.Table +} + +func GetRootAsJobs(buf []byte, offset flatbuffers.UOffsetT) *Jobs { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Jobs{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Jobs) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Jobs) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Jobs) List(obj *Job, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Jobs) ListLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func JobsStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func JobsAddList(builder *flatbuffers.Builder, list flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(list), 0) +} +func JobsStartListVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func JobsEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/cron/cron_fbs/Secret.go b/cron/cron_fbs/Secret.go new file mode 100644 index 000000000..c7fab9d7b --- /dev/null +++ b/cron/cron_fbs/Secret.go @@ -0,0 +1,56 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package cron_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Secret struct { + _tab flatbuffers.Table +} + +func GetRootAsSecret(buf []byte, offset flatbuffers.UOffsetT) *Secret { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Secret{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Secret) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Secret) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Secret) Key() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Secret) Val() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func SecretStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func SecretAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(key), 0) +} +func SecretAddVal(builder *flatbuffers.Builder, val flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(val), 0) +} +func SecretEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/cron/cron_fbs/StrMapVal.go b/cron/cron_fbs/StrMapVal.go new file mode 100644 index 000000000..97785cd41 --- /dev/null +++ b/cron/cron_fbs/StrMapVal.go @@ -0,0 +1,56 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package cron_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type StrMapVal struct { + _tab flatbuffers.Table +} + +func GetRootAsStrMapVal(buf []byte, offset flatbuffers.UOffsetT) *StrMapVal { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &StrMapVal{} + x.Init(buf, n+offset) + return x +} + +func (rcv *StrMapVal) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *StrMapVal) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *StrMapVal) Key() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *StrMapVal) Val() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func StrMapValStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func StrMapValAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(key), 0) +} +func StrMapValAddVal(builder *flatbuffers.Builder, val flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(val), 0) +} +func StrMapValEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/cron/cron_fbs/StringMapVal.go b/cron/cron_fbs/StringMapVal.go new file mode 100644 index 000000000..02bcad0c2 --- /dev/null +++ b/cron/cron_fbs/StringMapVal.go @@ -0,0 +1,56 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package cron_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type StringMapVal struct { + _tab flatbuffers.Table +} + +func GetRootAsStringMapVal(buf []byte, offset flatbuffers.UOffsetT) *StringMapVal { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &StringMapVal{} + x.Init(buf, n+offset) + return x +} + +func (rcv *StringMapVal) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *StringMapVal) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *StringMapVal) Key() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *StringMapVal) Val() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func StringMapValStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func StringMapValAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(key), 0) +} +func StringMapValAddVal(builder *flatbuffers.Builder, val flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(val), 0) +} +func StringMapValEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/cron/fb_file.go b/cron/fb_file.go new file mode 100644 index 000000000..d1c5ccdbc --- /dev/null +++ b/cron/fb_file.go @@ -0,0 +1,175 @@ +package cron + +import ( + "fmt" + "io/ioutil" + "os" + "sort" + "sync" +) + +// NewFbFileJobStore creates a job store that persists to a CBOR file +// specified at path +func NewFbFileJobStore(path string) JobStore { + return &FbFileJobStore{ + path: path, + } +} + +// FbFileJobStore is a jobstore implementation that saves to a CBOR file +// Jobs stored in FbFileJobStore can be persisted for the duration of a process +// at the longest. +// FbFileJobStore is safe for concurrent use +type FbFileJobStore struct { + lock sync.Mutex + path string +} + +// Jobs lists jobs currently in the store +func (s *FbFileJobStore) Jobs(offset, limit int) ([]*Job, error) { + s.lock.Lock() + defer s.lock.Unlock() + + js, err := s.loadJobs() + if err != nil { + return nil, err + } + + if limit <= 0 { + limit = len(js) + } + + ss := make([]*Job, limit) + added := 0 + for i, job := range js { + if i < offset { + continue + } else if added == limit { + break + } + + ss[added] = job + added++ + } + return ss[:added], nil +} + +func (s *FbFileJobStore) loadJobs() (js jobs, err error) { + data, err := ioutil.ReadFile(s.path) + if os.IsNotExist(err) { + return jobs{}, nil + } + + return unmarshalJobsFb(data) +} + +func (s *FbFileJobStore) saveJobs(js jobs) error { + return ioutil.WriteFile(s.path, js.MarshalFb(), os.ModePerm) +} + +// Job gets job details from the store by name +func (s *FbFileJobStore) Job(name string) (*Job, error) { + s.lock.Lock() + defer s.lock.Unlock() + + js, err := s.loadJobs() + if err != nil { + return nil, err + } + + for _, job := range js { + if job.Name == name { + return job, nil + } + } + return nil, fmt.Errorf("not found") +} + +// PutJobs places one or more jobs in the store. Putting a job who's name +// already exists must overwrite the previous job, making all job names unique +func (s *FbFileJobStore) PutJobs(add ...*Job) error { + s.lock.Lock() + defer s.lock.Unlock() + + js, err := s.loadJobs() + if err != nil { + return err + } + + for _, job := range add { + if err := ValidateJob(job); err != nil { + return err + } + + for i, j := range js { + if job.Name == j.Name { + js[i] = job + return nil + } + } + + js = append(js, job) + } + + sort.Sort(js) + return s.saveJobs(js) +} + +// PutJob places a job in the store. If the job name matches the name of a job +// that already exists, it will be overwritten with the new job +func (s *FbFileJobStore) PutJob(job *Job) error { + if err := ValidateJob(job); err != nil { + return err + } + + s.lock.Lock() + defer s.lock.Unlock() + + js, err := s.loadJobs() + if err != nil { + return err + } + + for i, j := range js { + if job.Name == j.Name { + js[i] = job + + sort.Sort(js) + return s.saveJobs(js) + } + } + + js = append(js, job) + sort.Sort(js) + return s.saveJobs(js) +} + +// DeleteJob removes a job from the store by name. deleting a non-existent job +// won't return an error +func (s *FbFileJobStore) DeleteJob(name string) error { + s.lock.Lock() + defer s.lock.Unlock() + + js, err := s.loadJobs() + if err != nil { + return err + } + + for i, j := range js { + if j.Name == name { + if i+1 == len(js) { + js = js[:i] + break + } + + js = append(js[:i], js[i+1:]...) + break + } + } + return s.saveJobs(js) +} + +// Destroy removes the path entirely +func (s *FbFileJobStore) Destroy() error { + return os.Remove(s.path) +} diff --git a/cron/fb_file_test.go b/cron/fb_file_test.go new file mode 100644 index 000000000..6d51f5fb6 --- /dev/null +++ b/cron/fb_file_test.go @@ -0,0 +1,49 @@ +package cron + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +func TestFbJobStore(t *testing.T) { + tmp, err := ioutil.TempDir(os.TempDir(), "TestFsJobStore") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + newStore := func() JobStore { + return NewFbFileJobStore(filepath.Join(tmp, "jobs.dat")) + } + RunJobStoreTests(t, newStore) +} + +func BenchmarkFbJobStore(b *testing.B) { + js := make(jobs, 1000) + for i := range js { + js[i] = &Job{ + Name: fmt.Sprintf("job_%d", i), + Type: JTDataset, + Periodicity: mustRepeatingInterval("R/P1H"), + } + } + + tmp, err := ioutil.TempDir(os.TempDir(), "TestFsJobStore") + if err != nil { + b.Fatal(err) + } + + defer os.RemoveAll(tmp) + store := NewFbFileJobStore(filepath.Join(tmp, "jobs.dat")) + + for i := 0; i < b.N; i++ { + if err := store.PutJobs(js...); err != nil { + b.Fatal(err) + } + if _, err := store.Jobs(0, 0); err != nil { + b.Fatal(err) + } + } +} diff --git a/cron/file.go b/cron/file.go index 097bc2f27..88f68300f 100644 --- a/cron/file.go +++ b/cron/file.go @@ -101,6 +101,36 @@ func (s *FileJobStore) Job(name string) (*Job, error) { return nil, fmt.Errorf("not found") } +// PutJobs places one or more jobs in the store. Putting a job who's name +// already exists must overwrite the previous job, making all job names unique +func (s *FileJobStore) PutJobs(add ...*Job) error { + s.lock.Lock() + defer s.lock.Unlock() + + js, err := s.loadJobs() + if err != nil { + return err + } + + for _, job := range add { + if err := ValidateJob(job); err != nil { + return err + } + + for i, j := range js { + if job.Name == j.Name { + js[i] = job + return nil + } + } + + js = append(js, job) + } + + sort.Sort(js) + return s.saveJobs(js) +} + // PutJob places a job in the store. If the job name matches the name of a job // that already exists, it will be overwritten with the new job func (s *FileJobStore) PutJob(job *Job) error { diff --git a/cron/file_test.go b/cron/file_test.go index cf28a86af..89bd8ce65 100644 --- a/cron/file_test.go +++ b/cron/file_test.go @@ -1,6 +1,7 @@ package cron import ( + "fmt" "io/ioutil" "os" "path/filepath" @@ -18,3 +19,31 @@ func TestFsJobStore(t *testing.T) { } RunJobStoreTests(t, newStore) } + +func BenchmarkFileJobStore(b *testing.B) { + js := make(jobs, 1000) + for i := range js { + js[i] = &Job{ + Name: fmt.Sprintf("job_%d", i), + Type: JTDataset, + Periodicity: mustRepeatingInterval("R/P1H"), + } + } + + tmp, err := ioutil.TempDir(os.TempDir(), "BenchmarkFbJobStore") + if err != nil { + b.Fatal(err) + } + + defer os.RemoveAll(tmp) + store := NewFileJobStore(filepath.Join(tmp, "jobs.cbor")) + + for i := 0; i < b.N; i++ { + if err := store.PutJobs(js...); err != nil { + b.Fatal(err) + } + if _, err := store.Jobs(0, 0); err != nil { + b.Fatal(err) + } + } +} diff --git a/cron/job.go b/cron/job.go new file mode 100644 index 000000000..8a1a4bc71 --- /dev/null +++ b/cron/job.go @@ -0,0 +1,66 @@ +package cron + +import ( + "time" + + flatbuffers "github.com/google/flatbuffers/go" + "github.com/qri-io/iso8601" + cron "github.com/qri-io/qri/cron/cron_fbs" +) + +// Job represents a "cron job" that can be scheduled for repeated execution at +// a specified Periodicity (time interval) +type Job struct { + Name string + Type JobType + LastRun time.Time + LastError string + Periodicity iso8601.RepeatingInterval + Secrets map[string]string +} + +// NextExec returns the next time execution horizion. If job periodicity is +// improperly configured, the returned time will be zero +func (job *Job) NextExec() time.Time { + return job.Periodicity.After(job.LastRun) +} + +// MarshalFb writes a job to a builder +func (job *Job) MarshalFb(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + name := builder.CreateString(job.Name) + typ := builder.CreateString(string(job.Type)) + lastRun := builder.CreateString(job.LastRun.Format(time.RFC3339)) + lastError := builder.CreateString(job.LastError) + p := builder.CreateString(job.Periodicity.String()) + + cron.JobStart(builder) + cron.JobAddName(builder, name) + cron.JobAddType(builder, typ) + cron.JobAddLastRun(builder, lastRun) + cron.JobAddLastError(builder, lastError) + cron.JobAddPeriodicity(builder, p) + return cron.JobEnd(builder) +} + +// UnmarshalFb decodes a job from a flatbuffer +func (job *Job) UnmarshalFb(j *cron.Job) error { + lastRun, err := time.Parse(time.RFC3339, string(j.LastRun())) + if err != nil { + return err + } + + p, err := iso8601.ParseRepeatingInterval(string(j.Periodicity())) + if err != nil { + return err + } + + *job = Job{ + Name: string(j.Name()), + LastRun: lastRun, + Type: JobType(j.Type()), + LastError: string(j.LastError()), + Periodicity: p, + // TODO (b5) - secrets storages + } + return nil +} diff --git a/cron/job_test.go b/cron/job_test.go new file mode 100644 index 000000000..ec64b7ad9 --- /dev/null +++ b/cron/job_test.go @@ -0,0 +1,56 @@ +package cron + +import ( + "testing" + + flatbuffers "github.com/google/flatbuffers/go" + cron "github.com/qri-io/qri/cron/cron_fbs" +) + +func TestJobFb(t *testing.T) { + jorbs := jobs{ + &Job{ + Name: "job_one", + Periodicity: mustRepeatingInterval("R/PT1H"), + Type: JTDataset, + }, + &Job{ + Name: "job_two", + Periodicity: mustRepeatingInterval("R/PT1D"), + Type: JTShellScript, + }, + } + + builder := flatbuffers.NewBuilder(0) + offsets := make([]flatbuffers.UOffsetT, len(jorbs)) + for i, j := range jorbs { + offsets[i] = j.MarshalFb(builder) + } + + cron.JobsStartListVector(builder, len(jorbs)) + for i := len(jorbs) - 1; i >= 0; i-- { + builder.PrependUOffsetT(offsets[i]) + } + jsvo := builder.EndVector(len(jorbs)) + + cron.JobsStart(builder) + cron.JobsAddList(builder, jsvo) + off := cron.JobsEnd(builder) + + builder.Finish(off) + data := builder.FinishedBytes() + + js := cron.GetRootAsJobs(data, 0) + dec := &cron.Job{} + t.Log(js.ListLength()) + for i := 0; i < js.ListLength(); i++ { + js.List(dec, i) + decJob := &Job{} + if err := decJob.UnmarshalFb(dec); err != nil { + t.Error(err) + } + if err := CompareJobs(jorbs[i], decJob); err != nil { + t.Error(err) + } + } +} diff --git a/lib/cron.go b/lib/cron.go new file mode 100644 index 000000000..ba421991e --- /dev/null +++ b/lib/cron.go @@ -0,0 +1,42 @@ +package lib + +import ( + "fmt" + + "github.com/qri-io/qri/cron" +) + +// NewCronMethods creates a cron handle from an instance +func NewCronMethods(inst *Instance) *CronMethods { + return &CronMethods{inst: inst} +} + +// CronMethods encapsulates business logic for the qri cron service +type CronMethods struct { + inst *Instance + cron *cron.Cron +} + +func (m *CronMethods) Add() error { + return fmt.Errorf("not finished") +} + +func (m *CronMethods) Remove() error { + return fmt.Errorf("not finished") +} + +func (m *CronMethods) List(p *ListParams, jobs *[]*cron.Job) error { + return fmt.Errorf("not finished") +} + +func (m *CronMethods) Log() error { + return fmt.Errorf("not finished") +} + +func (m *CronMethods) Start() error { + return fmt.Errorf("not finished") +} + +func (m *CronMethods) Stop() error { + return fmt.Errorf("not finished") +}