Skip to content

Commit

Permalink
plugin: support dynamic enable/disable plugins (#11122)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and jackysp committed Jul 9, 2019
1 parent 2ef4c5c commit 8104494
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 17 deletions.
52 changes: 52 additions & 0 deletions executor/admin_plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/util/chunk"
)

// AdminPluginsExec indicates AdminPlugins executor.
type AdminPluginsExec struct {
baseExecutor
Action core.AdminPluginsAction
Plugins []string
}

// Next implements the Executor Next interface.
func (e *AdminPluginsExec) Next(ctx context.Context, _ *chunk.Chunk) error {
switch e.Action {
case core.Enable:
return e.changeDisableFlagAndFlush(false)
case core.Disable:
return e.changeDisableFlagAndFlush(true)
}
return nil
}

func (e *AdminPluginsExec) changeDisableFlagAndFlush(disabled bool) error {
dom := domain.GetDomain(e.ctx)
for _, pluginName := range e.Plugins {
err := plugin.ChangeDisableFlagAndFlush(dom, pluginName, disabled)
if err != nil {
return err
}
}
return nil
}
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildChecksumTable(v)
case *plannercore.ReloadExprPushdownBlacklist:
return b.buildReloadExprPushdownBlacklist(v)
case *plannercore.AdminPlugins:
return b.buildAdminPlugins(v)
case *plannercore.DDL:
return b.buildDDL(v)
case *plannercore.Deallocate:
Expand Down Expand Up @@ -467,6 +469,10 @@ func (b *executorBuilder) buildReloadExprPushdownBlacklist(v *plannercore.Reload
return &ReloadExprPushdownBlacklistExec{baseExecutor{ctx: b.ctx}}
}

func (b *executorBuilder) buildAdminPlugins(v *plannercore.AdminPlugins) Executor {
return &AdminPluginsExec{baseExecutor: baseExecutor{ctx: b.ctx}, Action: v.Action, Plugins: v.Plugins}
}

func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
base := newBaseExecutor(b.ctx, nil, v.ExplainID())
base.initCap = chunk.ZeroCapacity
Expand Down
2 changes: 1 addition & 1 deletion executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ func (e *ShowExec) fetchShowPlugins() error {
tiPlugins := plugin.GetAll()
for _, ps := range tiPlugins {
for _, p := range ps {
e.appendRow([]interface{}{p.Name, p.State.String(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))})
e.appendRow([]interface{}{p.Name, p.StateValue(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))})
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65
github.com/pingcap/parser v0.0.0-20190708123555-29973f7a22eb
github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXC
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65 h1:yVYWPPQIq3csxtHvzx2fVO4HrQQOhxYDdYDQ4euSCIc=
github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20190708123555-29973f7a22eb h1:jfhJo/D1bWMF+zVaVdmixWG5EnxbnFt99GS2pdxuToo=
github.com/pingcap/parser v0.0.0-20190708123555-29973f7a22eb/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf h1:vmlN6DpZI5LtHd8r9YRAsyCeTU2pxRq+WlWn5CZ+ax4=
github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf/go.mod h1:3DlDlFT7EF64A1bmb/tulZb6wbPSagm5G4p1AlhaEDs=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
Expand Down
17 changes: 17 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ type ReloadExprPushdownBlacklist struct {
baseSchemaProducer
}

// AdminPluginsAction indicate action will be taken on plugins.
type AdminPluginsAction int

const (
// Enable indicates enable plugins.
Enable AdminPluginsAction = iota + 1
// Disable indicates disable plugins.
Disable
)

// AdminPlugins administrates tidb plugins.
type AdminPlugins struct {
baseSchemaProducer
Action AdminPluginsAction
Plugins []string
}

// Change represents a change plan.
type Change struct {
baseSchemaProducer
Expand Down
4 changes: 4 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,10 @@ func (b *PlanBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) {
ret = p
case ast.AdminReloadExprPushdownBlacklist:
return &ReloadExprPushdownBlacklist{}, nil
case ast.AdminPluginEnable:
return &AdminPlugins{Action: Enable, Plugins: as.Plugins}, nil
case ast.AdminPluginDisable:
return &AdminPlugins{Action: Disable, Plugins: as.Plugins}, nil
default:
return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as)
}
Expand Down
3 changes: 1 addition & 2 deletions plugin/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package plugin
import (
"context"

"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/variable"
)

Expand Down Expand Up @@ -77,7 +76,7 @@ type AuditManifest struct {
Manifest
// OnConnectionEvent will be called when TiDB receive or disconnect from client.
// return error will ignore and close current connection.
OnConnectionEvent func(ctx context.Context, identity *auth.UserIdentity, event ConnectionEvent, info *variable.ConnectionInfo) error
OnConnectionEvent func(ctx context.Context, event ConnectionEvent, info *variable.ConnectionInfo) error
// OnGeneralEvent will be called during TiDB execution.
OnGeneralEvent func(ctx context.Context, sctx *variable.SessionVars, event GeneralEvent, cmd string)
// OnGlobalVariableEvent will be called when Change GlobalVariable.
Expand Down
77 changes: 71 additions & 6 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,28 @@ type Config struct {
// Plugin presents a TiDB plugin.
type Plugin struct {
*Manifest
library *gplugin.Plugin
State State
Path string
library *gplugin.Plugin
State State
Path string
Disabled uint32
}

// StateValue returns readable state string.
func (p *Plugin) StateValue() string {
flag := "enable"
if atomic.LoadUint32(&p.Disabled) == 1 {
flag = "disable"
}
return p.State.String() + "-" + flag
}

// DisableFlag changes the disable flag of plugin.
func (p *Plugin) DisableFlag(disable bool) {
if disable {
atomic.StoreUint32(&p.Disabled, 1)
} else {
atomic.StoreUint32(&p.Disabled, 0)
}
}

func (p *Plugin) validate(ctx context.Context, tiPlugins *plugins) error {
Expand Down Expand Up @@ -225,6 +244,7 @@ func Init(ctx context.Context, cfg Config) (err error) {
path: pluginWatchPrefix + tiPlugins.plugins[kind][i].Name,
etcd: cfg.EtcdClient,
manifest: tiPlugins.plugins[kind][i].Manifest,
plugin: &tiPlugins.plugins[kind][i],
}
tiPlugins.plugins[kind][i].flushWatcher = watcher
go util.WithRecovery(watcher.watchLoop, nil)
Expand All @@ -241,6 +261,7 @@ type flushWatcher struct {
path string
etcd *clientv3.Client
manifest *Manifest
plugin *Plugin
}

func (w *flushWatcher) watchLoop() {
Expand All @@ -250,14 +271,37 @@ func (w *flushWatcher) watchLoop() {
case <-w.ctx.Done():
return
case <-watchChan:
err := w.manifest.OnFlush(w.ctx, w.manifest)
disabled, err := w.getPluginDisabledFlag()
if err != nil {
logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err))
}
if disabled {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 1)
} else {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 0)
}
err = w.manifest.OnFlush(w.ctx, w.manifest)
if err != nil {
logutil.BgLogger().Error("notify plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err))
}
}
}
}

func (w *flushWatcher) getPluginDisabledFlag() (bool, error) {
if w == nil || w.etcd == nil {
return true, errors.New("etcd is need to get plugin enable status")
}
resp, err := w.etcd.Get(context.Background(), w.manifest.flushWatcher.path)
if err != nil {
return true, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return false, nil
}
return string(resp.Kvs[0].Value) == "1", nil
}

type loadFn func(plugin *Plugin, dir string, pluginID ID) (manifest func() *Manifest, err error)

var testHook *struct {
Expand Down Expand Up @@ -366,6 +410,9 @@ func ForeachPlugin(kind Kind, fn func(plugin *Plugin) error) error {
if p.State != Ready {
continue
}
if atomic.LoadUint32(&p.Disabled) == 1 {
continue
}
err := fn(p)
if err != nil {
return err
Expand All @@ -382,7 +429,7 @@ func IsEnable(kind Kind) bool {
}
for i := range plugins.plugins[kind] {
p := &plugins.plugins[kind][i]
if p.State == Ready {
if p.State == Ready && atomic.LoadUint32(&p.Disabled) != 1 {
return true
}
}
Expand All @@ -404,7 +451,25 @@ func NotifyFlush(dom *domain.Domain, pluginName string) error {
if p == nil || p.Manifest.flushWatcher == nil || p.State != Ready {
return errors.Errorf("plugin %s doesn't exists or unsupported flush or doesn't start with PD", pluginName)
}
_, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, "")
_, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, strconv.Itoa(int(p.Disabled)))
if err != nil {
return err
}
return nil
}

// ChangeDisableFlagAndFlush changes plugin disable flag and notify other nodes to do same change.
func ChangeDisableFlagAndFlush(dom *domain.Domain, pluginName string, disable bool) error {
p := getByName(pluginName)
if p == nil || p.Manifest.flushWatcher == nil || p.State != Ready {
return errors.Errorf("plugin %s doesn't exists or unsupported flush or doesn't start with PD", pluginName)
}
disableInt := uint32(0)
if disable {
disableInt = 1
}
atomic.StoreUint32(&p.Disabled, disableInt)
_, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, strconv.Itoa(int(disableInt)))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ func (cc *clientConn) handleChangeUser(ctx context.Context, data []byte) error {
authPlugin := plugin.DeclareAuditManifest(p.Manifest)
if authPlugin.OnConnectionEvent != nil {
connInfo := cc.ctx.GetSessionVars().ConnectionInfo
err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: connInfo.Host}, plugin.ChangeUser, connInfo)
err = authPlugin.OnConnectionEvent(context.Background(), plugin.ChangeUser, connInfo)
if err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (

"github.com/blacktear23/go-proxyprotocol"
"github.com/pingcap/errors"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -342,7 +341,7 @@ func (s *Server) Run() error {
terror.Log(clientConn.Close())
return errors.Trace(err)
}
err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: host}, plugin.PreAuth, nil)
err = authPlugin.OnConnectionEvent(context.Background(), plugin.PreAuth, &variable.ConnectionInfo{Host: host})
if err != nil {
logutil.BgLogger().Info("do connection event failed", zap.Error(err))
terror.Log(clientConn.Close())
Expand Down Expand Up @@ -429,7 +428,7 @@ func (s *Server) onConn(conn *clientConn) {
authPlugin := plugin.DeclareAuditManifest(p.Manifest)
if authPlugin.OnConnectionEvent != nil {
sessionVars := conn.ctx.GetSessionVars()
return authPlugin.OnConnectionEvent(context.Background(), sessionVars.User, plugin.Connected, sessionVars.ConnectionInfo)
return authPlugin.OnConnectionEvent(context.Background(), plugin.Connected, sessionVars.ConnectionInfo)
}
return nil
})
Expand All @@ -445,7 +444,7 @@ func (s *Server) onConn(conn *clientConn) {
if authPlugin.OnConnectionEvent != nil {
sessionVars := conn.ctx.GetSessionVars()
sessionVars.ConnectionInfo.Duration = float64(time.Since(connectedTime)) / float64(time.Millisecond)
err := authPlugin.OnConnectionEvent(context.Background(), sessionVars.User, plugin.Disconnect, sessionVars.ConnectionInfo)
err := authPlugin.OnConnectionEvent(context.Background(), plugin.Disconnect, sessionVars.ConnectionInfo)
if err != nil {
logutil.BgLogger().Warn("do connection event failed", zap.String("plugin", authPlugin.Name), zap.Error(err))
}
Expand Down

0 comments on commit 8104494

Please sign in to comment.