Skip to content

Commit

Permalink
chore(task/executor): Add tests for extern injection.
Browse files Browse the repository at this point in the history
  • Loading branch information
brettbuddin committed Aug 25, 2020
1 parent 07fcbc9 commit 41bfba5
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 24 deletions.
127 changes: 113 additions & 14 deletions task/backend/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import (

"github.com/golang/mock/gomock"
"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
tracetest "github.com/influxdata/influxdb/v2/kit/tracing/testing"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
influxdbmock "github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/task/backend"
Expand Down Expand Up @@ -121,8 +124,104 @@ func TestTaskExecutor_QuerySuccess(t *testing.T) {
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

<-promise.Done()

if got := promise.Error(); got != nil {
t.Fatal(got)
}

// confirm run is removed from in-mem store
run, err = tes.i.FindRunByID(context.Background(), task.ID, run.ID)
if run != nil || err == nil || !strings.Contains(err.Error(), "run not found") {
t.Fatal("run was returned when it should have been removed from kv")
}

// ensure the run returned by TaskControlService.FinishRun(...)
// has run logs formatted as expected
if run = tes.tcs.run; run == nil {
t.Fatal("expected run returned by FinishRun to not be nil")
}

if len(run.Log) < 3 {
t.Fatalf("expected 3 run logs, found %d", len(run.Log))
}

sctx := span.Context().(jaeger.SpanContext)
expectedMessage := fmt.Sprintf("trace_id=%s is_sampled=true", sctx.TraceID())
if expectedMessage != run.Log[1].Message {
t.Errorf("expected %q, found %q", expectedMessage, run.Log[1].Message)
}
}

func TestTaskExecutor_QuerySuccessWithExternInjection(t *testing.T) {
t.Parallel()

tes := taskExecutorSystem(t)

var (
script = fmt.Sprintf(fmtTestScript, t.Name())
ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
span = opentracing.GlobalTracer().StartSpan("test-span")
)
ctx = opentracing.ContextWithSpan(ctx, span)

task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{
OrganizationID: tes.tc.OrgID,
OwnerID: tes.tc.Auth.GetUserID(),
Flux: script,
})
if err != nil {
t.Fatal(err)
}

// Simulate previous run to establish a timestamp
latestSuccess := time.Now().UTC()
task, err = tes.i.UpdateTask(ctx, task.ID, influxdb.TaskUpdate{
LatestSuccess: &latestSuccess,
})
extern := &ast.File{
Body: []ast.Statement{&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: latestSuccessOption},
Init: &ast.DateTimeLiteral{
Value: latestSuccess,
},
},
},
},
}

ctx, err = feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{
feature.InjectLatestSuccessTime(): true,
}))
if err != nil {
t.Fatal(err)
}

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
promiseID := influxdb.ID(promise.ID())

run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
if err != nil {
t.Fatal(err)
}

if run.ID != promiseID {
t.Fatal("promise and run dont match")
}

if run.RunAt != time.Unix(126, 0).UTC() {
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
}

tes.svc.WaitForQueryLive(t, script, extern)
tes.svc.SucceedQuery(script, extern)

<-promise.Done()

Expand Down Expand Up @@ -179,8 +278,8 @@ func TestTaskExecutor_QueryFailure(t *testing.T) {
t.Fatal("promise and run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))

<-promise.Done()

Expand Down Expand Up @@ -228,8 +327,8 @@ func TestManualRun(t *testing.T) {
t.Fatal("promise and run and manual run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

if got := promise.Error(); got != nil {
t.Fatal(got)
Expand Down Expand Up @@ -271,8 +370,8 @@ func TestTaskExecutor_ResumingRun(t *testing.T) {
t.Fatal("promise and run and manual run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

if got := promise.Error(); got != nil {
t.Fatal(got)
Expand All @@ -299,8 +398,8 @@ func TestTaskExecutor_WorkerLimit(t *testing.T) {
t.Fatal("expected a worker to be started")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))

<-promise.Done()

Expand Down Expand Up @@ -383,15 +482,15 @@ func TestTaskExecutor_Metrics(t *testing.T) {
t.Fatal("promise and run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.WaitForQueryLive(t, script, nil)

mg = promtest.MustGather(t, reg)
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 total runs active, got %v", got)
}

tes.svc.SucceedQuery(script)
tes.svc.SucceedQuery(script, nil)
<-promise.Done()

mg = promtest.MustGather(t, reg)
Expand Down Expand Up @@ -483,8 +582,8 @@ func TestTaskExecutor_IteratorFailure(t *testing.T) {
t.Fatal("promise and run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

<-promise.Done()

Expand Down
31 changes: 21 additions & 10 deletions task/backend/executor/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
Expand All @@ -31,14 +32,24 @@ type fakeQueryService struct {

var _ query.AsyncQueryService = (*fakeQueryService)(nil)

func makeAST(q string) lang.ASTCompiler {
func makeAST(q string, extern *ast.File) lang.ASTCompiler {
pkg, err := runtime.ParseToJSON(q)
if err != nil {
panic(err)
}

var externBytes []byte
if extern != nil && len(extern.Body) > 0 {
var err error
externBytes, err = json.Marshal(extern)
if err != nil {
panic(err)
}
}
return lang.ASTCompiler{
AST: pkg,
Now: time.Unix(123, 0),
AST: pkg,
Now: time.Unix(123, 0),
Extern: externBytes,
}
}

Expand Down Expand Up @@ -85,12 +96,12 @@ func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux.
}

// SucceedQuery allows the running query matching the given script to return on its Ready channel.
func (s *fakeQueryService) SucceedQuery(script string) {
func (s *fakeQueryService) SucceedQuery(script string, extern *ast.File) {
s.mu.Lock()
defer s.mu.Unlock()

// Unblock the flux.
ast := makeAST(script)
ast := makeAST(script, extern)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
Expand All @@ -103,12 +114,12 @@ func (s *fakeQueryService) SucceedQuery(script string) {
}

// FailQuery closes the running query's Ready channel and sets its error to the given value.
func (s *fakeQueryService) FailQuery(script string, forced error) {
func (s *fakeQueryService) FailQuery(script string, extern *ast.File, forced error) {
s.mu.Lock()
defer s.mu.Unlock()

// Unblock the flux.
ast := makeAST(script)
ast := makeAST(script, nil)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
Expand All @@ -129,12 +140,12 @@ func (s *fakeQueryService) FailNextQuery(forced error) {
// WaitForQueryLive ensures that the query has made it into the service.
// This is particularly useful for the synchronous executor,
// because the execution starts on a separate goroutine.
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string, extern *ast.File) {
t.Helper()

const attempts = 10
ast := makeAST(script)
astUTC := makeAST(script)
ast := makeAST(script, extern)
astUTC := makeAST(script, extern)
astUTC.Now = ast.Now.UTC()
spec := makeASTString(ast)
specUTC := makeASTString(astUTC)
Expand Down

0 comments on commit 41bfba5

Please sign in to comment.