Skip to content

Commit

Permalink
refactor: Allow pro appliers
Browse files Browse the repository at this point in the history
This refactor includes:

* Remove dependency of the agent in store and reduce usage in Job
* Upgrade gin
* Add helper methods
* Move directory creation to the Store instantiation
  • Loading branch information
Victor Castell committed Jan 21, 2020
1 parent 1c6bcc3 commit 5ee2663
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 64 deletions.
29 changes: 15 additions & 14 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Agent struct {
// Pro features
GlobalLock bool
MemberEventHandler func(serf.Event)
ProAppliers LogAppliers

serf *serf.Serf
config *Config
Expand Down Expand Up @@ -295,7 +296,7 @@ func (a *Agent) setupRaft() error {
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
tmpFsm := newFSM(nil)
tmpFsm := newFSM(nil, nil)
if err := raft.RecoverCluster(config, tmpFsm,
logStore, stableStore, snapshots, transport, configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
Expand Down Expand Up @@ -331,13 +332,14 @@ func (a *Agent) setupRaft() error {

// Instantiate the Raft systems. The second parameter is a finite state machine
// which stores the actual kv pairs and is operated upon through Apply().
fsm := newFSM(a.Store)
fsm := newFSM(a.Store, a.ProAppliers)
rft, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
a.leaderCh = rft.LeaderCh()
a.raft = rft

return nil
}

Expand Down Expand Up @@ -451,18 +453,7 @@ func (a *Agent) StartServer() {
}

if a.Store == nil {
dirExists, err := exists(a.config.DataDir)
if err != nil {
log.WithError(err).WithField("dir", a.config.DataDir).Fatal("Invalid Dir")
}
if !dirExists {
// Try to create the directory
err := os.Mkdir(a.config.DataDir, 0700)
if err != nil {
log.WithError(err).WithField("dir", a.config.DataDir).Fatal("Error Creating Dir")
}
}
s, err := NewStore(a, filepath.Join(a.config.DataDir, a.config.NodeName))
s, err := NewStore(filepath.Join(a.config.DataDir, a.config.NodeName))
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
}
Expand Down Expand Up @@ -572,6 +563,11 @@ func (a *Agent) LocalMember() serf.Member {
return a.serf.LocalMember()
}

// Leader is used to return the Raft leader
func (a *Agent) Leader() raft.ServerAddress {
return a.raft.Leader()
}

// Servers returns a list of known server
func (a *Agent) Servers() (members []*ServerParts) {
for _, member := range a.serf.Members() {
Expand Down Expand Up @@ -880,3 +876,8 @@ func (a *Agent) applySetJob(job *proto.Job) error {

return nil
}

// RaftApply applies a command to the Raft log
func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture {
return a.raft.Apply(cmd, raftTimeout)
}
6 changes: 5 additions & 1 deletion dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func (h *HTTPTransport) indexHandler(c *gin.Context) {
func (h *HTTPTransport) jobsHandler(c *gin.Context) {
metadata := c.QueryMap("metadata")

jobs, err := h.agent.Store.GetJobs(&JobOptions{Metadata: metadata})
jobs, err := h.agent.Store.GetJobs(
&JobOptions{
Metadata: metadata,
},
)
if err != nil {
log.WithError(err).Error("api: Unable to get jobs, store not reachable.")
return
Expand Down
20 changes: 18 additions & 2 deletions dkron/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@ const (
ExecutionDoneType
)

// LogApplier is the definition of a function that can apply a Raft log
type LogApplier func(buf []byte, index uint64) interface{}

// LogAppliers is a mapping of the Raft MessageType to the appropriate log
// applier
type LogAppliers map[MessageType]LogApplier

type dkronFSM struct {
store Storage
mu sync.Mutex

// proAppliers holds the set of pro only LogAppliers
proAppliers LogAppliers
}

// NewFSM is used to construct a new FSM with a blank state
func newFSM(store Storage) *dkronFSM {
func newFSM(store Storage, logAppliers LogAppliers) *dkronFSM {
return &dkronFSM{
store: store,
store: store,
proAppliers: logAppliers,
}
}

Expand All @@ -56,6 +67,11 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} {
return d.applySetExecution(buf[1:])
}

// Check enterprise only message types.
if applier, ok := d.proAppliers[msgType]; ok {
return applier(buf[1:], l.Index)
}

return nil
}

Expand Down
9 changes: 2 additions & 7 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,7 @@ func (j *Job) GetStatus() string {
}

// GetParent returns the parent job of a job
func (j *Job) GetParent() (*Job, error) {
// Maybe we are testing
if j.Agent == nil {
return nil, ErrNoAgent
}

func (j *Job) GetParent(store *Store) (*Job, error) {
if j.Name == j.ParentJob {
return nil, ErrSameParent
}
Expand All @@ -293,7 +288,7 @@ func (j *Job) GetParent() (*Job, error) {
return nil, ErrNoParent
}

parentJob, err := j.Agent.Store.GetJob(j.ParentJob, nil)
parentJob, err := store.GetJob(j.ParentJob, nil)
if err != nil {
if err == badger.ErrKeyNotFound {
return nil, ErrParentJobNotFound
Expand Down
17 changes: 8 additions & 9 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ func TestJobGetParent(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

a := &Agent{}
s, err := NewStore(a, dir)
s, err := NewStore(dir)
defer s.Shutdown()
require.NoError(t, err)
a.Store = s

parentTestJob := &Job{
Name: "parent_test",
Expand All @@ -42,11 +40,11 @@ func TestJobGetParent(t *testing.T) {
err = s.SetJob(dependentTestJob, true)
assert.NoError(t, err)

parentTestJob, err = dependentTestJob.GetParent()
parentTestJob, err = dependentTestJob.GetParent(s)
assert.NoError(t, err)
assert.Equal(t, []string{dependentTestJob.Name}, parentTestJob.DependentJobs)

ptj, err := dependentTestJob.GetParent()
ptj, err := dependentTestJob.GetParent(s)
assert.NoError(t, err)
assert.Equal(t, parentTestJob.Name, ptj.Name)

Expand All @@ -60,7 +58,7 @@ func TestJobGetParent(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "", dtj.ParentJob)

ptj, err = dtj.GetParent()
ptj, err = dtj.GetParent(s)
assert.EqualError(t, ErrNoParent, err.Error())

ptj, err = s.GetJob(parentTestJob.Name, nil)
Expand Down Expand Up @@ -110,9 +108,10 @@ func Test_isRunnable(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

a := &Agent{}
s, err := NewStore(a, dir)
a.Store = s
s, err := NewStore(dir)
a := &Agent{
Store: s,
}
defer s.Shutdown()
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (a *Agent) establishLeadership(stopCh chan struct{}) error {
if err != nil {
log.Fatal(err)
}
a.sched.Start(jobs)
a.sched.Start(jobs, a)

return nil
}
Expand Down
7 changes: 7 additions & 0 deletions dkron/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ func WithTransportCredentials(tls *tls.Config) AgentOption {
agent.TLSConfig = tls
}
}

// WithStore set store in the agent
func WithStore(store Storage) AgentOption {
return func(agent *Agent) {
agent.Store = store
}
}
7 changes: 5 additions & 2 deletions dkron/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestRunQuery(t *testing.T) {
c.BootstrapExpect = 1

a := NewAgent(c)
a.Start()
err = a.Start()
require.NoError(t, err)
time.Sleep(2 * time.Second)

// Test error with no job
Expand All @@ -45,8 +46,10 @@ func TestRunQuery(t *testing.T) {
err = a.Store.SetJob(j1, false)
require.NoError(t, err)

a.sched.Start([]*Job{j1})
a.sched.Start([]*Job{j1}, a)

_, err = a.RunQuery("test_job", &Execution{})
assert.NoError(t, err)

a.Stop()
}
9 changes: 5 additions & 4 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strings"

"github.com/armon/go-metrics"
"github.com/robfig/cron/v3"
"github.com/distribworks/dkron/v2/extcron"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -39,11 +39,12 @@ func NewScheduler() *Scheduler {

// Start the cron scheduler, adding its corresponding jobs and
// executing them on time.
func (s *Scheduler) Start(jobs []*Job) error {
func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
s.Cron = cron.New(cron.WithParser(extcron.NewParser()))

metrics.IncrCounter([]string{"scheduler", "start"}, 1)
for _, job := range jobs {
job.Agent = agent
s.AddJob(job)
}
s.Cron.Start()
Expand All @@ -70,9 +71,9 @@ func (s *Scheduler) Stop() {
}

// Restart the scheduler
func (s *Scheduler) Restart(jobs []*Job) {
func (s *Scheduler) Restart(jobs []*Job, agent *Agent) {
s.Stop()
s.Start(jobs)
s.Start(jobs, agent)
}

// GetEntry returns a scheduler entry from a snapshot in
Expand Down
8 changes: 5 additions & 3 deletions dkron/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestSchedule(t *testing.T) {
Owner: "John Dough",
OwnerEmail: "foo@bar.com",
}
sched.Start([]*Job{testJob1})
sched.Start([]*Job{testJob1}, &Agent{})

assert.True(t, sched.Started)
now := time.Now().Truncate(time.Second)
Expand All @@ -36,10 +36,11 @@ func TestSchedule(t *testing.T) {
Owner: "John Dough",
OwnerEmail: "foo@bar.com",
}
sched.Restart([]*Job{testJob2})
sched.Restart([]*Job{testJob2}, &Agent{})

assert.True(t, sched.Started)
assert.Len(t, sched.Cron.Entries(), 1)
sched.Stop()
}

func TestTimezoneAwareJob(t *testing.T) {
Expand All @@ -52,8 +53,9 @@ func TestTimezoneAwareJob(t *testing.T) {
Executor: "shell",
ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"},
}
sched.Start([]*Job{tzJob})
sched.Start([]*Job{tzJob}, &Agent{})

assert.True(t, sched.Started)
assert.Len(t, sched.Cron.Entries(), 1)
sched.Stop()
}
Loading

0 comments on commit 5ee2663

Please sign in to comment.