diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index 0f11ffa1896..daf28d7f8a3 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -155,21 +155,21 @@ type Conn interface { Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) // - // Master election methods. This is meant to have a small + // Leader election methods. This is meant to have a small // number of processes elect a primary within a group. The // backend storage for this can either be the global topo // server, or a resilient quorum of individual cells, to // reduce the load / dependency on the global topo server. // - // NewMasterParticipation creates a MasterParticipation - // object, used to become the Master in an election for the + // NewLeaderParticipation creates a LeaderParticipation + // object, used to become the Leader in an election for the // provided group name. Id is the name of the local process, // passing in the hostname:port of the current process as id // is the common usage. Id must be unique for each process // calling this, for a given name. Calling this function does // not make the current process a candidate for the election. - NewMasterParticipation(name, id string) (MasterParticipation, error) + NewLeaderParticipation(name, id string) (LeaderParticipation, error) // Close closes the connection to the server. Close() @@ -270,14 +270,14 @@ type WatchData struct { Err error } -// MasterParticipation is the object returned by NewMasterParticipation. +// LeaderParticipation is the object returned by NewLeaderParticipation. // Sample usage: // -// mp := server.NewMasterParticipation("vtctld", "hostname:8080") +// mp := server.NewLeaderParticipation("vtctld", "hostname:8080") // job := NewJob() // go func() { // for { -// ctx, err := mp.WaitForMastership() +// ctx, err := mp.WaitForLeadership() // switch err { // case nil: // job.RunUntilContextDone(ctx) @@ -294,34 +294,34 @@ type WatchData struct { // if job.Running() { // job.WriteStatus(w, r) // } else { -// http.Redirect(w, r, mp.GetCurrentMasterID(context.Background()), http.StatusFound) +// http.Redirect(w, r, mp.GetCurrentLeaderID(context.Background()), http.StatusFound) // } // }) // // servenv.OnTermSync(func() { // mp.Stop() // }) -type MasterParticipation interface { - // WaitForMastership makes the current process a candidate +type LeaderParticipation interface { + // WaitForLeadership makes the current process a candidate // for election, and waits until this process is the primary. // After we become the primary, we may lose primaryship. In that case, // the returned context will be canceled. If Stop was called, - // WaitForMastership will return nil, ErrInterrupted. - WaitForMastership() (context.Context, error) + // WaitForLeadership will return nil, ErrInterrupted. + WaitForLeadership() (context.Context, error) // Stop is called when we don't want to participate in the // primary election any more. Typically, that is when the // hosting process is terminating. We will relinquish // primaryship at that point, if we had it. Stop should // not return until everything has been done. - // The MasterParticipation object should be discarded - // after Stop has been called. Any call to WaitForMastership + // The LeaderParticipation object should be discarded + // after Stop has been called. Any call to WaitForLeadership // after Stop() will return nil, ErrInterrupted. - // If WaitForMastership() was running, it will return + // If WaitForLeadership() was running, it will return // nil, ErrInterrupted as soon as possible. Stop() - // GetCurrentMasterID returns the current primary id. + // GetCurrentLeaderID returns the current primary id. // This may not work after Stop has been called. - GetCurrentMasterID(ctx context.Context) (string, error) + GetCurrentLeaderID(ctx context.Context) (string, error) } diff --git a/go/vt/topo/consultopo/election.go b/go/vt/topo/consultopo/election.go index 19ca2f6d96e..0c595dd27a8 100644 --- a/go/vt/topo/consultopo/election.go +++ b/go/vt/topo/consultopo/election.go @@ -27,9 +27,9 @@ import ( "vitess.io/vitess/go/vt/topo" ) -// NewMasterParticipation is part of the topo.Server interface -func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) { - return &consulMasterParticipation{ +// NewLeaderParticipation is part of the topo.Server interface +func (s *Server) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { + return &consulLeaderParticipation{ s: s, name: name, id: id, @@ -38,15 +38,15 @@ func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipati }, nil } -// consulMasterParticipation implements topo.MasterParticipation. +// consulLeaderParticipation implements topo.LeaderParticipation. // // We use a key with name /elections/ for the lock, // that contains the id. -type consulMasterParticipation struct { +type consulLeaderParticipation struct { // s is our parent consul topo Server s *Server - // name is the name of this MasterParticipation + // name is the name of this LeaderParticipation name string // id is the process's current id. @@ -59,8 +59,8 @@ type consulMasterParticipation struct { done chan struct{} } -// WaitForMastership is part of the topo.MasterParticipation interface. -func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error) { +// WaitForLeadership is part of the topo.LeaderParticipation interface. +func (mp *consulLeaderParticipation) WaitForLeadership() (context.Context, error) { electionPath := path.Join(mp.s.root, electionsPath, mp.name) l, err := mp.s.client.LockOpts(&api.LockOptions{ @@ -74,7 +74,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error // If Stop was already called, mp.done is closed, so we are interrupted. select { case <-mp.done: - return nil, topo.NewError(topo.Interrupted, "mastership") + return nil, topo.NewError(topo.Interrupted, "Leadership") default: } @@ -98,7 +98,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error lockCancel() // We could have lost the lock. Per consul API, explicitly call Unlock to make sure that session will not be renewed. if err := l.Unlock(); err != nil { - log.Errorf("master election(%v) Unlock failed: %v", mp.name, err) + log.Errorf("Leader election(%v) Unlock failed: %v", mp.name, err) } case <-mp.stop: // Stop was called. We stop the context first, @@ -106,7 +106,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error // is the primary any more, then we unlock. lockCancel() if err := l.Unlock(); err != nil { - log.Errorf("master election(%v) Unlock failed: %v", mp.name, err) + log.Errorf("Leader election(%v) Unlock failed: %v", mp.name, err) } close(mp.done) } @@ -115,14 +115,14 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error return lockCtx, nil } -// Stop is part of the topo.MasterParticipation interface -func (mp *consulMasterParticipation) Stop() { +// Stop is part of the topo.LeaderParticipation interface +func (mp *consulLeaderParticipation) Stop() { close(mp.stop) <-mp.done } -// GetCurrentMasterID is part of the topo.MasterParticipation interface -func (mp *consulMasterParticipation) GetCurrentMasterID(ctx context.Context) (string, error) { +// GetCurrentLeaderID is part of the topo.LeaderParticipation interface +func (mp *consulLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { electionPath := path.Join(mp.s.root, electionsPath, mp.name) pair, _, err := mp.s.kv.Get(electionPath, nil) if err != nil { diff --git a/go/vt/topo/consultopo/server_flaky_test.go b/go/vt/topo/consultopo/server_flaky_test.go index 19404d6cd65..f6d34c5e7d6 100644 --- a/go/vt/topo/consultopo/server_flaky_test.go +++ b/go/vt/topo/consultopo/server_flaky_test.go @@ -68,6 +68,9 @@ func startConsul(t *testing.T, authToken string) (*exec.Cmd, string, string) { }, } + // TODO(deepthi): this is the legacy ACL format. We run v1.4.0 by default in which this has been deprecated. + // We should start using the new format + // https://learn.hashicorp.com/tutorials/consul/access-control-replication-multiple-datacenters?in=consul/security-operations if authToken != "" { config["datacenter"] = "vitess" config["acl_datacenter"] = "vitess" diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go index 46be45afb4b..667b98562f4 100644 --- a/go/vt/topo/etcd2topo/election.go +++ b/go/vt/topo/etcd2topo/election.go @@ -27,9 +27,9 @@ import ( "vitess.io/vitess/go/vt/topo" ) -// NewMasterParticipation is part of the topo.Server interface -func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) { - return &etcdMasterParticipation{ +// NewLeaderParticipation is part of the topo.Server interface +func (s *Server) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { + return &etcdLeaderParticipation{ s: s, name: name, id: id, @@ -38,16 +38,16 @@ func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipati }, nil } -// etcdMasterParticipation implements topo.MasterParticipation. +// etcdLeaderParticipation implements topo.LeaderParticipation. // // We use a directory (in global election path, with the name) with // ephemeral files in it, that contains the id. The oldest revision // wins the election. -type etcdMasterParticipation struct { +type etcdLeaderParticipation struct { // s is our parent etcd topo Server s *Server - // name is the name of this MasterParticipation + // name is the name of this LeaderParticipation name string // id is the process's current id. @@ -60,12 +60,12 @@ type etcdMasterParticipation struct { done chan struct{} } -// WaitForMastership is part of the topo.MasterParticipation interface. -func (mp *etcdMasterParticipation) WaitForMastership() (context.Context, error) { +// WaitForLeadership is part of the topo.LeaderParticipation interface. +func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error) { // If Stop was already called, mp.done is closed, so we are interrupted. select { case <-mp.done: - return nil, topo.NewError(topo.Interrupted, "mastership") + return nil, topo.NewError(topo.Interrupted, "Leadership") default: } @@ -99,14 +99,14 @@ func (mp *etcdMasterParticipation) WaitForMastership() (context.Context, error) return lockCtx, nil } -// Stop is part of the topo.MasterParticipation interface -func (mp *etcdMasterParticipation) Stop() { +// Stop is part of the topo.LeaderParticipation interface +func (mp *etcdLeaderParticipation) Stop() { close(mp.stop) <-mp.done } -// GetCurrentMasterID is part of the topo.MasterParticipation interface -func (mp *etcdMasterParticipation) GetCurrentMasterID(ctx context.Context) (string, error) { +// GetCurrentLeaderID is part of the topo.LeaderParticipation interface +func (mp *etcdLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { electionPath := path.Join(mp.s.root, electionsPath, mp.name) // Get the keys in the directory, older first. diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 1bc1d437e6d..752b82205d4 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -34,7 +34,7 @@ import ( ) var ( - leaseTTL = flag.Int("topo_etcd_lease_ttl", 30, "Lease TTL for locks and master election. The client will use KeepAlive to keep the lease going.") + leaseTTL = flag.Int("topo_etcd_lease_ttl", 30, "Lease TTL for locks and leader election. The client will use KeepAlive to keep the lease going.") ) // newUniqueEphemeralKV creates a new file in the provided directory. diff --git a/go/vt/topo/helpers/copy_test.go b/go/vt/topo/helpers/copy_test.go index f484d255322..8a030f9ec0a 100644 --- a/go/vt/topo/helpers/copy_test.go +++ b/go/vt/topo/helpers/copy_test.go @@ -49,8 +49,8 @@ func createSetup(ctx context.Context, t *testing.T) (*topo.Server, *topo.Server) Cell: "test_cell", Uid: 123, }, - Hostname: "masterhost", - MysqlHostname: "masterhost", + Hostname: "primaryhost", + MysqlHostname: "primaryhost", PortMap: map[string]int32{ "vt": 8101, "gprc": 8102, @@ -63,7 +63,7 @@ func createSetup(ctx context.Context, t *testing.T) (*topo.Server, *topo.Server) } tablet1.MysqlPort = 3306 if err := fromTS.CreateTablet(ctx, tablet1); err != nil { - t.Fatalf("cannot create master tablet: %v", err) + t.Fatalf("cannot create primary tablet: %v", err) } tablet2 := &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ diff --git a/go/vt/topo/helpers/tee.go b/go/vt/topo/helpers/tee.go index 5c983327031..c09cb9c6ab9 100644 --- a/go/vt/topo/helpers/tee.go +++ b/go/vt/topo/helpers/tee.go @@ -30,7 +30,7 @@ import ( // topo.Server to another. // // - primary: we read everything from it, and write to it. We also create -// MasterParticipation from it. +// LeaderParticipation from it. // - secondary: we write to it as well, but we usually don't fail. // - we lock primary/secondary if reverseLockOrder is False, // or secondary/primary if reverseLockOrder is True. @@ -224,7 +224,7 @@ func (ld *teeTopoLockDescriptor) Unlock(ctx context.Context) error { return ferr } -// NewMasterParticipation is part of the topo.Conn interface. -func (c *TeeConn) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) { - return c.primary.NewMasterParticipation(name, id) +// NewLeaderParticipation is part of the topo.Conn interface. +func (c *TeeConn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { + return c.primary.NewLeaderParticipation(name, id) } diff --git a/go/vt/topo/k8stopo/election.go b/go/vt/topo/k8stopo/election.go index 85ebbdf83c7..1106156f88f 100644 --- a/go/vt/topo/k8stopo/election.go +++ b/go/vt/topo/k8stopo/election.go @@ -27,9 +27,9 @@ import ( const electionsPath = "elections" -// NewMasterParticipation is part of the topo.Server interface -func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) { - return &kubernetesMasterParticipation{ +// NewLeaderParticipation is part of the topo.Server interface +func (s *Server) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { + return &kubernetesLeaderParticipation{ s: s, name: name, id: id, @@ -38,16 +38,16 @@ func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipati }, nil } -// kubernetesMasterParticipation implements topo.MasterParticipation. +// kubernetesLeaderParticipation implements topo.LeaderParticipation. // // We use a directory (in global election path, with the name) with // ephemeral files in it, that contains the id. The oldest revision // wins the election. -type kubernetesMasterParticipation struct { +type kubernetesLeaderParticipation struct { // s is our parent kubernetes topo Server s *Server - // name is the name of this MasterParticipation + // name is the name of this LeaderParticipation name string // id is the process's current id. @@ -60,16 +60,16 @@ type kubernetesMasterParticipation struct { done chan struct{} } -func (mp *kubernetesMasterParticipation) getElectionPath() string { +func (mp *kubernetesLeaderParticipation) getElectionPath() string { return path.Join(mp.s.root, electionsPath, mp.name) } -// WaitForMastership is part of the topo.MasterParticipation interface. -func (mp *kubernetesMasterParticipation) WaitForMastership() (context.Context, error) { +// WaitForLeadership is part of the topo.LeaderParticipation interface. +func (mp *kubernetesLeaderParticipation) WaitForLeadership() (context.Context, error) { // If Stop was already called, mp.done is closed, so we are interrupted. select { case <-mp.done: - return nil, topo.NewError(topo.Interrupted, "mastership") + return nil, topo.NewError(topo.Interrupted, "Leadership") default: } @@ -103,14 +103,14 @@ func (mp *kubernetesMasterParticipation) WaitForMastership() (context.Context, e return lockCtx, nil } -// Stop is part of the topo.MasterParticipation interface -func (mp *kubernetesMasterParticipation) Stop() { +// Stop is part of the topo.LeaderParticipation interface +func (mp *kubernetesLeaderParticipation) Stop() { close(mp.stop) <-mp.done } -// GetCurrentMasterID is part of the topo.MasterParticipation interface -func (mp *kubernetesMasterParticipation) GetCurrentMasterID(ctx context.Context) (string, error) { +// GetCurrentLeaderID is part of the topo.LeaderParticipation interface +func (mp *kubernetesLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { id, _, err := mp.s.Get(ctx, mp.getElectionPath()) if err != nil { // NoNode means nobody is the primary diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 74df8f81eeb..d14980d92ef 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -63,6 +63,7 @@ func (ki *KeyspaceInfo) GetServedFrom(tabletType topodatapb.TabletType) *topodat func (ki *KeyspaceInfo) CheckServedFromMigration(tabletType topodatapb.TabletType, cells []string, keyspace string, remove bool) error { // primary is a special case with a few extra checks if tabletType == topodatapb.TabletType_PRIMARY { + // TODO(deepthi): these master references will go away when we delete legacy resharding if !remove { return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot add master back to %v", ki.keyspace) } diff --git a/go/vt/topo/keyspace_test.go b/go/vt/topo/keyspace_test.go index 84cb4fef6dc..1abf873b8e0 100644 --- a/go/vt/topo/keyspace_test.go +++ b/go/vt/topo/keyspace_test.go @@ -23,9 +23,10 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -// This file tests the keyspace related object functionnalities. +// This file tests the keyspace related object functionalities. func TestUpdateServedFromMap(t *testing.T) { + // TODO(deepthi): delete this test once legacy resharding code is deleted ki := &KeyspaceInfo{ keyspace: "ks", version: nil, diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index ed4394210ef..e10d09527a7 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -25,8 +25,8 @@ import ( "vitess.io/vitess/go/vt/topo" ) -// NewMasterParticipation is part of the topo.Server interface -func (c *Conn) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) { +// NewLeaderParticipation is part of the topo.Server interface +func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { c.factory.mu.Lock() defer c.factory.mu.Unlock() @@ -36,7 +36,7 @@ func (c *Conn) NewMasterParticipation(name, id string) (topo.MasterParticipation return nil, topo.NewError(topo.NoNode, electionPath) } - return &cMasterParticipation{ + return &cLeaderParticipation{ c: c, name: name, id: id, @@ -45,16 +45,16 @@ func (c *Conn) NewMasterParticipation(name, id string) (topo.MasterParticipation }, nil } -// cMasterParticipation implements topo.MasterParticipation. +// cLeaderParticipation implements topo.LeaderParticipation. // // We use a directory (in global election path, with the name) with // ephemeral files in it, that contains the id. The oldest revision // wins the election. -type cMasterParticipation struct { +type cLeaderParticipation struct { // c is our memorytopo connection c *Conn - // name is the name of this MasterParticipation + // name is the name of this LeaderParticipation name string // id is the process's current id. @@ -67,12 +67,12 @@ type cMasterParticipation struct { done chan struct{} } -// WaitForMastership is part of the topo.MasterParticipation interface. -func (mp *cMasterParticipation) WaitForMastership() (context.Context, error) { +// WaitForLeadership is part of the topo.LeaderParticipation interface. +func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) { // If Stop was already called, mp.done is closed, so we are interrupted. select { case <-mp.done: - return nil, topo.NewError(topo.Interrupted, "mastership") + return nil, topo.NewError(topo.Interrupted, "Leadership") default: } @@ -106,14 +106,14 @@ func (mp *cMasterParticipation) WaitForMastership() (context.Context, error) { return lockCtx, nil } -// Stop is part of the topo.MasterParticipation interface -func (mp *cMasterParticipation) Stop() { +// Stop is part of the topo.LeaderParticipation interface +func (mp *cLeaderParticipation) Stop() { close(mp.stop) <-mp.done } -// GetCurrentMasterID is part of the topo.MasterParticipation interface -func (mp *cMasterParticipation) GetCurrentMasterID(ctx context.Context) (string, error) { +// GetCurrentLeaderID is part of the topo.LeaderParticipation interface +func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { electionPath := path.Join(electionsPath, mp.name) mp.c.factory.mu.Lock() diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index f331c1504bb..d1da7c94256 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -423,7 +423,7 @@ func (si *ShardInfo) UpdateSourceDeniedTables(ctx context.Context, tabletType to } if tabletType == topodatapb.TabletType_PRIMARY { - if err := si.updateMasterTabletControl(tc, remove, tables); err != nil { + if err := si.updatePrimaryTabletControl(tc, remove, tables); err != nil { return err } return nil @@ -442,7 +442,7 @@ func (si *ShardInfo) UpdateSourceDeniedTables(ctx context.Context, tabletType to return nil } -func (si *ShardInfo) updateMasterTabletControl(tc *topodatapb.Shard_TabletControl, remove bool, tables []string) error { +func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletControl, remove bool, tables []string) error { var newTables []string for _, table := range tables { exists := false diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 1b539d4dac0..4d007435652 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -156,13 +156,18 @@ func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *Watch return st.conn.Watch(ctx, filePath) } -// NewMasterParticipation is part of the Conn interface -func (st *StatsConn) NewMasterParticipation(name, id string) (MasterParticipation, error) { +// NewLeaderParticipation is part of the Conn interface +func (st *StatsConn) NewLeaderParticipation(name, id string) (LeaderParticipation, error) { startTime := time.Now() - statsKey := []string{"NewMasterParticipation", st.cell} + // TODO(deepthi): delete after v13.0 + deprecatedKey := []string{"NewMasterParticipation", st.cell} + defer topoStatsConnTimings.Record(deprecatedKey, startTime) + + statsKey := []string{"NewLeaderParticipation", st.cell} defer topoStatsConnTimings.Record(statsKey, startTime) - res, err := st.conn.NewMasterParticipation(name, id) + res, err := st.conn.NewLeaderParticipation(name, id) if err != nil { + topoStatsConnErrors.Add(deprecatedKey, int64(1)) topoStatsConnErrors.Add(statsKey, int64(1)) return res, err } diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 356475196ad..f9390779115 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -102,8 +102,8 @@ func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchD return current, changes, cancel } -// NewMasterParticipation is part of the Conn interface -func (st *fakeConn) NewMasterParticipation(name, id string) (mp MasterParticipation, err error) { +// NewLeaderParticipation is part of the Conn interface +func (st *fakeConn) NewLeaderParticipation(name, id string) (mp LeaderParticipation, err error) { if name == "error" { return mp, fmt.Errorf("dummy error") @@ -301,30 +301,46 @@ func TestStatsConnTopoWatch(t *testing.T) { } -//TestStatsConnTopoNewMasterParticipation emits stats on NewMasterParticipation -func TestStatsConnTopoNewMasterParticipation(t *testing.T) { +//TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation +func TestStatsConnTopoNewLeaderParticipation(t *testing.T) { conn := &fakeConn{} statsConn := NewStatsConn("global", conn) - statsConn.NewMasterParticipation("", "") + _, _ = statsConn.NewLeaderParticipation("", "") + // TODO(deepthi): delete "Master" stats after v13.0 timingCounts := topoStatsConnTimings.Counts()["NewMasterParticipation.global"] if got, want := timingCounts, int64(1); got != want { t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) } + timingCounts = topoStatsConnTimings.Counts()["NewLeaderParticipation.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } // error is zero before getting an error errorCount := topoStatsConnErrors.Counts()["NewMasterParticipation.global"] if got, want := errorCount, int64(0); got != want { t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) } + // error is zero before getting an error + errorCount = topoStatsConnErrors.Counts()["NewLeaderParticipation.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } - statsConn.NewMasterParticipation("error", "") + _, _ = statsConn.NewLeaderParticipation("error", "") // error stats gets emitted errorCount = topoStatsConnErrors.Counts()["NewMasterParticipation.global"] if got, want := errorCount, int64(1); got != want { t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) } + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["NewLeaderParticipation.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } } //TestStatsConnTopoClose emits stats on Close diff --git a/go/vt/topo/test/election.go b/go/vt/topo/test/election.go index 5365e15e05e..bd92a452c4e 100644 --- a/go/vt/topo/test/election.go +++ b/go/vt/topo/test/election.go @@ -25,27 +25,27 @@ import ( "vitess.io/vitess/go/vt/topo" ) -func waitForMasterID(t *testing.T, mp topo.MasterParticipation, expected string) { +func waitForLeaderID(t *testing.T, mp topo.LeaderParticipation, expected string) { deadline := time.Now().Add(5 * time.Second) for { - master, err := mp.GetCurrentMasterID(context.Background()) + Leader, err := mp.GetCurrentLeaderID(context.Background()) if err != nil { - t.Fatalf("GetCurrentMasterID failed: %v", err) + t.Fatalf("GetCurrentLeaderID failed: %v", err) } - if master == expected { + if Leader == expected { return } if time.Now().After(deadline) { - t.Fatalf("GetCurrentMasterID timed out with %v, expected %v", master, expected) + t.Fatalf("GetCurrentLeaderID timed out with %v, expected %v", Leader, expected) } time.Sleep(10 * time.Millisecond) } } -// checkElection runs the tests on the MasterParticipation part of the +// checkElection runs the tests on the LeaderParticipation part of the // topo.Conn API. func checkElection(t *testing.T, ts *topo.Server) { conn, err := ts.ConnForCell(context.Background(), topo.GlobalCell) @@ -54,20 +54,20 @@ func checkElection(t *testing.T, ts *topo.Server) { } name := "testmp" - // create a new MasterParticipation + // create a new LeaderParticipation id1 := "id1" - mp1, err := conn.NewMasterParticipation(name, id1) + mp1, err := conn.NewLeaderParticipation(name, id1) if err != nil { t.Fatalf("cannot create mp1: %v", err) } // no primary yet, check name - waitForMasterID(t, mp1, "") + waitForLeaderID(t, mp1, "") // wait for id1 to be the primary - ctx1, err := mp1.WaitForMastership() + ctx1, err := mp1.WaitForLeadership() if err != nil { - t.Fatalf("mp1 cannot become master: %v", err) + t.Fatalf("mp1 cannot become Leader: %v", err) } // A lot of implementations use a toplevel directory for their elections. @@ -85,26 +85,26 @@ func checkElection(t *testing.T, ts *topo.Server) { } // get the current primary name, better be id1 - waitForMasterID(t, mp1, id1) + waitForLeaderID(t, mp1, id1) - // create a second MasterParticipation on same name + // create a second LeaderParticipation on same name id2 := "id2" - mp2, err := conn.NewMasterParticipation(name, id2) + mp2, err := conn.NewLeaderParticipation(name, id2) if err != nil { t.Fatalf("cannot create mp2: %v", err) } // wait until mp2 gets to be the primary in the background - mp2IsMaster := make(chan error) + mp2IsLeader := make(chan error) var mp2Context context.Context go func() { var err error - mp2Context, err = mp2.WaitForMastership() - mp2IsMaster <- err + mp2Context, err = mp2.WaitForLeadership() + mp2IsLeader <- err }() // ask mp2 for primary name, should get id1 - waitForMasterID(t, mp2, id1) + waitForLeaderID(t, mp2, id1) // stop mp1 mp1.Stop() @@ -119,13 +119,13 @@ func checkElection(t *testing.T, ts *topo.Server) { } // now mp2 should be primary - err = <-mp2IsMaster + err = <-mp2IsLeader if err != nil { t.Fatalf("mp2 awoke with error: %v", err) } // ask mp2 for primary name, should get id2 - waitForMasterID(t, mp2, id2) + waitForLeaderID(t, mp2, id2) // stop mp2, we're done mp2.Stop() @@ -137,13 +137,13 @@ func checkElection(t *testing.T, ts *topo.Server) { t.Fatalf("shutting down mp2 didn't close mp2Context in time") } - // At this point, we should be able to call WaitForMastership + // At this point, we should be able to call WaitForLeadership // again, and it should return topo.ErrInterrupted. Testing // this here as this is what the vtctld workflow manager loop // does, for instance. There is a go routine that runs - // WaitForMastership and needs to exit cleanly at the end. - _, err = mp2.WaitForMastership() + // WaitForLeadership and needs to exit cleanly at the end. + _, err = mp2.WaitForLeadership() if !topo.IsErrType(err, topo.Interrupted) { - t.Errorf("wrong error returned by WaitForMastership, got %v expected %v", err, topo.NewError(topo.Interrupted, "")) + t.Errorf("wrong error returned by WaitForLeadership, got %v expected %v", err, topo.NewError(topo.Interrupted, "")) } } diff --git a/go/vt/topo/zk2topo/election.go b/go/vt/topo/zk2topo/election.go index d33f934ab19..1ff2f0a9577 100644 --- a/go/vt/topo/zk2topo/election.go +++ b/go/vt/topo/zk2topo/election.go @@ -32,9 +32,9 @@ import ( // This file contains the primary election code for zk2topo.Server. -// NewMasterParticipation is part of the topo.Server interface. +// NewLeaderParticipation is part of the topo.Server interface. // We use the full path: /election/ -func (zs *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) { +func (zs *Server) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { ctx := context.TODO() zkPath := path.Join(zs.root, electionsPath, name) @@ -45,7 +45,7 @@ func (zs *Server) NewMasterParticipation(name, id string) (topo.MasterParticipat return nil, convertError(err, zkPath) } - result := &zkMasterParticipation{ + result := &zkLeaderParticipation{ zs: zs, name: name, id: []byte(id), @@ -55,18 +55,18 @@ func (zs *Server) NewMasterParticipation(name, id string) (topo.MasterParticipat return result, nil } -// zkMasterParticipation implements topo.MasterParticipation. +// zkLeaderParticipation implements topo.LeaderParticipation. // // We use a directory with files created as sequence and ephemeral, // see https://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection // From the toplevel election directory, we'll have one sub-directory // per name, with the sequence files in there. Each sequence file also contains // the id. -type zkMasterParticipation struct { +type zkLeaderParticipation struct { // zs is our parent zk topo Server zs *Server - // name is the name of this MasterParticipation + // name is the name of this LeaderParticipation name string // id is the process's current id. @@ -82,12 +82,12 @@ type zkMasterParticipation struct { done chan struct{} } -// WaitForMastership is part of the topo.MasterParticipation interface. -func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) { +// WaitForLeadership is part of the topo.LeaderParticipation interface. +func (mp *zkLeaderParticipation) WaitForLeadership() (context.Context, error) { // If Stop was already called, mp.done is closed, so we are interrupted. select { case <-mp.done: - return nil, topo.NewError(topo.Interrupted, "mastership") + return nil, topo.NewError(topo.Interrupted, "Leadership") default: } @@ -98,7 +98,7 @@ func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) { select { case <-mp.stopCtx.Done(): close(mp.done) - return nil, topo.NewError(topo.Interrupted, "mastership") + return nil, topo.NewError(topo.Interrupted, "Leadership") default: } @@ -117,7 +117,7 @@ func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) { break case context.Canceled: close(mp.done) - return nil, topo.NewError(topo.Interrupted, "mastership") + return nil, topo.NewError(topo.Interrupted, "Leadership") default: // something else went wrong return nil, err @@ -125,24 +125,24 @@ func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) { // we got the lock, create our background context ctx, cancel := context.WithCancel(context.Background()) - go mp.watchMastership(ctx, mp.zs.conn, proposal, cancel) + go mp.watchLeadership(ctx, mp.zs.conn, proposal, cancel) return ctx, nil } -// watchMastership is the background go routine we run while we are the primary. +// watchLeadership is the background go routine we run while we are the primary. // We will do two things: // - watch for changes to the proposal file. If anything happens there, // it most likely means we lost the ZK session, so we want to stop // being the primary. // - wait for mp.stop. -func (mp *zkMasterParticipation) watchMastership(ctx context.Context, conn *ZkConn, proposal string, cancel context.CancelFunc) { +func (mp *zkLeaderParticipation) watchLeadership(ctx context.Context, conn *ZkConn, proposal string, cancel context.CancelFunc) { // any interruption of this routine means we're not primary any more. defer cancel() // get to work watching our own proposal _, stats, events, err := conn.GetW(ctx, proposal) if err != nil { - log.Warningf("Cannot watch proposal while being master, stopping: %v", err) + log.Warningf("Cannot watch proposal while being Leader, stopping: %v", err) return } @@ -162,15 +162,15 @@ func (mp *zkMasterParticipation) watchMastership(ctx context.Context, conn *ZkCo } } -// Stop is part of the topo.MasterParticipation interface -func (mp *zkMasterParticipation) Stop() { +// Stop is part of the topo.LeaderParticipation interface +func (mp *zkLeaderParticipation) Stop() { mp.stopCtxCancel() <-mp.done } -// GetCurrentMasterID is part of the topo.MasterParticipation interface. +// GetCurrentLeaderID is part of the topo.LeaderParticipation interface. // We just read the smallest (first) node content, that is the id. -func (mp *zkMasterParticipation) GetCurrentMasterID(ctx context.Context) (string, error) { +func (mp *zkLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { zkPath := path.Join(mp.zs.root, electionsPath, mp.name) for { diff --git a/go/vt/vtctld/workflow.go b/go/vt/vtctld/workflow.go index 81f680ea99e..8d0841cebe0 100644 --- a/go/vt/vtctld/workflow.go +++ b/go/vt/vtctld/workflow.go @@ -92,7 +92,7 @@ func runWorkflowManagerAlone() { } func runWorkflowManagerElection(ts *topo.Server) { - var mp topo.MasterParticipation + var mp topo.LeaderParticipation // We use servenv.ListeningURL which is only populated during Run, // so we have to start this with OnRun. @@ -106,9 +106,9 @@ func runWorkflowManagerElection(ts *topo.Server) { return } - mp, err = conn.NewMasterParticipation("vtctld", servenv.ListeningURL.Host) + mp, err = conn.NewLeaderParticipation("vtctld", servenv.ListeningURL.Host) if err != nil { - log.Errorf("Cannot start MasterParticipation, disabling workflow manager: %v", err) + log.Errorf("Cannot start LeaderParticipation, disabling workflow manager: %v", err) return } @@ -116,12 +116,12 @@ func runWorkflowManagerElection(ts *topo.Server) { // primary, we can redirect traffic properly. vtctl.WorkflowManager.SetRedirectFunc(func() (string, error) { ctx := context.Background() - return mp.GetCurrentMasterID(ctx) + return mp.GetCurrentLeaderID(ctx) }) go func() { for { - ctx, err := mp.WaitForMastership() + ctx, err := mp.WaitForLeadership() switch { case err == nil: vtctl.WorkflowManager.Run(ctx)