Skip to content

Commit

Permalink
fix(control): make controller return planner failures (#446)
Browse files Browse the repository at this point in the history
Previously planner errors would be returned at runtime, even though
execution had not yet begun.  This change makes it so that errors will
be returned (with http error code 400) for unplannable queries.

Fixes #149
  • Loading branch information
Christopher M. Wolff authored Dec 11, 2018
1 parent 67851f9 commit 88e6579
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 37 deletions.
2 changes: 1 addition & 1 deletion cmd/flux/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func runHttp() {
q, err := querier.c.Query(ctx, pr.Compiler)
if err != nil {
log.Println("error executing query:", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
defer func() {
Expand Down
72 changes: 36 additions & 36 deletions control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,31 @@ func (c *Controller) compileQuery(q *Query, compiler flux.Compiler) error {
}

q.spec = *spec

if q.tryPlan() {
// Plan query to determine needed resources
lp, err := c.lplanner.Plan(&q.spec)
if err != nil {
return errors.Wrap(err, "failed to create logical plan")
}
if entry := c.logger.Check(zapcore.DebugLevel, "logical plan"); entry != nil {
entry.Write(zap.String("plan", fmt.Sprint(plan.Formatted(lp))))
}

p, err := c.pplanner.Plan(lp)
if err != nil {
return errors.Wrap(err, "failed to create physical plan")
}
q.plan = p
q.concurrency = p.Resources.ConcurrencyQuota
if q.concurrency > c.maxConcurrency {
q.concurrency = c.maxConcurrency
}
q.memory = p.Resources.MemoryBytesQuota
if entry := c.logger.Check(zapcore.DebugLevel, "physical plan"); entry != nil {
entry.Write(zap.String("plan", fmt.Sprint(plan.Formatted(q.plan))))
}
}
return nil
}

Expand Down Expand Up @@ -350,31 +375,6 @@ func (c *Controller) processQuery(q *Query) (pop bool, err error) {
}
}()

if q.tryPlan() {
// Plan query to determine needed resources
lp, err := c.lplanner.Plan(&q.spec)
if err != nil {
return true, errors.Wrap(err, "failed to create logical plan")
}
if entry := c.logger.Check(zapcore.DebugLevel, "logical plan"); entry != nil {
entry.Write(zap.String("plan", fmt.Sprint(plan.Formatted(lp))))
}

p, err := c.pplanner.Plan(lp)
if err != nil {
return true, errors.Wrap(err, "failed to create physical plan")
}
q.plan = p
q.concurrency = p.Resources.ConcurrencyQuota
if q.concurrency > c.maxConcurrency {
q.concurrency = c.maxConcurrency
}
q.memory = p.Resources.MemoryBytesQuota
if entry := c.logger.Check(zapcore.DebugLevel, "physical plan"); entry != nil {
entry.Write(zap.String("plan", fmt.Sprint(plan.Formatted(q.plan))))
}
}

// Check if we have enough resources
if c.check(q) {
// Update resource gauges
Expand Down Expand Up @@ -660,12 +660,20 @@ func (q *Query) tryCompile() bool {
return q.transitionTo(Compiling, Created)
}

// tryPlan attempts to transition the query into the Planning state.
func (q *Query) tryPlan() bool {
q.mu.Lock()
defer q.mu.Unlock()

return q.transitionTo(Planning, Compiling)
}

// tryQueue attempts to transition the query into the Queueing state.
func (q *Query) tryQueue() bool {
q.mu.Lock()
defer q.mu.Unlock()

return q.transitionTo(Queueing, Compiling)
return q.transitionTo(Queueing, Planning)
}

// tryRequeue attempts to transition the query into the Requeueing state.
Expand All @@ -677,23 +685,15 @@ func (q *Query) tryRequeue() bool {
// Already in the correct state.
return true
}
return q.transitionTo(Requeueing, Planning)
}

// tryPlan attempts to transition the query into the Planning state.
func (q *Query) tryPlan() bool {
q.mu.Lock()
defer q.mu.Unlock()

return q.transitionTo(Planning, Queueing)
return q.transitionTo(Requeueing, Queueing)
}

// tryExec attempts to transition the query into the Executing state.
func (q *Query) tryExec() bool {
q.mu.Lock()
defer q.mu.Unlock()

return q.transitionTo(Executing, Requeueing, Planning)
return q.transitionTo(Executing, Requeueing, Queueing)
}

// State is the query state.
Expand Down
46 changes: 46 additions & 0 deletions control/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/influxdata/flux"
_ "github.com/influxdata/flux/builtin"
"github.com/influxdata/flux/functions/inputs"
"github.com/influxdata/flux/internal/pkg/syncutil"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/mock"
Expand Down Expand Up @@ -63,6 +64,51 @@ func TestController_CompileQuery_Failure(t *testing.T) {
}
}

func TestController_PlanQuery_Failure(t *testing.T) {
// this compiler returns a spec that cannot be planned
// (no range to push into from)
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (*flux.Spec, error) {
return &flux.Spec{
Operations: []*flux.Operation{{
ID: "from",
Spec: &inputs.FromOpSpec{Bucket: "telegraf"},
}},
}, nil
},
}

ctrl := New(Config{})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer func() {
if err := ctrl.Shutdown(ctx); err != nil {
t.Fatal(err)
}
cancel()
}()

// Run the query. It should return an error.
if _, err := ctrl.Query(context.Background(), compiler); err == nil {
t.Fatal("expected error")
}

// Verify the metrics say there are no queries.
gauge, err := ctrl.metrics.all.GetMetricWithLabelValues()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

metric := &dto.Metric{}
if err := gauge.Write(metric); err != nil {
t.Fatalf("unexpected error: %s", err)
}

if got, exp := int(metric.Gauge.GetValue()), 0; got != exp {
t.Fatalf("unexpected metric value: exp=%d got=%d", exp, got)
}
}

func TestController_EnqueueQuery_Failure(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (*flux.Spec, error) {
Expand Down

0 comments on commit 88e6579

Please sign in to comment.