Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add binlog_status for http api and TIDB_SERVERS_INFO table (#13025) #13187

Merged
merged 3 commits into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,29 +301,31 @@ timezone.*
```shell
$curl http://127.0.0.1:10080/info/all
{
"servers_num": 2,
"owner_id": "29a65ec0-d931-4f9e-a212-338eaeffab96",
"is_all_server_version_consistent": true,
"all_servers_info": {
"275a19ae-d248-4dc0-b78c-6613a7509423": {
"ddl_id": "275a19ae-d248-4dc0-b78c-6613a7509423",
"git_hash": "f572e33854e1c0f942f031e9656d0004f99995c6",
"ip": "192.168.197.206",
"29a65ec0-d931-4f9e-a212-338eaeffab96": {
"version": "5.7.25-TiDB-v4.0.0-alpha-669-g8f2a09a52-dirty",
"git_hash": "8f2a09a52fdcaf9d9bfd775d2c6023f363dc121e",
"ddl_id": "29a65ec0-d931-4f9e-a212-338eaeffab96",
"ip": "",
"listening_port": 4000,
"status_port": 10080,
"lease": "45s",
"binlog_status": "Off"
},
"cd13c9eb-c3ee-4887-af9b-e64f3162d92c": {
"version": "5.7.25-TiDB-v4.0.0-alpha-669-g8f2a09a52-dirty",
"git_hash": "8f2a09a52fdcaf9d9bfd775d2c6023f363dc121e",
"ddl_id": "cd13c9eb-c3ee-4887-af9b-e64f3162d92c",
"ip": "",
"listening_port": 4001,
"status_port": 10081,
"version": "5.7.25-TiDB-v2.1.0-rc.3-355-gf572e3385-dirty"
},
"f7e73ed5-63b4-4cb4-ba7c-42b32dc74e77": {
"ddl_id": "f7e73ed5-63b4-4cb4-ba7c-42b32dc74e77",
"git_hash": "f572e33854e1c0f942f031e9656d0004f99995c6",
"ip": "192.168.197.206",
"lease": "45s",
"listening_port": 4000,
"status_port": 10080,
"version": "5.7.25-TiDB-v2.1.0-rc.3-355-gf572e3385-dirty"
"binlog_status": "Off"
}
},
"is_all_server_version_consistent": true,
"owner_id": "f7e73ed5-63b4-4cb4-ba7c-42b32dc74e77",
"servers_num": 2
}
}
```

Expand Down
33 changes: 21 additions & 12 deletions domain/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/printer"
Expand Down Expand Up @@ -56,11 +57,12 @@ type InfoSyncer struct {
// It will not be updated when tidb-server running. So please only put static information in ServerInfo struct.
type ServerInfo struct {
ServerVersionInfo
ID string `json:"ddl_id"`
IP string `json:"ip"`
Port uint `json:"listening_port"`
StatusPort uint `json:"status_port"`
Lease string `json:"lease"`
ID string `json:"ddl_id"`
IP string `json:"ip"`
Port uint `json:"listening_port"`
StatusPort uint `json:"status_port"`
Lease string `json:"lease"`
BinlogStatus string `json:"binlog_status"`
}

// ServerVersionInfo is the server version and git_hash.
Expand Down Expand Up @@ -168,7 +170,11 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt
return err
}
is.session = session

binloginfo.RegisterStatusListener(func(status binloginfo.BinlogStatus) error {
is.info.BinlogStatus = status.String()
err := is.storeServerInfo(ctx)
return errors.Trace(err)
})
err = is.storeServerInfo(ctx)
return err
}
Expand All @@ -194,7 +200,9 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt
continue
}
for _, kv := range resp.Kvs {
info := &ServerInfo{}
info := &ServerInfo{
BinlogStatus: binloginfo.BinlogStatusUnknown.String(),
}
err = json.Unmarshal(kv.Value, info)
if err != nil {
logutil.Logger(context.Background()).Info("get key failed", zap.String("key", string(kv.Key)), zap.ByteString("value", kv.Value),
Expand All @@ -212,11 +220,12 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt
func getServerInfo(id string) *ServerInfo {
cfg := config.GetGlobalConfig()
info := &ServerInfo{
ID: id,
IP: cfg.AdvertiseAddress,
Port: cfg.Port,
StatusPort: cfg.Status.StatusPort,
Lease: cfg.Lease,
ID: id,
IP: cfg.AdvertiseAddress,
Port: cfg.Port,
StatusPort: cfg.Status.StatusPort,
Lease: cfg.Lease,
BinlogStatus: binloginfo.GetStatus().String(),
}
info.Version = mysql.ServerVersion
info.GitHash = printer.TiDBGitHash
Expand Down
62 changes: 60 additions & 2 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
Expand All @@ -52,6 +53,33 @@ type BinlogInfo struct {
Client *pumpcli.PumpsClient
}

// BinlogStatus is the status of binlog
type BinlogStatus int

const (
//BinlogStatusUnknown stands for unknown binlog status
BinlogStatusUnknown BinlogStatus = iota
//BinlogStatusOn stands for the binlog is enabled
BinlogStatusOn
//BinlogStatusOff stands for the binlog is disabled
BinlogStatusOff
//BinlogStatusSkipping stands for the binlog status
BinlogStatusSkipping
)

// String implements String function in fmt.Stringer
func (s BinlogStatus) String() string {
switch s {
case BinlogStatusOn:
return "On"
case BinlogStatusOff:
return "Off"
case BinlogStatusSkipping:
return "Skipping"
}
return "Unknown"
}

// GetPumpsClient gets the pumps client instance.
func GetPumpsClient() *pumpcli.PumpsClient {
pumpsClientLock.RLock()
Expand Down Expand Up @@ -81,6 +109,9 @@ func GetPrewriteValue(ctx sessionctx.Context, createIfNotExists bool) *binlog.Pr

var skipBinlog uint32
var ignoreError uint32
var statusListener = func(_ BinlogStatus) error {
return nil
}

// DisableSkipBinlogFlag disable the skipBinlog flag.
func DisableSkipBinlogFlag() {
Expand All @@ -98,6 +129,24 @@ func SetIgnoreError(on bool) {
}
}

// GetStatus gets the status of binlog
func GetStatus() BinlogStatus {
conf := config.GetGlobalConfig()
if !conf.Binlog.Enable {
return BinlogStatusOff
}
skip := atomic.LoadUint32(&skipBinlog)
if skip > 0 {
return BinlogStatusSkipping
}
return BinlogStatusOn
}

// RegisterStatusListener registers a listener function to watch binlog status
func RegisterStatusListener(listener func(BinlogStatus) error) {
statusListener = listener
}

// WriteBinlog writes a binlog to Pump.
func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
skip := atomic.LoadUint32(&skipBinlog)
Expand All @@ -113,12 +162,21 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
// it will retry in PumpsClient if write binlog fail.
err := info.Client.WriteBinlog(info.Data)
if err != nil {
logutil.Logger(context.Background()).Error("write binlog failed", zap.Error(err))
logutil.Logger(context.Background()).Error("write binlog failed",
zap.String("binlog_type", info.Data.Tp.String()),
zap.Uint64("binlog_start_ts", uint64(info.Data.StartTs)),
zap.Uint64("binlog_commit_ts", uint64(info.Data.CommitTs)),
zap.Error(err))
if atomic.LoadUint32(&ignoreError) == 1 {
logutil.Logger(context.Background()).Error("write binlog fail but error ignored")
metrics.CriticalErrorCounter.Add(1)
// If error happens once, we'll stop writing binlog.
atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1)
swapped := atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1)
if swapped && skip == 0 {
if err := statusListener(BinlogStatusSkipping); err != nil {
logutil.Logger(context.Background()).Warn("update binlog status failed", zap.Error(err))
}
}
return nil
}

Expand Down