From b37873faa94b5bb749996c32248d581c5b3fc247 Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Tue, 2 May 2023 21:20:34 +0900 Subject: [PATCH 1/5] Implement mongodb based leader election package with housekeeping integration --- .../yorkie-cluster/templates/deployment.yaml | 4 + cmd/yorkie/server.go | 14 ++ server/backend/backend.go | 26 +++- server/backend/config.go | 6 +- server/backend/database/database.go | 23 ++++ server/backend/database/memory/database.go | 31 +++++ server/backend/database/mongo/client.go | 67 ++++++++++ server/backend/database/mongo/indexes.go | 15 +++ server/backend/election/election.go | 123 ++++++++++++++++++ server/backend/housekeeping/housekeeping.go | 40 +++++- server/config.go | 5 +- server/config.sample.yml | 6 +- server/rpc/server_test.go | 1 + test/helper/helper.go | 2 + 14 files changed, 355 insertions(+), 8 deletions(-) create mode 100644 server/backend/election/election.go diff --git a/build/charts/yorkie-cluster/templates/deployment.yaml b/build/charts/yorkie-cluster/templates/deployment.yaml index f3d31f518..5d21cc9f4 100644 --- a/build/charts/yorkie-cluster/templates/deployment.yaml +++ b/build/charts/yorkie-cluster/templates/deployment.yaml @@ -41,6 +41,10 @@ spec: "--mongo-connection-uri", "mongodb://{{ .Values.yorkie.args.dbUrl }}:{{ .Values.yorkie.args.dbPort }}/yorkie-meta", "--enable-pprof", + "--leader-election", + "true", + "--client-deactivate-threshold", + "30s", ] ports: - containerPort: {{ .Values.yorkie.ports.rpcPort }} diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index 583188698..35ef1c52a 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -40,6 +40,7 @@ var ( adminTokenDuration time.Duration housekeepingInterval time.Duration + housekeepingLeaseDuration time.Duration clientDeactivateThreshold string mongoConnectionURI string @@ -74,6 +75,7 @@ func newServerCmd() *cobra.Command { conf.Backend.AuthWebhookCacheUnauthTTL = authWebhookCacheUnauthTTL.String() conf.Housekeeping.Interval = housekeepingInterval.String() + conf.Housekeeping.LeaseDuration = housekeepingLeaseDuration.String() if mongoConnectionURI != "" { conf.Mongo = &mongo.Config{ @@ -227,6 +229,12 @@ func init() { server.DefaultHousekeepingCandidatesLimitPerProject, "candidates limit per project for a single housekeeping run", ) + cmd.Flags().DurationVar( + &housekeepingLeaseDuration, + "housekeeping-lease-duration", + server.DefaultHousekeepingLeaseDuration, + "lease duration for a leader election in housekeeping", + ) cmd.Flags().StringVar( &mongoConnectionURI, "mongo-connection-uri", @@ -343,6 +351,12 @@ func init() { server.DefaultHostname, "Yorkie Server Hostname", ) + cmd.Flags().BoolVar( + &conf.Backend.LeaderElection, + "leader-election", + server.DefaultLeaderElection, + "Enable leader election to run tasks only on the leader.", + ) rootCmd.AddCommand(cmd) } diff --git a/server/backend/backend.go b/server/backend/backend.go index 19c5b19b3..64ef06971 100644 --- a/server/backend/backend.go +++ b/server/backend/backend.go @@ -33,6 +33,7 @@ import ( "github.com/yorkie-team/yorkie/server/backend/database" memdb "github.com/yorkie-team/yorkie/server/backend/database/memory" "github.com/yorkie-team/yorkie/server/backend/database/mongo" + "github.com/yorkie-team/yorkie/server/backend/election" "github.com/yorkie-team/yorkie/server/backend/housekeeping" "github.com/yorkie-team/yorkie/server/backend/sync" memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory" @@ -48,6 +49,7 @@ type Backend struct { DB database.Database Coordinator sync.Coordinator + Elector *election.Elector Metrics *prometheus.Metrics Background *background.Background Housekeeping *housekeeping.Housekeeping @@ -64,11 +66,12 @@ func New( ) (*Backend, error) { hostname := conf.Hostname if hostname == "" { - hostname, err := os.Hostname() + osHostname, err := os.Hostname() if err != nil { return nil, fmt.Errorf("os.Hostname: %w", err) } - conf.Hostname = hostname + conf.Hostname = osHostname + hostname = osHostname } serverInfo := &sync.ServerInfo{ @@ -98,6 +101,14 @@ func New( // will need to distribute workloads of a document. coordinator := memsync.NewCoordinator(serverInfo) + var elector *election.Elector + if conf.LeaderElection { + elector, err = election.New(hostname, db) + if err != nil { + return nil, err + } + } + authWebhookCache, err := cache.NewLRUExpireCache[string, *types.AuthWebhookResponse](conf.AuthWebhookCacheSize) if err != nil { return nil, err @@ -107,6 +118,7 @@ func New( housekeepingConf, db, coordinator, + elector, ) if err != nil { return nil, err @@ -118,8 +130,9 @@ func New( } logging.DefaultLogger().Infof( - "backend created: id: %s, rpc: %s", + "backend created: id: %s, rpc: %s, db: %s", serverInfo.ID, + serverInfo.Hostname, dbInfo, ) @@ -141,6 +154,7 @@ func New( Metrics: metrics, DB: db, Coordinator: coordinator, + Elector: elector, Housekeeping: keeping, AuthWebhookCache: authWebhookCache, @@ -155,6 +169,12 @@ func (b *Backend) Shutdown() error { return err } + if b.Config.LeaderElection { + if err := b.Elector.Stop(); err != nil { + return err + } + } + if err := b.Coordinator.Close(); err != nil { logging.DefaultLogger().Error(err) } diff --git a/server/backend/config.go b/server/backend/config.go index 038b861c9..957c78b26 100644 --- a/server/backend/config.go +++ b/server/backend/config.go @@ -42,7 +42,7 @@ type Config struct { // we are using server as single-tenant mode, this should be set to true. UseDefaultProject bool `yaml:"UseDefaultProject"` - // ClientDeactivateThreshold is deactivate threshold of clients in specific project for housekeeping. + // ClientDeactivateThreshold is deactivation threshold of clients in specific project for housekeeping. ClientDeactivateThreshold string `yaml:"ClientDeactivateThreshold"` // SnapshotThreshold is the threshold that determines if changes should be @@ -72,6 +72,10 @@ type Config struct { // Hostname is yorkie server hostname. hostname is used by metrics. Hostname string `yaml:"Hostname"` + + // LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. + // Server should be deployed in Kubernetes platform to enable this flag. + LeaderElection bool `yaml:"LeaderElection"` } // Validate validates this config. diff --git a/server/backend/database/database.go b/server/backend/database/database.go index 2d4922fee..543df74ed 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -20,6 +20,7 @@ package database import ( "context" "errors" + gotime "time" "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/pkg/document" @@ -253,4 +254,26 @@ type Database interface { docID types.ID, excludeClientID types.ID, ) (bool, error) + + // CreateTTLIndex creates a TTL index. + CreateTTLIndex( + ctx context.Context, + leaseDuration gotime.Duration, + ) error + + // TryToAcquireLeaderLease tries to acquire the leader lease. + TryToAcquireLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, + ) (bool, error) + + // RenewLeaderLease renews the leader lease. + RenewLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, + ) error } diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index f77bdf7d6..cd32fa647 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1308,6 +1308,37 @@ func (d *DB) findTicketByServerSeq( ), nil } +// CreateTTLIndex creates a TTL index. +func (d *DB) CreateTTLIndex( + ctx context.Context, + leaseDuration gotime.Duration, +) error { + //TODO implement me + panic("implement me") +} + +// TryToAcquireLeaderLease tries to acquire the leader lease. +func (d *DB) TryToAcquireLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) (bool, error) { + //TODO implement me + panic("implement me") +} + +// RenewLeaderLease renews the leader lease. +func (d *DB) RenewLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) error { + //TODO implement me + panic("implement me") +} + func newID() types.ID { return types.ID(primitive.NewObjectID().Hex()) } diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 25feb5e3e..af8a1b956 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -1397,6 +1397,73 @@ func (c *Client) findTicketByServerSeq( ), nil } +// CreateTTLIndex creates a TTL index. +func (c *Client) CreateTTLIndex( + ctx context.Context, + leaseDuration gotime.Duration, +) error { + ttlIndexModel := mongo.IndexModel{ + Keys: bson.M{"lease_expire_at": 1}, + Options: options.Index().SetExpireAfterSeconds(int32(leaseDuration.Seconds())), + } + _, err := c.collection(colElections).Indexes().CreateOne(ctx, ttlIndexModel) + if err != nil { + return err + } + + return nil +} + +// TryToAcquireLeaderLease tries to acquire the leader lease. +func (c *Client) TryToAcquireLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) (bool, error) { + updated := false + result, err := c.collection(colElections).UpdateOne(ctx, bson.M{ + "election_id": leaseLockName, + "lease_expire_at": bson.M{"$lt": gotime.Now()}, + }, bson.M{ + "$set": bson.M{ + "leader_id": hostname, + "lease_expire_at": gotime.Now().Add(leaseDuration), + }}, + options.Update().SetUpsert(true), + ) + if err != nil { + return false, err + } + + if result.ModifiedCount == 1 || result.UpsertedCount == 1 { + updated = true + } + return updated, nil +} + +// RenewLeaderLease renews the leader lease. +func (c *Client) RenewLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) error { + _, err := c.collection(colElections).UpdateOne(ctx, bson.M{ + "election_id": leaseLockName, + "leader_id": hostname, + }, bson.M{ + "$set": bson.M{ + "lease_expire_at": gotime.Now().Add(leaseDuration), + }}, + ) + if err != nil { + return err + } + + return nil +} + func (c *Client) collection( name string, opts ...*options.CollectionOptions, diff --git a/server/backend/database/mongo/indexes.go b/server/backend/database/mongo/indexes.go index 953659b3f..598653e38 100644 --- a/server/backend/database/mongo/indexes.go +++ b/server/backend/database/mongo/indexes.go @@ -33,6 +33,7 @@ const ( colChanges = "changes" colSnapshots = "snapshots" colSyncedSeqs = "syncedseqs" + colElections = "elections" ) type collectionInfo struct { @@ -131,6 +132,20 @@ var collectionInfos = []collectionInfo{ {Key: "actor_id", Value: bsonx.Int32(1)}, }, }}, + }, { + name: colElections, + indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{ + {Key: "election_id", Value: bsonx.Int32(1)}, + }, + Options: options.Index().SetUnique(true), + }, { + Keys: bsonx.Doc{ + {Key: "election_id", Value: bsonx.Int32(1)}, + {Key: "leader_id", Value: bsonx.Int32(1)}, + {Key: "lease_expire_at", Value: bsonx.Int32(1)}, + }, + }}, }, } diff --git a/server/backend/election/election.go b/server/backend/election/election.go new file mode 100644 index 000000000..173386785 --- /dev/null +++ b/server/backend/election/election.go @@ -0,0 +1,123 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package election provides leader election between server cluster. It is used to +// elect leader among server cluster and run tasks only on the leader. +package election + +import ( + "context" + "time" + + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/logging" +) + +// Elector is responsible for leader election between server cluster. +type Elector struct { + database database.Database + hostname string + + ctx context.Context + cancelFunc context.CancelFunc +} + +// New creates a new elector instance. +func New( + hostname string, + database database.Database, +) (*Elector, error) { + ctx, cancelFunc := context.WithCancel(context.Background()) + + return &Elector{ + database: database, + hostname: hostname, + + ctx: ctx, + cancelFunc: cancelFunc, + }, nil +} + +// Start starts leader election. +func (e *Elector) Start( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), +) error { + if err := e.database.CreateTTLIndex(context.Background(), leaseDuration); err != nil { + return err + } + + go e.Run(leaseLockName, leaseDuration, onStartLeading, onStoppedLeading) + return nil +} + +// Stop stops leader election. +func (e *Elector) Stop() error { + e.cancelFunc() + + return nil +} + +// Run starts leader election loop. +// Run will not return before leader election loop is stopped by ctx, +// or it has stopped holding the leader lease +func (e *Elector) Run( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), +) { + for { + ctx := context.Background() + acquired, err := e.database.TryToAcquireLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) + if err != nil { + continue + } + + if acquired { + go onStartLeading(ctx) + logging.From(ctx).Infof( + "leader elected: %s", e.hostname, + ) + + for { + err = e.database.RenewLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) + if err != nil { + break + } + + select { + case <-time.After(leaseDuration / 2): + case <-e.ctx.Done(): + return + } + } + } else { + onStoppedLeading() + logging.From(ctx).Infof( + "leader lost: %s", e.hostname, + ) + } + + select { + case <-time.After(leaseDuration): + case <-e.ctx.Done(): + return + } + } +} diff --git a/server/backend/housekeeping/housekeeping.go b/server/backend/housekeeping/housekeeping.go index d08abcbdc..625e0f193 100644 --- a/server/backend/housekeeping/housekeeping.go +++ b/server/backend/housekeeping/housekeeping.go @@ -25,6 +25,7 @@ import ( "time" "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/backend/election" "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/clients" "github.com/yorkie-team/yorkie/server/logging" @@ -32,6 +33,7 @@ import ( const ( deactivateCandidatesKey = "housekeeping/deactivateCandidates" + lockLeaseName = "housekeeping" ) // Config is the configuration for the housekeeping service. @@ -41,6 +43,9 @@ type Config struct { // CandidatesLimitPerProject is the maximum number of candidates to be returned per project. CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"` + + // LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership. + LeaseDuration string `yaml:"LeaseDuration"` } // Validate validates the configuration. @@ -53,6 +58,14 @@ func (c *Config) Validate() error { ) } + if _, err := time.ParseDuration(c.LeaseDuration); err != nil { + return fmt.Errorf( + `invalid argument %s for "--housekeeping-lease-duration" flag: %w`, + c.LeaseDuration, + err, + ) + } + return nil } @@ -62,9 +75,11 @@ func (c *Config) Validate() error { type Housekeeping struct { database database.Database coordinator sync.Coordinator + elector *election.Elector interval time.Duration candidatesLimitPerProject int + leaseDuration time.Duration ctx context.Context cancelFunc context.CancelFunc @@ -75,8 +90,9 @@ func Start( conf *Config, database database.Database, coordinator sync.Coordinator, + elector *election.Elector, ) (*Housekeeping, error) { - h, err := New(conf, database, coordinator) + h, err := New(conf, database, coordinator, elector) if err != nil { return nil, err } @@ -92,20 +108,27 @@ func New( conf *Config, database database.Database, coordinator sync.Coordinator, + elector *election.Elector, ) (*Housekeeping, error) { interval, err := time.ParseDuration(conf.Interval) if err != nil { return nil, fmt.Errorf("parse interval %s: %w", conf.Interval, err) } + leaseDuration, err := time.ParseDuration(conf.LeaseDuration) + if err != nil { + return nil, fmt.Errorf("parse lease duration %s: %w", conf.LeaseDuration, err) + } ctx, cancelFunc := context.WithCancel(context.Background()) return &Housekeeping{ database: database, coordinator: coordinator, + elector: elector, interval: interval, candidatesLimitPerProject: conf.CandidatesLimitPerProject, + leaseDuration: leaseDuration, ctx: ctx, cancelFunc: cancelFunc, @@ -114,7 +137,20 @@ func New( // Start starts the housekeeping service. func (h *Housekeeping) Start() error { - go h.run() + if h.elector != nil { + err := h.elector.Start( + lockLeaseName, + h.leaseDuration, + func(ctx context.Context) { h.run() }, + func() { h.cancelFunc() }, + ) + if err != nil { + return err + } + } else { + go h.run() + } + return nil } diff --git a/server/config.go b/server/config.go index c875102af..309bf5791 100644 --- a/server/config.go +++ b/server/config.go @@ -42,6 +42,7 @@ const ( DefaultHousekeepingInterval = 30 * time.Second DefaultHousekeepingCandidatesLimitPerProject = 500 + DefaultHousekeepingLeaseDuration = 60 * time.Second DefaultMongoConnectionURI = "mongodb://localhost:27017" DefaultMongoConnectionTimeout = 5 * time.Second @@ -64,7 +65,8 @@ const ( DefaultAuthWebhookCacheAuthTTL = 10 * time.Second DefaultAuthWebhookCacheUnauthTTL = 10 * time.Second - DefaultHostname = "" + DefaultHostname = "" + DefaultLeaderElection = false ) // Config is the configuration for creating a Yorkie instance. @@ -231,6 +233,7 @@ func newConfig(port int, profilingPort int) *Config { Housekeeping: &housekeeping.Config{ Interval: DefaultHousekeepingInterval.String(), CandidatesLimitPerProject: DefaultHousekeepingCandidatesLimitPerProject, + LeaseDuration: DefaultHousekeepingLeaseDuration.String(), }, Backend: &backend.Config{ ClientDeactivateThreshold: DefaultClientDeactivateThreshold, diff --git a/server/config.sample.yml b/server/config.sample.yml index 4c48b7128..24e1155c2 100644 --- a/server/config.sample.yml +++ b/server/config.sample.yml @@ -43,7 +43,7 @@ Backend: # used. If we are using server as single-tenant mode, this should be set to true. UseDefaultProject: true - # ClientDeactivateThreshold is deactivate threshold of clients in specific project for housekeeping. + # ClientDeactivateThreshold is deactivation threshold of clients in specific project for housekeeping. ClientDeactivateThreshold: "24h" # SnapshotThreshold is the threshold that determines if changes should be @@ -78,6 +78,10 @@ Backend: # determined automatically by the OS (Optional, default: os.Hostname()). Hostname: "" + # LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. + # Server should be deployed in Kubernetes platform to enable this flag. + LeaderElection: false + # Mongo is the MongoDB configuration (Optional). Mongo: # ConnectionTimeout is the timeout for connecting to MongoDB. diff --git a/server/rpc/server_test.go b/server/rpc/server_test.go index 36de756d4..b369e7d2c 100644 --- a/server/rpc/server_test.go +++ b/server/rpc/server_test.go @@ -85,6 +85,7 @@ func TestMain(m *testing.M) { }, &housekeeping.Config{ Interval: helper.HousekeepingInterval.String(), CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + LeaseDuration: helper.HousekeepingLeaseDuration.String(), }, met) if err != nil { log.Fatal(err) diff --git a/test/helper/helper.go b/test/helper/helper.go index b324d5f27..2d5729fe0 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -59,6 +59,7 @@ var ( AdminPassword = server.DefaultAdminPassword HousekeepingInterval = 10 * gotime.Second HousekeepingCandidatesLimitPerProject = 10 + HousekeepingLeaseDuration = 10 * gotime.Second AdminTokenDuration = "10s" ClientDeactivateThreshold = "10s" @@ -222,6 +223,7 @@ func TestConfig() *server.Config { Housekeeping: &housekeeping.Config{ Interval: HousekeepingInterval.String(), CandidatesLimitPerProject: HousekeepingCandidatesLimitPerProject, + LeaseDuration: HousekeepingLeaseDuration.String(), }, Backend: &backend.Config{ AdminUser: server.DefaultAdminUser, From 7e46a73b9480240a704f131f924bbef583c957fe Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Wed, 3 May 2023 18:00:01 +0900 Subject: [PATCH 2/5] Refactor election package --- .../yorkie-cluster/templates/deployment.yaml | 4 +- cmd/yorkie/server.go | 12 +- server/backend/backend.go | 19 +-- server/backend/config.go | 4 - server/backend/database/memory/database.go | 10 +- server/backend/election/database/election.go | 124 ++++++++++++++++++ server/backend/election/election.go | 110 ++-------------- server/backend/housekeeping/housekeeping.go | 15 ++- server/config.go | 5 +- server/config.sample.yml | 10 +- test/helper/helper.go | 2 + 11 files changed, 175 insertions(+), 140 deletions(-) create mode 100644 server/backend/election/database/election.go diff --git a/build/charts/yorkie-cluster/templates/deployment.yaml b/build/charts/yorkie-cluster/templates/deployment.yaml index 5d21cc9f4..f40008693 100644 --- a/build/charts/yorkie-cluster/templates/deployment.yaml +++ b/build/charts/yorkie-cluster/templates/deployment.yaml @@ -41,10 +41,8 @@ spec: "--mongo-connection-uri", "mongodb://{{ .Values.yorkie.args.dbUrl }}:{{ .Values.yorkie.args.dbPort }}/yorkie-meta", "--enable-pprof", - "--leader-election", + "--housekeeping-leader-election", "true", - "--client-deactivate-threshold", - "30s", ] ports: - containerPort: {{ .Values.yorkie.ports.rpcPort }} diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index 35ef1c52a..5bbb974c2 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -229,6 +229,12 @@ func init() { server.DefaultHousekeepingCandidatesLimitPerProject, "candidates limit per project for a single housekeeping run", ) + cmd.Flags().BoolVar( + &conf.Housekeeping.LeaderElection, + "housekeeping-leader-election", + server.DefaultHousekeepingLeaderElection, + "Enable leader election to run housekeeping only on the leader.", + ) cmd.Flags().DurationVar( &housekeepingLeaseDuration, "housekeeping-lease-duration", @@ -351,12 +357,6 @@ func init() { server.DefaultHostname, "Yorkie Server Hostname", ) - cmd.Flags().BoolVar( - &conf.Backend.LeaderElection, - "leader-election", - server.DefaultLeaderElection, - "Enable leader election to run tasks only on the leader.", - ) rootCmd.AddCommand(cmd) } diff --git a/server/backend/backend.go b/server/backend/backend.go index 64ef06971..558cd7193 100644 --- a/server/backend/backend.go +++ b/server/backend/backend.go @@ -34,6 +34,7 @@ import ( memdb "github.com/yorkie-team/yorkie/server/backend/database/memory" "github.com/yorkie-team/yorkie/server/backend/database/mongo" "github.com/yorkie-team/yorkie/server/backend/election" + dbelection "github.com/yorkie-team/yorkie/server/backend/election/database" "github.com/yorkie-team/yorkie/server/backend/housekeeping" "github.com/yorkie-team/yorkie/server/backend/sync" memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory" @@ -49,7 +50,7 @@ type Backend struct { DB database.Database Coordinator sync.Coordinator - Elector *election.Elector + Elector election.Elector Metrics *prometheus.Metrics Background *background.Background Housekeeping *housekeeping.Housekeeping @@ -101,19 +102,13 @@ func New( // will need to distribute workloads of a document. coordinator := memsync.NewCoordinator(serverInfo) - var elector *election.Elector - if conf.LeaderElection { - elector, err = election.New(hostname, db) - if err != nil { - return nil, err - } - } - authWebhookCache, err := cache.NewLRUExpireCache[string, *types.AuthWebhookResponse](conf.AuthWebhookCacheSize) if err != nil { return nil, err } + elector := dbelection.NewElector(hostname, db) + keeping, err := housekeeping.Start( housekeepingConf, db, @@ -169,10 +164,8 @@ func (b *Backend) Shutdown() error { return err } - if b.Config.LeaderElection { - if err := b.Elector.Stop(); err != nil { - return err - } + if err := b.Elector.Stop(); err != nil { + return err } if err := b.Coordinator.Close(); err != nil { diff --git a/server/backend/config.go b/server/backend/config.go index 957c78b26..d39ef28d3 100644 --- a/server/backend/config.go +++ b/server/backend/config.go @@ -72,10 +72,6 @@ type Config struct { // Hostname is yorkie server hostname. hostname is used by metrics. Hostname string `yaml:"Hostname"` - - // LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. - // Server should be deployed in Kubernetes platform to enable this flag. - LeaderElection bool `yaml:"LeaderElection"` } // Validate validates this config. diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index cd32fa647..39f7c58a0 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1313,8 +1313,7 @@ func (d *DB) CreateTTLIndex( ctx context.Context, leaseDuration gotime.Duration, ) error { - //TODO implement me - panic("implement me") + return nil } // TryToAcquireLeaderLease tries to acquire the leader lease. @@ -1324,8 +1323,8 @@ func (d *DB) TryToAcquireLeaderLease( leaseLockName string, leaseDuration gotime.Duration, ) (bool, error) { - //TODO implement me - panic("implement me") + // In memory database, leader is always myself. + return true, nil } // RenewLeaderLease renews the leader lease. @@ -1335,8 +1334,7 @@ func (d *DB) RenewLeaderLease( leaseLockName string, leaseDuration gotime.Duration, ) error { - //TODO implement me - panic("implement me") + return nil } func newID() types.ID { diff --git a/server/backend/election/database/election.go b/server/backend/election/database/election.go new file mode 100644 index 000000000..ef3d1c032 --- /dev/null +++ b/server/backend/election/database/election.go @@ -0,0 +1,124 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package database is a database implementation of election package. +package database + +import ( + "context" + "time" + + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/logging" +) + +// Elector is a database-based implementation of election.Elector. +type Elector struct { + database database.Database + + hostname string + + ctx context.Context + cancelFunc context.CancelFunc +} + +// NewElector creates a new elector instance. +func NewElector( + hostname string, + database database.Database, +) *Elector { + ctx, cancelFunc := context.WithCancel(context.Background()) + + return &Elector{ + database: database, + + hostname: hostname, + + ctx: ctx, + cancelFunc: cancelFunc, + } +} + +// StartElection starts leader election. +func (e *Elector) StartElection( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), +) error { + if err := e.database.CreateTTLIndex(context.Background(), leaseDuration); err != nil { + return err + } + + go e.run(leaseLockName, leaseDuration, onStartLeading, onStoppedLeading) + return nil +} + +// Stop stops all leader elections. +func (e *Elector) Stop() error { + e.cancelFunc() + + return nil +} + +// run starts leader election loop. +// run will not return before leader election loop is stopped by ctx, +// or it has stopped holding the leader lease +func (e *Elector) run( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), +) { + for { + ctx := context.Background() + acquired, err := e.database.TryToAcquireLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) + if err != nil { + continue + } + + if acquired { + go onStartLeading(ctx) + logging.From(ctx).Infof( + "leader elected: %s", e.hostname, + ) + + for { + err = e.database.RenewLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) + if err != nil { + break + } + + select { + case <-time.After(leaseDuration / 2): + case <-e.ctx.Done(): + return + } + } + } else { + onStoppedLeading() + logging.From(ctx).Infof( + "leader lost: %s", e.hostname, + ) + } + + select { + case <-time.After(leaseDuration): + case <-e.ctx.Done(): + return + } + } +} diff --git a/server/backend/election/election.go b/server/backend/election/election.go index 173386785..08a370966 100644 --- a/server/backend/election/election.go +++ b/server/backend/election/election.go @@ -21,103 +21,19 @@ package election import ( "context" "time" - - "github.com/yorkie-team/yorkie/server/backend/database" - "github.com/yorkie-team/yorkie/server/logging" ) -// Elector is responsible for leader election between server cluster. -type Elector struct { - database database.Database - hostname string - - ctx context.Context - cancelFunc context.CancelFunc -} - -// New creates a new elector instance. -func New( - hostname string, - database database.Database, -) (*Elector, error) { - ctx, cancelFunc := context.WithCancel(context.Background()) - - return &Elector{ - database: database, - hostname: hostname, - - ctx: ctx, - cancelFunc: cancelFunc, - }, nil -} - -// Start starts leader election. -func (e *Elector) Start( - leaseLockName string, - leaseDuration time.Duration, - onStartLeading func(ctx context.Context), - onStoppedLeading func(), -) error { - if err := e.database.CreateTTLIndex(context.Background(), leaseDuration); err != nil { - return err - } - - go e.Run(leaseLockName, leaseDuration, onStartLeading, onStoppedLeading) - return nil -} - -// Stop stops leader election. -func (e *Elector) Stop() error { - e.cancelFunc() - - return nil -} - -// Run starts leader election loop. -// Run will not return before leader election loop is stopped by ctx, -// or it has stopped holding the leader lease -func (e *Elector) Run( - leaseLockName string, - leaseDuration time.Duration, - onStartLeading func(ctx context.Context), - onStoppedLeading func(), -) { - for { - ctx := context.Background() - acquired, err := e.database.TryToAcquireLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) - if err != nil { - continue - } - - if acquired { - go onStartLeading(ctx) - logging.From(ctx).Infof( - "leader elected: %s", e.hostname, - ) - - for { - err = e.database.RenewLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) - if err != nil { - break - } - - select { - case <-time.After(leaseDuration / 2): - case <-e.ctx.Done(): - return - } - } - } else { - onStoppedLeading() - logging.From(ctx).Infof( - "leader lost: %s", e.hostname, - ) - } - - select { - case <-time.After(leaseDuration): - case <-e.ctx.Done(): - return - } - } +// Elector provides leader election between server cluster. It is used to +// elect leader among server cluster and run tasks only on the leader. +type Elector interface { + // StartElection starts leader election. + StartElection( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), + ) error + + // Stop stops all leader elections. + Stop() error } diff --git a/server/backend/housekeeping/housekeeping.go b/server/backend/housekeeping/housekeeping.go index 625e0f193..23bbeb196 100644 --- a/server/backend/housekeeping/housekeeping.go +++ b/server/backend/housekeeping/housekeeping.go @@ -44,6 +44,9 @@ type Config struct { // CandidatesLimitPerProject is the maximum number of candidates to be returned per project. CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"` + // LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. + LeaderElection bool `yaml:"LeaderElection"` + // LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership. LeaseDuration string `yaml:"LeaseDuration"` } @@ -75,10 +78,11 @@ func (c *Config) Validate() error { type Housekeeping struct { database database.Database coordinator sync.Coordinator - elector *election.Elector + elector election.Elector interval time.Duration candidatesLimitPerProject int + leaderElection bool leaseDuration time.Duration ctx context.Context @@ -90,7 +94,7 @@ func Start( conf *Config, database database.Database, coordinator sync.Coordinator, - elector *election.Elector, + elector election.Elector, ) (*Housekeeping, error) { h, err := New(conf, database, coordinator, elector) if err != nil { @@ -108,7 +112,7 @@ func New( conf *Config, database database.Database, coordinator sync.Coordinator, - elector *election.Elector, + elector election.Elector, ) (*Housekeeping, error) { interval, err := time.ParseDuration(conf.Interval) if err != nil { @@ -128,6 +132,7 @@ func New( interval: interval, candidatesLimitPerProject: conf.CandidatesLimitPerProject, + leaderElection: conf.LeaderElection, leaseDuration: leaseDuration, ctx: ctx, @@ -137,8 +142,8 @@ func New( // Start starts the housekeeping service. func (h *Housekeeping) Start() error { - if h.elector != nil { - err := h.elector.Start( + if h.leaderElection { + err := h.elector.StartElection( lockLeaseName, h.leaseDuration, func(ctx context.Context) { h.run() }, diff --git a/server/config.go b/server/config.go index 309bf5791..98aa2f47a 100644 --- a/server/config.go +++ b/server/config.go @@ -42,6 +42,7 @@ const ( DefaultHousekeepingInterval = 30 * time.Second DefaultHousekeepingCandidatesLimitPerProject = 500 + DefaultHousekeepingLeaderElection = false DefaultHousekeepingLeaseDuration = 60 * time.Second DefaultMongoConnectionURI = "mongodb://localhost:27017" @@ -65,8 +66,7 @@ const ( DefaultAuthWebhookCacheAuthTTL = 10 * time.Second DefaultAuthWebhookCacheUnauthTTL = 10 * time.Second - DefaultHostname = "" - DefaultLeaderElection = false + DefaultHostname = "" ) // Config is the configuration for creating a Yorkie instance. @@ -233,6 +233,7 @@ func newConfig(port int, profilingPort int) *Config { Housekeeping: &housekeeping.Config{ Interval: DefaultHousekeepingInterval.String(), CandidatesLimitPerProject: DefaultHousekeepingCandidatesLimitPerProject, + LeaderElection: DefaultHousekeepingLeaderElection, LeaseDuration: DefaultHousekeepingLeaseDuration.String(), }, Backend: &backend.Config{ diff --git a/server/config.sample.yml b/server/config.sample.yml index 24e1155c2..ff4dbf772 100644 --- a/server/config.sample.yml +++ b/server/config.sample.yml @@ -36,6 +36,12 @@ Housekeeping: # CandidatesLimitPerProject is the maximum number of candidates to be returned per project (default: 100). CandidatesLimitPerProject: 100 + # LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. + LeaderElection: false + + # LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership. + LeaseDuration: "15s" + # Backend is the configuration for the backend of Yorkie. Backend: # UseDefaultProject is whether to use the default project (default: true). @@ -78,10 +84,6 @@ Backend: # determined automatically by the OS (Optional, default: os.Hostname()). Hostname: "" - # LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. - # Server should be deployed in Kubernetes platform to enable this flag. - LeaderElection: false - # Mongo is the MongoDB configuration (Optional). Mongo: # ConnectionTimeout is the timeout for connecting to MongoDB. diff --git a/test/helper/helper.go b/test/helper/helper.go index 2d5729fe0..cccaeeb0e 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -59,6 +59,7 @@ var ( AdminPassword = server.DefaultAdminPassword HousekeepingInterval = 10 * gotime.Second HousekeepingCandidatesLimitPerProject = 10 + HousekeepingLeaderElection = false HousekeepingLeaseDuration = 10 * gotime.Second AdminTokenDuration = "10s" @@ -223,6 +224,7 @@ func TestConfig() *server.Config { Housekeeping: &housekeeping.Config{ Interval: HousekeepingInterval.String(), CandidatesLimitPerProject: HousekeepingCandidatesLimitPerProject, + LeaderElection: HousekeepingLeaderElection, LeaseDuration: HousekeepingLeaseDuration.String(), }, Backend: &backend.Config{ From e25d0afade54669e375a09ab7dec692efd363fb8 Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Wed, 3 May 2023 19:08:43 +0900 Subject: [PATCH 3/5] Add leader election test --- server/backend/database/mongo/client_test.go | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/server/backend/database/mongo/client_test.go b/server/backend/database/mongo/client_test.go index ab1719d8d..ed26def73 100644 --- a/server/backend/database/mongo/client_test.go +++ b/server/backend/database/mongo/client_test.go @@ -100,4 +100,35 @@ func TestClient(t *testing.T) { t.Run("IsDocumentAttached test", func(t *testing.T) { testcases.RunIsDocumentAttachedTest(t, cli, dummyProjectID) }) + + t.Run("leader lease acquisition test", func(t *testing.T) { + ctx := context.Background() + leaseDuration := 5 * time.Second + node1 := "node1" + node2 := "node2" + + err := cli.CreateTTLIndex(ctx, leaseDuration) + assert.NoError(t, err) + + // node 1 try to acquire leader lease + acquired, err := cli.TryToAcquireLeaderLease(ctx, node1, t.Name(), leaseDuration) + assert.NoError(t, err) + assert.True(t, acquired) + + // node 2 try to acquire leader lease, but it will fail + acquired, err = cli.TryToAcquireLeaderLease(ctx, node2, t.Name(), leaseDuration) + assert.Error(t, err) + assert.False(t, acquired) + + // after lease duration, node 2 can acquire leader lease + time.Sleep(leaseDuration) + + acquired, err = cli.TryToAcquireLeaderLease(ctx, node2, t.Name(), leaseDuration) + assert.NoError(t, err) + assert.True(t, acquired) + + // node 2 renew leader lease + err = cli.RenewLeaderLease(ctx, node2, t.Name(), leaseDuration) + assert.NoError(t, err) + }) } From 4811338c2896dbfb32a1bc4abf58df567fc356ed Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Fri, 7 Jul 2023 21:31:05 +0900 Subject: [PATCH 4/5] Add election tests --- server/backend/backend.go | 4 +- server/backend/database/database.go | 6 ++ server/backend/database/memory/database.go | 5 + server/backend/database/mongo/client.go | 26 +++++ .../election/{database => mongo}/election.go | 8 +- .../backend/election/mongo/election_test.go | 98 +++++++++++++++++++ test/helper/helper.go | 2 + 7 files changed, 144 insertions(+), 5 deletions(-) rename server/backend/election/{database => mongo}/election.go (94%) create mode 100644 server/backend/election/mongo/election_test.go diff --git a/server/backend/backend.go b/server/backend/backend.go index 558cd7193..c4ce0626d 100644 --- a/server/backend/backend.go +++ b/server/backend/backend.go @@ -34,7 +34,7 @@ import ( memdb "github.com/yorkie-team/yorkie/server/backend/database/memory" "github.com/yorkie-team/yorkie/server/backend/database/mongo" "github.com/yorkie-team/yorkie/server/backend/election" - dbelection "github.com/yorkie-team/yorkie/server/backend/election/database" + mongoelection "github.com/yorkie-team/yorkie/server/backend/election/mongo" "github.com/yorkie-team/yorkie/server/backend/housekeeping" "github.com/yorkie-team/yorkie/server/backend/sync" memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory" @@ -107,7 +107,7 @@ func New( return nil, err } - elector := dbelection.NewElector(hostname, db) + elector := mongoelection.NewElector(hostname, db) keeping, err := housekeeping.Start( housekeepingConf, diff --git a/server/backend/database/database.go b/server/backend/database/database.go index 543df74ed..6fec4b234 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -276,4 +276,10 @@ type Database interface { leaseLockName string, leaseDuration gotime.Duration, ) error + + // FindLeader returns the leader hostname for the given leaseLockName. + FindLeader( + ctx context.Context, + leaseLockName string, + ) (*string, error) } diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 39f7c58a0..6b7a50bfb 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1337,6 +1337,11 @@ func (d *DB) RenewLeaderLease( return nil } +// FindLeader returns the leader hostname for the given leaseLockName. +func (d *DB) FindLeader(ctx context.Context, leaseLockName string) (*string, error) { + return nil, nil +} + func newID() types.ID { return types.ID(primitive.NewObjectID().Hex()) } diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index af8a1b956..1d22af781 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -1464,6 +1464,32 @@ func (c *Client) RenewLeaderLease( return nil } +// FindLeader returns the leader hostname for the given leaseLockName. +func (c *Client) FindLeader(ctx context.Context, leaseLockName string) (*string, error) { + electionInfo := &struct { + ElectionId string `bson:"election_id"` + LeaderID string `bson:"leader_id"` + LeaseExpireAt gotime.Time `bson:"lease_expire_at"` + }{} + + result := c.collection(colElections).FindOne(ctx, bson.M{ + "election_id": leaseLockName, + }) + if result.Err() == mongo.ErrNoDocuments { + return nil, nil + } + if result.Err() != nil { + logging.From(ctx).Error(result.Err()) + return nil, fmt.Errorf("find leader: %w", result.Err()) + } + + if err := result.Decode(&electionInfo); err != nil { + return nil, fmt.Errorf("decode leader: %w", err) + } + + return &electionInfo.LeaderID, nil +} + func (c *Client) collection( name string, opts ...*options.CollectionOptions, diff --git a/server/backend/election/database/election.go b/server/backend/election/mongo/election.go similarity index 94% rename from server/backend/election/database/election.go rename to server/backend/election/mongo/election.go index ef3d1c032..a2858cdd6 100644 --- a/server/backend/election/database/election.go +++ b/server/backend/election/mongo/election.go @@ -14,8 +14,8 @@ * limitations under the License. */ -// Package database is a database implementation of election package. -package database +// Package mongo is a mongo based implementation of election package. +package mongo import ( "context" @@ -84,7 +84,7 @@ func (e *Elector) run( onStoppedLeading func(), ) { for { - ctx := context.Background() + ctx, cancelFunc := context.WithCancel(e.ctx) acquired, err := e.database.TryToAcquireLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) if err != nil { continue @@ -105,6 +105,7 @@ func (e *Elector) run( select { case <-time.After(leaseDuration / 2): case <-e.ctx.Done(): + cancelFunc() return } } @@ -118,6 +119,7 @@ func (e *Elector) run( select { case <-time.After(leaseDuration): case <-e.ctx.Done(): + cancelFunc() return } } diff --git a/server/backend/election/mongo/election_test.go b/server/backend/election/mongo/election_test.go new file mode 100644 index 000000000..240b535ab --- /dev/null +++ b/server/backend/election/mongo/election_test.go @@ -0,0 +1,98 @@ +package mongo_test + +import ( + "context" + "github.com/stretchr/testify/assert" + "github.com/yorkie-team/yorkie/server/backend/database/mongo" + mongoelection "github.com/yorkie-team/yorkie/server/backend/election/mongo" + "github.com/yorkie-team/yorkie/test/helper" + "testing" + "time" +) + +var ( + normalTask = func(ctx context.Context) {} + stopTask = func() {} +) + +func setupTestWithDummyData(t *testing.T) *mongo.Client { + config := &mongo.Config{ + ConnectionTimeout: "5s", + ConnectionURI: "mongodb://localhost:27017", + YorkieDatabase: helper.TestDBName(), + PingTimeout: "5s", + } + assert.NoError(t, config.Validate()) + + db, err := mongo.Dial(config) + assert.NoError(t, err) + + return db +} + +func TestElection(t *testing.T) { + db := setupTestWithDummyData(t) + + t.Run("leader election with multiple electors test", func(t *testing.T) { + leaseLockName := t.Name() + + electorA := mongoelection.NewElector("A", db) + electorB := mongoelection.NewElector("B", db) + electorC := mongoelection.NewElector("C", db) + + assert.NoError(t, electorA.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + assert.NoError(t, electorB.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + assert.NoError(t, electorC.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + + // elector A will be the leader because it is the first to start the election. + leader, err := db.FindLeader(context.Background(), leaseLockName) + assert.NoError(t, err) + + assert.Equal(t, "A", *leader) + + // wait for lease expiration and check the leader again + // elector A is still the leader because it has renewed the lease. + time.Sleep(helper.LeaseDuration) + assert.Equal(t, "A", *leader) + + // stop electorA and electorB, then wait for the next leader election + // elector C will be the leader because other electors are stopped. + assert.NoError(t, electorA.Stop()) + assert.NoError(t, electorB.Stop()) + + time.Sleep(helper.LeaseDuration) + + leader, err = db.FindLeader(context.Background(), leaseLockName) + assert.NoError(t, err) + + assert.Equal(t, "C", *leader) + }) + + t.Run("lease renewal while handling a a long task test", func(t *testing.T) { + leaseLockName := t.Name() + longTask := func(ctx context.Context) { + time.Sleep(helper.LeaseDuration * 2) + } + + electorA := mongoelection.NewElector("A", db) + electorB := mongoelection.NewElector("B", db) + electorC := mongoelection.NewElector("C", db) + + assert.NoError(t, electorA.StartElection(leaseLockName, helper.LeaseDuration, longTask, stopTask)) + assert.NoError(t, electorB.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + assert.NoError(t, electorC.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + + // check if elector A is still the leader + time.Sleep(helper.LeaseDuration) + + leader, err := db.FindLeader(context.Background(), leaseLockName) + assert.NoError(t, err) + + assert.Equal(t, "A", *leader) + }) + + t.Run("handle background routines when shutting down the server test", func(t *testing.T) { + // TODO(krapie): find the way to gradually close election routines + t.Skip() + }) +} diff --git a/test/helper/helper.go b/test/helper/helper.go index cccaeeb0e..9fa799b49 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -74,6 +74,8 @@ var ( MongoConnectionURI = "mongodb://localhost:27017" MongoConnectionTimeout = "5s" MongoPingTimeout = "5s" + + LeaseDuration = 2 * gotime.Second ) func init() { From 23e9f1d996fcf8821bc1afe26d3670ed911af2d5 Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Sat, 8 Jul 2023 21:10:49 +0900 Subject: [PATCH 5/5] Add election test for background routine handling --- server/backend/database/mongo/client.go | 2 +- server/backend/database/mongo/client_test.go | 2 + server/backend/election/mongo/election.go | 10 ++++- .../backend/election/mongo/election_test.go | 42 +++++++++++++++---- 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 1d22af781..609769e0e 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -1467,7 +1467,7 @@ func (c *Client) RenewLeaderLease( // FindLeader returns the leader hostname for the given leaseLockName. func (c *Client) FindLeader(ctx context.Context, leaseLockName string) (*string, error) { electionInfo := &struct { - ElectionId string `bson:"election_id"` + ElectionID string `bson:"election_id"` LeaderID string `bson:"leader_id"` LeaseExpireAt gotime.Time `bson:"lease_expire_at"` }{} diff --git a/server/backend/database/mongo/client_test.go b/server/backend/database/mongo/client_test.go index ed26def73..bc6c0f4c7 100644 --- a/server/backend/database/mongo/client_test.go +++ b/server/backend/database/mongo/client_test.go @@ -17,7 +17,9 @@ package mongo_test import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" diff --git a/server/backend/election/mongo/election.go b/server/backend/election/mongo/election.go index a2858cdd6..237e6ebb6 100644 --- a/server/backend/election/mongo/election.go +++ b/server/backend/election/mongo/election.go @@ -19,6 +19,7 @@ package mongo import ( "context" + "sync" "time" "github.com/yorkie-team/yorkie/server/backend/database" @@ -33,6 +34,8 @@ type Elector struct { ctx context.Context cancelFunc context.CancelFunc + + wg sync.WaitGroup } // NewElector creates a new elector instance. @@ -70,6 +73,7 @@ func (e *Elector) StartElection( // Stop stops all leader elections. func (e *Elector) Stop() error { e.cancelFunc() + e.wg.Wait() return nil } @@ -91,7 +95,11 @@ func (e *Elector) run( } if acquired { - go onStartLeading(ctx) + go func() { + e.wg.Add(1) + onStartLeading(ctx) + e.wg.Done() + }() logging.From(ctx).Infof( "leader elected: %s", e.hostname, ) diff --git a/server/backend/election/mongo/election_test.go b/server/backend/election/mongo/election_test.go index 240b535ab..02c3693d0 100644 --- a/server/backend/election/mongo/election_test.go +++ b/server/backend/election/mongo/election_test.go @@ -2,12 +2,14 @@ package mongo_test import ( "context" + "testing" + "time" + "github.com/stretchr/testify/assert" + "github.com/yorkie-team/yorkie/server/backend/database/mongo" mongoelection "github.com/yorkie-team/yorkie/server/backend/election/mongo" "github.com/yorkie-team/yorkie/test/helper" - "testing" - "time" ) var ( @@ -41,8 +43,11 @@ func TestElection(t *testing.T) { electorC := mongoelection.NewElector("C", db) assert.NoError(t, electorA.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + time.Sleep(helper.LeaseDuration) + assert.NoError(t, electorB.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) assert.NoError(t, electorC.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + time.Sleep(helper.LeaseDuration) // elector A will be the leader because it is the first to start the election. leader, err := db.FindLeader(context.Background(), leaseLockName) @@ -71,7 +76,7 @@ func TestElection(t *testing.T) { t.Run("lease renewal while handling a a long task test", func(t *testing.T) { leaseLockName := t.Name() longTask := func(ctx context.Context) { - time.Sleep(helper.LeaseDuration * 2) + time.Sleep(helper.LeaseDuration * 4) } electorA := mongoelection.NewElector("A", db) @@ -79,10 +84,12 @@ func TestElection(t *testing.T) { electorC := mongoelection.NewElector("C", db) assert.NoError(t, electorA.StartElection(leaseLockName, helper.LeaseDuration, longTask, stopTask)) - assert.NoError(t, electorB.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) - assert.NoError(t, electorC.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + time.Sleep(helper.LeaseDuration) + + assert.NoError(t, electorB.StartElection(leaseLockName, helper.LeaseDuration, longTask, stopTask)) + assert.NoError(t, electorC.StartElection(leaseLockName, helper.LeaseDuration, longTask, stopTask)) - // check if elector A is still the leader + // wait for lease expiration and check if elector A is still the leader while handling a long task time.Sleep(helper.LeaseDuration) leader, err := db.FindLeader(context.Background(), leaseLockName) @@ -92,7 +99,26 @@ func TestElection(t *testing.T) { }) t.Run("handle background routines when shutting down the server test", func(t *testing.T) { - // TODO(krapie): find the way to gradually close election routines - t.Skip() + shutdownCh := make(chan struct{}) + + isTaskDone := false + longTask := func(ctx context.Context) { + close(shutdownCh) + time.Sleep(helper.LeaseDuration) + isTaskDone = true + } + + elector := mongoelection.NewElector("A", db) + assert.NoError(t, elector.StartElection(t.Name(), helper.LeaseDuration, longTask, stopTask)) + + // if receive shutdown signal, stop elector + select { + case <-shutdownCh: + assert.NoError(t, elector.Stop()) + } + + // check if the task is done + // this means that the background routine is handled properly after server(elector) is stopped + assert.Equal(t, true, isTaskDone) }) }