Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update task schedler to use the new task control service #12949

Merged
merged 13 commits into from
Mar 28, 2019
7 changes: 4 additions & 3 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,12 +547,13 @@ func (m *Launcher) run(ctx context.Context) (err error) {
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, store)

lw := taskbackend.NewPointLogWriter(pointsWriter)
m.scheduler = taskbackend.NewScheduler(store, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskControlService := taskbackend.TaskControlAdaptor(store, lw, lr)
m.scheduler = taskbackend.NewScheduler(taskControlService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
m.scheduler.Start(ctx)
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)

queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, store), lr, m.scheduler, authSvc, userResourceSvc, orgSvc)
taskSvc = task.NewValidator(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc)
m.taskStore = store
Expand Down
2 changes: 1 addition & 1 deletion cmd/influxd/launcher/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ stuff f=-123.456,b=true,s="hello"
}
from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, bOut.Name, be.Org.Name),
}
created, err := be.TaskService().CreateTask(pctx.SetAuthorizer(ctx, be.Auth), create)
created, err := be.TaskService().CreateTask(ctx, create)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func httpTaskServiceFactory(t *testing.T) (*servicetest.System, context.CancelFu
}

return &servicetest.System{
TaskControlService: servicetest.TaskControlAdaptor(store, rrw, rrw),
TaskControlService: backend.TaskControlAdaptor(store, rrw, rrw),
TaskService: taskService,
Ctx: ctx,
I: i,
Expand Down
15 changes: 15 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ type Task struct {
UpdatedAt string `json:"updatedAt,omitempty"`
}

// EffectiveCron returns the effective cron string of the options.
// If the cron option was specified, it is returned.
// If the every option was specified, it is converted into a cron string using "@every".
// Otherwise, the empty string is returned.
// The value of the offset option is not considered.
func (t *Task) EffectiveCron() string {
if t.Cron != "" {
return t.Cron
}
if t.Every != "" {
return "@every " + t.Every
}
return ""
}

// Run is a record created when a run of a task is scheduled.
type Run struct {
ID ID `json:"id,omitempty"`
Expand Down
35 changes: 27 additions & 8 deletions task/backend/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ func (c *Coordinator) claimExistingTasks() {
continue
}

t := task // Copy to avoid mistaken closure around task value.
if err := c.sch.ClaimTask(&t.Task, &t.Meta); err != nil {
t, err := backend.ToInfluxTask(&task.Task, &task.Meta)
if err != nil {
continue
}

// I may need a context with an auth here
if err := c.sch.ClaimTask(context.Background(), t); err != nil {
c.logger.Error("failed claim task", zap.Error(err))
continue
}
Expand All @@ -84,8 +89,11 @@ func (c *Coordinator) CreateTask(ctx context.Context, req backend.CreateTaskRequ
if err != nil {
return id, err
}

if err := c.sch.ClaimTask(task, meta); err != nil {
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return id, err
}
if err := c.sch.ClaimTask(ctx, t); err != nil {
_, delErr := c.Store.DeleteTask(ctx, id)
if delErr != nil {
return id, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
Expand Down Expand Up @@ -114,13 +122,18 @@ func (c *Coordinator) UpdateTask(ctx context.Context, req backend.UpdateTaskRequ
}
}

if err := c.sch.UpdateTask(task, meta); err != nil && err != backend.ErrTaskNotClaimed {
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return res, err
}

if err := c.sch.UpdateTask(ctx, t); err != nil && err != backend.ErrTaskNotClaimed {
return res, err
}

// If enabling the task, claim it after modifying the script.
if req.Status == backend.TaskActive {
if err := c.sch.ClaimTask(task, meta); err != nil && err != backend.ErrTaskAlreadyClaimed {
if err := c.sch.ClaimTask(ctx, t); err != nil && err != backend.ErrTaskAlreadyClaimed {
return res, err
}
}
Expand Down Expand Up @@ -162,9 +175,15 @@ func (c *Coordinator) ManuallyRunTimeRange(ctx context.Context, taskID platform.
if err != nil {
return r, err
}
t, m, err := c.Store.FindTaskByIDWithMeta(ctx, taskID)
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return nil, err
}

t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return nil, err
}
return r, c.sch.UpdateTask(t, m)

return r, c.sch.UpdateTask(ctx, t)
}
14 changes: 7 additions & 7 deletions task/backend/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"go.uber.org/zap/zaptest"
)

func timeoutSelector(ch <-chan *mock.Task) (*mock.Task, error) {
func timeoutSelector(ch <-chan *platform.Task) (*platform.Task, error) {
select {
case task := <-ch:
return task, nil
case <-time.After(time.Second):
case <-time.After(10 * time.Second):
return nil, errors.New("timeout on select")
}
}
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand All @@ -65,7 +65,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand Down Expand Up @@ -102,7 +102,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand All @@ -115,7 +115,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}

Expand All @@ -129,7 +129,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}

if task.Script != newScript {
if task.Flux != newScript {
t.Fatal("task sent to scheduler doesnt match task created")
}
}
Expand Down
Loading