From 7aa818ad98d5c867e3a0594d56cfa6f95a26d794 Mon Sep 17 00:00:00 2001 From: ehco1996 Date: Wed, 9 Mar 2022 13:30:33 +0800 Subject: [PATCH] address comment --- dm/_utils/terror_gen/errors_release.txt | 2 +- dm/dm/master/scheduler/scheduler.go | 28 +- dm/dm/pb/dmworker.pb.go | 59 ++-- dm/dm/proto/dmworker.proto | 441 ++++++++++++------------ dm/dm/worker/source_worker.go | 18 +- dm/errors.toml | 2 +- dm/pkg/terror/error_list.go | 2 +- 7 files changed, 273 insertions(+), 279 deletions(-) mode change 100644 => 100755 dm/dm/worker/source_worker.go diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index 29ec2801392..44762d1475f 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -538,7 +538,7 @@ ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:le ErrSchedulerRelayWorkersWrongRelay,[code=46022:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for another sources %s respectively, Workaround: Please correct sources in `stop-relay`." ErrSchedulerSourceOpRelayExist,[code=46023:class=scheduler:scope=internal:level=high], "Message: source with name %s need to operate has existing relay workers %s, Workaround: Please `stop-relay` first." ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "Message: when %s, resource %s is in use by other client, Workaround: Please try again later" -ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update when no relay workers and no running tasks for now" +ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update when not enable relay and no running tasks for now" ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]" ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source." ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`." diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go index b9b71282b00..b89986fd2ec 100644 --- a/dm/dm/master/scheduler/scheduler.go +++ b/dm/dm/master/scheduler/scheduler.go @@ -146,7 +146,6 @@ type Scheduler struct { // a mirror of bounds whose element is not deleted when worker unbound. worker -> SourceBound lastBound map[string]ha.SourceBound - // TODO: seems this memory status is useless. // expectant relay stages for sources, source ID -> stage. // add: // - bound the source to a worker (at first time). @@ -412,16 +411,12 @@ func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error { if tasks := s.GetTaskNameListBySourceName(cfg.SourceID, &runningStage); len(tasks) > 0 { return terror.ErrSchedulerSourceCfgUpdate.Generate(cfg.SourceID) } - // 3. check if there is relay workers for this source - relayWorkers, err := s.getRelayWorkers(cfg.SourceID) - if err != nil { - return err - } - if len(relayWorkers) > 0 { + // 3. check if this source is enable relay + if _, ok := s.expectRelayStages[cfg.SourceID]; ok { return terror.ErrSchedulerSourceCfgUpdate.Generate(cfg.SourceID) } // 4. put the config into etcd. - _, err = ha.PutSourceCfg(s.etcdCli, cfg) + _, err := ha.PutSourceCfg(s.etcdCli, cfg) if err != nil { return err } @@ -1048,10 +1043,9 @@ func (s *Scheduler) UpdateSubTasks(ctx context.Context, cfgs ...config.SubTaskCo for _, cfg := range cfgs { taskNamesM[cfg.Name] = struct{}{} } - taskNames := strMapToSlice(taskNamesM) - if len(taskNames) > 1 { + if len(taskNamesM) > 1 { // only subtasks from one task supported now. - return terror.ErrSchedulerMultiTask.Generate(taskNames) + return terror.ErrSchedulerMultiTask.Generate(strMapToSlice(taskNamesM)) } // check whether exists. for _, cfg := range cfgs { @@ -1066,18 +1060,17 @@ func (s *Scheduler) UpdateSubTasks(ctx context.Context, cfgs ...config.SubTaskCo } } // check whether in running stage - for _, cfg := range cfgs { - stage := s.GetExpectSubTaskStage(cfg.Name, cfg.SourceID) - if stage.Expect == pb.Stage_Running { - return terror.ErrSchedulerSubTaskCfgUpdate.Generate(cfg.Name, cfg.SourceID) - } + cfg := cfgs[0] + stage := s.GetExpectSubTaskStage(cfg.Name, cfg.SourceID) + if stage.Expect == pb.Stage_Running { + return terror.ErrSchedulerSubTaskCfgUpdate.Generate(cfg.Name, cfg.SourceID) } // check by workers todo batch for _, cfg := range cfgs { worker := s.bounds[cfg.SourceID] if worker == nil { - return terror.ErrWorkerNoStart + return terror.ErrSchedulerSubTaskCfgUpdate.Generatef("this source: %s have not bound to worker", cfg.SourceID) } resp, err := worker.checkSubtasksCanUpdate(ctx, &cfg) if err != nil { @@ -1741,7 +1734,6 @@ func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, taskName string, // GetExpectSubTaskStage returns the current expect subtask stage. // If the stage not exists, an invalid stage is returned. -// This func is used for testing. func (s *Scheduler) GetExpectSubTaskStage(task, source string) ha.Stage { invalidStage := ha.NewSubTaskStage(pb.Stage_InvalidStage, source, task) diff --git a/dm/dm/pb/dmworker.pb.go b/dm/dm/pb/dmworker.pb.go index cabd96046b8..04eb73c35ac 100644 --- a/dm/dm/pb/dmworker.pb.go +++ b/dm/dm/pb/dmworker.pb.go @@ -81,15 +81,12 @@ func (TaskOp) EnumDescriptor() ([]byte, []int) { // is transferred from Paused when resuming is requested // transfers to Paused when error occurred or requested from external // transfers to Stopped when requested from external -// transfers to Finished when sub task processing completed (no Syncer -// used) -// Paused: indicates the processing is paused, and can be resume from external -// request -// is transferred from Running when error occurred or requested from -// external transfers to Running when resuming is requested from -// external transfers to Stopped when requested from external -// Stopped: indicates the processing is stopped, and can not be resume (or -// re-run) again +// transfers to Finished when sub task processing completed (no Syncer used) +// Paused: indicates the processing is paused, and can be resume from external request +// is transferred from Running when error occurred or requested from external +// transfers to Running when resuming is requested from external +// transfers to Stopped when requested from external +// Stopped: indicates the processing is stopped, and can not be resume (or re-run) again // is transferred from Running / Paused when requested from external // can not transfer to any stages // Finished: indicates the processing is finished, and no need to re-run @@ -764,10 +761,12 @@ func (m *LoadStatus) GetMetaBinlogGTID() string { return "" } -// ShardingGroup represents a DDL sharding group, this is used by SyncStatus, -// and is differ from ShardingGroup in syncer pkg target: target table name DDL: -// in syncing DDL firstPos: first DDL binlog pos for this group synced: synced -// source tables unsynced: unsynced source tables +// ShardingGroup represents a DDL sharding group, this is used by SyncStatus, and is differ from ShardingGroup in syncer pkg +// target: target table name +// DDL: in syncing DDL +// firstPos: first DDL binlog pos for this group +// synced: synced source tables +// unsynced: unsynced source tables type ShardingGroup struct { Target string `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"` DDLs []string `protobuf:"bytes,2,rep,name=DDLs,proto3" json:"DDLs,omitempty"` @@ -1924,8 +1923,7 @@ func (m *SubTaskErrorList) GetError() []*SubTaskError { // ProcessResult represents results produced by a dm unit // isCanceled: indicates whether the process is canceled from external -// when Stop or Pause is requested from external, isCanceled will be -// true +// when Stop or Pause is requested from external, isCanceled will be true // errors: includes all (potential) errors occured when processing type ProcessResult struct { IsCanceled bool `protobuf:"varint,1,opt,name=isCanceled,proto3" json:"isCanceled,omitempty"` @@ -1988,9 +1986,8 @@ func (m *ProcessResult) GetDetail() []byte { } // ProcessError is same as terror used in dm -// NOTE: currently stack trace is not supported, `Message` is the -// `terror.Error.getMsg` result and `RawCause` is the `Error` result of error -// from `terror.Error.Cause()`. +// NOTE: currently stack trace is not supported, `Message` is the `terror.Error.getMsg` result +// and `RawCause` is the `Error` result of error from `terror.Error.Cause()`. type ProcessError struct { ErrCode int32 `protobuf:"varint,1,opt,name=ErrCode,proto3" json:"ErrCode,omitempty"` ErrClass string `protobuf:"bytes,2,opt,name=ErrClass,proto3" json:"ErrClass,omitempty"` @@ -2083,11 +2080,11 @@ func (m *ProcessError) GetWorkaround() string { return "" } -// PurgeRelayRequest represents a request to purge relay log files for this -// dm-worker inactive: whether purge inactive relay log files time: whether -// purge relay log files before this time, the number of seconds elapsed since -// January 1, 1970 UTC filename: whether purge relay log files before this -// filename subDir: specify relay sub directory for @filename +// PurgeRelayRequest represents a request to purge relay log files for this dm-worker +// inactive: whether purge inactive relay log files +// time: whether purge relay log files before this time, the number of seconds elapsed since January 1, 1970 UTC +// filename: whether purge relay log files before this filename +// subDir: specify relay sub directory for @filename type PurgeRelayRequest struct { Inactive bool `protobuf:"varint,1,opt,name=inactive,proto3" json:"inactive,omitempty"` Time int64 `protobuf:"varint,2,opt,name=time,proto3" json:"time,omitempty"` @@ -2896,15 +2893,13 @@ type WorkerClient interface { // PurgeRelay purges relay log files for this dm-worker PurgeRelay(ctx context.Context, in *PurgeRelayRequest, opts ...grpc.CallOption) (*CommonWorkerResponse, error) // Operate (get/set/remove) schema for a specified table in tracker. - // a `set`/`remove` operation should be an one-time operation (only take - // effect once), so we use a gRPC method rather than a etcd operation now (no - // persistent operation state). + // a `set`/`remove` operation should be an one-time operation (only take effect once), + // so we use a gRPC method rather than a etcd operation now (no persistent operation state). OperateSchema(ctx context.Context, in *OperateWorkerSchemaRequest, opts ...grpc.CallOption) (*CommonWorkerResponse, error) OperateV1Meta(ctx context.Context, in *OperateV1MetaRequest, opts ...grpc.CallOption) (*OperateV1MetaResponse, error) HandleError(ctx context.Context, in *HandleWorkerErrorRequest, opts ...grpc.CallOption) (*CommonWorkerResponse, error) GetWorkerCfg(ctx context.Context, in *GetWorkerCfgRequest, opts ...grpc.CallOption) (*GetWorkerCfgResponse, error) - // only some fields of the configuration of the subtask in the sync phase can - // be updated + // only some fields of the configuration of the subtask in the sync phase can be updated CheckSubtasksCanUpdate(ctx context.Context, in *CheckSubtasksCanUpdateRequest, opts ...grpc.CallOption) (*CheckSubtasksCanUpdateResponse, error) } @@ -2985,15 +2980,13 @@ type WorkerServer interface { // PurgeRelay purges relay log files for this dm-worker PurgeRelay(context.Context, *PurgeRelayRequest) (*CommonWorkerResponse, error) // Operate (get/set/remove) schema for a specified table in tracker. - // a `set`/`remove` operation should be an one-time operation (only take - // effect once), so we use a gRPC method rather than a etcd operation now (no - // persistent operation state). + // a `set`/`remove` operation should be an one-time operation (only take effect once), + // so we use a gRPC method rather than a etcd operation now (no persistent operation state). OperateSchema(context.Context, *OperateWorkerSchemaRequest) (*CommonWorkerResponse, error) OperateV1Meta(context.Context, *OperateV1MetaRequest) (*OperateV1MetaResponse, error) HandleError(context.Context, *HandleWorkerErrorRequest) (*CommonWorkerResponse, error) GetWorkerCfg(context.Context, *GetWorkerCfgRequest) (*GetWorkerCfgResponse, error) - // only some fields of the configuration of the subtask in the sync phase can - // be updated + // only some fields of the configuration of the subtask in the sync phase can be updated CheckSubtasksCanUpdate(context.Context, *CheckSubtasksCanUpdateRequest) (*CheckSubtasksCanUpdateResponse, error) } diff --git a/dm/dm/proto/dmworker.proto b/dm/dm/proto/dmworker.proto index 145a3d87a08..200aa7a8912 100644 --- a/dm/dm/proto/dmworker.proto +++ b/dm/dm/proto/dmworker.proto @@ -2,61 +2,57 @@ syntax = "proto3"; package pb; + service Worker { - rpc QueryStatus(QueryStatusRequest) returns (QueryStatusResponse) {} + rpc QueryStatus (QueryStatusRequest) returns (QueryStatusResponse) {} - // PurgeRelay purges relay log files for this dm-worker - rpc PurgeRelay(PurgeRelayRequest) returns (CommonWorkerResponse) {} + // PurgeRelay purges relay log files for this dm-worker + rpc PurgeRelay (PurgeRelayRequest) returns (CommonWorkerResponse) {} - // Operate (get/set/remove) schema for a specified table in tracker. - // a `set`/`remove` operation should be an one-time operation (only take - // effect once), so we use a gRPC method rather than a etcd operation now (no - // persistent operation state). - rpc OperateSchema(OperateWorkerSchemaRequest) returns (CommonWorkerResponse) { - } + // Operate (get/set/remove) schema for a specified table in tracker. + // a `set`/`remove` operation should be an one-time operation (only take effect once), + // so we use a gRPC method rather than a etcd operation now (no persistent operation state). + rpc OperateSchema(OperateWorkerSchemaRequest) returns(CommonWorkerResponse) {} - rpc OperateV1Meta(OperateV1MetaRequest) returns (OperateV1MetaResponse) {} + rpc OperateV1Meta(OperateV1MetaRequest) returns(OperateV1MetaResponse) {} - rpc HandleError(HandleWorkerErrorRequest) returns (CommonWorkerResponse) {} + rpc HandleError(HandleWorkerErrorRequest) returns(CommonWorkerResponse) {} - rpc GetWorkerCfg(GetWorkerCfgRequest) returns (GetWorkerCfgResponse) {} + rpc GetWorkerCfg(GetWorkerCfgRequest) returns(GetWorkerCfgResponse) {} - // only some fields of the configuration of the subtask in the sync phase can - // be updated - rpc CheckSubtasksCanUpdate(CheckSubtasksCanUpdateRequest) - returns (CheckSubtasksCanUpdateResponse) {} + // only some fields of the configuration of the subtask in the sync phase can be updated + rpc CheckSubtasksCanUpdate(CheckSubtasksCanUpdateRequest) returns (CheckSubtasksCanUpdateResponse) {} } enum TaskOp { - InvalidOp = 0; // placeholder - Stop = 1; - Pause = 2; - Resume = 3; - Start = 4; - Update = 5; - AutoResume = 6; - Delete = 7; + InvalidOp = 0; // placeholder + Stop = 1; + Pause = 2; + Resume = 3; + Start = 4; + Update = 5; + AutoResume = 6; + Delete = 7; } message QueryStatusRequest { - string name = 1; // sub task's name, empty for all sub tasks + string name = 1; // sub task's name, empty for all sub tasks } message CommonWorkerResponse { - bool result = 1; - string msg = - 2; // when result is true, msg is empty, except for operate schema - string source = 3; // source ID, set by dm-master - string worker = 4; // worker name, set by dm-worker config + bool result = 1; + string msg = 2; // when result is true, msg is empty, except for operate schema + string source = 3; // source ID, set by dm-master + string worker = 4; // worker name, set by dm-worker config } // QueryStatusResponse represents status response for query on a dm-worker // status: dm-worker's current sub tasks' status message QueryStatusResponse { - bool result = 1; - string msg = 2; - SourceStatus sourceStatus = 3; - repeated SubTaskStatus subTaskStatus = 4; + bool result = 1; + string msg = 2; + SourceStatus sourceStatus = 3; + repeated SubTaskStatus subTaskStatus = 4; } // Stage represents current stage for a (sub) task @@ -70,111 +66,109 @@ message QueryStatusResponse { // is transferred from Paused when resuming is requested // transfers to Paused when error occurred or requested from external // transfers to Stopped when requested from external -// transfers to Finished when sub task processing completed (no Syncer -// used) -// Paused: indicates the processing is paused, and can be resume from external -// request -// is transferred from Running when error occurred or requested from -// external transfers to Running when resuming is requested from -// external transfers to Stopped when requested from external -// Stopped: indicates the processing is stopped, and can not be resume (or -// re-run) again +// transfers to Finished when sub task processing completed (no Syncer used) +// Paused: indicates the processing is paused, and can be resume from external request +// is transferred from Running when error occurred or requested from external +// transfers to Running when resuming is requested from external +// transfers to Stopped when requested from external +// Stopped: indicates the processing is stopped, and can not be resume (or re-run) again // is transferred from Running / Paused when requested from external // can not transfer to any stages // Finished: indicates the processing is finished, and no need to re-run // is transferred from Running when processing completed // should not transfer to any stages enum Stage { - InvalidStage = 0; // placeholder - New = 1; - Running = 2; - Paused = 3; - Stopped = 4; - Finished = 5; - - Pausing = 6; - Resuming = 7; - Stopping = 8; + InvalidStage = 0; // placeholder + New = 1; + Running = 2; + Paused = 3; + Stopped = 4; + Finished = 5; + + Pausing = 6; + Resuming = 7; + Stopping = 8; } // CheckStatus represents status for check unit // adds fields later message CheckStatus { - bool passed = 1; - int32 total = 2; - int32 successful = 3; - int32 failed = 4; - int32 warning = 5; + bool passed = 1; + int32 total = 2; + int32 successful = 3; + int32 failed = 4; + int32 warning = 5; - bytes detail = 6; + bytes detail = 6; } // DumpStatus represents status for dump unit // add fields later message DumpStatus { - int64 totalTables = 1; - double completedTables = 2; - double finishedBytes = 3; - double finishedRows = 4; - double estimateTotalRows = 5; + int64 totalTables = 1; + double completedTables = 2; + double finishedBytes = 3; + double finishedRows = 4; + double estimateTotalRows = 5; } // LoadStatus represents status for load unit message LoadStatus { - int64 finishedBytes = 1; - int64 totalBytes = 2; - string progress = 3; - string metaBinlog = 4; - string metaBinlogGTID = 5; + int64 finishedBytes = 1; + int64 totalBytes = 2; + string progress = 3; + string metaBinlog = 4; + string metaBinlogGTID = 5; } -// ShardingGroup represents a DDL sharding group, this is used by SyncStatus, -// and is differ from ShardingGroup in syncer pkg target: target table name DDL: -// in syncing DDL firstPos: first DDL binlog pos for this group synced: synced -// source tables unsynced: unsynced source tables +// ShardingGroup represents a DDL sharding group, this is used by SyncStatus, and is differ from ShardingGroup in syncer pkg +// target: target table name +// DDL: in syncing DDL +// firstPos: first DDL binlog pos for this group +// synced: synced source tables +// unsynced: unsynced source tables message ShardingGroup { - string target = 1; - repeated string DDLs = 2; - string firstLocation = 3; - repeated string synced = 4; - repeated string unsynced = 5; + string target = 1; + repeated string DDLs = 2; + string firstLocation = 3; + repeated string synced = 4; + repeated string unsynced = 5; } // SyncStatus represents status for sync unit message SyncStatus { - int64 totalEvents = 1; - int64 totalTps = 2; - int64 recentTps = 3; - string masterBinlog = 4; - string masterBinlogGtid = 5; - string syncerBinlog = 6; - string syncerBinlogGtid = 7; - repeated string blockingDDLs = 8; // sharding DDL which current is blocking - repeated ShardingGroup unresolvedGroups = - 9; // sharding groups which current are un-resolved - bool synced = 10; // whether sync is catched-up in this moment - string binlogType = 11; - int64 secondsBehindMaster = 12; // sync unit delay seconds behind master. + int64 totalEvents = 1; + int64 totalTps = 2; + int64 recentTps = 3; + string masterBinlog = 4; + string masterBinlogGtid = 5; + string syncerBinlog = 6; + string syncerBinlogGtid = 7; + repeated string blockingDDLs = 8; // sharding DDL which current is blocking + repeated ShardingGroup unresolvedGroups = 9; // sharding groups which current are un-resolved + bool synced = 10; // whether sync is catched-up in this moment + string binlogType = 11; + int64 secondsBehindMaster = 12; // sync unit delay seconds behind master. } // SourceStatus represents status for source runing on dm-worker message SourceStatus { - string source = 1; // sourceID, set by dm-master - string worker = 2; // bounded worker name for this source - ProcessResult result = 3; - RelayStatus relayStatus = 4; + string source = 1; // sourceID, set by dm-master + string worker = 2; // bounded worker name for this source + ProcessResult result = 3; + RelayStatus relayStatus = 4; } // RelayStatus represents status for relay unit. message RelayStatus { - string masterBinlog = 1; - string masterBinlogGtid = 2; - string relaySubDir = 3; - string relayBinlog = 4; - string relayBinlogGtid = 5; - bool relayCatchUpMaster = 6; - Stage stage = 7; - ProcessResult result = 8; + string masterBinlog = 1; + string masterBinlogGtid = 2; + string relaySubDir = 3; + string relayBinlog = 4; + string relayBinlogGtid = 5; + bool relayCatchUpMaster = 6; + Stage stage = 7; + ProcessResult result = 8; } // SubTaskStatus represents status for a sub task @@ -188,54 +182,66 @@ message RelayStatus { // for Load, includes total bytes, progress, etc. // for Sync, includes TPS, binlog meta, etc. message SubTaskStatus { - string name = 1; - Stage stage = 2; - UnitType unit = 3; - ProcessResult result = 4; - string unresolvedDDLLockID = 5; - oneof status { - string msg = 6; // message when error occurred - CheckStatus check = 7; - DumpStatus dump = 8; - LoadStatus load = 9; - SyncStatus sync = 10; - } + string name = 1; + Stage stage = 2; + UnitType unit = 3; + ProcessResult result = 4; + string unresolvedDDLLockID = 5; + oneof status { + string msg = 6; // message when error occurred + CheckStatus check = 7; + DumpStatus dump = 8; + LoadStatus load = 9; + SyncStatus sync = 10; + } } // SubTaskStatusList used for internal jsonpb marshal -message SubTaskStatusList { repeated SubTaskStatus status = 1; } +message SubTaskStatusList { + repeated SubTaskStatus status = 1; +} // CheckError represents error for check unit // adds fields later -message CheckError { string msg = 1; } +message CheckError { + string msg = 1; +} // DumpError represents error for dump unit // add fields later -message DumpError { string msg = 1; } +message DumpError { + string msg = 1; +} // LoadError represents error for load unit -message LoadError { string msg = 1; } +message LoadError { + string msg = 1; +} // SyncSQLError represents a sql error in sync unit message SyncSQLError { - string msg = 1; - string failedBinlogPosition = 2; - string errorSQL = 3; + string msg = 1; + string failedBinlogPosition = 2; + string errorSQL = 3; } // SyncError represents error list for sync unit -message SyncError { repeated SyncSQLError errors = 1; } +message SyncError { + repeated SyncSQLError errors = 1; +} // SourceError represents error for start/stop source on dm-worker message SourceError { - string source = 1; - string worker = 2; - string SourceError = 3; - RelayError RelayError = 4; // RelayError represents error for relay unit. + string source = 1; + string worker = 2; + string SourceError = 3; + RelayError RelayError = 4; // RelayError represents error for relay unit. } // RelayError represents error for relay unit. -message RelayError { string msg = 1; } +message RelayError { + string msg = 1; +} // SubTaskError represents error for a sub task during running // name: sub task'name, when starting a sub task the name should be unique @@ -244,144 +250,149 @@ message RelayError { string msg = 1; } // error: current unit's error information // for Sync, includes failed sql, failed sql pos in binlog, etc. message SubTaskError { - string name = 1; - Stage stage = 2; - UnitType unit = 3; - oneof error { - string msg = 4; // message when error occurred - CheckError check = 5; - DumpError dump = 6; - LoadError load = 7; - SyncError sync = 8; - } + string name = 1; + Stage stage = 2; + UnitType unit = 3; + oneof error { + string msg = 4; // message when error occurred + CheckError check = 5; + DumpError dump = 6; + LoadError load = 7; + SyncError sync = 8; + } } // SubTaskErrorList used for internal jsonpb marshal -message SubTaskErrorList { repeated SubTaskError error = 1; } +message SubTaskErrorList { + repeated SubTaskError error = 1; +} // UnitType represents the dm unit's type enum UnitType { - InvalidUnit = 0; // placeholder - Check = 1; - Dump = 2; - Load = 3; - Sync = 4; - Relay = 100; // relay is a special unit + InvalidUnit = 0; // placeholder + Check = 1; + Dump = 2; + Load = 3; + Sync = 4; + Relay = 100; // relay is a special unit } // ProcessResult represents results produced by a dm unit // isCanceled: indicates whether the process is canceled from external -// when Stop or Pause is requested from external, isCanceled will be -// true +// when Stop or Pause is requested from external, isCanceled will be true // errors: includes all (potential) errors occured when processing message ProcessResult { - bool isCanceled = 1; - repeated ProcessError errors = 2; - bytes detail = 3; + bool isCanceled = 1; + repeated ProcessError errors = 2; + bytes detail = 3; } // ProcessError is same as terror used in dm -// NOTE: currently stack trace is not supported, `Message` is the -// `terror.Error.getMsg` result and `RawCause` is the `Error` result of error -// from `terror.Error.Cause()`. +// NOTE: currently stack trace is not supported, `Message` is the `terror.Error.getMsg` result +// and `RawCause` is the `Error` result of error from `terror.Error.Cause()`. message ProcessError { - int32 ErrCode = 1; - string ErrClass = 2; - string ErrScope = 3; - string ErrLevel = 4; - string Message = 5; - string RawCause = 6; - string Workaround = 7; + int32 ErrCode = 1; + string ErrClass = 2; + string ErrScope = 3; + string ErrLevel = 4; + string Message = 5; + string RawCause = 6; + string Workaround = 7; } // RelayOp differs from TaskOp enum RelayOp { - InvalidRelayOp = 0; // placeholder - StopRelay = 1; - PauseRelay = 2; - ResumeRelay = 3; + InvalidRelayOp = 0; // placeholder + StopRelay = 1; + PauseRelay = 2; + ResumeRelay = 3; } -// PurgeRelayRequest represents a request to purge relay log files for this -// dm-worker inactive: whether purge inactive relay log files time: whether -// purge relay log files before this time, the number of seconds elapsed since -// January 1, 1970 UTC filename: whether purge relay log files before this -// filename subDir: specify relay sub directory for @filename +// PurgeRelayRequest represents a request to purge relay log files for this dm-worker +// inactive: whether purge inactive relay log files +// time: whether purge relay log files before this time, the number of seconds elapsed since January 1, 1970 UTC +// filename: whether purge relay log files before this filename +// subDir: specify relay sub directory for @filename message PurgeRelayRequest { - bool inactive = 1; - int64 time = 2; - string filename = 3; - string subDir = 4; + bool inactive = 1; + int64 time = 2; + string filename = 3; + string subDir = 4; } enum SchemaOp { - InvalidSchemaOp = 0; - GetSchema = 1; - SetSchema = 2; - RemoveSchema = 3; - ListSchema = 4; - ListTable = 5; + InvalidSchemaOp = 0; + GetSchema = 1; + SetSchema = 2; + RemoveSchema = 3; + ListSchema = 4; + ListTable = 5; } message OperateWorkerSchemaRequest { - SchemaOp op = 1; // operation type - string task = 2; // task name - string source = 3; // source ID - string database = 4; // database name - string table = 5; // table name - string schema = 6; // schema content, a `CREATE TABLE` statement - bool flush = 7; // flush table info and checkpoint - bool sync = 8; // sync the table info to master - bool fromSource = 9; // update schema from source schema - bool fromTarget = 10; // update schema from target schema + SchemaOp op = 1; // operation type + string task = 2; // task name + string source = 3; // source ID + string database = 4; // database name + string table = 5; // table name + string schema = 6; // schema content, a `CREATE TABLE` statement + bool flush = 7; // flush table info and checkpoint + bool sync = 8; // sync the table info to master + bool fromSource = 9; // update schema from source schema + bool fromTarget = 10; // update schema from target schema } // copied `TaskMeta` from release-1.0 branch. message V1SubTaskMeta { - TaskOp op = 1; - Stage stage = 2; // the stage of sub-task after we apply some operations - string name = 3; // sub task's name - bytes task = 4; // (sub) task's configuration + TaskOp op = 1; + Stage stage = 2; // the stage of sub-task after we apply some operations + string name = 3; // sub task's name + bytes task = 4; // (sub) task's configuration } enum V1MetaOp { - InvalidV1MetaOp = 0; - GetV1Meta = 1; - RemoveV1Meta = 2; + InvalidV1MetaOp = 0; + GetV1Meta = 1; + RemoveV1Meta = 2; } -message OperateV1MetaRequest { V1MetaOp op = 1; } +message OperateV1MetaRequest { + V1MetaOp op = 1; +} message OperateV1MetaResponse { - bool result = 1; - string msg = 2; // error message if failed. - map meta = 3; // subtasks' meta for `get` operation. + bool result = 1; + string msg = 2; // error message if failed. + map meta = 3; // subtasks' meta for `get` operation. } enum ErrorOp { - InvalidErrorOp = 0; - Skip = 1; // skip the error event - Replace = 2; // replace the error event with a specified SQL - Revert = 3; // remove the error operator - Inject = 4; // inject a specified SQL - List = 5; // show handle error commands + InvalidErrorOp = 0; + Skip = 1; // skip the error event + Replace = 2; // replace the error event with a specified SQL + Revert = 3; // remove the error operator + Inject = 4; // inject a specified SQL + List = 5; // show handle error commands } message HandleWorkerErrorRequest { - ErrorOp op = 1; // operation type - string task = 2; // task name - string binlogPos = 3; // binlog-pos (that's file:pos format) - repeated string sqls = 4; // sqls (use for replace) + ErrorOp op = 1; // operation type + string task = 2; // task name + string binlogPos = 3; // binlog-pos (that's file:pos format) + repeated string sqls = 4; // sqls (use for replace) } -message GetWorkerCfgRequest {} +message GetWorkerCfgRequest { +} -message GetWorkerCfgResponse { string cfg = 1; } +message GetWorkerCfgResponse { + string cfg = 1; +} enum ValidatorOp { - InvalidValidatorOp = 0; - StartValidator = 1; - StopValidator = 2; + InvalidValidatorOp = 0; + StartValidator = 1; + StopValidator = 2; } message CheckSubtasksCanUpdateRequest { string subtaskCfgTomlString = 1; } diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go old mode 100644 new mode 100755 index 4c8c0ab0694..4652d15c9f5 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -620,17 +620,7 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error { } w.l.Info("OperateSubTask start", zap.Stringer("op", op), zap.String("task", name)) - failpoint.Inject("SkipRefreshFromETCDInUT", func(_ failpoint.Value) { - failpoint.Goto("bypassRefresh") - }) - if op == pb.TaskOp_Resume { - if refreshErr := w.tryRefreshSubTaskConfig(st); refreshErr != nil { - // NOTE: for current unit is not syncer unit or syncer is in shardding merge. - w.l.Warn("can not update subtask config now", zap.Error(refreshErr)) - } - } - failpoint.Label("bypassRefresh") var err error switch op { case pb.TaskOp_Delete: @@ -641,6 +631,14 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error { w.l.Info("pause subtask", zap.String("task", name)) err = st.Pause() case pb.TaskOp_Resume: + failpoint.Inject("SkipRefreshFromETCDInUT", func(_ failpoint.Value) { + failpoint.Goto("bypassRefresh") + }) + if refreshErr := w.tryRefreshSubTaskConfig(st); refreshErr != nil { + // NOTE: for current unit is not syncer unit or is in shard merge. + w.l.Warn("can not update subtask config now", zap.Error(refreshErr)) + } + failpoint.Label("bypassRefresh") w.l.Info("resume subtask", zap.String("task", name)) err = st.Resume(w.getRelayWithoutLock()) case pb.TaskOp_AutoResume: diff --git a/dm/errors.toml b/dm/errors.toml index 8ff96fda20d..2502e2e0e82 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -3227,7 +3227,7 @@ workaround = "Please try again later" tags = ["internal", "low"] [error.DM-scheduler-46025] -message = "source can only update when no relay workers and no running tasks for now" +message = "source can only update when not enable relay and no running tasks for now" description = "" workaround = "" tags = ["internal", "low"] diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index 8903b07859d..886d99e6ef4 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -1317,7 +1317,7 @@ var ( ErrSchedulerRelayWorkersWrongRelay = New(codeSchedulerRelayWorkersWrongRelay, ClassScheduler, ScopeInternal, LevelHigh, "these workers %s have started relay for another sources %s respectively", "Please correct sources in `stop-relay`.") ErrSchedulerSourceOpRelayExist = New(codeSchedulerSourceOpRelayExist, ClassScheduler, ScopeInternal, LevelHigh, "source with name %s need to operate has existing relay workers %s", "Please `stop-relay` first.") ErrSchedulerLatchInUse = New(codeSchedulerLatchInUse, ClassScheduler, ScopeInternal, LevelLow, "when %s, resource %s is in use by other client", "Please try again later") - ErrSchedulerSourceCfgUpdate = New(codeSchedulerSourceCfgUpdate, ClassScheduler, ScopeInternal, LevelLow, "source can only update when no relay workers and no running tasks for now", "") + ErrSchedulerSourceCfgUpdate = New(codeSchedulerSourceCfgUpdate, ClassScheduler, ScopeInternal, LevelLow, "source can only update when not enable relay and no running tasks for now", "") ErrSchedulerWrongWorkerInput = New(codeSchedulerWrongWorkerInput, ClassScheduler, ScopeInternal, LevelMedium, "require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]", "") ErrSchedulerBoundDiffWithStartedRelay = New(codeSchedulerCantTransferToRelayWorker, ClassScheduler, ScopeInternal, LevelMedium, "require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s]", "If you intend to bind the source with worker, you can stop-relay for current source.") ErrSchedulerStartRelayOnSpecified = New(codeSchedulerStartRelayOnSpecified, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now", "Please stop all relay workers first, or specify worker name for `start-relay`.")