Skip to content

Commit

Permalink
Merge pull request #15 from ibuildthecloud/master
Browse files Browse the repository at this point in the history
Update to k8s v1.17 and go.etcd.io package
  • Loading branch information
ibuildthecloud committed Dec 13, 2019
2 parents 35aa313 + c395d22 commit 927503d
Show file tree
Hide file tree
Showing 20 changed files with 344 additions and 119 deletions.
23 changes: 3 additions & 20 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,13 @@ go 1.12
require (
github.com/Rican7/retry v0.1.0
github.com/canonical/go-dqlite v1.1.0
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/go-sql-driver/mysql v1.4.1
github.com/google/btree v1.0.0 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.11.2 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/lib/pq v1.1.1
github.com/mattn/go-sqlite3 v1.10.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0 // indirect
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
github.com/rancher/wrangler v0.0.0-20190512193419-40fa298578b9
github.com/rancher/wrangler v0.4.0
github.com/sirupsen/logrus v1.4.2
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/urfave/cli v1.21.0
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.3 // indirect
google.golang.org/grpc v1.20.1
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
google.golang.org/grpc v1.23.1
)
310 changes: 250 additions & 60 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/rancher/kine/pkg/endpoint"
"go.etcd.io/etcd/clientv3"
)

type Value struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/drivers/dqlite/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,28 @@ func New(ctx context.Context, datasourceName string) (server.Backend, error) {
if opts.peerFile != "" {
nodeStore, err = client.DefaultNodeStore(opts.peerFile)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "opening peerfile")
}
} else {
nodeStore = client.NewInmemNodeStore()
}

if err := AddPeers(ctx, nodeStore, opts.peers...); err != nil {
return nil, err
return nil, errors.Wrap(err, "add peers")
}

d, err := driver.New(nodeStore,
driver.WithLogFunc(Logger),
driver.WithContext(ctx),
driver.WithDialFunc(Dialer))
if err != nil {
return nil, err
return nil, errors.Wrap(err, "new dqlite driver")
}

sql.Register("dqlite", d)
backend, generic, err := sqlite.NewVariant("dqlite", opts.dsn)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "sqlite client")
}
if err := migrate(ctx, generic.DB); err != nil {
return nil, errors.Wrap(err, "failed to migrate DB from sqlite")
Expand Down
34 changes: 32 additions & 2 deletions pkg/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,42 @@ func (d *Generic) Migrate(ctx context.Context) {
}
}

func Open(driverName, dataSourceName string, paramCharacter string, numbered bool) (*Generic, error) {
func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
return nil, err
}

for i := 0; i < 3; i++ {
if err := db.Ping(); err != nil {
db.Close()
return nil, err
}
}

return db, nil
}

func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter string, numbered bool) (*Generic, error) {
var (
db *sql.DB
err error
)

for i := 0; i < 300; i++ {
db, err = openAndTest(driverName, dataSourceName)
if err == nil {
break
}

logrus.Errorf("failed to ping connection: %v", err)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(time.Second):
}
}

return &Generic{
DB: db,

Expand Down Expand Up @@ -178,7 +208,7 @@ func Open(driverName, dataSourceName string, paramCharacter string, numbered boo

FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value)
values(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered),
}, nil
}, err
}

func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/drivers/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
createDB = "create database if not exists "
)

func New(dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
Expand All @@ -58,7 +58,7 @@ func New(dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
return nil, err
}

dialect, err := generic.Open("mysql", parsedDSN, "?", false)
dialect, err := generic.Open(ctx, "mysql", parsedDSN, "?", false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/drivers/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
createDB = "create database "
)

func New(dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
parsedDSN, err := prepareDSN(dataSourceName, tlsInfo)
if err != nil {
return nil, err
Expand All @@ -50,7 +50,7 @@ func New(dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
return nil, err
}

dialect, err := generic.Open("postgres", parsedDSN, "$", true)
dialect, err := generic.Open(ctx, "postgres", parsedDSN, "$", true)
if err != nil {
return nil, err
}
Expand Down
33 changes: 27 additions & 6 deletions pkg/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"database/sql"
"os"
"time"

"github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/logstructured"
"github.com/rancher/kine/pkg/logstructured/sqllog"
"github.com/rancher/kine/pkg/server"
"github.com/sirupsen/logrus"

// sqlite db driver
_ "github.com/mattn/go-sqlite3"
Expand All @@ -34,20 +37,20 @@ var (
}
)

func New(dataSourceName string) (server.Backend, error) {
backend, _, err := NewVariant("sqlite3", dataSourceName)
func New(ctx context.Context, dataSourceName string) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName)
return backend, err
}

func NewVariant(driverName, dataSourceName string) (server.Backend, *generic.Generic, error) {
func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.Backend, *generic.Generic, error) {
if dataSourceName == "" {
if err := os.MkdirAll("./db", 0700); err != nil {
return nil, nil, err
}
dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
}

dialect, err := generic.Open(driverName, dataSourceName, "?", false)
dialect, err := generic.Open(ctx, driverName, dataSourceName, "?", false)
if err != nil {
return nil, nil, err
}
Expand All @@ -59,9 +62,27 @@ func NewVariant(driverName, dataSourceName string) (server.Backend, *generic.Gen
return err
}

if err := setup(dialect.DB); err != nil {
return nil, nil, err
// this is the first SQL that will be executed on a new DB conn so
// loop on failure here because in the case of dqlite it could still be initializing
for i := 0; i < 300; i++ {
err = setup(dialect.DB)
if err == nil {
break
}
logrus.Errorf("failed to setup db: %v", err)
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-time.After(time.Second):
}
time.Sleep(time.Second)
}
if err != nil {
return nil, nil, errors.Wrap(err, "setup db")
}
//if err := setup(dialect.DB); err != nil {
// return nil, nil, errors.Wrap(err, "setup db")
//}

dialect.Migrate(context.Background())
return logstructured.New(sqllog.New(dialect)), dialect, nil
Expand Down
11 changes: 6 additions & 5 deletions pkg/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strings"

"github.com/pkg/errors"
"github.com/rancher/kine/pkg/drivers/dqlite"
"github.com/rancher/kine/pkg/drivers/mysql"
"github.com/rancher/kine/pkg/drivers/pgsql"
Expand Down Expand Up @@ -52,11 +53,11 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) {

leaderelect, backend, err := getKineStorageBackend(ctx, driver, dsn, config)
if err != nil {
return ETCDConfig{}, err
return ETCDConfig{}, errors.Wrap(err, "building kine")
}

if err := backend.Start(ctx); err != nil {
return ETCDConfig{}, err
return ETCDConfig{}, errors.Wrap(err, "starting kine backend")
}

listen := config.Listener
Expand Down Expand Up @@ -123,13 +124,13 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config)
switch driver {
case SQLiteBackend:
leaderElect = false
backend, err = sqlite.New(dsn)
backend, err = sqlite.New(ctx, dsn)
case DQLiteBackend:
backend, err = dqlite.New(ctx, dsn)
case PostgresBackend:
backend, err = pgsql.New(dsn, cfg.Config)
backend, err = pgsql.New(ctx, dsn, cfg.Config)
case MySQLBackend:
backend, err = mysql.New(dsn, cfg.Config)
backend, err = mysql.New(ctx, dsn, cfg.Config)
default:
return false, nil, fmt.Errorf("storage backend is not defined")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package server
import (
"context"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func isCompact(txn *etcdserverpb.TxnRequest) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package server
import (
"context"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func isCreate(txn *etcdserverpb.TxnRequest) *etcdserverpb.PutRequest {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package server
import (
"context"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func isDelete(txn *etcdserverpb.TxnRequest) (int64, string, bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func (l *LimitedServer) get(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func (s *KVServerBridge) LeaseGrant(ctx context.Context, req *etcdserverpb.LeaseGrantRequest) (*etcdserverpb.LeaseGrantResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

type LimitedServer struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"strings"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"fmt"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package server
import (
"context"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func isUpdate(txn *etcdserverpb.TxnRequest) (int64, string, []byte, int64, bool) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"sync"
"sync/atomic"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
)

var (
Expand Down
8 changes: 4 additions & 4 deletions pkg/tls/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package tls
import (
"crypto/tls"

"github.com/coreos/etcd/pkg/transport"
"go.etcd.io/etcd/pkg/transport"
)

type Config struct {
Expand All @@ -18,9 +18,9 @@ func (c Config) ClientConfig() (*tls.Config, error) {
}

info := &transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
CertFile: c.CertFile,
KeyFile: c.KeyFile,
TrustedCAFile: c.CAFile,
}
tlsConfig, err := info.ClientConfig()
if err != nil {
Expand Down

0 comments on commit 927503d

Please sign in to comment.