From 1ea2d2d459bbbde5493c2ead3d1b8c5550ede58f Mon Sep 17 00:00:00 2001 From: csynineyang <93956978+csynineyang@users.noreply.github.com> Date: Sun, 12 May 2024 20:46:59 +0800 Subject: [PATCH] Support dts to replica databases/groups (#834) * Support dts to replica databases/groups * feat: check dts status whether the replication task is finished --- conf/bootstrap.yaml | 6 + pkg/admin/config.go | 3 + pkg/admin/config_service.go | 250 +++++++++++++++++++++++ pkg/admin/router/clusters.go | 18 ++ pkg/config/misc.go | 3 + pkg/config/model.go | 7 + pkg/runtime/optimize/ddl/create_table.go | 5 +- scripts/sharding.sql | 1 + 8 files changed, 291 insertions(+), 2 deletions(-) diff --git a/conf/bootstrap.yaml b/conf/bootstrap.yaml index 5d3e23d0..1a0d0ec4 100644 --- a/conf/bootstrap.yaml +++ b/conf/bootstrap.yaml @@ -31,6 +31,12 @@ registry: options: endpoints: "http://etcd:2379" +dts: + enable: false + name: dtle + options: + endpoints: "http://dtle:4646" + # name: nacos # options: # endpoints: "127.0.0.1:8848" diff --git a/pkg/admin/config.go b/pkg/admin/config.go index 11a7b744..b2585a95 100644 --- a/pkg/admin/config.go +++ b/pkg/admin/config.go @@ -116,6 +116,9 @@ type configWriter interface { // UpsertCluster upserts a cluster into an existing tenant. UpsertCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error + // ExtendCluster extends a cluster in an existing tenant. + ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error + // RemoveCluster removes a cluster from an existing tenant. RemoveCluster(ctx context.Context, tenant, cluster string) error diff --git a/pkg/admin/config_service.go b/pkg/admin/config_service.go index a0e19a3e..5f433916 100644 --- a/pkg/admin/config_service.go +++ b/pkg/admin/config_service.go @@ -18,13 +18,18 @@ package admin import ( + "bytes" "context" + "encoding/json" "errors" + "fmt" + "net/http" "reflect" "sort" "strings" "sync" "sync/atomic" + "time" ) import ( @@ -336,6 +341,251 @@ func (cs *MyConfigService) UpsertCluster(ctx context.Context, tenant, cluster st return nil } +func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, srcNode, dstNode *NodeDTO, vtables []*TableDTO, idx int) map[string]interface{} { + jobJson := make(map[string]interface{}) + jobBody := make(map[string]interface{}) + jobJson["Job"] = jobBody + + jobId := tenant + "-" + cluster + "-" + src + "-" + dst + "-" + time.Now().Format("20060102150405") + jobBody["ID"] = jobId + jobBody["Datacenters"] = []string{"dc1"} + jobGroups := make([]interface{}, 0, 2) + + jobSrc := make(map[string]interface{}) + jobSrc["Name"] = "src" + + jobTasks := make([]interface{}, 0, 1) + jobTask := make(map[string]interface{}) + jobTask["Name"] = "src" + jobTask["Driver"] = "dtle" + jobConfig := make(map[string]interface{}) + jobConfig["Gtid"] = "" + jobReplicate := make([]interface{}, 0, 1) + jobDatabase := make(map[string]interface{}) + jobDatabase["TableSchema"] = src + jobDatabase["TableSchemaRename"] = dst + jobTables := []map[string]string{} + for i := range vtables { + vTable := vtables[i] + _, _, dbEnd, _ := config.ParseTopology(vTable.Topology.DbPattern) + tbFormat, _, tbEnd, _ := config.ParseTopology(vTable.Topology.TblPattern) + tableNum := int((tbEnd + 1) / (dbEnd + 1)) + for j := 0; j < tableNum; j++ { + jobTable := map[string]string{} + jobTable["TableName"] = fmt.Sprintf(tbFormat, idx*tableNum+j) + jobTable["TableRename"] = fmt.Sprintf(tbFormat, idx*tableNum+j+tbEnd+1) + jobTables = append(jobTables, jobTable) + } + } + jobDatabase["Tables"] = jobTables + + jobReplicate = append(jobReplicate, jobDatabase) + jobConfig["ReplicateDoDb"] = jobReplicate + jobSrcConfig := make(map[string]interface{}) + jobSrcConfig["Host"] = srcNode.Host + jobSrcConfig["Port"] = srcNode.Port + jobSrcConfig["User"] = srcNode.Username + jobSrcConfig["Password"] = srcNode.Password + jobConfig["SrcConnectionConfig"] = jobSrcConfig + jobDstConfig := make(map[string]interface{}) + jobDstConfig["Host"] = dstNode.Host + jobDstConfig["Port"] = dstNode.Port + jobDstConfig["User"] = dstNode.Username + jobDstConfig["Password"] = dstNode.Password + jobConfig["DestConnectionConfig"] = jobDstConfig + jobTask["Config"] = jobConfig + jobSrc["Tasks"] = append(jobTasks, jobTask) + jobGroups = append(jobGroups, jobSrc) + + jobDst := make(map[string]interface{}) + jobDst["Name"] = "dest" + jobTasks = make([]interface{}, 0, 1) + jobTask = make(map[string]interface{}) + jobTask["Name"] = "dest" + jobTask["Driver"] = "dtle" + jobTask["Config"] = map[string]string{"DestType": "mysql"} + jobDst["Tasks"] = append(jobTasks, jobTask) + jobGroups = append(jobGroups, jobDst) + jobBody["TaskGroups"] = jobGroups + + return jobJson +} + +func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error { + //1、校验node和group,保证node和group翻倍(缩容将node和group减半,流程同理) + groups, err := cs.ListDBGroups(ctx, tenant, cluster) + if err != nil { + return err + } + if len(groups) != len(body.Groups) { + return perrors.Errorf("new groups is not equle to old groups") + } + vtables, err := cs.ListTables(ctx, tenant, cluster) + if err != nil { + return err + } + allNodes, err := cs.ListNodes(ctx, tenant) + if err != nil { + return err + } + + //2、创建复制group(物理数据库)的dts任务 + //groups[0] --> body.Groups[0] + //groups[1] --> body.Groups[1] + //... + httpClient := &http.Client{} + dtsJobList := make([]map[string]interface{}, 0, len(groups)) + dtsEndpoint := config.BootOpts.Dts.Options["endpoints"].(string) + for i := range groups { + srcGroup := groups[i].Name + var srcNode, dstNode *NodeDTO + for n := range allNodes { + if allNodes[n].Database == srcGroup { + srcNode = allNodes[n] + } + } + dstGroup := body.Groups[i] + for n := range allNodes { + if allNodes[n].Database == dstGroup { + dstNode = allNodes[n] + } + } + dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, srcNode, dstNode, vtables, i) + if dtsJob == nil { + return perrors.Errorf("failed to build DTS json parameter") + } + dtsJobList = append(dtsJobList, dtsJob) + dtsJobJson, _ := json.Marshal(dtsJob) + httpReq, err := http.NewRequest("POST", dtsEndpoint+"/v1/jobs", bytes.NewBuffer(dtsJobJson)) + if err != nil { + return perrors.Errorf("failed to create POST http requst") + } + + httpResp, err := httpClient.Do(httpReq) + if err != nil { + return perrors.Errorf("failed to start to replica source group") + } + httpResp.Body.Close() + } + + //3、检查是否复制完毕 + for { + time.Sleep(5 * time.Second) + finished := false + + for i := range dtsJobList { + dtsJob := dtsJobList[i]["Job"].(map[string]interface{}) + httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string) + httpReq, err := http.NewRequest("GET", httpURL, nil) + if err != nil { + return perrors.Errorf("failed to create GET http requst") + } + + httpResp, err := httpClient.Do(httpReq) + if err != nil { + return perrors.Errorf("failed to check replica source group") + } + + //TODO: check Status + finished = true + httpResp.Body.Close() + } + + if finished { + break + } + } + + //4、断开并拒绝所有客户端连接 + + //5、再次检查是否复制完毕 + + //6、停止dts任务 + for i := range groups { + dtsJob := dtsJobList[i]["Job"].(map[string]interface{}) + httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string) + httpReq, err := http.NewRequest("DELETE", httpURL, nil) + if err != nil { + return perrors.Errorf("failed to create DELETE http requst") + } + + httpResp, err := httpClient.Do(httpReq) + if err != nil { + return perrors.Errorf("failed to stop to replica source group") + } + httpResp.Body.Close() + } + + //7、更新groups节点 + var groupBody GroupDTO + var groupNode string + for i := range body.Groups { + groupNode = "" + for n := range allNodes { + if allNodes[n].Database == body.Groups[i] { + groupNode = allNodes[n].Name + } + } + if strings.Compare(groupNode, "") == 0 { + continue + } + groupBody.ClusterName = cluster + groupBody.Name = body.Groups[i] + groupBody.Nodes = []string{groupNode} + err = cs.UpsertGroup(ctx, tenant, cluster, groupBody.Name, &groupBody) + if err != nil { + return err + } + } + + //8、更新sharding路由 + var tableBody TableDTO + for i := range vtables { + vTable := vtables[i] + _, _, dbEnd, err := config.ParseTopology(vTable.Topology.DbPattern) + if err != nil { + return err + } + _, _, tbEnd, err := config.ParseTopology(vTable.Topology.TblPattern) + if err != nil { + return err + } + dbTotal := 2 * (dbEnd + 1) + tableTotal := 2 * (tbEnd + 1) + + tableBody.Name = vTable.Name + tableBody.Sequence = vTable.Sequence + tableBody.DbRules = []*config.Rule{ + { + Columns: vTable.DbRules[0].Columns, + Type: vTable.DbRules[0].Type, + Expr: "$0 % " + fmt.Sprintf("%d", tableTotal) + " / " + fmt.Sprintf("%d", dbTotal), + }, + } + tableBody.TblRules = []*config.Rule{ + { + Columns: vTable.TblRules[0].Columns, + Type: vTable.TblRules[0].Type, + Expr: "$0 % " + fmt.Sprintf("%d", tableTotal), + }, + } + tableBody.Topology = &config.Topology{ + DbPattern: cluster + fmt.Sprintf("_${0000..%04d}", dbTotal-1), + TblPattern: vTable.Name + fmt.Sprintf("_${0000..%04d}", tableTotal-1), + } + tableBody.ShadowTopology = vTable.ShadowTopology + tableBody.Attributes = vTable.Attributes + err = cs.UpsertTable(ctx, tenant, cluster, tableBody.Name, &tableBody) + if err != nil { + return err + } + } + + //9、接受客户端连接 + + return nil +} + func (cs *MyConfigService) RemoveCluster(ctx context.Context, tenant, cluster string) error { op, err := cs.getCenter(ctx, tenant) if err != nil { diff --git a/pkg/admin/router/clusters.go b/pkg/admin/router/clusters.go index 437c69ab..d7a319fe 100644 --- a/pkg/admin/router/clusters.go +++ b/pkg/admin/router/clusters.go @@ -36,6 +36,7 @@ func init() { router.POST("/tenants/:tenant/clusters", CreateCluster) router.GET("/tenants/:tenant/clusters/:cluster", GetCluster) router.PUT("/tenants/:tenant/clusters/:cluster", UpdateCluster) + router.POST("/tenants/:tenant/clusters/:cluster", ExtendCluster) router.DELETE("/tenants/:tenant/clusters/:cluster", RemoveCluster) }) } @@ -107,6 +108,23 @@ func UpdateCluster(c *gin.Context) error { return nil } +func ExtendCluster(c *gin.Context) error { + service := admin.GetService(c) + tenant := c.Param("tenant") + cluster := c.Param("cluster") + var clusterBody admin.ClusterDTO + if err := c.ShouldBindJSON(&clusterBody); err != nil { + return exception.Wrap(exception.CodeInvalidParams, err) + } + + err := service.ExtendCluster(c, tenant, cluster, &clusterBody) + if err != nil { + return err + } + c.JSON(http.StatusOK, "success") + return nil +} + func RemoveCluster(c *gin.Context) error { service := admin.GetService(c) tenant := c.Param("tenant") diff --git a/pkg/config/misc.go b/pkg/config/misc.go index d5fd0629..2608fe1c 100644 --- a/pkg/config/misc.go +++ b/pkg/config/misc.go @@ -35,6 +35,8 @@ import ( "github.com/arana-db/arana/pkg/util/log" ) +var BootOpts *BootOptions + // LoadBootOptions loads BootOptions from specified file path. func LoadBootOptions(path string) (*BootOptions, error) { content, err := os.ReadFile(path) @@ -59,6 +61,7 @@ func LoadBootOptions(path string) (*BootOptions, error) { return nil, errors.Wrap(err, "failed to validate boot config") } + BootOpts = &cfg log.Init(cfg.Logging) return &cfg, nil } diff --git a/pkg/config/model.go b/pkg/config/model.go index d52c81f5..3d0cedda 100644 --- a/pkg/config/model.go +++ b/pkg/config/model.go @@ -80,11 +80,18 @@ type ( Options map[string]interface{} `yaml:"options" json:"options"` } + Dts struct { + Enable bool `yaml:"enable" json:"enable"` + Name string `yaml:"name" json:"name"` + Options map[string]interface{} `yaml:"options" json:"options"` + } + BootOptions struct { Spec `yaml:",inline"` Config *Options `yaml:"config" json:"config"` Listeners []*Listener `validate:"required,dive" yaml:"listeners" json:"listeners"` Registry *Registry `yaml:"registry" json:"registry"` + Dts *Dts `yaml:"dts" json:"dts"` Trace *Trace `yaml:"trace" json:"trace"` Supervisor *User `validate:"required,dive" yaml:"supervisor" json:"supervisor"` Logging *log.Config `validate:"required,dive" yaml:"logging" json:"logging"` diff --git a/pkg/runtime/optimize/ddl/create_table.go b/pkg/runtime/optimize/ddl/create_table.go index f0ad0ba2..7697fbc0 100644 --- a/pkg/runtime/optimize/ddl/create_table.go +++ b/pkg/runtime/optimize/ddl/create_table.go @@ -124,14 +124,15 @@ func drdsCreateTable(ctx context.Context, o *optimize.Optimizer) (*rule.VTable, { Columns: []*config.ColumnRule{{Name: dbColume}}, Expr: strings.ReplaceAll(dbColume, dbColume, "$0") + " % " + - fmt.Sprintf("%d", stmt.Partition.Num), + fmt.Sprintf("%d", stmt.Partition.Num*stmt.Partition.Sub.Num) + " / " + + fmt.Sprintf("%d", stmt.Partition.Sub.Num), }, }, TblRules: []*config.Rule{ { Columns: []*config.ColumnRule{{Name: tbColume}}, Expr: strings.ReplaceAll(tbColume, tbColume, "$0") + " % " + - fmt.Sprintf("%d", stmt.Partition.Sub.Num), + fmt.Sprintf("%d", stmt.Partition.Num*stmt.Partition.Sub.Num), }, }, Topology: &config.Topology{ diff --git a/scripts/sharding.sql b/scripts/sharding.sql index e538f6b3..54f4ae23 100644 --- a/scripts/sharding.sql +++ b/scripts/sharding.sql @@ -83,6 +83,7 @@ CREATE TABLE IF NOT EXISTS `employees_0000_r`.`student_0006` LIKE `employees_000 CREATE TABLE IF NOT EXISTS `employees_0000_r`.`student_0007` LIKE `employees_0000`.`student_0000`; INSERT INTO employees_0000.student_0001(id,uid,name,score,nickname,gender,birth_year,created_at,modified_at) VALUES (1, 1, 'arana', 95, 'Awesome Arana', 0, 2021, NOW(), NOW()); +INSERT INTO employees_0000.student_0001(id,uid,name,score,nickname,gender,birth_year,created_at,modified_at) VALUES (33, 33, 'arana33', 95, 'Awesome Arana', 0, 2021, NOW(), NOW()); CREATE TABLE IF NOT EXISTS `employees_0000`.`friendship_0000` (