From 47559421088bf3275b5d69d13db6887f6ecd87df Mon Sep 17 00:00:00 2001 From: Max Claus Nunes Date: Tue, 5 Dec 2023 20:47:20 -0300 Subject: [PATCH 1/5] Migrate mongo driver --- backend/backend.go | 13 +- backend/mongo/masterlock.go | 38 ++-- backend/mongo/mongodb.go | 184 ++++++++++++------- backend/mysql/masterlock.go | 15 +- backend/mysql/masterlock_integration_test.go | 19 +- cluster_test.go | 22 ++- example/example.go | 8 +- fakes/fakebackend/fake_masterlock.go | 73 +++++--- go.mod | 3 +- go.sum | 76 +++++++- master.go | 16 +- 11 files changed, 298 insertions(+), 169 deletions(-) diff --git a/backend/backend.go b/backend/backend.go index 7fa6d79..96d43dc 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -1,6 +1,9 @@ package backend -import "time" +import ( + "context" + "time" +) //go:generate counterfeiter -o ../fakes/fakebackend/fake_masterlock.go . MasterLock @@ -8,18 +11,18 @@ type MasterLock interface { // Achieve a lock to become the master. If lock is successful, the provided // MasterInfo will be filled out and recorded. The MasterInfo passed in will be filled // out with the remaining details. - Lock(info *MasterInfo) error + Lock(ctx context.Context, info *MasterInfo) error // Release the lock to relinquish the master role. This will not succeed if the // provided masterID does not match the ID of the current master. - UnLock(masterID string) error + UnLock(ctx context.Context, masterID string) error // Write a heartbeat to ensure that the master role is not lost. // If successful, the last heartbeat time is written to the passed MasterInfo - WriteHeartbeat(info *MasterInfo) error + WriteHeartbeat(ctx context.Context, info *MasterInfo) error // Get the current master status. Provides the MasterInfo of the current master. - Status() (*MasterInfo, error) + Status(ctx context.Context) (*MasterInfo, error) } type MasterInfo struct { diff --git a/backend/mongo/masterlock.go b/backend/mongo/masterlock.go index 37b50cc..bb3daae 100644 --- a/backend/mongo/masterlock.go +++ b/backend/mongo/masterlock.go @@ -1,11 +1,12 @@ package mongo import ( + "context" "fmt" "time" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + mgo "github.com/qiniu/qmgo" + "go.mongodb.org/mongo-driver/bson" "github.com/InVisionApp/go-master/backend" ) @@ -36,7 +37,7 @@ func (m *MongoMasterInfo) toMasterInfo() *backend.MasterInfo { // Achieve a lock to become the master. If lock is successful, the provided // MasterInfo will be filled out and recorded. The MasterInfo passed in will be filled // out with the remaining details. -func (m *MongoBackend) Lock(info *backend.MasterInfo) error { +func (m *MongoBackend) Lock(ctx context.Context, info *backend.MasterInfo) error { // get the heartbeat first to see if there is one before inserting oldMMI := &MongoMasterInfo{} t := time.Now() @@ -48,12 +49,12 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error { LastHeartbeat: t, } - err := m.lock.Collection().FindId(MasterInfoID).One(oldMMI) + err := m.lock.Collection().Find(ctx, bson.M{"_id": MasterInfoID}).One(oldMMI) // an error has occurred and it is not a NotFound if err != nil { - if err == mgo.ErrNotFound { + if err == mgo.ErrNoSuchDocuments { // perform an insert - if err := m.lock.Collection().Insert(mmi); err != nil { + if _, err := m.lock.Collection().InsertOne(ctx, mmi); err != nil { return fmt.Errorf("unable to insert initial lock: %v", err) } @@ -62,9 +63,6 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error { err = fmt.Errorf("failed to fetch current master info: %v", err) - m.log.Debug("attempting to refresh sessions in case of db issues") - m.refresh() - return err } @@ -80,7 +78,7 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error { ReturnNew: true, } - if _, err := m.lock.Collection().Find(query).Apply(change, mmi); err != nil { + if err := m.lock.Collection().Find(ctx, query).Apply(change, mmi); err != nil { err = fmt.Errorf("unable to complete findModify: %v", err) return err } @@ -89,19 +87,13 @@ func (m *MongoBackend) Lock(info *backend.MasterInfo) error { } -// Force refresh all sessions (bypassing SmartCollection's auto-refresh) -// TODO: add auto-refreshing functionality across the board (GetNextJob, CompleteJob, etc.) -func (m *MongoBackend) refresh() { - m.lock.Collection().Database.Session.Refresh() -} - // Release the lock to relinquish the master role. This will not succeed if the // provided masterID does not match the ID of the current master. -func (m *MongoBackend) UnLock(masterID string) error { +func (m *MongoBackend) UnLock(ctx context.Context, masterID string) error { query := bson.M{"master_id": masterID} - if err := m.lock.Collection().Remove(query); err != nil { - if err == mgo.ErrNotFound { // not found is ok, already gone + if err := m.lock.Collection().Remove(ctx, query); err != nil { + if err == mgo.ErrNoSuchDocuments { // not found is ok, already gone return nil } @@ -113,7 +105,7 @@ func (m *MongoBackend) UnLock(masterID string) error { // Write a heartbeat to ensure that the master role is not lost. // If successful, the last heartbeat time is written to the passed MasterInfo -func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error { +func (m *MongoBackend) WriteHeartbeat(ctx context.Context, info *backend.MasterInfo) error { query := bson.M{"master_id": info.MasterID} lastHeartbeat := time.Now() @@ -124,7 +116,7 @@ func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error { }, } - if err := m.lock.coll.Update(query, change); err != nil { + if err := m.lock.coll.UpdateOne(ctx, query, change); err != nil { return fmt.Errorf("Unable to complete heartbeat update: %v", err) } @@ -134,9 +126,9 @@ func (m *MongoBackend) WriteHeartbeat(info *backend.MasterInfo) error { } // Get the current master status. Provides the MasterInfo of the current master. -func (m *MongoBackend) Status() (*backend.MasterInfo, error) { +func (m *MongoBackend) Status(ctx context.Context) (*backend.MasterInfo, error) { mi := &backend.MasterInfo{} - if err := m.lock.Collection().FindId(MasterInfoID).One(mi); err != nil { + if err := m.lock.Collection().Find(ctx, bson.M{"_id": MasterInfoID}).One(mi); err != nil { return nil, fmt.Errorf("failed to fetch master info: %v", err) } diff --git a/backend/mongo/mongodb.go b/backend/mongo/mongodb.go index 13713f1..87da9f7 100644 --- a/backend/mongo/mongodb.go +++ b/backend/mongo/mongodb.go @@ -1,33 +1,35 @@ package mongo import ( - "time" - + "context" "crypto/tls" "fmt" - "net" - "strings" "sync" + "time" log "github.com/InVisionApp/go-logger" "github.com/InVisionApp/go-logger/shims/logrus" - "github.com/globalsign/mgo" newrelic "github.com/newrelic/go-agent" + mgo "github.com/qiniu/qmgo" + mgooptions "github.com/qiniu/qmgo/options" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.mongodb.org/mongo-driver/mongo/writeconcern" ) const ( DefaultCollectionName = "masterlock" DefaultHeartbeatFrequency = time.Second * 5 - MgoSessionRefreshFreq = time.Minute * 5 - DefaultPoolLimit = 4 + DefaultPoolLimit = 4 ) type MongoBackend struct { collName string lock *SmartCollection - indices []*mgo.Index + indices []mgooptions.IndexModel heartBeatFreq time.Duration @@ -50,6 +52,7 @@ type MongoBackendConfig struct { } type MongoConnectConfig struct { + URL string Hosts []string Name string ReplicaSet string @@ -70,10 +73,10 @@ func New(cfg *MongoBackendConfig) *MongoBackend { heartBeatFreq: cfg.HeartBeatFreq, log: cfg.Logger, cfg: cfg.ConnectConfig, - indices: []*mgo.Index{ + indices: []mgooptions.IndexModel{ { - Name: "heartbeat_ttl", - Key: []string{"last_heartbeat"}, + Key: []string{"last_heartbeat"}, + IndexOptions: options.Index().SetName("heartbeat_ttl"), }, }, } @@ -97,47 +100,72 @@ func setDefaults(cfg *MongoBackendConfig) { } } -func (m *MongoBackend) Connect() error { +func (m *MongoBackend) Connect(ctx context.Context) error { m.log.Infof("Connecting to DB: %q hosts: %v with timeout %d sec and pool size %v", m.cfg.Name, m.cfg.Hosts, m.cfg.Timeout, m.cfg.PoolLimit) m.log.Debugf("DB name: '%s'; replica set: '%s'; auth source: '%s'; user: '%s'; pass len: %d; use SSL: %v", m.cfg.Name, m.cfg.ReplicaSet, m.cfg.Source, m.cfg.User, len(m.cfg.Password), m.cfg.UseSSL) - dialInfo := &mgo.DialInfo{ - Addrs: m.cfg.Hosts, - Database: m.cfg.Name, - ReplicaSetName: m.cfg.ReplicaSet, - Source: m.cfg.Source, - Username: m.cfg.User, - Password: m.cfg.Password, - Timeout: m.cfg.Timeout, - PoolLimit: m.cfg.PoolLimit, - MaxIdleTimeMS: m.cfg.MaxIdleTimeMS, + mongoClient, err := m.connectMongoClient(ctx) + if err != nil { + return err } - if m.cfg.UseSSL { - dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) { - conn, err := tls.Dial("tcp", addr.String(), &tls.Config{}) - if conn != nil { - m.log.Infof("Connection local address: %s, remote address: %s", conn.LocalAddr(), conn.RemoteAddr()) - } - return conn, err - } + db := mongoClient.Database(m.cfg.Name) + lc := db.Collection(m.collName) + + m.lock = newSmartCollection(mongoClient, db, lc, m.log) + m.lock.coll.CreateIndexes(ctx, m.indices) + + return nil +} + +func (m *MongoBackend) Disconnect(ctx context.Context) error { + return m.lock.client.Close(ctx) +} + +func (m *MongoBackend) connectMongoClient(ctx context.Context) (*mgo.Client, error) { + clientConfig := &mgo.Config{ + Uri: m.cfg.URL, } - session, err := mgo.DialWithInfo(dialInfo) + if clientConfig.Uri == "" { + // NOTE: A base URI is required by qmgo, and the rest of the options + // will be applied by it later into this URI (but won't override any param + // that might already exist in the URI). + clientConfig.Uri = "mongodb://" + strings.Join(m.cfg.Hosts, ",") + } + + client, err := mgo.NewClient(ctx, clientConfig, m.getMongoClientOptions()) if err != nil { - return fmt.Errorf("could not connect to MongoDB: %v", err) + return nil, fmt.Errorf("Could not connect to MongoDB: %v", err) } - // the lock db is special because data accuracy is more important here - // strong mode will cause all reads and writes to go to the primary mongo node - lc := session.Copy().DB(m.cfg.Name).C(m.collName) - lc.Database.Session.SetMode(mgo.Strong, false) - lc.Database.Session.SetSafe(&mgo.Safe{}) - m.lock = newSmartCollection(lc, MgoSessionRefreshFreq, m.log) - m.lock.EnsureIndexes(m.indices) + return client, nil +} - return nil +func (m *MongoBackend) getMongoClientOptions() mgooptions.ClientOptions { + clientOptions := options.Client() + clientOptions.SetHosts(m.cfg.Hosts) + clientOptions.SetReplicaSet(m.cfg.ReplicaSet) + clientOptions.SetReadPreference(readpref.Primary()) + clientOptions.SetReadConcern(readconcern.Majority()) + clientOptions.SetWriteConcern(writeconcern.New(writeconcern.WMajority())) + clientOptions.SetConnectTimeout(m.cfg.Timeout) + clientOptions.SetMaxPoolSize(uint64(m.cfg.PoolLimit)) + clientOptions.SetMaxConnIdleTime(time.Duration(m.cfg.MaxIdleTimeMS) * time.Millisecond) + clientOptions.SetAuth(options.Credential{ + Username: m.cfg.User, + Password: m.cfg.Password, + AuthSource: m.cfg.Source, + }) + + if m.cfg.UseSSL { + clientOptions.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}) + } + + return mgooptions.ClientOptions{ + ClientOptions: clientOptions, + } } /***************** @@ -145,44 +173,53 @@ func (m *MongoBackend) Connect() error { *****************/ type SmartCollection struct { - coll *mgo.Collection - mu RWLocker - last time.Time - freq time.Duration - log log.Logger + client *mgo.Client + db *mgo.Database + coll *mgo.Collection + mu RWLocker + last time.Time + freq time.Duration + log log.Logger } -func newSmartCollection(c *mgo.Collection, freq time.Duration, log log.Logger) *SmartCollection { +func newSmartCollection(client *mgo.Client, db *mgo.Database, c *mgo.Collection, log log.Logger) *SmartCollection { return &SmartCollection{ - coll: c, - mu: &sync.RWMutex{}, - last: time.Now(), - freq: freq, - log: log, + client: client, + db: db, + coll: c, + mu: &sync.RWMutex{}, + last: time.Now(), + log: log, } } func (s *SmartCollection) Collection() *mgo.Collection { - s.mu.RLock() - elapsed := time.Since(s.last) - s.mu.RUnlock() - - if elapsed > s.freq { - s.mu.Lock() - s.last = time.Now() - s.mu.Unlock() - - // this is safe to do without a lock because it implements its own lock - s.coll.Database.Session.Refresh() - } + // NOTE: The way connections are managed have changed completely + // and there is no more session neither the need to refresh the session, + // therefore we shouldn't need this session refreshing anymore here. + // Keeping this around anyway as commented code just for future reference in case we + // notice something wrong with this change. + + // s.mu.RLock() + // elapsed := time.Since(s.last) + // s.mu.RUnlock() + // + // if elapsed > s.freq { + // s.mu.Lock() + // s.last = time.Now() + // s.mu.Unlock() + // + // // this is safe to do without a lock because it implements its own lock + // s.coll.Database.Session.Refresh() + // } return s.coll } -func (s *SmartCollection) EnsureIndexes(idxs []*mgo.Index) error { +func (s *SmartCollection) EnsureIndexes(ctx context.Context, idxs []*mgooptions.IndexModel) error { for _, idx := range idxs { s.log.Infof("Ensuring index: %s", idx.Name) - if err := s.UpsertIndex(idx); err != nil { + if err := s.UpsertIndex(ctx, idx); err != nil { return fmt.Errorf("could not ensure indexes on DB: %v", err) } } @@ -191,18 +228,23 @@ func (s *SmartCollection) EnsureIndexes(idxs []*mgo.Index) error { } // Ensure new index. If index already exists with same options, remove it and add new one. -func (s *SmartCollection) UpsertIndex(idx *mgo.Index) error { - if err := s.coll.EnsureIndex(*idx); err != nil { +func (s *SmartCollection) UpsertIndex(ctx context.Context, idx *mgooptions.IndexModel) error { + if idx.Name == nil { + return fmt.Errorf("index is missing a name: %+v", idx) + + } + + if err := s.coll.CreateOneIndex(ctx, *idx); err != nil { if strings.Contains(err.Error(), "already exists with different options") || strings.Contains(err.Error(), "Trying to create an index with same name") { s.log.Warnf("index already exists with name '%s'. replacing...", idx.Name) //drop that one - if err := s.coll.DropIndexName(idx.Name); err != nil { + if err := s.coll.DropIndex(ctx, []string{*idx.Name}); err != nil { return fmt.Errorf("failed to remove old index: %v", err) } - if err := s.coll.EnsureIndex(*idx); err != nil { + if err := s.coll.CreateOneIndex(ctx, *idx); err != nil { return fmt.Errorf("failed to add new index: %v", err) } @@ -219,8 +261,8 @@ func (s *SmartCollection) StartMongoDatastoreSegment(txn newrelic.Transaction, o return &newrelic.DatastoreSegment{ StartTime: newrelic.StartSegmentNow(txn), Product: newrelic.DatastoreMongoDB, - DatabaseName: s.coll.Database.Name, - Collection: s.coll.Name, + DatabaseName: s.db.GetDatabaseName(), + Collection: s.coll.GetCollectionName(), Operation: op, QueryParameters: query, } diff --git a/backend/mysql/masterlock.go b/backend/mysql/masterlock.go index e44b75f..8811c96 100644 --- a/backend/mysql/masterlock.go +++ b/backend/mysql/masterlock.go @@ -1,6 +1,7 @@ package mysql import ( + "context" "database/sql" "errors" "fmt" @@ -35,7 +36,7 @@ func (m *MySQLMasterInfo) toMasterInfo() *backend.MasterInfo { } } -func (m *MySQLBackend) Lock(info *backend.MasterInfo) error { +func (m *MySQLBackend) Lock(ctx context.Context, info *backend.MasterInfo) error { curInfo, ok, err := m.getMasterInfo() if err != nil { return err @@ -127,7 +128,7 @@ func (m *MySQLBackend) getMasterInfo() (*MySQLMasterInfo, bool, error) { func (m *MySQLBackend) insertMasterInfo(info *MySQLMasterInfo) error { query := fmt.Sprintf(` - INSERT INTO %s (id, master_id, version, started_at, last_heartbeat) + INSERT INTO %s (id, master_id, version, started_at, last_heartbeat) VALUES (:id, :master_id, :version, :started_at, :last_heartbeat)`, m.TableName) @@ -138,7 +139,7 @@ func (m *MySQLBackend) insertMasterInfo(info *MySQLMasterInfo) error { return nil } -func (m *MySQLBackend) UnLock(masterID string) error { +func (m *MySQLBackend) UnLock(ctx context.Context, masterID string) error { query := fmt.Sprintf(`DELETE FROM %s WHERE id = ? AND master_id = ?`, m.TableName) if _, err := m.db.Exec(query, MasterLockID, masterID); err != nil { @@ -150,11 +151,11 @@ func (m *MySQLBackend) UnLock(masterID string) error { return nil } -func (m *MySQLBackend) WriteHeartbeat(info *backend.MasterInfo) error { +func (m *MySQLBackend) WriteHeartbeat(ctx context.Context, info *backend.MasterInfo) error { query := fmt.Sprintf(` - UPDATE %s SET + UPDATE %s SET last_heartbeat = :last_heartbeat - WHERE + WHERE master_id = :master_id`, m.TableName) @@ -177,7 +178,7 @@ func (m *MySQLBackend) WriteHeartbeat(info *backend.MasterInfo) error { return nil } -func (m *MySQLBackend) Status() (*backend.MasterInfo, error) { +func (m *MySQLBackend) Status(ctx context.Context) (*backend.MasterInfo, error) { info, ok, err := m.getMasterInfo() if err != nil { return nil, err diff --git a/backend/mysql/masterlock_integration_test.go b/backend/mysql/masterlock_integration_test.go index a7b7726..77f4817 100644 --- a/backend/mysql/masterlock_integration_test.go +++ b/backend/mysql/masterlock_integration_test.go @@ -1,17 +1,20 @@ +//go:build integration // +build integration package mysql import ( + "context" "fmt" "time" - "github.com/InVisionApp/go-master" - "github.com/InVisionApp/go-master/backend" _ "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + "github.com/InVisionApp/go-master" + "github.com/InVisionApp/go-master/backend" ) var _ = Describe("masterlock-integration", func() { @@ -58,7 +61,7 @@ var _ = Describe("masterlock-integration", func() { LastHeartbeat: time.Now(), } - err := be.Lock(masterInfo) + err := be.Lock(context.TODO(), masterInfo) Expect(err).ToNot(HaveOccurred()) // Verify that masterInfo in DB server is the same as ours @@ -93,7 +96,7 @@ var _ = Describe("masterlock-integration", func() { Expect(masterInfo).ToNot(BeNil()) // Performing another lock should cause us to get a lock error - err = be.Lock(masterInfo.toMasterInfo()) + err = be.Lock(context.TODO(), masterInfo.toMasterInfo()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("found active master")) }) @@ -121,7 +124,7 @@ var _ = Describe("masterlock-integration", func() { Expect(hasMaster).To(BeTrue()) Expect(masterInfo).ToNot(BeNil()) - err = be.UnLock(masterInfo.MasterID) + err = be.UnLock(context.TODO(), masterInfo.MasterID) Expect(err).ToNot(HaveOccurred()) }) @@ -131,7 +134,7 @@ var _ = Describe("masterlock-integration", func() { err := be.db.Close() Expect(err).ToNot(HaveOccurred(), "closing db connection should not error") - err = be.UnLock("foo") + err = be.UnLock(context.TODO(), "foo") Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to release master lock")) }) @@ -157,7 +160,7 @@ var _ = Describe("masterlock-integration", func() { // Give some time for master to start time.Sleep(100 * time.Millisecond) - masterInfo, err := be.Status() + masterInfo, err := be.Status(context.TODO()) Expect(err).ToNot(HaveOccurred()) Expect(masterInfo).ToNot(BeNil()) Expect(masterInfo.MasterID).ToNot(BeEmpty()) @@ -166,7 +169,7 @@ var _ = Describe("masterlock-integration", func() { Context("when there is no master", func() { It("return error", func() { - masterInfo, err := be.Status() + masterInfo, err := be.Status(context.TODO()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("no master currently")) Expect(masterInfo).To(BeNil()) diff --git a/cluster_test.go b/cluster_test.go index edc5df9..a17f28c 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,17 +1,19 @@ package master import ( + "context" "errors" "fmt" "time" - "github.com/InVisionApp/go-logger" - "github.com/InVisionApp/go-master/backend" - "github.com/InVisionApp/go-master/fakes/fakebackend" - "github.com/InVisionApp/go-master/safe" + log "github.com/InVisionApp/go-logger" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/relistan/go-director" + + "github.com/InVisionApp/go-master/backend" + "github.com/InVisionApp/go-master/fakes/fakebackend" + "github.com/InVisionApp/go-master/safe" ) var _ = Describe("scenarios", func() { @@ -66,7 +68,7 @@ var _ = Describe("scenarios", func() { Context("start together", func() { It("first becomes master", func() { - fakeLock.LockStub = func(info *backend.MasterInfo) error { + fakeLock.LockStub = func(ctx context.Context, info *backend.MasterInfo) error { if info.MasterID == testID1 { return nil } @@ -88,7 +90,7 @@ var _ = Describe("scenarios", func() { }) It("first fails second becomes master", func() { - fakeLock.LockStub = func(info *backend.MasterInfo) error { + fakeLock.LockStub = func(ctx context.Context, info *backend.MasterInfo) error { if info.MasterID == testID2 { return nil } @@ -152,7 +154,7 @@ type testlock struct { secondMaster string } -func (t *testlock) Lock(info *backend.MasterInfo) error { +func (t *testlock) Lock(ctx context.Context, info *backend.MasterInfo) error { switch info.MasterID { case t.firstMaster: // if there is no master, first becomes master @@ -182,15 +184,15 @@ func (t *testlock) Lock(info *backend.MasterInfo) error { return errors.New("failed to become master") } -func (t *testlock) Status() (*backend.MasterInfo, error) { +func (t *testlock) Status(ctx context.Context) (*backend.MasterInfo, error) { return &backend.MasterInfo{MasterID: t.isMaster}, nil } -func (t *testlock) UnLock(masterID string) error { +func (t *testlock) UnLock(ctx context.Context, masterID string) error { return nil } -func (t *testlock) WriteHeartbeat(info *backend.MasterInfo) error { +func (t *testlock) WriteHeartbeat(ctx context.Context, info *backend.MasterInfo) error { if info.MasterID != t.isMaster { return errors.New("wrong node") } diff --git a/example/example.go b/example/example.go index 367c431..080eec7 100644 --- a/example/example.go +++ b/example/example.go @@ -1,17 +1,19 @@ package main import ( + "context" "os" "sync" "time" log "github.com/InVisionApp/go-logger" logshim "github.com/InVisionApp/go-logger/shims/logrus" + "github.com/sirupsen/logrus" + "github.com/InVisionApp/go-master" "github.com/InVisionApp/go-master/backend" "github.com/InVisionApp/go-master/backend/mongo" "github.com/InVisionApp/go-master/backend/mysql" - "github.com/sirupsen/logrus" ) var ( @@ -61,7 +63,9 @@ func MongDBBackend() backend.MasterLock { Logger: logger, }) - if err := mongoBackend.Connect(); err != nil { + ctx := context.TODO() + + if err := mongoBackend.Connect(ctx); err != nil { logger.Errorf("Unable to connect to mongo: %v", err) os.Exit(1) } diff --git a/fakes/fakebackend/fake_masterlock.go b/fakes/fakebackend/fake_masterlock.go index 4bd4d93..c522054 100644 --- a/fakes/fakebackend/fake_masterlock.go +++ b/fakes/fakebackend/fake_masterlock.go @@ -2,15 +2,17 @@ package fakebackend import ( + "context" "sync" "github.com/InVisionApp/go-master/backend" ) type FakeMasterLock struct { - LockStub func(info *backend.MasterInfo) error + LockStub func(ctx context.Context, info *backend.MasterInfo) error lockMutex sync.RWMutex lockArgsForCall []struct { + ctx context.Context info *backend.MasterInfo } lockReturns struct { @@ -19,9 +21,10 @@ type FakeMasterLock struct { lockReturnsOnCall map[int]struct { result1 error } - UnLockStub func(masterID string) error + UnLockStub func(ctx context.Context, masterID string) error unLockMutex sync.RWMutex unLockArgsForCall []struct { + ctx context.Context masterID string } unLockReturns struct { @@ -30,9 +33,10 @@ type FakeMasterLock struct { unLockReturnsOnCall map[int]struct { result1 error } - WriteHeartbeatStub func(info *backend.MasterInfo) error + WriteHeartbeatStub func(ctx context.Context, info *backend.MasterInfo) error writeHeartbeatMutex sync.RWMutex writeHeartbeatArgsForCall []struct { + ctx context.Context info *backend.MasterInfo } writeHeartbeatReturns struct { @@ -41,10 +45,12 @@ type FakeMasterLock struct { writeHeartbeatReturnsOnCall map[int]struct { result1 error } - StatusStub func() (*backend.MasterInfo, error) + StatusStub func(ctx context.Context) (*backend.MasterInfo, error) statusMutex sync.RWMutex - statusArgsForCall []struct{} - statusReturns struct { + statusArgsForCall []struct { + ctx context.Context + } + statusReturns struct { result1 *backend.MasterInfo result2 error } @@ -56,16 +62,17 @@ type FakeMasterLock struct { invocationsMutex sync.RWMutex } -func (fake *FakeMasterLock) Lock(info *backend.MasterInfo) error { +func (fake *FakeMasterLock) Lock(ctx context.Context, info *backend.MasterInfo) error { fake.lockMutex.Lock() ret, specificReturn := fake.lockReturnsOnCall[len(fake.lockArgsForCall)] fake.lockArgsForCall = append(fake.lockArgsForCall, struct { + ctx context.Context info *backend.MasterInfo - }{info}) - fake.recordInvocation("Lock", []interface{}{info}) + }{ctx, info}) + fake.recordInvocation("Lock", []interface{}{ctx, info}) fake.lockMutex.Unlock() if fake.LockStub != nil { - return fake.LockStub(info) + return fake.LockStub(ctx, info) } if specificReturn { return ret.result1 @@ -79,10 +86,10 @@ func (fake *FakeMasterLock) LockCallCount() int { return len(fake.lockArgsForCall) } -func (fake *FakeMasterLock) LockArgsForCall(i int) *backend.MasterInfo { +func (fake *FakeMasterLock) LockArgsForCall(i int) (context.Context, *backend.MasterInfo) { fake.lockMutex.RLock() defer fake.lockMutex.RUnlock() - return fake.lockArgsForCall[i].info + return fake.lockArgsForCall[i].ctx, fake.lockArgsForCall[i].info } func (fake *FakeMasterLock) LockReturns(result1 error) { @@ -104,16 +111,17 @@ func (fake *FakeMasterLock) LockReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeMasterLock) UnLock(masterID string) error { +func (fake *FakeMasterLock) UnLock(ctx context.Context, masterID string) error { fake.unLockMutex.Lock() ret, specificReturn := fake.unLockReturnsOnCall[len(fake.unLockArgsForCall)] fake.unLockArgsForCall = append(fake.unLockArgsForCall, struct { + ctx context.Context masterID string - }{masterID}) - fake.recordInvocation("UnLock", []interface{}{masterID}) + }{ctx, masterID}) + fake.recordInvocation("UnLock", []interface{}{ctx, masterID}) fake.unLockMutex.Unlock() if fake.UnLockStub != nil { - return fake.UnLockStub(masterID) + return fake.UnLockStub(ctx, masterID) } if specificReturn { return ret.result1 @@ -127,10 +135,10 @@ func (fake *FakeMasterLock) UnLockCallCount() int { return len(fake.unLockArgsForCall) } -func (fake *FakeMasterLock) UnLockArgsForCall(i int) string { +func (fake *FakeMasterLock) UnLockArgsForCall(i int) (context.Context, string) { fake.unLockMutex.RLock() defer fake.unLockMutex.RUnlock() - return fake.unLockArgsForCall[i].masterID + return fake.unLockArgsForCall[i].ctx, fake.unLockArgsForCall[i].masterID } func (fake *FakeMasterLock) UnLockReturns(result1 error) { @@ -152,16 +160,17 @@ func (fake *FakeMasterLock) UnLockReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeMasterLock) WriteHeartbeat(info *backend.MasterInfo) error { +func (fake *FakeMasterLock) WriteHeartbeat(ctx context.Context, info *backend.MasterInfo) error { fake.writeHeartbeatMutex.Lock() ret, specificReturn := fake.writeHeartbeatReturnsOnCall[len(fake.writeHeartbeatArgsForCall)] fake.writeHeartbeatArgsForCall = append(fake.writeHeartbeatArgsForCall, struct { + ctx context.Context info *backend.MasterInfo - }{info}) - fake.recordInvocation("WriteHeartbeat", []interface{}{info}) + }{ctx, info}) + fake.recordInvocation("WriteHeartbeat", []interface{}{ctx, info}) fake.writeHeartbeatMutex.Unlock() if fake.WriteHeartbeatStub != nil { - return fake.WriteHeartbeatStub(info) + return fake.WriteHeartbeatStub(ctx, info) } if specificReturn { return ret.result1 @@ -175,10 +184,10 @@ func (fake *FakeMasterLock) WriteHeartbeatCallCount() int { return len(fake.writeHeartbeatArgsForCall) } -func (fake *FakeMasterLock) WriteHeartbeatArgsForCall(i int) *backend.MasterInfo { +func (fake *FakeMasterLock) WriteHeartbeatArgsForCall(i int) (context.Context, *backend.MasterInfo) { fake.writeHeartbeatMutex.RLock() defer fake.writeHeartbeatMutex.RUnlock() - return fake.writeHeartbeatArgsForCall[i].info + return fake.writeHeartbeatArgsForCall[i].ctx, fake.writeHeartbeatArgsForCall[i].info } func (fake *FakeMasterLock) WriteHeartbeatReturns(result1 error) { @@ -200,14 +209,16 @@ func (fake *FakeMasterLock) WriteHeartbeatReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeMasterLock) Status() (*backend.MasterInfo, error) { +func (fake *FakeMasterLock) Status(ctx context.Context) (*backend.MasterInfo, error) { fake.statusMutex.Lock() ret, specificReturn := fake.statusReturnsOnCall[len(fake.statusArgsForCall)] - fake.statusArgsForCall = append(fake.statusArgsForCall, struct{}{}) - fake.recordInvocation("Status", []interface{}{}) + fake.statusArgsForCall = append(fake.statusArgsForCall, struct { + ctx context.Context + }{ctx}) + fake.recordInvocation("Status", []interface{}{ctx}) fake.statusMutex.Unlock() if fake.StatusStub != nil { - return fake.StatusStub() + return fake.StatusStub(ctx) } if specificReturn { return ret.result1, ret.result2 @@ -221,6 +232,12 @@ func (fake *FakeMasterLock) StatusCallCount() int { return len(fake.statusArgsForCall) } +func (fake *FakeMasterLock) StatusArgsForCall(i int) context.Context { + fake.statusMutex.RLock() + defer fake.statusMutex.RUnlock() + return fake.statusArgsForCall[i].ctx +} + func (fake *FakeMasterLock) StatusReturns(result1 *backend.MasterInfo, result2 error) { fake.StatusStub = nil fake.statusReturns = struct { diff --git a/go.mod b/go.mod index d46454d..2bd1e95 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,14 @@ go 1.14 require ( github.com/InVisionApp/go-logger v1.0.1 - github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 github.com/go-sql-driver/mysql v1.5.0 github.com/gofrs/uuid v3.3.0+incompatible github.com/jmoiron/sqlx v1.2.0 github.com/newrelic/go-agent v3.4.0+incompatible github.com/onsi/ginkgo v1.12.2 github.com/onsi/gomega v1.10.1 + github.com/qiniu/qmgo v1.1.8 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d github.com/sirupsen/logrus v1.6.0 + go.mongodb.org/mongo-driver v1.11.6 ) diff --git a/go.sum b/go.sum index 35d20b7..605a242 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,19 @@ github.com/InVisionApp/go-logger v1.0.1 h1:WFL19PViM1mHUmUWfsv5zMo379KSWj2MRmBlzMFDRiE= github.com/InVisionApp/go-logger v1.0.1/go.mod h1:+cGTDSn+P8105aZkeOfIhdd7vFO5X1afUHcjvanY0L8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 h1:DujepqpGd1hyOd7aW59XpK7Qymp8iy83xq74fLr21is= -github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= +github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -20,10 +27,13 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -31,12 +41,23 @@ github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/newrelic/go-agent v3.4.0+incompatible h1:GhUhNLDdR3ETfUVJAN/czXlqRTcgbPs6U02jYhf15rg= github.com/newrelic/go-agent v3.4.0+incompatible/go.mod h1:a8Fv1b/fYhFSReoTU6HDkTYIMZeSVNffmoS726Y0LzQ= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= @@ -48,8 +69,12 @@ github.com/onsi/ginkgo v1.12.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/qiniu/qmgo v1.1.8 h1:E64M+P59aqQpXKI24ClVtluYkLaJLkkeD2hTVhrdMks= +github.com/qiniu/qmgo v1.1.8/go.mod h1:QvZkzWNEv0buWPx0kdZsSs6URhESVubacxFPlITmvB8= github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d h1:NWE6gufaNLgqs6VUzsqXkogQkMEcZxQjdRTSbf79NCA= github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d/go.mod h1:zxI04y3OTmbrx/ef0ahmkEy9/eBLLseHAjy6M5iKsws= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= @@ -58,26 +83,56 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +go.mongodb.org/mongo-driver v1.11.6 h1:XM7G6PjiGAO5betLF13BIa5TlLUUE3uJ/2Ox3Lz1K+o= +go.mongodb.org/mongo-driver v1.11.6/go.mod h1:G9TgswdsWjX4tmDA5zfs2+6AEPpYJwqblyjsfuh8oXY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= @@ -89,11 +144,16 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/master.go b/master.go index 1ed98ad..dbdc90c 100644 --- a/master.go +++ b/master.go @@ -1,6 +1,7 @@ package master import ( + "context" "fmt" "time" @@ -9,6 +10,7 @@ import ( "github.com/relistan/go-director" "github.com/InVisionApp/go-logger/shims/logrus" + "github.com/InVisionApp/go-master/backend" "github.com/InVisionApp/go-master/safe" ) @@ -145,8 +147,9 @@ func (m *master) Start() error { func (m *master) runHeartBeat() { m.heartBeat.Loop(func() error { if !m.isMaster.Val() { + ctx := context.TODO() // TODO: identify the proper context. // attempt to become the master - if m.becomeMaster() { + if m.becomeMaster(ctx) { // became the master if m.startHook != nil { // run the start hook in a routine so it doesn't block @@ -159,9 +162,9 @@ func (m *master) runHeartBeat() { } // I am the master! - // run the heartbeat - if err := m.lock.WriteHeartbeat(m.info); err != nil { + ctx := context.TODO() // TODO: identify the proper context. + if err := m.lock.WriteHeartbeat(ctx, m.info); err != nil { m.sendError(fmt.Errorf("failed to write heartbeat: %v", err)) // if heartbeat fails or master lock lost, stop the tasks m.cleanupMaster() @@ -177,13 +180,13 @@ func (m *master) runHeartBeat() { }) } -func (m *master) becomeMaster() bool { +func (m *master) becomeMaster(ctx context.Context) bool { mi := &backend.MasterInfo{ MasterID: m.uuid, Version: m.version, } - if err := m.lock.Lock(mi); err != nil { + if err := m.lock.Lock(ctx, mi); err != nil { // The heartbeat tries to become master every second. Logging an error here // (at error level) is a constant stream. m.log.Debugf("failed to acquire lock while becoming master: %v", err) @@ -225,7 +228,8 @@ func (m *master) Stop() error { // attempt a release on the backend // this is a best effort. The heartbeat loop has been stopped, // so the lock will be lost eventually either way - if err := m.lock.UnLock(m.uuid); err != nil { + ctx := context.TODO() // TODO: identify the proper context. + if err := m.lock.UnLock(ctx, m.uuid); err != nil { m.sendError(fmt.Errorf("failed to release lock on master backend: %v", err)) } From 797adc4bbedff9c93cd94047c7a8e33d0987ebb6 Mon Sep 17 00:00:00 2001 From: Max Claus Nunes Date: Wed, 6 Dec 2023 20:15:21 -0300 Subject: [PATCH 2/5] Add context to hooks --- example/example.go | 4 ++-- master.go | 13 +++++++------ master_test.go | 22 ++++++++++++---------- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/example/example.go b/example/example.go index 080eec7..29ce5bb 100644 --- a/example/example.go +++ b/example/example.go @@ -93,10 +93,10 @@ func MySQLBackend() backend.MasterLock { return mysqlBackend } -func startHook() { +func startHook(ctx context.Context) { logger.Info("Became master") } -func stopHook() { +func stopHook(ctx context.Context) { logger.Info("Lost master status") } diff --git a/master.go b/master.go index dbdc90c..6d7f176 100644 --- a/master.go +++ b/master.go @@ -40,8 +40,8 @@ type master struct { heartBeatFreq time.Duration - startHook func() - stopHook func() + startHook func(context.Context) + stopHook func(context.Context) heartBeat director.Looper @@ -61,12 +61,12 @@ type MasterConfig struct { // Optional: StartHook func is called as soon as a master lock is achieved. // It is the callback to signal becoming a master - StartHook func() + StartHook func(context.Context) // Optional: StopHook func is called when the master lock is lost // It is the callback to signal that it is no longer the master. // It is not called when the master is stopped manually - StopHook func() + StopHook func(context.Context) // Optional: Error channel to receive go-master related error messages Err chan error @@ -153,7 +153,7 @@ func (m *master) runHeartBeat() { // became the master if m.startHook != nil { // run the start hook in a routine so it doesn't block - go m.startHook() + go m.startHook(ctx) } } @@ -207,8 +207,9 @@ func (m *master) cleanupMaster() { m.info = &backend.MasterInfo{} if m.stopHook != nil { + ctx := context.TODO() // TODO: identify the proper context. // run hook in routine to avoid blocking - go m.stopHook() + go m.stopHook(ctx) } } diff --git a/master_test.go b/master_test.go index 8729b7d..7fc00b6 100644 --- a/master_test.go +++ b/master_test.go @@ -1,17 +1,19 @@ package master import ( + "context" "errors" "time" - "github.com/InVisionApp/go-logger" + log "github.com/InVisionApp/go-logger" "github.com/InVisionApp/go-logger/shims/testlog" - "github.com/InVisionApp/go-master/backend" - "github.com/InVisionApp/go-master/fakes/fakebackend" - "github.com/InVisionApp/go-master/safe" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/relistan/go-director" + + "github.com/InVisionApp/go-master/backend" + "github.com/InVisionApp/go-master/fakes/fakebackend" + "github.com/InVisionApp/go-master/safe" ) var _ = Describe("New", func() { @@ -64,12 +66,12 @@ var _ = Describe("New", func() { }) Context("hooks provided", func() { - var startfunc func() - var stopfunc func() + var startfunc func(ctx context.Context) + var stopfunc func(ctx context.Context) BeforeEach(func() { - startfunc = func() {} - stopfunc = func() {} + startfunc = func(ctx context.Context) {} + stopfunc = func(ctx context.Context) {} mCfg.StartHook = startfunc mCfg.StopHook = stopfunc }) @@ -403,10 +405,10 @@ type testHooks struct { stopCalls int } -func (t *testHooks) startHook() { +func (t *testHooks) startHook(ctx context.Context) { t.startCalls++ } -func (t *testHooks) stopHook() { +func (t *testHooks) stopHook(ctx context.Context) { t.stopCalls++ } From a5534f59a24c9073b6bbc0a6a97afa836c1cad43 Mon Sep 17 00:00:00 2001 From: Max Claus Nunes Date: Sat, 16 Dec 2023 09:14:40 -0300 Subject: [PATCH 3/5] Make credentials optional --- backend/mongo/mongodb.go | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/backend/mongo/mongodb.go b/backend/mongo/mongodb.go index 87da9f7..a1f8574 100644 --- a/backend/mongo/mongodb.go +++ b/backend/mongo/mongodb.go @@ -101,7 +101,13 @@ func setDefaults(cfg *MongoBackendConfig) { } func (m *MongoBackend) Connect(ctx context.Context) error { - m.log.Infof("Connecting to DB: %q hosts: %v with timeout %d sec and pool size %v", m.cfg.Name, m.cfg.Hosts, m.cfg.Timeout, m.cfg.PoolLimit) + m.log.Infof( + "Connecting to DB: %q hosts: %v with timeout %d sec and pool size %v", + m.cfg.Name, + m.cfg.Hosts, + m.cfg.Timeout, + m.cfg.PoolLimit, + ) m.log.Debugf("DB name: '%s'; replica set: '%s'; auth source: '%s'; user: '%s'; pass len: %d; use SSL: %v", m.cfg.Name, m.cfg.ReplicaSet, m.cfg.Source, m.cfg.User, len(m.cfg.Password), m.cfg.UseSSL) @@ -153,11 +159,14 @@ func (m *MongoBackend) getMongoClientOptions() mgooptions.ClientOptions { clientOptions.SetConnectTimeout(m.cfg.Timeout) clientOptions.SetMaxPoolSize(uint64(m.cfg.PoolLimit)) clientOptions.SetMaxConnIdleTime(time.Duration(m.cfg.MaxIdleTimeMS) * time.Millisecond) - clientOptions.SetAuth(options.Credential{ - Username: m.cfg.User, - Password: m.cfg.Password, - AuthSource: m.cfg.Source, - }) + + if m.cfg.User != "" || m.cfg.Password != "" || m.cfg.Source != "" { + clientOptions.SetAuth(options.Credential{ + Username: m.cfg.User, + Password: m.cfg.Password, + AuthSource: m.cfg.Source, + }) + } if m.cfg.UseSSL { clientOptions.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}) @@ -231,7 +240,6 @@ func (s *SmartCollection) EnsureIndexes(ctx context.Context, idxs []*mgooptions. func (s *SmartCollection) UpsertIndex(ctx context.Context, idx *mgooptions.IndexModel) error { if idx.Name == nil { return fmt.Errorf("index is missing a name: %+v", idx) - } if err := s.coll.CreateOneIndex(ctx, *idx); err != nil { @@ -239,7 +247,7 @@ func (s *SmartCollection) UpsertIndex(ctx context.Context, idx *mgooptions.Index strings.Contains(err.Error(), "Trying to create an index with same name") { s.log.Warnf("index already exists with name '%s'. replacing...", idx.Name) - //drop that one + // drop that one if err := s.coll.DropIndex(ctx, []string{*idx.Name}); err != nil { return fmt.Errorf("failed to remove old index: %v", err) } @@ -257,7 +265,11 @@ func (s *SmartCollection) UpsertIndex(ctx context.Context, idx *mgooptions.Index return nil } -func (s *SmartCollection) StartMongoDatastoreSegment(txn newrelic.Transaction, op string, query map[string]interface{}) *newrelic.DatastoreSegment { +func (s *SmartCollection) StartMongoDatastoreSegment( + txn newrelic.Transaction, + op string, + query map[string]interface{}, +) *newrelic.DatastoreSegment { return &newrelic.DatastoreSegment{ StartTime: newrelic.StartSegmentNow(txn), Product: newrelic.DatastoreMongoDB, From 1e45037bd28ba4a350662279a4e48bb19109fd3d Mon Sep 17 00:00:00 2001 From: Max Claus Nunes Date: Wed, 20 Dec 2023 19:23:22 -0300 Subject: [PATCH 4/5] Ignore mongo auth source as requirement --- backend/mongo/mongodb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/mongo/mongodb.go b/backend/mongo/mongodb.go index a1f8574..772f2bc 100644 --- a/backend/mongo/mongodb.go +++ b/backend/mongo/mongodb.go @@ -160,7 +160,7 @@ func (m *MongoBackend) getMongoClientOptions() mgooptions.ClientOptions { clientOptions.SetMaxPoolSize(uint64(m.cfg.PoolLimit)) clientOptions.SetMaxConnIdleTime(time.Duration(m.cfg.MaxIdleTimeMS) * time.Millisecond) - if m.cfg.User != "" || m.cfg.Password != "" || m.cfg.Source != "" { + if m.cfg.User != "" || m.cfg.Password != "" { clientOptions.SetAuth(options.Credential{ Username: m.cfg.User, Password: m.cfg.Password, From 74375618b833b0cb3f1bc3c3ed94ea2702a7b463 Mon Sep 17 00:00:00 2001 From: Max Claus Nunes Date: Mon, 13 May 2024 16:56:52 -0300 Subject: [PATCH 5/5] Fix: update document must contain key beginning with '$' --- backend/mongo/masterlock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/mongo/masterlock.go b/backend/mongo/masterlock.go index bb3daae..4270f65 100644 --- a/backend/mongo/masterlock.go +++ b/backend/mongo/masterlock.go @@ -74,7 +74,7 @@ func (m *MongoBackend) Lock(ctx context.Context, info *backend.MasterInfo) error query := bson.M{"master_id": oldMMI.MasterID, "last_heartbeat": oldMMI.LastHeartbeat} change := mgo.Change{ - Update: mmi, + Update: bson.M{"$set": mmi}, ReturnNew: true, }