From 231cdd3abb5f6d4f042e080bc18731ae471a67c8 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 6 Nov 2019 11:47:09 +0800 Subject: [PATCH] Add binlog_status for http api and `TIDB_SERVERS_INFO` table (#13025) # Conflicts: # domain/info.go # infoschema/tables.go # sessionctx/binloginfo/binloginfo.go # Conflicts: # docs/tidb_http_api.md --- domain/info.go | 33 +++++++++------ sessionctx/binloginfo/binloginfo.go | 62 ++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 14 deletions(-) diff --git a/domain/info.go b/domain/info.go index 9416d861362f1..11daaedce6b6e 100644 --- a/domain/info.go +++ b/domain/info.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/owner" + "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/printer" @@ -56,11 +57,12 @@ type InfoSyncer struct { // It will not be updated when tidb-server running. So please only put static information in ServerInfo struct. type ServerInfo struct { ServerVersionInfo - ID string `json:"ddl_id"` - IP string `json:"ip"` - Port uint `json:"listening_port"` - StatusPort uint `json:"status_port"` - Lease string `json:"lease"` + ID string `json:"ddl_id"` + IP string `json:"ip"` + Port uint `json:"listening_port"` + StatusPort uint `json:"status_port"` + Lease string `json:"lease"` + BinlogStatus string `json:"binlog_status"` } // ServerVersionInfo is the server version and git_hash. @@ -168,7 +170,11 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt return errors.Trace(err) } is.session = session - + binloginfo.RegisterStatusListener(func(status binloginfo.BinlogStatus) error { + is.info.BinlogStatus = status.String() + err := is.storeServerInfo(ctx) + return errors.Trace(err) + }) err = is.storeServerInfo(ctx) return errors.Trace(err) } @@ -194,7 +200,9 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt continue } for _, kv := range resp.Kvs { - info := &ServerInfo{} + info := &ServerInfo{ + BinlogStatus: binloginfo.BinlogStatusUnknown.String(), + } err = json.Unmarshal(kv.Value, info) if err != nil { logutil.Logger(context.Background()).Info("get key failed", zap.String("key", string(kv.Key)), zap.ByteString("value", kv.Value), @@ -212,11 +220,12 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt func getServerInfo(id string) *ServerInfo { cfg := config.GetGlobalConfig() info := &ServerInfo{ - ID: id, - IP: cfg.AdvertiseAddress, - Port: cfg.Port, - StatusPort: cfg.Status.StatusPort, - Lease: cfg.Lease, + ID: id, + IP: cfg.AdvertiseAddress, + Port: cfg.Port, + StatusPort: cfg.Status.StatusPort, + Lease: cfg.Lease, + BinlogStatus: binloginfo.GetStatus().String(), } info.Version = mysql.ServerVersion info.GitHash = printer.TiDBGitHash diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index 67c1953fc9114..dec30cdf1d00f 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb-tools/tidb-binlog/node" pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" @@ -52,6 +53,33 @@ type BinlogInfo struct { Client *pumpcli.PumpsClient } +// BinlogStatus is the status of binlog +type BinlogStatus int + +const ( + //BinlogStatusUnknown stands for unknown binlog status + BinlogStatusUnknown BinlogStatus = iota + //BinlogStatusOn stands for the binlog is enabled + BinlogStatusOn + //BinlogStatusOff stands for the binlog is disabled + BinlogStatusOff + //BinlogStatusSkipping stands for the binlog status + BinlogStatusSkipping +) + +// String implements String function in fmt.Stringer +func (s BinlogStatus) String() string { + switch s { + case BinlogStatusOn: + return "On" + case BinlogStatusOff: + return "Off" + case BinlogStatusSkipping: + return "Skipping" + } + return "Unknown" +} + // GetPumpsClient gets the pumps client instance. func GetPumpsClient() *pumpcli.PumpsClient { pumpsClientLock.RLock() @@ -81,6 +109,9 @@ func GetPrewriteValue(ctx sessionctx.Context, createIfNotExists bool) *binlog.Pr var skipBinlog uint32 var ignoreError uint32 +var statusListener = func(_ BinlogStatus) error { + return nil +} // DisableSkipBinlogFlag disable the skipBinlog flag. func DisableSkipBinlogFlag() { @@ -98,6 +129,24 @@ func SetIgnoreError(on bool) { } } +// GetStatus gets the status of binlog +func GetStatus() BinlogStatus { + conf := config.GetGlobalConfig() + if !conf.Binlog.Enable { + return BinlogStatusOff + } + skip := atomic.LoadUint32(&skipBinlog) + if skip > 0 { + return BinlogStatusSkipping + } + return BinlogStatusOn +} + +// RegisterStatusListener registers a listener function to watch binlog status +func RegisterStatusListener(listener func(BinlogStatus) error) { + statusListener = listener +} + // WriteBinlog writes a binlog to Pump. func (info *BinlogInfo) WriteBinlog(clusterID uint64) error { skip := atomic.LoadUint32(&skipBinlog) @@ -113,12 +162,21 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error { // it will retry in PumpsClient if write binlog fail. err := info.Client.WriteBinlog(info.Data) if err != nil { - logutil.Logger(context.Background()).Error("write binlog failed", zap.Error(err)) + logutil.Logger(context.Background()).Error("write binlog failed", + zap.String("binlog_type", info.Data.Tp.String()), + zap.Uint64("binlog_start_ts", uint64(info.Data.StartTs)), + zap.Uint64("binlog_commit_ts", uint64(info.Data.CommitTs)), + zap.Error(err)) if atomic.LoadUint32(&ignoreError) == 1 { logutil.Logger(context.Background()).Error("write binlog fail but error ignored") metrics.CriticalErrorCounter.Add(1) // If error happens once, we'll stop writing binlog. - atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1) + swapped := atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1) + if swapped && skip == 0 { + if err := statusListener(BinlogStatusSkipping); err != nil { + logutil.Logger(context.Background()).Warn("update binlog status failed", zap.Error(err)) + } + } return nil }