Move inner job creation loop to own method
DrJosh9000 committed Oct 1, 2024
1 parent c8022f1 · commit befd443
Showing 1 changed file with 44 additions and 45 deletions.
internal/controller/monitor/monitor.go: 44 additions & 45 deletions

@@ -167,57 +167,56 @@ func (m *Monitor) Start(ctx context.Context, handler JobHandler) <-chan error {
 				return
 			}
 
-			jobs := resp.CommandJobs()
-
-			// Wrapped so staleCancel is called before the next loop iteration.
-			func() {
-				// A sneaky way to create a channel that is closed after a duration.
-				// Why not pass directly to handler.Create? Because that might
-				// interrupt scheduling a pod, when all we want is to bound the
-				// time spent waiting for the limiter.
-				staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.PollInterval)
-				defer staleCancel()
-
-				// TODO: sort by ScheduledAt in the API
-				sort.Slice(jobs, func(i, j int) bool {
-					return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
-				})
-
-				for _, j := range jobs {
-					if staleCtx.Err() != nil {
-						// Results already stale; try again later.
-						return
-					}
-
-					jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
-
-					// The API returns jobs that match ANY agent tags (the agent query rules).
-					// However, we can only acquire jobs that match ALL agent tags.
-					if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
-						logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
-						continue
-					}
-
-					job := Job{
-						CommandJob: &j.CommandJob,
-						StaleCh:    staleCtx.Done(),
-					}
-
-					logger.Debug("creating job", zap.String("uuid", j.Uuid))
-					if err := handler.Create(ctx, job); err != nil {
-						if ctx.Err() != nil {
-							return
-						}
-						logger.Error("failed to create job", zap.Error(err))
-					}
-				}
-			}()
+			m.createJobs(ctx, logger, handler, agentTags, resp.CommandJobs())
 		}
 	}()
 
 	return errs
 }
 
+func (m *Monitor) createJobs(ctx context.Context, logger *zap.Logger, handler JobHandler, agentTags map[string]string, jobs []*api.JobJobTypeCommand) {
+	// A sneaky way to create a channel that is closed after a duration.
+	// Why not pass directly to handler.Create? Because that might
+	// interrupt scheduling a pod, when all we want is to bound the
+	// time spent waiting for the limiter.
+	staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.PollInterval)
+	defer staleCancel()
+
+	// TODO: sort by ScheduledAt in the API
+	sort.Slice(jobs, func(i, j int) bool {
+		return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
+	})
+
+	for _, j := range jobs {
+		if staleCtx.Err() != nil {
+			// Results already stale; try again later.
+			return
+		}
+
+		jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
+
+		// The API returns jobs that match ANY agent tags (the agent query rules).
+		// However, we can only acquire jobs that match ALL agent tags.
+		if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
+			logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
+			continue
+		}
+
+		job := Job{
+			CommandJob: &j.CommandJob,
+			StaleCh:    staleCtx.Done(),
+		}
+
+		logger.Debug("creating job", zap.String("uuid", j.Uuid))
+		if err := handler.Create(ctx, job); err != nil {
+			if ctx.Err() != nil {
+				return
+			}
+			logger.Error("failed to create job", zap.Error(err))
+		}
+	}
+}
+
 func encodeClusterGraphQLID(clusterUUID string) string {
 	return base64.StdEncoding.EncodeToString([]byte("Cluster---" + clusterUUID))
 }
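
A note on why the loop body moves into a method at all: defer runs when the enclosing function returns, not at the end of each loop iteration, so a bare defer staleCancel() inside Start's polling loop would accumulate pending cancels until the goroutine exited. The old code solved this with an immediately-invoked closure ("Wrapped so staleCancel is called before the next loop iteration"); extracting createJobs keeps the same per-iteration scoping under a named function. A minimal standalone Go sketch of the difference (hypothetical example, not from the repository):

package main

import "fmt"

func main() {
	// Deferred calls run when the enclosing function returns, so these
	// pile up and only fire (in LIFO order) after the loop finishes:
	for i := 0; i < 3; i++ {
		defer fmt.Println("late cleanup", i)
	}

	// Moving the body into its own function (as this commit does with
	// createJobs) scopes each defer to a single iteration:
	for i := 0; i < 3; i++ {
		iteration(i)
	}
}

func iteration(i int) {
	defer fmt.Println("timely cleanup", i) // runs before the next iteration
	fmt.Println("working on", i)
}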

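The "sneaky way" comment is also worth unpacking. staleCtx is never passed to handler.Create, so its timeout cannot interrupt scheduling a pod; only its Done() channel is consulted between jobs (and handed to each Job as StaleCh) to bound how long one poll's results are treated as fresh. A self-contained sketch of the pattern, with hypothetical names (process stands in for handler.Create):

package main

import (
	"context"
	"fmt"
	"time"
)

// process stands in for handler.Create. It receives the real work context
// plus a staleness channel it may consult, but the work itself is only
// cancelled via ctx, never by the staleness timer.
func process(ctx context.Context, staleCh <-chan struct{}, name string) error {
	time.Sleep(30 * time.Millisecond) // stand-in for scheduling a pod
	fmt.Println("created", name)
	return ctx.Err()
}

func main() {
	ctx := context.Background()

	// Use a context purely as a timer: Done() closes after the interval,
	// without cancelling the context the work runs under.
	staleCtx, staleCancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer staleCancel()

	for _, name := range []string{"job-1", "job-2", "job-3", "job-4", "job-5"} {
		if staleCtx.Err() != nil {
			fmt.Println("results are stale; wait for the next poll")
			return
		}
		if err := process(ctx, staleCtx.Done(), name); err != nil {
			fmt.Println("failed:", err)
		}
	}
}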
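Finally, the ANY-versus-ALL comment is the crux of the filtering step: the API query may return jobs that match any one of the agent's tags, but an agent can only acquire a job whose agent query rules are all satisfied. The real check lives in agenttags.JobTagsMatchAgentTags, which this diff does not show; a plausible minimal version of such a check, purely as a sketch, could look like:

package main

import "fmt"

// jobTagsMatchAgentTags reports whether every tag the job requires is
// present on the agent with the same value. Hypothetical stand-in for
// agenttags.JobTagsMatchAgentTags; the real function is not in this diff.
func jobTagsMatchAgentTags(jobTags, agentTags map[string]string) bool {
	for k, want := range jobTags {
		got, ok := agentTags[k]
		if !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	agentTags := map[string]string{"queue": "kubernetes", "os": "linux"}

	// All of the job's rules are satisfied by the agent: acquirable.
	fmt.Println(jobTagsMatchAgentTags(
		map[string]string{"queue": "kubernetes"}, agentTags)) // true

	// An ANY-match query could still return this job, but "arch" is not
	// an agent tag, so it must be skipped.
	fmt.Println(jobTagsMatchAgentTags(
		map[string]string{"queue": "kubernetes", "arch": "arm64"}, agentTags)) // false
}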