From 7afb29523785108d5c84d163f693c6a5c2e574c4 Mon Sep 17 00:00:00 2001
From: CooooolFrog
Date: Wed, 8 Nov 2023 17:50:18 +0800
Subject: [PATCH] refactor!: refactor shard version logic (#264)

## Rationale
Refer to this issue: https://github.com/CeresDB/ceresmeta/issues/263

## Detailed Changes
* Rework the create/drop table flow so that shard version updates are driven by CeresDB: `Dispatch.CreateTableOnShard` and `Dispatch.DropTableOnShard` now return the latest shard version reported by CeresDB, and ceresmeta persists that version instead of computing `prevVersion + 1` itself.
* Replace `DropTableResult` with an explicit `DropTableRequest` carrying `ShardID` and `LatestVersion`, remove `TopologyManager.EvictTable`, and shrink `ShardVersionUpdate` to a single `LatestVersion` field.
* Rename `UpdateShardViewRequest.LatestVersion` to `PrevVersion`, which is now used only to clean up the superseded shard view in etcd; the compare-and-swap guard on the latest-version key is dropped.

## Test Plan
Pass existing unit tests and integration tests.
---
 .github/workflows/check.yml | 2 -
 go.mod | 2 +-
 go.sum | 6 +-
 server/cluster/manager.go | 36 ++++-
 server/cluster/manager_test.go | 1 +
 server/cluster/metadata/cluster_metadata.go | 57 ++++---
 .../cluster/metadata/cluster_metadata_test.go | 9 +-
 server/cluster/metadata/error.go | 1 +
 server/cluster/metadata/topology_manager.go | 147 ++++--------------
 .../cluster/metadata/topology_manager_test.go | 30 ++--
 server/cluster/metadata/types.go | 18 ++-
 server/coordinator/eventdispatch/dispatch.go | 5 +-
 .../eventdispatch/dispatch_impl.go | 21 ++-
 .../coordinator/procedure/ddl/common_util.go | 39 +++--
 .../create_partition_table.go | 13 +-
 .../procedure/ddl/createtable/create_table.go | 16 +-
 .../drop_partition_table.go | 15 +-
 .../procedure/ddl/droptable/drop_table.go | 24 +--
 .../procedure/operation/split/split_test.go | 2 +
 .../transferleader/transfer_leader.go | 3 +-
 server/coordinator/procedure/test/common.go | 12 +-
 server/coordinator/shard_picker_test.go | 7 +-
 server/service/grpc/service.go | 4 +-
 server/storage/storage_impl.go | 13 +-
 server/storage/storage_test.go | 6 +-
 server/storage/types.go | 6 +-
 26 files changed, 233 insertions(+), 262 deletions(-)

diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml
index 4470a6df..8549a368 100644
--- a/.github/workflows/check.yml
+++ b/.github/workflows/check.yml
@@ -46,8 +46,6 @@ jobs:
       - run: |
           make install-tools
           make test
-      - name: Upload coverage to Codecov
-        uses: codecov/codecov-action@v3
 
   integration-test:
     runs-on: ubuntu-latest
diff --git a/go.mod b/go.mod
index fc7c6d39..636fdf62 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/CeresDB/ceresmeta
 go 1.21
 
 require (
-	github.com/CeresDB/ceresdbproto/golang v0.0.0-20231012091414-cdaeab9f7f4d
+	github.com/CeresDB/ceresdbproto/golang v0.0.0-20231108080833-ca110f5a966a
 	github.com/caarlos0/env/v6 v6.10.1
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/looplab/fsm v0.3.0
diff --git a/go.sum b/go.sum
index d2cd1f3f..6348b4ca 100644
--- a/go.sum
+++ b/go.sum
@@ -16,8 +16,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20231012091414-cdaeab9f7f4d h1:xjRsXHTX++lOhAwHStjsTgUMsqaMazYMZm9T3EKt+2E=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20231012091414-cdaeab9f7f4d/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20231108080833-ca110f5a966a h1:YxgWd9tpn7IOkGrFQjGbJqa9wnVR1FkdRFvv3ZLqllw=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20231108080833-ca110f5a966a/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -324,6 +324,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de h1:AMfL1AEmlDt+gvePve1U8ly52BELslBVmea0gk20B/Y= +github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= diff --git a/server/cluster/manager.go b/server/cluster/manager.go index 62e07e8b..61e9506e 100644 --- a/server/cluster/manager.go +++ b/server/cluster/manager.go @@ -290,13 +290,47 @@ func (m *managerImpl) GetTablesByShardIDs(clusterName, _ string, shardIDs []stor return shardTables, nil } +// DropTable is only used for the HTTP interface. +// It only deletes the table data in ETCD and does not initiate a table deletion request to CeresDB. func (m *managerImpl) DropTable(ctx context.Context, clusterName, schemaName, tableName string) error { cluster, err := m.getCluster(clusterName) if err != nil { return errors.WithMessage(err, "get cluster") } - _, err = cluster.metadata.DropTable(ctx, schemaName, tableName) + table, ok, err := cluster.metadata.GetTable(schemaName, tableName) + if !ok { + return metadata.ErrTableNotFound + } + if err != nil { + return errors.WithMessage(err, "get table") + } + + getShardNodeResult, err := cluster.metadata.GetShardNodeByTableIDs([]storage.TableID{table.ID}) + if err != nil { + return errors.WithMessage(err, "get shard node by tableID") + } + + if _, ok := getShardNodeResult.ShardNodes[table.ID]; !ok { + return metadata.ErrShardNotFound + } + + if len(getShardNodeResult.ShardNodes[table.ID]) != 1 || len(getShardNodeResult.Version) != 1 { + return metadata.ErrShardNotFound + } + + shardID := getShardNodeResult.ShardNodes[table.ID][0].ID + version, ok := getShardNodeResult.Version[shardID] + if !ok { + return metadata.ErrVersionNotFound + } + + err = cluster.metadata.DropTable(ctx, metadata.DropTableRequest{ + SchemaName: schemaName, + TableName: tableName, + ShardID: shardID, + LatestVersion: version, + }) if err != nil { return errors.WithMessage(err, "cluster drop table") } diff --git a/server/cluster/manager_test.go b/server/cluster/manager_test.go index 614abefb..a94dcc4b 100644 --- a/server/cluster/manager_test.go +++ b/server/cluster/manager_test.go @@ -177,6 +177,7 @@ func testCreateTable(ctx context.Context, re *require.Assertions, manager cluste re.NoError(err) _, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{ ShardID: shardID, + LatestVersion: 0, SchemaName: schema, TableName: tableName, PartitionInfo: storage.PartitionInfo{Info: nil}, diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index f2026a81..ecaac593 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go @@ -191,41 +191,37 @@ func (c *ClusterMetadata) 
GetShardTables(shardIDs []storage.ShardID) map[storage // DropTable will drop table metadata and all mapping of this table. // If the table to be dropped has been opened multiple times, all its mapping will be dropped. -func (c *ClusterMetadata) DropTable(ctx context.Context, schemaName, tableName string) (DropTableResult, error) { - c.logger.Info("drop table start", zap.String("cluster", c.Name()), zap.String("schemaName", schemaName), zap.String("tableName", tableName)) +func (c *ClusterMetadata) DropTable(ctx context.Context, request DropTableRequest) error { + c.logger.Info("drop table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName)) - var dropRes DropTableResult if !c.ensureClusterStable() { - return dropRes, errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable") + return errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable") } - table, ok, err := c.tableManager.GetTable(schemaName, tableName) + table, ok, err := c.tableManager.GetTable(request.SchemaName, request.TableName) if err != nil { - return dropRes, errors.WithMessage(err, "get table") + return errors.WithMessage(err, "get table") } if !ok { - return dropRes, ErrTableNotFound + return ErrTableNotFound } // Drop table. - err = c.tableManager.DropTable(ctx, schemaName, tableName) + err = c.tableManager.DropTable(ctx, request.SchemaName, request.TableName) if err != nil { - return dropRes, errors.WithMessage(err, "table manager drop table") + return errors.WithMessage(err, "table manager drop table") } // Remove dropped table in shard view. - updateVersions, err := c.topologyManager.EvictTable(ctx, table.ID) + err = c.topologyManager.RemoveTable(ctx, request.ShardID, request.LatestVersion, []storage.TableID{table.ID}) if err != nil { - return dropRes, errors.WithMessage(err, "topology manager remove table") + return errors.WithMessage(err, "topology manager remove table") } - dropRes = DropTableResult{ - ShardVersionUpdate: updateVersions, - } - c.logger.Info("drop table success", zap.String("cluster", c.Name()), zap.String("schemaName", schemaName), zap.String("tableName", tableName), zap.String("dropResult", fmt.Sprintf("%+v", dropRes))) + c.logger.Info("drop table success", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName)) - return dropRes, nil + return nil } // MigrateTable used to migrate tables from old shard to new shard. 
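The hunk above is the heart of the new drop path: `ClusterMetadata.DropTable` now receives a `DropTableRequest` and records the `LatestVersion` supplied by its caller, instead of bumping the version itself and returning a `DropTableResult`. A condensed sketch of the intended call order (identifiers are the ones introduced in this patch; `dropReq` stands in for the assembled `DropTableOnShardRequest`, and error handling is trimmed, so treat it as an illustration rather than the exact procedure code):

```go
// CeresDB executes the drop and replies with the shard version that
// results from it; ceresmeta no longer computes prevVersion+1 itself.
latestVersion, err := dispatch.DropTableOnShard(ctx, leader.NodeName, dropReq)
if err != nil {
	return err
}

// Persist exactly the version CeresDB reported.
return clusterMetadata.DropTable(ctx, metadata.DropTableRequest{
	SchemaName:    schemaName,
	TableName:     tableName,
	ShardID:       shardID,
	LatestVersion: latestVersion,
})
```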
@@ -256,12 +252,12 @@ func (c *ClusterMetadata) MigrateTable(ctx context.Context, request MigrateTable tableIDs = append(tableIDs, table.ID) } - if _, err := c.topologyManager.RemoveTable(ctx, request.OldShardID, tableIDs); err != nil { + if err := c.topologyManager.RemoveTable(ctx, request.OldShardID, request.latestOldShardVersion, tableIDs); err != nil { c.logger.Error("remove table from topology") return err } - if _, err := c.topologyManager.AddTable(ctx, request.NewShardID, tables); err != nil { + if err := c.topologyManager.AddTable(ctx, request.NewShardID, request.latestNewShardVersion, tables); err != nil { c.logger.Error("add table from topology") return err } @@ -310,25 +306,21 @@ func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request Creat return res, nil } -func (c *ClusterMetadata) AddTableTopology(ctx context.Context, shardID storage.ShardID, table storage.Table) (CreateTableResult, error) { +func (c *ClusterMetadata) AddTableTopology(ctx context.Context, shardVersionUpdate ShardVersionUpdate, table storage.Table) error { c.logger.Info("add table topology start", zap.String("cluster", c.Name()), zap.String("tableName", table.Name)) if !c.ensureClusterStable() { - return CreateTableResult{}, errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable") + return errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable") } // Add table to topology manager. - result, err := c.topologyManager.AddTable(ctx, shardID, []storage.Table{table}) + err := c.topologyManager.AddTable(ctx, shardVersionUpdate.ShardID, shardVersionUpdate.LatestVersion, []storage.Table{table}) if err != nil { - return CreateTableResult{}, errors.WithMessage(err, "topology manager add table") + return errors.WithMessage(err, "topology manager add table") } - ret := CreateTableResult{ - Table: table, - ShardVersionUpdate: result, - } - c.logger.Info("add table topology succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", ret))) - return ret, nil + c.logger.Info("add table topology succeed", zap.String("cluster", c.Name()), zap.String("table", fmt.Sprintf("%+v", table)), zap.String("shardVersionUpdate", fmt.Sprintf("%+v", shardVersionUpdate))) + return nil } func (c *ClusterMetadata) DropTableMetadata(ctx context.Context, schemaName, tableName string) (DropTableMetadataResult, error) { @@ -381,14 +373,17 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe } // Add table to topology manager. 
- result, err := c.topologyManager.AddTable(ctx, request.ShardID, []storage.Table{table}) + err = c.topologyManager.AddTable(ctx, request.ShardID, request.LatestVersion, []storage.Table{table}) if err != nil { return CreateTableResult{}, errors.WithMessage(err, "topology manager add table") } ret := CreateTableResult{ - Table: table, - ShardVersionUpdate: result, + Table: table, + ShardVersionUpdate: ShardVersionUpdate{ + ShardID: request.ShardID, + LatestVersion: request.LatestVersion, + }, } c.logger.Info("create table succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", ret))) return ret, nil diff --git a/server/cluster/metadata/cluster_metadata_test.go b/server/cluster/metadata/cluster_metadata_test.go index d6c1ea7b..4f6eb4ee 100644 --- a/server/cluster/metadata/cluster_metadata_test.go +++ b/server/cluster/metadata/cluster_metadata_test.go @@ -149,6 +149,7 @@ func testTableOperation(ctx context.Context, re *require.Assertions, m *metadata // Test create table. createResult, err := m.CreateTable(ctx, metadata.CreateTableRequest{ ShardID: 0, + LatestVersion: 0, SchemaName: testSchema, TableName: testTableName, PartitionInfo: storage.PartitionInfo{Info: nil}, @@ -177,9 +178,13 @@ func testTableOperation(ctx context.Context, re *require.Assertions, m *metadata re.Equal(storage.ShardID(1), routeResult.RouteEntries[testTableName].NodeShards[0].ShardInfo.ID) // Drop table already created. - dropResult, err := m.DropTable(ctx, testSchema, testTableName) + err = m.DropTable(ctx, metadata.DropTableRequest{ + SchemaName: testSchema, + TableName: testTableName, + ShardID: storage.ShardID(1), + LatestVersion: 0, + }) re.NoError(err) - re.Equal(storage.ShardID(1), dropResult.ShardVersionUpdate[0].ShardID) } func testShardOperation(ctx context.Context, re *require.Assertions, m *metadata.ClusterMetadata) { diff --git a/server/cluster/metadata/error.go b/server/cluster/metadata/error.go index cfa5cfe7..51bea95e 100644 --- a/server/cluster/metadata/error.go +++ b/server/cluster/metadata/error.go @@ -28,6 +28,7 @@ var ( ErrSchemaNotFound = coderr.NewCodeError(coderr.NotFound, "schema not found") ErrTableNotFound = coderr.NewCodeError(coderr.NotFound, "table not found") ErrShardNotFound = coderr.NewCodeError(coderr.NotFound, "shard not found") + ErrVersionNotFound = coderr.NewCodeError(coderr.NotFound, "version not found") ErrNodeNotFound = coderr.NewCodeError(coderr.NotFound, "NodeName not found") ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists") ErrOpenTable = coderr.NewCodeError(coderr.Internal, "open table") diff --git a/server/cluster/metadata/topology_manager.go b/server/cluster/metadata/topology_manager.go index 40d8f4fc..a6f6b2c6 100644 --- a/server/cluster/metadata/topology_manager.go +++ b/server/cluster/metadata/topology_manager.go @@ -38,11 +38,9 @@ type TopologyManager interface { // GetTableIDs get shardNode and tablesIDs with shardID and nodeName. GetTableIDs(shardIDs []storage.ShardID) map[storage.ShardID]ShardTableIDs // AddTable add table to cluster topology. - AddTable(ctx context.Context, shardID storage.ShardID, tables []storage.Table) (ShardVersionUpdate, error) + AddTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tables []storage.Table) error // RemoveTable remove table on target shards from cluster topology. - RemoveTable(ctx context.Context, shardID storage.ShardID, tableIDs []storage.TableID) (ShardVersionUpdate, error) - // EvictTable evict table from cluster topology. 
- EvictTable(ctx context.Context, tableID storage.TableID) ([]ShardVersionUpdate, error) + RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error // GetShards get all shards in cluster topology. GetShards() []storage.ShardID // GetShardNodesByID get shardNodes with shardID. @@ -130,7 +128,6 @@ type TopologyManagerImpl struct { shardNodesMapping map[storage.ShardID][]storage.ShardNode // ShardID -> nodes of the shard nodeShardsMapping map[string][]storage.ShardNode // nodeName -> shards of the NodeName // ShardView in memory. - shardViews []storage.ShardView shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID -> shardTopology tableShardMapping map[storage.TableID][]storage.ShardID // tableID -> ShardID @@ -148,7 +145,6 @@ func NewTopologyManagerImpl(logger *zap.Logger, storage storage.Storage, cluster clusterView: nil, shardNodesMapping: nil, nodeShardsMapping: nil, - shardViews: nil, shardTablesMapping: nil, tableShardMapping: nil, nodes: nil, @@ -203,18 +199,15 @@ func (m *TopologyManagerImpl) GetTableIDs(shardIDs []storage.ShardID) map[storag return shardTableIDs } -func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.ShardID, tables []storage.Table) (ShardVersionUpdate, error) { +func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tables []storage.Table) error { m.lock.Lock() defer m.lock.Unlock() shardView, ok := m.shardTablesMapping[shardID] - var emptyUpdate ShardVersionUpdate if !ok { - return emptyUpdate, ErrShardNotFound.WithCausef("shard id:%d", shardID) + return ErrShardNotFound.WithCausef("shard id:%d", shardID) } - prevVersion := shardView.Version - tableIDsToAdd := make([]storage.TableID, 0, len(tables)) for _, table := range tables { tableIDsToAdd = append(tableIDsToAdd, table.ID) @@ -224,16 +217,16 @@ func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.Shar tableIDs = append(tableIDs, shardView.TableIDs...) tableIDs = append(tableIDs, tableIDsToAdd...) - newShardView := storage.NewShardView(shardID, prevVersion+1, tableIDs) + newShardView := storage.NewShardView(shardID, latestVersion, tableIDs) // Update shard view in storage. err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{ - ClusterID: m.clusterID, - ShardView: newShardView, - LatestVersion: prevVersion, + ClusterID: m.clusterID, + ShardView: newShardView, + PrevVersion: shardView.Version, }) if err != nil { - return emptyUpdate, errors.WithMessage(err, "storage update shard view") + return errors.WithMessage(err, "storage update shard view") } // Update shard view in memory. 
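With `AddTable` now taking `latestVersion` as an argument, the create path mirrors the drop path sketched above: the version flows from CeresDB's response into the topology write, and the in-memory `shardView.Version` is handed to storage only as `PrevVersion` so the superseded key can be located. Roughly (a sketch based on the `createtable` procedure changes further down, not the verbatim procedure code):

```go
// CeresDB creates the table and reports the shard version it ended up at.
latestVersion, err := ddl.CreateTableOnShard(ctx, clusterMeta, dispatch, shardID, createReq)
if err != nil {
	return err
}

// Record the table in the shard topology under that exact version.
return clusterMeta.AddTableTopology(ctx, metadata.ShardVersionUpdate{
	ShardID:       shardID,
	LatestVersion: latestVersion,
}, table)
```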
@@ -247,34 +240,17 @@ func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.Shar } } - m.updateShardView(shardID, newShardView) - - return ShardVersionUpdate{ - ShardID: shardID, - CurrVersion: prevVersion + 1, - PrevVersion: prevVersion, - }, nil -} - -func (m *TopologyManagerImpl) updateShardView(shardID storage.ShardID, newShardView storage.ShardView) { - for i := range m.shardViews { - if m.shardViews[i].ShardID == shardID { - m.shardViews[i] = newShardView - break - } - } + return nil } -func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.ShardID, tableIDs []storage.TableID) (ShardVersionUpdate, error) { +func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion uint64, tableIDs []storage.TableID) error { m.lock.Lock() defer m.lock.Unlock() shardView, ok := m.shardTablesMapping[shardID] - var emptyUpdate ShardVersionUpdate if !ok { - return emptyUpdate, ErrShardNotFound.WithCausef("shard id:%d", shardID) + return ErrShardNotFound.WithCausef("shard id:%d", shardID) } - prevVersion := shardView.Version newTableIDs := make([]storage.TableID, 0, len(shardView.TableIDs)) for _, tableID := range shardView.TableIDs { @@ -286,18 +262,18 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S } // Update shardView in storage. - newShardView := storage.NewShardView(shardView.ShardID, prevVersion+1, newTableIDs) + newShardView := storage.NewShardView(shardView.ShardID, latestVersion, newTableIDs) if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{ - ClusterID: m.clusterID, - ShardView: newShardView, - LatestVersion: prevVersion, + ClusterID: m.clusterID, + ShardView: newShardView, + PrevVersion: shardView.Version, }); err != nil { - return emptyUpdate, errors.WithMessage(err, "storage update shard view") + return errors.WithMessage(err, "storage update shard view") } // Update shardView in memory. - shardView.Version = prevVersion + 1 - shardView.TableIDs = tableIDs + shardView.Version = latestVersion + shardView.TableIDs = newTableIDs for _, tableID := range tableIDs { delete(m.tableShardMapping, tableID) } @@ -312,74 +288,15 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S } } - m.updateShardView(shardID, newShardView) - - return ShardVersionUpdate{ - ShardID: shardID, - CurrVersion: prevVersion + 1, - PrevVersion: prevVersion, - }, nil -} - -func (m *TopologyManagerImpl) EvictTable(ctx context.Context, tableID storage.TableID) ([]ShardVersionUpdate, error) { - m.lock.Lock() - defer m.lock.Unlock() - - shardIDs, ok := m.tableShardMapping[tableID] - if !ok { - return []ShardVersionUpdate{}, ErrTableNotFound.WithCausef("table id:%d", tableID) - } - - result := []ShardVersionUpdate{} - - for _, shardID := range shardIDs { - shardView, ok := m.shardTablesMapping[shardID] - if !ok { - return nil, ErrShardNotFound.WithCausef("shard id:%d", shardID) - } - prevVersion := shardView.Version - - tableIDs := make([]storage.TableID, 0, len(shardView.TableIDs)) - for _, id := range shardView.TableIDs { - if id != tableID { - tableIDs = append(tableIDs, id) - } - } - - // Update shardView in storage. - // TODO: Move the code that modifies the version to the outside, and the actual version should be obtained from the call result of CeresDB. 
- newShardView := storage.NewShardView(shardView.ShardID, prevVersion+1, tableIDs) - if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{ - ClusterID: m.clusterID, - ShardView: newShardView, - LatestVersion: prevVersion, - }); err != nil { - return nil, errors.WithMessage(err, "storage update shard view") - } - - // Update shardView in memory. - shardView.Version = prevVersion + 1 - shardView.TableIDs = tableIDs - delete(m.tableShardMapping, tableID) - - m.updateShardView(shardView.ShardID, newShardView) - - result = append(result, ShardVersionUpdate{ - ShardID: shardID, - CurrVersion: prevVersion + 1, - PrevVersion: prevVersion, - }) - } - - return result, nil + return nil } func (m *TopologyManagerImpl) GetShards() []storage.ShardID { m.lock.RLock() defer m.lock.RUnlock() - shards := make([]storage.ShardID, 0, len(m.shardViews)) - for _, shardView := range m.shardViews { + shards := make([]storage.ShardID, 0, len(m.shardTablesMapping)) + for _, shardView := range m.shardTablesMapping { shards = append(shards, shardView.ShardID) } @@ -584,14 +501,14 @@ func (m *TopologyManagerImpl) UpdateShardVersionWithExpect(ctx context.Context, newShardView := storage.NewShardView(shardID, version, shardView.TableIDs) if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{ - ClusterID: m.clusterID, - ShardView: newShardView, - LatestVersion: expect, + ClusterID: m.clusterID, + ShardView: newShardView, + PrevVersion: expect, }); err != nil { return errors.WithMessage(err, "storage update shard view") } - m.updateShardView(shardID, newShardView) + m.shardTablesMapping[shardID] = shardView return nil } @@ -600,9 +517,14 @@ func (m *TopologyManagerImpl) GetTopology() Topology { m.lock.RLock() defer m.lock.RUnlock() - shardViewsMapping := make(map[storage.ShardID]storage.ShardView, len(m.shardViews)) - for i := 0; i < len(m.shardViews); i++ { - shardViewsMapping[m.shardViews[i].ShardID] = m.shardViews[i] + shardViewsMapping := make(map[storage.ShardID]storage.ShardView, len(m.shardTablesMapping)) + for shardID, view := range m.shardTablesMapping { + shardViewsMapping[shardID] = storage.ShardView{ + ShardID: view.ShardID, + Version: view.Version, + TableIDs: view.TableIDs, + CreatedAt: view.CreatedAt, + } } return Topology{ @@ -642,7 +564,6 @@ func (m *TopologyManagerImpl) loadShardViews(ctx context.Context) error { m.logger.Debug("load shard views", zap.Int32("clusterID", int32(m.clusterID)), zap.String("shardViews", fmt.Sprintf("%+v", shardViewsResult))) // Reset data in memory. 
-	m.shardViews = shardViewsResult.ShardViews
 	m.shardTablesMapping = make(map[storage.ShardID]*storage.ShardView, len(shardViewsResult.ShardViews))
 	m.tableShardMapping = make(map[storage.TableID][]storage.ShardID, 0)
 	for _, shardView := range shardViewsResult.ShardViews {
diff --git a/server/cluster/metadata/topology_manager_test.go b/server/cluster/metadata/topology_manager_test.go
index 6bb20194..33579918 100644
--- a/server/cluster/metadata/topology_manager_test.go
+++ b/server/cluster/metadata/topology_manager_test.go
@@ -66,7 +66,7 @@ func TestTopologyManager(t *testing.T) {
 }
 
 func testTableTopology(ctx context.Context, re *require.Assertions, manager metadata.TopologyManager) {
-	updateVersionResult, err := manager.AddTable(ctx, TestShardID, []storage.Table{{
+	err := manager.AddTable(ctx, TestShardID, 0, []storage.Table{{
 		ID:            TestTableID,
 		Name:          TestTableName,
 		SchemaID:      TestSchemaID,
@@ -74,23 +74,19 @@ func testTableTopology(ctx context.Context, re *require.Assertions, manager meta
 		PartitionInfo: storage.PartitionInfo{Info: nil},
 	}})
 	re.NoError(err)
-	re.Equal(updateVersionResult.PrevVersion, updateVersionResult.CurrVersion-1)
 
-	shardTables := manager.GetTableIDs([]storage.ShardID{updateVersionResult.ShardID})
-	found := foundTable(TestTableID, shardTables, updateVersionResult)
+	shardTables := manager.GetTableIDs([]storage.ShardID{TestShardID})
+	found := foundTable(TestTableID, shardTables, TestShardID)
 	re.Equal(true, found)
 
-	evictVersionResult, err := manager.EvictTable(ctx, TestTableID)
+	err = manager.RemoveTable(ctx, TestShardID, 0, []storage.TableID{TestTableID})
 	re.NoError(err)
-	re.Equal(1, len(evictVersionResult))
-	re.Equal(updateVersionResult.ShardID, evictVersionResult[0].ShardID)
-	re.Equal(updateVersionResult.CurrVersion, evictVersionResult[0].PrevVersion)
 
-	shardTables = manager.GetTableIDs([]storage.ShardID{updateVersionResult.ShardID})
-	found = foundTable(TestTableID, shardTables, updateVersionResult)
+	shardTables = manager.GetTableIDs([]storage.ShardID{TestShardID})
+	found = foundTable(TestTableID, shardTables, TestShardID)
 	re.Equal(false, found)
 
-	updateVersionResult, err = manager.AddTable(ctx, TestShardID, []storage.Table{{
+	err = manager.AddTable(ctx, TestShardID, 0, []storage.Table{{
 		ID:            TestTableID,
 		Name:          TestTableName,
 		SchemaID:      TestSchemaID,
@@ -98,19 +94,17 @@ func testTableTopology(ctx context.Context, re *require.Assertions, manager meta
 		PartitionInfo: storage.PartitionInfo{Info: nil},
 	}})
 	re.NoError(err)
-	re.Equal(updateVersionResult.PrevVersion, updateVersionResult.CurrVersion-1)
 
-	shardTables = manager.GetTableIDs([]storage.ShardID{updateVersionResult.ShardID})
-	found = foundTable(TestTableID, shardTables, updateVersionResult)
+	shardTables = manager.GetTableIDs([]storage.ShardID{TestShardID})
+	found = foundTable(TestTableID, shardTables, TestShardID)
 	re.Equal(true, found)
 
-	updateVersionResult, err = manager.RemoveTable(ctx, updateVersionResult.ShardID, []storage.TableID{TestTableID})
+	err = manager.RemoveTable(ctx, TestShardID, 0, []storage.TableID{TestTableID})
 	re.NoError(err)
-	re.Equal(updateVersionResult.PrevVersion, updateVersionResult.CurrVersion-1)
 }
 
-func foundTable(targetTableID storage.TableID, shardTables map[storage.ShardID]metadata.ShardTableIDs, updateVersionResult metadata.ShardVersionUpdate) bool {
-	tableIDs := shardTables[updateVersionResult.ShardID].TableIDs
+func foundTable(targetTableID storage.TableID, shardTables map[storage.ShardID]metadata.ShardTableIDs, shardID storage.ShardID) bool {
+	tableIDs :=
shardTables[shardID].TableIDs for _, tableID := range tableIDs { if tableID == targetTableID { return true diff --git a/server/cluster/metadata/types.go b/server/cluster/metadata/types.go index 905ce472..aa76427c 100644 --- a/server/cluster/metadata/types.go +++ b/server/cluster/metadata/types.go @@ -88,6 +88,7 @@ type CreateTableMetadataResult struct { type CreateTableRequest struct { ShardID storage.ShardID + LatestVersion uint64 SchemaName string TableName string PartitionInfo storage.PartitionInfo @@ -98,8 +99,11 @@ type CreateTableResult struct { ShardVersionUpdate ShardVersionUpdate } -type DropTableResult struct { - ShardVersionUpdate []ShardVersionUpdate +type DropTableRequest struct { + SchemaName string + TableName string + ShardID storage.ShardID + LatestVersion uint64 } type DropTableMetadataResult struct { @@ -124,13 +128,15 @@ type MigrateTableRequest struct { SchemaName string TableNames []string OldShardID storage.ShardID - NewShardID storage.ShardID + // TODO: refactor migrate table request, simplify params. + latestOldShardVersion uint64 + NewShardID storage.ShardID + latestNewShardVersion uint64 } type ShardVersionUpdate struct { - ShardID storage.ShardID - CurrVersion uint64 - PrevVersion uint64 + ShardID storage.ShardID + LatestVersion uint64 } type RouteEntry struct { diff --git a/server/coordinator/eventdispatch/dispatch.go b/server/coordinator/eventdispatch/dispatch.go index ac272212..9a55ef2f 100644 --- a/server/coordinator/eventdispatch/dispatch.go +++ b/server/coordinator/eventdispatch/dispatch.go @@ -25,8 +25,8 @@ import ( type Dispatch interface { OpenShard(context context.Context, address string, request OpenShardRequest) error CloseShard(context context.Context, address string, request CloseShardRequest) error - CreateTableOnShard(context context.Context, address string, request CreateTableOnShardRequest) error - DropTableOnShard(context context.Context, address string, request DropTableOnShardRequest) error + CreateTableOnShard(context context.Context, address string, request CreateTableOnShardRequest) (uint64, error) + DropTableOnShard(context context.Context, address string, request DropTableOnShardRequest) (uint64, error) OpenTableOnShard(ctx context.Context, address string, request OpenTableOnShardRequest) error CloseTableOnShard(context context.Context, address string, request CloseTableOnShardRequest) error } @@ -41,7 +41,6 @@ type CloseShardRequest struct { type UpdateShardInfo struct { CurrShardInfo metadata.ShardInfo - PrevVersion uint64 } type CreateTableOnShardRequest struct { diff --git a/server/coordinator/eventdispatch/dispatch_impl.go b/server/coordinator/eventdispatch/dispatch_impl.go index 9645ddd2..5eb91647 100644 --- a/server/coordinator/eventdispatch/dispatch_impl.go +++ b/server/coordinator/eventdispatch/dispatch_impl.go @@ -74,34 +74,34 @@ func (d *DispatchImpl) CloseShard(ctx context.Context, addr string, request Clos return nil } -func (d *DispatchImpl) CreateTableOnShard(ctx context.Context, addr string, request CreateTableOnShardRequest) error { +func (d *DispatchImpl) CreateTableOnShard(ctx context.Context, addr string, request CreateTableOnShardRequest) (uint64, error) { client, err := d.getMetaEventClient(ctx, addr) if err != nil { - return err + return 0, err } resp, err := client.CreateTableOnShard(ctx, convertCreateTableOnShardRequestToPB(request)) if err != nil { - return errors.WithMessagef(err, "create table on shard, addr:%s, request:%v", addr, request) + return 0, errors.WithMessagef(err, "create table on shard, addr:%s, 
request:%v", addr, request) } if resp.GetHeader().Code != 0 { - return ErrDispatch.WithCausef("create table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) + return 0, ErrDispatch.WithCausef("create table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) } - return nil + return resp.GetLatestShardVersion(), nil } -func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, request DropTableOnShardRequest) error { +func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, request DropTableOnShardRequest) (uint64, error) { client, err := d.getMetaEventClient(ctx, addr) if err != nil { - return err + return 0, err } resp, err := client.DropTableOnShard(ctx, convertDropTableOnShardRequestToPB(request)) if err != nil { - return errors.WithMessagef(err, "drop table on shard, addr:%s, request:%v", addr, request) + return 0, errors.WithMessagef(err, "drop table on shard, addr:%s, request:%v", addr, request) } if resp.GetHeader().Code != 0 { - return ErrDispatch.WithCausef("drop table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) + return 0, ErrDispatch.WithCausef("drop table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError()) } - return nil + return resp.GetLatestShardVersion(), nil } func (d *DispatchImpl) OpenTableOnShard(ctx context.Context, addr string, request OpenTableOnShardRequest) error { @@ -192,6 +192,5 @@ func convertOpenTableOnShardRequestToPB(request OpenTableOnShardRequest) *metaev func convertUpdateShardInfoToPB(updateShardInfo UpdateShardInfo) *metaeventpb.UpdateShardInfo { return &metaeventpb.UpdateShardInfo{ CurrShardInfo: metadata.ConvertShardsInfoToPB(updateShardInfo.CurrShardInfo), - PrevVersion: updateShardInfo.PrevVersion, } } diff --git a/server/coordinator/procedure/ddl/common_util.go b/server/coordinator/procedure/ddl/common_util.go index 6183ebbf..e4c85f97 100644 --- a/server/coordinator/procedure/ddl/common_util.go +++ b/server/coordinator/procedure/ddl/common_util.go @@ -29,11 +29,11 @@ import ( "go.uber.org/zap" ) -func CreateTableOnShard(ctx context.Context, c *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, shardID storage.ShardID, request eventdispatch.CreateTableOnShardRequest) error { - log.Debug("CreateTableOnShard", zap.Uint64("curVersion", request.UpdateShardInfo.CurrShardInfo.Version), zap.Uint64("preVersion", request.UpdateShardInfo.PrevVersion)) +func CreateTableOnShard(ctx context.Context, c *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, shardID storage.ShardID, request eventdispatch.CreateTableOnShardRequest) (uint64, error) { + log.Debug("CreateTableOnShard", zap.Uint64("version", request.UpdateShardInfo.CurrShardInfo.Version)) shardNodes, err := c.GetShardNodesByShardID(shardID) if err != nil { - return errors.WithMessage(err, "cluster get shardNode by id") + return 0, errors.WithMessage(err, "cluster get shardNode by id") } // TODO: consider followers var leader storage.ShardNode @@ -46,14 +46,14 @@ func CreateTableOnShard(ctx context.Context, c *metadata.ClusterMetadata, dispat } } if !found { - return errors.WithMessagef(procedure.ErrShardLeaderNotFound, "shard node can't find leader, shardID:%d", shardID) + return 0, errors.WithMessagef(procedure.ErrShardLeaderNotFound, "shard node can't find leader, shardID:%d", shardID) } - err = dispatch.CreateTableOnShard(ctx, leader.NodeName, request) + latestVersion, err := dispatch.CreateTableOnShard(ctx, 
leader.NodeName, request) if err != nil { - return errors.WithMessage(err, "create table on shard") + return 0, errors.WithMessage(err, "create table on shard") } - return nil + return latestVersion, nil } func BuildCreateTableRequest(table storage.Table, shardVersionUpdate metadata.ShardVersionUpdate, req *metaservicepb.CreateTableRequest) eventdispatch.CreateTableOnShardRequest { @@ -63,11 +63,10 @@ func BuildCreateTableRequest(table storage.Table, shardVersionUpdate metadata.Sh ID: shardVersionUpdate.ShardID, // TODO: dispatch CreateTableOnShard to followers? Role: storage.ShardRoleLeader, - Version: shardVersionUpdate.CurrVersion, + Version: shardVersionUpdate.LatestVersion, // FIXME: There is no need to update status here, but it must be set. Shall we provide another struct without status field? Status: storage.ShardStatusUnknown, }, - PrevVersion: shardVersionUpdate.PrevVersion, }, TableInfo: metadata.TableInfo{ ID: table.ID, @@ -121,24 +120,22 @@ func BuildShardVersionUpdate(table storage.Table, clusterMetadata *metadata.Clus return versionUpdate, false, nil } - prevVersion, exists := shardVersions[leader.ID] + latestVersion, exists := shardVersions[leader.ID] if !exists { return versionUpdate, false, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID) } - currVersion := prevVersion + 1 versionUpdate = metadata.ShardVersionUpdate{ - ShardID: leader.ID, - CurrVersion: currVersion, - PrevVersion: prevVersion, + ShardID: leader.ID, + LatestVersion: latestVersion, } return versionUpdate, true, nil } -func DispatchDropTable(ctx context.Context, clusterMetadata *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, schemaName string, table storage.Table, version metadata.ShardVersionUpdate) error { +func DropTableOnShard(ctx context.Context, clusterMetadata *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, schemaName string, table storage.Table, version metadata.ShardVersionUpdate) (uint64, error) { shardNodes, err := clusterMetadata.GetShardNodesByShardID(version.ShardID) if err != nil { - return errors.WithMessage(err, "cluster get shard by shard id") + return 0, errors.WithMessage(err, "cluster get shard by shard id") } tableInfo := metadata.TableInfo{ @@ -150,24 +147,24 @@ func DispatchDropTable(ctx context.Context, clusterMetadata *metadata.ClusterMet CreatedAt: 0, } + var latestVersion uint64 for _, shardNode := range shardNodes { - err = dispatch.DropTableOnShard(ctx, shardNode.NodeName, eventdispatch.DropTableOnShardRequest{ + latestVersion, err = dispatch.DropTableOnShard(ctx, shardNode.NodeName, eventdispatch.DropTableOnShardRequest{ UpdateShardInfo: eventdispatch.UpdateShardInfo{ CurrShardInfo: metadata.ShardInfo{ ID: version.ShardID, Role: storage.ShardRoleLeader, - Version: version.CurrVersion, + Version: version.LatestVersion, // FIXME: We have no need to update the status, but it must be set. Maybe we should provide another struct without status field. 
Status: storage.ShardStatusUnknown, }, - PrevVersion: version.PrevVersion, }, TableInfo: tableInfo, }) if err != nil { - return errors.WithMessage(err, "dispatch drop table on shard") + return 0, errors.WithMessage(err, "dispatch drop table on shard") } } - return nil + return latestVersion, nil } diff --git a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go index 20f5c1c1..a27e0990 100644 --- a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go +++ b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go @@ -282,17 +282,20 @@ func createDataTables(req *callbackRequest, shardID storage.ShardID, tableMetaDa } shardVersionUpdate := metadata.ShardVersionUpdate{ - ShardID: shardID, - CurrVersion: shardVersion + 1, - PrevVersion: shardVersion, + ShardID: shardID, + LatestVersion: shardVersion, } - if err := ddl.CreateTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, shardID, ddl.BuildCreateTableRequest(result.Table, shardVersionUpdate, params.SourceReq)); err != nil { + latestShardVersion, err := ddl.CreateTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, shardID, ddl.BuildCreateTableRequest(result.Table, shardVersionUpdate, params.SourceReq)) + if err != nil { errCh <- errors.WithMessage(err, "dispatch create table on shard") return } - _, err = params.ClusterMetadata.AddTableTopology(req.ctx, shardID, result.Table) + err = params.ClusterMetadata.AddTableTopology(req.ctx, metadata.ShardVersionUpdate{ + ShardID: shardID, + LatestVersion: latestShardVersion, + }, result.Table) if err != nil { errCh <- errors.WithMessage(err, "create table metadata") return diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index cc7639be..2aa615d1 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -79,20 +79,21 @@ func prepareCallback(event *fsm.Event) { log.Debug("create table metadata finish", zap.String("tableName", createTableMetadataRequest.TableName)) shardVersionUpdate := metadata.ShardVersionUpdate{ - ShardID: params.ShardID, - CurrVersion: req.p.relatedVersionInfo.ShardWithVersion[params.ShardID] + 1, - PrevVersion: req.p.relatedVersionInfo.ShardWithVersion[params.ShardID], + ShardID: params.ShardID, + LatestVersion: req.p.relatedVersionInfo.ShardWithVersion[params.ShardID], } createTableRequest := ddl.BuildCreateTableRequest(result.Table, shardVersionUpdate, params.SourceReq) - if err = ddl.CreateTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, params.ShardID, createTableRequest); err != nil { + latestShardVersion, err := ddl.CreateTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, params.ShardID, createTableRequest) + if err != nil { procedure.CancelEventWithLog(event, err, "dispatch create table on shard") return } log.Debug("dispatch createTableOnShard finish", zap.String("tableName", createTableMetadataRequest.TableName)) - createTableResult, err := params.ClusterMetadata.AddTableTopology(req.ctx, params.ShardID, result.Table) + shardVersionUpdate.LatestVersion = latestShardVersion + err = params.ClusterMetadata.AddTableTopology(req.ctx, shardVersionUpdate, result.Table) if err != nil { procedure.CancelEventWithLog(event, err, "add table topology") return @@ -100,7 +101,10 @@ func prepareCallback(event 
*fsm.Event) { log.Debug("add table topology finish", zap.String("tableName", createTableMetadataRequest.TableName)) - req.createTableResult = &createTableResult + req.createTableResult = &metadata.CreateTableResult{ + Table: result.Table, + ShardVersionUpdate: shardVersionUpdate, + } } func successCallback(event *fsm.Event) { diff --git a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go index afcb3510..aee412ff 100644 --- a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go +++ b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go @@ -396,16 +396,21 @@ func dispatchDropDataTable(req *callbackRequest, dispatch eventdispatch.Dispatch } shardVersionUpdate := metadata.ShardVersionUpdate{ - ShardID: shardID, - CurrVersion: shardVersion + 1, - PrevVersion: shardVersion, + ShardID: shardID, + LatestVersion: shardVersion, } - if err := ddl.DispatchDropTable(req.ctx, clusterMetadata, dispatch, schema, table, shardVersionUpdate); err != nil { + latestShardVersion, err := ddl.DropTableOnShard(req.ctx, clusterMetadata, dispatch, schema, table, shardVersionUpdate) + if err != nil { return errors.WithMessagef(err, "drop table, table:%s", tableName) } - _, err = clusterMetadata.DropTable(req.ctx, req.schemaName(), tableName) + err = clusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{ + SchemaName: req.schemaName(), + TableName: tableName, + ShardID: shardID, + LatestVersion: latestShardVersion, + }) if err != nil { return errors.WithMessagef(err, "drop table, table:%s", tableName) } diff --git a/server/coordinator/procedure/ddl/droptable/drop_table.go b/server/coordinator/procedure/ddl/droptable/drop_table.go index d9fc477a..272ff927 100644 --- a/server/coordinator/procedure/ddl/droptable/drop_table.go +++ b/server/coordinator/procedure/ddl/droptable/drop_table.go @@ -18,7 +18,6 @@ package droptable import ( "context" - "fmt" "sync" "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" @@ -89,7 +88,13 @@ func prepareCallback(event *fsm.Event) { // If the shard corresponding to this table does not exist, it means that the actual table creation failed. // In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table. if !shardExists { - _, err = params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + // Try to drop table with the latest shard version. 
+ err = params.ClusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{ + SchemaName: params.SourceReq.GetSchemaName(), + TableName: params.SourceReq.GetName(), + ShardID: shardVersionUpdate.ShardID, + LatestVersion: shardVersionUpdate.LatestVersion, + }) if err != nil { procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", params.SourceReq.GetName())) return @@ -97,7 +102,7 @@ func prepareCallback(event *fsm.Event) { return } - err = ddl.DispatchDropTable(req.ctx, params.ClusterMetadata, params.Dispatch, params.SourceReq.GetSchemaName(), table, shardVersionUpdate) + latestShardVersion, err := ddl.DropTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, params.SourceReq.GetSchemaName(), table, shardVersionUpdate) if err != nil { procedure.CancelEventWithLog(event, err, "dispatch drop table on shard") return @@ -105,18 +110,17 @@ func prepareCallback(event *fsm.Event) { log.Debug("dispatch dropTableOnShard finish", zap.String("tableName", params.SourceReq.GetName()), zap.Uint64("procedureID", params.ID)) - result, err := params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) - if err != nil { + if err = params.ClusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{ + SchemaName: params.SourceReq.GetSchemaName(), + TableName: params.SourceReq.GetName(), + ShardID: shardVersionUpdate.ShardID, + LatestVersion: latestShardVersion, + }); err != nil { procedure.CancelEventWithLog(event, err, "cluster drop table") return } log.Debug("drop table finish", zap.String("tableName", params.SourceReq.GetName()), zap.Uint64("procedureID", params.ID)) - - if len(result.ShardVersionUpdate) != 1 { - procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("length of shardVersionResult is %d", len(result.ShardVersionUpdate))) - return - } } func successCallback(event *fsm.Event) { diff --git a/server/coordinator/procedure/operation/split/split_test.go b/server/coordinator/procedure/operation/split/split_test.go index a33ba5a5..0987d979 100644 --- a/server/coordinator/procedure/operation/split/split_test.go +++ b/server/coordinator/procedure/operation/split/split_test.go @@ -42,6 +42,7 @@ func TestSplit(t *testing.T) { // Create some tables in this shard. 
_, err := c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{ ShardID: createTableNodeShard.ID, + LatestVersion: 0, SchemaName: test.TestSchemaName, TableName: test.TestTableName0, PartitionInfo: storage.PartitionInfo{Info: nil}, @@ -49,6 +50,7 @@ func TestSplit(t *testing.T) { re.NoError(err) _, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{ ShardID: createTableNodeShard.ID, + LatestVersion: 0, SchemaName: test.TestSchemaName, TableName: test.TestTableName1, PartitionInfo: storage.PartitionInfo{Info: nil}, diff --git a/server/coordinator/procedure/operation/transferleader/transfer_leader.go b/server/coordinator/procedure/operation/transferleader/transfer_leader.go index 0ba06c53..da7abdf8 100644 --- a/server/coordinator/procedure/operation/transferleader/transfer_leader.go +++ b/server/coordinator/procedure/operation/transferleader/transfer_leader.go @@ -289,13 +289,12 @@ func openNewShardCallback(event *fsm.Event) { procedure.CancelEventWithLog(event, metadata.ErrShardNotFound, "shard not found in topology", zap.Uint64("shardID", uint64(req.p.params.ShardID))) return } - preVersion := shardView.Version openShardRequest := eventdispatch.OpenShardRequest{ Shard: metadata.ShardInfo{ ID: req.p.params.ShardID, Role: storage.ShardRoleLeader, - Version: preVersion + 1, + Version: shardView.Version, Status: storage.ShardStatusUnknown, }, } diff --git a/server/coordinator/procedure/test/common.go b/server/coordinator/procedure/test/common.go index 84ac7c16..dcf37b91 100644 --- a/server/coordinator/procedure/test/common.go +++ b/server/coordinator/procedure/test/common.go @@ -61,19 +61,19 @@ func (m MockDispatch) CloseShard(_ context.Context, _ string, _ eventdispatch.Cl return nil } -func (m MockDispatch) CreateTableOnShard(_ context.Context, _ string, _ eventdispatch.CreateTableOnShardRequest) error { - return nil +func (m MockDispatch) CreateTableOnShard(_ context.Context, _ string, _ eventdispatch.CreateTableOnShardRequest) (uint64, error) { + return 0, nil } -func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ eventdispatch.DropTableOnShardRequest) error { - return nil +func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ eventdispatch.DropTableOnShardRequest) (uint64, error) { + return 0, nil } -func (m MockDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) error { +func (m MockDispatch) OpenTableOnShard(_ context.Context, _ string, _ eventdispatch.OpenTableOnShardRequest) error { return nil } -func (m MockDispatch) OpenTableOnShard(_ context.Context, _ string, _ eventdispatch.OpenTableOnShardRequest) error { +func (m MockDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) error { return nil } diff --git a/server/coordinator/shard_picker_test.go b/server/coordinator/shard_picker_test.go index 248acb7d..ac8ec378 100644 --- a/server/coordinator/shard_picker_test.go +++ b/server/coordinator/shard_picker_test.go @@ -59,9 +59,10 @@ func TestLeastTableShardPicker(t *testing.T) { // Create table on shard 0. 
_, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{ - ShardID: 0, - SchemaName: test.TestSchemaName, - TableName: "test", + ShardID: 0, + LatestVersion: 0, + SchemaName: test.TestSchemaName, + TableName: "test", PartitionInfo: storage.PartitionInfo{ Info: nil, }, diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index 55d7a936..2d652b9f 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -231,11 +231,11 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl ShardInfo: &metaservicepb.ShardInfo{ Id: uint32(ret.ShardVersionUpdate.ShardID), Role: clusterpb.ShardRole_LEADER, - Version: ret.ShardVersionUpdate.CurrVersion, + Version: ret.ShardVersionUpdate.LatestVersion, }, }, nil case err = <-errorCh: - log.Warn("create table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds())) + log.Warn("create table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()), zap.Error(err)) return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil } } diff --git a/server/storage/storage_impl.go b/server/storage/storage_impl.go index 9b0070b2..7d05db6d 100644 --- a/server/storage/storage_impl.go +++ b/server/storage/storage_impl.go @@ -33,6 +33,7 @@ import ( ) type Options struct { + // MaxScanLimit is the max limit of the number of keys in a scan. MaxScanLimit int // MinScanLimit is the min limit of the number of keys in a scan. @@ -517,16 +518,14 @@ func (s *metaStorageImpl) UpdateShardView(ctx context.Context, req UpdateShardVi } key := makeShardViewKey(s.rootPath, uint32(req.ClusterID), shardViewPB.ShardId, fmtID(shardViewPB.GetVersion())) - oldTopologyKey := makeShardViewKey(s.rootPath, uint32(req.ClusterID), shardViewPB.ShardId, fmtID(req.LatestVersion)) + oldTopologyKey := makeShardViewKey(s.rootPath, uint32(req.ClusterID), shardViewPB.ShardId, fmtID(req.PrevVersion)) latestVersionKey := makeShardViewLatestVersionKey(s.rootPath, uint32(req.ClusterID), shardViewPB.ShardId) // Check whether the latest version is equal to that in etcd. If it is equal,update shard clusterView and latest version; Otherwise, return an error. - latestVersionEquals := clientv3.Compare(clientv3.Value(latestVersionKey), "=", fmtID(req.LatestVersion)) opPutLatestVersion := clientv3.OpPut(latestVersionKey, fmtID(shardViewPB.Version)) opPutShardTopology := clientv3.OpPut(key, string(value)) resp, err := s.client.Txn(ctx). - If(latestVersionEquals). Then(opPutLatestVersion, opPutShardTopology). Commit() if err != nil { @@ -537,9 +536,11 @@ func (s *metaStorageImpl) UpdateShardView(ctx context.Context, req UpdateShardVi } // Try to remove expired shard view. 
- opDelShardTopology := clientv3.OpDelete(oldTopologyKey) - if _, err := s.client.Do(ctx, opDelShardTopology); err != nil { - log.Warn("remove expired shard view failed", zap.Error(err), zap.String("oldTopologyKey", oldTopologyKey)) + if req.PrevVersion != shardViewPB.Version { + opDelShardTopology := clientv3.OpDelete(oldTopologyKey) + if _, err := s.client.Do(ctx, opDelShardTopology); err != nil { + log.Warn("remove expired shard view failed", zap.Error(err), zap.String("oldTopologyKey", oldTopologyKey)) + } } return nil diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index 6be05c27..97f86360 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -282,9 +282,9 @@ func TestStorage_CreateAndListShardView(t *testing.T) { for i := 0; i < defaultCount; i++ { expectShardViews[i].Version = newVersion err = s.UpdateShardView(ctx, UpdateShardViewRequest{ - ClusterID: defaultClusterID, - ShardView: expectShardViews[i], - LatestVersion: defaultVersion, + ClusterID: defaultClusterID, + ShardView: expectShardViews[i], + PrevVersion: defaultVersion, }) re.NoError(err) } diff --git a/server/storage/types.go b/server/storage/types.go index e263c6cd..badcb564 100644 --- a/server/storage/types.go +++ b/server/storage/types.go @@ -153,9 +153,9 @@ type ListShardViewsResult struct { } type UpdateShardViewRequest struct { - ClusterID ClusterID - ShardView ShardView - LatestVersion uint64 + ClusterID ClusterID + ShardView ShardView + PrevVersion uint64 } type ListNodesRequest struct {
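The storage hunks above show the flip side of the refactor: `UpdateShardView` no longer guards the transaction with a compare on the latest-version key, so correctness of the version now rests on CeresDB rather than on an etcd compare-and-swap, and `PrevVersion` serves only to locate the superseded shard-view key. The added `req.PrevVersion != shardViewPB.Version` check matters because CeresDB may report an unchanged version, in which case the old key and the new key coincide and the cleanup would delete the view that was just written. A simplified sketch of the resulting key handling (key layout and the `latestVersionKey` pointer key are assumed from the `makeShardViewKey` and `makeShardViewLatestVersionKey` calls above):

```go
// Shard views are stored under version-suffixed keys, plus a pointer key
// holding the latest version (layout assumed from makeShardViewKey usage).
newKey := makeShardViewKey(rootPath, clusterID, shardID, fmtID(newVersion))
oldKey := makeShardViewKey(rootPath, clusterID, shardID, fmtID(prevVersion))

// Unconditional put: no If() compare on the latest-version key anymore.
if _, err := client.Txn(ctx).
	Then(clientv3.OpPut(latestVersionKey, fmtID(newVersion)),
		clientv3.OpPut(newKey, string(value))).
	Commit(); err != nil {
	return err
}

// Garbage-collect the old key only when the version actually advanced;
// otherwise oldKey == newKey and we would delete the view just written.
if prevVersion != newVersion {
	if _, err := client.Do(ctx, clientv3.OpDelete(oldKey)); err != nil {
		log.Warn("remove expired shard view failed", zap.Error(err))
	}
}
```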