Skip to content

Commit

Permalink
update task schedler to use the new task control service (#12949)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyondhill authored Mar 28, 2019
1 parent 0da5abb commit c78344c
Show file tree
Hide file tree
Showing 13 changed files with 1,064 additions and 629 deletions.
7 changes: 4 additions & 3 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,12 +547,13 @@ func (m *Launcher) run(ctx context.Context) (err error) {
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, store)

lw := taskbackend.NewPointLogWriter(pointsWriter)
m.scheduler = taskbackend.NewScheduler(store, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskControlService := taskbackend.TaskControlAdaptor(store, lw, lr)
m.scheduler = taskbackend.NewScheduler(taskControlService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
m.scheduler.Start(ctx)
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)

queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, store), lr, m.scheduler, authSvc, userResourceSvc, orgSvc)
taskSvc = task.NewValidator(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc)
m.taskStore = store
Expand Down
2 changes: 1 addition & 1 deletion cmd/influxd/launcher/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ stuff f=-123.456,b=true,s="hello"
}
from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, bOut.Name, be.Org.Name),
}
created, err := be.TaskService().CreateTask(pctx.SetAuthorizer(ctx, be.Auth), create)
created, err := be.TaskService().CreateTask(ctx, create)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func httpTaskServiceFactory(t *testing.T) (*servicetest.System, context.CancelFu
}

return &servicetest.System{
TaskControlService: servicetest.TaskControlAdaptor(store, rrw, rrw),
TaskControlService: backend.TaskControlAdaptor(store, rrw, rrw),
TaskService: taskService,
Ctx: ctx,
I: i,
Expand Down
15 changes: 15 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ type Task struct {
UpdatedAt string `json:"updatedAt,omitempty"`
}

// EffectiveCron returns the effective cron string of the options.
// If the cron option was specified, it is returned.
// If the every option was specified, it is converted into a cron string using "@every".
// Otherwise, the empty string is returned.
// The value of the offset option is not considered.
func (t *Task) EffectiveCron() string {
if t.Cron != "" {
return t.Cron
}
if t.Every != "" {
return "@every " + t.Every
}
return ""
}

// Run is a record created when a run of a task is scheduled.
type Run struct {
ID ID `json:"id,omitempty"`
Expand Down
35 changes: 27 additions & 8 deletions task/backend/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ func (c *Coordinator) claimExistingTasks() {
continue
}

t := task // Copy to avoid mistaken closure around task value.
if err := c.sch.ClaimTask(&t.Task, &t.Meta); err != nil {
t, err := backend.ToInfluxTask(&task.Task, &task.Meta)
if err != nil {
continue
}

// I may need a context with an auth here
if err := c.sch.ClaimTask(context.Background(), t); err != nil {
c.logger.Error("failed claim task", zap.Error(err))
continue
}
Expand All @@ -84,8 +89,11 @@ func (c *Coordinator) CreateTask(ctx context.Context, req backend.CreateTaskRequ
if err != nil {
return id, err
}

if err := c.sch.ClaimTask(task, meta); err != nil {
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return id, err
}
if err := c.sch.ClaimTask(ctx, t); err != nil {
_, delErr := c.Store.DeleteTask(ctx, id)
if delErr != nil {
return id, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
Expand Down Expand Up @@ -114,13 +122,18 @@ func (c *Coordinator) UpdateTask(ctx context.Context, req backend.UpdateTaskRequ
}
}

if err := c.sch.UpdateTask(task, meta); err != nil && err != backend.ErrTaskNotClaimed {
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return res, err
}

if err := c.sch.UpdateTask(ctx, t); err != nil && err != backend.ErrTaskNotClaimed {
return res, err
}

// If enabling the task, claim it after modifying the script.
if req.Status == backend.TaskActive {
if err := c.sch.ClaimTask(task, meta); err != nil && err != backend.ErrTaskAlreadyClaimed {
if err := c.sch.ClaimTask(ctx, t); err != nil && err != backend.ErrTaskAlreadyClaimed {
return res, err
}
}
Expand Down Expand Up @@ -162,9 +175,15 @@ func (c *Coordinator) ManuallyRunTimeRange(ctx context.Context, taskID platform.
if err != nil {
return r, err
}
t, m, err := c.Store.FindTaskByIDWithMeta(ctx, taskID)
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return nil, err
}

t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return nil, err
}
return r, c.sch.UpdateTask(t, m)

return r, c.sch.UpdateTask(ctx, t)
}
14 changes: 7 additions & 7 deletions task/backend/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"go.uber.org/zap/zaptest"
)

func timeoutSelector(ch <-chan *mock.Task) (*mock.Task, error) {
func timeoutSelector(ch <-chan *platform.Task) (*platform.Task, error) {
select {
case task := <-ch:
return task, nil
case <-time.After(time.Second):
case <-time.After(10 * time.Second):
return nil, errors.New("timeout on select")
}
}
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand All @@ -65,7 +65,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand Down Expand Up @@ -102,7 +102,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand All @@ -115,7 +115,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand All @@ -129,7 +129,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != newScript {
if task.Flux != newScript {
t.Fatal("task sent to scheduler doesnt match task created")
}
}
Expand Down
Loading

0 comments on commit c78344c

Please sign in to comment.