From 28a33e2fd12af54be5a0764d0c0469fa437fa5cd Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 16 Aug 2021 14:22:54 -0600 Subject: [PATCH] Update plugin API with pre-exec and error events. --- executor/adapter.go | 2 +- plugin/audit.go | 22 +++++++++---------- plugin/conn_ip_example/conn_ip_example.go | 12 +++++----- .../conn_ip_example/conn_ip_example_test.go | 2 +- plugin/spi_test.go | 2 +- server/conn.go | 17 ++++++++++++++ 6 files changed, 35 insertions(+), 22 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 2b86471b998f9..98dde31851570 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -826,7 +826,7 @@ func (a *ExecStmt) logAudit() { if audit.OnGeneralEvent != nil { cmd := mysql.Command2Str[byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))] ctx := context.WithValue(context.Background(), plugin.ExecStartTimeCtxKey, a.Ctx.GetSessionVars().StartTime) - audit.OnGeneralEvent(ctx, sessVars, plugin.Log, cmd) + audit.OnGeneralEvent(ctx, sessVars, plugin.Completed, cmd) } return nil }) diff --git a/plugin/audit.go b/plugin/audit.go index c64255e1b293d..95efcf8599be2 100644 --- a/plugin/audit.go +++ b/plugin/audit.go @@ -24,29 +24,27 @@ import ( type GeneralEvent byte const ( - // Log presents log event. - Log GeneralEvent = iota - // Error presents error event. + // Starting represents a GeneralEvent that is about to start + Starting GeneralEvent = iota + // Completed represents a GeneralEvent that has completed + Completed + // Error represents a GeneralEvent that has error (and typically couldn't start) Error - // Result presents result event. - Result - // Status presents status event. - Status ) // ConnectionEvent presents TiDB connection event. type ConnectionEvent byte const ( - // Connected presents new connection establish event(finish auth). + // Connected represents new connection establish event(finish auth). Connected ConnectionEvent = iota - // Disconnect presents disconnect event. + // Disconnect represents disconnect event. Disconnect - // ChangeUser presents change user. + // ChangeUser represents change user. ChangeUser - // PreAuth presents event before start auth. + // PreAuth represents event before start auth. PreAuth - // Reject presents event reject connection event. + // Reject represents event reject connection event. Reject ) diff --git a/plugin/conn_ip_example/conn_ip_example.go b/plugin/conn_ip_example/conn_ip_example.go index b40828f818db4..a5b806aa67ae2 100644 --- a/plugin/conn_ip_example/conn_ip_example.go +++ b/plugin/conn_ip_example/conn_ip_example.go @@ -99,14 +99,12 @@ func OnGeneralEvent(ctx context.Context, sctx *variable.SessionVars, event plugi fmt.Printf("---- executed by user: %#v\n", sctx.User) } switch event { - case plugin.Log: - fmt.Println("---- event: Log") + case plugin.Starting: + fmt.Println("---- event: Statement Starting") + case plugin.Completed: + fmt.Println("---- event: Statement Completed") case plugin.Error: - fmt.Println("---- event: Error") - case plugin.Result: - fmt.Println("---- event: Result") - case plugin.Status: - fmt.Println("---- event: Status") + fmt.Println("---- event: ERROR!") default: fmt.Println("---- event: unrecognized") } diff --git a/plugin/conn_ip_example/conn_ip_example_test.go b/plugin/conn_ip_example/conn_ip_example_test.go index 811722123cbe1..8b7cd09b04018 100644 --- a/plugin/conn_ip_example/conn_ip_example_test.go +++ b/plugin/conn_ip_example/conn_ip_example_test.go @@ -65,7 +65,7 @@ func TestLoadPlugin(t *testing.T) { require.NoErrorf(t, err, "init plugin [%s] fail, error [%s]\n", pluginSign, err) err = plugin.ForeachPlugin(plugin.Audit, func(auditPlugin *plugin.Plugin) error { - plugin.DeclareAuditManifest(auditPlugin.Manifest).OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY") + plugin.DeclareAuditManifest(auditPlugin.Manifest).OnGeneralEvent(context.Background(), nil, plugin.Completed, "QUERY") return nil }) require.NoErrorf(t, err, "query event fail, error [%s]\n", err) diff --git a/plugin/spi_test.go b/plugin/spi_test.go index 02cd623f01c5b..672656f6a200d 100644 --- a/plugin/spi_test.go +++ b/plugin/spi_test.go @@ -47,7 +47,7 @@ func TestExportManifest(t *testing.T) { err := exported.OnInit(context.Background(), exported) require.NoError(t, err) audit := plugin.DeclareAuditManifest(exported) - audit.OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY") + audit.OnGeneralEvent(context.Background(), nil, plugin.Completed, "QUERY") require.True(t, callRecorder.NotifyEventCalled) require.True(t, callRecorder.OnInitCalled) } diff --git a/server/conn.go b/server/conn.go index 400a6bf57b55f..a2135bfb3d8fa 100644 --- a/server/conn.go +++ b/server/conn.go @@ -977,6 +977,7 @@ func (cc *clientConn) Run(ctx context.Context) { startTime := time.Now() if err = cc.dispatch(ctx, data); err != nil { + cc.audit(plugin.Error) // tell the plugin API there was a dispatch error if terror.ErrorEqual(err, io.EOF) { cc.addMetrics(data[0], startTime, nil) disconnectNormal.Inc() @@ -1627,6 +1628,21 @@ func (cc *clientConn) handlePlanRecreator(ctx context.Context, info executor.Pla return "", errors.New("plan recreator: not supporting info type") } +func (cc *clientConn) audit(eventType plugin.GeneralEvent) { + err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { + audit := plugin.DeclareAuditManifest(p.Manifest) + if audit.OnGeneralEvent != nil { + cmd := mysql.Command2Str[byte(atomic.LoadUint32(&cc.ctx.GetSessionVars().CommandValue))] + ctx := context.WithValue(context.Background(), plugin.ExecStartTimeCtxKey, cc.ctx.GetSessionVars().StartTime) + audit.OnGeneralEvent(ctx, cc.ctx.GetSessionVars(), eventType, cmd) + } + return nil + }) + if err != nil { + terror.Log(err) + } +} + // handleQuery executes the sql query string and writes result set or result ok to the client. // As the execution time of this function represents the performance of TiDB, we do time log and metrics here. // There is a special query `load data` that does not return result, which is handled differently. @@ -1823,6 +1839,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [ ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{}) ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{}) reg := trace.StartRegion(ctx, "ExecuteStmt") + cc.audit(plugin.Starting) rs, err := cc.ctx.ExecuteStmt(ctx, stmt) reg.End() // The session tracker detachment from global tracker is solved in the `rs.Close` in most cases.