diff --git a/go/vt/concurrency/error_group.go b/go/vt/concurrency/error_group.go
new file mode 100644
index 00000000000..712280d3d91
--- /dev/null
+++ b/go/vt/concurrency/error_group.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2021 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package concurrency
+
+import "context"
+
+// ErrorGroup provides a function for waiting for N goroutines to complete with
+// at least X successes and no more than Y failures, and cancelling the rest.
+//
+// It should be used as follows:
+//
+//	errCh := make(chan error)
+//	errgroupCtx, errgroupCancel := context.WithCancel(ctx)
+//
+//	for _, arg := range args {
+//		arg := arg
+//
+//		go func() {
+//			err := doWork(errgroupCtx, arg)
+//			errCh <- err
+//		}()
+//	}
+//
+//	errgroup := concurrency.ErrorGroup{
+//		NumGoroutines:        len(args),
+//		NumRequiredSuccesses: 5, // need at least 5 to respond with nil error before cancelling the rest
+//		NumAllowedErrors:     1, // if more than 1 responds with non-nil error, cancel the rest
+//	}
+//	errRec := errgroup.Wait(errgroupCancel, errCh)
+//
+//	if errRec.HasErrors() {
+//		// ...
+//	}
+type ErrorGroup struct {
+	NumGoroutines        int
+	NumRequiredSuccesses int
+	NumAllowedErrors     int
+}
+
+// Wait waits for a group of goroutines that are sending errors to the given
+// error channel, and are cancellable by the given cancel function.
+//
+// Wait will cancel any outstanding goroutines under the following conditions:
+//
+// (1) More than NumAllowedErrors non-nil results have been consumed on the
+// error channel.
+//
+// (2) At least NumRequiredSuccesses nil results have been consumed on the error
+// channel.
+//
+// After the cancellation condition is triggered, Wait will continue to consume
+// results off the error channel so as to not permanently block any of those
+// cancelled goroutines.
+//
+// When finished consuming results from all goroutines, cancelled or otherwise,
+// Wait returns an AllErrorRecorder that contains all errors returned by any of
+// those goroutines. It does not close the error channel.
+func (eg ErrorGroup) Wait(cancel context.CancelFunc, errors chan error) *AllErrorRecorder {
+	errCounter := 0
+	successCounter := 0
+	responseCounter := 0
+	rec := &AllErrorRecorder{}
+
+	for err := range errors {
+		responseCounter++
+
+		switch err {
+		case nil:
+			successCounter++
+		default:
+			errCounter++
+			rec.RecordError(err)
+		}
+
+		// Even though we cancel in the next conditional, we need to keep
+		// consuming off the channel, or those goroutines will get stuck
+		// forever.
+		if responseCounter == eg.NumGoroutines {
+			break
+		}
+
+		if errCounter > eg.NumAllowedErrors || successCounter >= eg.NumRequiredSuccesses {
+			cancel()
+		}
+	}
+
+	return rec
+}
diff --git a/go/vt/topotools/position_searcher.go b/go/vt/topotools/position_searcher.go
new file mode 100644
index 00000000000..8affbcf077e
--- /dev/null
+++ b/go/vt/topotools/position_searcher.go
@@ -0,0 +1,124 @@
+/*
+Copyright 2021 The Vitess Authors.
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topotools + +import ( + "context" + "sync" + "time" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// MaxReplicationPositionSearcher provides a threadsafe way to find a tablet +// with the most advanced replication position. +// +// A typical usage will look like: +// +// var ( +// searcher = NewMaxReplicationPositionSearcher(tmc, logger, waitTimeout) +// wg sync.WaitGroup +// ) +// for _, tablet := range tablets { +// wg.Add(1) +// go func(t *topodatapb.Tablet) { +// defer wg.Done() +// searcher.ProcessTablet(ctx, t) +// }(tablet) +// } +// wg.Wait() +// maxPosTablet := searcher.MaxPositionTablet() +// +type MaxReplicationPositionSearcher struct { + tmc tmclient.TabletManagerClient + logger logutil.Logger + waitTimeout time.Duration + m sync.Mutex + + maxPos mysql.Position + maxPosTablet *topodatapb.Tablet +} + +// NewMaxReplicationPositionSearcher returns a new +// MaxReplicationPositionSearcher instance, ready to begin processing tablets. +// To reuse an existing instance, first call Reset(). +func NewMaxReplicationPositionSearcher(tmc tmclient.TabletManagerClient, logger logutil.Logger, waitTimeout time.Duration) *MaxReplicationPositionSearcher { + return &MaxReplicationPositionSearcher{ + tmc: tmc, + logger: logger, + waitTimeout: waitTimeout, + m: sync.Mutex{}, + maxPos: mysql.Position{}, + maxPosTablet: nil, + } +} + +// ProcessTablet processes the replication position for a single tablet and +// updates the state of the searcher. It is safe to call from multiple +// goroutines. +func (searcher *MaxReplicationPositionSearcher) ProcessTablet(ctx context.Context, tablet *topodatapb.Tablet) { + searcher.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias)) + + ctx, cancel := context.WithTimeout(ctx, searcher.waitTimeout) + defer cancel() + + status, err := searcher.tmc.ReplicationStatus(ctx, tablet) + if err != nil { + searcher.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", topoproto.TabletAliasString(tablet.Alias), err) + + return + } + + pos, err := mysql.DecodePosition(status.Position) + if err != nil { + searcher.logger.Warningf("cannot decode replica position %v for tablet %v, ignoring tablet: %v", status.Position, topoproto.TabletAliasString(tablet.Alias), err) + + return + } + + searcher.m.Lock() + defer searcher.m.Unlock() + + if searcher.maxPosTablet == nil || !searcher.maxPos.AtLeast(pos) { + searcher.maxPos = pos + searcher.maxPosTablet = tablet + } +} + +// MaxPositionTablet returns the most advanced-positioned tablet the searcher +// has seen so far. 
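+// It returns nil if no tablet has been processed successfully yet, so callers
+// should wait for any concurrent ProcessTablet calls to finish (for example
+// with the sync.WaitGroup shown in the usage example above) before reading
+// the result.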
+func (searcher *MaxReplicationPositionSearcher) MaxPositionTablet() *topodatapb.Tablet { + searcher.m.Lock() + defer searcher.m.Unlock() + + return searcher.maxPosTablet +} + +// Reset clears any tracked position or tablet from the searcher, making this +// instance ready to begin a new search. +func (searcher *MaxReplicationPositionSearcher) Reset() { + searcher.m.Lock() + defer searcher.m.Unlock() + + searcher.maxPos = mysql.Position{} + searcher.maxPosTablet = nil +} diff --git a/go/vt/topotools/position_searcher_test.go b/go/vt/topotools/position_searcher_test.go new file mode 100644 index 00000000000..b90b52be489 --- /dev/null +++ b/go/vt/topotools/position_searcher_test.go @@ -0,0 +1,223 @@ +package topotools + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +type fakeTMClient struct { + tmclient.TabletManagerClient + tabletReplicationPositions map[string]*replicationdatapb.Status +} + +func (fake *fakeTMClient) ReplicationStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.Status, error) { + if fake.tabletReplicationPositions == nil { + return nil, assert.AnError + } + + if tablet.Alias == nil { + return nil, assert.AnError + } + + if pos, ok := fake.tabletReplicationPositions[topoproto.TabletAliasString(tablet.Alias)]; ok { + return pos, nil + } + + return nil, assert.AnError +} + +func TestMaxReplicationPositionSearcher(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + positions map[string]*replicationdatapb.Status + tablets []*topodatapb.Tablet + expected *topodatapb.Tablet + }{ + { + name: "success", + positions: map[string]*replicationdatapb.Status{ + "zone1-0000000100": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23", + }, + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23-28", + }, + "zone1-0000000102": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23-30", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + expected: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + { + name: "reverse order", + positions: map[string]*replicationdatapb.Status{ + "zone1-0000000100": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23", + }, + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23-28", + }, + "zone1-0000000102": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23-30", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + expected: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + { + name: "no position for tablet is ignored", + positions: map[string]*replicationdatapb.Status{ + "zone1-0000000100": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23", + }, + }, + tablets: 
[]*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + expected: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + { + name: "bad position is ignored", + positions: map[string]*replicationdatapb.Status{ + "zone1-0000000100": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23", + }, + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:23-28", + }, + "zone1-0000000102": { + Position: "junk position", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + expected: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + tmc := &fakeTMClient{ + tabletReplicationPositions: tt.positions, + } + searcher := NewMaxReplicationPositionSearcher(tmc, logutil.NewMemoryLogger(), time.Millisecond*50) + + for _, tablet := range tt.tablets { + searcher.ProcessTablet(ctx, tablet) + } + + assert.Equal(t, tt.expected, searcher.MaxPositionTablet()) + }) + } +} diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter.go b/go/vt/vtctl/reparentutil/emergency_reparenter.go new file mode 100644 index 00000000000..1ad55715985 --- /dev/null +++ b/go/vt/vtctl/reparentutil/emergency_reparenter.go @@ -0,0 +1,374 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reparentutil + +import ( + "context" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + + "vitess.io/vitess/go/event" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools/events" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + logutilpb "vitess.io/vitess/go/vt/proto/logutil" + replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// EmergencyReparenter performs EmergencyReparentShard operations. +type EmergencyReparenter struct { + ts *topo.Server + tmc tmclient.TabletManagerClient + logger logutil.Logger +} + +// EmergencyReparentOptions provides optional parameters to +// EmergencyReparentShard operations. Options are passed by value, so it is safe +// for callers to mutate and reuse options structs for multiple calls. 
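+//
+// A minimal call site might look like the following sketch; the keyspace,
+// shard, and ignored tablet alias are illustrative only:
+//
+//	opts := EmergencyReparentOptions{
+//		IgnoreReplicas:      sets.NewString("zone1-0000000404"), // hypothetical alias
+//		WaitReplicasTimeout: 30 * time.Second,
+//	}
+//	erp := NewEmergencyReparenter(ts, tmc, logger)
+//	ev, err := erp.ReparentShard(ctx, "testkeyspace", "-", opts)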
+type EmergencyReparentOptions struct {
+	NewPrimaryAlias     *topodatapb.TabletAlias
+	IgnoreReplicas      sets.String
+	WaitReplicasTimeout time.Duration
+
+	// Private options managed internally. We use value passing to avoid leaking
+	// these details back out.
+
+	lockAction string
+}
+
+// NewEmergencyReparenter returns a new EmergencyReparenter object, ready to
+// perform EmergencyReparentShard operations using the given topo.Server,
+// TabletManagerClient, and logger.
+//
+// Providing a nil logger instance is allowed.
+func NewEmergencyReparenter(ts *topo.Server, tmc tmclient.TabletManagerClient, logger logutil.Logger) *EmergencyReparenter {
+	erp := EmergencyReparenter{
+		ts:     ts,
+		tmc:    tmc,
+		logger: logger,
+	}
+
+	if erp.logger == nil {
+		// Create a no-op logger so we can call functions on erp.logger without
+		// needing to constantly check for non-nil.
+		erp.logger = logutil.NewCallbackLogger(func(*logutilpb.Event) {})
+	}
+
+	return &erp
+}
+
+// ReparentShard performs the EmergencyReparentShard operation on the given
+// keyspace and shard.
+func (erp *EmergencyReparenter) ReparentShard(ctx context.Context, keyspace string, shard string, opts EmergencyReparentOptions) (*events.Reparent, error) {
+	opts.lockAction = erp.getLockAction(opts.NewPrimaryAlias)
+
+	ctx, unlock, err := erp.ts.LockShard(ctx, keyspace, shard, opts.lockAction)
+	if err != nil {
+		return nil, err
+	}
+
+	defer unlock(&err)
+
+	ev := &events.Reparent{}
+	defer func() {
+		switch err {
+		case nil:
+			event.DispatchUpdate(ev, "finished EmergencyReparentShard")
+		default:
+			event.DispatchUpdate(ev, "failed EmergencyReparentShard: "+err.Error())
+		}
+	}()
+
+	err = erp.reparentShardLocked(ctx, ev, keyspace, shard, opts)
+
+	return ev, err
+}
+
+func (erp *EmergencyReparenter) getLockAction(newPrimaryAlias *topodatapb.TabletAlias) string {
+	action := "EmergencyReparentShard"
+
+	if newPrimaryAlias != nil {
+		action += fmt.Sprintf("(%v)", topoproto.TabletAliasString(newPrimaryAlias))
+	}
+
+	return action
+}
+
+func (erp *EmergencyReparenter) promoteNewPrimary(
+	ctx context.Context,
+	ev *events.Reparent,
+	keyspace string,
+	shard string,
+	newPrimaryTabletAlias string,
+	tabletMap map[string]*topo.TabletInfo,
+	statusMap map[string]*replicationdatapb.StopReplicationStatus,
+	opts EmergencyReparentOptions,
+) error {
+	erp.logger.Infof("promoting tablet %v to master", newPrimaryTabletAlias)
+	event.DispatchUpdate(ev, "promoting replica")
+
+	newPrimaryTabletInfo, ok := tabletMap[newPrimaryTabletAlias]
+	if !ok {
+		return vterrors.Errorf(vtrpc.Code_INTERNAL, "attempted to promote master-elect %v that was not in the tablet map; this is an impossible situation", newPrimaryTabletAlias)
+	}
+
+	rp, err := erp.tmc.PromoteReplica(ctx, newPrimaryTabletInfo.Tablet)
+	if err != nil {
+		return vterrors.Wrapf(err, "master-elect tablet %v failed to be upgraded to master: %v", newPrimaryTabletAlias, err)
+	}
+
+	if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil {
+		return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err)
+	}
+
+	replCtx, replCancel := context.WithTimeout(ctx, opts.WaitReplicasTimeout)
+	defer replCancel()
+
+	event.DispatchUpdate(ev, "reparenting all tablets")
+
+	// Create a context and cancel function to watch for the first successful
+	// SetMaster call on a replica. We use a background context so that this
+	// context is only ever Done when its cancel is called by the background
+	// goroutine we're about to spin up.
+ // + // Similarly, create a context and cancel for the replica waiter goroutine + // to signal when all replica goroutines have finished. In the case where at + // least one replica succeeds, replSuccessCtx will be canceled first, while + // allReplicasDoneCtx is guaranteed to be canceled within + // opts.WaitReplicasTimeout plus some jitter. + replSuccessCtx, replSuccessCancel := context.WithCancel(context.Background()) + allReplicasDoneCtx, allReplicasDoneCancel := context.WithCancel(context.Background()) + + now := time.Now().UnixNano() + replWg := sync.WaitGroup{} + rec := concurrency.AllErrorRecorder{} + + handlePrimary := func(alias string, ti *topo.TabletInfo) error { + erp.logger.Infof("populating reparent journal on new master %v", alias) + return erp.tmc.PopulateReparentJournal(replCtx, ti.Tablet, now, opts.lockAction, newPrimaryTabletInfo.Alias, rp) + } + + handleReplica := func(alias string, ti *topo.TabletInfo) { + defer replWg.Done() + erp.logger.Infof("setting new master on replica %v", alias) + + forceStart := false + if status, ok := statusMap[alias]; ok { + fs, err := ReplicaWasRunning(status) + if err != nil { + err = vterrors.Wrapf(err, "tablet %v could not determine StopReplicationStatus: %v", alias, err) + rec.RecordError(err) + + return + } + + forceStart = fs + } + + err := erp.tmc.SetMaster(replCtx, ti.Tablet, newPrimaryTabletInfo.Alias, now, "", forceStart) + if err != nil { + err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) + rec.RecordError(err) + + return + } + + // Signal that at least one goroutine succeeded to SetMaster. + replSuccessCancel() + } + + numReplicas := 0 + + for alias, ti := range tabletMap { + switch { + case alias == newPrimaryTabletAlias: + continue + case !opts.IgnoreReplicas.Has(alias): + replWg.Add(1) + numReplicas++ + go handleReplica(alias, ti) + } + } + + // Spin up a background goroutine to wait until all replica goroutines + // finished. Polling this way allows us to have promoteNewPrimary return + // success as soon as (a) the primary successfully populates its reparent + // journal and (b) at least one replica successfully begins replicating. + // + // If we were to follow the more common pattern of blocking on replWg.Wait() + // in the main body of promoteNewPrimary, we would be bound to the + // time of slowest replica, instead of the time of the fastest successful + // replica, and we want ERS to be fast. + go func() { + replWg.Wait() + allReplicasDoneCancel() + }() + + primaryErr := handlePrimary(newPrimaryTabletAlias, newPrimaryTabletInfo) + if primaryErr != nil { + erp.logger.Warningf("master failed to PopulateReparentJournal") + replCancel() + + return vterrors.Wrapf(primaryErr, "failed to PopulateReparentJournal on master: %v", primaryErr) + } + + select { + case <-replSuccessCtx.Done(): + // At least one replica was able to SetMaster successfully + return nil + case <-allReplicasDoneCtx.Done(): + // There are certain timing issues between replSuccessCtx.Done firing + // and allReplicasDoneCtx.Done firing, so we check again if truly all + // replicas failed (where `numReplicas` goroutines recorded an error) or + // one or more actually managed to succeed. + errCount := len(rec.Errors) + + switch { + case errCount > numReplicas: + // Technically, rec.Errors should never be greater than numReplicas, + // but it's better to err on the side of caution here, but also + // we're going to be explicit that this is doubly unexpected. 
+ return vterrors.Wrapf(rec.Error(), "received more errors (= %d) than replicas (= %d), which should be impossible: %v", errCount, numReplicas, rec.Error()) + case errCount == numReplicas: + return vterrors.Wrapf(rec.Error(), "%d replica(s) failed: %v", numReplicas, rec.Error()) + default: + return nil + } + } +} + +func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace string, shard string, opts EmergencyReparentOptions) error { + shardInfo, err := erp.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return err + } + + ev.ShardInfo = *shardInfo + + event.DispatchUpdate(ev, "reading all tablets") + + tabletMap, err := erp.ts.GetTabletMapForShard(ctx, keyspace, shard) + if err != nil { + return vterrors.Wrapf(err, "failed to get tablet map for %v/%v: %v", keyspace, shard, err) + } + + statusMap, primaryStatusMap, err := StopReplicationAndBuildStatusMaps(ctx, erp.tmc, ev, tabletMap, opts.WaitReplicasTimeout, opts.IgnoreReplicas, erp.logger) + if err != nil { + return vterrors.Wrapf(err, "failed to stop replication and build status maps: %v", err) + } + + if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) + } + + validCandidates, err := FindValidEmergencyReparentCandidates(statusMap, primaryStatusMap) + if err != nil { + return err + } else if len(validCandidates) == 0 { + return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") + } + + // Wait for all candidates to apply relay logs + if err := erp.waitForAllRelayLogsToApply(ctx, validCandidates, tabletMap, statusMap, opts); err != nil { + return err + } + + // Elect the candidate with the most up-to-date position. + var ( + winningPosition mysql.Position + winningPrimaryTabletAliasStr string + ) + + for alias, position := range validCandidates { + if winningPosition.IsZero() || position.AtLeast(winningPosition) { + winningPosition = position + winningPrimaryTabletAliasStr = alias + } + } + + // If we were requested to elect a particular primary, verify it's a valid + // candidate (non-zero position, no errant GTIDs) and is at least as + // advanced as the winning position. + if opts.NewPrimaryAlias != nil { + winningPrimaryTabletAliasStr = topoproto.TabletAliasString(opts.NewPrimaryAlias) + pos, ok := validCandidates[winningPrimaryTabletAliasStr] + switch { + case !ok: + return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "master elect %v has errant GTIDs", winningPrimaryTabletAliasStr) + case !pos.AtLeast(winningPosition): + return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "master elect %v at position %v is not fully caught up. Winning position: %v", winningPrimaryTabletAliasStr, pos, winningPosition) + } + } + + // Check (again) we still have the topology lock. + if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) + } + + // Do the promotion. 
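	// promoteNewPrimary populates the reparent journal on the winning tablet
	// and points the remaining replicas at it via SetMaster, returning once the
	// journal write succeeds and at least one replica reparents successfully.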
+ if err := erp.promoteNewPrimary(ctx, ev, keyspace, shard, winningPrimaryTabletAliasStr, tabletMap, statusMap, opts); err != nil { + return err + } + + return nil +} + +func (erp *EmergencyReparenter) waitForAllRelayLogsToApply( + ctx context.Context, + validCandidates map[string]mysql.Position, + tabletMap map[string]*topo.TabletInfo, + statusMap map[string]*replicationdatapb.StopReplicationStatus, + opts EmergencyReparentOptions, +) error { + errCh := make(chan error) + defer close(errCh) + + groupCtx, groupCancel := context.WithTimeout(ctx, opts.WaitReplicasTimeout) + defer groupCancel() + + for candidate := range validCandidates { + go func(alias string) { + var err error + defer func() { errCh <- err }() + err = WaitForRelayLogsToApply(groupCtx, erp.tmc, tabletMap[alias], statusMap[alias]) + }(candidate) + } + + errgroup := concurrency.ErrorGroup{ + NumGoroutines: len(validCandidates), + NumRequiredSuccesses: len(validCandidates), + NumAllowedErrors: 0, + } + rec := errgroup.Wait(groupCancel, errCh) + + if len(rec.Errors) != 0 { + return vterrors.Wrapf(rec.Error(), "could not apply all relay logs within the provided WaitReplicasTimeout (%s): %v", opts.WaitReplicasTimeout, rec.Error()) + } + + return nil +} diff --git a/go/vt/vtctl/reparentutil/replication.go b/go/vt/vtctl/reparentutil/replication.go new file mode 100644 index 00000000000..42440b82918 --- /dev/null +++ b/go/vt/vtctl/reparentutil/replication.go @@ -0,0 +1,242 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reparentutil + +import ( + "context" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + + "vitess.io/vitess/go/event" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topotools/events" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" + "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// FindValidEmergencyReparentCandidates will find candidates for an emergency +// reparent, and, if successful, return a mapping of those tablet aliases (as +// raw strings) to their replication positions for later comparison. +func FindValidEmergencyReparentCandidates( + statusMap map[string]*replicationdatapb.StopReplicationStatus, + primaryStatusMap map[string]*replicationdatapb.MasterStatus, +) (map[string]mysql.Position, error) { + replicationStatusMap := make(map[string]*mysql.ReplicationStatus, len(statusMap)) + positionMap := make(map[string]mysql.Position) + + // Build out replication status list from proto types. + for alias, statuspb := range statusMap { + status := mysql.ProtoToReplicationStatus(statuspb.After) + replicationStatusMap[alias] = &status + } + + // Determine if we're GTID-based. If we are, we'll need to look for errant + // GTIDs below. 
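	// A tablet counts as GTID-based when its relay log position decodes to a
	// Mysql56GTIDSet; FilePos-style positions are treated as non-GTID-based and
	// skip the errant GTID check.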
+ var ( + isGTIDBased bool + isNonGTIDBased bool + emptyRelayPosErrorRecorder concurrency.FirstErrorRecorder + ) + + for alias, status := range replicationStatusMap { + if _, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet); ok { + isGTIDBased = true + } else { + isNonGTIDBased = true + } + + if status.RelayLogPosition.IsZero() { + // Potentially bail. If any other tablet is detected to have + // GTID-based relay log positions, we will return the error recorded + // here. + emptyRelayPosErrorRecorder.RecordError(vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias)) + } + } + + if isGTIDBased && emptyRelayPosErrorRecorder.HasErrors() { + return nil, emptyRelayPosErrorRecorder.Error() + } + + if isGTIDBased && isNonGTIDBased { + return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "encountered mix of GTID-based and non GTID-based relay logs") + } + + // Create relevant position list of errant GTID-based positions for later + // comparison. + for alias, status := range replicationStatusMap { + // If we're not GTID-based, no need to search for errant GTIDs, so just + // add the position to the map and continue. + if !isGTIDBased { + positionMap[alias] = status.Position + + continue + } + + // This condition should really never happen, since we did the same cast + // in the earlier loop, but let's be doubly sure. + relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) + if !ok { + return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "we got a filled-in relay log position, but it's not of type Mysql56GTIDSet, even though we've determined we need to use GTID based assesment") + } + + // We need to remove this alias's status from the list, otherwise the + // GTID diff will always be empty. + statusList := make([]*mysql.ReplicationStatus, 0, len(replicationStatusMap)-1) + + for a, s := range replicationStatusMap { + if a != alias { + statusList = append(statusList, s) + } + } + + errantGTIDs, err := status.FindErrantGTIDs(statusList) + switch { + case err != nil: + // Could not look up GTIDs to determine if we have any. It's not + // safe to continue. + return nil, err + case len(errantGTIDs) != 0: + // This tablet has errant GTIDs. It's not a valid candidate for + // reparent, so don't insert it into the final mapping. + continue + } + + pos := mysql.Position{GTIDSet: relayLogGTIDSet} + positionMap[alias] = pos + } + + for alias, primaryStatus := range primaryStatusMap { + executedPosition, err := mysql.DecodePosition(primaryStatus.Position) + if err != nil { + return nil, vterrors.Wrapf(err, "could not decode a master status executed position for tablet %v: %v", alias, err) + } + + positionMap[alias] = executedPosition + } + + return positionMap, nil +} + +// ReplicaWasRunning returns true if a StopReplicationStatus indicates that the +// replica had running replication threads before being stopped. It returns an +// error if the Before state of replication is nil. 
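+//
+// For example, a replica whose IO thread was running when replication was
+// stopped reports as having been running (the status literal here is
+// illustrative only):
+//
+//	wasRunning, err := ReplicaWasRunning(&replicationdatapb.StopReplicationStatus{
+//		Before: &replicationdatapb.Status{IoThreadRunning: true},
+//	})
+//	// wasRunning == true, err == nil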
+func ReplicaWasRunning(stopStatus *replicationdatapb.StopReplicationStatus) (bool, error) { + if stopStatus == nil || stopStatus.Before == nil { + return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopStatus) + } + + return stopStatus.Before.IoThreadRunning || stopStatus.Before.SqlThreadRunning, nil +} + +// StopReplicationAndBuildStatusMaps stops replication on all replicas, then +// collects and returns a mapping of TabletAlias (as string) to their current +// replication positions. +func StopReplicationAndBuildStatusMaps( + ctx context.Context, + tmc tmclient.TabletManagerClient, + ev *events.Reparent, + tabletMap map[string]*topo.TabletInfo, + waitReplicasTimeout time.Duration, + ignoredTablets sets.String, + logger logutil.Logger, +) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { + event.DispatchUpdate(ev, "stop replication on all replicas") + + var ( + statusMap = map[string]*replicationdatapb.StopReplicationStatus{} + masterStatusMap = map[string]*replicationdatapb.MasterStatus{} + m sync.Mutex + errChan = make(chan error) + ) + + groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) + defer groupCancel() + + fillStatus := func(alias string, tabletInfo *topo.TabletInfo) { + err := vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "fillStatus did not successfully complete") + defer func() { errChan <- err }() + + logger.Infof("getting replication position from %v", alias) + + _, stopReplicationStatus, err := tmc.StopReplicationAndGetStatus(groupCtx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) + switch err { + case mysql.ErrNotReplica: + var masterStatus *replicationdatapb.MasterStatus + + masterStatus, err = tmc.DemoteMaster(groupCtx, tabletInfo.Tablet) + if err != nil { + msg := "replica %v thinks it's master but we failed to demote it" + err = vterrors.Wrapf(err, msg+": %v", alias, err) + + logger.Warningf(msg, alias) + return + } + + m.Lock() + masterStatusMap[alias] = masterStatus + m.Unlock() + case nil: + m.Lock() + statusMap[alias] = stopReplicationStatus + m.Unlock() + default: + logger.Warningf("failed to get replication status from %v: %v", alias, err) + + err = vterrors.Wrapf(err, "error when getting replication status for alias %v: %v", alias, err) + } + } + + for alias, tabletInfo := range tabletMap { + if !ignoredTablets.Has(alias) { + go fillStatus(alias, tabletInfo) + } + } + + errgroup := concurrency.ErrorGroup{ + NumGoroutines: len(tabletMap) - ignoredTablets.Len(), + NumRequiredSuccesses: len(tabletMap) - ignoredTablets.Len() - 1, + NumAllowedErrors: 1, + } + + errRecorder := errgroup.Wait(groupCancel, errChan) + if len(errRecorder.Errors) > 1 { + return nil, nil, vterrors.Wrapf(errRecorder.Error(), "encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) + } + + return statusMap, masterStatusMap, nil +} + +// WaitForRelayLogsToApply blocks execution waiting for the given tablet's relay +// logs to apply, unless the specified context is canceled or exceeded. +// Typically a caller will set a timeout of WaitReplicasTimeout on a context and +// use that context with this function. 
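+//
+// A typical call site looks roughly like the following sketch (the surrounding
+// variable names are illustrative only):
+//
+//	waitCtx, cancel := context.WithTimeout(ctx, waitReplicasTimeout)
+//	defer cancel()
+//	if err := WaitForRelayLogsToApply(waitCtx, tmc, tabletInfo, stopStatus); err != nil {
+//		// relay logs did not apply within the timeout
+//	}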
+func WaitForRelayLogsToApply(ctx context.Context, tmc tmclient.TabletManagerClient, tabletInfo *topo.TabletInfo, status *replicationdatapb.StopReplicationStatus) error { + switch status.After.RelayLogPosition { + case "": + return tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.FileRelayLogPosition) + default: + return tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.RelayLogPosition) + } +} diff --git a/go/vt/vtctl/reparentutil/replication_test.go b/go/vt/vtctl/reparentutil/replication_test.go new file mode 100644 index 00000000000..12a045c60dd --- /dev/null +++ b/go/vt/vtctl/reparentutil/replication_test.go @@ -0,0 +1,898 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reparentutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools/events" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +func TestFindValidEmergencyReparentCandidates(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + statusMap map[string]*replicationdatapb.StopReplicationStatus + primaryStatusMap map[string]*replicationdatapb.MasterStatus + // Note: for these tests, it's simpler to compare keys than actual + // mysql.Postion structs, which are just thin wrappers around the + // mysql.GTIDSet interface. If a tablet alias makes it into the map, we + // know it was chosen by the method, and that either + // mysql.DecodePosition was successful (in the primary case) or + // status.FindErrantGTIDs was successful (in the replica case). If the + // former is not true, then the function should return an error. If the + // latter is not true, then the tablet alias will not be in the map. The + // point is, the combination of (1) whether the test should error and + // (2) the set of keys we expect in the map is enough to fully assert on + // the correctness of the behavior of this functional unit. 
+ expected []string + shouldErr bool + }{ + { + name: "success", + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "r1": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + "r2": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + }, + primaryStatusMap: map[string]*replicationdatapb.MasterStatus{ + "p1": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + expected: []string{"r1", "r2", "p1"}, + shouldErr: false, + }, + { + name: "mixed replication modes", + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "r1": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + "r2": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "FilePos/mysql-bin.0001:10", + }, + }, + }, + expected: nil, + shouldErr: true, + }, + { + name: "tablet without relay log position", + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "r1": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + "r2": { + After: &replicationdatapb.Status{ + RelayLogPosition: "", + }, + }, + }, + expected: nil, + shouldErr: true, + }, + { + name: "non-GTID-based", + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "r1": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "FilePos/mysql-bin.0001:100", + }, + }, + "r2": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "FilePos/mysql-bin.0001:10", + }, + }, + }, + expected: []string{"r1", "r2"}, + shouldErr: false, + }, + { + name: "tablet with errant GTIDs is excluded", + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "r1": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + "errant": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5,AAAAAAAA-71CA-11E1-9E33-C80AA9429562:1", + }, + }, + }, + primaryStatusMap: map[string]*replicationdatapb.MasterStatus{ + "p1": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + expected: []string{"r1", "p1"}, + shouldErr: false, + }, + { + name: "bad master position fails the call", + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "r1": { + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + }, + primaryStatusMap: map[string]*replicationdatapb.MasterStatus{ + "p1": { + Position: "InvalidFlavor/1234", + }, + }, + expected: nil, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actual, err := FindValidEmergencyReparentCandidates(tt.statusMap, tt.primaryStatusMap) + if tt.shouldErr { + assert.Error(t, err) + return + } 
+ + assert.NoError(t, err) + + keys := make([]string, 0, len(actual)) + for key := range actual { + keys = append(keys, key) + } + assert.ElementsMatch(t, tt.expected, keys) + }) + } +} + +// stopReplicationAndBuildStatusMapsTestTMClient implements +// tmclient.TabletManagerClient to facilitate testing of +// StopReplicationAndBuildStatusMaps. +type stopReplicationAndBuildStatusMapsTestTMClient struct { + tmclient.TabletManagerClient + + demoteMasterResults map[string]*struct { + MasterStatus *replicationdatapb.MasterStatus + Err error + } + demoteMasterDelays map[string]time.Duration + + stopReplicationAndGetStatusResults map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + } + stopReplicationAndGetStatusDelays map[string]time.Duration +} + +func (fake *stopReplicationAndBuildStatusMapsTestTMClient) DemoteMaster(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.MasterStatus, error) { + if tablet.Alias == nil { + return nil, assert.AnError + } + + key := topoproto.TabletAliasString(tablet.Alias) + + if delay, ok := fake.demoteMasterDelays[key]; ok { + select { + case <-time.After(delay): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + if result, ok := fake.demoteMasterResults[key]; ok { + return result.MasterStatus, result.Err + } + + return nil, assert.AnError +} + +func (fake *stopReplicationAndBuildStatusMapsTestTMClient) StopReplicationAndGetStatus(ctx context.Context, tablet *topodatapb.Tablet, mode replicationdatapb.StopReplicationMode) (*replicationdatapb.Status, *replicationdatapb.StopReplicationStatus, error) { + if tablet.Alias == nil { + return nil, nil, assert.AnError + } + + key := topoproto.TabletAliasString(tablet.Alias) + + if delay, ok := fake.stopReplicationAndGetStatusDelays[key]; ok { + select { + case <-time.After(delay): + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } + + if result, ok := fake.stopReplicationAndGetStatusResults[key]; ok { + return /* unused by the code under test */ nil, result.StopStatus, result.Err + } + + return nil, nil, assert.AnError +} + +func TestStopReplicationAndBuildStatusMaps(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := logutil.NewMemoryLogger() + tests := []struct { + name string + tmc *stopReplicationAndBuildStatusMapsTestTMClient + tabletMap map[string]*topo.TabletInfo + waitReplicasTimeout time.Duration + ignoredTablets sets.String + expectedStatusMap map[string]*replicationdatapb.StopReplicationStatus + expectedMasterStatusMap map[string]*replicationdatapb.MasterStatus + shouldErr bool + }{ + { + name: "success", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "100-before"}, + After: &replicationdatapb.Status{Position: "100-after"}, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + 
ignoredTablets: sets.NewString(), + expectedStatusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000100": { + Before: &replicationdatapb.Status{Position: "100-before"}, + After: &replicationdatapb.Status{Position: "100-after"}, + }, + "zone1-0000000101": { + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + expectedMasterStatusMap: map[string]*replicationdatapb.MasterStatus{}, + shouldErr: false, + }, + { + name: "ignore tablets", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "100-before"}, + After: &replicationdatapb.Status{Position: "100-after"}, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + ignoredTablets: sets.NewString("zone1-0000000100"), + expectedStatusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000101": { + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + expectedMasterStatusMap: map[string]*replicationdatapb.MasterStatus{}, + shouldErr: false, + }, + { + name: "have MASTER tablet and can demote", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + demoteMasterResults: map[string]*struct { + MasterStatus *replicationdatapb.MasterStatus + Err error + }{ + "zone1-0000000100": { + MasterStatus: &replicationdatapb.MasterStatus{ + Position: "master-position-100", + }, + }, + }, + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + Err: mysql.ErrNotReplica, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + ignoredTablets: sets.NewString(), + expectedStatusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000101": { + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + expectedMasterStatusMap: map[string]*replicationdatapb.MasterStatus{ + "zone1-0000000100": { + Position: "master-position-100", + }, + }, + shouldErr: false, + }, + { + name: "one tablet is MASTER and cannot demote", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + demoteMasterResults: map[string]*struct { + MasterStatus *replicationdatapb.MasterStatus + Err error + }{ + 
"zone1-0000000100": { + Err: assert.AnError, + }, + }, + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + Err: mysql.ErrNotReplica, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + ignoredTablets: sets.NewString(), + expectedStatusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000101": { + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + expectedMasterStatusMap: map[string]*replicationdatapb.MasterStatus{}, // zone1-0000000100 fails to demote, so does not appear + shouldErr: false, + }, + { + name: "multiple tablets are MASTER and cannot demote", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + demoteMasterResults: map[string]*struct { + MasterStatus *replicationdatapb.MasterStatus + Err error + }{ + "zone1-0000000100": { + Err: assert.AnError, + }, + "zone1-0000000101": { + Err: assert.AnError, + }, + }, + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + Err: mysql.ErrNotReplica, + }, + "zone1-0000000101": { + Err: mysql.ErrNotReplica, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + ignoredTablets: sets.NewString(), + expectedStatusMap: nil, + expectedMasterStatusMap: nil, + shouldErr: true, // we get multiple errors, so we fail + }, + { + name: "waitReplicasTimeout exceeded", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + stopReplicationAndGetStatusDelays: map[string]time.Duration{ + "zone1-0000000100": time.Minute, // zone1-0000000100 will timeout and not be included + }, + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "100-before"}, + After: &replicationdatapb.Status{Position: "100-after"}, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + waitReplicasTimeout: time.Millisecond * 5, + ignoredTablets: sets.NewString(), + expectedStatusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000101": { + 
Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + expectedMasterStatusMap: map[string]*replicationdatapb.MasterStatus{}, + shouldErr: false, + }, + { + name: "one tablet fails to StopReplication", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + Err: assert.AnError, // not being mysql.ErrNotReplica will not cause us to call DemoteMaster + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + ignoredTablets: sets.NewString(), + expectedStatusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000101": { + Before: &replicationdatapb.Status{Position: "101-before"}, + After: &replicationdatapb.Status{Position: "101-after"}, + }, + }, + expectedMasterStatusMap: map[string]*replicationdatapb.MasterStatus{}, + shouldErr: false, + }, + { + name: "multiple tablets fail StopReplication", + tmc: &stopReplicationAndBuildStatusMapsTestTMClient{ + stopReplicationAndGetStatusResults: map[string]*struct { + StopStatus *replicationdatapb.StopReplicationStatus + Err error + }{ + "zone1-0000000100": { + Err: assert.AnError, + }, + "zone1-0000000101": { + Err: assert.AnError, + }, + }, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + ignoredTablets: sets.NewString(), + expectedStatusMap: nil, + expectedMasterStatusMap: nil, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + statusMap, masterStatusMap, err := StopReplicationAndBuildStatusMaps( + ctx, + tt.tmc, + &events.Reparent{}, + tt.tabletMap, + tt.waitReplicasTimeout, + tt.ignoredTablets, + logger, + ) + if tt.shouldErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expectedStatusMap, statusMap, "StopReplicationStatus mismatch") + assert.Equal(t, tt.expectedMasterStatusMap, masterStatusMap, "MasterStatusMap mismatch") + }) + } +} + +func TestReplicaWasRunning(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + in *replicationdatapb.StopReplicationStatus + expected bool + shouldErr bool + }{ + { + name: "io thread running", + in: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{ + IoThreadRunning: true, + SqlThreadRunning: false, + }, + }, + expected: true, + shouldErr: false, + }, + { + name: "sql thread running", + in: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{ + IoThreadRunning: false, + SqlThreadRunning: true, + }, + }, + expected: true, + shouldErr: false, + }, + { + name: "io and sql threads running", + in: &replicationdatapb.StopReplicationStatus{ + Before: 
&replicationdatapb.Status{ + IoThreadRunning: true, + SqlThreadRunning: true, + }, + }, + expected: true, + shouldErr: false, + }, + { + name: "no replication threads running", + in: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{ + IoThreadRunning: false, + SqlThreadRunning: false, + }, + }, + expected: false, + shouldErr: false, + }, + { + name: "passing nil pointer results in an error", + in: nil, + expected: false, + shouldErr: true, + }, + { + name: "status.Before is nil results in an error", + in: &replicationdatapb.StopReplicationStatus{ + Before: nil, + }, + expected: false, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actual, err := ReplicaWasRunning(tt.in) + if tt.shouldErr { + assert.Error(t, err) + + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expected, actual) + }) + } +} + +// waitForRelayLogsToApplyTestTMClient implements just the WaitForPosition +// method of the tmclient.TabletManagerClient interface for +// TestWaitForRelayLogsToApply, with the necessary trackers to facilitate +// testing that unit. +type waitForRelayLogsToApplyTestTMClient struct { + tmclient.TabletManagerClient + calledPositions []string + shouldErr bool +} + +func (fake *waitForRelayLogsToApplyTestTMClient) WaitForPosition(_ context.Context, _ *topodatapb.Tablet, position string) error { + if fake.shouldErr { + return assert.AnError + } + + fake.calledPositions = append(fake.calledPositions, position) + return nil +} + +func TestWaitForRelayLogsToApply(t *testing.T) { + t.Parallel() + + ctx := context.Background() + tests := []struct { + name string + client *waitForRelayLogsToApplyTestTMClient + status *replicationdatapb.StopReplicationStatus + expectedCalledPositions []string + shouldErr bool + }{ + { + name: "using relay log position", + client: &waitForRelayLogsToApplyTestTMClient{}, + status: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + RelayLogPosition: "relay-pos", + }, + }, + expectedCalledPositions: []string{"relay-pos"}, + shouldErr: false, + }, + { + name: "using file relay log position", + client: &waitForRelayLogsToApplyTestTMClient{}, + status: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + FileRelayLogPosition: "file-relay-pos", + }, + }, + expectedCalledPositions: []string{"file-relay-pos"}, + shouldErr: false, + }, + { + name: "when both are set, relay log position takes precedence over file relay log position", + client: &waitForRelayLogsToApplyTestTMClient{}, + status: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + RelayLogPosition: "relay-pos", + FilePosition: "file-relay-pos", + }, + }, + expectedCalledPositions: []string{"relay-pos"}, + shouldErr: false, + }, + { + name: "error waiting for position", + client: &waitForRelayLogsToApplyTestTMClient{ + shouldErr: true, + }, + status: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + RelayLogPosition: "relay-pos", + }, + }, + expectedCalledPositions: nil, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := WaitForRelayLogsToApply(ctx, tt.client, &topo.TabletInfo{}, tt.status) + defer assert.Equal(t, tt.expectedCalledPositions, tt.client.calledPositions) + if tt.shouldErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + }) + } +} diff --git a/go/vt/vtctl/reparentutil/util.go 
b/go/vt/vtctl/reparentutil/util.go new file mode 100644 index 00000000000..9e31c6985da --- /dev/null +++ b/go/vt/vtctl/reparentutil/util.go @@ -0,0 +1,146 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reparentutil + +import ( + "context" + "sync" + "time" + + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// ChooseNewPrimary finds a tablet that should become a primary after reparent. +// The criteria for the new primary-elect are (preferably) to be in the same +// cell as the current primary, and to be different from avoidPrimaryAlias. The +// tablet with the most advanced replication position is chosen to minimize the +// amount of time spent catching up with the current primary. +// +// Note that the search for the most advanced replication position will race +// with transactions being executed on the current primary, so when all tablets +// are at roughly the same position, then the choice of new primary-elect will +// be somewhat unpredictable. +func ChooseNewPrimary( + ctx context.Context, + tmc tmclient.TabletManagerClient, + shardInfo *topo.ShardInfo, + tabletMap map[string]*topo.TabletInfo, + avoidPrimaryAlias *topodatapb.TabletAlias, + waitReplicasTimeout time.Duration, + // (TODO:@ajm188) it's a little gross we need to pass this, maybe embed in the context? + logger logutil.Logger, +) (*topodatapb.TabletAlias, error) { + if avoidPrimaryAlias == nil { + return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "tablet to avoid for reparent is not provided, cannot choose new primary") + } + + var primaryCell string + if shardInfo.MasterAlias != nil { + primaryCell = shardInfo.MasterAlias.Cell + } + + var ( + searcher = topotools.NewMaxReplicationPositionSearcher(tmc, logger, waitReplicasTimeout) + wg sync.WaitGroup + ) + + for _, tablet := range tabletMap { + switch { + case primaryCell != "" && tablet.Alias.Cell != primaryCell: + continue + case topoproto.TabletAliasEqual(tablet.Alias, avoidPrimaryAlias): + continue + case tablet.Tablet.Type != topodatapb.TabletType_REPLICA: + continue + } + + wg.Add(1) + + go func(tablet *topodatapb.Tablet) { + defer wg.Done() + searcher.ProcessTablet(ctx, tablet) + }(tablet.Tablet) + } + + wg.Wait() + + if maxPosTablet := searcher.MaxPositionTablet(); maxPosTablet != nil { + return maxPosTablet.Alias, nil + } + + return nil, nil +} + +// FindCurrentPrimary returns the current primary tablet of a shard, if any. The +// current primary is whichever tablet of type MASTER (if any) has the most +// recent MasterTermStartTime, which is the same rule that vtgate uses to route +// master traffic. +// +// The return value is nil if the current primary cannot be definitively +// determined. 
This can happen either if no tablet claims to be type MASTER, or +// if multiple tablets claim to be type MASTER and happen to have the same +// MasterTermStartTime timestamp (a tie). +// +// The tabletMap must be a complete map (not a partial result) for the shard. +func FindCurrentPrimary(tabletMap map[string]*topo.TabletInfo, logger logutil.Logger) *topo.TabletInfo { + var ( + currentPrimary *topo.TabletInfo + currentTermStartTime time.Time + ) + + for _, tablet := range tabletMap { + if tablet.Type != topodatapb.TabletType_MASTER { + continue + } + + if currentPrimary == nil { + currentPrimary = tablet + currentTermStartTime = tablet.GetMasterTermStartTime() + continue + } + + otherPrimaryTermStartTime := tablet.GetMasterTermStartTime() + if otherPrimaryTermStartTime.After(currentTermStartTime) { + currentPrimary = tablet + currentTermStartTime = otherPrimaryTermStartTime + } else if otherPrimaryTermStartTime.Equal(currentTermStartTime) { + // A tie should not happen unless the upgrade order was violated + // (e.g. some vttablets have not been upgraded) or if we get really + // unlucky. + // + // Either way, we need to be safe and not assume we know who the + // true primary is. + logger.Warningf( + "Multiple primaries (%v and %v) are tied for MasterTermStartTime; can't determine the true primary.", + topoproto.TabletAliasString(currentPrimary.Alias), + topoproto.TabletAliasString(tablet.Alias), + ) + + return nil + } + } + + return currentPrimary +} diff --git a/go/vt/vtctl/reparentutil/util_test.go b/go/vt/vtctl/reparentutil/util_test.go new file mode 100644 index 00000000000..eb0ccba3a3d --- /dev/null +++ b/go/vt/vtctl/reparentutil/util_test.go @@ -0,0 +1,513 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package reparentutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vttime" +) + +type chooseNewPrimaryTestTMClient struct { + tmclient.TabletManagerClient + replicationStatuses map[string]*replicationdatapb.Status +} + +func (fake *chooseNewPrimaryTestTMClient) ReplicationStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.Status, error) { + if fake.replicationStatuses == nil { + return nil, assert.AnError + } + + key := topoproto.TabletAliasString(tablet.Alias) + + if status, ok := fake.replicationStatuses[key]; ok { + return status, nil + } + + return nil, assert.AnError +} + +func TestChooseNewPrimary(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := logutil.NewMemoryLogger() + tests := []struct { + name string + tmc *chooseNewPrimaryTestTMClient + shardInfo *topo.ShardInfo + tabletMap map[string]*topo.TabletInfo + avoidPrimaryAlias *topodatapb.TabletAlias + expected *topodatapb.TabletAlias + shouldErr bool + }{ + { + name: "found a replica", + tmc: &chooseNewPrimaryTestTMClient{ + // zone1-101 is behind zone1-102 + replicationStatuses: map[string]*replicationdatapb.Status{ + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1", + }, + "zone1-0000000102": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + }, + shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{ + MasterAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, nil), + tabletMap: map[string]*topo.TabletInfo{ + "primary": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Type: topodatapb.TabletType_MASTER, + }, + }, + "replica1": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Type: topodatapb.TabletType_REPLICA, + }, + }, + "replica2": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Type: topodatapb.TabletType_REPLICA, + }, + }, + }, + avoidPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 0, + }, + expected: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + shouldErr: false, + }, + { + name: "no active primary in shard", + tmc: &chooseNewPrimaryTestTMClient{ + replicationStatuses: map[string]*replicationdatapb.Status{ + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1", + }, + }, + }, + shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{}, nil), + tabletMap: map[string]*topo.TabletInfo{ + "primary": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Type: topodatapb.TabletType_MASTER, + }, + }, + "replica1": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Type: topodatapb.TabletType_REPLICA, + }, + }, + }, + avoidPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 0, + }, + expected: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + shouldErr: false, + }, + { + name: "primary alias is nil", + tmc: &chooseNewPrimaryTestTMClient{ + replicationStatuses: 
map[string]*replicationdatapb.Status{ + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1", + }, + }, + }, + shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{ + MasterAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, nil), + tabletMap: map[string]*topo.TabletInfo{ + "primary": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Type: topodatapb.TabletType_MASTER, + }, + }, + "replica1": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Type: topodatapb.TabletType_REPLICA, + }, + }, + }, + avoidPrimaryAlias: nil, + expected: nil, + shouldErr: true, + }, + { + name: "no replicas in primary cell", + tmc: &chooseNewPrimaryTestTMClient{ + // zone1-101 is behind zone1-102 + replicationStatuses: map[string]*replicationdatapb.Status{ + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1", + }, + "zone1-0000000102": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + }, + shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{ + MasterAlias: &topodatapb.TabletAlias{ + Cell: "zone2", + Uid: 200, + }, + }, nil), + tabletMap: map[string]*topo.TabletInfo{ + "primary": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone2", + Uid: 200, + }, + Type: topodatapb.TabletType_MASTER, + }, + }, + "replica1": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Type: topodatapb.TabletType_REPLICA, + }, + }, + "replica2": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Type: topodatapb.TabletType_REPLICA, + }, + }, + }, + avoidPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 0, + }, + expected: nil, + shouldErr: false, + }, + { + name: "only available tablet is AvoidPrimary", + tmc: &chooseNewPrimaryTestTMClient{ + // zone1-101 is behind zone1-102 + replicationStatuses: map[string]*replicationdatapb.Status{ + "zone1-0000000101": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1", + }, + "zone1-0000000102": { + Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5", + }, + }, + }, + shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{ + MasterAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, nil), + tabletMap: map[string]*topo.TabletInfo{ + "avoid-primary": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Type: topodatapb.TabletType_REPLICA, + }, + }, + }, + avoidPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + expected: nil, + shouldErr: false, + }, + { + name: "no replicas in shard", + tmc: &chooseNewPrimaryTestTMClient{}, + shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{ + MasterAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, nil), + tabletMap: map[string]*topo.TabletInfo{ + "primary": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Type: topodatapb.TabletType_MASTER, + }, + }, + }, + avoidPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 0, + }, + expected: nil, + shouldErr: false, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actual, err := ChooseNewPrimary(ctx, tt.tmc, tt.shardInfo, tt.tabletMap, tt.avoidPrimaryAlias, 
time.Millisecond*50, logger) + if tt.shouldErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expected, actual) + }) + } +} + +func TestFindCurrentPrimary(t *testing.T) { + t.Parallel() + + // The exact values of the tablet aliases don't matter to this function, but + // we need them to be non-nil, so we'll just make one and reuse it. + alias := &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + } + logger := logutil.NewMemoryLogger() + tests := []struct { + name string + in map[string]*topo.TabletInfo + expected *topo.TabletInfo + }{ + { + name: "single current primary", + in: map[string]*topo.TabletInfo{ + "primary": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_MASTER, + MasterTermStartTime: &vttime.Time{ + Seconds: 100, + }, + Hostname: "primary-tablet", + }, + }, + "replica": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_REPLICA, + Hostname: "replica-tablet", + }, + }, + "rdonly": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_RDONLY, + Hostname: "rdonly-tablet", + }, + }, + }, + expected: &topo.TabletInfo{ + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_MASTER, + MasterTermStartTime: &vttime.Time{ + Seconds: 100, + }, + Hostname: "primary-tablet", + }, + }, + }, + { + name: "no primaries", + in: map[string]*topo.TabletInfo{ + "replica1": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_REPLICA, + Hostname: "replica-tablet-1", + }, + }, + "replica2": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_REPLICA, + Hostname: "replica-tablet-2", + }, + }, + "rdonly": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_RDONLY, + Hostname: "rdonly-tablet", + }, + }, + }, + expected: nil, + }, + { + name: "multiple primaries with one true primary", + in: map[string]*topo.TabletInfo{ + "stale-primary": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_MASTER, + MasterTermStartTime: &vttime.Time{ + Seconds: 100, + }, + Hostname: "stale-primary-tablet", + }, + }, + "true-primary": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_MASTER, + MasterTermStartTime: &vttime.Time{ + Seconds: 1000, + }, + Hostname: "true-primary-tablet", + }, + }, + "rdonly": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_RDONLY, + Hostname: "rdonly-tablet", + }, + }, + }, + expected: &topo.TabletInfo{ + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_MASTER, + MasterTermStartTime: &vttime.Time{ + Seconds: 1000, + }, + Hostname: "true-primary-tablet", + }, + }, + }, + { + name: "multiple primaries with same term start", + in: map[string]*topo.TabletInfo{ + "primary1": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_MASTER, + MasterTermStartTime: &vttime.Time{ + Seconds: 100, + }, + Hostname: "primary-tablet-1", + }, + }, + "primary2": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_MASTER, + MasterTermStartTime: &vttime.Time{ + Seconds: 100, + }, + Hostname: "primary-tablet-2", + }, + }, + "rdonly": { + Tablet: &topodatapb.Tablet{ + Alias: alias, + Type: topodatapb.TabletType_RDONLY, + Hostname: "rdonly-tablet", + }, + }, + }, + expected: nil, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actual := FindCurrentPrimary(tt.in, logger) + 
assert.Equal(t, tt.expected, actual) + }) + } +} diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index e9dc22a0eb5..f03890913ba 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -27,7 +27,6 @@ import ( "time" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/utils/pointer" "vitess.io/vitess/go/event" "vitess.io/vitess/go/mysql" @@ -38,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/topotools/events" + "vitess.io/vitess/go/vt/vtctl/reparentutil" "vitess.io/vitess/go/vt/vterrors" replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" @@ -48,7 +48,7 @@ import ( const ( initShardMasterOperation = "InitShardMaster" plannedReparentShardOperation = "PlannedReparentShard" - emergencyReparentShardOperation = "EmergencyReparentShard" + emergencyReparentShardOperation = "EmergencyReparentShard" //nolint tabletExternallyReparentedOperation = "TabletExternallyReparented" //nolint ) @@ -395,7 +395,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R return nil } event.DispatchUpdate(ev, "searching for master candidate") - masterElectTabletAlias, err = wr.chooseNewMaster(ctx, shardInfo, tabletMap, avoidMasterTabletAlias, waitReplicasTimeout) + masterElectTabletAlias, err = reparentutil.ChooseNewPrimary(ctx, wr.tmc, shardInfo, tabletMap, avoidMasterTabletAlias, waitReplicasTimeout, wr.logger) if err != nil { return err } @@ -417,7 +417,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R // Find the current master (if any) based on the tablet states. We no longer // trust the shard record for this, because it is updated asynchronously. - currentMaster := wr.findCurrentMaster(tabletMap) + currentMaster := reparentutil.FindCurrentPrimary(tabletMap, wr.logger) var reparentJournalPos string @@ -711,477 +711,20 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R return nil } -// findCurrentMaster returns the current master of a shard, if any. -// -// The tabletMap must be a complete map (not a partial result) for the shard. -// -// The current master is whichever MASTER tablet (if any) has the highest -// MasterTermStartTime, which is the same rule that vtgate uses to route master -// traffic. -// -// The return value is nil if the current master can't be definitively -// determined. This can happen either if no tablet claims to be MASTER, or if -// multiple MASTER tablets claim to have the same timestamp (a tie). -func (wr *Wrangler) findCurrentMaster(tabletMap map[string]*topo.TabletInfo) *topo.TabletInfo { - var currentMaster *topo.TabletInfo - var currentMasterTime time.Time - - for _, tablet := range tabletMap { - // Only look at masters. - if tablet.Type != topodatapb.TabletType_MASTER { - continue - } - // Fill in first master we find. - if currentMaster == nil { - currentMaster = tablet - currentMasterTime = tablet.GetMasterTermStartTime() - continue - } - // If we find any other masters, compare timestamps. - newMasterTime := tablet.GetMasterTermStartTime() - if newMasterTime.After(currentMasterTime) { - currentMaster = tablet - currentMasterTime = newMasterTime - continue - } - if newMasterTime.Equal(currentMasterTime) { - // A tie shouldn't happen unless the upgrade order was violated - // (some vttablets have not yet been upgraded) or if we get really - // unlucky. However, if it does happen, we need to be safe and not - // assume we know who the true master is. 
- wr.logger.Warningf("Multiple masters (%v and %v) are tied for MasterTermStartTime; can't determine the true master.", - topoproto.TabletAliasString(currentMaster.Alias), - topoproto.TabletAliasString(tablet.Alias)) - return nil - } - } - - return currentMaster -} - -// maxReplPosSearch is a struct helping to search for a tablet with the largest replication -// position querying status from all tablets in parallel. -type maxReplPosSearch struct { - wrangler *Wrangler - ctx context.Context - waitReplicasTimeout time.Duration - waitGroup sync.WaitGroup - maxPosLock sync.Mutex - maxPos mysql.Position - maxPosTablet *topodatapb.Tablet -} - -func (maxPosSearch *maxReplPosSearch) processTablet(tablet *topodatapb.Tablet) { - defer maxPosSearch.waitGroup.Done() - maxPosSearch.wrangler.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias)) - - replicaStatusCtx, cancelReplicaStatus := context.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitReplicasTimeout) - defer cancelReplicaStatus() - - status, err := maxPosSearch.wrangler.tmc.ReplicationStatus(replicaStatusCtx, tablet) - if err != nil { - maxPosSearch.wrangler.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", topoproto.TabletAliasString(tablet.Alias), err) - return - } - replPos, err := mysql.DecodePosition(status.Position) - if err != nil { - maxPosSearch.wrangler.logger.Warningf("cannot decode replica %v position %v: %v", topoproto.TabletAliasString(tablet.Alias), status.Position, err) - return - } - - maxPosSearch.maxPosLock.Lock() - if maxPosSearch.maxPosTablet == nil || !maxPosSearch.maxPos.AtLeast(replPos) { - maxPosSearch.maxPos = replPos - maxPosSearch.maxPosTablet = tablet - } - maxPosSearch.maxPosLock.Unlock() -} - -// chooseNewMaster finds a tablet that is going to become master after reparent. The criteria -// for the new master-elect are (preferably) to be in the same cell as the current master, and -// to be different from avoidMasterTabletAlias. The tablet with the largest replication -// position is chosen to minimize the time of catching up with the master. Note that the search -// for largest replication position will race with transactions being executed on the master at -// the same time, so when all tablets are roughly at the same position then the choice of the -// new master-elect will be somewhat unpredictable. 
-func (wr *Wrangler) chooseNewMaster( - ctx context.Context, - shardInfo *topo.ShardInfo, - tabletMap map[string]*topo.TabletInfo, - avoidMasterTabletAlias *topodatapb.TabletAlias, - waitReplicasTimeout time.Duration) (*topodatapb.TabletAlias, error) { - - if avoidMasterTabletAlias == nil { - return nil, fmt.Errorf("tablet to avoid for reparent is not provided, cannot choose new master") - } - var masterCell string - if shardInfo.MasterAlias != nil { - masterCell = shardInfo.MasterAlias.Cell - } - - maxPosSearch := maxReplPosSearch{ - wrangler: wr, - ctx: ctx, - waitReplicasTimeout: waitReplicasTimeout, - waitGroup: sync.WaitGroup{}, - maxPosLock: sync.Mutex{}, - } - for _, tabletInfo := range tabletMap { - if (masterCell != "" && tabletInfo.Alias.Cell != masterCell) || - topoproto.TabletAliasEqual(tabletInfo.Alias, avoidMasterTabletAlias) || - tabletInfo.Tablet.Type != topodatapb.TabletType_REPLICA { - continue - } - maxPosSearch.waitGroup.Add(1) - go maxPosSearch.processTablet(tabletInfo.Tablet) - } - maxPosSearch.waitGroup.Wait() - - if maxPosSearch.maxPosTablet == nil { - return nil, nil - } - return maxPosSearch.maxPosTablet.Alias, nil -} - // EmergencyReparentShard will make the provided tablet the master for // the shard, when the old master is completely unreachable. func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (err error) { - // lock the shard - actionMsg := emergencyReparentShardOperation - if masterElectTabletAlias != nil { - actionMsg += fmt.Sprintf("(%v)", topoproto.TabletAliasString(masterElectTabletAlias)) - } - ctx, unlock, lockErr := wr.ts.LockShard(ctx, keyspace, shard, actionMsg) - if lockErr != nil { - return lockErr - } - defer unlock(&err) - - // Create reusable Reparent event with available info - ev := &events.Reparent{} - - // do the work - err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout, ignoredTablets) - if err != nil { - event.DispatchUpdate(ev, "failed EmergencyReparentShard: "+err.Error()) - } else { - event.DispatchUpdate(ev, "finished EmergencyReparentShard") - } - return err -} - -func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) error { - shardInfo, err := wr.ts.GetShard(ctx, keyspace, shard) - if err != nil { - return err - } - ev.ShardInfo = *shardInfo - - event.DispatchUpdate(ev, "reading all tablets") - tabletMap, err := wr.ts.GetTabletMapForShard(ctx, keyspace, shard) - if err != nil { - return vterrors.Wrapf(err, "failed to get tablet map for shard %v in keyspace %v: %v", shard, keyspace, err) - } - - statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, ignoredTablets) - if err != nil { - return vterrors.Wrapf(err, "failed to stop replication and build status maps: %v", err) - } - - // Check we still have the topology lock. 
- if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) - } - - validCandidates, err := wr.findValidReparentCandidates(statusMap, masterStatusMap) - if err != nil { - return err - } - if len(validCandidates) == 0 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") - } - - errChan := make(chan error) - rec := &concurrency.AllErrorRecorder{} - groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer groupCancel() - for candidate := range validCandidates { - go func(alias string) { - var err error - defer func() { errChan <- err }() - err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) - }(candidate) - } - - resultCounter := 0 - for waitErr := range errChan { - resultCounter++ - if waitErr != nil { - rec.RecordError(waitErr) - groupCancel() - } - if resultCounter == len(validCandidates) { - break - } - } - if len(rec.Errors) != 0 { - return vterrors.Wrapf(rec.Error(), "could not apply all relay logs within the provided wait_replicas_timeout: %v", rec.Error()) - } - - var winningPosition mysql.Position - var newMasterTabletAliasStr string - for alias, position := range validCandidates { - if winningPosition.IsZero() { - winningPosition = position - newMasterTabletAliasStr = alias - continue - } - if position.AtLeast(winningPosition) { - winningPosition = position - newMasterTabletAliasStr = alias - } - } - - if masterElectTabletAlias != nil { - newMasterTabletAliasStr = topoproto.TabletAliasString(masterElectTabletAlias) - masterPos, ok := validCandidates[newMasterTabletAliasStr] - if !ok { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect %v has errant GTIDs", newMasterTabletAliasStr) - } - if !masterPos.AtLeast(winningPosition) { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect: %v at position %v, is not fully caught up. Winning position: %v", newMasterTabletAliasStr, masterPos, winningPosition) - } - } + _, err = reparentutil.NewEmergencyReparenter(wr.ts, wr.tmc, wr.logger).ReparentShard( + ctx, + keyspace, + shard, + reparentutil.EmergencyReparentOptions{ + NewPrimaryAlias: masterElectTabletAlias, + WaitReplicasTimeout: waitReplicasTimeout, + IgnoreReplicas: ignoredTablets, + }, + ) - // Check we still have the topology lock. - if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) - } - - // Promote the masterElect - wr.logger.Infof("promote tablet %v to master", newMasterTabletAliasStr) - event.DispatchUpdate(ev, "promoting replica") - rp, err := wr.tmc.PromoteReplica(ctx, tabletMap[newMasterTabletAliasStr].Tablet) - if err != nil { - return vterrors.Wrapf(err, "master-elect tablet %v failed to be upgraded to master: %v", newMasterTabletAliasStr, err) - } - - // Check we still have the topology lock. - if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) - } - - // Create a cancelable context for the following RPCs. - // If error conditions happen, we can cancel all outgoing RPCs. - replCtx, replCancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer replCancel() - - // Reset replication on all replicas to point to the new master, and - // insert test row in the new master. 
- // Go through all the tablets: - // - new master: populate the reparent journal - // - everybody else: reparent to new master, wait for row - event.DispatchUpdate(ev, "reparenting all tablets") - now := time.Now().UnixNano() - errChan = make(chan error) - - handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error { - wr.logger.Infof("populating reparent journal on new master %v", alias) - return wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp) - } - handleReplica := func(alias string, tabletInfo *topo.TabletInfo) { - var err error - defer func() { errChan <- err }() - - wr.logger.Infof("setting new master on replica %v", alias) - forceStart := false - if status, ok := statusMap[alias]; ok { - forceStart = replicaWasRunning(status) - } - err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart) - if err != nil { - err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) - } - } - - for alias, tabletInfo := range tabletMap { - if alias == newMasterTabletAliasStr { - continue - } else if !ignoredTablets.Has(alias) { - go handleReplica(alias, tabletInfo) - } - } - - masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr]) - if masterErr != nil { - wr.logger.Warningf("master failed to PopulateReparentJournal") - replCancel() - return vterrors.Wrapf(masterErr, "failed to PopulateReparentJournal on master: %v", masterErr) - } - - return nil -} - -// waitOnNMinusOneTablets will wait until N-1 tablets have responded via a supplied error channel. In that case that N-1 tablets have responded, -// the supplied cancel function will be called, and we will wait until N tablets return their errors, and then return an AllErrorRecorder to the caller. -func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, errorChannel chan error, acceptableErrCnt int) *concurrency.AllErrorRecorder { - errCounter := 0 - successCounter := 0 - responseCounter := 0 - rec := &concurrency.AllErrorRecorder{} - - for err := range errorChannel { - responseCounter++ - if err != nil { - errCounter++ - rec.RecordError(err) - } else { - successCounter++ - } - if responseCounter == tabletCount { - // We must wait for any cancelled goroutines to return their error. - break - } - if errCounter > acceptableErrCnt || successCounter == tabletCount-1 { - ctxCancel() - } - } - - return rec -} - -// findValidReparentCandidates will find valid candidates for emergency reparent, and if successful, returning them as a list of tablet aliases. -func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicationdatapb.StopReplicationStatus, masterStatusMap map[string]*replicationdatapb.MasterStatus) (map[string]mysql.Position, error) { - // Build out replication status list from proto types. - replicationStatusMap := make(map[string]*mysql.ReplicationStatus, len(statusMap)) - for alias, protoStatus := range statusMap { - status := mysql.ProtoToReplicationStatus(protoStatus.After) - replicationStatusMap[alias] = &status - } - - // Determine if we need to find errant GTIDs. - var gtidBased *bool - for alias, status := range replicationStatusMap { - if gtidBased == nil { - _, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) - gtidBased = pointer.BoolPtr(ok) - } else if !*gtidBased { - break - } else if status.RelayLogPosition.IsZero() { - // Bail. We have an odd one in the bunch. 
- return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias) - } - } - - // Create relevant position list of errant GTID based positions for later comparison. - positionMap := make(map[string]mysql.Position) - for alias, status := range replicationStatusMap { - // Find errantGTIDs and clean them from status map if relevant. - if *gtidBased { - // We need to remove this status from a copy of the list, otherwise the diff will be empty always. - statusList := make([]*mysql.ReplicationStatus, 0, len(replicationStatusMap)-1) - for a, s := range replicationStatusMap { - if a != alias { - statusList = append(statusList, s) - } - } - relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) - if !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "we got a filled in relay log position, but it's not of type Mysql56GTIDSet, even though we've determined we need to use GTID based assessment") - } - errantGTIDs, err := status.FindErrantGTIDs(statusList) - if err != nil { - // Could not find errant GTIDs when we must. - return nil, err - } - if len(errantGTIDs) != 0 { - // Skip inserting this tablet. It's not a valid candidate. - continue - } - - pos := mysql.Position{GTIDSet: relayLogGTIDSet} - positionMap[alias] = pos - } else { - positionMap[alias] = status.Position - } - } - - for alias, masterStatus := range masterStatusMap { - executedPosition, err := mysql.DecodePosition(masterStatus.Position) - if err != nil { - return nil, vterrors.Wrapf(err, "could not decode a master status executed position for tablet %v: %v", alias, err) - } - positionMap[alias] = executedPosition - } - - return positionMap, nil -} - -func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { - // Stop replication on all replicas, get their current - // replication position - event.DispatchUpdate(ev, "stop replication on all replicas") - statusMap := make(map[string]*replicationdatapb.StopReplicationStatus) - masterStatusMap := make(map[string]*replicationdatapb.MasterStatus) - mu := sync.Mutex{} - - errChan := make(chan error) - groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer groupCancel() - fillStatus := func(alias string, tabletInfo *topo.TabletInfo) { - err := vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "fillStatus did not successfully complete") - defer func() { errChan <- err }() - - wr.logger.Infof("getting replication position from %v", alias) - var stopReplicationStatus *replicationdatapb.StopReplicationStatus - _, stopReplicationStatus, err = wr.tmc.StopReplicationAndGetStatus(groupCtx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) - switch err { - case mysql.ErrNotReplica: - var masterStatus *replicationdatapb.MasterStatus - masterStatus, err = wr.tmc.DemoteMaster(groupCtx, tabletInfo.Tablet) - if err != nil { - wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) - err = vterrors.Wrapf(err, "replica %v thinks it's master but we failed to demote it: %v", alias, err) - return - } - mu.Lock() - masterStatusMap[alias] = masterStatus - mu.Unlock() - - case nil: - mu.Lock() - statusMap[alias] = stopReplicationStatus - 
mu.Unlock() - - default: - wr.logger.Warningf("failed to get replication status from %v: %v", alias, err) - err = vterrors.Wrapf(err, "error when getting replication status for alias %v: %v", alias, err) - } - } - - for alias, tabletInfo := range tabletMap { - if !ignoredTablets.Has(alias) { - go fillStatus(alias, tabletInfo) - } - } - - errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-ignoredTablets.Len(), errChan, 1) - - if len(errRecorder.Errors) > 1 { - return nil, nil, vterrors.Wrapf(errRecorder.Error(), "encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) - } - return statusMap, masterStatusMap, nil -} - -// WaitForRelayLogsToApply will block execution waiting for the given tablets relay logs to apply, unless the supplied -// context is cancelled, or waitReplicasTimeout is exceeded. -func (wr *Wrangler) WaitForRelayLogsToApply(ctx context.Context, tabletInfo *topo.TabletInfo, status *replicationdatapb.StopReplicationStatus) error { - var err error - if status.After.RelayLogPosition != "" { - err = wr.tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.RelayLogPosition) - } else { - err = wr.tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.FileRelayLogPosition) - } return err } @@ -1232,7 +775,3 @@ func (wr *Wrangler) TabletExternallyReparented(ctx context.Context, newMasterAli } return nil } - -func replicaWasRunning(stopReplicationStatus *replicationdatapb.StopReplicationStatus) bool { - return stopReplicationStatus.Before.IoThreadRunning || stopReplicationStatus.Before.SqlThreadRunning -}
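For orientation, here is a minimal sketch of calling the relocated helpers from outside the wrangler. It assumes the caller already holds a complete tablet map for the shard and an avoid-alias supplied by the user; the wrapper name pickNewPrimary, the example package name, and the 30-second timeout are illustrative and are not part of the diff above.

package reparentexample

import (
	"context"
	"time"

	"vitess.io/vitess/go/vt/logutil"
	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/topo/topoproto"
	"vitess.io/vitess/go/vt/vtctl/reparentutil"
	"vitess.io/vitess/go/vt/vttablet/tmclient"

	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// pickNewPrimary is an illustrative wrapper (not part of the diff above)
// showing the expected call pattern for the relocated helpers.
func pickNewPrimary(
	ctx context.Context,
	tmc tmclient.TabletManagerClient,
	shardInfo *topo.ShardInfo,
	tabletMap map[string]*topo.TabletInfo,
	avoidAlias *topodatapb.TabletAlias,
	logger logutil.Logger,
) (*topodatapb.TabletAlias, error) {
	// FindCurrentPrimary returns nil when no tablet claims to be MASTER or
	// when two MASTER tablets tie on MasterTermStartTime.
	if current := reparentutil.FindCurrentPrimary(tabletMap, logger); current != nil {
		logger.Infof("current primary: %v", topoproto.TabletAliasString(current.Alias))
	}

	// ChooseNewPrimary errors on a nil avoidAlias and returns (nil, nil)
	// when no eligible REPLICA exists, so callers must handle a nil alias.
	return reparentutil.ChooseNewPrimary(ctx, tmc, shardInfo, tabletMap, avoidAlias, 30*time.Second, logger)
}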
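The EmergencyReparentShard change above reduces the wrangler to a thin wrapper around reparentutil.NewEmergencyReparenter. Below is a sketch of driving that entry point directly, assuming a *topo.Server, a tmclient.TabletManagerClient, and a logger are already available; passing a nil NewPrimaryAlias mirrors the old behaviour of letting the reparent logic pick the most caught-up candidate, which is assumed (not shown in this diff) to be preserved by the new implementation.

package reparentexample

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"

	"vitess.io/vitess/go/vt/logutil"
	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/vtctl/reparentutil"
	"vitess.io/vitess/go/vt/vttablet/tmclient"
)

// emergencyReparent is an illustrative wrapper (not part of the diff above)
// around the entry point the wrangler now delegates to.
func emergencyReparent(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, logger logutil.Logger, keyspace, shard string) error {
	_, err := reparentutil.NewEmergencyReparenter(ts, tmc, logger).ReparentShard(ctx, keyspace, shard,
		reparentutil.EmergencyReparentOptions{
			// nil lets the reparent logic pick the most advanced candidate,
			// matching the old nil masterElectTabletAlias behaviour (assumed
			// preserved by the new implementation).
			NewPrimaryAlias:     nil,
			WaitReplicasTimeout: 30 * time.Second,
			IgnoreReplicas:      sets.NewString(),
		},
	)

	return err
}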
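One note on the new test files: every table-driven loop rebinds the loop variable (tt := tt) before calling t.Run with t.Parallel(). A small, self-contained illustration of why that copy matters, using hypothetical test data:

package reparentexample

import "testing"

// TestParallelCapture illustrates the tt := tt idiom used throughout the new
// test files: without the rebinding, every parallel subtest closure would
// observe the final value of the shared loop variable after the outer loop
// has finished, rather than its own row of the table.
func TestParallelCapture(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name string
		in   int
		want int
	}{
		{name: "one", in: 1, want: 1},
		{name: "two", in: 2, want: 2},
	}

	for _, tt := range tests {
		tt := tt // capture a per-iteration copy for the parallel closure below

		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			if tt.in != tt.want {
				t.Errorf("got %d, want %d", tt.in, tt.want)
			}
		})
	}
}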