Skip to content

Commit

Permalink
Allow a query to be killed
Browse files Browse the repository at this point in the history
While this allows a query to be killed, it doesn't really do anything
yet since the interrupt happens only after the first row gets emitted
(the entire first series).

This section of code will likely have to be refactored to make this work
since we need a way to interrupt a currently running iterator.
  • Loading branch information
jsternberg committed Mar 15, 2016
1 parent 5cbc103 commit 30a6852
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 4 deletions.
9 changes: 9 additions & 0 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
err = e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
err = e.executeGrantAdminStatement(stmt)
case *influxql.KillQueryStatement:
err = e.executeKillQueryStatement(stmt)
case *influxql.RevokeStatement:
err = e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
Expand Down Expand Up @@ -373,6 +375,13 @@ func (e *QueryExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStat
return e.MetaClient.SetAdminPrivilege(stmt.User, true)
}

func (e *QueryExecutor) executeKillQueryStatement(stmt *influxql.KillQueryStatement) error {
if e.QueryManager == nil {
return influxql.ErrNoQueryManager
}
return e.QueryManager.DetachQuery(stmt.QueryID)
}

func (e *QueryExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
priv := influxql.NoPrivileges

Expand Down
15 changes: 15 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (*DropSubscriptionStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*GrantAdminStatement) node() {}
func (*KillQueryStatement) node() {}
func (*RevokeStatement) node() {}
func (*RevokeAdminStatement) node() {}
func (*SelectStatement) node() {}
Expand Down Expand Up @@ -220,6 +221,7 @@ func (*DropSubscriptionStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*GrantAdminStatement) stmt() {}
func (*KillQueryStatement) stmt() {}
func (*ShowContinuousQueriesStatement) stmt() {}
func (*ShowGrantsForUserStatement) stmt() {}
func (*ShowServersStatement) stmt() {}
Expand Down Expand Up @@ -634,6 +636,19 @@ func (s *GrantAdminStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

type KillQueryStatement struct {
// The query to kill.
QueryID uint64
}

func (s *KillQueryStatement) String() string {
return fmt.Sprintf("KILL QUERY %d", s.QueryID)
}

func (s *KillQueryStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// SetPasswordUserStatement represents a command for changing user password.
type SetPasswordUserStatement struct {
// Plain Password
Expand Down
18 changes: 17 additions & 1 deletion influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ func (p *Parser) ParseStatement() (Statement, error) {
return p.parseAlterStatement()
case SET:
return p.parseSetPasswordUserStatement()
case KILL:
return p.parseKillQueryStatement()
default:
return nil, newParseError(tokstr(tok, lit), []string{"SELECT", "DELETE", "SHOW", "CREATE", "DROP", "GRANT", "REVOKE", "ALTER", "SET"}, pos)
return nil, newParseError(tokstr(tok, lit), []string{"SELECT", "DELETE", "SHOW", "CREATE", "DROP", "GRANT", "REVOKE", "ALTER", "SET", "KILL"}, pos)
}
}

Expand Down Expand Up @@ -290,6 +292,20 @@ func (p *Parser) parseSetPasswordUserStatement() (*SetPasswordUserStatement, err
return stmt, nil
}

// parseKillQueryStatement parses a string and returns a kill statement.
// This function assumes the KILL token has already been consumed.
func (p *Parser) parseKillQueryStatement() (*KillQueryStatement, error) {
if err := p.parseTokens([]Token{QUERY}); err != nil {
return nil, err
}

qid, err := p.parseUInt64()
if err != nil {
return nil, err
}
return &KillQueryStatement{QueryID: qid}, nil
}

// parseCreateSubscriptionStatement parses a string and returns a CreatesubScriptionStatement.
// This function assumes the "CREATE SUBSCRIPTION" tokens have already been consumed.
func (p *Parser) parseCreateSubscriptionStatement() (*CreateSubscriptionStatement, error) {
Expand Down
14 changes: 12 additions & 2 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,14 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: &influxql.ShowQueriesStatement{},
},

// KILL QUERY 4
{
s: `KILL QUERY 4`,
stmt: &influxql.KillQueryStatement{
QueryID: 4,
},
},

// SHOW RETENTION POLICIES
{
s: `SHOW RETENTION POLICIES ON mydb`,
Expand Down Expand Up @@ -1639,10 +1647,10 @@ func TestParser_ParseStatement(t *testing.T) {
},

// Errors
{s: ``, err: `found EOF, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET at line 1, char 1`},
{s: ``, err: `found EOF, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET, KILL at line 1, char 1`},
{s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`},
{s: `SELECT time FROM myseries`, err: `at least 1 non-time field must be queried`},
{s: `blah blah`, err: `found blah, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET at line 1, char 1`},
{s: `blah blah`, err: `found blah, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET, KILL at line 1, char 1`},
{s: `SELECT field1 X`, err: `found X, expected FROM at line 1, char 15`},
{s: `SELECT field1 FROM "series" WHERE X +;`, err: `found ;, expected identifier, string, number, bool at line 1, char 38`},
{s: `SELECT field1 FROM myseries GROUP`, err: `found EOF, expected BY at line 1, char 35`},
Expand Down Expand Up @@ -1833,6 +1841,8 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `GRANT ALL PRIVILEGES ON testdb TO`, err: `found EOF, expected identifier at line 1, char 35`},
{s: `GRANT ALL TO`, err: `found EOF, expected identifier at line 1, char 14`},
{s: `GRANT ALL PRIVILEGES TO`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `KILL`, err: `found EOF, expected QUERY at line 1, char 6`},
{s: `KILL QUERY 10s`, err: `found 10s, expected number at line 1, char 12`},
{s: `REVOKE`, err: `found EOF, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 8`},
{s: `REVOKE BOGUS`, err: `found BOGUS, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 8`},
{s: `REVOKE READ`, err: `found EOF, expected ON at line 1, char 13`},
Expand Down
6 changes: 5 additions & 1 deletion influxql/query_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/influxdata/influxdb/models"
)

var (
ErrNoQueryManager = errors.New("no query manager available")
)

// QueryTaskInfo holds information about a currently running query.
type QueryTaskInfo struct {
ID uint64
Expand Down Expand Up @@ -60,7 +64,7 @@ func DefaultQueryManager() QueryManager {

func ExecuteShowQueriesStatement(qm QueryManager, q *ShowQueriesStatement) (models.Rows, error) {
if qm == nil {
return nil, errors.New("no query manager available")
return nil, ErrNoQueryManager
}

queries := qm.ListQueries()
Expand Down
1 change: 1 addition & 0 deletions influxql/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `INTO`, tok: influxql.INTO},
{s: `KEY`, tok: influxql.KEY},
{s: `KEYS`, tok: influxql.KEYS},
{s: `KILL`, tok: influxql.KILL},
{s: `LIMIT`, tok: influxql.LIMIT},
{s: `SHOW`, tok: influxql.SHOW},
{s: `SHARD`, tok: influxql.SHARD},
Expand Down
2 changes: 2 additions & 0 deletions influxql/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
INTO
KEY
KEYS
KILL
LIMIT
META
MEASUREMENT
Expand Down Expand Up @@ -220,6 +221,7 @@ var tokens = [...]string{
INTO: "INTO",
KEY: "KEY",
KEYS: "KEYS",
KILL: "KILL",
LIMIT: "LIMIT",
MEASUREMENT: "MEASUREMENT",
MEASUREMENTS: "MEASUREMENTS",
Expand Down

0 comments on commit 30a6852

Please sign in to comment.