Skip to content

Commit

Permalink
status(dm): add progress/bps from tidb/dumpling (#7511)
Browse files Browse the repository at this point in the history
ref #7343
  • Loading branch information
okJiang authored Nov 3, 2022
1 parent 915ad30 commit 02a7f72
Show file tree
Hide file tree
Showing 15 changed files with 547 additions and 321 deletions.
4 changes: 4 additions & 0 deletions dm/dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func (m *Dumpling) status() *pb.DumpStatus {
FinishedBytes: dumpStatus.FinishedBytes,
FinishedRows: dumpStatus.FinishedRows,
EstimateTotalRows: dumpStatus.EstimateTotalRows,
Progress: dumpStatus.Progress,
Bps: int64(dumpStatus.CurrentSpeedBPS),
}
var estimateProgress string
if s.FinishedRows >= s.EstimateTotalRows {
Expand All @@ -290,6 +292,8 @@ func (m *Dumpling) status() *pb.DumpStatus {
zap.Int64("estimated_total_rows", int64(s.EstimateTotalRows)),
zap.Int64("finished_rows", int64(s.FinishedRows)),
zap.String("estimated_progress", estimateProgress),
zap.String("new progress", s.Progress),
zap.Int64("bps", s.Bps),
)
return s
}
Expand Down
47 changes: 37 additions & 10 deletions dm/dumpling/dumpling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

var _ = Suite(&testDumplingSuite{})
Expand Down Expand Up @@ -130,8 +131,9 @@ func (t *testDumplingSuite) TestDefaultConfig(c *C) {
c.Assert(dumpling.dumpConfig.Rows, Not(Equals), export.UnspecifiedSize)
}

func (t *testDumplingSuite) TestCallStatus(c *C) {
m := NewDumpling(t.cfg)
func TestCallStatus(t *testing.T) {
cfg := genDumpCfg(t)
m := NewDumpling(cfg)
m.metricProxies = defaultMetricProxies
ctx := context.Background()

Expand All @@ -146,21 +148,25 @@ func (t *testDumplingSuite) TestCallStatus(c *C) {
dumpConf.PromRegistry = tidbpromutil.NewDefaultRegistry()

s := m.Status(nil).(*pb.DumpStatus)
c.Assert(s.CompletedTables, Equals, float64(0))
c.Assert(s.FinishedBytes, Equals, float64(0))
c.Assert(s.FinishedRows, Equals, float64(0))
c.Assert(s.EstimateTotalRows, Equals, float64(0))
require.Equal(t, s.CompletedTables, float64(0))
require.Equal(t, s.FinishedBytes, float64(0))
require.Equal(t, s.FinishedRows, float64(0))
require.Equal(t, s.EstimateTotalRows, float64(0))
require.Equal(t, s.Progress, "")
require.Equal(t, s.Bps, int64(0))

// NewDumper is the only way we can set conf to Dumper, but it will return error. so we just ignore the error
dumpling, _ := export.NewDumper(ctx, dumpConf)
m.core = dumpling

m.Close()
s = m.Status(nil).(*pb.DumpStatus)
c.Assert(s.CompletedTables, Equals, float64(0))
c.Assert(s.FinishedBytes, Equals, float64(0))
c.Assert(s.FinishedRows, Equals, float64(0))
c.Assert(s.EstimateTotalRows, Equals, float64(0))
require.Equal(t, s.CompletedTables, float64(0))
require.Equal(t, s.FinishedBytes, float64(0))
require.Equal(t, s.FinishedRows, float64(0))
require.Equal(t, s.EstimateTotalRows, float64(0))
require.Equal(t, s.Progress, "")
require.Equal(t, s.Bps, int64(0))
}

func (t *testDumplingSuite) TestParseArgsWontOverwrite(c *C) {
Expand Down Expand Up @@ -208,3 +214,24 @@ func (t *testDumplingSuite) TestConstructArgs(c *C) {
c.Assert(exportCfg.SessionParams, NotNil)
c.Assert(exportCfg.SessionParams["time_zone"], Equals, "+01:00")
}

func genDumpCfg(t *testing.T) *config.SubTaskConfig {
t.Helper()

dir := t.TempDir()
return &config.SubTaskConfig{
Name: "dumpling_ut",
Timezone: "UTC",
From: config.GetDBConfigForTest(),
LoaderConfig: config.LoaderConfig{
Dir: dir,
},
BAList: &filter.Rules{
DoDBs: []string{testDumplingSchemaName},
DoTables: []*filter.Table{{
Schema: testDumplingSchemaName,
Name: testDumplingTableName,
}},
},
}
}
19 changes: 10 additions & 9 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/dumpling/export"
tidbpromutil "github.com/pingcap/tidb/util/promutil"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pb"
Expand Down Expand Up @@ -74,7 +75,7 @@ type LightningLoader struct {
metaBinlog atomic.String
metaBinlogGTID atomic.String

statusRecorder *statusRecorder
speedRecorder *export.SpeedRecorder
}

// NewLightning creates a new Loader importing data with lightning.
Expand All @@ -91,7 +92,7 @@ func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName st
lightningGlobalConfig: lightningCfg,
core: lightning.New(lightningCfg),
logger: logger.WithFields(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")),
statusRecorder: newStatusRecorder(),
speedRecorder: export.NewSpeedRecorder(),
}
return loader
}
Expand Down Expand Up @@ -465,7 +466,7 @@ func (l *LightningLoader) Update(ctx context.Context, cfg *config.SubTaskConfig)
func (l *LightningLoader) status() *pb.LoadStatus {
finished, total := l.core.Status()
progress := percent(finished, total, l.finish.Load())
currentSpeed := l.statusRecorder.getSpeed(finished)
currentSpeed := int64(l.speedRecorder.GetSpeed(float64(finished)))

l.logger.Info("progress status of lightning",
zap.Int64("finished_bytes", finished),
Expand All @@ -474,12 +475,12 @@ func (l *LightningLoader) status() *pb.LoadStatus {
zap.Int64("current speed (bytes / seconds)", currentSpeed),
)
s := &pb.LoadStatus{
FinishedBytes: finished,
TotalBytes: total,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
CurrentSpeedBytesPerSecond: currentSpeed,
FinishedBytes: finished,
TotalBytes: total,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
Bps: currentSpeed,
}
return s
}
Expand Down
19 changes: 10 additions & 9 deletions dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/util/filter"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
router "github.com/pingcap/tidb/util/table-router"
Expand Down Expand Up @@ -445,7 +446,7 @@ type Loader struct {
dbTableDataFinishedSize map[string]map[string]*atomic.Int64
dbTableDataLastFinishedSize map[string]map[string]*atomic.Int64
dbTableDataLastUpdatedTime atomic.Time
statusRecorder *statusRecorder
speedRecorder *export.SpeedRecorder

metaBinlog atomic.String
metaBinlogGTID atomic.String
Expand All @@ -466,14 +467,14 @@ type Loader struct {
// NewLoader creates a new Loader.
func NewLoader(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName string) *Loader {
loader := &Loader{
cfg: cfg,
cli: cli,
db2Tables: make(map[string]Tables2DataFiles),
tableInfos: make(map[string]*tableInfo),
workerWg: new(sync.WaitGroup),
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "load")),
workerName: workerName,
statusRecorder: newStatusRecorder(),
cfg: cfg,
cli: cli,
db2Tables: make(map[string]Tables2DataFiles),
tableInfos: make(map[string]*tableInfo),
workerWg: new(sync.WaitGroup),
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "load")),
workerName: workerName,
speedRecorder: export.NewSpeedRecorder(),
}
loader.fileJobQueueClosed.Store(true) // not open yet
return loader
Expand Down
55 changes: 7 additions & 48 deletions dm/loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,68 +14,27 @@
package loader

import (
"sync"
"time"

"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"go.uber.org/zap"
)

type statusRecorder struct {
mu sync.Mutex
lastFinished int64
lastUpdateTime time.Time
speedBPS int64
}

func newStatusRecorder() *statusRecorder {
return &statusRecorder{
lastUpdateTime: time.Now(),
}
}

func (s *statusRecorder) getSpeed(finished int64) int64 {
s.mu.Lock()
defer s.mu.Unlock()

if finished == s.lastFinished {
// for finished bytes does not get forwarded, use old speed to avoid
// display zero. We may find better strategy in future.
return s.speedBPS
}

now := time.Now()
elapsed := int64(now.Sub(s.lastUpdateTime).Seconds())
if elapsed == 0 {
elapsed = 1
}
currentSpeed := (finished - s.lastFinished) / elapsed
if currentSpeed == 0 {
currentSpeed = 1
}

s.lastFinished = finished
s.lastUpdateTime = now
s.speedBPS = currentSpeed

return currentSpeed
}

// Status implements Unit.Status.
func (l *Loader) Status(_ *binlog.SourceStatus) interface{} {
finishedSize := l.finishedDataSize.Load()
totalSize := l.totalDataSize.Load()
progress := percent(finishedSize, totalSize, l.finish.Load())
currentSpeed := l.statusRecorder.getSpeed(finishedSize)
currentSpeed := int64(l.speedRecorder.GetSpeed(float64(finishedSize)))

s := &pb.LoadStatus{
FinishedBytes: finishedSize,
TotalBytes: totalSize,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
CurrentSpeedBytesPerSecond: currentSpeed,
FinishedBytes: finishedSize,
TotalBytes: totalSize,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
Bps: currentSpeed,
}
go l.printStatus()
return s
Expand Down
3 changes: 2 additions & 1 deletion dm/loader/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
"sync"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/atomic"
)

func (*testLoaderSuite) TestConcurrentStatus(c *C) {
l := &Loader{statusRecorder: newStatusRecorder()}
l := &Loader{speedRecorder: export.NewSpeedRecorder()}
l.cfg = &config.SubTaskConfig{}
l.logger = log.L()
l.finishedDataSize.Store(100)
Expand Down
Loading

0 comments on commit 02a7f72

Please sign in to comment.