
Commit 4800cba
Merge branch 'release-2.0' into cherry-pick-1699-to-release-2.0
Ehco1996 authored Jun 10, 2021
2 parents eaadf5f + dcc6842 commit 4800cba
Showing 21 changed files with 553 additions and 216 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -98,7 +98,7 @@ define run_unit_test
 	mkdir -p $(TEST_DIR)
 	$(FAILPOINT_ENABLE)
 	@export log_level=error; \
-	$(GOTEST) -covermode=atomic -coverprofile="$(TEST_DIR)/cov.$(2).out" $(TEST_RACE_FLAG) $(1) \
+	$(GOTEST) -timeout 2m -covermode=atomic -coverprofile="$(TEST_DIR)/cov.$(2).out" $(TEST_RACE_FLAG) $(1) \
 	|| { $(FAILPOINT_DISABLE); exit 1; }
 	$(FAILPOINT_DISABLE)
 endef
309 changes: 173 additions & 136 deletions dm/pb/dmworker.pb.go

Large diff of regenerated protobuf code; not rendered by default.

1 change: 1 addition & 0 deletions dm/proto/dmworker.proto
@@ -138,6 +138,7 @@ message SyncStatus {
     repeated ShardingGroup unresolvedGroups = 9; // sharding groups which are currently unresolved
     bool synced = 10; // whether the sync is caught up at this moment
     string binlogType = 11;
+    int64 secondsBehindMaster = 12; // seconds the sync unit lags behind the master
 }

 // SourceStatus represents the status of a source running on dm-worker
6 changes: 5 additions & 1 deletion dm/worker/status.go
@@ -22,6 +22,7 @@ import (
 	"go.uber.org/zap"

 	"github.com/pingcap/dm/dm/pb"
+	"github.com/pingcap/dm/syncer"
 )
@@ -100,7 +101,10 @@ func (w *Worker) Status(stName string) []*pb.SubTaskStatus {
 		case pb.UnitType_Load:
 			stStatus.Status = &pb.SubTaskStatus_Load{Load: us.(*pb.LoadStatus)}
 		case pb.UnitType_Sync:
-			stStatus.Status = &pb.SubTaskStatus_Sync{Sync: us.(*pb.SyncStatus)}
+			cus := cu.(*syncer.Syncer) // cu must be a *syncer.Syncer here
+			ss := us.(*pb.SyncStatus)
+			ss.SecondsBehindMaster = cus.GetSecondsBehindMaster()
+			stStatus.Status = &pb.SubTaskStatus_Sync{Sync: ss}
 		}
 	}
 }
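The new UnitType_Sync branch relies on syncer.(*Syncer).GetSecondsBehindMaster(), which this page does not render. A minimal sketch of such a getter, assuming the syncer keeps the lag in an atomic value refreshed while events are replicated (the field and the updateLag helper are hypothetical, not this commit's code):

package syncer

import "sync/atomic"

type Syncer struct {
	// secondsBehindMaster holds the most recently computed lag;
	// the real Syncer struct has many more fields (hypothetical sketch).
	secondsBehindMaster atomic.Int64
}

// GetSecondsBehindMaster returns the last computed lag in seconds.
func (s *Syncer) GetSecondsBehindMaster() int64 {
	return s.secondsBehindMaster.Load()
}

// updateLag stores the lag derived from the upstream clock (serverTS)
// and the header timestamp of the last replicated event (eventTS),
// both in UNIX seconds; small negative values are clamped to zero.
func (s *Syncer) updateLag(serverTS, eventTS int64) {
	lag := serverTS - eventTS
	if lag < 0 {
		lag = 0
	}
	s.secondsBehindMaster.Store(lag)
}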
12 changes: 12 additions & 0 deletions pkg/utils/db.go
@@ -355,6 +355,18 @@ func GetServerUUID(ctx context.Context, db *sql.DB, flavor string) (string, error) {
 	return serverUUID, err
 }

+// GetServerUnixTS gets the server's `UNIX_TIMESTAMP()`.
+func GetServerUnixTS(ctx context.Context, db *sql.DB) (int64, error) {
+	var ts int64
+	row := db.QueryRowContext(ctx, "SELECT UNIX_TIMESTAMP()")
+	err := row.Scan(&ts)
+	if err != nil {
+		log.L().Error("can't SELECT UNIX_TIMESTAMP()", zap.Error(err))
+		return ts, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
+	}
+	return ts, err
+}
+
 // GetMariaDBUUID gets the equivalent `server_uuid` for MariaDB:
 // `gtid_domain_id` joined with `server_id` by domainServerIDSeparator.
 func GetMariaDBUUID(ctx context.Context, db *sql.DB) (string, error) {
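GetServerUnixTS reads the upstream server's clock rather than the DM-worker's, so a lag computed against binlog event timestamps is immune to clock skew between the two hosts. A hedged usage sketch — only GetServerUnixTS comes from this commit; lagSeconds and lastEventTS are hypothetical:

package lagutil

import (
	"context"
	"database/sql"

	"github.com/pingcap/dm/pkg/utils"
)

// lagSeconds compares the upstream clock with the timestamp of the
// last replicated binlog event (both in UNIX seconds).
func lagSeconds(ctx context.Context, db *sql.DB, lastEventTS int64) (int64, error) {
	serverTS, err := utils.GetServerUnixTS(ctx, db)
	if err != nil {
		return 0, err
	}
	lag := serverTS - lastEventTS
	if lag < 0 {
		lag = 0 // one-second clock granularity can produce small negatives
	}
	return lag, nil
}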
18 changes: 18 additions & 0 deletions pkg/utils/db_test.go
@@ -15,6 +15,8 @@ package utils

 import (
 	"context"
+	"strconv"
+	"time"

 	. "github.com/pingcap/check"
 	"github.com/pingcap/errors"
@@ -167,6 +169,22 @@ func (t *testDBSuite) TestGetServerUUID(c *C) {
 	c.Assert(mock.ExpectationsWereMet(), IsNil)
 }

+func (t *testDBSuite) TestGetServerUnixTS(c *C) {
+	ctx := context.Background()
+
+	db, mock, err := sqlmock.New()
+	c.Assert(err, IsNil)
+
+	ts := time.Now().Unix()
+	rows := sqlmock.NewRows([]string{"UNIX_TIMESTAMP()"}).AddRow(strconv.FormatInt(ts, 10))
+	mock.ExpectQuery("SELECT UNIX_TIMESTAMP()").WillReturnRows(rows)
+
+	ts2, err := GetServerUnixTS(ctx, db)
+	c.Assert(err, IsNil)
+	c.Assert(ts, Equals, ts2)
+	c.Assert(mock.ExpectationsWereMet(), IsNil)
+}
+
 func (t *testDBSuite) TestGetParser(c *C) {
 	ctx, cancel := context.WithTimeout(context.Background(), DefaultDBTimeout)
 	defer cancel()
4 changes: 4 additions & 0 deletions syncer/db.go
@@ -92,6 +92,10 @@ func (conn *UpStreamConn) getServerUUID(ctx context.Context, flavor string) (string, error) {
 	return utils.GetServerUUID(ctx, conn.BaseDB.DB, flavor)
 }

+func (conn *UpStreamConn) getServerUnixTS(ctx context.Context) (int64, error) {
+	return utils.GetServerUnixTS(ctx, conn.BaseDB.DB)
+}
+
 func (conn *UpStreamConn) getParser(ctx context.Context) (*parser.Parser, error) {
 	return utils.GetParser(ctx, conn.BaseDB.DB)
 }
6 changes: 6 additions & 0 deletions syncer/db_test.go
@@ -109,6 +109,12 @@ func (s *testDBSuite) TestGetServerID(c *C) {
 	c.Assert(id, Greater, uint32(0))
 }

+func (s *testDBSuite) TestGetServerUnixTS(c *C) {
+	id, err := utils.GetServerUnixTS(context.Background(), s.db)
+	c.Assert(err, IsNil)
+	c.Assert(id, Greater, int64(0))
+}
+
 func (s *testDBSuite) TestBinaryLogs(c *C) {
 	files, err := getBinaryLogs(s.db)
 	c.Assert(err, IsNil)
52 changes: 31 additions & 21 deletions syncer/job.go
@@ -16,6 +16,8 @@ package syncer
 import (
 	"fmt"

+	"github.com/go-mysql-org/go-mysql/replication"
+
 	"github.com/pingcap/dm/pkg/binlog"
 )
@@ -73,41 +75,48 @@ type job struct {
 	currentLocation binlog.Location // end location of the sql in the binlog, so users can skip the sql manually by changing the checkpoint
 	ddls            []string
 	originSQL       string // show the origin sql on error; DDL only for now
+
+	eventHeader *replication.EventHeader
 }

 func (j *job) String() string {
 	// only output some important information, maybe useful in execution.
 	return fmt.Sprintf("tp: %s, sql: %s, args: %v, key: %s, ddls: %s, last_location: %s, start_location: %s, current_location: %s", j.tp, j.sql, j.args, j.key, j.ddls, j.location, j.startLocation, j.currentLocation)
 }

-func newJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql string, args []interface{}, key string, location, startLocation, cmdLocation binlog.Location) *job {
+func newDMLJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql string, args []interface{},
+	key string, location, startLocation, cmdLocation binlog.Location, eventHeader *replication.EventHeader) *job {
 	return &job{
-		tp:              tp,
-		sourceTbl:       map[string][]string{sourceSchema: {sourceTable}},
-		targetSchema:    targetSchema,
-		targetTable:     targetTable,
-		sql:             sql,
-		args:            args,
-		key:             key,
-		location:        location,
-		startLocation:   startLocation,
+		tp:           tp,
+		sourceTbl:    map[string][]string{sourceSchema: {sourceTable}},
+		targetSchema: targetSchema,
+		targetTable:  targetTable,
+		sql:          sql,
+		args:         args,
+		key:          key,
+		retry:        true,
+
+		location:        location,
+		startLocation:   startLocation,
 		currentLocation: cmdLocation,
-		retry:           true,
+		eventHeader:     eventHeader,
 	}
 }

 // newDDLJob is used to create a new ddl job.
 // when cfg.ShardMode == "", ddlInfo == nil, sourceTbls != nil, and sourceTbls records the tables affected by the ddl.
 // when cfg.ShardMode == ShardOptimistic || ShardPessimistic, ddlInfo != nil, sourceTbls == nil.
 func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, location, startLocation, cmdLocation binlog.Location,
-	sourceTbls map[string]map[string]struct{}, originSQL string) *job {
+	sourceTbls map[string]map[string]struct{}, originSQL string, eventHeader *replication.EventHeader) *job {
 	j := &job{
-		tp:              ddl,
-		ddls:            ddls,
+		tp:        ddl,
+		ddls:      ddls,
+		originSQL: originSQL,
+
 		location:        location,
 		startLocation:   startLocation,
 		currentLocation: cmdLocation,
-		originSQL:       originSQL,
+		eventHeader:     eventHeader,
 	}

 	if ddlInfo != nil {
@@ -130,6 +139,14 @@ func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, location, startLocation,
 	return j
 }

+func newSkipJob(ec *eventContext) *job {
+	return &job{
+		tp:          skip,
+		location:    *ec.lastLocation,
+		eventHeader: ec.header,
+	}
+}
+
 func newXIDJob(location, startLocation, cmdLocation binlog.Location) *job {
 	return &job{
 		tp: xid,
@@ -145,13 +162,6 @@ func newFlushJob() *job {
 	}
 }

-func newSkipJob(location binlog.Location) *job {
-	return &job{
-		tp:       skip,
-		location: location,
-	}
-}
-
 // put queues into bucket to monitor them.
 func queueBucketName(queueID int) string {
 	return fmt.Sprintf("q_%d", queueID%defaultBucketCount)
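The eventHeader now carried on every job is what makes per-event lag measurable: go-mysql's replication.EventHeader exposes the event's upstream creation time as a uint32 UNIX timestamp. A small self-contained illustration (the helper is hypothetical; the Timestamp field is go-mysql's actual API):

package main

import (
	"fmt"

	"github.com/go-mysql-org/go-mysql/replication"
)

// eventUnixTS extracts the upstream event time from a binlog event
// header; this is the value the lag computation consumes.
func eventUnixTS(h *replication.EventHeader) int64 {
	return int64(h.Timestamp)
}

func main() {
	h := &replication.EventHeader{Timestamp: 1623310000}
	fmt.Println(eventUnixTS(h)) // 1623310000
}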
8 changes: 5 additions & 3 deletions syncer/job_test.go
@@ -82,15 +82,17 @@ func (t *testJobSuite) TestJob(c *C) {
 		},
 	}

+	location := binlog.NewLocation("")
+	ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}
 	testCases := []struct {
 		job    *job
 		jobStr string
 	}{
 		{
-			newJob(insert, "test", "t1", "test", "t1", "insert into test.t1 values(?)", []interface{}{1}, "1", binlog.NewLocation(""), binlog.NewLocation(""), binlog.NewLocation("")),
+			newDMLJob(insert, "test", "t1", "test", "t1", "insert into test.t1 values(?)", []interface{}{1}, "1", location, location, location, ec.header),
 			"tp: insert, sql: insert into test.t1 values(?), args: [1], key: 1, ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
 		}, {
-			newDDLJob(ddlInfo, []string{"create database test"}, binlog.NewLocation(""), binlog.NewLocation(""), binlog.NewLocation(""), nil, "create database test"),
+			newDDLJob(ddlInfo, []string{"create database test"}, binlog.NewLocation(""), binlog.NewLocation(""), binlog.NewLocation(""), nil, "create database test", ec.header),
 			"tp: ddl, sql: , args: [], key: , ddls: [create database test], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ",
 		}, {
 			newXIDJob(binlog.NewLocation(""), binlog.NewLocation(""), binlog.NewLocation("")),
@@ -99,7 +101,7 @@ func (t *testJobSuite) TestJob(c *C) {
 			newFlushJob(),
 			"tp: flush, sql: , args: [], key: , ddls: [], last_location: position: (, 0), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
 		}, {
-			newSkipJob(binlog.NewLocation("")),
+			newSkipJob(ec),
 			"tp: skip, sql: , args: [], key: , ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 0), gtid-set: , current_location: position: (, 0), gtid-set: ",
 		},
 	}
2 changes: 0 additions & 2 deletions syncer/metrics.go
@@ -176,8 +176,6 @@ var (
 			Help: "counter for syncer exits with error",
 		}, []string{"task", "source_id"})

-	// TODO(ehco): fix this.
-	// some problems with it.
 	replicationLagGauge = metricsproxy.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "dm",
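With the TODO removed and event headers threaded through jobs, replicationLagGauge can be fed by real event timestamps. A sketch of a plausible update path, using the plain Prometheus client instead of DM's metricsproxy wrapper; the metric name, labels, and call site are assumptions, not this commit's code:

package metricsdemo

import "github.com/prometheus/client_golang/prometheus"

// lagGauge is a stand-in for replicationLagGauge, declared with the
// same label scheme as the counter above it in syncer/metrics.go.
var lagGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "dm",
		Subsystem: "syncer",
		Name:      "replication_lag",
		Help:      "replication lag in seconds",
	}, []string{"task", "source_id"})

// observeLag sets the gauge from the upstream clock and the event
// timestamp, both in UNIX seconds.
func observeLag(task, sourceID string, serverTS, eventTS int64) {
	lag := serverTS - eventTS
	if lag < 0 {
		lag = 0
	}
	lagGauge.WithLabelValues(task, sourceID).Set(float64(lag))
}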
2 changes: 1 addition & 1 deletion syncer/optimist.go
@@ -206,7 +206,7 @@ func (s *Syncer) handleQueryEventOptimistic(
 		tableNames: needTrackDDLs[0].tableNames,
 		stmt:       needTrackDDLs[0].stmt,
 	}
-	job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastLocation, *ec.startLocation, *ec.currentLocation, nil, originSQL)
+	job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastLocation, *ec.startLocation, *ec.currentLocation, nil, originSQL, ec.header)
 	err = s.addJobFunc(job)
 	if err != nil {
 		return err
(The remaining changed files are not rendered on this page.)
