Skip to content

Commit

Permalink
Update plugin API with pre-exec and error events.
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Aug 16, 2021
1 parent 5893ec6 commit 28a33e2
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 22 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func (a *ExecStmt) logAudit() {
if audit.OnGeneralEvent != nil {
cmd := mysql.Command2Str[byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))]
ctx := context.WithValue(context.Background(), plugin.ExecStartTimeCtxKey, a.Ctx.GetSessionVars().StartTime)
audit.OnGeneralEvent(ctx, sessVars, plugin.Log, cmd)
audit.OnGeneralEvent(ctx, sessVars, plugin.Completed, cmd)
}
return nil
})
Expand Down
22 changes: 10 additions & 12 deletions plugin/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,27 @@ import (
type GeneralEvent byte

const (
// Log presents log event.
Log GeneralEvent = iota
// Error presents error event.
// Starting represents a GeneralEvent that is about to start
Starting GeneralEvent = iota
// Completed represents a GeneralEvent that has completed
Completed
// Error represents a GeneralEvent that has error (and typically couldn't start)
Error
// Result presents result event.
Result
// Status presents status event.
Status
)

// ConnectionEvent presents TiDB connection event.
type ConnectionEvent byte

const (
// Connected presents new connection establish event(finish auth).
// Connected represents new connection establish event(finish auth).
Connected ConnectionEvent = iota
// Disconnect presents disconnect event.
// Disconnect represents disconnect event.
Disconnect
// ChangeUser presents change user.
// ChangeUser represents change user.
ChangeUser
// PreAuth presents event before start auth.
// PreAuth represents event before start auth.
PreAuth
// Reject presents event reject connection event.
// Reject represents event reject connection event.
Reject
)

Expand Down
12 changes: 5 additions & 7 deletions plugin/conn_ip_example/conn_ip_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,12 @@ func OnGeneralEvent(ctx context.Context, sctx *variable.SessionVars, event plugi
fmt.Printf("---- executed by user: %#v\n", sctx.User)
}
switch event {
case plugin.Log:
fmt.Println("---- event: Log")
case plugin.Starting:
fmt.Println("---- event: Statement Starting")
case plugin.Completed:
fmt.Println("---- event: Statement Completed")
case plugin.Error:
fmt.Println("---- event: Error")
case plugin.Result:
fmt.Println("---- event: Result")
case plugin.Status:
fmt.Println("---- event: Status")
fmt.Println("---- event: ERROR!")
default:
fmt.Println("---- event: unrecognized")
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/conn_ip_example/conn_ip_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestLoadPlugin(t *testing.T) {
require.NoErrorf(t, err, "init plugin [%s] fail, error [%s]\n", pluginSign, err)

err = plugin.ForeachPlugin(plugin.Audit, func(auditPlugin *plugin.Plugin) error {
plugin.DeclareAuditManifest(auditPlugin.Manifest).OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY")
plugin.DeclareAuditManifest(auditPlugin.Manifest).OnGeneralEvent(context.Background(), nil, plugin.Completed, "QUERY")
return nil
})
require.NoErrorf(t, err, "query event fail, error [%s]\n", err)
Expand Down
2 changes: 1 addition & 1 deletion plugin/spi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestExportManifest(t *testing.T) {
err := exported.OnInit(context.Background(), exported)
require.NoError(t, err)
audit := plugin.DeclareAuditManifest(exported)
audit.OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY")
audit.OnGeneralEvent(context.Background(), nil, plugin.Completed, "QUERY")
require.True(t, callRecorder.NotifyEventCalled)
require.True(t, callRecorder.OnInitCalled)
}
17 changes: 17 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ func (cc *clientConn) Run(ctx context.Context) {

startTime := time.Now()
if err = cc.dispatch(ctx, data); err != nil {
cc.audit(plugin.Error) // tell the plugin API there was a dispatch error
if terror.ErrorEqual(err, io.EOF) {
cc.addMetrics(data[0], startTime, nil)
disconnectNormal.Inc()
Expand Down Expand Up @@ -1627,6 +1628,21 @@ func (cc *clientConn) handlePlanRecreator(ctx context.Context, info executor.Pla
return "", errors.New("plan recreator: not supporting info type")
}

func (cc *clientConn) audit(eventType plugin.GeneralEvent) {
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
if audit.OnGeneralEvent != nil {
cmd := mysql.Command2Str[byte(atomic.LoadUint32(&cc.ctx.GetSessionVars().CommandValue))]
ctx := context.WithValue(context.Background(), plugin.ExecStartTimeCtxKey, cc.ctx.GetSessionVars().StartTime)
audit.OnGeneralEvent(ctx, cc.ctx.GetSessionVars(), eventType, cmd)
}
return nil
})
if err != nil {
terror.Log(err)
}
}

// handleQuery executes the sql query string and writes result set or result ok to the client.
// As the execution time of this function represents the performance of TiDB, we do time log and metrics here.
// There is a special query `load data` that does not return result, which is handled differently.
Expand Down Expand Up @@ -1823,6 +1839,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{})
reg := trace.StartRegion(ctx, "ExecuteStmt")
cc.audit(plugin.Starting)
rs, err := cc.ctx.ExecuteStmt(ctx, stmt)
reg.End()
// The session tracker detachment from global tracker is solved in the `rs.Close` in most cases.
Expand Down

0 comments on commit 28a33e2

Please sign in to comment.