Skip to content

Commit

Permalink
dumpling : add a function for the variable call of dm (#30033)
Browse files Browse the repository at this point in the history
  • Loading branch information
docsir authored Nov 29, 2021
1 parent 3487bb7 commit 87f4c7d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
4 changes: 4 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

// import mysql driver
Expand Down Expand Up @@ -50,6 +51,7 @@ type Dumper struct {

tidbPDClientForGC pd.Client
selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta) (pkFields []string, pkVals [][]string, err error)
totalTables int64
}

// NewDumper returns a new Dumper
Expand Down Expand Up @@ -159,6 +161,8 @@ func (d *Dumper) Dump() (dumpErr error) {
tctx.L().Info("cannot update select table region info for TiDB", zap.Error(err))
}

atomic.StoreInt64(&d.totalTables, int64(calculateTableCount(conf.Tables)))

rebuildConn := func(conn *sql.Conn) (*sql.Conn, error) {
// make sure that the lock connection is still alive
err1 := conCtrl.PingContext(tctx)
Expand Down
41 changes: 27 additions & 14 deletions dumpling/export/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package export

import (
"fmt"
"sync/atomic"
"time"

"github.com/docker/go-units"
Expand All @@ -15,8 +16,6 @@ import (
const logProgressTick = 2 * time.Minute

func (d *Dumper) runLogProgress(tctx *tcontext.Context) {
conf := d.conf
totalTables := float64(calculateTableCount(conf.Tables))
logProgressTicker := time.NewTicker(logProgressTick)
lastCheckpoint := time.Now()
lastBytes := float64(0)
Expand All @@ -28,26 +27,40 @@ func (d *Dumper) runLogProgress(tctx *tcontext.Context) {
return
case <-logProgressTicker.C:
nanoseconds := float64(time.Since(lastCheckpoint).Nanoseconds())

completedTables := ReadCounter(finishedTablesCounter, conf.Labels)
finishedBytes := ReadGauge(finishedSizeGauge, conf.Labels)
finishedRows := ReadGauge(finishedRowsGauge, conf.Labels)
estimateTotalRows := ReadCounter(estimateTotalRowsCounter, conf.Labels)

midd := d.GetParameters()
tctx.L().Info("progress",
zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)),
zap.String("finished rows", fmt.Sprintf("%.0f", finishedRows)),
zap.String("estimate total rows", fmt.Sprintf("%.0f", estimateTotalRows)),
zap.String("finished size", units.HumanSize(finishedBytes)),
zap.Float64("average speed(MiB/s)", (finishedBytes-lastBytes)/(1048576e-9*nanoseconds)),
zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", midd.CompletedTables, float64(d.totalTables), midd.CompletedTables/float64(d.totalTables)*100)),
zap.String("finished rows", fmt.Sprintf("%.0f", midd.FinishedRows)),
zap.String("estimate total rows", fmt.Sprintf("%.0f", midd.EstimateTotalRows)),
zap.String("finished size", units.HumanSize(midd.FinishedBytes)),
zap.Float64("average speed(MiB/s)", (midd.FinishedBytes-lastBytes)/(1048576e-9*nanoseconds)),
)

lastCheckpoint = time.Now()
lastBytes = finishedBytes
lastBytes = midd.FinishedBytes
}
}
}

type Midparams struct {
CompletedTables float64
FinishedBytes float64
FinishedRows float64
EstimateTotalRows float64
TotalTables int64
}

func (d *Dumper) GetParameters() (midparams *Midparams) {
conf := d.conf
mid := &Midparams{}
mid.TotalTables = atomic.LoadInt64(&d.totalTables)
mid.CompletedTables = ReadCounter(finishedTablesCounter, conf.Labels)
mid.FinishedBytes = ReadGauge(finishedSizeGauge, conf.Labels)
mid.FinishedRows = ReadGauge(finishedRowsGauge, conf.Labels)
mid.EstimateTotalRows = ReadCounter(estimateTotalRowsCounter, conf.Labels)
return mid
}

func calculateTableCount(m DatabaseTables) int {
cnt := 0
for _, tables := range m {
Expand Down
32 changes: 32 additions & 0 deletions dumpling/export/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package export

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetParameters(t *testing.T) {
conf := defaultConfigForTest(t)
d := &Dumper{conf: conf}
InitMetricsVector(conf.Labels)

mid := d.GetParameters()
require.EqualValues(t, float64(0), mid.CompletedTables)
require.EqualValues(t, float64(0), mid.FinishedBytes)
require.EqualValues(t, float64(0), mid.FinishedRows)
require.EqualValues(t, float64(0), mid.EstimateTotalRows)

AddCounter(finishedTablesCounter, conf.Labels, 10)
AddGauge(finishedSizeGauge, conf.Labels, 20)
AddGauge(finishedRowsGauge, conf.Labels, 30)
AddCounter(estimateTotalRowsCounter, conf.Labels, 40)

mid = d.GetParameters()
require.EqualValues(t, float64(10), mid.CompletedTables)
require.EqualValues(t, float64(20), mid.FinishedBytes)
require.EqualValues(t, float64(30), mid.FinishedRows)
require.EqualValues(t, float64(40), mid.EstimateTotalRows)
}

0 comments on commit 87f4c7d

Please sign in to comment.