Skip to content

Commit

Permalink
lightning: record network usage (#39673)
Browse files Browse the repository at this point in the history
ref #39674
  • Loading branch information
buchuitoudegou committed Dec 8, 2022
1 parent 168c2ab commit 4b98439
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 0 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ go_library(
"//br/pkg/version/build",
"//expression",
"//planner/core",
"//util",
"//util/promutil",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type MySQLConnectParam struct {
SQLMode string
MaxAllowedPacket uint64
TLS string
Net string
Vars map[string]string
}

Expand All @@ -64,6 +65,9 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
cfg.User = param.User
cfg.Passwd = param.Password
cfg.Net = "tcp"
if param.Net != "" {
cfg.Net = param.Net
}
cfg.Addr = net.JoinHostPort(param.Host, strconv.Itoa(param.Port))
cfg.Params["charset"] = "utf8mb4"
cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode)
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type DBStore struct {
IndexSerialScanConcurrency int `toml:"index-serial-scan-concurrency" json:"index-serial-scan-concurrency"`
ChecksumTableConcurrency int `toml:"checksum-table-concurrency" json:"checksum-table-concurrency"`
Vars map[string]string `toml:"-" json:"vars"`

IOTotalBytes *atomic.Uint64 `toml:"-" json:"-"`
UUID string `toml:"-" json:"-"`
}

type Config struct {
Expand Down
34 changes: 34 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"sync"
"time"

"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/import_sstpb"
Expand All @@ -53,11 +55,13 @@ import (
"github.com/pingcap/tidb/br/pkg/version/build"
_ "github.com/pingcap/tidb/expression" // get rid of `import cycle`: just init expression.RewriteAstExpr,and called at package `backend.kv`.
_ "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/promutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shurcooL/httpgzip"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -370,6 +374,36 @@ func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.
taskCfg.TaskID = int64(val.(int))
})

failpoint.Inject("SetIOTotalBytes", func(_ failpoint.Value) {
o.logger.Info("set io total bytes")
taskCfg.TiDB.IOTotalBytes = atomic.NewUint64(0)
taskCfg.TiDB.UUID = uuid.New().String()
go func() {
for {
time.Sleep(time.Millisecond * 10)
log.L().Info("IOTotalBytes", zap.Uint64("IOTotalBytes", taskCfg.TiDB.IOTotalBytes.Load()))
}
}()
})
if taskCfg.TiDB.IOTotalBytes != nil {
o.logger.Info("found IO total bytes counter")
mysql.RegisterDialContext(taskCfg.TiDB.UUID, func(ctx context.Context, addr string) (net.Conn, error) {
o.logger.Debug("connection with IO bytes counter")
d := &net.Dialer{}
conn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
tcpConn := conn.(*net.TCPConn)
// try https://github.com/go-sql-driver/mysql/blob/bcc459a906419e2890a50fc2c99ea6dd927a88f2/connector.go#L56-L64
err = tcpConn.SetKeepAlive(true)
if err != nil {
o.logger.Warn("set TCP keep alive failed", zap.Error(err))
}
return util.NewTCPConnWithIOCounter(tcpConn, taskCfg.TiDB.IOTotalBytes), nil
})
}

return l.run(taskCtx, taskCfg, o)
}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) {
SQLMode: dsn.StrSQLMode,
MaxAllowedPacket: dsn.MaxAllowedPacket,
TLS: dsn.TLS,
Net: dsn.UUID,
}

db, err := param.Connect()
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_record_network/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = 'tidb'
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create database db;
1 change: 1 addition & 0 deletions br/tests/lightning_record_network/data/db.test-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table test ( id int primary key, a int, b int );
21 changes: 21 additions & 0 deletions br/tests/lightning_record_network/data/db.test.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
insert into db.test values
(1,1,1),
(2,1,1),
(3,1,1),
(4,1,1),
(5,1,1),
(6,1,1),
(7,1,1),
(8,1,1),
(9,1,1),
(10,1,1),
(11,1,1),
(12,1,1),
(13,1,1),
(14,1,1),
(15,1,1),
(16,1,1),
(17,1,1),
(18,1,1),
(19,1,1),
(20,1,1);
22 changes: 22 additions & 0 deletions br/tests/lightning_record_network/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/sh
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -euE

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/SetIOTotalBytes=return(1)"
run_lightning

grep 'IOTotal' "$TEST_DIR/lightning.log" | grep -v 'IOTotalBytes=0'
1 change: 1 addition & 0 deletions util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
Expand Down
29 changes: 29 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -202,3 +204,30 @@ func FmtNonASCIIPrintableCharToHex(str string) string {
}
return b.String()
}

// TCPConnWithIOCounter is a wrapper of net.TCPConn with counter that accumulates
// the bytes this connection reads/writes.
type TCPConnWithIOCounter struct {
*net.TCPConn
c *atomic.Uint64
}

// NewTCPConnWithIOCounter creates a new TCPConnWithIOCounter.
func NewTCPConnWithIOCounter(conn *net.TCPConn, c *atomic.Uint64) net.Conn {
return &TCPConnWithIOCounter{
TCPConn: conn,
c: c,
}
}

func (t *TCPConnWithIOCounter) Read(b []byte) (n int, err error) {
n, err = t.TCPConn.Read(b)
t.c.Add(uint64(n))
return n, err
}

func (t *TCPConnWithIOCounter) Write(b []byte) (n int, err error) {
n, err = t.TCPConn.Write(b)
t.c.Add(uint64(n))
return n, err
}

0 comments on commit 4b98439

Please sign in to comment.