From 219fc8da8c2e46113d1c3b69e778787a90a43db2 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Fri, 26 Jan 2024 16:10:17 -0800 Subject: [PATCH 1/5] Add SQL election module --- go.mod | 1 + go.sum | 2 + util/election2/sql/election.go | 269 ++++++++++++++++++++++++++++ util/election2/sql/election_test.go | 134 ++++++++++++++ 4 files changed, 406 insertions(+) create mode 100644 util/election2/sql/election.go create mode 100644 util/election2/sql/election_test.go diff --git a/go.mod b/go.mod index 4a039ce447..1c8612e38d 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/letsencrypt/pkcs11key/v4 v4.0.0 github.com/lib/pq v1.10.9 + github.com/mattn/go-sqlite3 v1.14.20 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 github.com/pseudomuto/protoc-gen-doc v1.5.1 diff --git a/go.sum b/go.sum index cf1d89c469..5aa0051432 100644 --- a/go.sum +++ b/go.sum @@ -480,6 +480,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.20 h1:BAZ50Ns0OFBNxdAqFhbZqdPcht1Xlb16pDCqkq1spr0= +github.com/mattn/go-sqlite3 v1.14.20/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/miekg/pkcs11 v1.0.2/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= diff --git a/util/election2/sql/election.go b/util/election2/sql/election.go new file mode 100644 index 0000000000..890d070cfb --- /dev/null +++ b/util/election2/sql/election.go @@ -0,0 +1,269 @@ +// Copyright 2023 Google LLC. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package etcd provides an implementation of leader election based on a SQL database. +package sql + +import ( + "context" + "database/sql" + "fmt" + "sync" + "time" + + "github.com/google/trillian/util/election2" + "k8s.io/klog/v2" +) + +type leaderData struct { + currentLeader string + timestamp time.Time +} + +// Election is an implementation of election2.Election based on a SQL database. +type Election struct { + db *sql.DB + instanceID string + resourceID string + + currentLeader leaderData + leaderLock sync.Cond + + // If a channel is supplied with the cancel, it will be signalled when the election routine has exited. + cancel chan *chan error + electionInterval time.Duration +} + +var _ election2.Election = (*Election)(nil) + +// Await implements election2.Election +func (e *Election) Await(ctx context.Context) error { + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + if e.cancel == nil { + e.cancel = make(chan *chan error) + go e.becomeLeaderLoop(context.Background(), e.cancel) + } + if e.currentLeader.currentLeader == e.instanceID { + return nil + } + for e.currentLeader.currentLeader != e.instanceID { + e.leaderLock.Wait() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + klog.Infof("Waiting for leadership, %s is the leader at %s", e.currentLeader.currentLeader, e.currentLeader.timestamp) + } + } + klog.Infof("%s became leader for %s at %s", e.instanceID, e.resourceID, e.currentLeader.timestamp) + return nil +} + +// Close implements election2.Election +func (e *Election) Close(ctx context.Context) error { + if err := e.Resign(ctx); err != nil { + klog.Errorf("Failed to resign leadership: %v", err) + return err + } + return nil +} + +// Resign implements election2.Election +func (e *Election) Resign(ctx context.Context) error { + e.leaderLock.L.Lock() + closer := e.cancel + e.cancel = nil + e.leaderLock.L.Unlock() + if closer == nil { + return nil + } + // Stop trying to elect ourselves + done := make(chan error) + closer <- &done + return <-done +} + +// WithMastership implements election2.Election +func (e *Election) WithMastership(ctx context.Context) (context.Context, error) { + cctx, cancel := context.WithCancel(ctx) + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + if e.currentLeader.currentLeader != e.instanceID { + // Not the leader, cancel + cancel() + return cctx, nil + } + + // Start a goroutine to cancel the context when we are no longer leader + go func() { + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + for e.currentLeader.currentLeader == e.instanceID { + e.leaderLock.Wait() + } + select { + case <-ctx.Done(): + // Don't complain if our context already completed. + return + default: + cancel() + klog.Warningf("%s cancelled: lost leadership, %s is the leader at %s", e.resourceID, e.currentLeader.currentLeader, e.currentLeader.timestamp) + } + }() + + return cctx, nil +} + +// becomeLeaderLoop runs continuously to participate in elections until a message is sent on `cancel` +func (e *Election) becomeLeaderLoop(ctx context.Context, closer chan *chan error) { + for { + select { + case ch := <-closer: + err := e.tearDown() + klog.Infof("Election teardown for %s: %v", e.resourceID, err) + if ch != nil { + *ch <- err + } + return + default: + leader, err := e.tryBecomeLeader(ctx) + if err != nil { + klog.Errorf("Failed attempt to become leader for %s, retrying: %v", e.resourceID, err) + } else { + e.leaderLock.L.Lock() + if leader != e.currentLeader { + // Note: this code does not actually care _which_ instance was + // elected, it sends notifications on each leadership cahnge. + e.currentLeader = leader + e.leaderLock.Broadcast() + } + e.leaderLock.L.Unlock() + } + time.Sleep(e.electionInterval) + } + } +} + +func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) { + leader := leaderData{} + tx, err := e.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if err != nil { + return leader, fmt.Errorf("BeginTX: %w", err) + } + defer tx.Rollback() + row := tx.QueryRow( + "SELECT leader, last_update FROM leader_election WHERE resource_id = $1", + e.resourceID) + if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil { + return leader, fmt.Errorf("Select: %w", err) + } + + if leader.currentLeader != e.instanceID && leader.timestamp.Add(e.electionInterval*10).After(time.Now()) { + return leader, nil // Someone else won the election + } + + timestamp := time.Now() + _, err = tx.Exec( + "UPDATE leader_election SET leader = $1, last_update = $2 WHERE resource_id = $3 AND leader = $4 AND last_update = $5", + e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp) + if err != nil { + return leader, fmt.Errorf("Update: %w", err) + } + + if err := tx.Commit(); err != nil { + return leader, fmt.Errorf("Commit failed: %w", err) + } + leader = leaderData{currentLeader: e.instanceID, timestamp: timestamp} + return leader, nil +} + +func (e *Election) tearDown() error { + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + if e.currentLeader.currentLeader != e.instanceID { + return nil + } + e.currentLeader.currentLeader = "empty leader" + e.leaderLock.Broadcast() + + // Reset election time to epoch to allow a faster fail-over + res, err := e.db.Exec( + "UPDATE leader_election SET last_update = $1 WHERE resource_id = $2 AND leader = $3 AND last_update = $4", + time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp) + if err != nil { + return fmt.Errorf("Update: %w", err) + } + if n, err := res.RowsAffected(); n != 1 || err != nil { + return fmt.Errorf("failed to resign leadership: %d, %w", n, err) + } + return nil +} + +func (e *Election) initializeLock(ctx context.Context) error { + insert, err := e.db.Prepare("INSERT INTO leader_election (resource_id, leader, last_update) VALUES ($1, $2, $3)") + if err != nil { + return err + } + defer insert.Close() + + _, err = insert.Exec(e.resourceID, "empty leader", time.Time{}) + return err +} + +type SqlFactory struct { + db *sql.DB + instanceID string + opts []Option +} + +var _ election2.Factory = (*SqlFactory)(nil) + +type Option func(*Election) *Election + +func NewFactory(instanceID string, database *sql.DB, opts... Option) (*SqlFactory, error) { + return &SqlFactory{db: database, instanceID: instanceID, opts: opts}, nil +} + +func WithElectionInterval(interval time.Duration) Option { + return func(f *Election) *Election { + f.electionInterval = interval + return f + } +} + +// NewElection implements election2.Factory. +func (f *SqlFactory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) { + // Ensure we have a database connection + if f.db == nil { + return nil, fmt.Errorf("no database connection") + } + if err := f.db.Ping(); err != nil { + return nil, err + } + e := &Election{ + db: f.db, + instanceID: f.instanceID, + resourceID: resourceID, + leaderLock: sync.Cond{L: &sync.Mutex{}}, + electionInterval: 1 * time.Second, + } + for _, opt := range f.opts { + e = opt(e) + } + e.initializeLock(ctx) + + return e, nil +} diff --git a/util/election2/sql/election_test.go b/util/election2/sql/election_test.go new file mode 100644 index 0000000000..95779aa3be --- /dev/null +++ b/util/election2/sql/election_test.go @@ -0,0 +1,134 @@ +// Copyright 2023 Google LLC. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package etcd provides an implementation of leader election based on a SQL database. +package sql + +import ( + "context" + "database/sql" + "fmt" + "testing" + "time" + + "github.com/google/trillian/util/election2/testonly" + _ "github.com/mattn/go-sqlite3" +) + +func TestOneElection(t *testing.T) { + db, err := initializeDB("sqllite3", "file::one-election?mode=memory") + if err != nil { + t.Fatalf("Unable to initialize database: %v", err) + } + + factory, err := NewFactory("serv", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("Unablet to open database: %v", err) + } + + ctx := context.Background() + el1, err := factory.NewElection(ctx, "5") + if err != nil { + t.Fatalf("NewElection(5): %v", err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(5): %v", err) + } + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await when holding lock(5): %v", err) + } + + if err := el1.Resign(ctx); err != nil { + t.Fatalf("Resign(5): %v", err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(5): %v", err) + } + + if err := el1.Close(ctx); err != nil { + t.Fatalf("Close(5): %v", err) + } +} + +func TestTwoElections(t *testing.T) { + db, err := initializeDB("sqllite3", "file::two-election?mode=memory") + if err != nil { + t.Fatalf("Unable to initialize database: %v", err) + } + + factory, err := NewFactory("serv", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("Unablet to open database: %v", err) + } + + ctx := context.Background() + el1, err := factory.NewElection(ctx, "10") + if err != nil { + t.Fatalf("NewElection(10): %v", err) + } + el2, err := factory.NewElection(ctx, "20") + if err != nil { + t.Fatalf("NewElection(20): %v", err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(10): %v", err) + } + if err := el2.Await(ctx); err != nil { + t.Fatalf("Await(20): %v", err) + } + + if err := el1.Close(ctx); err != nil { + t.Fatalf("Close(10): %v", err) + } + + if err := el2.Close(ctx); err != nil { + t.Fatalf("Close(20): %v", err) + } +} + +func initializeDB(driver string, uri string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", uri) + if err != nil { + return nil, err + } + // Additional connections open a _new_, _empty_ database! + db.SetMaxOpenConns(1) + _, err = db.Exec("CREATE TABLE leader_election (resource_id TEXT PRIMARY KEY, leader TEXT, last_update TIMESTAMP);") + if err != nil { + return nil, err + } + + return db, nil +} + +func TestElection(t *testing.T) { + for _, nt := range testonly.Tests { + // Create a new DB and Factory for each test for better isolation. + db, err := initializeDB("sqllite3", fmt.Sprintf("file::%s?mode=memory", nt.Name)) + if err != nil { + t.Fatalf("Initialize DB: %v", err) + } + + fact, err := NewFactory("testID", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("NewFactory: %v", err) + } + t.Run(nt.Name, func(t *testing.T) { + nt.Run(t, fact) + }) + } +} From 5f4b850ae7ffa5da7273ba518120684a22b34c1b Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Sun, 28 Jan 2024 21:52:20 -0800 Subject: [PATCH 2/5] Fix lint errors, address JAORMX comments --- util/election2/sql/election.go | 41 +++++++++++------- util/election2/sql/election_test.go | 66 ++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 26 deletions(-) diff --git a/util/election2/sql/election.go b/util/election2/sql/election.go index 890d070cfb..74b4f6368b 100644 --- a/util/election2/sql/election.go +++ b/util/election2/sql/election.go @@ -18,6 +18,7 @@ package sql import ( "context" "database/sql" + "errors" "fmt" "sync" "time" @@ -146,7 +147,7 @@ func (e *Election) becomeLeaderLoop(ctx context.Context, closer chan *chan error e.leaderLock.L.Lock() if leader != e.currentLeader { // Note: this code does not actually care _which_ instance was - // elected, it sends notifications on each leadership cahnge. + // elected, it sends notifications on each leadership change. e.currentLeader = leader e.leaderLock.Broadcast() } @@ -163,21 +164,25 @@ func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) { if err != nil { return leader, fmt.Errorf("BeginTX: %w", err) } - defer tx.Rollback() + defer func() { + if err := tx.Rollback(); err != nil { + klog.Errorf("Rollback failed: %v", err) + } + }() row := tx.QueryRow( - "SELECT leader, last_update FROM leader_election WHERE resource_id = $1", + "SELECT leader, last_update FROM leader_election WHERE resource_id = ?", e.resourceID) if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil { return leader, fmt.Errorf("Select: %w", err) } - + if leader.currentLeader != e.instanceID && leader.timestamp.Add(e.electionInterval*10).After(time.Now()) { return leader, nil // Someone else won the election } timestamp := time.Now() _, err = tx.Exec( - "UPDATE leader_election SET leader = $1, last_update = $2 WHERE resource_id = $3 AND leader = $4 AND last_update = $5", + "UPDATE leader_election SET leader = ?, last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp) if err != nil { return leader, fmt.Errorf("Update: %w", err) @@ -201,7 +206,7 @@ func (e *Election) tearDown() error { // Reset election time to epoch to allow a faster fail-over res, err := e.db.Exec( - "UPDATE leader_election SET last_update = $1 WHERE resource_id = $2 AND leader = $3 AND last_update = $4", + "UPDATE leader_election SET last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp) if err != nil { return fmt.Errorf("Update: %w", err) @@ -213,27 +218,31 @@ func (e *Election) tearDown() error { } func (e *Election) initializeLock(ctx context.Context) error { - insert, err := e.db.Prepare("INSERT INTO leader_election (resource_id, leader, last_update) VALUES ($1, $2, $3)") - if err != nil { - return err + var leader string + err := e.db.QueryRow( + "SELECT leader FROM leader_election WHERE resource_id = ?", + e.resourceID, + ).Scan(&leader) + if errors.Is(err, sql.ErrNoRows) { + _, err = e.db.Exec( + "INSERT INTO leader_election (resource_id, leader, last_update) VALUES (?, ?, ?)", + e.resourceID, "empty leader", time.Time{}, + ) } - defer insert.Close() - - _, err = insert.Exec(e.resourceID, "empty leader", time.Time{}) return err } type SqlFactory struct { db *sql.DB instanceID string - opts []Option + opts []Option } var _ election2.Factory = (*SqlFactory)(nil) type Option func(*Election) *Election -func NewFactory(instanceID string, database *sql.DB, opts... Option) (*SqlFactory, error) { +func NewFactory(instanceID string, database *sql.DB, opts ...Option) (*SqlFactory, error) { return &SqlFactory{db: database, instanceID: instanceID, opts: opts}, nil } @@ -263,7 +272,9 @@ func (f *SqlFactory) NewElection(ctx context.Context, resourceID string) (electi for _, opt := range f.opts { e = opt(e) } - e.initializeLock(ctx) + if err := e.initializeLock(ctx); err != nil { + return nil, err + } return e, nil } diff --git a/util/election2/sql/election_test.go b/util/election2/sql/election_test.go index 95779aa3be..f3cf8889cf 100644 --- a/util/election2/sql/election_test.go +++ b/util/election2/sql/election_test.go @@ -34,7 +34,7 @@ func TestOneElection(t *testing.T) { factory, err := NewFactory("serv", db, WithElectionInterval(10*time.Millisecond)) if err != nil { - t.Fatalf("Unablet to open database: %v", err) + t.Fatalf("Unable to open database: %v", err) } ctx := context.Background() @@ -71,7 +71,7 @@ func TestTwoElections(t *testing.T) { factory, err := NewFactory("serv", db, WithElectionInterval(10*time.Millisecond)) if err != nil { - t.Fatalf("Unablet to open database: %v", err) + t.Fatalf("Unable to open database: %v", err) } ctx := context.Background() @@ -100,19 +100,48 @@ func TestTwoElections(t *testing.T) { } } -func initializeDB(driver string, uri string) (*sql.DB, error) { - db, err := sql.Open("sqlite3", uri) +func TestElectionTwoServers(t *testing.T) { + db, err := initializeDB("sqllite3", "file::two-election?mode=memory") if err != nil { - return nil, err + t.Fatalf("Unable to initialize database: %v", err) } - // Additional connections open a _new_, _empty_ database! - db.SetMaxOpenConns(1) - _, err = db.Exec("CREATE TABLE leader_election (resource_id TEXT PRIMARY KEY, leader TEXT, last_update TIMESTAMP);") + + factory1, err := NewFactory("s1", db, WithElectionInterval(10*time.Millisecond)) if err != nil { - return nil, err + t.Fatalf("Unable to open database: %v", err) + } + factory2, err := NewFactory("s2", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("Unable to open database: %v", err) } - return db, nil + ctx := context.Background() + el1, err := factory1.NewElection(ctx, "10") + if err != nil { + t.Fatalf("NewElection(10): %v", err) + } + el2, err := factory2.NewElection(ctx, "10") + if err != nil { + t.Fatalf("NewElection(10, again): %t %v", err, err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(el1): %v", err) + } + go func() { + time.Sleep(4 * time.Millisecond) + el1.Resign(ctx) + }() + if err := el2.Await(ctx); err != nil { + t.Fatalf("Await(el2): %v", err) + } + + if err := el1.Close(ctx); err != nil { + t.Fatalf("Close(el1): %v", err) + } + if err := el2.Close(ctx); err != nil { + t.Fatalf("Close(el2): %v", err) + } } func TestElection(t *testing.T) { @@ -122,7 +151,7 @@ func TestElection(t *testing.T) { if err != nil { t.Fatalf("Initialize DB: %v", err) } - + fact, err := NewFactory("testID", db, WithElectionInterval(10*time.Millisecond)) if err != nil { t.Fatalf("NewFactory: %v", err) @@ -132,3 +161,18 @@ func TestElection(t *testing.T) { }) } } + +func initializeDB(driver string, uri string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", uri) + if err != nil { + return nil, err + } + // Additional connections open a _new_, _empty_ database! + db.SetMaxOpenConns(1) + _, err = db.Exec("CREATE TABLE leader_election (resource_id TEXT PRIMARY KEY, leader TEXT, last_update TIMESTAMP);") + if err != nil { + return nil, err + } + + return db, nil +} From 4d31437cbf88443115acf44b8df0fbb11f60dace Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Mon, 29 Jan 2024 12:13:36 -0800 Subject: [PATCH 3/5] Fix errcheck lint warning in test --- util/election2/sql/election_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/util/election2/sql/election_test.go b/util/election2/sql/election_test.go index f3cf8889cf..5ab6af54fa 100644 --- a/util/election2/sql/election_test.go +++ b/util/election2/sql/election_test.go @@ -130,7 +130,10 @@ func TestElectionTwoServers(t *testing.T) { } go func() { time.Sleep(4 * time.Millisecond) - el1.Resign(ctx) + err := el1.Resign(ctx) + if err != nil { + t.Log(err) + } }() if err := el2.Await(ctx); err != nil { t.Fatalf("Await(el2): %v", err) From c2b24eb939e2e37b20951e915d2600630b970f12 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Mon, 29 Jan 2024 13:43:17 -0800 Subject: [PATCH 4/5] Add DB initialization script and integrate with log_signer --- cmd/trillian_log_signer/main.go | 18 +++++++++++++++--- scripts/resetdb.sh | 6 ++++++ util/election2/sql/election.go | 10 +++++----- util/election2/sql/election.sql | 17 +++++++++++++++++ util/election2/sql/election_test.go | 9 ++++++++- 5 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 util/election2/sql/election.sql diff --git a/cmd/trillian_log_signer/main.go b/cmd/trillian_log_signer/main.go index 273099bbd6..b0e4ec23ed 100644 --- a/cmd/trillian_log_signer/main.go +++ b/cmd/trillian_log_signer/main.go @@ -47,6 +47,7 @@ import ( "github.com/google/trillian/util/election" "github.com/google/trillian/util/election2" etcdelect "github.com/google/trillian/util/election2/etcd" + "github.com/google/trillian/util/election2/sql" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "k8s.io/klog/v2" @@ -54,6 +55,7 @@ import ( // Register supported storage providers. _ "github.com/google/trillian/storage/cloudspanner" _ "github.com/google/trillian/storage/crdb" + "github.com/google/trillian/storage/mysql" _ "github.com/google/trillian/storage/mysql" // Load quota providers @@ -71,6 +73,7 @@ var ( numSeqFlag = flag.Int("num_sequencers", 10, "Number of sequencer workers to run in parallel") sequencerGuardWindowFlag = flag.Duration("sequencer_guard_window", 0, "If set, the time elapsed before submitted leaves are eligible for sequencing") forceMaster = flag.Bool("force_master", false, "If true, assume master for all logs") + electionBackend = flag.String("election_backend", "etcd", fmt.Sprintf("Election backend to use. One of: mysql, etcd, noop")) etcdHTTPService = flag.String("etcd_http_service", "trillian-logsigner-http", "Service name to announce our HTTP endpoint under") lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path") healthzTimeout = flag.Duration("healthz_timeout", time.Second*5, "Timeout used during healthz checks") @@ -143,13 +146,22 @@ func main() { instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid()) var electionFactory election2.Factory switch { - case *forceMaster: + case *forceMaster || *electionBackend == "noop": klog.Warning("**** Acting as master for all logs ****") electionFactory = election2.NoopFactory{} - case client != nil: + case client != nil && *electionBackend == "etcd": electionFactory = etcdelect.NewFactory(instanceID, client, *lockDir) + case *storageSystem == "mysql" && *electionBackend == "mysql": + db, err := mysql.GetDatabase() + if err != nil { + klog.Exit("Failed to get MySQL database when reuested: %v", err) + } + electionFactory, err = sql.NewFactory(instanceID) + if err != nil { + klog.Exitf("Failed to create MySQL election factory: %v", err) + } default: - klog.Exit("Either --force_master or --etcd_servers must be supplied") + klog.Exit("Either --force_master, --etcd_servers, or --election_backend=mysql must be supplied") } qm, err := quota.NewManager(*quotaSystem) diff --git a/scripts/resetdb.sh b/scripts/resetdb.sh index f8fc90cbc7..ee877772ba 100755 --- a/scripts/resetdb.sh +++ b/scripts/resetdb.sh @@ -19,6 +19,7 @@ Accepts environment variables: (default: zaphod). - MYSQL_USER_HOST: The host that the Trillian user will connect from; use '%' as a wildcard (default: localhost). +- MYSQL_USE_ELECTION: If set to true, create election tables as well. EOF } @@ -36,6 +37,7 @@ collect_vars() { [ -z ${MYSQL_USER+x} ] && MYSQL_USER="test" [ -z ${MYSQL_PASSWORD+x} ] && MYSQL_PASSWORD="zaphod" [ -z ${MYSQL_USER_HOST+x} ] && MYSQL_USER_HOST="localhost" + FLAGS=() # handle flags @@ -85,6 +87,10 @@ main() { die "Error: Failed to grant '${MYSQL_USER}' user all privileges on '${MYSQL_DATABASE}'." mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/storage/mysql/schema/storage.sql || \ die "Error: Failed to create tables in '${MYSQL_DATABASE}' database." + if [[ "${MYSQL_USE_ELECTION}" = 'true' ]]; then + mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/util/election2/sql/election.sql || \ + die "Error: Failed to create election tables in '${MYSQL_DATABASE}' database." + fi echo "Reset Complete" fi } diff --git a/util/election2/sql/election.go b/util/election2/sql/election.go index 74b4f6368b..1bf10aea55 100644 --- a/util/election2/sql/election.go +++ b/util/election2/sql/election.go @@ -170,7 +170,7 @@ func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) { } }() row := tx.QueryRow( - "SELECT leader, last_update FROM leader_election WHERE resource_id = ?", + "SELECT leader, last_update FROM LeaderElection WHERE resource_id = ?", e.resourceID) if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil { return leader, fmt.Errorf("Select: %w", err) @@ -182,7 +182,7 @@ func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) { timestamp := time.Now() _, err = tx.Exec( - "UPDATE leader_election SET leader = ?, last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", + "UPDATE LeaderElection SET leader = ?, last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp) if err != nil { return leader, fmt.Errorf("Update: %w", err) @@ -206,7 +206,7 @@ func (e *Election) tearDown() error { // Reset election time to epoch to allow a faster fail-over res, err := e.db.Exec( - "UPDATE leader_election SET last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", + "UPDATE LeaderElection SET last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp) if err != nil { return fmt.Errorf("Update: %w", err) @@ -220,12 +220,12 @@ func (e *Election) tearDown() error { func (e *Election) initializeLock(ctx context.Context) error { var leader string err := e.db.QueryRow( - "SELECT leader FROM leader_election WHERE resource_id = ?", + "SELECT leader FROM LeaderElection WHERE resource_id = ?", e.resourceID, ).Scan(&leader) if errors.Is(err, sql.ErrNoRows) { _, err = e.db.Exec( - "INSERT INTO leader_election (resource_id, leader, last_update) VALUES (?, ?, ?)", + "INSERT INTO LeaderElection (resource_id, leader, last_update) VALUES (?, ?, ?)", e.resourceID, "empty leader", time.Time{}, ) } diff --git a/util/election2/sql/election.sql b/util/election2/sql/election.sql new file mode 100644 index 0000000000..2ef7a798ac --- /dev/null +++ b/util/election2/sql/election.sql @@ -0,0 +1,17 @@ +-- MySQL / MariaDB version of the leader election schema + +-- We only have a single table called LeaderElection. It contains +-- a row holding the current leader for each resource, as well as the +-- timestamp that the election was acquired at (last_update). +-- +-- This is less an election than a mad scramble at the start, but once +-- a leader has won the election, they remain in power until they +-- resign or fail to update the last_update time for 10x the +-- electionInterval, which should be coordinated across participants. +-- This is extremely simple, and doesn't perform any sort of +-- load-shedding or fairness at this layer. +CREATE TABLE IF NOT EXISTS LeaderElection( + resource_id VARCHAR(50) PRIMARY KEY, + leader VARCHAR(300) NOT NULL, + last_update TIMESTAMP NOT NULL +); \ No newline at end of file diff --git a/util/election2/sql/election_test.go b/util/election2/sql/election_test.go index 5ab6af54fa..56520a6a86 100644 --- a/util/election2/sql/election_test.go +++ b/util/election2/sql/election_test.go @@ -19,6 +19,7 @@ import ( "context" "database/sql" "fmt" + "os" "testing" "time" @@ -172,7 +173,13 @@ func initializeDB(driver string, uri string) (*sql.DB, error) { } // Additional connections open a _new_, _empty_ database! db.SetMaxOpenConns(1) - _, err = db.Exec("CREATE TABLE leader_election (resource_id TEXT PRIMARY KEY, leader TEXT, last_update TIMESTAMP);") + + tableDecl, err := os.ReadFile("election.sql") + if err != nil { + return nil, err + } + + _, err = db.Exec(string(tableDecl)) if err != nil { return nil, err } From 05b113dc57b3c166c310552e3b49e4d9e3e64b8a Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Tue, 30 Jan 2024 09:50:20 -0800 Subject: [PATCH 5/5] Update to use mysql module, fix factory invocation --- cmd/trillian_log_signer/main.go | 7 +++---- util/election2/{sql => mysql}/election.go | 4 ++-- util/election2/{sql => mysql}/election.sql | 0 util/election2/{sql => mysql}/election_test.go | 4 ++-- 4 files changed, 7 insertions(+), 8 deletions(-) rename util/election2/{sql => mysql}/election.go (98%) rename util/election2/{sql => mysql}/election.sql (100%) rename util/election2/{sql => mysql}/election_test.go (97%) diff --git a/cmd/trillian_log_signer/main.go b/cmd/trillian_log_signer/main.go index b0e4ec23ed..5b86d81746 100644 --- a/cmd/trillian_log_signer/main.go +++ b/cmd/trillian_log_signer/main.go @@ -47,7 +47,7 @@ import ( "github.com/google/trillian/util/election" "github.com/google/trillian/util/election2" etcdelect "github.com/google/trillian/util/election2/etcd" - "github.com/google/trillian/util/election2/sql" + mysqlElection "github.com/google/trillian/util/election2/mysql" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "k8s.io/klog/v2" @@ -56,7 +56,6 @@ import ( _ "github.com/google/trillian/storage/cloudspanner" _ "github.com/google/trillian/storage/crdb" "github.com/google/trillian/storage/mysql" - _ "github.com/google/trillian/storage/mysql" // Load quota providers _ "github.com/google/trillian/quota/crdbqm" @@ -73,7 +72,7 @@ var ( numSeqFlag = flag.Int("num_sequencers", 10, "Number of sequencer workers to run in parallel") sequencerGuardWindowFlag = flag.Duration("sequencer_guard_window", 0, "If set, the time elapsed before submitted leaves are eligible for sequencing") forceMaster = flag.Bool("force_master", false, "If true, assume master for all logs") - electionBackend = flag.String("election_backend", "etcd", fmt.Sprintf("Election backend to use. One of: mysql, etcd, noop")) + electionBackend = flag.String("election_backend", "etcd", "Election backend to use. One of: mysql, etcd, noop") etcdHTTPService = flag.String("etcd_http_service", "trillian-logsigner-http", "Service name to announce our HTTP endpoint under") lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path") healthzTimeout = flag.Duration("healthz_timeout", time.Second*5, "Timeout used during healthz checks") @@ -156,7 +155,7 @@ func main() { if err != nil { klog.Exit("Failed to get MySQL database when reuested: %v", err) } - electionFactory, err = sql.NewFactory(instanceID) + electionFactory, err = mysqlElection.NewFactory(instanceID, db) if err != nil { klog.Exitf("Failed to create MySQL election factory: %v", err) } diff --git a/util/election2/sql/election.go b/util/election2/mysql/election.go similarity index 98% rename from util/election2/sql/election.go rename to util/election2/mysql/election.go index 1bf10aea55..7b0b5d6ef3 100644 --- a/util/election2/sql/election.go +++ b/util/election2/mysql/election.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package etcd provides an implementation of leader election based on a SQL database. -package sql +// Package mysql provides an implementation of leader election based on a SQL database. +package mysql import ( "context" diff --git a/util/election2/sql/election.sql b/util/election2/mysql/election.sql similarity index 100% rename from util/election2/sql/election.sql rename to util/election2/mysql/election.sql diff --git a/util/election2/sql/election_test.go b/util/election2/mysql/election_test.go similarity index 97% rename from util/election2/sql/election_test.go rename to util/election2/mysql/election_test.go index 56520a6a86..4c87347108 100644 --- a/util/election2/sql/election_test.go +++ b/util/election2/mysql/election_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package etcd provides an implementation of leader election based on a SQL database. -package sql +// Package mysql provides an implementation of leader election based on a SQL database. +package mysql import ( "context"