Skip to content

Commit

Permalink
Merge pull request #5950 from influxdata/js-5939-query-manager
Browse files Browse the repository at this point in the history
Implement a query manager for running queries
  • Loading branch information
jsternberg committed Mar 21, 2016
2 parents 8afff49 + 504bac5 commit b12cf04
Show file tree
Hide file tree
Showing 17 changed files with 627 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#6012](https://github.com/influxdata/influxdb/pull/6012): Add DROP SHARD support.
- [#6025](https://github.com/influxdata/influxdb/pull/6025): Remove deprecated JSON write path.
- [#5744](https://github.com/influxdata/influxdb/issues/5744): Add integer literal support to the query language.
- [#5939](https://github.com/influxdata/influxdb/issues/5939): Support viewing and killing running queries.

### Bugfixes

Expand Down
37 changes: 34 additions & 3 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type QueryExecutor struct {
// Used for rewriting points back into system for SELECT INTO statements.
PointsWriter *PointsWriter

// Used for managing and tracking running queries.
QueryManager influxql.QueryManager

// Remote execution timeout
Timeout time.Duration

Expand Down Expand Up @@ -69,7 +72,7 @@ func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
return results
}

func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}, results chan *influxql.Result) {
func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing <-chan struct{}, results chan *influxql.Result) {
defer close(results)

e.statMap.Add(statQueriesActive, 1)
Expand All @@ -78,6 +81,19 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now())

if e.QueryManager != nil {
var err error
_, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
Query: query,
Database: database,
InterruptCh: closing,
})
if err != nil {
results <- &influxql.Result{Err: err}
return
}
}

logger := e.logger()

var i int
Expand Down Expand Up @@ -155,6 +171,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
err = e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
err = e.executeGrantAdminStatement(stmt)
case *influxql.KillQueryStatement:
err = e.executeKillQueryStatement(stmt)
case *influxql.RevokeStatement:
err = e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
Expand All @@ -167,6 +185,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowQueriesStatement:
rows, err = e.executeShowQueriesStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowServersStatement:
Expand Down Expand Up @@ -197,7 +217,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
Err: err,
}

// Stop of the first error.
// Stop after the first error.
if err != nil {
break
}
Expand Down Expand Up @@ -357,6 +377,13 @@ func (e *QueryExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStat
return e.MetaClient.SetAdminPrivilege(stmt.User, true)
}

func (e *QueryExecutor) executeKillQueryStatement(stmt *influxql.KillQueryStatement) error {
if e.QueryManager == nil {
return influxql.ErrNoQueryManager
}
return e.QueryManager.KillQuery(stmt.QueryID)
}

func (e *QueryExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
priv := influxql.NoPrivileges

Expand Down Expand Up @@ -384,7 +411,7 @@ func (e *QueryExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordU
func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, chunkSize, statementID int, results chan *influxql.Result, closing <-chan struct{}) error {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{}
opt := influxql.SelectOptions{InterruptCh: closing}

// Replace instances of "now()" with the current time, and check the resultant times.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
Expand Down Expand Up @@ -597,6 +624,10 @@ func (e *QueryExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrants
return []*models.Row{row}, nil
}

func (e *QueryExecutor) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) {
return influxql.ExecuteShowQueriesStatement(e.QueryManager, q)
}

func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
di, err := e.MetaClient.Database(q.Database)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/copier"
Expand Down Expand Up @@ -166,6 +167,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.TSDBStore = s.TSDBStore
s.QueryExecutor.Monitor = s.Monitor
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager()
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}
Expand Down
30 changes: 30 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (*DropSubscriptionStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*GrantAdminStatement) node() {}
func (*KillQueryStatement) node() {}
func (*RevokeStatement) node() {}
func (*RevokeAdminStatement) node() {}
func (*SelectStatement) node() {}
Expand All @@ -119,6 +120,7 @@ func (*ShowDatabasesStatement) node() {}
func (*ShowFieldKeysStatement) node() {}
func (*ShowRetentionPoliciesStatement) node() {}
func (*ShowMeasurementsStatement) node() {}
func (*ShowQueriesStatement) node() {}
func (*ShowSeriesStatement) node() {}
func (*ShowShardGroupsStatement) node() {}
func (*ShowShardsStatement) node() {}
Expand Down Expand Up @@ -220,12 +222,14 @@ func (*DropSubscriptionStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*GrantAdminStatement) stmt() {}
func (*KillQueryStatement) stmt() {}
func (*ShowContinuousQueriesStatement) stmt() {}
func (*ShowGrantsForUserStatement) stmt() {}
func (*ShowServersStatement) stmt() {}
func (*ShowDatabasesStatement) stmt() {}
func (*ShowFieldKeysStatement) stmt() {}
func (*ShowMeasurementsStatement) stmt() {}
func (*ShowQueriesStatement) stmt() {}
func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {}
func (*ShowShardGroupsStatement) stmt() {}
Expand Down Expand Up @@ -646,6 +650,19 @@ func (s *GrantAdminStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

type KillQueryStatement struct {
// The query to kill.
QueryID uint64
}

func (s *KillQueryStatement) String() string {
return fmt.Sprintf("KILL QUERY %d", s.QueryID)
}

func (s *KillQueryStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// SetPasswordUserStatement represents a command for changing user password.
type SetPasswordUserStatement struct {
// Plain Password
Expand Down Expand Up @@ -2401,6 +2418,19 @@ func (s *DropMeasurementStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// SowQueriesStatement represents a command for listing all running queries.
type ShowQueriesStatement struct{}

// String returns a string representation of the show queries statement.
func (s *ShowQueriesStatement) String() string {
return "SHOW QUERIES"
}

// RequiredPrivileges returns the privilege required to execute a ShowQueriesStatement.
func (s *ShowQueriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: ReadPrivilege}}
}

// ShowRetentionPoliciesStatement represents a command for listing retention policies.
type ShowRetentionPoliciesStatement struct {
// Name of the database to list policies for.
Expand Down
5 changes: 3 additions & 2 deletions influxql/call_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,9 @@ func BenchmarkCallIterator_Min_Float(b *testing.B) {
b.ReportAllocs()

itr, err := influxql.NewCallIterator(input, influxql.IteratorOptions{
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
InterruptCh: make(chan struct{}),
})
if err != nil {
b.Fatal(err)
Expand Down
132 changes: 132 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,39 @@ func (itr *floatIntervalIterator) Next() *FloatPoint {
return p
}

// floatInterruptIterator represents a float implementation of InterruptIterator.
type floatInterruptIterator struct {
input FloatIterator
closing <-chan struct{}
count int
}

func newFloatInterruptIterator(input FloatIterator, closing <-chan struct{}) *floatInterruptIterator {
return &floatInterruptIterator{input: input, closing: closing}
}

func (itr *floatInterruptIterator) Close() error { return itr.input.Close() }

func (itr *floatInterruptIterator) Next() *FloatPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
Expand Down Expand Up @@ -1894,6 +1927,39 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint {
return p
}

// integerInterruptIterator represents a integer implementation of InterruptIterator.
type integerInterruptIterator struct {
input IntegerIterator
closing <-chan struct{}
count int
}

func newIntegerInterruptIterator(input IntegerIterator, closing <-chan struct{}) *integerInterruptIterator {
return &integerInterruptIterator{input: input, closing: closing}
}

func (itr *integerInterruptIterator) Close() error { return itr.input.Close() }

func (itr *integerInterruptIterator) Next() *IntegerPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
Expand Down Expand Up @@ -3215,6 +3281,39 @@ func (itr *stringIntervalIterator) Next() *StringPoint {
return p
}

// stringInterruptIterator represents a string implementation of InterruptIterator.
type stringInterruptIterator struct {
input StringIterator
closing <-chan struct{}
count int
}

func newStringInterruptIterator(input StringIterator, closing <-chan struct{}) *stringInterruptIterator {
return &stringInterruptIterator{input: input, closing: closing}
}

func (itr *stringInterruptIterator) Close() error { return itr.input.Close() }

func (itr *stringInterruptIterator) Next() *StringPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
Expand Down Expand Up @@ -4536,6 +4635,39 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint {
return p
}

// booleanInterruptIterator represents a boolean implementation of InterruptIterator.
type booleanInterruptIterator struct {
input BooleanIterator
closing <-chan struct{}
count int
}

func newBooleanInterruptIterator(input BooleanIterator, closing <-chan struct{}) *booleanInterruptIterator {
return &booleanInterruptIterator{input: input, closing: closing}
}

func (itr *booleanInterruptIterator) Close() error { return itr.input.Close() }

func (itr *booleanInterruptIterator) Next() *BooleanPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator
Expand Down
Loading

0 comments on commit b12cf04

Please sign in to comment.