diff --git a/go/vt/vtgr/controller/diagnose.go b/go/vt/vtgr/controller/diagnose.go index 1da053633e1..3e4a4dd5ad9 100644 --- a/go/vt/vtgr/controller/diagnose.go +++ b/go/vt/vtgr/controller/diagnose.go @@ -31,7 +31,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" @@ -92,7 +91,7 @@ const ( func (shard *GRShard) ScanAndRepairShard(ctx context.Context) { status, err := shard.Diagnose(ctx) if err != nil { - log.Errorf("fail to scanAndRepairShard %v/%v because of Diagnose error: %v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard, err) + shard.logger.Errorf("fail to scanAndRepairShard %v/%v because of Diagnose error: %v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard, err) return } // We are able to get Diagnose without error @@ -100,9 +99,9 @@ func (shard *GRShard) ScanAndRepairShard(ctx context.Context) { // Note: all the recovery function should first try to grab a shard level lock // and check the trigger conditions before doing anything. This is to avoid // other VTGR instance try to do the same thing - log.Infof("%v status is %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("%v status is %v", formatKeyspaceShard(shard.KeyspaceShard), status) if _, err := shard.Repair(ctx, status); err != nil { - log.Errorf("failed to repair %v: %v", status, err) + shard.logger.Errorf("failed to repair %v: %v", status, err) } } @@ -123,7 +122,7 @@ func (shard *GRShard) Diagnose(ctx context.Context) (DiagnoseType, error) { shard.shardStatusCollector.recordDiagnoseResult(diagnoseResult) shard.populateVTGRStatusLocked() if diagnoseResult != DiagnoseTypeHealthy { - log.Warningf(`VTGR diagnose shard as unhealthy for %s/%s: result=%v | last_result=%v | instances=%v | primary=%v | primary_tablet=%v | problematics=%v | unreachables=%v | SQL group=%v`, + shard.logger.Warningf(`VTGR diagnose shard as unhealthy for %s/%s: result=%v | last_result=%v | instances=%v | primary=%v | primary_tablet=%v | problematics=%v | unreachables=%v | SQL group=%v`, shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard, shard.shardStatusCollector.status.DiagnoseResult, shard.lastDiagnoseResult, @@ -154,7 +153,7 @@ func (shard *GRShard) diagnoseLocked(ctx context.Context) (DiagnoseType, error) // later VTGR needs to find group name, primary etc from // SQLGroup for repairing instead of getting nil shard.sqlGroup.overrideView([]*db.GroupView{localView}) - log.Infof("Diagnose %v from fast path", fastDiagnose) + shard.logger.Infof("Diagnose %v from fast path", fastDiagnose) return fastDiagnose, nil } } @@ -181,7 +180,7 @@ func (shard *GRShard) diagnoseLocked(ctx context.Context) (DiagnoseType, error) // In this situation, instead of bootstrap a group, we should re-build the // old group for the shard if shard.isAllOfflineOrError() { - log.Info("Found all members are OFFLINE or ERROR") + shard.logger.Info("Found all members are OFFLINE or ERROR") return DiagnoseTypeShardHasInactiveGroup, nil } @@ -196,7 +195,7 @@ func (shard *GRShard) diagnoseLocked(ctx context.Context) (DiagnoseType, error) // errMissingGroup means we cannot find a mysql group for the shard // we are in DiagnoseTypeShardHasNoGroup state if err == errMissingGroup { - log.Warning("Missing mysql group") + shard.logger.Warning("Missing mysql group") return DiagnoseTypeShardHasNoGroup, nil } // errMissingPrimaryTablet means we cannot find a tablet based on 
mysql primary @@ -268,7 +267,7 @@ func (shard *GRShard) getLocalView() *db.GroupView { // We still have the fallback logic if this failed, therefore we don't raise error // but try to get local view with best effort if err != nil { - log.Errorf("failed to fetch local group view: %v", err) + shard.logger.Errorf("failed to fetch local group view: %v", err) } return view } @@ -319,7 +318,7 @@ func (shard *GRShard) hasWrongPrimaryTablet(ctx context.Context) (bool, error) { // in case the primary is unreachable host, port, _ := shard.sqlGroup.GetPrimary() if !isHostPortValid(host, port) { - log.Warningf("Invalid address for primary %v:%v", host, port) + shard.logger.Warningf("Invalid address for primary %v:%v", host, port) return false, errMissingGroup } // Make sure we have a tablet available @@ -329,7 +328,7 @@ func (shard *GRShard) hasWrongPrimaryTablet(ctx context.Context) (bool, error) { // we retrun errMissingPrimaryTablet so that VTGR will trigger a failover tablet := shard.findTabletByHostAndPort(host, port) if tablet == nil || !shard.instanceReachable(ctx, tablet) { - log.Errorf("Failed to find tablet that is running with mysql on %v:%v", host, port) + shard.logger.Errorf("Failed to find tablet that is running with mysql on %v:%v", host, port) return false, errMissingPrimaryTablet } // Now we know we have a valid mysql primary in the group @@ -338,7 +337,7 @@ func (shard *GRShard) hasWrongPrimaryTablet(ctx context.Context) (bool, error) { // If we failed to find primary for shard, it mostly means we are initializing the shard // return true directly so that VTGR will set primary tablet according to MySQL group if primary == nil { - log.Infof("unable to find primary tablet for %v", formatKeyspaceShard(shard.KeyspaceShard)) + shard.logger.Infof("unable to find primary tablet for %v", formatKeyspaceShard(shard.KeyspaceShard)) return true, nil } return (host != primary.instanceKey.Hostname) || (port != primary.instanceKey.Port), nil @@ -363,11 +362,11 @@ func (shard *GRShard) instanceReachable(ctx context.Context, instance *grInstanc go func() { c <- shard.tmc.Ping(pingCtx, instance.tablet) }() select { case <-pingCtx.Done(): - log.Errorf("Ping abort timeout %v", *pingTabletTimeout) + shard.logger.Errorf("Ping abort timeout %v", *pingTabletTimeout) return false case err := <-c: if err != nil { - log.Errorf("Ping error host=%v: %v", instance.instanceKey.Hostname, err) + shard.logger.Errorf("Ping error host=%v: %v", instance.instanceKey.Hostname, err) } return err == nil } @@ -428,7 +427,7 @@ func (shard *GRShard) disconnectedInstance() (*grInstance, error) { // Skip instance without hostname because they are not up and running // also skip instances that raised unrecoverable errors if shard.shardStatusCollector.isUnreachable(instance) { - log.Infof("Skip %v to check disconnectedInstance because it is unhealthy", instance.alias) + shard.logger.Infof("Skip %v to check disconnectedInstance because it is unhealthy", instance.alias) continue } isUnconnected := shard.sqlGroup.IsUnconnectedReplica(instance.instanceKey) @@ -530,7 +529,7 @@ func (shard *GRShard) forAllInstances(task func(instance *grInstance, wg *sync.W } wg.Wait() if len(errorRecord.Errors) > 0 { - log.Errorf("get errors in forAllInstances call: %v", errorRecord.Error()) + shard.logger.Errorf("get errors in forAllInstances call: %v", errorRecord.Error()) } return &errorRecord } @@ -571,7 +570,7 @@ func (shard *GRShard) refreshSQLGroup() error { if unreachableError(err) { shard.shardStatusCollector.recordUnreachables(instance) } - 
log.Errorf("%v get error while fetch group info: %v", instance.alias, err) + shard.logger.Errorf("%v get error while fetch group info: %v", instance.alias, err) return } shard.sqlGroup.recordView(view) @@ -579,7 +578,7 @@ func (shard *GRShard) refreshSQLGroup() error { // Only raise error if we failed to get any data from mysql // otherwise, we will use what we get from mysql directly if len(er.Errors) == len(shard.instances) { - log.Errorf("fail to fetch any data for mysql") + shard.logger.Errorf("fail to fetch any data for mysql") return db.ErrGroupBackoffError } return shard.sqlGroup.Resolve() diff --git a/go/vt/vtgr/controller/group.go b/go/vt/vtgr/controller/group.go index b9333c3c5f6..e91208fd43e 100644 --- a/go/vt/vtgr/controller/group.go +++ b/go/vt/vtgr/controller/group.go @@ -23,9 +23,9 @@ import ( "sync" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/orchestrator/inst" "vitess.io/vitess/go/vt/vtgr/db" + "vitess.io/vitess/go/vt/vtgr/log" ) var ( @@ -37,6 +37,7 @@ var ( type SQLGroup struct { views []*db.GroupView resolvedView *ResolvedView + logger *log.Logger size int singlePrimary bool statsTags []string @@ -45,13 +46,14 @@ type SQLGroup struct { // NewSQLGroup creates a new SQLGroup func NewSQLGroup(size int, singlePrimary bool, keyspace, shard string) *SQLGroup { - return &SQLGroup{size: size, singlePrimary: singlePrimary, statsTags: []string{keyspace, shard}} + return &SQLGroup{size: size, singlePrimary: singlePrimary, statsTags: []string{keyspace, shard}, logger: log.NewVTGRLogger(keyspace, shard)} } // ResolvedView is the resolved view type ResolvedView struct { groupName string view map[inst.InstanceKey]db.GroupMember + logger *log.Logger } // recordView adds a view to the group @@ -164,13 +166,13 @@ func (group *SQLGroup) IsSafeToBootstrap() bool { // for bootstrap we require group at least has quorum number of views // this is to make sure we don't bootstrap a group improperly if len(group.views) < group.size { - log.Errorf("[sql_group] cannot bootstrap because we only have %v views | expected %v", len(group.views), group.size) + group.logger.Errorf("[sql_group] cannot bootstrap because we only have %v views | expected %v", len(group.views), group.size) return false } // we think it is safe to bootstrap a group if all the views don't have a primary host host, port, _ := group.getPrimaryLocked() if host != "" || port != 0 { - log.Warningf("not safe to bootstrap sql group because %v/%v might already be primary", host, port) + group.logger.Warningf("not safe to bootstrap sql group because %v/%v might already be primary", host, port) } return host == "" && port == 0 } @@ -201,7 +203,7 @@ func (group *SQLGroup) Resolve() error { return group.resolveLocked() } func (group *SQLGroup) resolveLocked() error { - rv := &ResolvedView{} + rv := &ResolvedView{logger: group.logger} group.resolvedView = rv m := make(map[inst.InstanceKey]db.GroupMember) for _, view := range group.views { @@ -209,7 +211,7 @@ func (group *SQLGroup) resolveLocked() error { rv.groupName = view.GroupName } if view.GroupName != "" && rv.groupName != view.GroupName { - log.Errorf("previous group name %v found %v", rv.groupName, view.GroupName) + group.logger.Errorf("previous group name %v found %v", rv.groupName, view.GroupName) return db.ErrGroupSplitBrain } for _, member := range view.UnresolvedMembers { @@ -246,7 +248,7 @@ func (group *SQLGroup) resolveLocked() error { func (rv *ResolvedView) validate(singlePrimary bool, statsTags []string) error { if !rv.hasGroup() { - 
log.Info("Resolved view does not have a group") + rv.logger.Info("Resolved view does not have a group") return nil } hasPrimary := false @@ -255,13 +257,13 @@ func (rv *ResolvedView) validate(singlePrimary bool, statsTags []string) error { for _, status := range rv.view { if status.Role == db.PRIMARY { if singlePrimary && hasPrimary { - log.Errorf("Found more than one primary in the group") + rv.logger.Errorf("Found more than one primary in the group") return db.ErrGroupSplitBrain } hasPrimary = true primaryState = status.State if status.State != db.ONLINE { - log.Warningf("Found a PRIMARY not ONLINE (%v)", status.State) + rv.logger.Warningf("Found a PRIMARY not ONLINE (%v)", status.State) } } switch status.State { @@ -279,10 +281,10 @@ func (rv *ResolvedView) validate(singlePrimary bool, statsTags []string) error { } groupOnlineSize.Set(statsTags, int64(onlineCount)) if unreachableCount > 0 || errorCount > 0 || offlineCount > 0 { - log.Warningf("Some of nodes are unconnected in the group. hasPrimary=%v (%v), online_count=%v, recovering_count=%v, unreachable_count=%v, offline_count=%v, error_count=%v", hasPrimary, primaryState, onlineCount, recoveringCount, unreachableCount, offlineCount, errorCount) + rv.logger.Warningf("Some of nodes are unconnected in the group. hasPrimary=%v (%v), online_count=%v, recovering_count=%v, unreachable_count=%v, offline_count=%v, error_count=%v", hasPrimary, primaryState, onlineCount, recoveringCount, unreachableCount, offlineCount, errorCount) } if unreachableCount >= len(rv.view)/2+1 { - log.Errorf("Backoff error by quorum unreachable: found %v number of UNREACHABLE nodes while quorum is %v", unreachableCount, len(rv.view)/2+1) + rv.logger.Errorf("Backoff error by quorum unreachable: found %v number of UNREACHABLE nodes while quorum is %v", unreachableCount, len(rv.view)/2+1) isLostQuorum.Set(statsTags, 1) } else { isLostQuorum.Set(statsTags, 0) @@ -300,19 +302,19 @@ func (rv *ResolvedView) validate(singlePrimary bool, statsTags []string) error { } // Ongoing bootstrap, we should backoff and wait if recoveringCount == 1 && (offlineCount+recoveringCount == len(rv.view)) { - log.Warningf("Group has one recovery node with all others in offline mode") + rv.logger.Warningf("Group has one recovery node with all others in offline mode") return db.ErrGroupOngoingBootstrap } // We don't have quorum number of unreachable, but the primary is not online // This most likely means there is a failover in the group we should back off and wait if hasPrimary && primaryState != db.ONLINE { - log.Warningf("Found a PRIMARY that is not ONLINE (%v)", primaryState) + rv.logger.Warningf("Found a PRIMARY that is not ONLINE (%v)", primaryState) return db.ErrGroupBackoffError } // If all the node in view are OFFLINE or ERROR, it is an inactive group // It is expected to have no primary in this case if !hasPrimary && (offlineCount+errorCount != len(rv.view)) { - log.Warningf("Group is NOT all offline or error without a primary node") + rv.logger.Warningf("Group is NOT all offline or error without a primary node") return db.ErrGroupBackoffError } return nil diff --git a/go/vt/vtgr/controller/group_test.go b/go/vt/vtgr/controller/group_test.go index 4dd2ccb38a0..15090324d28 100644 --- a/go/vt/vtgr/controller/group_test.go +++ b/go/vt/vtgr/controller/group_test.go @@ -19,6 +19,8 @@ package controller import ( "testing" + "vitess.io/vitess/go/vt/vtgr/log" + "vitess.io/vitess/go/vt/orchestrator/inst" "vitess.io/vitess/go/vt/vtgr/db" @@ -249,7 +251,7 @@ func TestResolve(t *testing.T) { 
{Hostname: "host1", Port: 10}: {HostName: "host1", Port: 10, Role: db.PRIMARY, State: db.ONLINE, ReadOnly: false}, {Hostname: "host2", Port: 10}: {HostName: "host2", Port: 10, Role: db.SECONDARY, State: db.ONLINE, ReadOnly: true}, {Hostname: "host3", Port: 10}: {HostName: "host3", Port: 10, Role: db.SECONDARY, State: db.ONLINE, ReadOnly: true}, - }}, ""}, + }, nil}, ""}, {"test readonly with unreachable primary", []*db.GroupView{ // host1 is unreachable {MySQLHost: "host2", MySQLPort: 10, GroupName: "group", UnresolvedMembers: []*db.GroupMember{ {HostName: "host1", Port: 10, Role: db.PRIMARY, State: db.ONLINE, ReadOnly: false}, @@ -265,7 +267,7 @@ func TestResolve(t *testing.T) { {Hostname: "host1", Port: 10}: {HostName: "host1", Port: 10, Role: db.PRIMARY, State: db.ONLINE, ReadOnly: false}, {Hostname: "host2", Port: 10}: {HostName: "host2", Port: 10, Role: db.SECONDARY, State: db.ONLINE, ReadOnly: true}, {Hostname: "host3", Port: 10}: {HostName: "host3", Port: 10, Role: db.SECONDARY, State: db.ONLINE, ReadOnly: true}, - }}, ""}, + }, nil}, ""}, {"test split brain by group name", []*db.GroupView{ {MySQLHost: "host1", MySQLPort: 10, GroupName: "group", UnresolvedMembers: healthyView}, {MySQLHost: "host2", MySQLPort: 10, GroupName: "group1", UnresolvedMembers: healthyView}, @@ -285,7 +287,7 @@ func TestResolve(t *testing.T) { {Hostname: "host1", Port: 10}: {HostName: "host1", Port: 10, Role: db.UNKNOWNROLE, State: db.OFFLINE, ReadOnly: true}, {Hostname: "host2", Port: 10}: {HostName: "host2", Port: 10, Role: db.UNKNOWNROLE, State: db.OFFLINE, ReadOnly: true}, {Hostname: "host3", Port: 10}: {HostName: "host3", Port: 10, Role: db.UNKNOWNROLE, State: db.OFFLINE, ReadOnly: true}, - }}, ""}, + }, nil}, ""}, {"test network partition by majority unreachable", []*db.GroupView{ {MySQLHost: "host1", MySQLPort: 10, GroupName: "group", UnresolvedMembers: []*db.GroupMember{ {HostName: "host1", Port: 10, Role: db.PRIMARY, State: db.UNREACHABLE, ReadOnly: false}, @@ -308,7 +310,7 @@ func TestResolve(t *testing.T) { {Hostname: "host1", Port: 10}: {HostName: "host1", Port: 10, Role: db.PRIMARY, State: db.ONLINE, ReadOnly: false}, {Hostname: "host2", Port: 10}: {HostName: "host2", Port: 10, Role: db.SECONDARY, State: db.ONLINE, ReadOnly: true}, {Hostname: "host3", Port: 10}: {HostName: "host3", Port: 10, Role: db.SECONDARY, State: db.UNREACHABLE, ReadOnly: false}, - }}, "group backoff error"}, + }, nil}, "group backoff error"}, {"test network partition by unreachable primary", []*db.GroupView{ {MySQLHost: "host2", MySQLPort: 10, GroupName: "group", UnresolvedMembers: []*db.GroupMember{ {HostName: "host1", Port: 10, Role: db.PRIMARY, State: db.UNREACHABLE}, @@ -331,7 +333,7 @@ func TestResolve(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.testName, func(t *testing.T) { - group := SQLGroup{views: testCase.views, statsTags: []string{"ks", "0"}} + group := SQLGroup{views: testCase.views, statsTags: []string{"ks", "0"}, logger: log.NewVTGRLogger("ks", "0")} err := group.Resolve() if testCase.errorMsg != "" { assert.EqualError(t, err, testCase.errorMsg) @@ -341,7 +343,8 @@ func TestResolve(t *testing.T) { if testCase.expected != nil { rv := group.resolvedView expected := testCase.expected - assert.Equal(t, expected, rv) + assert.Equal(t, expected.view, rv.view) + assert.Equal(t, expected.groupName, rv.groupName) } }) } diff --git a/go/vt/vtgr/controller/refresh.go b/go/vt/vtgr/controller/refresh.go index db5dd47b09d..24e101f9db6 100644 --- a/go/vt/vtgr/controller/refresh.go +++ 
b/go/vt/vtgr/controller/refresh.go @@ -26,13 +26,13 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/sync2" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/orchestrator/inst" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgr/config" "vitess.io/vitess/go/vt/vtgr/db" + "vitess.io/vitess/go/vt/vtgr/log" ) var ( @@ -91,6 +91,8 @@ type GRShard struct { isActive sync2.AtomicBool + logger *log.Logger + // lock prevents multiple go routine fights with each other sync.Mutex } @@ -140,6 +142,7 @@ func NewGRShard( minNumReplicas: config.MinNumReplica, disableReadOnlyProtection: config.DisableReadOnlyProtection, localDbPort: localDbPort, + logger: log.NewVTGRLogger(keyspace, shard), transientErrorWaitTime: time.Duration(config.BackoffErrorWaitTimeSeconds) * time.Second, bootstrapWaitTime: time.Duration(config.BootstrapWaitTimeSeconds) * time.Second, } @@ -172,7 +175,7 @@ func (shard *GRShard) refreshTabletsInShardInternal(ctx context.Context) ([]*grI keyspace, shardName := shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard tablets, err := shard.ts.GetTabletMapForShardByCell(ctx, keyspace, shardName, shard.cells) if err != nil { - log.Errorf("Error fetching tablets for keyspace/shardName %v/%v: %v", keyspace, shardName, err) + shard.logger.Errorf("Error fetching tablets for keyspace/shardName %v/%v: %v", keyspace, shardName, err) return nil, err } return parseTabletInfos(tablets), nil @@ -232,7 +235,7 @@ func (shard *GRShard) UnlockShard() { shard.unlockMu.Lock() defer shard.unlockMu.Unlock() if shard.unlock == nil { - log.Warningf("Shard %s/%s does not hold a lock", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard) + shard.logger.Warningf("Shard %s/%s does not hold a lock", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard) return } var err error @@ -284,7 +287,7 @@ func (shard *GRShard) GetUnlock() func(*error) { // SetIsActive sets isActive for the shard func (shard *GRShard) SetIsActive(isActive bool) { - log.Infof("Setting is active to %v", isActive) + shard.logger.Infof("Setting is active to %v", isActive) shard.isActive.Set(isActive) } diff --git a/go/vt/vtgr/controller/repair.go b/go/vt/vtgr/controller/repair.go index ade0c89dda3..a584ea85816 100644 --- a/go/vt/vtgr/controller/repair.go +++ b/go/vt/vtgr/controller/repair.go @@ -30,7 +30,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" @@ -80,13 +79,13 @@ func (shard *GRShard) Repair(ctx context.Context, status DiagnoseType) (RepairRe case DiagnoseTypeBootstrapBackoff, DiagnoseTypeBackoffError: code, err = shard.repairBackoffError(ctx, status) case DiagnoseTypeError: - log.Errorf("%v is %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Errorf("%v is %v", formatKeyspaceShard(shard.KeyspaceShard), status) case DiagnoseTypeHealthy: start := time.Now() repairTimingsMs.Record([]string{string(status), "true"}, start) } if status != DiagnoseTypeHealthy { - log.Infof("VTGR repaired %v status=%v | code=%v", formatKeyspaceShard(shard.KeyspaceShard), status, code) + shard.logger.Infof("VTGR repaired %v status=%v | code=%v", formatKeyspaceShard(shard.KeyspaceShard), status, code) } return code, vterrors.Wrap(err, "vtgr repair") } @@ -94,7 +93,7 @@ func (shard *GRShard) Repair(ctx context.Context, status 
DiagnoseType) (RepairRe func (shard *GRShard) repairShardHasNoGroup(ctx context.Context) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairShardHasNoGroup") if err != nil { - log.Warningf("repairShardHasNoPrimaryTablet fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) + shard.logger.Warningf("repairShardHasNoPrimaryTablet fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) return Noop, err } defer shard.UnlockShard() @@ -103,11 +102,11 @@ func (shard *GRShard) repairShardHasNoGroup(ctx context.Context) (RepairResultCo // which will update mysqlGroup stored in the shard status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != DiagnoseTypeShardHasNoGroup { - log.Infof("Shard %v is no longer in DiagnoseTypeShardHasNoGroup: %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("Shard %v is no longer in DiagnoseTypeShardHasNoGroup: %v", formatKeyspaceShard(shard.KeyspaceShard), status) return Noop, nil } start := time.Now() @@ -125,7 +124,7 @@ func (shard *GRShard) repairShardHasNoGroupAction(ctx context.Context) error { mysqlGroup := shard.shardAgreedGroupName() isAllOffline := shard.isAllOfflineOrError() if mysqlGroup != "" { - log.Infof("Shard %v already have a group %v", formatKeyspaceShard(shard.KeyspaceShard), mysqlGroup) + shard.logger.Infof("Shard %v already have a group %v", formatKeyspaceShard(shard.KeyspaceShard), mysqlGroup) return nil } // This should not really happen in reality @@ -138,7 +137,7 @@ func (shard *GRShard) repairShardHasNoGroupAction(ctx context.Context) error { replicas := shard.instances // Sanity check to make sure there is at least one instance if len(replicas) == 0 { - log.Warningf("Cannot find any instance for the shard %v", formatKeyspaceShard(shard.KeyspaceShard)) + shard.logger.Warningf("Cannot find any instance for the shard %v", formatKeyspaceShard(shard.KeyspaceShard)) return nil } if !shard.sqlGroup.IsSafeToBootstrap() { @@ -158,24 +157,24 @@ func (shard *GRShard) repairShardHasNoGroupAction(ctx context.Context) error { return errors.New("fail to find any candidate to bootstrap") } // Bootstrap the group - log.Infof("Bootstrapping the group for %v on host=%v", formatKeyspaceShard(shard.KeyspaceShard), candidate.instanceKey.Hostname) + shard.logger.Infof("Bootstrapping the group for %v on host=%v", formatKeyspaceShard(shard.KeyspaceShard), candidate.instanceKey.Hostname) // Make sure we still hold the topo server lock before moving on if err := shard.checkShardLocked(ctx); err != nil { return err } if err := shard.dbAgent.BootstrapGroupLocked(candidate.instanceKey); err != nil { // if bootstrap failed, the next one that gets the lock will try to do it again - log.Errorf("Failed to bootstrap mysql group on %v: %v", candidate.instanceKey.Hostname, err) + shard.logger.Errorf("Failed to bootstrap mysql group on %v: %v", candidate.instanceKey.Hostname, err) return err } - log.Infof("Bootstrapped the group for %v", formatKeyspaceShard(shard.KeyspaceShard)) + shard.logger.Infof("Bootstrapped the group for %v", formatKeyspaceShard(shard.KeyspaceShard)) return nil } func (shard *GRShard) repairShardHasInactiveGroup(ctx context.Context) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairShardHasInactiveGroup") if err != nil { - log.Warningf("repairShardHasInactiveGroup fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) + 
shard.logger.Warningf("repairShardHasInactiveGroup fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) return Noop, err } defer shard.UnlockShard() @@ -184,11 +183,11 @@ func (shard *GRShard) repairShardHasInactiveGroup(ctx context.Context) (RepairRe // which will update mysqlGroup stored in the shard status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != DiagnoseTypeShardHasInactiveGroup { - log.Infof("Shard %v is no longer in DiagnoseTypeShardHasInactiveGroup: %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("Shard %v is no longer in DiagnoseTypeShardHasInactiveGroup: %v", formatKeyspaceShard(shard.KeyspaceShard), status) return Noop, nil } // Now we know the shard has an agreed group but no member in it @@ -206,22 +205,22 @@ func (shard *GRShard) repairShardHasInactiveGroup(ctx context.Context) (RepairRe func (shard *GRShard) repairBackoffError(ctx context.Context, diagnose DiagnoseType) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairBackoffError") if err != nil { - log.Warningf("repairBackoffError fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) + shard.logger.Warningf("repairBackoffError fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) return Noop, err } defer shard.UnlockShard() shard.refreshTabletsInShardLocked(ctx) status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != diagnose { - log.Infof("Shard %v is no longer in %v: %v", formatKeyspaceShard(shard.KeyspaceShard), diagnose, status) + shard.logger.Infof("Shard %v is no longer in %v: %v", formatKeyspaceShard(shard.KeyspaceShard), diagnose, status) return Noop, nil } if shard.lastDiagnoseResult != diagnose { - log.Infof("diagnose shard as %v but last diagnose result was %v", diagnose, shard.lastDiagnoseResult) + shard.logger.Infof("diagnose shard as %v but last diagnose result was %v", diagnose, shard.lastDiagnoseResult) return Noop, nil } now := time.Now() @@ -235,10 +234,10 @@ func (shard *GRShard) repairBackoffError(ctx context.Context, diagnose DiagnoseT return Fail, fmt.Errorf("unsupported diagnose for repairBackoffError: %v", diagnose) } if now.Sub(shard.lastDiagnoseSince) < waitTime { - log.Infof("Detected %v at %v. In wait time for network partition", diagnose, shard.lastDiagnoseSince) + shard.logger.Infof("Detected %v at %v. In wait time for network partition", diagnose, shard.lastDiagnoseSince) return Noop, nil } - log.Infof("Detected %v at %v. Start repairing after %v", diagnose, shard.lastDiagnoseSince, shard.transientErrorWaitTime) + shard.logger.Infof("Detected %v at %v. 
Start repairing after %v", diagnose, shard.lastDiagnoseSince, shard.transientErrorWaitTime) err = shard.stopAndRebootstrap(ctx) repairTimingsMs.Record([]string{DiagnoseTypeBackoffError, strconv.FormatBool(err == nil)}, now) if err != nil { @@ -259,27 +258,27 @@ func (shard *GRShard) stopAndRebootstrap(ctx context.Context) error { defer wg.Done() status := shard.sqlGroup.GetStatus(instance.instanceKey) if status != nil && status.State == db.OFFLINE { - log.Infof("stop group replication on %v skipped because it is already OFFLINE", instance.alias) + shard.logger.Infof("stop group replication on %v skipped because it is already OFFLINE", instance.alias) return } - log.Infof("stop group replication on %v", instance.alias) + shard.logger.Infof("stop group replication on %v", instance.alias) err := shard.dbAgent.StopGroupLocked(instance.instanceKey) if err != nil { if !unreachableError(err) { er.RecordError(err) } - log.Warningf("Error during stop group replication on %v: %v", instance.instanceKey.Hostname, err) + shard.logger.Warningf("Error during stop group replication on %v: %v", instance.instanceKey.Hostname, err) } }) if errorRecorder.HasErrors() { - log.Errorf("Failed to stop group replication %v", errorRecorder.Error()) + shard.logger.Errorf("Failed to stop group replication %v", errorRecorder.Error()) return errorRecorder.Error() } - log.Infof("Stop the group for %v", formatKeyspaceShard(shard.KeyspaceShard)) - log.Info("Start find candidate to rebootstrap") + shard.logger.Infof("Stop the group for %v", formatKeyspaceShard(shard.KeyspaceShard)) + shard.logger.Info("Start find candidate to rebootstrap") candidate, err := shard.findRebootstrapCandidate(ctx) if err != nil { - log.Errorf("Failed to find rebootstrap candidate: %v", err) + shard.logger.Errorf("Failed to find rebootstrap candidate: %v", err) return err } shard.refreshSQLGroup() @@ -287,10 +286,10 @@ func (shard *GRShard) stopAndRebootstrap(ctx context.Context) error { return errors.New("unsafe to bootstrap group") } if *abortRebootstrap { - log.Warningf("Abort stopAndRebootstrap because rebootstrap hook override") + shard.logger.Warningf("Abort stopAndRebootstrap because rebootstrap hook override") return errForceAbortBootstrap } - log.Infof("Rebootstrap %v on %v", formatKeyspaceShard(shard.KeyspaceShard), candidate.instanceKey.Hostname) + shard.logger.Infof("Rebootstrap %v on %v", formatKeyspaceShard(shard.KeyspaceShard), candidate.instanceKey.Hostname) // Make sure we still hold the topo server lock before moving on if err := shard.checkShardLocked(ctx); err != nil { return err @@ -321,7 +320,7 @@ func (shard *GRShard) getGTIDSetFromAll(skipPrimary bool) (*groupGTIDRecorder, * if skipPrimary && primary != nil { status := shard.sqlGroup.GetStatus(primary.instanceKey) mysqlPrimaryHost, mysqlPrimaryPort = status.HostName, status.Port - log.Infof("Found primary instance from MySQL on %v", mysqlPrimaryHost) + shard.logger.Infof("Found primary instance from MySQL on %v", mysqlPrimaryHost) } gtidRecorder := &groupGTIDRecorder{} // Iterate through all the instances in the shard and find the one with largest GTID set with best effort @@ -330,13 +329,13 @@ func (shard *GRShard) getGTIDSetFromAll(skipPrimary bool) (*groupGTIDRecorder, * errorRecorder := shard.forAllInstances(func(instance *grInstance, wg *sync.WaitGroup, er concurrency.ErrorRecorder) { defer wg.Done() if skipPrimary && instance.instanceKey.Hostname == mysqlPrimaryHost && instance.instanceKey.Port == mysqlPrimaryPort { - log.Infof("Skip %v to failover to a non-primary 
node", mysqlPrimaryHost) + shard.logger.Infof("Skip %v to failover to a non-primary node", mysqlPrimaryHost) return } gtids, err := shard.dbAgent.FetchApplierGTIDSet(instance.instanceKey) if err != nil { er.RecordError(err) - log.Errorf("%v get error while fetch applier GTIDs: %v", instance.alias, err) + shard.logger.Errorf("%v get error while fetch applier GTIDs: %v", instance.alias, err) shard.shardStatusCollector.recordProblematics(instance) if unreachableError(err) { shard.shardStatusCollector.recordUnreachables(instance) @@ -344,7 +343,7 @@ func (shard *GRShard) getGTIDSetFromAll(skipPrimary bool) (*groupGTIDRecorder, * return } if gtids == nil { - log.Warningf("[failover candidate] skip %s with empty gtid", instance.alias) + shard.logger.Warningf("[failover candidate] skip %s with empty gtid", instance.alias) return } gtidRecorder.recordGroupGTIDs(gtids, instance) @@ -355,28 +354,28 @@ func (shard *GRShard) getGTIDSetFromAll(skipPrimary bool) (*groupGTIDRecorder, * func (shard *GRShard) findRebootstrapCandidate(ctx context.Context) (*grInstance, error) { gtidRecorder, errorRecorder, err := shard.getGTIDSetFromAll(false) if err != nil { - log.Errorf("Failed to get gtid from all: %v", err) + shard.logger.Errorf("Failed to get gtid from all: %v", err) return nil, err } err = errorRecorder.Error() // We cannot tolerate any error from mysql during a rebootstrap. if err != nil { - log.Errorf("Failed to fetch all GTID with forAllInstances for rebootstrap: %v", err) + shard.logger.Errorf("Failed to fetch all GTID with forAllInstances for rebootstrap: %v", err) return nil, err } candidate, err := shard.findFailoverCandidateFromRecorder(ctx, gtidRecorder, nil) if err != nil { - log.Errorf("Failed to find rebootstrap candidate by GTID after forAllInstances: %v", err) + shard.logger.Errorf("Failed to find rebootstrap candidate by GTID after forAllInstances: %v", err) return nil, err } if candidate == nil { return nil, fmt.Errorf("failed to find rebootstrap candidate for %v", formatKeyspaceShard(shard.KeyspaceShard)) } if !shard.instanceReachable(ctx, candidate) { - log.Errorf("rebootstrap candidate %v (%v) is not reachable via ping", candidate.alias, candidate.instanceKey.Hostname) + shard.logger.Errorf("rebootstrap candidate %v (%v) is not reachable via ping", candidate.alias, candidate.instanceKey.Hostname) return nil, fmt.Errorf("%v is unreachable", candidate.alias) } - log.Infof("%v is the rebootstrap candidate", candidate.alias) + shard.logger.Infof("%v is the rebootstrap candidate", candidate.alias) return candidate, nil } @@ -385,7 +384,7 @@ func (shard *GRShard) findRebootstrapCandidate(ctx context.Context) (*grInstance func (shard *GRShard) findFailoverCandidate(ctx context.Context) (*grInstance, error) { gtidRecorder, errorRecorder, err := shard.getGTIDSetFromAll(true) if err != nil { - log.Errorf("Failed to get gtid from all: %v", err) + shard.logger.Errorf("Failed to get gtid from all: %v", err) return nil, err } err = errorRecorder.Error() @@ -393,12 +392,12 @@ func (shard *GRShard) findFailoverCandidate(ctx context.Context) (*grInstance, e // Failover within the group is safe, finding the largest GTID is an optimization. 
// therefore we don't check error from errorRecorder just log it if err != nil { - log.Warningf("Errors when fetch all GTID with forAllInstances for failover: %v", err) + shard.logger.Warningf("Errors when fetch all GTID with forAllInstances for failover: %v", err) } shard.forAllInstances(func(instance *grInstance, wg *sync.WaitGroup, er concurrency.ErrorRecorder) { defer wg.Done() if !shard.instanceReachable(ctx, instance) { - log.Errorf("%v is not reachable via ping", instance.alias) + shard.logger.Errorf("%v is not reachable via ping", instance.alias) shard.shardStatusCollector.recordProblematics(instance) shard.shardStatusCollector.recordUnreachables(instance) } @@ -408,20 +407,20 @@ func (shard *GRShard) findFailoverCandidate(ctx context.Context) (*grInstance, e return !shard.shardStatusCollector.isUnreachable(instance) }) if err != nil { - log.Errorf("Failed to find failover candidate by GTID after forAllInstances: %v", err) + shard.logger.Errorf("Failed to find failover candidate by GTID after forAllInstances: %v", err) return nil, err } if candidate == nil { return nil, fmt.Errorf("failed to find failover candidate for %v", formatKeyspaceShard(shard.KeyspaceShard)) } - log.Infof("%v is the failover candidate", candidate.alias) + shard.logger.Infof("%v is the failover candidate", candidate.alias) return candidate, nil } func (shard *GRShard) repairWrongPrimaryTablet(ctx context.Context) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairWrongPrimaryTablet") if err != nil { - log.Warningf("repairWrongPrimaryTablet fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) + shard.logger.Warningf("repairWrongPrimaryTablet fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err) return Noop, err } defer shard.UnlockShard() @@ -430,11 +429,11 @@ func (shard *GRShard) repairWrongPrimaryTablet(ctx context.Context) (RepairResul shard.refreshTabletsInShardLocked(ctx) status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != DiagnoseTypeWrongPrimaryTablet { - log.Infof("Shard %v is no longer in DiagnoseTypeWrongPrimaryTablet: %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("Shard %v is no longer in DiagnoseTypeWrongPrimaryTablet: %v", formatKeyspaceShard(shard.KeyspaceShard), status) return Noop, nil } start := time.Now() @@ -465,7 +464,7 @@ func (shard *GRShard) fixPrimaryTabletLocked(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to change type to primary on %v: %v", candidate.alias, err) } - log.Infof("Successfully make %v the primary tablet", candidate.alias) + shard.logger.Infof("Successfully make %v the primary tablet", candidate.alias) return nil } @@ -474,18 +473,18 @@ func (shard *GRShard) fixPrimaryTabletLocked(ctx context.Context) error { func (shard *GRShard) repairUnconnectedReplica(ctx context.Context) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairUnconnectedReplica") if err != nil { - log.Warningf("repairUnconnectedReplica fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) + shard.logger.Warningf("repairUnconnectedReplica fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) return Noop, err } defer shard.UnlockShard() shard.refreshTabletsInShardLocked(ctx) status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + 
shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != DiagnoseTypeUnconnectedReplica { - log.Infof("Shard %v is no longer in DiagnoseTypeUnconnectedReplica: %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("Shard %v is no longer in DiagnoseTypeUnconnectedReplica: %v", formatKeyspaceShard(shard.KeyspaceShard), status) return Noop, nil } start := time.Now() @@ -504,20 +503,20 @@ func (shard *GRShard) repairUnconnectedReplicaAction(ctx context.Context) error return err } if target == nil { - log.Infof("there is no instance without group for %v", formatKeyspaceShard(shard.KeyspaceShard)) + shard.logger.Infof("there is no instance without group for %v", formatKeyspaceShard(shard.KeyspaceShard)) return nil } - log.Infof("Connecting replica %v to %v", target.instanceKey.Hostname, primaryInstance.instanceKey.Hostname) + shard.logger.Infof("Connecting replica %v to %v", target.instanceKey.Hostname, primaryInstance.instanceKey.Hostname) status := shard.sqlGroup.GetStatus(target.instanceKey) // Make sure we still hold the topo server lock before moving on if err := shard.checkShardLocked(ctx); err != nil { return err } if status != nil && status.State != db.OFFLINE { - log.Infof("stop group replication on %v ($v) before join the group", target.alias, status.State) + shard.logger.Infof("stop group replication on %v (%v) before join the group", target.alias, status.State) err := shard.dbAgent.StopGroupLocked(target.instanceKey) if err != nil { - log.Errorf("Failed to stop group replication on %v: %v", target.instanceKey.Hostname, err) + shard.logger.Errorf("Failed to stop group replication on %v: %v", target.instanceKey.Hostname, err) return err } // Make sure we still hold the topo server lock before moving on @@ -531,18 +530,18 @@ func (shard *GRShard) repairUnconnectedReplicaAction(ctx context.Context) error func (shard *GRShard) repairUnreachablePrimary(ctx context.Context) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairUnreachablePrimary") if err != nil { - log.Warningf("repairUnreachablePrimary fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) + shard.logger.Warningf("repairUnreachablePrimary fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) return Noop, err } defer shard.UnlockShard() shard.refreshTabletsInShardLocked(ctx) status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != DiagnoseTypeUnreachablePrimary { - log.Infof("Shard %v is no longer in DiagnoseTypeUnreachablePrimary: %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("Shard %v is no longer in DiagnoseTypeUnreachablePrimary: %v", formatKeyspaceShard(shard.KeyspaceShard), status) return Noop, nil } // We are here because either: @@ -564,18 +563,18 @@ func (shard *GRShard) repairUnreachablePrimary(ctx context.Context) (RepairResul func (shard *GRShard) repairInsufficientGroupSize(ctx context.Context) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairInsufficientGroupSize") if err != nil { - log.Warningf("repairInsufficientGroupSize fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) + shard.logger.Warningf("repairInsufficientGroupSize fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) return Noop, err } defer shard.UnlockShard() 
shard.refreshTabletsInShardLocked(ctx) status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != DiagnoseTypeInsufficientGroupSize { - log.Infof("Shard %v is no longer in DiagnoseTypeInsufficientGroupSize: %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("Shard %v is no longer in DiagnoseTypeInsufficientGroupSize: %v", formatKeyspaceShard(shard.KeyspaceShard), status) return Noop, nil } // We check primary tablet is consistent with sql primary before InsufficientGroupSize @@ -599,18 +598,18 @@ func (shard *GRShard) repairInsufficientGroupSize(ctx context.Context) (RepairRe func (shard *GRShard) repairReadOnlyShard(ctx context.Context) (RepairResultCode, error) { ctx, err := shard.LockShard(ctx, "repairReadOnlyShard") if err != nil { - log.Warningf("repairReadOnlyShard fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) + shard.logger.Warningf("repairReadOnlyShard fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) return Noop, err } defer shard.UnlockShard() shard.refreshTabletsInShardLocked(ctx) status, err := shard.diagnoseLocked(ctx) if err != nil { - log.Errorf("Failed to diagnose: %v", err) + shard.logger.Errorf("Failed to diagnose: %v", err) return Fail, err } if status != DiagnoseTypeReadOnlyShard { - log.Infof("Shard %v is no longer in DiagnoseTypeReadOnlyShard: %v", formatKeyspaceShard(shard.KeyspaceShard), status) + shard.logger.Infof("Shard %v is no longer in DiagnoseTypeReadOnlyShard: %v", formatKeyspaceShard(shard.KeyspaceShard), status) return Noop, nil } primary := shard.findShardPrimaryTablet() @@ -630,7 +629,7 @@ func (shard *GRShard) repairReadOnlyShard(ctx context.Context) (RepairResultCode func (shard *GRShard) Failover(ctx context.Context) error { ctx, err := shard.LockShard(ctx, "Failover") if err != nil { - log.Warningf("Failover fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) + shard.logger.Warningf("Failover fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err) return err } defer shard.UnlockShard() @@ -641,7 +640,7 @@ func (shard *GRShard) Failover(ctx context.Context) error { func (shard *GRShard) failoverLocked(ctx context.Context) error { candidate, err := shard.findFailoverCandidate(ctx) if err != nil { - log.Errorf("Failed to find failover candidate: %v", err) + shard.logger.Errorf("Failed to find failover candidate: %v", err) return err } // Make sure we still hold the topo server lock before moving on @@ -650,20 +649,20 @@ func (shard *GRShard) failoverLocked(ctx context.Context) error { } err = shard.dbAgent.Failover(candidate.instanceKey) if err != nil { - log.Errorf("Failed to failover mysql to %v", candidate.alias) + shard.logger.Errorf("Failed to failover mysql to %v", candidate.alias) return err } - log.Infof("Successfully failover MySQL to %v for %v", candidate.instanceKey.Hostname, formatKeyspaceShard(shard.KeyspaceShard)) + shard.logger.Infof("Successfully failover MySQL to %v for %v", candidate.instanceKey.Hostname, formatKeyspaceShard(shard.KeyspaceShard)) // Make sure we still hold the topo server lock before moving on if err := shard.checkShardLocked(ctx); err != nil { return err } err = shard.tmc.ChangeType(ctx, candidate.tablet, topodatapb.TabletType_PRIMARY) if err != nil { - log.Errorf("Failed to failover Vitess %v", candidate.alias) + 
shard.logger.Errorf("Failed to failover Vitess %v", candidate.alias) return err } - log.Infof("Successfully failover Vitess to %v for %v", candidate.alias, formatKeyspaceShard(shard.KeyspaceShard)) + shard.logger.Infof("Successfully failover Vitess to %v for %v", candidate.alias, formatKeyspaceShard(shard.KeyspaceShard)) return nil } @@ -675,7 +674,7 @@ func (shard *GRShard) findFailoverCandidateFromRecorder(ctx context.Context, rec // in case they have same gtid set recorder.sort() for _, gtidInst := range recorder.gtidWithInstances { - log.Infof("[failover candidates] %s gtid %s", gtidInst.instance.alias, gtidInst.gtids.String()) + shard.logger.Infof("[failover candidates] %s gtid %s", gtidInst.instance.alias, gtidInst.gtids.String()) } var largestGTIDs mysql.GTIDSet var candidate *grInstance @@ -686,7 +685,7 @@ func (shard *GRShard) findFailoverCandidateFromRecorder(ctx context.Context, rec gtids := elem.gtids inst := elem.instance if check != nil && !check(ctx, inst) { - log.Warningf("Skip %v as candidate with gtid %v because it failed the check", inst.alias, gtids.String()) + shard.logger.Warningf("Skip %v as candidate with gtid %v because it failed the check", inst.alias, gtids.String()) continue } if largestGTIDs == nil { @@ -706,7 +705,7 @@ func (shard *GRShard) findFailoverCandidateFromRecorder(ctx context.Context, rec // we log and append to candidates so that we know there is a problem in the group // after the iteration if !isSuperset { - log.Errorf("FetchGroupView divergent GITD set from host=%v GTIDSet=%v", inst.instanceKey.Hostname, gtids) + shard.logger.Errorf("FetchGroupView divergent GITD set from host=%v GTIDSet=%v", inst.instanceKey.Hostname, gtids) divergentCandidates = append(divergentCandidates, inst.alias) } } @@ -736,7 +735,7 @@ func (shard *GRShard) checkShardLocked(ctx context.Context) error { if err := topo.CheckShardLocked(ctx, shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard); err != nil { labels := []string{shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard} unexpectedLockLost.Add(labels, 1) - log.Errorf("lost topology lock; aborting") + shard.logger.Errorf("lost topology lock; aborting") return vterrors.Wrap(err, "lost topology lock; aborting") } return nil diff --git a/go/vt/vtgr/log/log.go b/go/vt/vtgr/log/log.go new file mode 100644 index 00000000000..7a94179ed98 --- /dev/null +++ b/go/vt/vtgr/log/log.go @@ -0,0 +1,53 @@ +package log + +import ( + "fmt" + + "vitess.io/vitess/go/vt/log" +) + +// Logger is a wrapper that prefix loglines with keyspace/shard +type Logger struct { + prefix string +} + +// NewVTGRLogger creates a new logger +func NewVTGRLogger(keyspace, shard string) *Logger { + return &Logger{ + prefix: fmt.Sprintf("%s/%s", keyspace, shard), + } +} + +// Info formats arguments like fmt.Print +func (logger *Logger) Info(msg string) { + log.InfoDepth(1, logger.annotate(msg)) +} + +// Infof formats arguments like fmt.Printf. +func (logger *Logger) Infof(format string, args ...interface{}) { + log.InfoDepth(1, logger.annotate(fmt.Sprintf(format, args...))) +} + +// Warning formats arguments like fmt.Print +func (logger *Logger) Warning(msg string) { + log.WarningDepth(1, logger.annotate(msg)) +} + +// Warningf formats arguments like fmt.Printf. 
+func (logger *Logger) Warningf(format string, args ...interface{}) {
+	log.WarningDepth(1, logger.annotate(fmt.Sprintf(format, args...)))
+}
+
+// Error formats arguments like fmt.Print
+func (logger *Logger) Error(msg string) {
+	log.ErrorDepth(1, logger.annotate(msg))
+}
+
+// Errorf formats arguments like fmt.Printf.
+func (logger *Logger) Errorf(format string, args ...interface{}) {
+	log.ErrorDepth(1, logger.annotate(fmt.Sprintf(format, args...)))
+}
+
+func (logger *Logger) annotate(input string) string {
+	return fmt.Sprintf("shard=%s %s", logger.prefix, input)
+}
diff --git a/go/vt/vtgr/log/log_test.go b/go/vt/vtgr/log/log_test.go
new file mode 100644
index 00000000000..fd4ede386e9
--- /dev/null
+++ b/go/vt/vtgr/log/log_test.go
@@ -0,0 +1,16 @@
+package log
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestVTGRLogger(t *testing.T) {
+	logger := NewVTGRLogger("ks", "0")
+	s1 := logger.annotate("abc")
+	assert.Equal(t, "shard=ks/0 abc", s1)
+	s2 := fmt.Sprintf(logger.annotate("abc %s"), "def")
+	assert.Equal(t, "shard=ks/0 abc def", s2)
+}
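
For readers integrating with the new package, a minimal usage sketch follows (not part of the patch). It assumes the vitess.io/vitess module is importable; the keyspace/shard values and log messages are placeholders. It mirrors how NewGRShard and NewSQLGroup wire the logger in the diff above.

package main

import (
	vtgrlog "vitess.io/vitess/go/vt/vtgr/log"
)

func main() {
	// One logger per keyspace/shard, created once and shared by the component
	// that owns it (GRShard, SQLGroup and ResolvedView in this patch).
	logger := vtgrlog.NewVTGRLogger("commerce", "-80")

	// annotate() prepends "shard=<keyspace>/<shard>" to every message, so a
	// single vtgr process managing many shards emits lines that can be
	// filtered per shard, e.g. "shard=commerce/-80 bootstrapping group ...".
	logger.Infof("bootstrapping group on host=%v", "mysql-1")
	logger.Warning("not safe to bootstrap sql group")
	logger.Errorf("failed to fetch local group view: %v", "connection refused")
}

Because the wrapper forwards to log.InfoDepth(1, ...), log.WarningDepth(1, ...) and log.ErrorDepth(1, ...), glog should still attribute each line to the caller of Infof/Warning/Errorf rather than to log.go, so the file:line information in existing logs stays useful.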