Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dumpling : add a function for the variable call of dm #30033

Merged
merged 16 commits into from
Nov 29, 2021
Merged
4 changes: 4 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

// import mysql driver
Expand Down Expand Up @@ -49,6 +50,7 @@ type Dumper struct {

tidbPDClientForGC pd.Client
selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta) (pkFields []string, pkVals [][]string, err error)
totalTables int64
}

// NewDumper returns a new Dumper
Expand Down Expand Up @@ -158,6 +160,8 @@ func (d *Dumper) Dump() (dumpErr error) {
tctx.L().Info("cannot update select table region info for TiDB", zap.Error(err))
}

atomic.StoreInt64(&d.totalTables, int64(calculateTableCount(conf.Tables)))

rebuildConn := func(conn *sql.Conn) (*sql.Conn, error) {
// make sure that the lock connection is still alive
err1 := conCtrl.PingContext(tctx)
Expand Down
40 changes: 26 additions & 14 deletions dumpling/export/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
const logProgressTick = 2 * time.Minute

func (d *Dumper) runLogProgress(tctx *tcontext.Context) {
conf := d.conf
totalTables := float64(calculateTableCount(conf.Tables))
logProgressTicker := time.NewTicker(logProgressTick)
lastCheckpoint := time.Now()
lastBytes := float64(0)
Expand All @@ -28,26 +26,40 @@ func (d *Dumper) runLogProgress(tctx *tcontext.Context) {
return
case <-logProgressTicker.C:
nanoseconds := float64(time.Since(lastCheckpoint).Nanoseconds())

completedTables := ReadCounter(finishedTablesCounter, conf.Labels)
finishedBytes := ReadGauge(finishedSizeGauge, conf.Labels)
finishedRows := ReadGauge(finishedRowsGauge, conf.Labels)
estimateTotalRows := ReadCounter(estimateTotalRowsCounter, conf.Labels)

lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
midd := d.GetParameters()
tctx.L().Info("progress",
zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)),
zap.String("finished rows", fmt.Sprintf("%.0f", finishedRows)),
zap.String("estimate total rows", fmt.Sprintf("%.0f", estimateTotalRows)),
zap.String("finished size", units.HumanSize(finishedBytes)),
zap.Float64("average speed(MiB/s)", (finishedBytes-lastBytes)/(1048576e-9*nanoseconds)),
zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", midd.CompletedTables, float64(d.totalTables), midd.CompletedTables/float64(d.totalTables)*100)),
zap.String("finished rows", fmt.Sprintf("%.0f", midd.FinishedRows)),
zap.String("estimate total rows", fmt.Sprintf("%.0f", midd.EstimateTotalRows)),
zap.String("finished size", units.HumanSize(midd.FinishedBytes)),
zap.Float64("average speed(MiB/s)", (midd.FinishedBytes-lastBytes)/(1048576e-9*nanoseconds)),
)

lastCheckpoint = time.Now()
lastBytes = finishedBytes
lastBytes = midd.FinishedBytes
}
}
}

type Midparams struct {
CompletedTables float64
FinishedBytes float64
FinishedRows float64
EstimateTotalRows float64
TotalTables float64
}

func (d *Dumper) GetParameters() (midparams *Midparams) {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
conf := d.conf
mid := &Midparams{}
mid.TotalTables = float64(d.totalTables)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

atomic.LoadInt64

mid.CompletedTables = ReadCounter(finishedTablesCounter, conf.Labels)
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
mid.FinishedBytes = ReadGauge(finishedSizeGauge, conf.Labels)
mid.FinishedRows = ReadGauge(finishedRowsGauge, conf.Labels)
mid.EstimateTotalRows = ReadCounter(estimateTotalRowsCounter, conf.Labels)
return mid
}

func calculateTableCount(m DatabaseTables) int {
cnt := 0
for _, tables := range m {
Expand Down
32 changes: 32 additions & 0 deletions dumpling/export/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package export

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetParameters(t *testing.T) {
conf := defaultConfigForTest(t)
d := &Dumper{conf: conf}
InitMetricsVector(conf.Labels)

mid := d.GetParameters()
require.EqualValues(t, float64(0), mid.CompletedTables)
require.EqualValues(t, float64(0), mid.FinishedBytes)
require.EqualValues(t, float64(0), mid.FinishedRows)
require.EqualValues(t, float64(0), mid.EstimateTotalRows)

AddCounter(finishedTablesCounter, conf.Labels, 10)
AddGauge(finishedSizeGauge, conf.Labels, 20)
AddGauge(finishedRowsGauge, conf.Labels, 30)
AddCounter(estimateTotalRowsCounter, conf.Labels, 40)

mid = d.GetParameters()
require.EqualValues(t, float64(10), mid.CompletedTables)
require.EqualValues(t, float64(20), mid.FinishedBytes)
require.EqualValues(t, float64(30), mid.FinishedRows)
require.EqualValues(t, float64(40), mid.EstimateTotalRows)
}