Skip to content

Commit

Permalink
worker(dm): fix no rows in SHOW MASTER STATUS will clear query-status (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 22, 2022
1 parent 2ebc95f commit 383b7a0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
4 changes: 2 additions & 2 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,9 +647,9 @@ func (w *SourceWorker) QueryStatus(ctx context.Context, name string) ([]*pb.SubT
if err := w.updateSourceStatus(ctx); err != nil {
if terror.ErrNoMasterStatus.Equal(err) {
w.l.Warn("This source's bin_log is OFF, so it only supports full_mode.", zap.String("sourceID", w.cfg.SourceID), zap.Error(err))
return nil, nil, nil
} else {
w.l.Error("failed to update source status", zap.Error(err))
}
w.l.Error("failed to update source status", zap.Error(err))
} else {
sourceStatus = w.sourceStatus.Load().(*binlog.SourceStatus)
}
Expand Down
31 changes: 31 additions & 0 deletions dm/dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"

Expand Down Expand Up @@ -113,6 +115,11 @@ func (t *testServer) testWorker(c *C) {
c.Assert(err, ErrorMatches, ".*worker already closed.*")
}

func mockShowMasterStatusNoRows(mockDB sqlmock.Sqlmock) {
rows := mockDB.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"})
mockDB.ExpectQuery(`SHOW MASTER STATUS`).WillReturnRows(rows)
}

type testServer2 struct{}

var _ = Suite(&testServer2{})
Expand Down Expand Up @@ -626,3 +633,27 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
return w.relayHolder.Stage() == pb.Stage_Stopped
}), IsTrue)
}

func TestMasterBinlogOff(t *testing.T) {
ctx := context.Background()
cfg, err := config.LoadFromFile(sourceSampleFile)
require.NoError(t, err)
cfg.From.Password = "no need to connect"

w, err := NewSourceWorker(cfg, nil, "", "")
require.NoError(t, err)
w.closed.Store(false)

// start task
var subtaskCfg config.SubTaskConfig
require.NoError(t, subtaskCfg.DecodeFile(subtaskSampleFile, true))
require.NoError(t, w.StartSubTask(&subtaskCfg, pb.Stage_Running, true))

_, mockDB, err := conn.InitMockDBFull()
require.NoError(t, err)
mockShowMasterStatusNoRows(mockDB)
status, _, err := w.QueryStatus(ctx, subtaskCfg.Name)
require.NoError(t, err)
require.Len(t, status, 1)
require.Equal(t, subtaskCfg.Name, status[0].Name)
}
13 changes: 13 additions & 0 deletions dm/pkg/conn/mockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ func InitVersionDB(c *check.C) sqlmock.Sqlmock {
return mock
}

func InitMockDBFull() (*sql.DB, sqlmock.Sqlmock, error) {
db, mock, err := sqlmock.New()
if err != nil {
return nil, nil, err
}
if mdbp, ok := DefaultDBProvider.(*mockDBProvider); ok {
mdbp.db = db
} else {
DefaultDBProvider = &mockDBProvider{db: db}
}
return db, mock, err
}

// TODO: export Config in https://github.com/pingcap/tidb/blob/a8fa29b56d633b1ec843e21cb89131dd4fd601db/br/pkg/mock/mock_cluster.go#L35
// Cluster is mock tidb cluster.
type Cluster struct {
Expand Down

0 comments on commit 383b7a0

Please sign in to comment.