Skip to content

Commit

Permalink
dr-autosync: move state replicate to different goroutine (tikv#6874) (t…
Browse files Browse the repository at this point in the history
…ikv#6705)

fix tikv#6883

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Aug 2, 2023
1 parent 2892b46 commit 198860d
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 106 deletions.
148 changes: 72 additions & 76 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
Expand Down Expand Up @@ -70,11 +69,11 @@ type ModeManager struct {
initTime time.Time

syncutil.RWMutex
config config.ReplicationModeConfig
storage endpoint.ReplicationStatusStorage
cluster schedule.Cluster
fileReplicater FileReplicater
replicatedMembers []uint64
config config.ReplicationModeConfig
storage endpoint.ReplicationStatusStorage
cluster schedule.Cluster
fileReplicater FileReplicater
replicateState sync.Map

drAutoSync drAutoSyncStatus
// intermediate states of the recovery process
Expand Down Expand Up @@ -240,7 +239,6 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error {
return err
}
dr := drAutoSyncStatus{State: drStateAsyncWait, StateID: id, AvailableStores: availableStores}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -263,7 +261,6 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
return err
}
dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -287,7 +284,6 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
}
now := time.Now()
dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: &now}
m.drPersistStatusWithLock(dr)
if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -307,7 +303,6 @@ func (m *ModeManager) drSwitchToSync() error {
return err
}
dr := drAutoSyncStatus{State: drStateSync, StateID: id}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -317,59 +312,16 @@ func (m *ModeManager) drSwitchToSync() error {
return nil
}

func (m *ModeManager) drPersistStatusWithLock(status drAutoSyncStatus) {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
defer cancel()

members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}

data, _ := json.Marshal(status)

m.replicatedMembers = m.replicatedMembers[:0]
for _, member := range members {
if err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data); err != nil {
log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), errs.ZapError(err))
// Throw away the error to make it possible to switch to async when
// primary and dr DC are disconnected. This will result in the
// inability to accurately determine whether data is fully
// synchronized when using dr DC to disaster recovery.
// Since the member will not be in `replicatedMembers` list, PD will
// try to replicate state file later.
} else {
m.replicatedMembers = append(m.replicatedMembers, member.GetMemberId())
}
}
}

func (m *ModeManager) drCheckNeedPersistStatus(members []*pdpb.Member) bool {
m.RLock()
defer m.RUnlock()
return slice.AnyOf(members, func(i int) bool { // if there is any member in the new list
return slice.NoneOf(m.replicatedMembers, func(j int) bool { // not replicated
return m.replicatedMembers[j] == members[i].GetMemberId()
})
})
}

func (m *ModeManager) drPersistStatus() {
m.Lock()
defer m.Unlock()
m.drPersistStatusWithLock(drAutoSyncStatus{State: m.drAutoSync.State, StateID: m.drAutoSync.StateID})
}

func (m *ModeManager) drGetState() string {
m.RLock()
defer m.RUnlock()
return m.drAutoSync.State
}

const (
idleTimeout = time.Minute
tickInterval = 500 * time.Millisecond
idleTimeout = time.Minute
tickInterval = 500 * time.Millisecond
replicateStateInterval = time.Second * 5
)

// Run starts the background job.
Expand All @@ -380,17 +332,38 @@ func (m *ModeManager) Run(ctx context.Context) {
case <-ctx.Done():
return
}
for {
select {
case <-time.After(tickInterval):
case <-ctx.Done():
return

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
for {
select {
case <-time.After(tickInterval):
case <-ctx.Done():
return
}
m.tickUpdateState()
}
m.tickDR()
}
}()

go func() {
defer wg.Done()
for {
select {
case <-time.After(replicateStateInterval):
case <-ctx.Done():
return
}
m.tickReplicateStatus()
}
}()

wg.Wait()
}

func (m *ModeManager) tickDR() {
func (m *ModeManager) tickUpdateState() {
if m.getModeName() != modeDRAutoSync {
return
}
Expand Down Expand Up @@ -483,8 +456,42 @@ func (m *ModeManager) tickDR() {
}
}
}
}

func (m *ModeManager) tickReplicateStatus() {
if m.getModeName() != modeDRAutoSync {
return
}

m.RLock()
state := drAutoSyncStatus{
State: m.drAutoSync.State,
StateID: m.drAutoSync.StateID,
AvailableStores: m.drAutoSync.AvailableStores,
RecoverStartTime: m.drAutoSync.RecoverStartTime,
}
m.RUnlock()

m.checkReplicateFile()
data, _ := json.Marshal(state)

members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}
for _, member := range members {
stateID, ok := m.replicateState.Load(member.GetMemberId())
if !ok || stateID.(uint64) != state.StateID {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data)
if err != nil {
log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", state.State), errs.ZapError(err))
} else {
m.replicateState.Store(member.GetMemberId(), state.StateID)
}
cancel()
}
}
}

const (
Expand Down Expand Up @@ -556,17 +563,6 @@ func (m *ModeManager) drCheckStoreStateUpdated(stores []uint64) bool {
return true
}

func (m *ModeManager) checkReplicateFile() {
members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}
if m.drCheckNeedPersistStatus(members) {
m.drPersistStatus()
}
}

var (
regionScanBatchSize = 1024
regionMinSampleSize = 512
Expand Down
Loading

0 comments on commit 198860d

Please sign in to comment.