Skip to content

Commit

Permalink
refactor: Allow pro appliers
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Jan 19, 2020
1 parent 1c6bcc3 commit 5276c2f
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 50 deletions.
18 changes: 15 additions & 3 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Agent struct {
// Pro features
GlobalLock bool
MemberEventHandler func(serf.Event)
ProAppliers LogAppliers

serf *serf.Serf
config *Config
Expand Down Expand Up @@ -295,7 +296,7 @@ func (a *Agent) setupRaft() error {
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
tmpFsm := newFSM(nil)
tmpFsm := newFSM(nil, nil)
if err := raft.RecoverCluster(config, tmpFsm,
logStore, stableStore, snapshots, transport, configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
Expand Down Expand Up @@ -331,13 +332,14 @@ func (a *Agent) setupRaft() error {

// Instantiate the Raft systems. The second parameter is a finite state machine
// which stores the actual kv pairs and is operated upon through Apply().
fsm := newFSM(a.Store)
fsm := newFSM(a.Store, a.ProAppliers)
rft, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
a.leaderCh = rft.LeaderCh()
a.raft = rft

return nil
}

Expand Down Expand Up @@ -462,7 +464,7 @@ func (a *Agent) StartServer() {
log.WithError(err).WithField("dir", a.config.DataDir).Fatal("Error Creating Dir")
}
}
s, err := NewStore(a, filepath.Join(a.config.DataDir, a.config.NodeName))
s, err := NewStore(filepath.Join(a.config.DataDir, a.config.NodeName))
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
}
Expand Down Expand Up @@ -572,6 +574,11 @@ func (a *Agent) LocalMember() serf.Member {
return a.serf.LocalMember()
}

// Leader is used to return the Raft leader
func (a *Agent) Leader() raft.ServerAddress {
return a.raft.Leader()
}

// Servers returns a list of known server
func (a *Agent) Servers() (members []*ServerParts) {
for _, member := range a.serf.Members() {
Expand Down Expand Up @@ -880,3 +887,8 @@ func (a *Agent) applySetJob(job *proto.Job) error {

return nil
}

// RaftApply applies a command to the Raft log
func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture {
return a.raft.Apply(cmd, raftTimeout)
}
6 changes: 5 additions & 1 deletion dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func (h *HTTPTransport) indexHandler(c *gin.Context) {
func (h *HTTPTransport) jobsHandler(c *gin.Context) {
metadata := c.QueryMap("metadata")

jobs, err := h.agent.Store.GetJobs(&JobOptions{Metadata: metadata})
jobs, err := h.agent.Store.GetJobs(
&JobOptions{
Metadata: metadata,
},
)
if err != nil {
log.WithError(err).Error("api: Unable to get jobs, store not reachable.")
return
Expand Down
20 changes: 18 additions & 2 deletions dkron/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@ const (
ExecutionDoneType
)

// LogApplier is the definition of a function that can apply a Raft log
type LogApplier func(buf []byte, index uint64) interface{}

// LogAppliers is a mapping of the Raft MessageType to the appropriate log
// applier
type LogAppliers map[MessageType]LogApplier

type dkronFSM struct {
store Storage
mu sync.Mutex

// proAppliers holds the set of pro only LogAppliers
proAppliers LogAppliers
}

// NewFSM is used to construct a new FSM with a blank state
func newFSM(store Storage) *dkronFSM {
func newFSM(store Storage, logAppliers LogAppliers) *dkronFSM {
return &dkronFSM{
store: store,
store: store,
proAppliers: logAppliers,
}
}

Expand All @@ -56,6 +67,11 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} {
return d.applySetExecution(buf[1:])
}

// Check enterprise only message types.
if applier, ok := d.proAppliers[msgType]; ok {
return applier(buf[1:], l.Index)
}

return nil
}

Expand Down
9 changes: 2 additions & 7 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,7 @@ func (j *Job) GetStatus() string {
}

// GetParent returns the parent job of a job
func (j *Job) GetParent() (*Job, error) {
// Maybe we are testing
if j.Agent == nil {
return nil, ErrNoAgent
}

func (j *Job) GetParent(store *Store) (*Job, error) {
if j.Name == j.ParentJob {
return nil, ErrSameParent
}
Expand All @@ -293,7 +288,7 @@ func (j *Job) GetParent() (*Job, error) {
return nil, ErrNoParent
}

parentJob, err := j.Agent.Store.GetJob(j.ParentJob, nil)
parentJob, err := store.GetJob(j.ParentJob, nil)
if err != nil {
if err == badger.ErrKeyNotFound {
return nil, ErrParentJobNotFound
Expand Down
12 changes: 5 additions & 7 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ func TestJobGetParent(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

a := &Agent{}
s, err := NewStore(a, dir)
s, err := NewStore(dir)
defer s.Shutdown()
require.NoError(t, err)
a.Store = s

parentTestJob := &Job{
Name: "parent_test",
Expand All @@ -42,11 +40,11 @@ func TestJobGetParent(t *testing.T) {
err = s.SetJob(dependentTestJob, true)
assert.NoError(t, err)

parentTestJob, err = dependentTestJob.GetParent()
parentTestJob, err = dependentTestJob.GetParent(s)
assert.NoError(t, err)
assert.Equal(t, []string{dependentTestJob.Name}, parentTestJob.DependentJobs)

ptj, err := dependentTestJob.GetParent()
ptj, err := dependentTestJob.GetParent(s)
assert.NoError(t, err)
assert.Equal(t, parentTestJob.Name, ptj.Name)

Expand All @@ -60,7 +58,7 @@ func TestJobGetParent(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "", dtj.ParentJob)

ptj, err = dtj.GetParent()
ptj, err = dtj.GetParent(s)
assert.EqualError(t, ErrNoParent, err.Error())

ptj, err = s.GetJob(parentTestJob.Name, nil)
Expand Down Expand Up @@ -111,7 +109,7 @@ func Test_isRunnable(t *testing.T) {
defer os.RemoveAll(dir)

a := &Agent{}
s, err := NewStore(a, dir)
s, err := NewStore(dir)
a.Store = s
defer s.Shutdown()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (a *Agent) establishLeadership(stopCh chan struct{}) error {
if err != nil {
log.Fatal(err)
}
a.sched.Start(jobs)
a.sched.Start(jobs, a)

return nil
}
Expand Down
7 changes: 7 additions & 0 deletions dkron/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ func WithTransportCredentials(tls *tls.Config) AgentOption {
agent.TLSConfig = tls
}
}

// WithStore set store in the agent
func WithStore(store Storage) AgentOption {
return func(agent *Agent) {
agent.Store = store
}
}
2 changes: 1 addition & 1 deletion dkron/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestRunQuery(t *testing.T) {
err = a.Store.SetJob(j1, false)
require.NoError(t, err)

a.sched.Start([]*Job{j1})
a.sched.Start([]*Job{j1}, a)

_, err = a.RunQuery("test_job", &Execution{})
assert.NoError(t, err)
Expand Down
9 changes: 5 additions & 4 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strings"

"github.com/armon/go-metrics"
"github.com/robfig/cron/v3"
"github.com/distribworks/dkron/v2/extcron"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -39,11 +39,12 @@ func NewScheduler() *Scheduler {

// Start the cron scheduler, adding its corresponding jobs and
// executing them on time.
func (s *Scheduler) Start(jobs []*Job) error {
func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
s.Cron = cron.New(cron.WithParser(extcron.NewParser()))

metrics.IncrCounter([]string{"scheduler", "start"}, 1)
for _, job := range jobs {
job.Agent = agent
s.AddJob(job)
}
s.Cron.Start()
Expand All @@ -70,9 +71,9 @@ func (s *Scheduler) Stop() {
}

// Restart the scheduler
func (s *Scheduler) Restart(jobs []*Job) {
func (s *Scheduler) Restart(jobs []*Job, agent *Agent) {
s.Stop()
s.Start(jobs)
s.Start(jobs, agent)
}

// GetEntry returns a scheduler entry from a snapshot in
Expand Down
6 changes: 3 additions & 3 deletions dkron/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestSchedule(t *testing.T) {
Owner: "John Dough",
OwnerEmail: "foo@bar.com",
}
sched.Start([]*Job{testJob1})
sched.Start([]*Job{testJob1}, &Agent{})

assert.True(t, sched.Started)
now := time.Now().Truncate(time.Second)
Expand All @@ -36,7 +36,7 @@ func TestSchedule(t *testing.T) {
Owner: "John Dough",
OwnerEmail: "foo@bar.com",
}
sched.Restart([]*Job{testJob2})
sched.Restart([]*Job{testJob2}, &Agent{})

assert.True(t, sched.Started)
assert.Len(t, sched.Cron.Entries(), 1)
Expand All @@ -52,7 +52,7 @@ func TestTimezoneAwareJob(t *testing.T) {
Executor: "shell",
ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"},
}
sched.Start([]*Job{tzJob})
sched.Start([]*Job{tzJob}, &Agent{})

assert.True(t, sched.Started)
assert.Len(t, sched.Cron.Entries(), 1)
Expand Down
24 changes: 10 additions & 14 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ var (
// It gives dkron the ability to manipulate its embedded storage
// BadgerDB.
type Store struct {
agent *Agent
db *badger.DB
lock *sync.Mutex // for
closed bool
Expand All @@ -44,7 +43,7 @@ type JobOptions struct {
}

// NewStore creates a new Storage instance.
func NewStore(a *Agent, dir string) (*Store, error) {
func NewStore(dir string) (*Store, error) {
opts := badger.DefaultOptions(dir).
WithLogger(log)

Expand All @@ -58,9 +57,8 @@ func NewStore(a *Agent, dir string) (*Store, error) {
}

store := &Store{
db: db,
agent: a,
lock: &sync.Mutex{},
db: db,
lock: &sync.Mutex{},
}

go store.runGcLoop()
Expand Down Expand Up @@ -108,14 +106,16 @@ func (s *Store) setJobTxnFunc(pbj *dkronpb.Job) func(txn *badger.Txn) error {
}
}

// DB is the getter for the BadgerDB instance
func (s *Store) DB() *badger.DB {
return s.db
}

// SetJob stores a job in the storage
func (s *Store) SetJob(job *Job, copyDependentJobs bool) error {
var pbej dkronpb.Job
var ej *Job

// Init the job agent
job.Agent = s.agent

if err := job.Validate(); err != nil {
return err
}
Expand All @@ -135,7 +135,6 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error {
}

ej = NewJobFromProto(&pbej)
ej.Agent = s.agent

if ej.Name != "" {
// When the job runs, these status vars are updated
Expand Down Expand Up @@ -193,7 +192,7 @@ func (s *Store) removeFromParent(child *Job) error {
return nil
}

parent, err := child.GetParent()
parent, err := child.GetParent(s)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +220,7 @@ func (s *Store) addToParent(child *Job) error {
return nil
}

parent, err := child.GetParent()
parent, err := child.GetParent(s)
if err != nil {
return err
}
Expand Down Expand Up @@ -322,7 +321,6 @@ func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) {
}
job := NewJobFromProto(&pbj)

job.Agent = s.agent
if options != nil {
if options.Metadata != nil && len(options.Metadata) > 0 && !s.jobHasMetadata(job, options.Metadata) {
continue
Expand All @@ -347,7 +345,6 @@ func (s *Store) GetJob(name string, options *JobOptions) (*Job, error) {
}

job := NewJobFromProto(&pbj)
job.Agent = s.agent

return job, nil
}
Expand Down Expand Up @@ -394,7 +391,6 @@ func (s *Store) DeleteJob(name string) (*Job, error) {
return ErrDependentJobs
}
job = NewJobFromProto(&pbj)
job.Agent = s.agent

if err := s.DeleteExecutions(name); err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions dkron/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestStore(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

s, err := NewStore(nil, dir)
s, err := NewStore(dir)
require.NoError(t, err)
defer s.Shutdown()

Expand Down Expand Up @@ -475,7 +475,7 @@ func setupStore(t *testing.T) (*Store, string) {
require.NoError(t, err)

a := NewAgent(nil)
s, err := NewStore(a, dir)
s, err := NewStore(dir)
require.NoError(t, err)
a.Store = s

Expand Down
Loading

0 comments on commit 5276c2f

Please sign in to comment.