Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Leader Election for Housekeeping in Sharded Cluster using MongoDB #529

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build/charts/yorkie-cluster/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ spec:
"--mongo-connection-uri",
"mongodb://{{ .Values.yorkie.args.dbUrl }}:{{ .Values.yorkie.args.dbPort }}/yorkie-meta",
"--enable-pprof",
"--housekeeping-leader-election",
"true",
]
ports:
- containerPort: {{ .Values.yorkie.ports.rpcPort }}
Expand Down
14 changes: 14 additions & 0 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (

adminTokenDuration time.Duration
housekeepingInterval time.Duration
housekeepingLeaseDuration time.Duration
clientDeactivateThreshold string

mongoConnectionURI string
Expand Down Expand Up @@ -74,6 +75,7 @@ func newServerCmd() *cobra.Command {
conf.Backend.AuthWebhookCacheUnauthTTL = authWebhookCacheUnauthTTL.String()

conf.Housekeeping.Interval = housekeepingInterval.String()
conf.Housekeeping.LeaseDuration = housekeepingLeaseDuration.String()

if mongoConnectionURI != "" {
conf.Mongo = &mongo.Config{
Expand Down Expand Up @@ -227,6 +229,18 @@ func init() {
server.DefaultHousekeepingCandidatesLimitPerProject,
"candidates limit per project for a single housekeeping run",
)
cmd.Flags().BoolVar(
&conf.Housekeeping.LeaderElection,
"housekeeping-leader-election",
server.DefaultHousekeepingLeaderElection,
"Enable leader election to run housekeeping only on the leader.",
)
cmd.Flags().DurationVar(
&housekeepingLeaseDuration,
"housekeeping-lease-duration",
server.DefaultHousekeepingLeaseDuration,
"lease duration for a leader election in housekeeping",
)
cmd.Flags().StringVar(
&mongoConnectionURI,
"mongo-connection-uri",
Expand Down
19 changes: 16 additions & 3 deletions server/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/yorkie-team/yorkie/server/backend/database"
memdb "github.com/yorkie-team/yorkie/server/backend/database/memory"
"github.com/yorkie-team/yorkie/server/backend/database/mongo"
"github.com/yorkie-team/yorkie/server/backend/election"
mongoelection "github.com/yorkie-team/yorkie/server/backend/election/mongo"
"github.com/yorkie-team/yorkie/server/backend/housekeeping"
"github.com/yorkie-team/yorkie/server/backend/sync"
memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory"
Expand All @@ -48,6 +50,7 @@ type Backend struct {

DB database.Database
Coordinator sync.Coordinator
Elector election.Elector
Metrics *prometheus.Metrics
Background *background.Background
Housekeeping *housekeeping.Housekeeping
Expand All @@ -64,11 +67,12 @@ func New(
) (*Backend, error) {
hostname := conf.Hostname
if hostname == "" {
hostname, err := os.Hostname()
osHostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("os.Hostname: %w", err)
}
conf.Hostname = hostname
conf.Hostname = osHostname
hostname = osHostname
}

serverInfo := &sync.ServerInfo{
Expand Down Expand Up @@ -103,10 +107,13 @@ func New(
return nil, err
}

elector := mongoelection.NewElector(hostname, db)

keeping, err := housekeeping.Start(
housekeepingConf,
db,
coordinator,
elector,
)
if err != nil {
return nil, err
Expand All @@ -118,8 +125,9 @@ func New(
}

logging.DefaultLogger().Infof(
"backend created: id: %s, rpc: %s",
"backend created: id: %s, rpc: %s, db: %s",
serverInfo.ID,
serverInfo.Hostname,
dbInfo,
)

Expand All @@ -141,6 +149,7 @@ func New(
Metrics: metrics,
DB: db,
Coordinator: coordinator,
Elector: elector,
Housekeeping: keeping,

AuthWebhookCache: authWebhookCache,
Expand All @@ -155,6 +164,10 @@ func (b *Backend) Shutdown() error {
return err
}

if err := b.Elector.Stop(); err != nil {
return err
}

if err := b.Coordinator.Close(); err != nil {
logging.DefaultLogger().Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Config struct {
// we are using server as single-tenant mode, this should be set to true.
UseDefaultProject bool `yaml:"UseDefaultProject"`

// ClientDeactivateThreshold is deactivate threshold of clients in specific project for housekeeping.
// ClientDeactivateThreshold is deactivation threshold of clients in specific project for housekeeping.
ClientDeactivateThreshold string `yaml:"ClientDeactivateThreshold"`

// SnapshotThreshold is the threshold that determines if changes should be
Expand Down
29 changes: 29 additions & 0 deletions server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package database
import (
"context"
"errors"
gotime "time"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/document"
Expand Down Expand Up @@ -253,4 +254,32 @@ type Database interface {
docID types.ID,
excludeClientID types.ID,
) (bool, error)

// CreateTTLIndex creates a TTL index.
CreateTTLIndex(
ctx context.Context,
leaseDuration gotime.Duration,
) error

// TryToAcquireLeaderLease tries to acquire the leader lease.
TryToAcquireLeaderLease(
ctx context.Context,
hostname string,
leaseLockName string,
leaseDuration gotime.Duration,
) (bool, error)

// RenewLeaderLease renews the leader lease.
RenewLeaderLease(
ctx context.Context,
hostname string,
leaseLockName string,
leaseDuration gotime.Duration,
) error

// FindLeader returns the leader hostname for the given leaseLockName.
FindLeader(
ctx context.Context,
leaseLockName string,
) (*string, error)
}
34 changes: 34 additions & 0 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,40 @@ func (d *DB) findTicketByServerSeq(
), nil
}

// CreateTTLIndex creates a TTL index.
func (d *DB) CreateTTLIndex(
ctx context.Context,
leaseDuration gotime.Duration,
) error {
return nil
}

// TryToAcquireLeaderLease tries to acquire the leader lease.
func (d *DB) TryToAcquireLeaderLease(
ctx context.Context,
hostname string,
leaseLockName string,
leaseDuration gotime.Duration,
) (bool, error) {
// In memory database, leader is always myself.
return true, nil
}

// RenewLeaderLease renews the leader lease.
func (d *DB) RenewLeaderLease(
ctx context.Context,
hostname string,
leaseLockName string,
leaseDuration gotime.Duration,
) error {
return nil
}

// FindLeader returns the leader hostname for the given leaseLockName.
func (d *DB) FindLeader(ctx context.Context, leaseLockName string) (*string, error) {
return nil, nil
}

func newID() types.ID {
return types.ID(primitive.NewObjectID().Hex())
}
93 changes: 93 additions & 0 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,99 @@ func (c *Client) findTicketByServerSeq(
), nil
}

// CreateTTLIndex creates a TTL index.
func (c *Client) CreateTTLIndex(
ctx context.Context,
leaseDuration gotime.Duration,
) error {
ttlIndexModel := mongo.IndexModel{
Keys: bson.M{"lease_expire_at": 1},
Options: options.Index().SetExpireAfterSeconds(int32(leaseDuration.Seconds())),
}
_, err := c.collection(colElections).Indexes().CreateOne(ctx, ttlIndexModel)
if err != nil {
return err
}

return nil
}

// TryToAcquireLeaderLease tries to acquire the leader lease.
func (c *Client) TryToAcquireLeaderLease(
ctx context.Context,
hostname string,
leaseLockName string,
leaseDuration gotime.Duration,
) (bool, error) {
updated := false
result, err := c.collection(colElections).UpdateOne(ctx, bson.M{
"election_id": leaseLockName,
"lease_expire_at": bson.M{"$lt": gotime.Now()},
}, bson.M{
"$set": bson.M{
"leader_id": hostname,
"lease_expire_at": gotime.Now().Add(leaseDuration),
}},
options.Update().SetUpsert(true),
)
if err != nil {
return false, err
}

if result.ModifiedCount == 1 || result.UpsertedCount == 1 {
updated = true
}
return updated, nil
}

// RenewLeaderLease renews the leader lease.
func (c *Client) RenewLeaderLease(
ctx context.Context,
hostname string,
leaseLockName string,
leaseDuration gotime.Duration,
) error {
_, err := c.collection(colElections).UpdateOne(ctx, bson.M{
"election_id": leaseLockName,
"leader_id": hostname,
}, bson.M{
"$set": bson.M{
"lease_expire_at": gotime.Now().Add(leaseDuration),
}},
)
if err != nil {
return err
}

return nil
}

// FindLeader returns the leader hostname for the given leaseLockName.
func (c *Client) FindLeader(ctx context.Context, leaseLockName string) (*string, error) {
electionInfo := &struct {
ElectionID string `bson:"election_id"`
LeaderID string `bson:"leader_id"`
LeaseExpireAt gotime.Time `bson:"lease_expire_at"`
}{}

result := c.collection(colElections).FindOne(ctx, bson.M{
"election_id": leaseLockName,
})
if result.Err() == mongo.ErrNoDocuments {
return nil, nil
}
if result.Err() != nil {
logging.From(ctx).Error(result.Err())
return nil, fmt.Errorf("find leader: %w", result.Err())
}

if err := result.Decode(&electionInfo); err != nil {
return nil, fmt.Errorf("decode leader: %w", err)
}

return &electionInfo.LeaderID, nil
}

func (c *Client) collection(
name string,
opts ...*options.CollectionOptions,
Expand Down
33 changes: 33 additions & 0 deletions server/backend/database/mongo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package mongo_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -100,4 +102,35 @@ func TestClient(t *testing.T) {
t.Run("IsDocumentAttached test", func(t *testing.T) {
testcases.RunIsDocumentAttachedTest(t, cli, dummyProjectID)
})

t.Run("leader lease acquisition test", func(t *testing.T) {
ctx := context.Background()
leaseDuration := 5 * time.Second
node1 := "node1"
node2 := "node2"

err := cli.CreateTTLIndex(ctx, leaseDuration)
assert.NoError(t, err)

// node 1 try to acquire leader lease
acquired, err := cli.TryToAcquireLeaderLease(ctx, node1, t.Name(), leaseDuration)
assert.NoError(t, err)
assert.True(t, acquired)

// node 2 try to acquire leader lease, but it will fail
acquired, err = cli.TryToAcquireLeaderLease(ctx, node2, t.Name(), leaseDuration)
assert.Error(t, err)
assert.False(t, acquired)

// after lease duration, node 2 can acquire leader lease
time.Sleep(leaseDuration)

acquired, err = cli.TryToAcquireLeaderLease(ctx, node2, t.Name(), leaseDuration)
assert.NoError(t, err)
assert.True(t, acquired)

// node 2 renew leader lease
err = cli.RenewLeaderLease(ctx, node2, t.Name(), leaseDuration)
assert.NoError(t, err)
})
}
15 changes: 15 additions & 0 deletions server/backend/database/mongo/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
colChanges = "changes"
colSnapshots = "snapshots"
colSyncedSeqs = "syncedseqs"
colElections = "elections"
)

type collectionInfo struct {
Expand Down Expand Up @@ -131,6 +132,20 @@ var collectionInfos = []collectionInfo{
{Key: "actor_id", Value: bsonx.Int32(1)},
},
}},
}, {
name: colElections,
indexes: []mongo.IndexModel{{
Keys: bsonx.Doc{
{Key: "election_id", Value: bsonx.Int32(1)},
},
Options: options.Index().SetUnique(true),
}, {
Keys: bsonx.Doc{
{Key: "election_id", Value: bsonx.Int32(1)},
{Key: "leader_id", Value: bsonx.Int32(1)},
{Key: "lease_expire_at", Value: bsonx.Int32(1)},
},
}},
},
}

Expand Down
Loading