diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index e1c88806d0e3..58e18a07a9ac 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -576,6 +576,7 @@ func (e *ETCD) Register(handler http.Handler) (http.Handler, error) { e.config.Runtime.LeaderElectedClusterControllerStarts[version.Program+"-etcd"] = func(ctx context.Context) { registerEndpointsHandlers(ctx, e) registerMemberHandlers(ctx, e) + registerSnapshotHandlers(ctx, e) } } diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 0721b22aed27..084cb915a9a6 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -143,6 +143,7 @@ func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.Conf }, Compressed: strings.HasSuffix(snapshot, compressedExtension), metadataSource: extraMetadata, + nodeSource: s.nodeName, } uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot) @@ -394,6 +395,7 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) }, Status: successfulSnapshotStatus, Compressed: compressed, + nodeSource: obj.UserMetadata[nodeNameKey], } sfKey := generateSnapshotName(sf) snapshots[sfKey] = sf diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 888a5d6398fb..74239c0f217b 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -10,6 +10,7 @@ import ( "math/rand" "net/http" "os" + "path" "path/filepath" "runtime" "sort" @@ -17,7 +18,9 @@ import ( "strings" "time" + apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/minio/minio-go/v7" "github.com/pkg/errors" @@ -31,9 +34,12 @@ import ( "golang.org/x/sync/semaphore" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" + "k8s.io/utils/pointer" ) const ( @@ -45,7 
+51,7 @@ const ( var ( snapshotExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata" - snapshotConfigMapName = version.Program + "-etcd-snapshots" + labelStorageNode = "etcd." + version.Program + ".cattle.io/snapshot-storage-node" // snapshotDataBackoff will retry at increasing steps for up to ~30 seconds. // If the ConfigMap update fails, the list won't be reconciled again until next time @@ -171,7 +177,7 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err defer ss.Close() if _, err := io.Copy(decompressed, ss); err != nil { - os.Remove("") + os.Remove(decompressed.Name()) return "", err } } @@ -271,7 +277,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } logrus.Errorf("Failed to take etcd snapshot: %v", err) if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save local snapshot failure data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } } @@ -311,7 +317,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save local snapshot data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { @@ -353,7 +359,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } } if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save snapshot data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } if err := e.s3.snapshotRetention(ctx); err != nil { logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err) @@ -398,7 +404,10 @@ type snapshotFile struct { S3 *s3Config `json:"s3Config,omitempty"` Compressed bool `json:"compressed"` + // these fields are used for the internal representation of the snapshot + // to populate other fields before serialization to the 
legacy configmap. metadataSource *v1.ConfigMap `json:"-"` + nodeSource string `json:"-"` } // listLocalSnapshots provides a list of the currently stored @@ -542,7 +551,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { } if e.config.EtcdS3 { - if err := e.s3.deleteSnapshot(s); err != nil { + if err := e.s3.deleteSnapshot(ctx, s); err != nil { if isNotExist(err) { logrus.Infof("Snapshot %s not found in S3", s) } else { @@ -583,61 +592,47 @@ func marshalSnapshotFile(sf snapshotFile) ([]byte, error) { return json.Marshal(sf) } -// AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata -// available at the time. This is primarily necessary to record failures, as successful snapshots will have a file on disk -// or S3 that will be found when reconciling. +// addSnapshotData syncs an internal snapshotFile representation to an ETCDSnapshotFile resource +// of the same name. Resources will be created or updated as necessary. func (e *ETCD) addSnapshotData(sf snapshotFile) error { - // make sure the core.Factory is initialized. There can - // be a race between this core code startup. - for e.config.Runtime.Core == nil { + // make sure the K3s factory is initialized. 
+ for e.config.Runtime.K3s == nil { runtime.Gosched() } sfKey := generateSnapshotName(sf) - marshalledSnapshotFile, err := marshalSnapshotFile(sf) - if err != nil { - return err - } + esf := apisv1.NewETCDSnapshotFile(metav1.NamespaceNone, sfKey, apisv1.ETCDSnapshotFile{}) - pruneCount := pruneStepSize - var lastErr error return retry.OnError(snapshotDataBackoff, func(err error) bool { - return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err) - }, func() error { - snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) - - if apierrors.IsNotFound(getErr) { - cm := v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: snapshotConfigMapName, - Namespace: metav1.NamespaceSystem, - }, - Data: map[string]string{sfKey: string(marshalledSnapshotFile)}, + return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) + }, func() (err error) { + sf.toETCDSnapshotFile(esf) + if esf.CreationTimestamp.IsZero() { + esf, err = e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile().Create(esf) + // Only emit an event for the snapshot when creating the resource + if err == nil { + e.emitEvent(esf) } - _, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(&cm) - return err - } - - if snapshotConfigMap.Data == nil { - snapshotConfigMap.Data = make(map[string]string) + } else { + esf, err = e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile().Update(esf) } + return err + }) +} - // If the configmap update was rejected due to size, drop the oldest entries from the map. - // We will continue to remove an increasing number of old snapshots from the map until the request succeeds, - // or the number we would attempt to remove exceeds the number stored. 
- if isTooLargeError(lastErr) { - logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount) - if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil { - return err - } - pruneCount += pruneStepSize +func (e *ETCD) emitEvent(esf *apisv1.ETCDSnapshotFile) { + if e.config.Runtime.Event == nil { + return + } + if esf.Status.Error != nil { + message := "Snapshot save failed on " + esf.Spec.NodeName + if esf.Status.Error.Message != nil { + message += ": " + *esf.Status.Error.Message } - - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshotFile) - - _, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) - return lastErr - }) + e.config.Runtime.Event.Event(esf, v1.EventTypeWarning, "ETCDSnapshotFailed", message) + } else { + e.config.Runtime.Event.Event(esf, v1.EventTypeNormal, "ETCDSnapshotCreated", "Snapshot saved on "+esf.Spec.NodeName) + } } // generateSnapshotName generates a derived name for the snapshot that is safe for use @@ -651,33 +646,7 @@ func generateSnapshotName(sf snapshotFile) string { return name.SafeConcatName(nodeName, snapshotName) } -// pruneConfigMap drops the oldest entries from the configMap. -// Note that the actual snapshot files are not removed, just the entries that track them in the configmap. 
-func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error { - if pruneCount > len(snapshotConfigMap.Data) { - return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots") - } - - var snapshotFiles []snapshotFile - retention := len(snapshotConfigMap.Data) - pruneCount - for name := range snapshotConfigMap.Data { - basename, compressed := strings.CutSuffix(name, compressedExtension) - ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) - snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed}) - } - - // sort newest-first so we can prune entries past the retention count - sort.Slice(snapshotFiles, func(i, j int) bool { - return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt) - }) - - for _, snapshotFile := range snapshotFiles[retention:] { - delete(snapshotConfigMap.Data, snapshotFile.Name) - } - return nil -} - -// ReconcileSnapshotData reconciles snapshot data in the snapshot ConfigMap. +// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources. // It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots // and reconcile snapshots from S3. 
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { @@ -687,167 +656,120 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { runtime.Gosched() } - logrus.Infof("Reconciling etcd snapshot data in %s ConfigMap", snapshotConfigMapName) - defer logrus.Infof("Reconciliation of snapshot data in %s ConfigMap complete", snapshotConfigMapName) - - pruneCount := pruneStepSize - var lastErr error - return retry.OnError(retry.DefaultBackoff, func(err error) bool { - return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err) - }, func() error { - snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) - if apierrors.IsNotFound(getErr) { - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: snapshotConfigMapName, - Namespace: metav1.NamespaceSystem, - }, - } - cm, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(cm) - if err != nil { - return err - } - snapshotConfigMap = cm - } - - logrus.Debugf("Attempting to reconcile etcd snapshot data for configmap generation %d", snapshotConfigMap.Generation) - if snapshotConfigMap.Data == nil { - snapshotConfigMap.Data = map[string]string{} - } - - snapshotFiles, err := e.listLocalSnapshots() - if err != nil { - return err - } + logrus.Infof("Reconciling ETCDSnapshotFile resources") + defer logrus.Infof("Reconciliation of ETCDSnapshotFile resources complete") - // s3ListSuccessful is set to true if we are successful at listing snapshots from S3 to eliminate accidental - // clobbering of S3 snapshots in the configmap due to misconfigured S3 credentials/details - var s3ListSuccessful bool + // Get snapshots from local filesystem + snapshotFiles, err := e.listLocalSnapshots() + if err != nil { + return err + } - if e.config.EtcdS3 { - if err := e.initS3IfNil(ctx); err != nil { - logrus.Warnf("Unable to initialize S3 client: %v", err) - return err - } + nodeNames := 
[]string{os.Getenv("NODE_NAME")} - if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { - logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) - } else { - for k, v := range s3Snapshots { - snapshotFiles[k] = v - } - s3ListSuccessful = true - } + // Get snapshots from S3 + if e.config.EtcdS3 { + if err := e.initS3IfNil(ctx); err != nil { + return err } - nodeName := os.Getenv("NODE_NAME") - - // deletedSnapshots is a map[string]string where key is the configmap key and the value is the marshalled snapshot file - // it will be populated below with snapshots that are either from S3 or on the local node. Notably, deletedSnapshots will - // not contain snapshots that are in the "failed" status - deletedSnapshots := make(map[string]string) - // failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap - // These are stored unmarshaled so we can sort based on name. - var failedSnapshots []snapshotFile - var failedS3Snapshots []snapshotFile - - // remove entries for this node and s3 (if S3 is enabled) only - for k, v := range snapshotConfigMap.Data { - var sf snapshotFile - if err := json.Unmarshal([]byte(v), &sf); err != nil { - return err - } - if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != failedSnapshotStatus { - // Only delete the snapshot if the snapshot was not failed - // sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status - deletedSnapshots[generateSnapshotName(sf)] = v // store a copy of the snapshot - delete(snapshotConfigMap.Data, k) - } else if sf.Status == failedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 { - // Handle locally failed snapshots. 
- failedSnapshots = append(failedSnapshots, sf) - delete(snapshotConfigMap.Data, k) - } else if sf.Status == failedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 { - // If we're operating against S3, we can clean up failed S3 snapshots that failed on this node. - failedS3Snapshots = append(failedS3Snapshots, sf) - delete(snapshotConfigMap.Data, k) + if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { + logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) + } else { + for k, v := range s3Snapshots { + snapshotFiles[k] = v } + nodeNames = append(nodeNames, "s3") } + } - // Apply the failed snapshot retention policy to locally failed snapshots - if len(failedSnapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 { - // sort newest-first so we can record only the retention count - sort.Slice(failedSnapshots, func(i, j int) bool { - return failedSnapshots[j].CreatedAt.Before(failedSnapshots[i].CreatedAt) - }) + // Try to load metadata from the legacy configmap, in case any local or s3 snapshots + // were created by an old release that does not write the metadata alongside the snapshot file. + snapshotConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } - for _, dfs := range failedSnapshots[:e.config.EtcdSnapshotRetention] { - sfKey := generateSnapshotName(dfs) - marshalledSnapshot, err := marshalSnapshotFile(dfs) + if snapshotConfigMap != nil { + for sfKey, sf := range snapshotFiles { + // if the configmap has data for this snapshot, and local metadata is empty, + // deserialize the value from the configmap and attempt to load it. 
+			if cmSnapshotValue := snapshotConfigMap.Data[sfKey]; cmSnapshotValue != "" && sf.Metadata == "" && sf.metadataSource == nil {
+				// Legacy configmap values are the plain JSON written by marshalSnapshotFile, not base64.
+				// Decode into a scratch value and take only its metadata; the rest came from disk/s3.
+				var legacy snapshotFile
+				err := json.Unmarshal([]byte(cmSnapshotValue), &legacy)
 				if err != nil {
-					logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err)
-				} else {
-					snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
+					logrus.Warnf("Failed to unmarshal configmap data for snapshot %s: %v", sfKey, err)
+					continue
 				}
+				sf.Metadata = legacy.Metadata
+				snapshotFiles[sfKey] = sf
 			}
 		}
+	}

-		// Apply the failed snapshot retention policy to the S3 snapshots
-		if len(failedS3Snapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 {
-			// sort newest-first so we can record only the retention count
-			sort.Slice(failedS3Snapshots, func(i, j int) bool {
-				return failedS3Snapshots[j].CreatedAt.Before(failedS3Snapshots[i].CreatedAt)
-			})
+	labelSelector := &metav1.LabelSelector{
+		MatchExpressions: []metav1.LabelSelectorRequirement{{
+			Key:      labelStorageNode,
+			Operator: metav1.LabelSelectorOpIn,
+			Values:   nodeNames,
+		}},
+	}

-			for _, dfs := range failedS3Snapshots[:e.config.EtcdSnapshotRetention] {
-				sfKey := generateSnapshotName(dfs)
-				marshalledSnapshot, err := marshalSnapshotFile(dfs)
-				if err != nil {
-					logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err)
-				} else {
-					snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
-				}
-			}
-		}
+	snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
+	esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: labelSelector.String()})
+	if err != nil {
+		return err
+	}

-		// save the local entries to the ConfigMap if they are still on disk or in S3.
-		for _, snapshot := range snapshotFiles {
-			var sf snapshotFile
-			sfKey := generateSnapshotName(snapshot)
-			if v, ok := deletedSnapshots[sfKey]; ok {
-				// use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it
-				if err := json.Unmarshal([]byte(v), &sf); err != nil {
-					logrus.Errorf("Error unmarshaling snapshot file: %v", err)
-					// use the snapshot with info we sourced from disk/S3 (will be missing metadata, but something is better than nothing)
-					sf = snapshot
-				}
-			} else {
-				sf = snapshot
+	// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync
+	// If a snapshot from Kubernetes was not found on disk/s3, it is gone and can be removed from Kubernetes
+	for _, esf := range esfList.Items {
+		if _, ok := snapshotFiles[esf.Name]; ok {
+			delete(snapshotFiles, esf.Name)
+		} else {
+			if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
+				logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
 			}
+		}
+	}

-			sf.Status = successfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful.
-			marshalledSnapshot, err := marshalSnapshotFile(sf)
-			if err != nil {
-				logrus.Warnf("Failed to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err)
-			} else {
-				snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
-			}
+	// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
+	for _, sf := range snapshotFiles {
+		if err := e.addSnapshotData(sf); err != nil {
+			logrus.Errorf("Failed to sync ETCDSnapshotFile: %v", err)
 		}
+	}

-		// If the configmap update was rejected due to size, drop the oldest entries from the map.
-		// We will continue to remove an increasing number of old snapshots from the map until the request succeeds,
-		// or the number we would attempt to remove exceeds the number stored.
- if isTooLargeError(lastErr) { - logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount) - if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil { - return err - } - pruneCount += pruneStepSize + // Get a list of all etcd nodes currently in the cluster + nodes := e.config.Runtime.Core.Core().V1().Node() + etcdSelector := labels.Set{util.ETCDRoleLabelKey: "true"} + nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: etcdSelector.String()}) + if err != nil { + return err + } + + // List all snapshots in Kubernetes not stored on S3 or a current etcd node. + // These snapshots are local to a node that no longer runs etcd and cannot be restored. + // If the node rejoins later and has local snapshots, it will reconcile them itself. + labelSelector.MatchExpressions[0].Operator = metav1.LabelSelectorOpNotIn + labelSelector.MatchExpressions[0].Values = []string{"s3"} + for _, node := range nodeList.Items { + labelSelector.MatchExpressions[0].Values = append(labelSelector.MatchExpressions[0].Values, node.Name) + } + + esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: labelSelector.String()}) + if err != nil { + return err + } + for _, esf := range esfList.Items { + if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { + logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err) } + } - logrus.Debugf("Updating snapshot ConfigMap (%s) with %d entries", snapshotConfigMapName, len(snapshotConfigMap.Data)) - _, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) - return lastErr - }) + return nil } // setSnapshotFunction schedules snapshots at the configured interval. 
@@ -871,7 +793,7 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) return nil } - logrus.Infof("Applying local snapshot retention policy: retention: %d, snapshotPrefix: %s, directory: %s", retention, snapshotPrefix, snapshotDir) + logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir) var snapshotFiles []snapshotFile if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error { @@ -946,3 +868,105 @@ func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) erro } return os.WriteFile(metadataPath, m, 0700) } + +func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { + if esf.Status.ReadyToUse != nil && *esf.Status.ReadyToUse { + sf.Status = successfulSnapshotStatus + } else { + sf.Status = failedSnapshotStatus + } + if esf.Status.Size != nil { + sf.Size = esf.Status.Size.Value() + } + sf.Name = path.Base(esf.Spec.Location) + sf.Location = esf.Spec.Location + sf.CreatedAt = esf.Status.CreationTime + sf.nodeSource = esf.Spec.NodeName + + if esf.Status.Error != nil { + sf.CreatedAt = esf.Status.Error.Time + message := "etcd snapshot failed" + if esf.Status.Error.Message != nil { + message = *esf.Status.Error.Message + } + sf.Message = base64.StdEncoding.EncodeToString([]byte(message)) + } + + if len(esf.Spec.Metadata) > 0 { + b, _ := json.Marshal(esf.Spec.Metadata) + sf.Metadata = base64.StdEncoding.EncodeToString(b) + } + + if esf.Spec.S3 == nil { + sf.NodeName = esf.Spec.NodeName + } else { + sf.NodeName = "s3" + sf.S3 = &s3Config{ + Endpoint: esf.Spec.S3.Endpoint, + EndpointCA: esf.Spec.S3.EndpointCA, + SkipSSLVerify: esf.Spec.S3.SkipSSLVerify, + Bucket: esf.Spec.S3.Bucket, + Region: esf.Spec.S3.Region, + Folder: esf.Spec.S3.Prefix, + Insecure: esf.Spec.S3.Insecure, + } + } +} + +func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { + esf.Status.ReadyToUse = 
pointer.Bool(sf.Status == successfulSnapshotStatus)
+	esf.Status.Size = resource.NewQuantity(sf.Size, resource.DecimalSI)
+	esf.Spec.Location = sf.Location
+	if sf.nodeSource != "" {
+		esf.Spec.NodeName = sf.nodeSource
+	} else {
+		esf.Spec.NodeName = sf.NodeName
+	}
+
+	if sf.Message == "" {
+		esf.Status.CreationTime = sf.CreatedAt
+	} else {
+		message, err := base64.StdEncoding.DecodeString(sf.Message)
+		if err != nil {
+			logrus.Warnf("Failed to decode error message for %s: %v", esf.Name, err)
+		} else {
+			esf.Status.Error = &apisv1.ETCDSnapshotError{
+				Time:    sf.CreatedAt,
+				Message: pointer.String(string(message)),
+			}
+		}
+	}
+
+	if sf.metadataSource != nil {
+		esf.Spec.Metadata = sf.metadataSource.Data
+	} else if sf.Metadata != "" {
+		metadata, err := base64.StdEncoding.DecodeString(sf.Metadata)
+		if err != nil {
+			logrus.Warnf("Failed to decode metadata for %s: %v", esf.Name, err)
+		} else {
+			if err := json.Unmarshal(metadata, &esf.Spec.Metadata); err != nil {
+				logrus.Warnf("Failed to unmarshal metadata for %s: %v", esf.Name, err)
+			}
+		}
+	}
+
+	if esf.ObjectMeta.Labels == nil {
+		esf.ObjectMeta.Labels = map[string]string{}
+	}
+
+	if sf.S3 == nil {
+		esf.ObjectMeta.Labels[labelStorageNode] = esf.Spec.NodeName
+	} else {
+		esf.ObjectMeta.Labels[labelStorageNode] = "s3"
+		esf.Spec.S3 = &apisv1.ETCDSnapshotS3{
+			Endpoint:      sf.S3.Endpoint,
+			EndpointCA:    sf.S3.EndpointCA,
+			SkipSSLVerify: sf.S3.SkipSSLVerify,
+			Bucket:        sf.S3.Bucket,
+			Region:        sf.S3.Region,
+			Prefix:        sf.S3.Folder,
+			Insecure:      sf.S3.Insecure,
+		}
+	}
+
+}
diff --git a/pkg/etcd/snapshot_controller.go b/pkg/etcd/snapshot_controller.go
new file mode 100644
index 000000000000..147d1ee11dc0
--- /dev/null
+++ b/pkg/etcd/snapshot_controller.go
@@ -0,0 +1,148 @@
+package etcd
+
+import (
+	"context"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
+	controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
+	"github.com/k3s-io/k3s/pkg/version"
+	"github.com/pkg/errors"
+	controllerv1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/core/v1"
+	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/sirupsen/logrus"
+)
+
+var (
+	// snapshotConfigMapName names the ConfigMap (looked up in metav1.NamespaceSystem) that mirrors snapshot data.
+	snapshotConfigMapName = version.Program + "-etcd-snapshots"
+)
+
+// etcdSnapshotHandler mirrors ETCDSnapshotFile resources into the snapshot ConfigMap.
+type etcdSnapshotHandler struct {
+	ctx        context.Context
+	etcd       *ETCD
+	snapshots  controllersv1.ETCDSnapshotFileController
+	configmaps controllerv1.ConfigMapController
+}
+
+// registerSnapshotHandlers registers the change and remove handlers that keep the
+// snapshot ConfigMap in step with the ETCDSnapshotFile resources.
+func registerSnapshotHandlers(ctx context.Context, etcd *ETCD) {
+	snapshots := etcd.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
+	e := &etcdSnapshotHandler{
+		ctx:        ctx,
+		etcd:       etcd,
+		snapshots:  snapshots,
+		configmaps: etcd.config.Runtime.Core.Core().V1().ConfigMap(),
+	}
+
+	logrus.Infof("Starting managed etcd snapshot configmap controller")
+	snapshots.OnChange(ctx, "managed-etcd-controller", e.sync)
+	snapshots.OnRemove(ctx, "managed-etcd-controller", e.onRemove)
+}
+
+// sync stores the serialized form of esf in the snapshot ConfigMap under key,
+// creating the ConfigMap if it does not exist yet. The ConfigMap is only written
+// when the stored value differs. Errors are returned wrapped, alongside esf.
+func (e *etcdSnapshotHandler) sync(key string, esf *apisv1.ETCDSnapshotFile) (*apisv1.ETCDSnapshotFile, error) {
+	// wrangler calls OnChange handlers with a nil object when the key is no longer in the
+	// cache; there is nothing to serialize, so return instead of dereferencing nil below.
+	if esf == nil {
+		return nil, nil
+	}
+
+	snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			snapshotConfigMap = &v1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      snapshotConfigMapName,
+					Namespace: metav1.NamespaceSystem,
+				},
+			}
+		} else {
+			return esf, errors.Wrap(err, "failed to get snapshot configmap")
+		}
+	}
+
+	sf := snapshotFile{}
+	sf.fromETCDSnapshotFile(esf)
+	m, err := marshalSnapshotFile(sf)
+	if err != nil {
+		return esf, errors.Wrap(err, "failed to marshal snapshot configmap data")
+	}
+	marshalledSnapshot := string(m)
+
+	if snapshotConfigMap.Data[key] != marshalledSnapshot {
+		if snapshotConfigMap.Data == nil {
+			snapshotConfigMap.Data = map[string]string{}
+		}
+		snapshotConfigMap.Data[key] = marshalledSnapshot
+		// The ConfigMap built above on NotFound has no CreationTimestamp, which selects Create over Update.
+		if snapshotConfigMap.CreationTimestamp.IsZero() {
+			_, err = e.configmaps.Create(snapshotConfigMap)
+		} else {
+			_, err = e.configmaps.Update(snapshotConfigMap)
+		}
+	}
+
+	if err != nil {
+		err = errors.Wrap(err, "failed to sync snapshot to configmap")
+	}
+
+	return esf, err
+}
+
+// onRemove deletes the entry for key from the snapshot ConfigMap. A missing ConfigMap
+// or a missing entry is not an error.
+func (e *etcdSnapshotHandler) onRemove(key string, esf *apisv1.ETCDSnapshotFile) (*apisv1.ETCDSnapshotFile, error) {
+	snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return esf, nil
+		}
+		return esf, errors.Wrap(err, "failed to get snapshot configmap")
+	}
+
+	if _, ok := snapshotConfigMap.Data[key]; ok {
+		delete(snapshotConfigMap.Data, key)
+		if _, err := e.configmaps.Update(snapshotConfigMap); err != nil {
+			return esf, errors.Wrap(err, "failed to remove snapshot from configmap")
+		}
+	}
+
+	return esf, nil
+}
+
+// pruneConfigMap drops the oldest entries from the configMap.
+// Note that the actual snapshot files are not removed, just the entries that track them in the configmap.
+func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error {
+	total := len(snapshotConfigMap.Data)
+	if pruneCount > total {
+		return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots")
+	}
+
+	// The text after the last '-' of each key is parsed as unix seconds; a non-numeric suffix yields 0 (oldest).
+	entries := make([]snapshotFile, 0, total)
+	for key := range snapshotConfigMap.Data {
+		stem, zipped := strings.CutSuffix(key, compressedExtension)
+		sec, _ := strconv.ParseInt(stem[strings.LastIndexByte(stem, '-')+1:], 10, 64)
+		entries = append(entries, snapshotFile{Name: key, CreatedAt: &metav1.Time{Time: time.Unix(sec, 0)}, Compressed: zipped})
+	}
+
+	// Newest first, so the slice from index total-pruneCount onwards holds the pruneCount oldest entries.
+	sort.Slice(entries, func(a, b int) bool {
+		return entries[b].CreatedAt.Before(entries[a].CreatedAt)
+	})
+	for _, stale := range entries[total-pruneCount:] {
+		delete(snapshotConfigMap.Data, stale.Name)
+	}
+	return nil
+}