Skip to content

Commit

Permalink
Try to fix execution done missing
Browse files Browse the repository at this point in the history
As #349 describes, a job with forbidden concurrency doesn't execute again if the target node is restarted.

This PR tries to sove it by implementing a mechanism that asks the running nodes about the job status before checking if the job finished and before running the job.
  • Loading branch information
Victor Castell committed May 3, 2018
1 parent 80f2222 commit f1689d7
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 26 deletions.
53 changes: 47 additions & 6 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
ErrNoRPCAddress = errors.New("No RPC address tag found in server")

defaultLeaderTTL = 20 * time.Second

runningExecutions = make(map[string]*Execution)
)

type Agent struct {
Expand Down Expand Up @@ -386,8 +388,8 @@ func (a *Agent) eventLoop() {
metrics.IncrCounter([]string{"agent", "event_received", e.String()}, 1)

// Log all member events
if failed, ok := e.(serf.MemberEvent); ok {
for _, member := range failed.Members {
if me, ok := e.(serf.MemberEvent); ok {
for _, member := range me.Members {
log.WithFields(logrus.Fields{
"node": a.config.NodeName,
"member": member.Name,
Expand Down Expand Up @@ -440,14 +442,20 @@ func (a *Agent) eventLoop() {
query.Respond(exJSON)
}

if query.Name == QueryRPCConfig && a.config.Server {
if query.Name == QueryExecutionDone {
log.WithFields(logrus.Fields{
"query": query.Name,
"payload": string(query.Payload),
"at": query.LTime,
}).Debug("agent: RPC Config requested")

err := query.Respond([]byte(a.getRPCAddr()))
}).Debug("agent: Execution done requested")

// Find if the indicated execution is done processing
var err error
if _, ok := runningExecutions[string(query.Payload)]; ok {
err = query.Respond([]byte("false"))
} else {
err = query.Respond([]byte("true"))
}
if err != nil {
log.WithError(err).Error("agent: query.Respond")
}
Expand Down Expand Up @@ -553,3 +561,36 @@ func (a *Agent) Leave() error {
func (a *Agent) SetTags(tags map[string]string) error {
return a.serf.SetTags(tags)
}

// RefreshJobStatus asks the nodes their progress on an execution
func (a *Agent) RefreshJobStatus(jobName string) {
var group string

execs, _ := a.Store.GetLastExecutionGroup(jobName)
nodes := []string{}

for _, ex := range execs {
log.WithFields(logrus.Fields{
"member": ex.NodeName,
"execution_key": ex.Key(),
}).Debug("agent: Asking member for pending execution")

nodes = append(nodes, ex.NodeName)
group = strconv.FormatInt(ex.Group, 10)
}

statuses := a.executionDoneQuery(nodes, group)

for _, ex := range execs {
if s, ok := statuses[ex.NodeName]; ok {
done, _ := strconv.ParseBool(s)
if done {
ex.FinishedAt = time.Now()
a.Store.SetExecution(ex)
}
} else {
ex.FinishedAt = time.Now()
a.Store.SetExecution(ex)
}
}
}
4 changes: 3 additions & 1 deletion dkron/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (a *Agent) invokeJob(job *Job, execution *Execution) error {
// Check if executor is exists
if executor, ok := a.ExecutorPlugins[jex]; ok {
log.WithField("plugin", jex).Debug("invoke: calling executor plugin")
runningExecutions[execution.Key()] = execution
out, err := executor.Execute(&ExecuteRequest{
JobName: job.Name,
Config: exc,
Expand All @@ -57,7 +58,7 @@ func (a *Agent) invokeJob(job *Job, execution *Execution) error {

output.Write(out)
} else {
log.Errorf("invoke: Specified executor %s is not present", executor)
log.WithField("executor", executor).Error("invoke: Specified executor is not present")
}

execution.FinishedAt = time.Now()
Expand All @@ -69,6 +70,7 @@ func (a *Agent) invokeJob(job *Job, execution *Execution) error {
return err
}

delete(runningExecutions, execution.Key())
rc := &RPCClient{ServerAddr: string(rpcServer)}
return rc.callExecutionDone(execution)
}
Expand Down
1 change: 1 addition & 0 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (j *Job) Unlock() error {
}

func (j *Job) isRunnable() bool {
j.Agent.RefreshJobStatus(j.Name)
status := j.Status()

if status == Running {
Expand Down
50 changes: 49 additions & 1 deletion dkron/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
const (
QuerySchedulerRestart = "scheduler:restart"
QueryRunJob = "run:job"
QueryRPCConfig = "rpc:config"
QueryExecutionDone = "execution:done"
)

type RunQueryParam struct {
Expand Down Expand Up @@ -157,3 +157,51 @@ func (a *Agent) schedulerRestartQuery(leaderName string) {
}
log.WithField("query", QuerySchedulerRestart).Debug("agent: Done receiving acks and responses")
}

// Broadcast a ExecutionDone to the cluster.
func (a *Agent) executionDoneQuery(nodes []string, group string) map[string]string {
params := &serf.QueryParam{
FilterNodes: nodes,
RequestAck: true,
}

log.WithFields(logrus.Fields{
"query": QueryExecutionDone,
"members": nodes,
}).Debug("agent: Sending query")

qr, err := a.serf.Query(QueryExecutionDone, []byte(group), params)
if err != nil {
log.WithError(err).Fatal("agent: Error sending the execution done query")
}
defer qr.Close()

statuses := make(map[string]string)
ackCh := qr.AckCh()
respCh := qr.ResponseCh()

for !qr.Finished() {
select {
case ack, ok := <-ackCh:
if ok {
log.WithFields(logrus.Fields{
"from": ack,
}).Debug("agent: Received ack")
}
case resp, ok := <-respCh:
if ok {
log.WithFields(logrus.Fields{
"from": resp.From,
"payload": string(resp.Payload),
}).Debug("agent: Received response")

statuses[resp.From] = string(resp.Payload)
}
}
}
log.WithField("query", QueryExecutionDone).Debug("agent: Done receiving acks and responses")

// In case the query finishes by deadline without receiving a response from the node
// set the execution as finished, maybe the node is gone by now.
return statuses
}
51 changes: 33 additions & 18 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ const MaxExecutions = 100

type Storage interface {
SetJob(job *Job) error
AtomicJobPut(job *Job, prevJobKVPair *store.KVPair) (bool, error)
SetJobDependencyTree(job *Job, previousJob *Job) error
GetJobs() ([]*Job, error)
GetJob(name string) (*Job, error)
GetJobWithKVPair(name string) (*Job, *store.KVPair, error)
DeleteJob(name string) (*Job, error)
GetExecutions(jobName string) ([]*Execution, error)
GetLastExecutionGroup(jobName string) ([]*Execution, error)
GetExecutionGroup(execution *Execution) ([]*Execution, error)
GetGroupedExecutions(jobName string) (map[int64][]*Execution, []int64, error)
GetCurrentExecutions(nodeName string) ([]*Execution, error)
SetExecution(execution *Execution) (string, error)
DeleteExecutions(jobName string) error
GetLeader() []byte
Expand Down Expand Up @@ -279,24 +282,7 @@ func (s *Store) GetExecutions(jobName string) ([]*Execution, error) {
return nil, err
}

var executions []*Execution

for _, node := range res {
if store.Backend(s.backend) != store.ZK {
path := store.SplitKey(node.Key)
dir := path[len(path)-2]
if dir != jobName {
continue
}
}
var execution Execution
err := json.Unmarshal([]byte(node.Value), &execution)
if err != nil {
return nil, err
}
executions = append(executions, &execution)
}
return executions, nil
return s.unmarshalExecutions(res, jobName)
}

func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) {
Expand Down Expand Up @@ -415,6 +401,35 @@ func (s *Store) SetExecution(execution *Execution) (string, error) {
return key, nil
}

func (s *Store) GetCurrentExecutions(nodeName string) ([]*Execution, error) {
res, err := s.Client.List(fmt.Sprintf("%s/current_executions/%s", s.keyspace, nodeName), nil)
if err != nil {
return nil, err
}

return s.unmarshalExecutions(res, nodeName)
}

func (s *Store) unmarshalExecutions(res []*store.KVPair, stopWord string) ([]*Execution, error) {
var executions []*Execution
for _, node := range res {
if store.Backend(s.backend) != store.ZK {
path := store.SplitKey(node.Key)
dir := path[len(path)-2]
if dir != stopWord {
continue
}
}
var execution Execution
err := json.Unmarshal([]byte(node.Value), &execution)
if err != nil {
return nil, err
}
executions = append(executions, &execution)
}
return executions, nil
}

// Removes all executions of a job
func (s *Store) DeleteExecutions(jobName string) error {
return s.Client.DeleteTree(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName))
Expand Down

0 comments on commit f1689d7

Please sign in to comment.