Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: correct shard version (#268)
Browse files Browse the repository at this point in the history
## Rationale
For detail, see: #263
This PR adds check-and-repair logic for the shard version.

## Detailed Changes
* Add `maybeCorrectShardVersion` to `RegisterNode`; it corrects the shard
version when it is inconsistent between ceresmeta and ceresdb.

## Test Plan
I created several scenarios locally in which the shard versions were
inconsistent, to verify that the repair logic works.
  • Loading branch information
ZuLiangWang authored Nov 2, 2023
1 parent b502492 commit b4402f7
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 1 deletion.
25 changes: 25 additions & 0 deletions server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ func (c *ClusterMetadata) RegisterNode(ctx context.Context, registeredNode Regis
return nil
}
if exists && !needUpdate(oldCache, registeredNode) {
// Check whether the shard versions need to be corrected.
c.maybeCorrectShardVersion(ctx, registeredNode)
return nil
}

Expand Down Expand Up @@ -766,3 +768,26 @@ L1:

return true
}

// maybeCorrectShardVersion compares each shard version carried by node with
// the version recorded in the current topology snapshot. Every mismatch is
// logged; when the topology version is lower than the node's version, it tries
// to raise the topology version to the node's version.
//
// The function is best-effort: failures are logged, not returned, and a
// problem with one shard does not stop the remaining shards from being checked.
func (c *ClusterMetadata) maybeCorrectShardVersion(ctx context.Context, node RegisteredNode) {
	topology := c.topologyManager.GetTopology()
	for _, shardInfo := range node.ShardInfos {
		oldShardView, ok := topology.ShardViewsMapping[shardInfo.ID]
		if !ok {
			// Skip only this shard; the other shards of the node may still need correction.
			c.logger.Error("shard not found in topology", zap.Uint32("shardID", uint32(shardInfo.ID)))
			continue
		}
		if oldShardView.Version == shardInfo.Version {
			continue
		}

		c.logger.Warn("shard version mismatch", zap.Uint32("shardID", uint32(shardInfo.ID)), zap.Uint64("ceresmetaVersion", oldShardView.Version), zap.Uint64("nodeVersion", shardInfo.Version))
		if oldShardView.Version > shardInfo.Version {
			// The version is only ever moved forward; a node reporting an older
			// version is logged above and otherwise left alone.
			continue
		}

		// The shard version in ceresmeta is behind the one in ceresdb, so it needs to be corrected.
		// The snapshot version is passed as the expected value of the update.
		c.logger.Info("try to update shard version", zap.Uint32("shardID", uint32(shardInfo.ID)), zap.Uint64("expectVersion", oldShardView.Version), zap.Uint64("newVersion", shardInfo.Version))
		if err := c.topologyManager.UpdateShardVersionWithExpect(ctx, shardInfo.ID, shardInfo.Version, oldShardView.Version); err != nil {
			// Include the error so the reason for the failed update can be diagnosed.
			c.logger.Warn("update shard version with expect failed", zap.Uint32("shardID", uint32(shardInfo.ID)), zap.Uint64("expectVersion", oldShardView.Version), zap.Uint64("newVersion", shardInfo.Version), zap.Error(err))
		}
		// TODO: Maybe we need to do something to ensure the ceresdb status after updating the shard version.
	}
}
25 changes: 25 additions & 0 deletions server/cluster/metadata/topology_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type TopologyManager interface {
GetClusterView() storage.ClusterView
// CreateShardViews create shardViews.
CreateShardViews(ctx context.Context, shardViews []CreateShardView) error
// UpdateShardVersionWithExpect updates the shard version when the previous version equals the expected version.
UpdateShardVersionWithExpect(ctx context.Context, shardID storage.ShardID, version uint64, expect uint64) error
// GetTopology get current topology snapshot.
GetTopology() Topology
}
Expand Down Expand Up @@ -571,6 +573,29 @@ func (m *TopologyManagerImpl) CreateShardViews(ctx context.Context, createShardV
return nil
}

// UpdateShardVersionWithExpect builds a new shard view for shardID that keeps
// the shard's current table IDs but carries the given version, and persists it
// through the storage layer.
//
// expect is handed to storage as LatestVersion; presumably storage rejects the
// update when the stored version differs (TODO confirm against the storage
// implementation). m.updateShardView is called only after storage accepts the
// update.
//
// It returns ErrShardNotFound when shardID is not in shardTablesMapping, or the
// wrapped storage error when persisting fails. The write lock is held for the
// whole call.
func (m *TopologyManagerImpl) UpdateShardVersionWithExpect(ctx context.Context, shardID storage.ShardID, version uint64, expect uint64) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	current, found := m.shardTablesMapping[shardID]
	if !found {
		return ErrShardNotFound.WithCausef("shard id:%d", shardID)
	}

	// Only the version changes; the table list is carried over as-is.
	updated := storage.NewShardView(shardID, version, current.TableIDs)
	req := storage.UpdateShardViewRequest{
		ClusterID:     m.clusterID,
		ShardView:     updated,
		LatestVersion: expect,
	}
	err := m.storage.UpdateShardView(ctx, req)
	if err != nil {
		return errors.WithMessage(err, "storage update shard view")
	}

	m.updateShardView(shardID, updated)
	return nil
}

func (m *TopologyManagerImpl) GetTopology() Topology {
m.lock.RLock()
defer m.lock.RUnlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (p *BatchTransferLeaderProcedure) Start(ctx context.Context) error {
g.Go(func() error {
err := p.Start(ctx)
if err != nil {
log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", p)), zap.Error(err))
log.Error("procedure start failed", zap.Error(err), zap.Uint64("procedureID", p.ID()), zap.Error(err))
}
return err
})
Expand Down

0 comments on commit b4402f7

Please sign in to comment.