Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQL election module #3318

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cmd/trillian_log_signer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
"github.com/google/trillian/util/election"
"github.com/google/trillian/util/election2"
etcdelect "github.com/google/trillian/util/election2/etcd"
"github.com/google/trillian/util/election2/sql"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment relates to the package of the new code. I recommend mysql over sql to be clear at which implementation it is targeting, and for consistency with the flag names used in this file to pick it.

clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"k8s.io/klog/v2"

// Register supported storage providers.
_ "github.com/google/trillian/storage/cloudspanner"
_ "github.com/google/trillian/storage/crdb"
"github.com/google/trillian/storage/mysql"
_ "github.com/google/trillian/storage/mysql"

// Load quota providers
Expand All @@ -71,6 +73,7 @@
numSeqFlag = flag.Int("num_sequencers", 10, "Number of sequencer workers to run in parallel")
sequencerGuardWindowFlag = flag.Duration("sequencer_guard_window", 0, "If set, the time elapsed before submitted leaves are eligible for sequencing")
forceMaster = flag.Bool("force_master", false, "If true, assume master for all logs")
electionBackend = flag.String("election_backend", "etcd", fmt.Sprintf("Election backend to use. One of: mysql, etcd, noop"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to electionSystem to match with quotaSystem

etcdHTTPService = flag.String("etcd_http_service", "trillian-logsigner-http", "Service name to announce our HTTP endpoint under")
lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path")
healthzTimeout = flag.Duration("healthz_timeout", time.Second*5, "Timeout used during healthz checks")
Expand Down Expand Up @@ -143,13 +146,22 @@
instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid())
var electionFactory election2.Factory
switch {
case *forceMaster:
case *forceMaster || *electionBackend == "noop":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not fantastic that we can now control this behavior with two different flags, but I also cannot thing of a better way forward for now. On the long run, we could / should probably deprecate forceMaster. No action required!

klog.Warning("**** Acting as master for all logs ****")
electionFactory = election2.NoopFactory{}
case client != nil:
case client != nil && *electionBackend == "etcd":
electionFactory = etcdelect.NewFactory(instanceID, client, *lockDir)
case *storageSystem == "mysql" && *electionBackend == "mysql":
db, err := mysql.GetDatabase()

Check failure on line 155 in cmd/trillian_log_signer/main.go

View workflow job for this annotation

GitHub Actions / Run govulncheck

db declared and not used

Check failure on line 155 in cmd/trillian_log_signer/main.go

View workflow job for this annotation

GitHub Actions / integration

db declared and not used
if err != nil {
klog.Exit("Failed to get MySQL database when reuested: %v", err)
}
electionFactory, err = sql.NewFactory(instanceID)

Check failure on line 159 in cmd/trillian_log_signer/main.go

View workflow job for this annotation

GitHub Actions / Run govulncheck

not enough arguments in call to sql.NewFactory

Check failure on line 159 in cmd/trillian_log_signer/main.go

View workflow job for this annotation

GitHub Actions / integration

not enough arguments in call to sql.NewFactory
if err != nil {
klog.Exitf("Failed to create MySQL election factory: %v", err)
}
default:
klog.Exit("Either --force_master or --etcd_servers must be supplied")
klog.Exit("Either --force_master, --etcd_servers, or --election_backend=mysql must be supplied")
}

qm, err := quota.NewManager(*quotaSystem)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/letsencrypt/pkcs11key/v4 v4.0.0
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.20
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/pseudomuto/protoc-gen-doc v1.5.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.20 h1:BAZ50Ns0OFBNxdAqFhbZqdPcht1Xlb16pDCqkq1spr0=
github.com/mattn/go-sqlite3 v1.14.20/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/miekg/pkcs11 v1.0.2/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
Expand Down
6 changes: 6 additions & 0 deletions scripts/resetdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Accepts environment variables:
(default: zaphod).
- MYSQL_USER_HOST: The host that the Trillian user will connect from; use '%' as
a wildcard (default: localhost).
- MYSQL_USE_ELECTION: If set to true, create election tables as well.
EOF
}

Expand All @@ -36,6 +37,7 @@ collect_vars() {
[ -z ${MYSQL_USER+x} ] && MYSQL_USER="test"
[ -z ${MYSQL_PASSWORD+x} ] && MYSQL_PASSWORD="zaphod"
[ -z ${MYSQL_USER_HOST+x} ] && MYSQL_USER_HOST="localhost"

FLAGS=()

# handle flags
Expand Down Expand Up @@ -85,6 +87,10 @@ main() {
die "Error: Failed to grant '${MYSQL_USER}' user all privileges on '${MYSQL_DATABASE}'."
mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/storage/mysql/schema/storage.sql || \
die "Error: Failed to create tables in '${MYSQL_DATABASE}' database."
if [[ "${MYSQL_USE_ELECTION}" = 'true' ]]; then
mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/util/election2/sql/election.sql || \
die "Error: Failed to create election tables in '${MYSQL_DATABASE}' database."
fi
echo "Reset Complete"
fi
}
Expand Down
280 changes: 280 additions & 0 deletions util/election2/sql/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
// Copyright 2023 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package etcd provides an implementation of leader election based on a SQL database.
package sql

import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"

"github.com/google/trillian/util/election2"
"k8s.io/klog/v2"
)

type leaderData struct {
currentLeader string
timestamp time.Time
}

// Election is an implementation of election2.Election based on a SQL database.
type Election struct {
db *sql.DB
instanceID string
resourceID string

currentLeader leaderData
leaderLock sync.Cond

// If a channel is supplied with the cancel, it will be signalled when the election routine has exited.
cancel chan *chan error
electionInterval time.Duration
}

var _ election2.Election = (*Election)(nil)

// Await implements election2.Election
func (e *Election) Await(ctx context.Context) error {
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
if e.cancel == nil {
e.cancel = make(chan *chan error)
go e.becomeLeaderLoop(context.Background(), e.cancel)
}
if e.currentLeader.currentLeader == e.instanceID {
return nil
}
for e.currentLeader.currentLeader != e.instanceID {
e.leaderLock.Wait()

select {
case <-ctx.Done():
return ctx.Err()
default:
klog.Infof("Waiting for leadership, %s is the leader at %s", e.currentLeader.currentLeader, e.currentLeader.timestamp)
}
}
klog.Infof("%s became leader for %s at %s", e.instanceID, e.resourceID, e.currentLeader.timestamp)
return nil
}

// Close implements election2.Election
func (e *Election) Close(ctx context.Context) error {
if err := e.Resign(ctx); err != nil {
klog.Errorf("Failed to resign leadership: %v", err)
return err
}
return nil
}

// Resign implements election2.Election
func (e *Election) Resign(ctx context.Context) error {
e.leaderLock.L.Lock()
closer := e.cancel
e.cancel = nil
e.leaderLock.L.Unlock()
if closer == nil {
return nil
}
// Stop trying to elect ourselves
done := make(chan error)
closer <- &done
return <-done
}

// WithMastership implements election2.Election
func (e *Election) WithMastership(ctx context.Context) (context.Context, error) {
cctx, cancel := context.WithCancel(ctx)
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
if e.currentLeader.currentLeader != e.instanceID {
// Not the leader, cancel
cancel()
return cctx, nil
}

// Start a goroutine to cancel the context when we are no longer leader
go func() {
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
for e.currentLeader.currentLeader == e.instanceID {
e.leaderLock.Wait()
}
select {
case <-ctx.Done():
// Don't complain if our context already completed.
return
default:
cancel()
klog.Warningf("%s cancelled: lost leadership, %s is the leader at %s", e.resourceID, e.currentLeader.currentLeader, e.currentLeader.timestamp)
}
}()

return cctx, nil
}

// becomeLeaderLoop runs continuously to participate in elections until a message is sent on `cancel`
func (e *Election) becomeLeaderLoop(ctx context.Context, closer chan *chan error) {
for {
select {
case ch := <-closer:
err := e.tearDown()
klog.Infof("Election teardown for %s: %v", e.resourceID, err)
if ch != nil {
*ch <- err
}
return
default:
leader, err := e.tryBecomeLeader(ctx)
if err != nil {
klog.Errorf("Failed attempt to become leader for %s, retrying: %v", e.resourceID, err)
} else {
e.leaderLock.L.Lock()
if leader != e.currentLeader {
// Note: this code does not actually care _which_ instance was
// elected, it sends notifications on each leadership change.
e.currentLeader = leader
e.leaderLock.Broadcast()
}
e.leaderLock.L.Unlock()
}
time.Sleep(e.electionInterval)
}
}
}

func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) {
leader := leaderData{}
tx, err := e.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return leader, fmt.Errorf("BeginTX: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil {
klog.Errorf("Rollback failed: %v", err)
}
}()
row := tx.QueryRow(
"SELECT leader, last_update FROM LeaderElection WHERE resource_id = ?",
e.resourceID)
if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil {
return leader, fmt.Errorf("Select: %w", err)
}

if leader.currentLeader != e.instanceID && leader.timestamp.Add(e.electionInterval*10).After(time.Now()) {
return leader, nil // Someone else won the election
}

timestamp := time.Now()
_, err = tx.Exec(
"UPDATE LeaderElection SET leader = ?, last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?",
e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp)
if err != nil {
return leader, fmt.Errorf("Update: %w", err)
}

if err := tx.Commit(); err != nil {
return leader, fmt.Errorf("Commit failed: %w", err)
}
leader = leaderData{currentLeader: e.instanceID, timestamp: timestamp}
return leader, nil
}

func (e *Election) tearDown() error {
e.leaderLock.L.Lock()
defer e.leaderLock.L.Unlock()
if e.currentLeader.currentLeader != e.instanceID {
return nil
}
e.currentLeader.currentLeader = "empty leader"
e.leaderLock.Broadcast()

// Reset election time to epoch to allow a faster fail-over
res, err := e.db.Exec(
"UPDATE LeaderElection SET last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?",
time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp)
if err != nil {
return fmt.Errorf("Update: %w", err)
}
if n, err := res.RowsAffected(); n != 1 || err != nil {
return fmt.Errorf("failed to resign leadership: %d, %w", n, err)
}
return nil
}

func (e *Election) initializeLock(ctx context.Context) error {
var leader string
err := e.db.QueryRow(
"SELECT leader FROM LeaderElection WHERE resource_id = ?",
e.resourceID,
).Scan(&leader)
if errors.Is(err, sql.ErrNoRows) {
_, err = e.db.Exec(
"INSERT INTO LeaderElection (resource_id, leader, last_update) VALUES (?, ?, ?)",
e.resourceID, "empty leader", time.Time{},
)
}
return err
}

type SqlFactory struct {
db *sql.DB
instanceID string
opts []Option
}

var _ election2.Factory = (*SqlFactory)(nil)

type Option func(*Election) *Election

func NewFactory(instanceID string, database *sql.DB, opts ...Option) (*SqlFactory, error) {
return &SqlFactory{db: database, instanceID: instanceID, opts: opts}, nil
}

func WithElectionInterval(interval time.Duration) Option {
return func(f *Election) *Election {
f.electionInterval = interval
return f
}
}

// NewElection implements election2.Factory.
func (f *SqlFactory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) {
// Ensure we have a database connection
if f.db == nil {
return nil, fmt.Errorf("no database connection")
}
if err := f.db.Ping(); err != nil {
return nil, err
}
e := &Election{
db: f.db,
instanceID: f.instanceID,
resourceID: resourceID,
leaderLock: sync.Cond{L: &sync.Mutex{}},
electionInterval: 1 * time.Second,
}
for _, opt := range f.opts {
e = opt(e)
}
if err := e.initializeLock(ctx); err != nil {
return nil, err
}

return e, nil
}
17 changes: 17 additions & 0 deletions util/election2/sql/election.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- MySQL / MariaDB version of the leader election schema

-- We only have a single table called LeaderElection. It contains
-- a row holding the current leader for each resource, as well as the
-- timestamp that the election was acquired at (last_update).
--
-- This is less an election than a mad scramble at the start, but once
-- a leader has won the election, they remain in power until they
-- resign or fail to update the last_update time for 10x the
-- electionInterval, which should be coordinated across participants.
-- This is extremely simple, and doesn't perform any sort of
-- load-shedding or fairness at this layer.
CREATE TABLE IF NOT EXISTS LeaderElection(
resource_id VARCHAR(50) PRIMARY KEY,
leader VARCHAR(300) NOT NULL,
last_update TIMESTAMP NOT NULL
);
Loading
Loading