Skip to content

Commit

Permalink
Store extra metadata and cluster ID for snapshots
Browse files Browse the repository at this point in the history
Write the extra metadata both locally and to S3. These files are placed such that they will not be used by older versions of K3s that do not make use of them.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Oct 4, 2023
1 parent 274a072 commit cc6b50c
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 251 deletions.
177 changes: 70 additions & 107 deletions pkg/cli/etcdsnapshot/etcd_snapshot.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package etcdsnapshot

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -12,8 +13,7 @@ import (

"github.com/erikdubbelboer/gspt"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/cluster"
"github.com/k3s-io/k3s/pkg/daemons/config"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/server"
util2 "github.com/k3s-io/k3s/pkg/util"
Expand All @@ -22,16 +22,22 @@ import (
"gopkg.in/yaml.v2"
)

// etcdCommand bundles the etcd handle and the context that commandSetup
// returns together, so that a snapshot subcommand can invoke etcd operations
// with a single value.
type etcdCommand struct {
// etcd is the ETCD instance the subcommand operates on.
etcd *etcd.ETCD
// ctx is the context passed to the etcd operations.
ctx context.Context
}

// commandSetup sets up common things needed
// for each etcd command.
func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error {
func commandSetup(app *cli.Context, cfg *cmds.Server, config *server.Config) (*etcdCommand, error) {
ctx := signals.SetupSignalContext()
gspt.SetProcTitle(os.Args[0])

nodeName := app.String("node-name")
if nodeName == "" {
h, err := os.Hostname()
if err != nil {
return err
return nil, err
}
nodeName = h
}
Expand All @@ -40,33 +46,53 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error {

dataDir, err := server.ResolveDataDir(cfg.DataDir)
if err != nil {
return err
return nil, err
}

config.DisableAgent = true
config.ControlConfig.DataDir = dataDir
config.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
config.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
config.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress
config.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat)
config.ControlConfig.EtcdS3 = cfg.EtcdS3
config.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint
config.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA
config.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify
config.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey
config.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey
config.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName
config.ControlConfig.EtcdS3Region = cfg.EtcdS3Region
config.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder
config.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure
config.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout
config.ControlConfig.Runtime = daemonconfig.NewRuntime(nil)
config.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt")
config.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt")
config.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key")
config.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")

e := etcd.NewETCD()
if err := e.SetControlConfig(&config.ControlConfig); err != nil {
return nil, err
}

initialized, err := e.IsInitialized()
if err != nil {
return nil, err
}
if !initialized {
return nil, fmt.Errorf("etcd database not found in %s", config.ControlConfig.DataDir)
}

sc.DisableAgent = true
sc.ControlConfig.DataDir = dataDir
sc.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
sc.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
sc.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress
sc.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat)
sc.ControlConfig.EtcdS3 = cfg.EtcdS3
sc.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint
sc.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA
sc.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify
sc.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey
sc.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey
sc.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName
sc.ControlConfig.EtcdS3Region = cfg.EtcdS3Region
sc.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder
sc.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure
sc.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout
sc.ControlConfig.Runtime = config.NewRuntime(nil)
sc.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt")
sc.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt")
sc.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key")
sc.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")
sc, err := server.NewContext(ctx, config.ControlConfig.Runtime.KubeConfigAdmin, false)
if err != nil {
return nil, err
}
config.ControlConfig.Runtime.K3s = sc.K3s
config.ControlConfig.Runtime.Core = sc.Core

return nil
return &etcdCommand{etcd: e, ctx: ctx}, nil
}

// Save triggers an on-demand etcd snapshot operation
Expand All @@ -80,43 +106,18 @@ func Save(app *cli.Context) error {
func save(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config

if err := commandSetup(app, cfg, &serverConfig); err != nil {
return err
}

if len(app.Args()) > 0 {
return util2.ErrCommandNoArgs
}

serverConfig.ControlConfig.EtcdSnapshotRetention = 0 // disable retention check

ctx := signals.SetupSignalContext()
e := etcd.NewETCD()
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
return err
}

initialized, err := e.IsInitialized()
ec, err := commandSetup(app, cfg, &serverConfig)
if err != nil {
return err
}
if !initialized {
return fmt.Errorf("etcd database not found in %s", serverConfig.ControlConfig.DataDir)
}

cluster := cluster.New(&serverConfig.ControlConfig)

if err := cluster.Bootstrap(ctx, true); err != nil {
return err
}

sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false)
if err != nil {
return err
}
serverConfig.ControlConfig.Runtime.Core = sc.Core
serverConfig.ControlConfig.EtcdSnapshotRetention = 0 // disable retention check

return cluster.Snapshot(ctx, &serverConfig.ControlConfig)
return ec.etcd.Snapshot(ec.ctx)
}

func Delete(app *cli.Context) error {
Expand All @@ -129,7 +130,8 @@ func Delete(app *cli.Context) error {
func delete(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config

if err := commandSetup(app, cfg, &serverConfig); err != nil {
ec, err := commandSetup(app, cfg, &serverConfig)
if err != nil {
return err
}

Expand All @@ -138,19 +140,7 @@ func delete(app *cli.Context, cfg *cmds.Server) error {
return errors.New("no snapshots given for removal")
}

ctx := signals.SetupSignalContext()
e := etcd.NewETCD()
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
return err
}

sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false)
if err != nil {
return err
}
serverConfig.ControlConfig.Runtime.Core = sc.Core

return e.DeleteSnapshots(ctx, app.Args())
return ec.etcd.DeleteSnapshots(ec.ctx, app.Args())
}

func List(app *cli.Context) error {
Expand All @@ -160,7 +150,7 @@ func List(app *cli.Context) error {
return list(app, &cmds.ServerConfig)
}

var etcdListFormats = []string{"json", "yaml"}
var etcdListFormats = []string{"json", "yaml", "table"}

func validEtcdListFormat(format string) bool {
for _, supportedFormat := range etcdListFormats {
Expand All @@ -174,17 +164,12 @@ func validEtcdListFormat(format string) bool {
func list(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config

if err := commandSetup(app, cfg, &serverConfig); err != nil {
return err
}

ctx := signals.SetupSignalContext()
e := etcd.NewETCD()
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
ec, err := commandSetup(app, cfg, &serverConfig)
if err != nil {
return err
}

sf, err := e.ListSnapshots(ctx)
sf, err := ec.etcd.ListSnapshots(ec.ctx)
if err != nil {
return err
}
Expand All @@ -208,20 +193,9 @@ func list(app *cli.Context, cfg *cmds.Server) error {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
defer w.Flush()

if cfg.EtcdS3 {
fmt.Fprint(w, "Name\tSize\tCreated\n")
for _, s := range sf {
if s.NodeName == "s3" {
fmt.Fprintf(w, "%s\t%d\t%s\n", s.Name, s.Size, s.CreatedAt.Format(time.RFC3339))
}
}
} else {
fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n")
for _, s := range sf {
if s.NodeName != "s3" {
fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339))
}
}
fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n")
for _, s := range sf {
fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339))
}
}

Expand All @@ -238,23 +212,12 @@ func Prune(app *cli.Context) error {
func prune(app *cli.Context, cfg *cmds.Server) error {
var serverConfig server.Config

if err := commandSetup(app, cfg, &serverConfig); err != nil {
ec, err := commandSetup(app, cfg, &serverConfig)
if err != nil {
return err
}

serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention

ctx := signals.SetupSignalContext()
e := etcd.NewETCD()
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
return err
}

sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false)
if err != nil {
return err
}
serverConfig.ControlConfig.Runtime.Core = sc.Core

return e.PruneSnapshots(ctx)
return ec.etcd.PruneSnapshots(ec.ctx)
}
9 changes: 0 additions & 9 deletions pkg/cluster/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,6 @@ func (c *Cluster) bootstrap(ctx context.Context) error {
return c.storageBootstrap(ctx)
}

// Snapshot is a proxy method that calls Snapshot on the cluster's managedDB
// driver, for etcd clusters.
//
// It returns an error without attempting a snapshot when c.managedDB is nil;
// otherwise it returns whatever the driver's Snapshot returns. The config
// argument is accepted but not used by the body.
func (c *Cluster) Snapshot(ctx context.Context, config *config.Control) error {
// No managed database driver is set, so there is nothing to snapshot.
if c.managedDB == nil {
return errors.New("unable to perform etcd snapshot on non-etcd system")
}
return c.managedDB.Snapshot(ctx)
}

// compareConfig verifies that the config of the joining control plane node coincides with the cluster's config
func (c *Cluster) compareConfig() error {
token := c.config.AgentToken
Expand Down
47 changes: 0 additions & 47 deletions pkg/cluster/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,50 +197,3 @@ func TestCluster_migrateBootstrapData(t *testing.T) {
})
}
}

// TestCluster_Snapshot is a table-driven test of Cluster.Snapshot. Its single
// case builds a Cluster from zero-valued fields, which leaves managedDB nil,
// and expects Snapshot to return an error.
func TestCluster_Snapshot(t *testing.T) {
// fields mirrors the Cluster struct fields that each case may populate.
type fields struct {
clientAccessInfo *clientaccess.Info
config *config.Control
managedDB managed.Driver
joining bool
storageStarted bool
saveBootstrap bool
shouldBootstrap bool
}
// args holds the arguments passed to Cluster.Snapshot.
type args struct {
ctx context.Context
config *config.Control
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
// Empty fields leave managedDB nil, which Snapshot rejects.
name: "Fail on non etcd cluster",
fields: fields{},
args: args{
ctx: context.Background(),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Cluster{
clientAccessInfo: tt.fields.clientAccessInfo,
config: tt.fields.config,
managedDB: tt.fields.managedDB,
joining: tt.fields.joining,
storageStarted: tt.fields.storageStarted,
saveBootstrap: tt.fields.saveBootstrap,
shouldBootstrap: tt.fields.shouldBootstrap,
}
// Fail when the presence of an error does not match the expectation.
if err := c.Snapshot(tt.args.ctx, tt.args.config); (err != nil) != tt.wantErr {
t.Errorf("Cluster.Snapshot() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/daemons/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/kine/pkg/endpoint"
"github.com/rancher/wrangler/v2/pkg/generated/controllers/core"
Expand Down Expand Up @@ -343,6 +344,7 @@ type ControlRuntime struct {
ClientETCDCert string
ClientETCDKey string

K3s *k3s.Factory
Core *core.Factory
Event record.EventRecorder
EtcdConfig endpoint.ETCDConfig
Expand Down
Loading

0 comments on commit cc6b50c

Please sign in to comment.