Skip to content

Commit

Permalink
feat: check dts status whether the replication task is finished
Browse files Browse the repository at this point in the history
  • Loading branch information
csynineyang committed Apr 20, 2024
1 parent faad14e commit 446ead0
Showing 1 changed file with 55 additions and 19 deletions.
74 changes: 55 additions & 19 deletions pkg/admin/config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (cs *MyConfigService) UpsertCluster(ctx context.Context, tenant, cluster st
return nil
}

func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, vtables []*TableDTO, idx int) map[string]interface{} {
func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, srcNode, dstNode *NodeDTO, vtables []*TableDTO, idx int) map[string]interface{} {
jobJson := make(map[string]interface{})
jobBody := make(map[string]interface{})
jobJson["Job"] = jobBody
Expand Down Expand Up @@ -382,16 +382,16 @@ func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, sr
jobReplicate = append(jobReplicate, jobDatabase)
jobConfig["ReplicateDoDb"] = jobReplicate
jobSrcConfig := make(map[string]interface{})
jobSrcConfig["Host"] = "arana80"
jobSrcConfig["Port"] = 3306
jobSrcConfig["User"] = "root"
jobSrcConfig["Password"] = "123456"
jobSrcConfig["Host"] = srcNode.Host
jobSrcConfig["Port"] = srcNode.Port
jobSrcConfig["User"] = srcNode.Username
jobSrcConfig["Password"] = srcNode.Password
jobConfig["SrcConnectionConfig"] = jobSrcConfig
jobDstConfig := make(map[string]interface{})
jobDstConfig["Host"] = "arana80"
jobDstConfig["Port"] = 3306
jobDstConfig["User"] = "root"
jobDstConfig["Password"] = "123456"
jobDstConfig["Host"] = dstNode.Host
jobDstConfig["Port"] = dstNode.Port
jobDstConfig["User"] = dstNode.Username
jobDstConfig["Password"] = dstNode.Password
jobConfig["DestConnectionConfig"] = jobDstConfig
jobTask["Config"] = jobConfig
jobSrc["Tasks"] = append(jobTasks, jobTask)
Expand Down Expand Up @@ -420,22 +420,37 @@ func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster st
if len(groups) != len(body.Groups) {
return perrors.Errorf("new groups is not equle to old groups")
}
vtables, err := cs.ListTables(ctx, tenant, cluster)
if err != nil {
return err
}
allNodes, err := cs.ListNodes(ctx, tenant)
if err != nil {
return err
}

//2、创建复制group(物理数据库)的dts任务
//groups[0] --> body.Groups[0]
//groups[1] --> body.Groups[1]
//...
vtables, err := cs.ListTables(ctx, tenant, cluster)
if err != nil {
return err
}
httpClient := &http.Client{}
dtsJobList := make([]map[string]interface{}, 0, len(groups))
dtsEndpoint := config.BootOpts.Dts.Options["endpoints"].(string)
for i := range groups {
srcGroup := groups[i].Name
var srcNode, dstNode *NodeDTO
for n := range allNodes {
if allNodes[n].Database == srcGroup {
srcNode = allNodes[n]
}
}
dstGroup := body.Groups[i]
dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, vtables, i)
for n := range allNodes {
if allNodes[n].Database == dstGroup {
dstNode = allNodes[n]
}
}
dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, srcNode, dstNode, vtables, i)
if dtsJob == nil {
return perrors.Errorf("failed to build DTS json parameter")
}
Expand All @@ -454,9 +469,34 @@ func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster st
}

//3、检查是否复制完毕
for {
time.Sleep(5 * time.Second)
finished := false

for i := range dtsJobList {
dtsJob := dtsJobList[i]["Job"].(map[string]interface{})
httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string)
httpReq, err := http.NewRequest("GET", httpURL, nil)
if err != nil {
return perrors.Errorf("failed to create GET http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to check replica source group")
}

//TODO: check Status
finished = true
httpResp.Body.Close()
}

if finished {
break
}
}

//4、断开并拒绝所有客户端连接
time.Sleep(5 * time.Second)

//5、再次检查是否复制完毕

Expand All @@ -477,10 +517,6 @@ func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster st
}

//7、更新groups节点
allNodes, err := cs.ListNodes(ctx, tenant)
if err != nil {
return err
}
var groupBody GroupDTO
var groupNode string
for i := range body.Groups {
Expand Down

0 comments on commit 446ead0

Please sign in to comment.