Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Allow pro appliers #669

Merged
merged 4 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Agent struct {
// Pro features
GlobalLock bool
MemberEventHandler func(serf.Event)
ProAppliers LogAppliers

serf *serf.Serf
config *Config
Expand Down Expand Up @@ -295,7 +296,7 @@ func (a *Agent) setupRaft() error {
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
tmpFsm := newFSM(nil)
tmpFsm := newFSM(nil, nil)
if err := raft.RecoverCluster(config, tmpFsm,
logStore, stableStore, snapshots, transport, configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
Expand Down Expand Up @@ -331,13 +332,14 @@ func (a *Agent) setupRaft() error {

// Instantiate the Raft systems. The second parameter is a finite state machine
// which stores the actual kv pairs and is operated upon through Apply().
fsm := newFSM(a.Store)
fsm := newFSM(a.Store, a.ProAppliers)
rft, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
a.leaderCh = rft.LeaderCh()
a.raft = rft

return nil
}

Expand Down Expand Up @@ -451,18 +453,7 @@ func (a *Agent) StartServer() {
}

if a.Store == nil {
dirExists, err := exists(a.config.DataDir)
if err != nil {
log.WithError(err).WithField("dir", a.config.DataDir).Fatal("Invalid Dir")
}
if !dirExists {
// Try to create the directory
err := os.Mkdir(a.config.DataDir, 0700)
if err != nil {
log.WithError(err).WithField("dir", a.config.DataDir).Fatal("Error Creating Dir")
}
}
s, err := NewStore(a, filepath.Join(a.config.DataDir, a.config.NodeName))
s, err := NewStore(filepath.Join(a.config.DataDir, a.config.NodeName))
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
}
Expand Down Expand Up @@ -572,6 +563,11 @@ func (a *Agent) LocalMember() serf.Member {
return a.serf.LocalMember()
}

// Leader is used to return the Raft leader
func (a *Agent) Leader() raft.ServerAddress {
return a.raft.Leader()
}

// Servers returns a list of known server
func (a *Agent) Servers() (members []*ServerParts) {
for _, member := range a.serf.Members() {
Expand Down Expand Up @@ -880,3 +876,8 @@ func (a *Agent) applySetJob(job *proto.Job) error {

return nil
}

// RaftApply applies a command to the Raft log
func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture {
return a.raft.Apply(cmd, raftTimeout)
}
9 changes: 7 additions & 2 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (h *HTTPTransport) ServeHTTP() {
}

// APIRoutes registers the api routes on the gin RouterGroup.
func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup) {
func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup, middleware ...gin.HandlerFunc) {
r.GET("/debug/vars", expvar.Handler())

h.Engine.GET("/health", func(c *gin.Context) {
Expand All @@ -63,6 +63,7 @@ func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup) {

r.GET("/v1", h.indexHandler)
v1 := r.Group("/v1")
v1.Use(middleware...)
v1.GET("/", h.indexHandler)
v1.GET("/members", h.membersHandler)
v1.GET("/leader", h.leaderHandler)
Expand Down Expand Up @@ -117,7 +118,11 @@ func (h *HTTPTransport) indexHandler(c *gin.Context) {
func (h *HTTPTransport) jobsHandler(c *gin.Context) {
metadata := c.QueryMap("metadata")

jobs, err := h.agent.Store.GetJobs(&JobOptions{Metadata: metadata})
jobs, err := h.agent.Store.GetJobs(
&JobOptions{
Metadata: metadata,
},
)
if err != nil {
log.WithError(err).Error("api: Unable to get jobs, store not reachable.")
return
Expand Down
20 changes: 18 additions & 2 deletions dkron/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@ const (
ExecutionDoneType
)

// LogApplier is the definition of a function that can apply a Raft log
type LogApplier func(buf []byte, index uint64) interface{}

// LogAppliers is a mapping of the Raft MessageType to the appropriate log
// applier
type LogAppliers map[MessageType]LogApplier

type dkronFSM struct {
store Storage
mu sync.Mutex

// proAppliers holds the set of pro only LogAppliers
proAppliers LogAppliers
}

// NewFSM is used to construct a new FSM with a blank state
func newFSM(store Storage) *dkronFSM {
func newFSM(store Storage, logAppliers LogAppliers) *dkronFSM {
return &dkronFSM{
store: store,
store: store,
proAppliers: logAppliers,
}
}

Expand All @@ -56,6 +67,11 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} {
return d.applySetExecution(buf[1:])
}

// Check enterprise only message types.
if applier, ok := d.proAppliers[msgType]; ok {
return applier(buf[1:], l.Index)
}

return nil
}

Expand Down
9 changes: 2 additions & 7 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,7 @@ func (j *Job) GetStatus() string {
}

// GetParent returns the parent job of a job
func (j *Job) GetParent() (*Job, error) {
// Maybe we are testing
if j.Agent == nil {
return nil, ErrNoAgent
}

func (j *Job) GetParent(store *Store) (*Job, error) {
if j.Name == j.ParentJob {
return nil, ErrSameParent
}
Expand All @@ -293,7 +288,7 @@ func (j *Job) GetParent() (*Job, error) {
return nil, ErrNoParent
}

parentJob, err := j.Agent.Store.GetJob(j.ParentJob, nil)
parentJob, err := store.GetJob(j.ParentJob, nil)
if err != nil {
if err == badger.ErrKeyNotFound {
return nil, ErrParentJobNotFound
Expand Down
17 changes: 8 additions & 9 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ func TestJobGetParent(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

a := &Agent{}
s, err := NewStore(a, dir)
s, err := NewStore(dir)
defer s.Shutdown()
require.NoError(t, err)
a.Store = s

parentTestJob := &Job{
Name: "parent_test",
Expand All @@ -42,11 +40,11 @@ func TestJobGetParent(t *testing.T) {
err = s.SetJob(dependentTestJob, true)
assert.NoError(t, err)

parentTestJob, err = dependentTestJob.GetParent()
parentTestJob, err = dependentTestJob.GetParent(s)
assert.NoError(t, err)
assert.Equal(t, []string{dependentTestJob.Name}, parentTestJob.DependentJobs)

ptj, err := dependentTestJob.GetParent()
ptj, err := dependentTestJob.GetParent(s)
assert.NoError(t, err)
assert.Equal(t, parentTestJob.Name, ptj.Name)

Expand All @@ -60,7 +58,7 @@ func TestJobGetParent(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "", dtj.ParentJob)

ptj, err = dtj.GetParent()
ptj, err = dtj.GetParent(s)
assert.EqualError(t, ErrNoParent, err.Error())

ptj, err = s.GetJob(parentTestJob.Name, nil)
Expand Down Expand Up @@ -110,9 +108,10 @@ func Test_isRunnable(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

a := &Agent{}
s, err := NewStore(a, dir)
a.Store = s
s, err := NewStore(dir)
a := &Agent{
Store: s,
}
defer s.Shutdown()
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (a *Agent) establishLeadership(stopCh chan struct{}) error {
if err != nil {
log.Fatal(err)
}
a.sched.Start(jobs)
a.sched.Start(jobs, a)

return nil
}
Expand Down
7 changes: 7 additions & 0 deletions dkron/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ func WithTransportCredentials(tls *tls.Config) AgentOption {
agent.TLSConfig = tls
}
}

// WithStore set store in the agent
func WithStore(store Storage) AgentOption {
return func(agent *Agent) {
agent.Store = store
}
}
7 changes: 5 additions & 2 deletions dkron/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestRunQuery(t *testing.T) {
c.BootstrapExpect = 1

a := NewAgent(c)
a.Start()
err = a.Start()
require.NoError(t, err)
time.Sleep(2 * time.Second)

// Test error with no job
Expand All @@ -45,8 +46,10 @@ func TestRunQuery(t *testing.T) {
err = a.Store.SetJob(j1, false)
require.NoError(t, err)

a.sched.Start([]*Job{j1})
a.sched.Start([]*Job{j1}, a)

_, err = a.RunQuery("test_job", &Execution{})
assert.NoError(t, err)

a.Stop()
}
9 changes: 5 additions & 4 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strings"

"github.com/armon/go-metrics"
"github.com/robfig/cron/v3"
"github.com/distribworks/dkron/v2/extcron"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -39,11 +39,12 @@ func NewScheduler() *Scheduler {

// Start the cron scheduler, adding its corresponding jobs and
// executing them on time.
func (s *Scheduler) Start(jobs []*Job) error {
func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
s.Cron = cron.New(cron.WithParser(extcron.NewParser()))

metrics.IncrCounter([]string{"scheduler", "start"}, 1)
for _, job := range jobs {
job.Agent = agent
s.AddJob(job)
}
s.Cron.Start()
Expand All @@ -70,9 +71,9 @@ func (s *Scheduler) Stop() {
}

// Restart the scheduler
func (s *Scheduler) Restart(jobs []*Job) {
func (s *Scheduler) Restart(jobs []*Job, agent *Agent) {
s.Stop()
s.Start(jobs)
s.Start(jobs, agent)
}

// GetEntry returns a scheduler entry from a snapshot in
Expand Down
8 changes: 5 additions & 3 deletions dkron/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestSchedule(t *testing.T) {
Owner: "John Dough",
OwnerEmail: "foo@bar.com",
}
sched.Start([]*Job{testJob1})
sched.Start([]*Job{testJob1}, &Agent{})

assert.True(t, sched.Started)
now := time.Now().Truncate(time.Second)
Expand All @@ -36,10 +36,11 @@ func TestSchedule(t *testing.T) {
Owner: "John Dough",
OwnerEmail: "foo@bar.com",
}
sched.Restart([]*Job{testJob2})
sched.Restart([]*Job{testJob2}, &Agent{})

assert.True(t, sched.Started)
assert.Len(t, sched.Cron.Entries(), 1)
sched.Stop()
}

func TestTimezoneAwareJob(t *testing.T) {
Expand All @@ -52,8 +53,9 @@ func TestTimezoneAwareJob(t *testing.T) {
Executor: "shell",
ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"},
}
sched.Start([]*Job{tzJob})
sched.Start([]*Job{tzJob}, &Agent{})

assert.True(t, sched.Started)
assert.Len(t, sched.Cron.Entries(), 1)
sched.Stop()
}
Loading