Skip to content

Commit

Permalink
Refactor driver selection to allow registering new drivers
Browse files Browse the repository at this point in the history
Signed-off-by: Sambhav Kothari <skothari44@bloomberg.net>
  • Loading branch information
sambhav committed Sep 25, 2024
1 parent a799fda commit 5594b15
Show file tree
Hide file tree
Showing 18 changed files with 542 additions and 283 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
Expand Down
166 changes: 164 additions & 2 deletions go.sum

Large diffs are not rendered by default.

132 changes: 2 additions & 130 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,145 +3,17 @@ package main
import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/k3s-io/kine/pkg/endpoint"
"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/signals"
"github.com/k3s-io/kine/pkg/version"
"github.com/k3s-io/kine/pkg/app"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

var (
config endpoint.Config
metricsConfig metrics.Config
)

func main() {
app := cli.NewApp()
app.Name = "kine"
app.Usage = "Minimal etcd v3 API to support custom Kubernetes storage engines"
app.Version = fmt.Sprintf("%s (%s)", version.Version, version.GitCommit)
app.Flags = []cli.Flag{
&cli.StringFlag{
Name: "listen-address",
Value: "0.0.0.0:2379",
Destination: &config.Listener,
},
&cli.StringFlag{
Name: "endpoint",
Usage: "Storage endpoint (default is sqlite)",
Destination: &config.Endpoint,
},
&cli.StringFlag{
Name: "ca-file",
Usage: "CA cert for DB connection",
Destination: &config.BackendTLSConfig.CAFile,
},
&cli.StringFlag{
Name: "cert-file",
Usage: "Certificate for DB connection",
Destination: &config.BackendTLSConfig.CertFile,
},
&cli.StringFlag{
Name: "key-file",
Usage: "Key file for DB connection",
Destination: &config.BackendTLSConfig.KeyFile,
},
&cli.BoolFlag{
Name: "skip-verify",
Usage: "Whether the TLS client should verify the server certificate.",
Destination: &config.BackendTLSConfig.SkipVerify,
Value: false,
},
&cli.StringFlag{
Name: "metrics-bind-address",
Usage: "The address the metric endpoint binds to. Default :8080, set 0 to disable metrics serving.",
Destination: &metricsConfig.ServerAddress,
Value: ":8080",
},
&cli.StringFlag{
Name: "server-cert-file",
Usage: "Certificate for etcd connection",
Destination: &config.ServerTLSConfig.CertFile,
},
&cli.StringFlag{
Name: "server-key-file",
Usage: "Key file for etcd connection",
Destination: &config.ServerTLSConfig.KeyFile,
},
&cli.IntFlag{
Name: "datastore-max-idle-connections",
Usage: "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.",
Destination: &config.ConnectionPoolConfig.MaxIdle,
Value: 0,
},
&cli.IntFlag{
Name: "datastore-max-open-connections",
Usage: "Maximum number of open connections used by datastore. If value <= 0, then there is no limit",
Destination: &config.ConnectionPoolConfig.MaxOpen,
Value: 0,
},
&cli.DurationFlag{
Name: "datastore-connection-max-lifetime",
Usage: "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.",
Destination: &config.ConnectionPoolConfig.MaxLifetime,
Value: 0,
},
&cli.DurationFlag{
Name: "slow-sql-threshold",
Usage: "The duration which SQL executed longer than will be logged. Default 1s, set <= 0 to disable slow SQL log.",
Destination: &metrics.SlowSQLThreshold,
Value: time.Second,
},
&cli.BoolFlag{
Name: "metrics-enable-profiling",
Usage: "Enable net/http/pprof handlers on the metrics bind address. Default is false.",
Destination: &metricsConfig.EnableProfiling,
},
&cli.DurationFlag{
Name: "watch-progress-notify-interval",
Usage: "Interval between periodic watch progress notifications. Default is 5s to ensure support for watch progress notifications.",
Destination: &config.NotifyInterval,
Value: time.Second * 5,
},
&cli.StringFlag{
Name: "emulated-etcd-version",
Usage: "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate support for watch progress notifications.",
Destination: &config.EmulatedETCDVersion,
Value: "3.5.13",
},
&cli.BoolFlag{Name: "debug"},
}
app.Action = run

app := app.New()
if err := app.Run(os.Args); err != nil {
if !errors.Is(err, context.Canceled) {
logrus.Fatal(err)
}
}
}

func run(c *cli.Context) error {
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: time.RFC3339Nano,
})
if c.Bool("debug") {
logrus.SetLevel(logrus.TraceLevel)
}
ctx := signals.SetupSignalContext()

metricsConfig.ServerTLSConfig = config.ServerTLSConfig
go metrics.Serve(ctx, metricsConfig)
config.MetricsRegisterer = metrics.Registry
_, err := endpoint.Listen(ctx, config)
if err != nil {
return err
}
<-ctx.Done()
return ctx.Err()
}
139 changes: 139 additions & 0 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package app

import (
"fmt"
"time"

"github.com/k3s-io/kine/pkg/endpoint"
"github.com/k3s-io/kine/pkg/metrics"
"github.com/k3s-io/kine/pkg/signals"
"github.com/k3s-io/kine/pkg/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

var (
config endpoint.Config
metricsConfig metrics.Config
)

func New() *cli.App {
app := cli.NewApp()
app.Name = "kine"
app.Usage = "Minimal etcd v3 API to support custom Kubernetes storage engines"
app.Version = fmt.Sprintf("%s (%s)", version.Version, version.GitCommit)
app.Flags = []cli.Flag{
&cli.StringFlag{
Name: "listen-address",
Value: "0.0.0.0:2379",
Destination: &config.Listener,
},
&cli.StringFlag{
Name: "endpoint",
Usage: "Storage endpoint (default is sqlite)",
Destination: &config.Endpoint,
},
&cli.StringFlag{
Name: "ca-file",
Usage: "CA cert for DB connection",
Destination: &config.BackendTLSConfig.CAFile,
},
&cli.StringFlag{
Name: "cert-file",
Usage: "Certificate for DB connection",
Destination: &config.BackendTLSConfig.CertFile,
},
&cli.StringFlag{
Name: "key-file",
Usage: "Key file for DB connection",
Destination: &config.BackendTLSConfig.KeyFile,
},
&cli.BoolFlag{
Name: "skip-verify",
Usage: "Whether the TLS client should verify the server certificate.",
Destination: &config.BackendTLSConfig.SkipVerify,
Value: false,
},
&cli.StringFlag{
Name: "metrics-bind-address",
Usage: "The address the metric endpoint binds to. Default :8080, set 0 to disable metrics serving.",
Destination: &metricsConfig.ServerAddress,
Value: ":8080",
},
&cli.StringFlag{
Name: "server-cert-file",
Usage: "Certificate for etcd connection",
Destination: &config.ServerTLSConfig.CertFile,
},
&cli.StringFlag{
Name: "server-key-file",
Usage: "Key file for etcd connection",
Destination: &config.ServerTLSConfig.KeyFile,
},
&cli.IntFlag{
Name: "datastore-max-idle-connections",
Usage: "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.",
Destination: &config.ConnectionPoolConfig.MaxIdle,
Value: 0,
},
&cli.IntFlag{
Name: "datastore-max-open-connections",
Usage: "Maximum number of open connections used by datastore. If value <= 0, then there is no limit",
Destination: &config.ConnectionPoolConfig.MaxOpen,
Value: 0,
},
&cli.DurationFlag{
Name: "datastore-connection-max-lifetime",
Usage: "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.",
Destination: &config.ConnectionPoolConfig.MaxLifetime,
Value: 0,
},
&cli.DurationFlag{
Name: "slow-sql-threshold",
Usage: "The duration which SQL executed longer than will be logged. Default 1s, set <= 0 to disable slow SQL log.",
Destination: &metrics.SlowSQLThreshold,
Value: time.Second,
},
&cli.BoolFlag{
Name: "metrics-enable-profiling",
Usage: "Enable net/http/pprof handlers on the metrics bind address. Default is false.",
Destination: &metricsConfig.EnableProfiling,
},
&cli.DurationFlag{
Name: "watch-progress-notify-interval",
Usage: "Interval between periodic watch progress notifications. Default is 5s to ensure support for watch progress notifications.",
Destination: &config.NotifyInterval,
Value: time.Second * 5,
},
&cli.StringFlag{
Name: "emulated-etcd-version",
Usage: "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate support for watch progress notifications.",
Destination: &config.EmulatedETCDVersion,
Value: "3.5.13",
},
&cli.BoolFlag{Name: "debug"},
}
app.Action = run
return app
}

func run(c *cli.Context) error {
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: time.RFC3339Nano,
})
if c.Bool("debug") {
logrus.SetLevel(logrus.TraceLevel)
}
ctx := signals.SetupSignalContext()

metricsConfig.ServerTLSConfig = config.ServerTLSConfig
go metrics.Serve(ctx, metricsConfig)
config.MetricsRegisterer = metrics.Registry
_, err := endpoint.Listen(ctx, config)
if err != nil {
return err
}
<-ctx.Done()
return ctx.Err()
}
16 changes: 16 additions & 0 deletions pkg/drivers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package drivers

import (
"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/tls"
"github.com/prometheus/client_golang/prometheus"
)

type Config struct {
MetricsRegisterer prometheus.Registerer
Endpoint string
Scheme string
DataSourceName string
ConnectionPoolConfig generic.ConnectionPoolConfig
BackendTLSConfig tls.Config
}
11 changes: 7 additions & 4 deletions pkg/drivers/dqlite/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/driver"
"github.com/k3s-io/kine/pkg/drivers"
"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/drivers/sqlite"
"github.com/k3s-io/kine/pkg/server"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand All @@ -29,6 +29,7 @@ var (
)

func init() {
generic.RegisterDriver("dqlite", New)
// We assume SQLite will be used multi-threaded
if err := dqlite.ConfigMultiThread(); err != nil {
panic(errors.Wrap(err, "failed to set dqlite multithreaded mode"))
Expand Down Expand Up @@ -69,7 +70,8 @@ outer:
return nil
}

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error) {
dataSourceName = cfg.Address
opts, err := parseOpts(datasourceName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -98,7 +100,8 @@ func New(ctx context.Context, datasourceName string, connPoolConfig generic.Conn
}

sql.Register("dqlite", d)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig, metricsRegisterer)
cfg.Address = opts.dsn
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", cfg)
if err != nil {
return nil, errors.Wrap(err, "sqlite client")
}
Expand All @@ -120,7 +123,7 @@ func New(ctx context.Context, datasourceName string, connPoolConfig generic.Conn
return err
}

return backend, nil
return true, backend, nil
}

func migrate(ctx context.Context, newDB *sql.DB) (exitErr error) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/drivers/dqlite/no_dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"context"
"errors"

"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/drivers"
"github.com/k3s-io/kine/pkg/server"
"github.com/prometheus/client_golang/prometheus"
)

func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`)
func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error) {
return false, nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`)
}

func init() {
drivers.Register("dqlite", New)
}
Loading

0 comments on commit 5594b15

Please sign in to comment.