Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a query manager for running queries #5950

Merged
merged 6 commits into from
Mar 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#6012](https://github.com/influxdata/influxdb/pull/6012): Add DROP SHARD support.
- [#6025](https://github.com/influxdata/influxdb/pull/6025): Remove deprecated JSON write path.
- [#5744](https://github.com/influxdata/influxdb/issues/5744): Add integer literal support to the query language.
- [#5939](https://github.com/influxdata/influxdb/issues/5939): Support viewing and killing running queries.

### Bugfixes

Expand Down
37 changes: 34 additions & 3 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type QueryExecutor struct {
// Used for rewriting points back into system for SELECT INTO statements.
PointsWriter *PointsWriter

// Used for managing and tracking running queries.
QueryManager influxql.QueryManager

// Remote execution timeout
Timeout time.Duration

Expand Down Expand Up @@ -69,7 +72,7 @@ func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
return results
}

func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}, results chan *influxql.Result) {
func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing <-chan struct{}, results chan *influxql.Result) {
defer close(results)

e.statMap.Add(statQueriesActive, 1)
Expand All @@ -78,6 +81,19 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now())

if e.QueryManager != nil {
var err error
_, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
Query: query,
Database: database,
InterruptCh: closing,
})
if err != nil {
results <- &influxql.Result{Err: err}
return
}
}

logger := e.logger()

var i int
Expand Down Expand Up @@ -155,6 +171,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
err = e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
err = e.executeGrantAdminStatement(stmt)
case *influxql.KillQueryStatement:
err = e.executeKillQueryStatement(stmt)
case *influxql.RevokeStatement:
err = e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
Expand All @@ -167,6 +185,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowQueriesStatement:
rows, err = e.executeShowQueriesStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowServersStatement:
Expand Down Expand Up @@ -197,7 +217,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
Err: err,
}

// Stop of the first error.
// Stop after the first error.
if err != nil {
break
}
Expand Down Expand Up @@ -357,6 +377,13 @@ func (e *QueryExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStat
return e.MetaClient.SetAdminPrivilege(stmt.User, true)
}

func (e *QueryExecutor) executeKillQueryStatement(stmt *influxql.KillQueryStatement) error {
if e.QueryManager == nil {
return influxql.ErrNoQueryManager
}
return e.QueryManager.KillQuery(stmt.QueryID)
}

func (e *QueryExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
priv := influxql.NoPrivileges

Expand Down Expand Up @@ -384,7 +411,7 @@ func (e *QueryExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordU
func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, chunkSize, statementID int, results chan *influxql.Result, closing <-chan struct{}) error {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{}
opt := influxql.SelectOptions{InterruptCh: closing}

// Replace instances of "now()" with the current time, and check the resultant times.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
Expand Down Expand Up @@ -597,6 +624,10 @@ func (e *QueryExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrants
return []*models.Row{row}, nil
}

func (e *QueryExecutor) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) {
return influxql.ExecuteShowQueriesStatement(e.QueryManager, q)
}

func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
di, err := e.MetaClient.Database(q.Database)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/copier"
Expand Down Expand Up @@ -166,6 +167,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.TSDBStore = s.TSDBStore
s.QueryExecutor.Monitor = s.Monitor
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager()
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}
Expand Down
30 changes: 30 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (*DropSubscriptionStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*GrantAdminStatement) node() {}
func (*KillQueryStatement) node() {}
func (*RevokeStatement) node() {}
func (*RevokeAdminStatement) node() {}
func (*SelectStatement) node() {}
Expand All @@ -119,6 +120,7 @@ func (*ShowDatabasesStatement) node() {}
func (*ShowFieldKeysStatement) node() {}
func (*ShowRetentionPoliciesStatement) node() {}
func (*ShowMeasurementsStatement) node() {}
func (*ShowQueriesStatement) node() {}
func (*ShowSeriesStatement) node() {}
func (*ShowShardGroupsStatement) node() {}
func (*ShowShardsStatement) node() {}
Expand Down Expand Up @@ -220,12 +222,14 @@ func (*DropSubscriptionStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*GrantAdminStatement) stmt() {}
func (*KillQueryStatement) stmt() {}
func (*ShowContinuousQueriesStatement) stmt() {}
func (*ShowGrantsForUserStatement) stmt() {}
func (*ShowServersStatement) stmt() {}
func (*ShowDatabasesStatement) stmt() {}
func (*ShowFieldKeysStatement) stmt() {}
func (*ShowMeasurementsStatement) stmt() {}
func (*ShowQueriesStatement) stmt() {}
func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {}
func (*ShowShardGroupsStatement) stmt() {}
Expand Down Expand Up @@ -646,6 +650,19 @@ func (s *GrantAdminStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

type KillQueryStatement struct {
// The query to kill.
QueryID uint64
}

func (s *KillQueryStatement) String() string {
return fmt.Sprintf("KILL QUERY %d", s.QueryID)
}

func (s *KillQueryStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// SetPasswordUserStatement represents a command for changing user password.
type SetPasswordUserStatement struct {
// Plain Password
Expand Down Expand Up @@ -2401,6 +2418,19 @@ func (s *DropMeasurementStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// SowQueriesStatement represents a command for listing all running queries.
type ShowQueriesStatement struct{}

// String returns a string representation of the show queries statement.
func (s *ShowQueriesStatement) String() string {
return "SHOW QUERIES"
}

// RequiredPrivileges returns the privilege required to execute a ShowQueriesStatement.
func (s *ShowQueriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: ReadPrivilege}}
}

// ShowRetentionPoliciesStatement represents a command for listing retention policies.
type ShowRetentionPoliciesStatement struct {
// Name of the database to list policies for.
Expand Down
5 changes: 3 additions & 2 deletions influxql/call_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,9 @@ func BenchmarkCallIterator_Min_Float(b *testing.B) {
b.ReportAllocs()

itr, err := influxql.NewCallIterator(input, influxql.IteratorOptions{
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
InterruptCh: make(chan struct{}),
})
if err != nil {
b.Fatal(err)
Expand Down
132 changes: 132 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,39 @@ func (itr *floatIntervalIterator) Next() *FloatPoint {
return p
}

// floatInterruptIterator represents a float implementation of InterruptIterator.
type floatInterruptIterator struct {
input FloatIterator
closing <-chan struct{}
count int
}

func newFloatInterruptIterator(input FloatIterator, closing <-chan struct{}) *floatInterruptIterator {
return &floatInterruptIterator{input: input, closing: closing}
}

func (itr *floatInterruptIterator) Close() error { return itr.input.Close() }

func (itr *floatInterruptIterator) Next() *FloatPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
Expand Down Expand Up @@ -1894,6 +1927,39 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint {
return p
}

// integerInterruptIterator represents a integer implementation of InterruptIterator.
type integerInterruptIterator struct {
input IntegerIterator
closing <-chan struct{}
count int
}

func newIntegerInterruptIterator(input IntegerIterator, closing <-chan struct{}) *integerInterruptIterator {
return &integerInterruptIterator{input: input, closing: closing}
}

func (itr *integerInterruptIterator) Close() error { return itr.input.Close() }

func (itr *integerInterruptIterator) Next() *IntegerPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
Expand Down Expand Up @@ -3215,6 +3281,39 @@ func (itr *stringIntervalIterator) Next() *StringPoint {
return p
}

// stringInterruptIterator represents a string implementation of InterruptIterator.
type stringInterruptIterator struct {
input StringIterator
closing <-chan struct{}
count int
}

func newStringInterruptIterator(input StringIterator, closing <-chan struct{}) *stringInterruptIterator {
return &stringInterruptIterator{input: input, closing: closing}
}

func (itr *stringInterruptIterator) Close() error { return itr.input.Close() }

func (itr *stringInterruptIterator) Next() *StringPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
Expand Down Expand Up @@ -4536,6 +4635,39 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint {
return p
}

// booleanInterruptIterator represents a boolean implementation of InterruptIterator.
type booleanInterruptIterator struct {
input BooleanIterator
closing <-chan struct{}
count int
}

func newBooleanInterruptIterator(input BooleanIterator, closing <-chan struct{}) *booleanInterruptIterator {
return &booleanInterruptIterator{input: input, closing: closing}
}

func (itr *booleanInterruptIterator) Close() error { return itr.input.Close() }

func (itr *booleanInterruptIterator) Next() *BooleanPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator
Expand Down
Loading