From 1ac60769de5f21aee217d4aff047648d96d2d1d6 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 8 Mar 2021 17:16:39 +0800 Subject: [PATCH 1/7] dmctl, scheduler: support manually transfer-source --- dm/master/scheduler/scheduler.go | 64 +++ dm/master/scheduler/scheduler_test.go | 93 +++- dm/master/server.go | 20 + dm/pb/dmmaster.pb.go | 706 +++++++++++++++++++++----- dm/proto/dmmaster.proto | 12 + go.mod | 2 +- go.sum | 5 + pkg/ha/bound.go | 14 + 8 files changed, 799 insertions(+), 117 deletions(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 72e047f45c..30c86dbc58 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -15,10 +15,12 @@ package scheduler import ( "context" + "errors" "sort" "sync" "time" + "github.com/pingcap/failpoint" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -351,6 +353,68 @@ func (s *Scheduler) GetSourceCfgByID(source string) *config.SourceConfig { return &clone } +// TransferSource unbinds the source and binds it to a free worker. If fails halfway, the old worker should try recover +func (s *Scheduler) TransferSource(source, worker string) error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.started { + return terror.ErrSchedulerNotStarted.Generate() + } + + // 1. check existence.or no need + if _, ok := s.sourceCfgs[source]; !ok { + return terror.ErrSchedulerSourceCfgExist.Generate(source) + } + w, ok := s.workers[worker] + if !ok { + return terror.ErrSchedulerWorkerNotExist.Generate(worker) + } + oldWorker, hasOldWorker := s.bounds[source] + if hasOldWorker && oldWorker.BaseInfo().Name == worker { + return nil + } + + // 2. check new worker is free + stage := w.Stage() + if stage != WorkerFree { + return terror.ErrSchedulerWorkerInvalidTrans.Generate(worker, stage, WorkerBound) + } + + // 3. deal with old worker + if !hasOldWorker { + s.logger.Warn("in transfer source, found a free worker and not bound source, which should not happened", + zap.String("source", source), + zap.String("worker", worker)) + // 3.1 bound it directly + err := s.boundSourceToWorker(source, w) + if err == nil { + delete(s.unbounds, source) + } + return err + } + + defer func() { + _, err := s.tryBoundForWorker(oldWorker) + if err != nil { + s.logger.Warn("in transfer source, error when try bound the old worker", zap.Error(err)) + } + }() + + // 4. replace the source bound + failpoint.Inject("failToReplaceSourceBound", func(_ failpoint.Value) { + failpoint.Return(errors.New("failToPutSourceBound")) + }) + _, err := ha.ReplaceSourceBound(s.etcdCli, source, oldWorker.BaseInfo().Name, worker) + if err != nil { + return err + } + oldWorker.ToFree() + // we have checked w.stage is free, so there should not be an error + _ = s.updateStatusForBound(w, ha.NewSourceBound(source, worker)) + return nil +} + // AddSubTasks adds the information of one or more subtasks for one task. func (s *Scheduler) AddSubTasks(cfgs ...config.SubTaskConfig) error { s.mu.Lock() diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 5ced30a348..3914cd1565 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -20,6 +20,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "go.etcd.io/etcd/clientv3" v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/integration" @@ -1008,7 +1009,7 @@ func (t *testScheduler) TestLastBound(c *C) { c.Assert(err, IsNil) c.Assert(bounded, IsFalse) - // after worker3 become offline, worker2 should be bounded to worker2 + // after worker3 become offline, source2 should be bounded to worker2 s.updateStatusForUnbound(sourceID2) _, ok := s.bounds[sourceID2] c.Assert(ok, IsFalse) @@ -1018,3 +1019,93 @@ func (t *testScheduler) TestLastBound(c *C) { c.Assert(bounded, IsTrue) c.Assert(s.bounds[sourceID2], DeepEquals, worker2) } + +func (t *testScheduler) TestTransferSource(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + sourceID2 = "mysql-replica-2" + sourceID3 = "mysql-replica-3" + sourceID4 = "mysql-replica-4" + workerName1 = "dm-worker-1" + workerName2 = "dm-worker-2" + workerName3 = "dm-worker-3" + workerName4 = "dm-worker-4" + ) + + worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}} + worker2 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName2}} + worker3 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName3}} + worker4 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName4}} + + // step 1: start an empty scheduler + s.started = true + s.etcdCli = etcdTestCli + s.workers[workerName1] = worker1 + s.workers[workerName2] = worker2 + s.workers[workerName3] = worker3 + s.workers[workerName4] = worker4 + s.sourceCfgs[sourceID1] = config.SourceConfig{} + s.sourceCfgs[sourceID2] = config.SourceConfig{} + + worker1.ToFree() + c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil) + worker2.ToFree() + c.Assert(s.boundSourceToWorker(sourceID2, worker2), IsNil) + + c.Assert(s.bounds[sourceID1], DeepEquals, worker1) + c.Assert(s.bounds[sourceID2], DeepEquals, worker2) + + worker3.ToFree() + worker4.ToFree() + + // test invalid transfer: source not exists + c.Assert(s.TransferSource("not-exist", workerName3), NotNil) + + // test valid transfer: source -> worker = bound -> free + c.Assert(s.TransferSource(sourceID1, workerName4), IsNil) + c.Assert(s.bounds[sourceID1], DeepEquals, worker4) + c.Assert(worker1.Stage(), Equals, WorkerFree) + + // test valid transfer: source -> worker = unbound -> free + s.sourceCfgs[sourceID3] = config.SourceConfig{} + s.unbounds[sourceID3] = struct{}{} + c.Assert(s.TransferSource(sourceID3, workerName3), IsNil) + c.Assert(s.bounds[sourceID3], DeepEquals, worker3) + + // test valid transfer: self + c.Assert(s.TransferSource(sourceID3, workerName3), IsNil) + c.Assert(s.bounds[sourceID3], DeepEquals, worker3) + + // test invalid transfer: source -> worker = bound -> bound + c.Assert(s.TransferSource(sourceID1, workerName3), NotNil) + c.Assert(s.bounds[sourceID1], DeepEquals, worker4) + c.Assert(s.bounds[sourceID3], DeepEquals, worker3) + + // test invalid transfer: source -> worker = bound -> offline + worker1.ToOffline() + c.Assert(s.TransferSource(sourceID1, workerName1), NotNil) + c.Assert(s.bounds[sourceID1], DeepEquals, worker4) + + // test invalid transfer: source -> worker = unbound -> bound + s.sourceCfgs[sourceID4] = config.SourceConfig{} + s.unbounds[sourceID4] = struct{}{} + c.Assert(s.TransferSource(sourceID4, workerName3), NotNil) + c.Assert(s.bounds[sourceID3], DeepEquals, worker3) + delete(s.unbounds, sourceID4) + delete(s.sourceCfgs, sourceID4) + + worker1.ToFree() + // now we have (worker1, nil) (worker2, source2) (worker3, source3) (worker4, source1) + + // test fail halfway won't left old worker unbound + c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound", `return()`), IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound") + c.Assert(s.TransferSource(sourceID1, workerName1), NotNil) + c.Assert(s.bounds[sourceID1], DeepEquals, worker4) + c.Assert(worker1.Stage(), Equals, WorkerFree) +} diff --git a/dm/master/server.go b/dm/master/server.go index 4645349842..3fcc2d6490 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1982,6 +1982,26 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (* }, nil } +// TransferSource implements MasterServer.TransferSource +func (s *Server) TransferSource(ctx context.Context, req *pb.TransferSourceRequest) (*pb.TransferSourceResponse, error) { + var ( + resp2 *pb.TransferSourceResponse + err2 error + ) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) + if shouldRet { + return resp2, err2 + } + + err := s.scheduler.TransferSource(req.Source, req.Worker) + if err != nil { + resp2.Msg = err.Error() + return resp2, nil + } + resp2.Result = true + return resp2, nil +} + // sharedLogic does some shared logic for each RPC implementation // arguments with `Pointer` suffix should be pointer to that variable its name indicated // return `true` means caller should return with variable that `xxPointer` modified diff --git a/dm/pb/dmmaster.pb.go b/dm/pb/dmmaster.pb.go index 91233745d7..60b75b1a55 100644 --- a/dm/pb/dmmaster.pb.go +++ b/dm/pb/dmmaster.pb.go @@ -2829,6 +2829,110 @@ func (m *HandleErrorResponse) GetSources() []*CommonWorkerResponse { return nil } +type TransferSourceRequest struct { + Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` + Worker string `protobuf:"bytes,2,opt,name=worker,proto3" json:"worker,omitempty"` +} + +func (m *TransferSourceRequest) Reset() { *m = TransferSourceRequest{} } +func (m *TransferSourceRequest) String() string { return proto.CompactTextString(m) } +func (*TransferSourceRequest) ProtoMessage() {} +func (*TransferSourceRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f9bef11f2a341f03, []int{45} +} +func (m *TransferSourceRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TransferSourceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TransferSourceRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TransferSourceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TransferSourceRequest.Merge(m, src) +} +func (m *TransferSourceRequest) XXX_Size() int { + return m.Size() +} +func (m *TransferSourceRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TransferSourceRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TransferSourceRequest proto.InternalMessageInfo + +func (m *TransferSourceRequest) GetSource() string { + if m != nil { + return m.Source + } + return "" +} + +func (m *TransferSourceRequest) GetWorker() string { + if m != nil { + return m.Worker + } + return "" +} + +type TransferSourceResponse struct { + Result bool `protobuf:"varint,1,opt,name=result,proto3" json:"result,omitempty"` + Msg string `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"` +} + +func (m *TransferSourceResponse) Reset() { *m = TransferSourceResponse{} } +func (m *TransferSourceResponse) String() string { return proto.CompactTextString(m) } +func (*TransferSourceResponse) ProtoMessage() {} +func (*TransferSourceResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f9bef11f2a341f03, []int{46} +} +func (m *TransferSourceResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TransferSourceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TransferSourceResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TransferSourceResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TransferSourceResponse.Merge(m, src) +} +func (m *TransferSourceResponse) XXX_Size() int { + return m.Size() +} +func (m *TransferSourceResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TransferSourceResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TransferSourceResponse proto.InternalMessageInfo + +func (m *TransferSourceResponse) GetResult() bool { + if m != nil { + return m.Result + } + return false +} + +func (m *TransferSourceResponse) GetMsg() string { + if m != nil { + return m.Msg + } + return "" +} + func init() { proto.RegisterEnum("pb.SourceOp", SourceOp_name, SourceOp_value) proto.RegisterEnum("pb.LeaderOp", LeaderOp_name, LeaderOp_value) @@ -2878,130 +2982,134 @@ func init() { proto.RegisterType((*GetMasterCfgResponse)(nil), "pb.GetMasterCfgResponse") proto.RegisterType((*HandleErrorRequest)(nil), "pb.HandleErrorRequest") proto.RegisterType((*HandleErrorResponse)(nil), "pb.HandleErrorResponse") + proto.RegisterType((*TransferSourceRequest)(nil), "pb.TransferSourceRequest") + proto.RegisterType((*TransferSourceResponse)(nil), "pb.TransferSourceResponse") } func init() { proto.RegisterFile("dmmaster.proto", fileDescriptor_f9bef11f2a341f03) } var fileDescriptor_f9bef11f2a341f03 = []byte{ - // 1873 bytes of a gzipped FileDescriptorProto + // 1920 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x59, 0x4f, 0x6f, 0xdb, 0xc8, - 0x15, 0x17, 0x25, 0xc5, 0x96, 0x9f, 0x6c, 0xaf, 0x3c, 0xb6, 0x65, 0x86, 0xf1, 0x2a, 0xde, 0xe9, - 0x6e, 0x60, 0x18, 0x45, 0x8c, 0xb8, 0x3d, 0x2d, 0xb0, 0x05, 0x36, 0x52, 0x36, 0x6b, 0x54, 0xa9, + 0x15, 0x17, 0x25, 0xc5, 0x96, 0x9f, 0x6c, 0xad, 0x3c, 0xb6, 0x65, 0x86, 0xf1, 0x2a, 0xde, 0xe9, + 0x6e, 0x60, 0x18, 0x45, 0x8c, 0xb8, 0x3d, 0x2d, 0xb0, 0x05, 0x36, 0x56, 0x36, 0x6b, 0x54, 0xa9, 0xb7, 0x74, 0x82, 0x76, 0x51, 0xa0, 0x58, 0x8a, 0x1a, 0xc9, 0x84, 0x29, 0x92, 0x21, 0x29, 0xbb, - 0x46, 0xb0, 0x97, 0x7e, 0x80, 0xfe, 0x41, 0x0f, 0x7b, 0xec, 0xa1, 0xd7, 0x9e, 0xfa, 0x29, 0x7a, - 0x5c, 0xa0, 0x97, 0x1e, 0x8b, 0xa4, 0x1f, 0xa4, 0x98, 0x37, 0x43, 0x72, 0xf8, 0x47, 0x6e, 0x15, - 0xa0, 0xbe, 0xcd, 0x9b, 0x37, 0x7a, 0xef, 0xf7, 0xfe, 0xcc, 0x9b, 0xf7, 0x28, 0xd8, 0x1c, 0xcf, - 0x66, 0x56, 0x14, 0xb3, 0xf0, 0x71, 0x10, 0xfa, 0xb1, 0x4f, 0xea, 0xc1, 0xc8, 0xd8, 0x1c, 0xcf, - 0xae, 0xfd, 0xf0, 0x32, 0xd9, 0x33, 0xf6, 0xa7, 0xbe, 0x3f, 0x75, 0xd9, 0xb1, 0x15, 0x38, 0xc7, - 0x96, 0xe7, 0xf9, 0xb1, 0x15, 0x3b, 0xbe, 0x17, 0x09, 0x2e, 0xfd, 0x06, 0x3a, 0xe7, 0xb1, 0x15, - 0xc6, 0x2f, 0xad, 0xe8, 0xd2, 0x64, 0xaf, 0xe7, 0x2c, 0x8a, 0x09, 0x81, 0x66, 0x6c, 0x45, 0x97, - 0xba, 0x76, 0xa0, 0x1d, 0xae, 0x99, 0xb8, 0x26, 0x3a, 0xac, 0x46, 0xfe, 0x3c, 0xb4, 0x59, 0xa4, - 0xd7, 0x0f, 0x1a, 0x87, 0x6b, 0x66, 0x42, 0x92, 0x1e, 0x40, 0xc8, 0x66, 0xfe, 0x15, 0x7b, 0xc1, - 0x62, 0x4b, 0x6f, 0x1c, 0x68, 0x87, 0x2d, 0x53, 0xd9, 0xa1, 0xaf, 0x61, 0x4b, 0xd1, 0x10, 0x05, - 0xbe, 0x17, 0x31, 0xd2, 0x85, 0x95, 0x90, 0x45, 0x73, 0x37, 0x46, 0x25, 0x2d, 0x53, 0x52, 0xa4, - 0x03, 0x8d, 0x59, 0x34, 0xd5, 0xeb, 0xa8, 0x99, 0x2f, 0xc9, 0x49, 0xa6, 0xb8, 0x71, 0xd0, 0x38, - 0x6c, 0x9f, 0xe8, 0x8f, 0x83, 0xd1, 0xe3, 0xbe, 0x3f, 0x9b, 0xf9, 0xde, 0x2f, 0xd0, 0xce, 0x44, + 0x46, 0xb0, 0x97, 0x7e, 0x80, 0xfe, 0x41, 0x0f, 0x7b, 0xec, 0xa1, 0xd7, 0x7e, 0x90, 0x1e, 0x17, + 0xe8, 0xa5, 0xc7, 0x22, 0xe9, 0xd7, 0x28, 0x50, 0xcc, 0x9b, 0x21, 0x39, 0xfc, 0x23, 0xb7, 0x0a, + 0x50, 0xdf, 0xf8, 0xe6, 0x8d, 0xde, 0xfb, 0xbd, 0x3f, 0xf3, 0xe6, 0xbd, 0x11, 0x74, 0xc6, 0xb3, + 0x99, 0x15, 0xc5, 0x2c, 0x7c, 0x1c, 0x84, 0x7e, 0xec, 0x93, 0x7a, 0x30, 0x32, 0x3a, 0xe3, 0xd9, + 0xb5, 0x1f, 0x5e, 0x26, 0x6b, 0xc6, 0xde, 0xd4, 0xf7, 0xa7, 0x2e, 0x3b, 0xb2, 0x02, 0xe7, 0xc8, + 0xf2, 0x3c, 0x3f, 0xb6, 0x62, 0xc7, 0xf7, 0x22, 0xc1, 0xa5, 0xdf, 0x40, 0xf7, 0x3c, 0xb6, 0xc2, + 0xf8, 0xa5, 0x15, 0x5d, 0x9a, 0xec, 0xf5, 0x9c, 0x45, 0x31, 0x21, 0xd0, 0x8c, 0xad, 0xe8, 0x52, + 0xd7, 0xf6, 0xb5, 0x83, 0x35, 0x13, 0xbf, 0x89, 0x0e, 0xab, 0x91, 0x3f, 0x0f, 0x6d, 0x16, 0xe9, + 0xf5, 0xfd, 0xc6, 0xc1, 0x9a, 0x99, 0x90, 0xa4, 0x0f, 0x10, 0xb2, 0x99, 0x7f, 0xc5, 0x5e, 0xb0, + 0xd8, 0xd2, 0x1b, 0xfb, 0xda, 0x41, 0xcb, 0x54, 0x56, 0xe8, 0x6b, 0xd8, 0x54, 0x34, 0x44, 0x81, + 0xef, 0x45, 0x8c, 0xf4, 0x60, 0x25, 0x64, 0xd1, 0xdc, 0x8d, 0x51, 0x49, 0xcb, 0x94, 0x14, 0xe9, + 0x42, 0x63, 0x16, 0x4d, 0xf5, 0x3a, 0x6a, 0xe6, 0x9f, 0xe4, 0x38, 0x53, 0xdc, 0xd8, 0x6f, 0x1c, + 0xb4, 0x8f, 0xf5, 0xc7, 0xc1, 0xe8, 0xf1, 0x89, 0x3f, 0x9b, 0xf9, 0xde, 0x2f, 0xd0, 0xce, 0x44, 0x68, 0x0a, 0x89, 0xfe, 0x1a, 0xc8, 0x59, 0xc0, 0x42, 0x2b, 0x66, 0xaa, 0x59, 0x06, 0xd4, 0xfd, - 0x00, 0xf5, 0x6d, 0x9e, 0x00, 0x17, 0xc2, 0x99, 0x67, 0x81, 0x59, 0xf7, 0x03, 0x6e, 0xb2, 0x67, - 0xcd, 0x98, 0x54, 0x8c, 0x6b, 0xd5, 0xe4, 0x46, 0xce, 0x64, 0xfa, 0x7b, 0x0d, 0xb6, 0x73, 0x0a, + 0x00, 0xf5, 0x75, 0x8e, 0x81, 0x0b, 0xe1, 0xcc, 0xb3, 0xc0, 0xac, 0xfb, 0x01, 0x37, 0xd9, 0xb3, + 0x66, 0x4c, 0x2a, 0xc6, 0x6f, 0xd5, 0xe4, 0x46, 0xce, 0x64, 0xfa, 0x7b, 0x0d, 0xb6, 0x72, 0x0a, 0xa4, 0x55, 0xb7, 0x69, 0xc8, 0x2c, 0xae, 0x57, 0x59, 0xdc, 0xa8, 0xb4, 0xb8, 0xf9, 0xbf, 0x5a, - 0xfc, 0x39, 0x6c, 0xbd, 0x0a, 0xc6, 0x05, 0x83, 0x97, 0x8a, 0x23, 0x0d, 0x81, 0xa8, 0x22, 0xee, - 0x24, 0x50, 0x5f, 0x40, 0xf7, 0xe7, 0x73, 0x16, 0xde, 0x9c, 0xc7, 0x56, 0x3c, 0x8f, 0x86, 0x4e, - 0x14, 0x2b, 0xd8, 0x31, 0x20, 0x5a, 0x75, 0x40, 0x0a, 0xd8, 0xaf, 0x60, 0xaf, 0x24, 0x67, 0x69, - 0x03, 0x9e, 0x14, 0x0d, 0xd8, 0xe3, 0x06, 0x28, 0x72, 0xcb, 0xf8, 0xfb, 0xb0, 0x7d, 0x7e, 0xe1, - 0x5f, 0x0f, 0x06, 0xc3, 0xa1, 0x6f, 0x5f, 0x46, 0xef, 0xe7, 0xf8, 0x3f, 0x6b, 0xb0, 0x2a, 0x25, - 0x90, 0x4d, 0xa8, 0x9f, 0x0e, 0xe4, 0xef, 0xea, 0xa7, 0x83, 0x54, 0x52, 0x5d, 0x91, 0x44, 0xa0, - 0x39, 0xf3, 0xc7, 0x4c, 0xa6, 0x0c, 0xae, 0xc9, 0x0e, 0xdc, 0xf3, 0xaf, 0x3d, 0x16, 0xea, 0x4d, - 0xdc, 0x14, 0x04, 0x3f, 0x39, 0x18, 0x0c, 0x23, 0xfd, 0x1e, 0x2a, 0xc4, 0x35, 0xf7, 0x47, 0x74, - 0xe3, 0xd9, 0x6c, 0xac, 0xaf, 0xe0, 0xae, 0xa4, 0x88, 0x01, 0xad, 0xb9, 0x27, 0x39, 0xab, 0xc8, - 0x49, 0x69, 0x6a, 0xc3, 0x4e, 0xde, 0xcc, 0xa5, 0x7d, 0xfb, 0x11, 0xdc, 0x73, 0xf9, 0x4f, 0xa5, - 0x67, 0xdb, 0xdc, 0xb3, 0x52, 0x9c, 0x29, 0x38, 0xd4, 0x85, 0x9d, 0x57, 0x1e, 0x5f, 0x26, 0xfb, - 0xd2, 0x99, 0x45, 0x97, 0x50, 0x58, 0x0f, 0x59, 0xe0, 0x5a, 0x36, 0x3b, 0x43, 0x8b, 0x85, 0x96, - 0xdc, 0x1e, 0x39, 0x80, 0xf6, 0xc4, 0x0f, 0x6d, 0x66, 0x62, 0x19, 0x92, 0x45, 0x49, 0xdd, 0xa2, - 0x9f, 0xc3, 0x6e, 0x41, 0xdb, 0xb2, 0x36, 0x51, 0x13, 0xee, 0xcb, 0x22, 0x90, 0xa4, 0xb7, 0x6b, - 0xdd, 0x24, 0xa8, 0x1f, 0x28, 0xa5, 0x00, 0xad, 0x45, 0xae, 0xac, 0x05, 0x8b, 0x73, 0xe1, 0x3b, - 0x0d, 0x8c, 0x2a, 0xa1, 0x12, 0xdc, 0xad, 0x52, 0xff, 0xbf, 0x15, 0xe6, 0x3b, 0x0d, 0xf6, 0xbe, - 0x9a, 0x87, 0xd3, 0x2a, 0x63, 0x15, 0x7b, 0xb4, 0xfc, 0xe3, 0x60, 0x40, 0xcb, 0xf1, 0x2c, 0x3b, - 0x76, 0xae, 0x98, 0x44, 0x95, 0xd2, 0x98, 0xdb, 0xce, 0x4c, 0x44, 0xa7, 0x61, 0xe2, 0x9a, 0x9f, - 0x9f, 0x38, 0x2e, 0xc3, 0xab, 0x2f, 0x52, 0x39, 0xa5, 0x31, 0x73, 0xe7, 0xa3, 0x81, 0x13, 0xea, - 0xf7, 0x90, 0x23, 0x29, 0xfa, 0x1b, 0xd0, 0xcb, 0xc0, 0xee, 0xa4, 0x7c, 0x3d, 0x82, 0x4e, 0xff, - 0x82, 0xd9, 0x97, 0xff, 0xa5, 0xe8, 0xd2, 0xcf, 0x60, 0x4b, 0x39, 0xb7, 0x74, 0xa2, 0x5d, 0xc0, - 0x8e, 0xcc, 0x89, 0x73, 0x54, 0x9c, 0xa8, 0xda, 0x57, 0xb2, 0x61, 0x9d, 0xa3, 0x15, 0xec, 0x2c, - 0x1d, 0x6c, 0xdf, 0x9b, 0x38, 0x53, 0x99, 0x63, 0x92, 0xe2, 0x2e, 0x16, 0xf8, 0x4f, 0x07, 0xf2, - 0x5d, 0x4b, 0x69, 0x3a, 0x87, 0xdd, 0x82, 0xa6, 0x3b, 0xf1, 0xe3, 0x33, 0xd8, 0x35, 0xd9, 0xd4, - 0xe1, 0x8d, 0x4c, 0x72, 0xe4, 0xd6, 0x57, 0xc0, 0x1a, 0x8f, 0x43, 0x16, 0x45, 0x52, 0x6d, 0x42, - 0xd2, 0xa7, 0xd0, 0x2d, 0x8a, 0x59, 0xda, 0xd7, 0x3f, 0x81, 0x9d, 0xb3, 0xc9, 0xc4, 0x75, 0x3c, - 0xf6, 0x82, 0xcd, 0x46, 0x39, 0x24, 0xf1, 0x4d, 0x90, 0x22, 0xe1, 0xeb, 0xaa, 0xa6, 0x81, 0xd7, - 0x95, 0xc2, 0xef, 0x97, 0x86, 0xf0, 0xe3, 0x34, 0xdc, 0x43, 0x66, 0x8d, 0x33, 0x08, 0xa5, 0x70, - 0x0b, 0xb6, 0x08, 0x37, 0x2a, 0xce, 0xff, 0x6a, 0x69, 0xc5, 0xbf, 0xd3, 0x00, 0x5e, 0x60, 0x3b, - 0x79, 0xea, 0x4d, 0xfc, 0x4a, 0xe7, 0x1b, 0xd0, 0x9a, 0xa1, 0x5d, 0xa7, 0x03, 0xfc, 0x65, 0xd3, - 0x4c, 0x69, 0xfe, 0x06, 0x59, 0xae, 0x93, 0x96, 0x5b, 0x41, 0xf0, 0x5f, 0x04, 0x8c, 0x85, 0xaf, - 0xcc, 0xa1, 0x28, 0x36, 0x6b, 0x66, 0x4a, 0xf3, 0xd6, 0xd1, 0x76, 0x1d, 0xe6, 0xc5, 0xc8, 0x15, - 0xaf, 0x94, 0xb2, 0x43, 0x47, 0x00, 0x22, 0x90, 0x0b, 0xf1, 0x10, 0x68, 0xf2, 0xe8, 0x27, 0x21, - 0xe0, 0x6b, 0x8e, 0x23, 0x8a, 0xad, 0x69, 0xf2, 0x40, 0x0a, 0x02, 0xab, 0x07, 0xa6, 0x9b, 0xac, - 0x2b, 0x92, 0xa2, 0x43, 0xe8, 0xf0, 0x7e, 0x41, 0x38, 0x4d, 0xc4, 0x2c, 0x71, 0x8d, 0x96, 0x65, - 0x75, 0x55, 0x7f, 0x98, 0xe8, 0x6e, 0x64, 0xba, 0xe9, 0xcf, 0x84, 0x34, 0xe1, 0xc5, 0x85, 0xd2, - 0x0e, 0x61, 0x55, 0xb4, 0xed, 0xa2, 0xfe, 0xb7, 0x4f, 0x36, 0x79, 0x38, 0x33, 0xd7, 0x9b, 0x09, - 0x3b, 0x91, 0x27, 0xbc, 0x70, 0x9b, 0x3c, 0xd1, 0xf2, 0xe7, 0xe4, 0x65, 0xae, 0x33, 0x13, 0x36, - 0xfd, 0x8b, 0x06, 0xab, 0x42, 0x4c, 0x44, 0x1e, 0xc3, 0x8a, 0x8b, 0x56, 0xa3, 0xa8, 0xf6, 0xc9, - 0x0e, 0xe6, 0x54, 0xc1, 0x17, 0x5f, 0xd6, 0x4c, 0x79, 0x8a, 0x9f, 0x17, 0xb0, 0xd0, 0x0b, 0xca, - 0x79, 0xd5, 0x5a, 0x7e, 0x5e, 0x9c, 0xe2, 0xe7, 0x85, 0x5a, 0xf4, 0x90, 0x72, 0x5e, 0xb5, 0x86, - 0x9f, 0x17, 0xa7, 0x9e, 0xb6, 0x60, 0x45, 0xe4, 0x12, 0x1f, 0x19, 0x50, 0x6e, 0xee, 0x06, 0x76, - 0x73, 0x70, 0x5b, 0x29, 0xac, 0x6e, 0x0e, 0x56, 0x2b, 0x55, 0xdf, 0xcd, 0xa9, 0x6f, 0x25, 0x6a, - 0x78, 0x7a, 0xf0, 0xf0, 0x25, 0xd9, 0x28, 0x08, 0xca, 0x80, 0xa8, 0x2a, 0x97, 0x2e, 0x7b, 0x9f, - 0xc0, 0xaa, 0x00, 0x9f, 0x6b, 0x71, 0xa4, 0xab, 0xcd, 0x84, 0x47, 0xff, 0xaa, 0x65, 0xb5, 0xdc, - 0xbe, 0x60, 0x33, 0x6b, 0x71, 0x2d, 0x47, 0x76, 0x36, 0x9e, 0x94, 0xda, 0xc0, 0x85, 0xe3, 0x09, - 0xbf, 0x72, 0x63, 0x2b, 0xb6, 0x46, 0x56, 0x94, 0x3e, 0xa2, 0x09, 0xcd, 0xad, 0x8f, 0xad, 0x91, - 0xcb, 0xe4, 0x1b, 0x2a, 0x08, 0xbc, 0x1c, 0xa8, 0x4f, 0x5f, 0x91, 0x97, 0x03, 0x29, 0xf5, 0x3d, - 0x90, 0x68, 0xef, 0xe4, 0x3d, 0x38, 0x82, 0x9d, 0xe7, 0x2c, 0x3e, 0x9f, 0x8f, 0xf8, 0x83, 0xd9, - 0x9f, 0x4c, 0x6f, 0x79, 0x0e, 0xe8, 0x2b, 0xd8, 0x2d, 0x9c, 0x5d, 0x1a, 0x22, 0x81, 0xa6, 0x3d, - 0x99, 0x26, 0x6e, 0xc4, 0x35, 0x1d, 0xc0, 0xc6, 0x73, 0x16, 0x2b, 0xba, 0x1f, 0x2a, 0x0f, 0x80, - 0x6c, 0xbe, 0xfa, 0x93, 0xe9, 0xcb, 0x9b, 0x80, 0xdd, 0xf2, 0x1a, 0x0c, 0x61, 0x33, 0x91, 0xb2, - 0x34, 0xaa, 0x0e, 0x34, 0xec, 0x49, 0xda, 0xb6, 0xd9, 0x93, 0x29, 0xdd, 0x85, 0xed, 0xe7, 0x4c, - 0xde, 0xb6, 0x0c, 0x19, 0x3d, 0x44, 0x6f, 0x29, 0xdb, 0x52, 0x95, 0x14, 0xa0, 0x65, 0x02, 0xfe, - 0xa8, 0x01, 0xf9, 0xd2, 0xf2, 0xc6, 0x2e, 0x7b, 0x16, 0x86, 0x7e, 0xb8, 0xb0, 0x57, 0x45, 0xee, - 0x7b, 0xa5, 0xde, 0x3e, 0xac, 0x8d, 0x1c, 0xcf, 0xf5, 0xa7, 0x5f, 0xf9, 0x91, 0xcc, 0xbd, 0x6c, - 0x83, 0xcb, 0x8a, 0x5e, 0xbb, 0xe9, 0x3c, 0xc2, 0xd7, 0x34, 0x82, 0xed, 0x1c, 0xa4, 0xbb, 0x48, - 0xb0, 0xa3, 0x11, 0xb4, 0x92, 0x5e, 0x89, 0x6c, 0xc3, 0x07, 0xa7, 0xde, 0x95, 0xe5, 0x3a, 0xe3, - 0x64, 0xab, 0x53, 0x23, 0x1f, 0x40, 0x1b, 0x3f, 0x5a, 0x88, 0xad, 0x8e, 0x46, 0x3a, 0xb0, 0x2e, - 0xa6, 0x63, 0xb9, 0x53, 0x27, 0x9b, 0x00, 0xe7, 0xb1, 0x1f, 0x48, 0xba, 0x81, 0xf4, 0x85, 0x7f, - 0x2d, 0xe9, 0xe6, 0xd1, 0x4f, 0xa1, 0x95, 0x3c, 0xd0, 0x8a, 0x8e, 0x64, 0xab, 0x53, 0x23, 0x5b, - 0xb0, 0xf1, 0xec, 0xca, 0xb1, 0xe3, 0x74, 0x4b, 0x23, 0x7b, 0xb0, 0xdd, 0xb7, 0x3c, 0x9b, 0xb9, - 0x79, 0x46, 0xfd, 0xe8, 0x97, 0xb0, 0x2a, 0xb3, 0x8d, 0x43, 0x93, 0xb2, 0x38, 0xd9, 0xa9, 0x91, - 0x75, 0x68, 0xf1, 0xdc, 0x47, 0x4a, 0xe3, 0x30, 0x44, 0x2a, 0x20, 0x8d, 0x30, 0x85, 0x17, 0x90, - 0x16, 0x30, 0x11, 0x22, 0xd2, 0xcd, 0x93, 0xbf, 0x6d, 0xc0, 0x8a, 0xf8, 0x01, 0xf9, 0x1a, 0xd6, - 0xd2, 0x2f, 0x35, 0x04, 0xab, 0x75, 0xf1, 0xd3, 0x90, 0xb1, 0x5b, 0xd8, 0x15, 0x8e, 0xa5, 0x0f, - 0x7f, 0xfb, 0x8f, 0x7f, 0xff, 0xa9, 0x7e, 0x9f, 0xee, 0x1c, 0x5b, 0x81, 0x13, 0x1d, 0x5f, 0x3d, - 0xb1, 0xdc, 0xe0, 0xc2, 0x7a, 0x72, 0xcc, 0xb3, 0x25, 0xfa, 0x54, 0x3b, 0x22, 0x13, 0x68, 0x2b, - 0x1f, 0x4c, 0x48, 0x97, 0x8b, 0x29, 0x7f, 0xa2, 0x31, 0xf6, 0x4a, 0xfb, 0x52, 0xc1, 0x23, 0x54, - 0x70, 0x60, 0x3c, 0xa8, 0x52, 0x70, 0xfc, 0x86, 0x5f, 0xb6, 0x6f, 0xb9, 0x9e, 0xcf, 0x00, 0xb2, - 0x8f, 0x18, 0x04, 0xd1, 0x96, 0xbe, 0x8b, 0x18, 0xdd, 0xe2, 0xb6, 0x54, 0x52, 0x23, 0x2e, 0xb4, - 0x95, 0x79, 0x9f, 0x18, 0x85, 0x0f, 0x00, 0xca, 0x07, 0x0a, 0xe3, 0x41, 0x25, 0x4f, 0x4a, 0xfa, - 0x18, 0xe1, 0xf6, 0xc8, 0x7e, 0x01, 0x6e, 0x84, 0x47, 0x25, 0x5e, 0xd2, 0x87, 0x75, 0x75, 0xac, - 0x26, 0x68, 0x7d, 0xc5, 0xf7, 0x04, 0x43, 0x2f, 0x33, 0x52, 0xc8, 0x5f, 0xc0, 0x46, 0x6e, 0x90, - 0x25, 0x78, 0xb8, 0x6a, 0x92, 0x36, 0xee, 0x57, 0x70, 0x52, 0x39, 0x5f, 0x43, 0xb7, 0x3c, 0x78, - 0xa2, 0x17, 0x3f, 0x54, 0x82, 0x52, 0x1e, 0xfe, 0x8c, 0xde, 0x22, 0x76, 0x2a, 0xfa, 0x0c, 0x3a, - 0xc5, 0x01, 0x8d, 0xa0, 0xfb, 0x16, 0xcc, 0x93, 0xc6, 0x7e, 0x35, 0x33, 0x15, 0xf8, 0x29, 0xac, - 0xa5, 0xf3, 0x94, 0x48, 0xd4, 0xe2, 0x18, 0x26, 0x12, 0xb5, 0x34, 0x74, 0xd1, 0x1a, 0x99, 0xc2, - 0x46, 0x6e, 0xc4, 0x11, 0xfe, 0xaa, 0x9a, 0xaf, 0x84, 0xbf, 0x2a, 0xe7, 0x21, 0xfa, 0x11, 0x06, - 0xf8, 0x81, 0xd1, 0x2d, 0x06, 0x58, 0x14, 0x18, 0x9e, 0x8a, 0xa7, 0xb0, 0x99, 0x9f, 0x46, 0xc8, - 0x7d, 0x31, 0xb1, 0x57, 0x0c, 0x3a, 0x86, 0x51, 0xc5, 0x4a, 0x31, 0x87, 0xb0, 0x91, 0x1b, 0x2a, - 0x24, 0xe6, 0x8a, 0x39, 0x45, 0x62, 0xae, 0x9a, 0x40, 0xe8, 0x0f, 0x11, 0xf3, 0xa3, 0xa3, 0x8f, - 0x0b, 0x98, 0x65, 0x6f, 0x72, 0xfc, 0x86, 0x3f, 0x63, 0xdf, 0x26, 0xc9, 0x79, 0x99, 0xfa, 0x49, - 0x94, 0xa1, 0x9c, 0x9f, 0x72, 0x83, 0x49, 0xce, 0x4f, 0xf9, 0xe1, 0x83, 0x7e, 0x82, 0x3a, 0x1f, - 0x1a, 0x46, 0x41, 0xa7, 0xe8, 0xdd, 0x8e, 0xdf, 0xf8, 0x01, 0x5e, 0xdb, 0x5f, 0x01, 0x64, 0xdd, - 0x97, 0xb8, 0xb6, 0xa5, 0x06, 0x50, 0x5c, 0xdb, 0x72, 0x93, 0x46, 0x7b, 0xa8, 0x43, 0x27, 0xdd, - 0x6a, 0xbb, 0xc8, 0x24, 0x8b, 0x38, 0x36, 0x31, 0xf9, 0x88, 0xab, 0x5d, 0x58, 0x3e, 0xe2, 0xb9, - 0x8e, 0x87, 0x1e, 0xa0, 0x16, 0xc3, 0xd8, 0x2d, 0x46, 0x1c, 0x8f, 0x71, 0x23, 0x5c, 0x6c, 0x19, - 0xb2, 0x4e, 0x44, 0xe8, 0xa9, 0x6a, 0x64, 0x84, 0x9e, 0xca, 0xb6, 0x25, 0xa9, 0x74, 0xa4, 0x57, - 0xd4, 0x33, 0x1f, 0xa9, 0xc5, 0x8e, 0xbc, 0x84, 0x15, 0xd1, 0x5a, 0x90, 0x2d, 0x29, 0x4c, 0x91, - 0x4f, 0xd4, 0x2d, 0x29, 0xf8, 0x07, 0x28, 0xf8, 0x43, 0x72, 0x5b, 0x09, 0x25, 0xdf, 0x40, 0x5b, - 0x79, 0x8d, 0x45, 0x9d, 0x2e, 0x77, 0x0c, 0xa2, 0x4e, 0x57, 0x3c, 0xdb, 0x0b, 0xbd, 0xc4, 0xf8, - 0x29, 0xbc, 0x16, 0x7d, 0x58, 0x57, 0xbb, 0x15, 0x51, 0xf4, 0x2a, 0xda, 0x1a, 0x43, 0x2f, 0x33, - 0x92, 0x0b, 0xf1, 0x54, 0xff, 0xfb, 0xdb, 0x9e, 0xf6, 0xfd, 0xdb, 0x9e, 0xf6, 0xaf, 0xb7, 0x3d, - 0xed, 0x0f, 0xef, 0x7a, 0xb5, 0xef, 0xdf, 0xf5, 0x6a, 0xff, 0x7c, 0xd7, 0xab, 0x8d, 0x56, 0xf0, - 0x6f, 0x8d, 0x1f, 0xfd, 0x27, 0x00, 0x00, 0xff, 0xff, 0xf5, 0xb6, 0xf4, 0x66, 0x1a, 0x19, 0x00, - 0x00, + 0xfc, 0x39, 0x6c, 0xbe, 0x0a, 0xc6, 0x05, 0x83, 0x97, 0x8a, 0x23, 0x0d, 0x81, 0xa8, 0x22, 0xee, + 0x24, 0x50, 0x5f, 0x40, 0xef, 0xe7, 0x73, 0x16, 0xde, 0x9c, 0xc7, 0x56, 0x3c, 0x8f, 0x86, 0x4e, + 0x14, 0x2b, 0xd8, 0x31, 0x20, 0x5a, 0x75, 0x40, 0x0a, 0xd8, 0xaf, 0x60, 0xb7, 0x24, 0x67, 0x69, + 0x03, 0x9e, 0x14, 0x0d, 0xd8, 0xe5, 0x06, 0x28, 0x72, 0xcb, 0xf8, 0x4f, 0x60, 0xeb, 0xfc, 0xc2, + 0xbf, 0x1e, 0x0c, 0x86, 0x43, 0xdf, 0xbe, 0x8c, 0xde, 0xcf, 0xf1, 0x7f, 0xd6, 0x60, 0x55, 0x4a, + 0x20, 0x1d, 0xa8, 0x9f, 0x0e, 0xe4, 0xef, 0xea, 0xa7, 0x83, 0x54, 0x52, 0x5d, 0x91, 0x44, 0xa0, + 0x39, 0xf3, 0xc7, 0x4c, 0xa6, 0x0c, 0x7e, 0x93, 0x6d, 0xb8, 0xe7, 0x5f, 0x7b, 0x2c, 0xd4, 0x9b, + 0xb8, 0x28, 0x08, 0xbe, 0x73, 0x30, 0x18, 0x46, 0xfa, 0x3d, 0x54, 0x88, 0xdf, 0xdc, 0x1f, 0xd1, + 0x8d, 0x67, 0xb3, 0xb1, 0xbe, 0x82, 0xab, 0x92, 0x22, 0x06, 0xb4, 0xe6, 0x9e, 0xe4, 0xac, 0x22, + 0x27, 0xa5, 0xa9, 0x0d, 0xdb, 0x79, 0x33, 0x97, 0xf6, 0xed, 0x47, 0x70, 0xcf, 0xe5, 0x3f, 0x95, + 0x9e, 0x6d, 0x73, 0xcf, 0x4a, 0x71, 0xa6, 0xe0, 0x50, 0x17, 0xb6, 0x5f, 0x79, 0xfc, 0x33, 0x59, + 0x97, 0xce, 0x2c, 0xba, 0x84, 0xc2, 0x7a, 0xc8, 0x02, 0xd7, 0xb2, 0xd9, 0x19, 0x5a, 0x2c, 0xb4, + 0xe4, 0xd6, 0xc8, 0x3e, 0xb4, 0x27, 0x7e, 0x68, 0x33, 0x13, 0xcb, 0x90, 0x2c, 0x4a, 0xea, 0x12, + 0xfd, 0x1c, 0x76, 0x0a, 0xda, 0x96, 0xb5, 0x89, 0x9a, 0x70, 0x5f, 0x16, 0x81, 0x24, 0xbd, 0x5d, + 0xeb, 0x26, 0x41, 0xfd, 0x40, 0x29, 0x05, 0x68, 0x2d, 0x72, 0x65, 0x2d, 0x58, 0x9c, 0x0b, 0xdf, + 0x69, 0x60, 0x54, 0x09, 0x95, 0xe0, 0x6e, 0x95, 0xfa, 0xff, 0xad, 0x30, 0xdf, 0x69, 0xb0, 0xfb, + 0xd5, 0x3c, 0x9c, 0x56, 0x19, 0xab, 0xd8, 0xa3, 0xe5, 0x2f, 0x07, 0x03, 0x5a, 0x8e, 0x67, 0xd9, + 0xb1, 0x73, 0xc5, 0x24, 0xaa, 0x94, 0xc6, 0xdc, 0x76, 0x66, 0x22, 0x3a, 0x0d, 0x13, 0xbf, 0xf9, + 0xfe, 0x89, 0xe3, 0x32, 0x3c, 0xfa, 0x22, 0x95, 0x53, 0x1a, 0x33, 0x77, 0x3e, 0x1a, 0x38, 0xa1, + 0x7e, 0x0f, 0x39, 0x92, 0xa2, 0xbf, 0x01, 0xbd, 0x0c, 0xec, 0x4e, 0xca, 0xd7, 0x23, 0xe8, 0x9e, + 0x5c, 0x30, 0xfb, 0xf2, 0xbf, 0x14, 0x5d, 0xfa, 0x19, 0x6c, 0x2a, 0xfb, 0x96, 0x4e, 0xb4, 0x0b, + 0xd8, 0x96, 0x39, 0x71, 0x8e, 0x8a, 0x13, 0x55, 0x7b, 0x4a, 0x36, 0xac, 0x73, 0xb4, 0x82, 0x9d, + 0xa5, 0x83, 0xed, 0x7b, 0x13, 0x67, 0x2a, 0x73, 0x4c, 0x52, 0xdc, 0xc5, 0x02, 0xff, 0xe9, 0x40, + 0xde, 0x6b, 0x29, 0x4d, 0xe7, 0xb0, 0x53, 0xd0, 0x74, 0x27, 0x7e, 0x7c, 0x06, 0x3b, 0x26, 0x9b, + 0x3a, 0xbc, 0x91, 0x49, 0xb6, 0xdc, 0x7a, 0x0b, 0x58, 0xe3, 0x71, 0xc8, 0xa2, 0x48, 0xaa, 0x4d, + 0x48, 0xfa, 0x14, 0x7a, 0x45, 0x31, 0x4b, 0xfb, 0xfa, 0x27, 0xb0, 0x7d, 0x36, 0x99, 0xb8, 0x8e, + 0xc7, 0x5e, 0xb0, 0xd9, 0x28, 0x87, 0x24, 0xbe, 0x09, 0x52, 0x24, 0xfc, 0xbb, 0xaa, 0x69, 0xe0, + 0x75, 0xa5, 0xf0, 0xfb, 0xa5, 0x21, 0xfc, 0x38, 0x0d, 0xf7, 0x90, 0x59, 0xe3, 0x0c, 0x42, 0x29, + 0xdc, 0x82, 0x2d, 0xc2, 0x8d, 0x8a, 0xf3, 0xbf, 0x5a, 0x5a, 0xf1, 0xef, 0x34, 0x80, 0x17, 0xd8, + 0x4e, 0x9e, 0x7a, 0x13, 0xbf, 0xd2, 0xf9, 0x06, 0xb4, 0x66, 0x68, 0xd7, 0xe9, 0x00, 0x7f, 0xd9, + 0x34, 0x53, 0x9a, 0xdf, 0x41, 0x96, 0xeb, 0xa4, 0xe5, 0x56, 0x10, 0xfc, 0x17, 0x01, 0x63, 0xe1, + 0x2b, 0x73, 0x28, 0x8a, 0xcd, 0x9a, 0x99, 0xd2, 0xbc, 0x75, 0xb4, 0x5d, 0x87, 0x79, 0x31, 0x72, + 0xc5, 0x2d, 0xa5, 0xac, 0xd0, 0x11, 0x80, 0x08, 0xe4, 0x42, 0x3c, 0x04, 0x9a, 0x3c, 0xfa, 0x49, + 0x08, 0xf8, 0x37, 0xc7, 0x11, 0xc5, 0xd6, 0x34, 0xb9, 0x20, 0x05, 0x81, 0xd5, 0x03, 0xd3, 0x4d, + 0xd6, 0x15, 0x49, 0xd1, 0x21, 0x74, 0x79, 0xbf, 0x20, 0x9c, 0x26, 0x62, 0x96, 0xb8, 0x46, 0xcb, + 0xb2, 0xba, 0xaa, 0x3f, 0x4c, 0x74, 0x37, 0x32, 0xdd, 0xf4, 0x67, 0x42, 0x9a, 0xf0, 0xe2, 0x42, + 0x69, 0x07, 0xb0, 0x2a, 0xda, 0x76, 0x51, 0xff, 0xdb, 0xc7, 0x1d, 0x1e, 0xce, 0xcc, 0xf5, 0x66, + 0xc2, 0x4e, 0xe4, 0x09, 0x2f, 0xdc, 0x26, 0x4f, 0xb4, 0xfc, 0x39, 0x79, 0x99, 0xeb, 0xcc, 0x84, + 0x4d, 0xff, 0xa2, 0xc1, 0xaa, 0x10, 0x13, 0x91, 0xc7, 0xb0, 0xe2, 0xa2, 0xd5, 0x28, 0xaa, 0x7d, + 0xbc, 0x8d, 0x39, 0x55, 0xf0, 0xc5, 0x97, 0x35, 0x53, 0xee, 0xe2, 0xfb, 0x05, 0x2c, 0xf4, 0x82, + 0xb2, 0x5f, 0xb5, 0x96, 0xef, 0x17, 0xbb, 0xf8, 0x7e, 0xa1, 0x16, 0x3d, 0xa4, 0xec, 0x57, 0xad, + 0xe1, 0xfb, 0xc5, 0xae, 0xa7, 0x2d, 0x58, 0x11, 0xb9, 0xc4, 0x47, 0x06, 0x94, 0x9b, 0x3b, 0x81, + 0xbd, 0x1c, 0xdc, 0x56, 0x0a, 0xab, 0x97, 0x83, 0xd5, 0x4a, 0xd5, 0xf7, 0x72, 0xea, 0x5b, 0x89, + 0x1a, 0x9e, 0x1e, 0x3c, 0x7c, 0x49, 0x36, 0x0a, 0x82, 0x32, 0x20, 0xaa, 0xca, 0xa5, 0xcb, 0xde, + 0x27, 0xb0, 0x2a, 0xc0, 0xe7, 0x5a, 0x1c, 0xe9, 0x6a, 0x33, 0xe1, 0xd1, 0xbf, 0x6a, 0x59, 0x2d, + 0xb7, 0x2f, 0xd8, 0xcc, 0x5a, 0x5c, 0xcb, 0x91, 0x9d, 0x8d, 0x27, 0xa5, 0x36, 0x70, 0xe1, 0x78, + 0xc2, 0x8f, 0xdc, 0xd8, 0x8a, 0xad, 0x91, 0x15, 0xa5, 0x97, 0x68, 0x42, 0x73, 0xeb, 0x63, 0x6b, + 0xe4, 0x32, 0x79, 0x87, 0x0a, 0x02, 0x0f, 0x07, 0xea, 0xd3, 0x57, 0xe4, 0xe1, 0x40, 0x4a, 0xbd, + 0x0f, 0x24, 0xda, 0x3b, 0xb9, 0x0f, 0x0e, 0x61, 0xfb, 0x39, 0x8b, 0xcf, 0xe7, 0x23, 0x7e, 0x61, + 0x9e, 0x4c, 0xa6, 0xb7, 0x5c, 0x07, 0xf4, 0x15, 0xec, 0x14, 0xf6, 0x2e, 0x0d, 0x91, 0x40, 0xd3, + 0x9e, 0x4c, 0x13, 0x37, 0xe2, 0x37, 0x1d, 0xc0, 0xc6, 0x73, 0x16, 0x2b, 0xba, 0x1f, 0x2a, 0x17, + 0x80, 0x6c, 0xbe, 0x4e, 0x26, 0xd3, 0x97, 0x37, 0x01, 0xbb, 0xe5, 0x36, 0x18, 0x42, 0x27, 0x91, + 0xb2, 0x34, 0xaa, 0x2e, 0x34, 0xec, 0x49, 0xda, 0xb6, 0xd9, 0x93, 0x29, 0xdd, 0x81, 0xad, 0xe7, + 0x4c, 0x9e, 0xb6, 0x0c, 0x19, 0x3d, 0x40, 0x6f, 0x29, 0xcb, 0x52, 0x95, 0x14, 0xa0, 0x65, 0x02, + 0xfe, 0xa8, 0x01, 0xf9, 0xd2, 0xf2, 0xc6, 0x2e, 0x7b, 0x16, 0x86, 0x7e, 0xb8, 0xb0, 0x57, 0x45, + 0xee, 0x7b, 0xa5, 0xde, 0x1e, 0xac, 0x8d, 0x1c, 0xcf, 0xf5, 0xa7, 0x5f, 0xf9, 0x91, 0xcc, 0xbd, + 0x6c, 0x81, 0xcb, 0x8a, 0x5e, 0xbb, 0xe9, 0x3c, 0xc2, 0xbf, 0x69, 0x04, 0x5b, 0x39, 0x48, 0x77, + 0x92, 0x60, 0xcf, 0x61, 0xe7, 0x65, 0x68, 0x79, 0xd1, 0x84, 0x85, 0xf9, 0x96, 0x2a, 0xbb, 0x25, + 0x34, 0xf5, 0x96, 0x50, 0x8a, 0x89, 0xd0, 0x2c, 0x29, 0xde, 0x72, 0x14, 0x05, 0x2d, 0x6b, 0xc0, + 0xe1, 0x08, 0x5a, 0x49, 0xe3, 0x46, 0xb6, 0xe0, 0x83, 0x53, 0xef, 0xca, 0x72, 0x9d, 0x71, 0xb2, + 0xd4, 0xad, 0x91, 0x0f, 0xa0, 0x8d, 0x2f, 0x28, 0x62, 0xa9, 0xab, 0x91, 0x2e, 0xac, 0x8b, 0x51, + 0x5d, 0xae, 0xd4, 0x49, 0x07, 0xe0, 0x3c, 0xf6, 0x03, 0x49, 0x37, 0x90, 0xbe, 0xf0, 0xaf, 0x25, + 0xdd, 0x3c, 0xfc, 0x29, 0xb4, 0x92, 0x6e, 0x41, 0xd1, 0x91, 0x2c, 0x75, 0x6b, 0x64, 0x13, 0x36, + 0x9e, 0x5d, 0x39, 0x76, 0x9c, 0x2e, 0x69, 0x64, 0x17, 0xb6, 0x4e, 0x2c, 0xcf, 0x66, 0x6e, 0x9e, + 0x51, 0x3f, 0xfc, 0x25, 0xac, 0xca, 0xd4, 0xe7, 0xd0, 0xa4, 0x2c, 0x4e, 0x76, 0x6b, 0x64, 0x1d, + 0x5a, 0xfc, 0x20, 0x22, 0xa5, 0x71, 0x18, 0x22, 0x2f, 0x91, 0x46, 0x98, 0x22, 0x24, 0x48, 0x0b, + 0x98, 0x08, 0x11, 0xe9, 0xe6, 0xf1, 0xbf, 0x37, 0x60, 0x45, 0xfc, 0x80, 0x7c, 0x0d, 0x6b, 0xe9, + 0xb3, 0x11, 0xc1, 0xab, 0xa3, 0xf8, 0x4e, 0x65, 0xec, 0x14, 0x56, 0x85, 0xe7, 0xe9, 0xc3, 0xdf, + 0xfe, 0xfd, 0x5f, 0x7f, 0xaa, 0xdf, 0xa7, 0xdb, 0x47, 0x56, 0xe0, 0x44, 0x47, 0x57, 0x4f, 0x2c, + 0x37, 0xb8, 0xb0, 0x9e, 0x1c, 0xf1, 0xd4, 0x8d, 0x3e, 0xd5, 0x0e, 0xc9, 0x04, 0xda, 0xca, 0xeb, + 0x0d, 0xe9, 0x71, 0x31, 0xe5, 0xf7, 0x22, 0x63, 0xb7, 0xb4, 0x2e, 0x15, 0x3c, 0x42, 0x05, 0xfb, + 0xc6, 0x83, 0x2a, 0x05, 0x47, 0x6f, 0xf8, 0xc9, 0xff, 0x96, 0xeb, 0xf9, 0x0c, 0x20, 0x7b, 0x51, + 0x21, 0x88, 0xb6, 0xf4, 0x48, 0x63, 0xf4, 0x8a, 0xcb, 0x52, 0x49, 0x8d, 0xb8, 0xd0, 0x56, 0x1e, + 0x1f, 0x88, 0x51, 0x78, 0x8d, 0x50, 0x5e, 0x4b, 0x8c, 0x07, 0x95, 0x3c, 0x29, 0xe9, 0x63, 0x84, + 0xdb, 0x27, 0x7b, 0x05, 0xb8, 0x11, 0x6e, 0x95, 0x78, 0xc9, 0x09, 0xac, 0xab, 0x33, 0x3e, 0x41, + 0xeb, 0x2b, 0x1e, 0x37, 0x0c, 0xbd, 0xcc, 0x48, 0x21, 0x7f, 0x01, 0x1b, 0xb9, 0xa9, 0x9a, 0xe0, + 0xe6, 0xaa, 0xb1, 0xde, 0xb8, 0x5f, 0xc1, 0x49, 0xe5, 0x7c, 0x0d, 0xbd, 0xf2, 0x14, 0x8c, 0x5e, + 0xfc, 0x50, 0x09, 0x4a, 0x79, 0x12, 0x35, 0xfa, 0x8b, 0xd8, 0xa9, 0xe8, 0x33, 0xe8, 0x16, 0xa7, + 0x45, 0x82, 0xee, 0x5b, 0x30, 0xdc, 0x1a, 0x7b, 0xd5, 0xcc, 0x54, 0xe0, 0xa7, 0xb0, 0x96, 0x0e, + 0x77, 0x22, 0x51, 0x8b, 0x33, 0xa1, 0x48, 0xd4, 0xd2, 0x04, 0x48, 0x6b, 0x64, 0x0a, 0x1b, 0xb9, + 0x79, 0x4b, 0xf8, 0xab, 0x6a, 0xd8, 0x13, 0xfe, 0xaa, 0x1c, 0xce, 0xe8, 0x47, 0x18, 0xe0, 0x07, + 0x46, 0xaf, 0x18, 0x60, 0x51, 0xed, 0x78, 0x2a, 0x9e, 0x42, 0x27, 0x3f, 0x1a, 0x91, 0xfb, 0xe2, + 0xf9, 0xa0, 0x62, 0xea, 0x32, 0x8c, 0x2a, 0x56, 0x8a, 0x39, 0x84, 0x8d, 0xdc, 0x84, 0x23, 0x31, + 0x57, 0x0c, 0x4d, 0x12, 0x73, 0xd5, 0x38, 0x44, 0x7f, 0x88, 0x98, 0x1f, 0x1d, 0x7e, 0x5c, 0xc0, + 0x2c, 0x1b, 0xa5, 0xa3, 0x37, 0xfc, 0x4e, 0xfd, 0x36, 0x49, 0xce, 0xcb, 0xd4, 0x4f, 0xa2, 0x0c, + 0xe5, 0xfc, 0x94, 0x9b, 0x92, 0x72, 0x7e, 0xca, 0x4f, 0x42, 0xf4, 0x13, 0xd4, 0xf9, 0xd0, 0x30, + 0x0a, 0x3a, 0x45, 0x23, 0x79, 0xf4, 0xc6, 0x0f, 0xf0, 0xd8, 0xfe, 0x0a, 0x20, 0x6b, 0x05, 0xc5, + 0xb1, 0x2d, 0x75, 0xa3, 0xe2, 0xd8, 0x96, 0x3b, 0x46, 0xda, 0x47, 0x1d, 0x3a, 0xe9, 0x55, 0xdb, + 0x45, 0x26, 0x59, 0xc4, 0xb1, 0xa3, 0xca, 0x47, 0x5c, 0x6d, 0x09, 0xf3, 0x11, 0xcf, 0xb5, 0x5f, + 0x74, 0x1f, 0xb5, 0x18, 0xc6, 0x4e, 0x31, 0xe2, 0xb8, 0x8d, 0x1b, 0xe1, 0x62, 0xff, 0x92, 0xb5, + 0x45, 0x42, 0x4f, 0x55, 0x57, 0x25, 0xf4, 0x54, 0xf6, 0x50, 0x49, 0xa5, 0x23, 0xfd, 0xa2, 0x9e, + 0xf9, 0x48, 0x2d, 0x76, 0xe4, 0x25, 0xac, 0x88, 0x3e, 0x87, 0x6c, 0x4a, 0x61, 0x8a, 0x7c, 0xa2, + 0x2e, 0x49, 0xc1, 0x3f, 0x40, 0xc1, 0x1f, 0x92, 0xdb, 0x4a, 0x28, 0xf9, 0x06, 0xda, 0x4a, 0x6b, + 0x20, 0xea, 0x74, 0xb9, 0x7d, 0x11, 0x75, 0xba, 0xa2, 0x87, 0x58, 0xe8, 0x25, 0xc6, 0x77, 0xe1, + 0xb1, 0x38, 0x81, 0x75, 0xb5, 0x75, 0x12, 0x45, 0xaf, 0xa2, 0xc7, 0x32, 0xf4, 0x32, 0x23, 0x3d, + 0x10, 0xa7, 0xd0, 0xc9, 0xf7, 0x00, 0xe2, 0x6c, 0x55, 0x36, 0x18, 0xe2, 0x6c, 0x55, 0xb7, 0x0c, + 0xb4, 0xf6, 0x54, 0xff, 0xdb, 0xdb, 0xbe, 0xf6, 0xfd, 0xdb, 0xbe, 0xf6, 0xcf, 0xb7, 0x7d, 0xed, + 0x0f, 0xef, 0xfa, 0xb5, 0xef, 0xdf, 0xf5, 0x6b, 0xff, 0x78, 0xd7, 0xaf, 0x8d, 0x56, 0xf0, 0xef, + 0x9a, 0x1f, 0xfd, 0x27, 0x00, 0x00, 0xff, 0xff, 0xc0, 0x6b, 0xd1, 0x5e, 0xf2, 0x19, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -3048,6 +3156,7 @@ type MasterClient interface { GetCfg(ctx context.Context, in *GetCfgRequest, opts ...grpc.CallOption) (*GetCfgResponse, error) HandleError(ctx context.Context, in *HandleErrorRequest, opts ...grpc.CallOption) (*HandleErrorResponse, error) GetMasterCfg(ctx context.Context, in *GetMasterCfgRequest, opts ...grpc.CallOption) (*GetMasterCfgResponse, error) + TransferSource(ctx context.Context, in *TransferSourceRequest, opts ...grpc.CallOption) (*TransferSourceResponse, error) } type masterClient struct { @@ -3229,6 +3338,15 @@ func (c *masterClient) GetMasterCfg(ctx context.Context, in *GetMasterCfgRequest return out, nil } +func (c *masterClient) TransferSource(ctx context.Context, in *TransferSourceRequest, opts ...grpc.CallOption) (*TransferSourceResponse, error) { + out := new(TransferSourceResponse) + err := c.cc.Invoke(ctx, "/pb.Master/TransferSource", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // MasterServer is the server API for Master service. type MasterServer interface { StartTask(context.Context, *StartTaskRequest) (*StartTaskResponse, error) @@ -3263,6 +3381,7 @@ type MasterServer interface { GetCfg(context.Context, *GetCfgRequest) (*GetCfgResponse, error) HandleError(context.Context, *HandleErrorRequest) (*HandleErrorResponse, error) GetMasterCfg(context.Context, *GetMasterCfgRequest) (*GetMasterCfgResponse, error) + TransferSource(context.Context, *TransferSourceRequest) (*TransferSourceResponse, error) } // UnimplementedMasterServer can be embedded to have forward compatible implementations. @@ -3326,6 +3445,9 @@ func (*UnimplementedMasterServer) HandleError(ctx context.Context, req *HandleEr func (*UnimplementedMasterServer) GetMasterCfg(ctx context.Context, req *GetMasterCfgRequest) (*GetMasterCfgResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetMasterCfg not implemented") } +func (*UnimplementedMasterServer) TransferSource(ctx context.Context, req *TransferSourceRequest) (*TransferSourceResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransferSource not implemented") +} func RegisterMasterServer(s *grpc.Server, srv MasterServer) { s.RegisterService(&_Master_serviceDesc, srv) @@ -3673,6 +3795,24 @@ func _Master_GetMasterCfg_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _Master_TransferSource_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TransferSourceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MasterServer).TransferSource(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pb.Master/TransferSource", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MasterServer).TransferSource(ctx, req.(*TransferSourceRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Master_serviceDesc = grpc.ServiceDesc{ ServiceName: "pb.Master", HandlerType: (*MasterServer)(nil), @@ -3753,6 +3893,10 @@ var _Master_serviceDesc = grpc.ServiceDesc{ MethodName: "GetMasterCfg", Handler: _Master_GetMasterCfg_Handler, }, + { + MethodName: "TransferSource", + Handler: _Master_TransferSource_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "dmmaster.proto", @@ -5915,6 +6059,83 @@ func (m *HandleErrorResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *TransferSourceRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TransferSourceRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TransferSourceRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Worker) > 0 { + i -= len(m.Worker) + copy(dAtA[i:], m.Worker) + i = encodeVarintDmmaster(dAtA, i, uint64(len(m.Worker))) + i-- + dAtA[i] = 0x12 + } + if len(m.Source) > 0 { + i -= len(m.Source) + copy(dAtA[i:], m.Source) + i = encodeVarintDmmaster(dAtA, i, uint64(len(m.Source))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *TransferSourceResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TransferSourceResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TransferSourceResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Msg) > 0 { + i -= len(m.Msg) + copy(dAtA[i:], m.Msg) + i = encodeVarintDmmaster(dAtA, i, uint64(len(m.Msg))) + i-- + dAtA[i] = 0x12 + } + if m.Result { + i-- + if m.Result { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintDmmaster(dAtA []byte, offset int, v uint64) int { offset -= sovDmmaster(v) base := offset @@ -6894,6 +7115,39 @@ func (m *HandleErrorResponse) Size() (n int) { return n } +func (m *TransferSourceRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Source) + if l > 0 { + n += 1 + l + sovDmmaster(uint64(l)) + } + l = len(m.Worker) + if l > 0 { + n += 1 + l + sovDmmaster(uint64(l)) + } + return n +} + +func (m *TransferSourceResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Result { + n += 2 + } + l = len(m.Msg) + if l > 0 { + n += 1 + l + sovDmmaster(uint64(l)) + } + return n +} + func sovDmmaster(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -12941,6 +13195,228 @@ func (m *HandleErrorResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *TransferSourceRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDmmaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TransferSourceRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TransferSourceRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Source", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDmmaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDmmaster + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthDmmaster + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Source = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Worker", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDmmaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDmmaster + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthDmmaster + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Worker = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDmmaster(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDmmaster + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDmmaster + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TransferSourceResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDmmaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TransferSourceResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TransferSourceResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Result", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDmmaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Result = bool(v != 0) + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Msg", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDmmaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDmmaster + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthDmmaster + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Msg = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDmmaster(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDmmaster + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDmmaster + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipDmmaster(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/dm/proto/dmmaster.proto b/dm/proto/dmmaster.proto index 6b2a7746ea..5121122c78 100644 --- a/dm/proto/dmmaster.proto +++ b/dm/proto/dmmaster.proto @@ -104,6 +104,8 @@ service Master { } rpc GetMasterCfg(GetMasterCfgRequest) returns(GetMasterCfgResponse) {} + + rpc TransferSource(TransferSourceRequest) returns(TransferSourceResponse) {} } message StartTaskRequest { @@ -423,4 +425,14 @@ message HandleErrorResponse { bool result = 1; string msg = 2; repeated CommonWorkerResponse sources = 3; +} + +message TransferSourceRequest { + string source = 1; + string worker = 2; +} + +message TransferSourceResponse { + bool result = 1; + string msg = 2; } \ No newline at end of file diff --git a/go.mod b/go.mod index cb0c9eefc0..5027b54c05 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/dustin/go-humanize v1.0.0 github.com/go-sql-driver/mysql v1.5.0 github.com/gogo/gateway v1.1.0 - github.com/gogo/protobuf v1.3.1 + github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.4.4 github.com/golang/protobuf v1.3.4 github.com/google/uuid v1.1.1 diff --git a/go.sum b/go.sum index 0c0799b699..9e84a4c24f 100644 --- a/go.sum +++ b/go.sum @@ -370,6 +370,8 @@ github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gohxs/readline v0.0.0-20171011095936-a780388e6e7c/go.mod h1:9S/fKAutQ6wVHqm1jnp9D9sc5hu689s9AaTWFS92LaU= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= @@ -568,6 +570,7 @@ github.com/kevinburke/go-bindata v3.18.0+incompatible/go.mod h1:/pEEZ72flUW2p0yi github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M= @@ -1564,10 +1567,12 @@ golang.org/x/tools v0.0.0-20200521211927-2b542361a4fc/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200527183253-8e7acdbce89d h1:SR+e35rACZFBohNb4Om1ibX6N3iO0FtdbwqGSuD9dBU= golang.org/x/tools v0.0.0-20200527183253-8e7acdbce89d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200609164405-eb789aa7ce50/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200820010801-b793a1359eac/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200823205832-c024452afbcd h1:KNSumuk5eGuQV7zbOrDDZ3MIkwsQr0n5oKiH4oE0/hU= golang.org/x/tools v0.0.0-20200823205832-c024452afbcd/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/ha/bound.go b/pkg/ha/bound.go index 39ce801b62..2990323693 100644 --- a/pkg/ha/bound.go +++ b/pkg/ha/bound.go @@ -107,6 +107,20 @@ func DeleteSourceBound(cli *clientv3.Client, workers ...string) (int64, error) { return rev, err } +// ReplaceSourceBound do delete an old bound and put a new bound in one transaction, so a bound source will not become +// unbound because of failing halfway +func ReplaceSourceBound(cli *clientv3.Client, source, oldWorker, newWorker string) (int64, error) { + ops := make([]clientv3.Op, 0, 3) + ops = append(ops, deleteSourceBoundOp(oldWorker)) + op, err := putSourceBoundOp(NewSourceBound(source, newWorker)) + if err != nil { + return 0, err + } + ops = append(ops, op...) + _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...) + return rev, err +} + // GetSourceBound gets the source bound relationship for the specified DM-worker. // if the bound relationship for the worker name not exist, return with `err == nil`. // if the worker name is "", it will return all bound relationships as a map{worker-name: bound}. From 7bb67a2f40e8e5f583a949a01cf3ad0a91cf3bb3 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 9 Mar 2021 15:58:12 +0800 Subject: [PATCH 2/7] fix some comment --- dm/master/scheduler/scheduler.go | 3 +-- pkg/ha/bound.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 30c86dbc58..e43b9232ba 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -362,7 +362,7 @@ func (s *Scheduler) TransferSource(source, worker string) error { return terror.ErrSchedulerNotStarted.Generate() } - // 1. check existence.or no need + // 1. check existence or no need if _, ok := s.sourceCfgs[source]; !ok { return terror.ErrSchedulerSourceCfgExist.Generate(source) } @@ -393,7 +393,6 @@ func (s *Scheduler) TransferSource(source, worker string) error { } return err } - defer func() { _, err := s.tryBoundForWorker(oldWorker) if err != nil { diff --git a/pkg/ha/bound.go b/pkg/ha/bound.go index 2990323693..2ca61fb1d0 100644 --- a/pkg/ha/bound.go +++ b/pkg/ha/bound.go @@ -107,7 +107,7 @@ func DeleteSourceBound(cli *clientv3.Client, workers ...string) (int64, error) { return rev, err } -// ReplaceSourceBound do delete an old bound and put a new bound in one transaction, so a bound source will not become +// ReplaceSourceBound deletes an old bound and puts a new bound in one transaction, so a bound source will not become // unbound because of failing halfway func ReplaceSourceBound(cli *clientv3.Client, source, oldWorker, newWorker string) (int64, error) { ops := make([]clientv3.Op, 0, 3) From fecce7f3fddf4249111a69db548ff63fa612bf2d Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 10 Mar 2021 11:11:38 +0800 Subject: [PATCH 3/7] add dmctl interface --- dm/ctl/ctl.go | 1 + dm/ctl/master/transfer_source.go | 68 +++++++++++++++++++ dm/master/scheduler/scheduler.go | 2 +- dm/master/server.go | 2 +- .../dmctl_basic/check_list/operate_source.sh | 1 - .../dmctl_basic/check_list/transfer_source.sh | 35 ++++++++++ tests/dmctl_basic/run.sh | 13 +++- tests/ha/run.sh | 11 +++ 8 files changed, 128 insertions(+), 5 deletions(-) create mode 100644 dm/ctl/master/transfer_source.go create mode 100644 tests/dmctl_basic/check_list/transfer_source.sh diff --git a/dm/ctl/ctl.go b/dm/ctl/ctl.go index 10a856e617..fa5a6c3a2f 100644 --- a/dm/ctl/ctl.go +++ b/dm/ctl/ctl.go @@ -74,6 +74,7 @@ func NewRootCmd() *cobra.Command { master.NewOperateSchemaCmd(), master.NewGetCfgCmd(), master.NewHandleErrorCmd(), + master.NewTransferSourceCmd(), ) // copied from (*cobra.Command).InitDefaultHelpCmd helpCmd := &cobra.Command{ diff --git a/dm/ctl/master/transfer_source.go b/dm/ctl/master/transfer_source.go new file mode 100644 index 0000000000..02b8e90f30 --- /dev/null +++ b/dm/ctl/master/transfer_source.go @@ -0,0 +1,68 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package master + +import ( + "context" + "errors" + "os" + + "github.com/pingcap/dm/dm/ctl/common" + "github.com/pingcap/dm/dm/pb" + + "github.com/spf13/cobra" +) + +// NewOperateSourceCmd creates a OperateSource command +func NewTransferSourceCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "transfer-source ", + Short: "Transfers a upstream MySQL/MariaDB source to a free worker.", + RunE: transferSourceFunc, + } + return cmd +} + +func transferSourceFunc(cmd *cobra.Command, _ []string) (err error) { + if len(cmd.Flags().Args()) != 2 { + cmd.SetOut(os.Stdout) + common.PrintCmdUsage(cmd) + err = errors.New("please check output to see error") + return + } + + sourceID := cmd.Flags().Arg(0) + workerID := cmd.Flags().Arg(1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp := &pb.TransferSourceResponse{} + err = common.SendRequest( + ctx, + "TransferSource", + &pb.TransferSourceRequest{ + Source: sourceID, + Worker: workerID, + }, + &resp, + ) + + if err != nil { + return + } + + common.PrettyPrintResponse(resp) + return +} diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index e43b9232ba..0fecb203f0 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -364,7 +364,7 @@ func (s *Scheduler) TransferSource(source, worker string) error { // 1. check existence or no need if _, ok := s.sourceCfgs[source]; !ok { - return terror.ErrSchedulerSourceCfgExist.Generate(source) + return terror.ErrSchedulerSourceCfgNotExist.Generate(source) } w, ok := s.workers[worker] if !ok { diff --git a/dm/master/server.go b/dm/master/server.go index 3fcc2d6490..6fb4bcde87 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1985,7 +1985,7 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (* // TransferSource implements MasterServer.TransferSource func (s *Server) TransferSource(ctx context.Context, req *pb.TransferSourceRequest) (*pb.TransferSourceResponse, error) { var ( - resp2 *pb.TransferSourceResponse + resp2 = &pb.TransferSourceResponse{} err2 error ) shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) diff --git a/tests/dmctl_basic/check_list/operate_source.sh b/tests/dmctl_basic/check_list/operate_source.sh index 8c0e58c126..113923b840 100644 --- a/tests/dmctl_basic/check_list/operate_source.sh +++ b/tests/dmctl_basic/check_list/operate_source.sh @@ -25,4 +25,3 @@ function operate_source_invalid_op() { "operate-source invalid $source_conf" \ "invalid operate" 1 } - diff --git a/tests/dmctl_basic/check_list/transfer_source.sh b/tests/dmctl_basic/check_list/transfer_source.sh new file mode 100644 index 0000000000..dee47d82ab --- /dev/null +++ b/tests/dmctl_basic/check_list/transfer_source.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +function transfer_source_empty_arg() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source" \ + "transfer-source " 1 +} + +function transfer_source_less_arg() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source source-id" \ + "transfer-source " 1 +} + +function transfer_source_more_arg() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source source-id worker-id another-worker" \ + "transfer-source " 1 +} + +function transfer_source_valid() { + source_id=$1 + worker_id=$2 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source $1 $2" \ + "\"result\": true" 1 +} + +function transfer_source_invalid() { + source_id=$1 + worker_id=$2 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source $1 $2" \ + "invalid stage transformation for dm-worker $2" 1 +} diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index 7e25083429..51ed147d7a 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -67,6 +67,11 @@ function usage_and_arg_test() { operate_source_wrong_config_file operate_source_invalid_op $MYSQL1_CONF operate_source_stop_not_created_config $MYSQL1_CONF + + echo "transfer_source_empty_arg" + transfer_source_empty_arg + transfer_source_less_arg + transfer_source_more_arg } function recover_max_binlog_size() { @@ -130,12 +135,13 @@ function run() { "operate-source stop $cur/conf/source1.yaml $SOURCE_ID2" \ "\"result\": true" 3 + # ensure source1 is bound to worker1 dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $dm_worker1_conf check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "operate-source show" \ "\"result\": true" 3 \ @@ -178,6 +184,9 @@ function run() { "\"stage\": \"Running\"" 4 # update_task_not_paused $TASK_CONF + transfer_source_valid $SOURCE_ID1 worker1 # transfer to self + transfer_source_invalid $SOURCE_ID1 worker2 + echo "get_config" get_config_wrong_arg get_config_to_file diff --git a/tests/ha/run.sh b/tests/ha/run.sh index 647783c6da..dce949264b 100755 --- a/tests/ha/run.sh +++ b/tests/ha/run.sh @@ -88,6 +88,17 @@ function run() { echo "wait and check task running" check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 + # manually transfer a exist source to a newly started worker + run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source $SOURCE_ID1 worker3" \ + "\"result\": true" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member --name worker3" \ + "$SOURCE_ID1" 1 + echo "query-status from all dm-master" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ "query-status test" \ From 818f92849cbd1d0a542342b3c0051a9eb1a36eef Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 10 Mar 2021 13:03:17 +0800 Subject: [PATCH 4/7] fix CI --- dm/ctl/master/transfer_source.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/ctl/master/transfer_source.go b/dm/ctl/master/transfer_source.go index 02b8e90f30..3b6574008d 100644 --- a/dm/ctl/master/transfer_source.go +++ b/dm/ctl/master/transfer_source.go @@ -24,7 +24,7 @@ import ( "github.com/spf13/cobra" ) -// NewOperateSourceCmd creates a OperateSource command +// NewTransferSourceCmd creates a TransferSource command func NewTransferSourceCmd() *cobra.Command { cmd := &cobra.Command{ Use: "transfer-source ", From 2561700e0b0565b47cbd7884b28c1d3643422160 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 10 Mar 2021 13:26:10 +0800 Subject: [PATCH 5/7] refine logic --- dm/master/scheduler/scheduler.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 0fecb203f0..baa964ae67 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -381,24 +381,17 @@ func (s *Scheduler) TransferSource(source, worker string) error { return terror.ErrSchedulerWorkerInvalidTrans.Generate(worker, stage, WorkerBound) } - // 3. deal with old worker + // 3. if no old worker, bound it directly if !hasOldWorker { s.logger.Warn("in transfer source, found a free worker and not bound source, which should not happened", zap.String("source", source), zap.String("worker", worker)) - // 3.1 bound it directly err := s.boundSourceToWorker(source, w) if err == nil { delete(s.unbounds, source) } return err } - defer func() { - _, err := s.tryBoundForWorker(oldWorker) - if err != nil { - s.logger.Warn("in transfer source, error when try bound the old worker", zap.Error(err)) - } - }() // 4. replace the source bound failpoint.Inject("failToReplaceSourceBound", func(_ failpoint.Value) { @@ -411,6 +404,12 @@ func (s *Scheduler) TransferSource(source, worker string) error { oldWorker.ToFree() // we have checked w.stage is free, so there should not be an error _ = s.updateStatusForBound(w, ha.NewSourceBound(source, worker)) + + // 5. try bound the old worker + _, err = s.tryBoundForWorker(oldWorker) + if err != nil { + s.logger.Warn("in transfer source, error when try bound the old worker", zap.Error(err)) + } return nil } From e12156dc73f70c0ae08e45c5220a2dece62333bb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 10 Mar 2021 14:05:39 +0800 Subject: [PATCH 6/7] fix CI --- tests/dmctl_command/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dmctl_command/run.sh b/tests/dmctl_command/run.sh index e8edc96bfe..15a3603606 100644 --- a/tests/dmctl_command/run.sh +++ b/tests/dmctl_command/run.sh @@ -6,7 +6,7 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME -help_cnt=34 +help_cnt=35 function run() { # check dmctl output with help flag From 30f87fbb3f182f6c1eb354a8f211d8a586ae2c17 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 12 Mar 2021 13:46:03 +0800 Subject: [PATCH 7/7] error when there's running task on that source --- _utils/terror_gen/errors_release.txt | 1 + dm/master/scheduler/scheduler.go | 19 +++++++++++++++++-- dm/master/scheduler/scheduler_test.go | 9 +++++++-- errors.toml | 6 ++++++ pkg/terror/error_list.go | 2 ++ tests/dmctl_basic/run.sh | 6 +++--- tests/ha/run.sh | 11 +++++++++++ 7 files changed, 47 insertions(+), 7 deletions(-) diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 57698c9304..878ff21732 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -490,6 +490,7 @@ ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium], "Message: subtasks with name %s need to be operate not exist, Workaround: Please use `query-status` command to see tasks." ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist" ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks." +ErrSchedulerRequireNotRunning,[code=46019:class=scheduler:scope=internal:level=high], "Message: tasks %v on source %s should not be running, Workaround: Please use `pause-task [-s source ...] task` to pause them first" ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection." ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line." ErrNotSet,[code=50000:class=not-set:scope=not-set:level=high] diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index baa964ae67..96f981ceab 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -393,7 +393,22 @@ func (s *Scheduler) TransferSource(source, worker string) error { return err } - // 4. replace the source bound + // 4. if there's old worker, make sure it's not running + var runningTasks []string + for task, subtaskM := range s.expectSubTaskStages { + subtaskStage, ok2 := subtaskM[source] + if !ok2 { + continue + } + if subtaskStage.Expect == pb.Stage_Running { + runningTasks = append(runningTasks, task) + } + } + if len(runningTasks) > 0 { + return terror.ErrSchedulerRequireNotRunning.Generate(runningTasks, source) + } + + // 5. replace the source bound failpoint.Inject("failToReplaceSourceBound", func(_ failpoint.Value) { failpoint.Return(errors.New("failToPutSourceBound")) }) @@ -405,7 +420,7 @@ func (s *Scheduler) TransferSource(source, worker string) error { // we have checked w.stage is free, so there should not be an error _ = s.updateStatusForBound(w, ha.NewSourceBound(source, worker)) - // 5. try bound the old worker + // 6. try bound the old worker _, err = s.tryBoundForWorker(oldWorker) if err != nil { s.logger.Warn("in transfer source, error when try bound the old worker", zap.Error(err)) diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 3914cd1565..992cdcda09 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -1103,8 +1103,13 @@ func (t *testScheduler) TestTransferSource(c *C) { // test fail halfway won't left old worker unbound c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound", `return()`), IsNil) - //nolint:errcheck - defer failpoint.Disable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound") + c.Assert(s.TransferSource(sourceID1, workerName1), NotNil) + c.Assert(s.bounds[sourceID1], DeepEquals, worker4) + c.Assert(worker1.Stage(), Equals, WorkerFree) + c.Assert(failpoint.Disable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound"), IsNil) + + // test can't transfer when there's any running task on the source + s.expectSubTaskStages["test"] = map[string]ha.Stage{sourceID1: {Expect: pb.Stage_Running}} c.Assert(s.TransferSource(sourceID1, workerName1), NotNil) c.Assert(s.bounds[sourceID1], DeepEquals, worker4) c.Assert(worker1.Stage(), Equals, WorkerFree) diff --git a/errors.toml b/errors.toml index 710941f476..32104b2713 100644 --- a/errors.toml +++ b/errors.toml @@ -2950,6 +2950,12 @@ description = "" workaround = "Please use `query-status` command to see tasks." tags = ["internal", "medium"] +[error.DM-scheduler-46019] +message = "tasks %v on source %s should not be running" +description = "" +workaround = "Please use `pause-task [-s source ...] task` to pause them first" +tags = ["internal", "high"] + [error.DM-dmctl-48001] message = "can not create grpc connection" description = "" diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 1f07430a23..c40f237c95 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -603,6 +603,7 @@ const ( codeSchedulerSubTaskOpTaskNotExist codeSchedulerSubTaskOpSourceNotExist codeSchedulerTaskNotExist + codeSchedulerRequireNotRunning ) // dmctl error code @@ -1192,6 +1193,7 @@ var ( ErrSchedulerSubTaskOpTaskNotExist = New(codeSchedulerSubTaskOpTaskNotExist, ClassDMMaster, ScopeInternal, LevelMedium, "subtasks with name %s need to be operate not exist", "Please use `query-status` command to see tasks.") ErrSchedulerSubTaskOpSourceNotExist = New(codeSchedulerSubTaskOpSourceNotExist, ClassDMMaster, ScopeInternal, LevelMedium, "sources %v need to be operate not exist", "") ErrSchedulerTaskNotExist = New(codeSchedulerTaskNotExist, ClassScheduler, ScopeInternal, LevelMedium, "task with name %s not exist", "Please use `query-status` command to see tasks.") + ErrSchedulerRequireNotRunning = New(codeSchedulerRequireNotRunning, ClassScheduler, ScopeInternal, LevelHigh, "tasks %v on source %s should not be running", "Please use `pause-task [-s source ...] task` to pause them first") // dmctl ErrCtlGRPCCreateConn = New(codeCtlGRPCCreateConn, ClassDMCtl, ScopeInternal, LevelHigh, "can not create grpc connection", "Please check your network connection.") diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index 51ed147d7a..a3070b83f1 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -157,6 +157,9 @@ function run() { '"worker": "worker1"' 1 \ '"worker": "worker2"' 1 + transfer_source_valid $SOURCE_ID1 worker1 # transfer to self + transfer_source_invalid $SOURCE_ID1 worker2 + echo "pause_relay_success" pause_relay_success query_status_stopped_relay @@ -184,9 +187,6 @@ function run() { "\"stage\": \"Running\"" 4 # update_task_not_paused $TASK_CONF - transfer_source_valid $SOURCE_ID1 worker1 # transfer to self - transfer_source_invalid $SOURCE_ID1 worker2 - echo "get_config" get_config_wrong_arg get_config_to_file diff --git a/tests/ha/run.sh b/tests/ha/run.sh index dce949264b..70b093fb63 100755 --- a/tests/ha/run.sh +++ b/tests/ha/run.sh @@ -90,10 +90,21 @@ function run() { # manually transfer a exist source to a newly started worker run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml + + # pause task first check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source $SOURCE_ID1 worker3" \ + "tasks \[test\] on source $SOURCE_ID1 should not be running" 1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "pause-task -s $SOURCE_ID1 test" \ + "\"result\": true" 2 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "transfer-source $SOURCE_ID1 worker3" \ "\"result\": true" 1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "resume-task -s $SOURCE_ID1 test" \ + "\"result\": true" 2 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "list-member --name worker3" \