diff --git a/apis/apps/v1alpha1/backupschedule_types.go b/apis/apps/v1alpha1/backupschedule_types.go index c12ff343..3a927abd 100644 --- a/apis/apps/v1alpha1/backupschedule_types.go +++ b/apis/apps/v1alpha1/backupschedule_types.go @@ -20,54 +20,88 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +var ( + maxSuccessfulBackupJobsDef int32 = 3 + maxSuccessfulFailedJobsDef int32 = 3 +) + // +genclient // +kubebuilder:object:root=true // +kubebuilder:subresource:status -// +kubebuilder:resource:shortName="bs" +// +kubebuilder:resource:shortName="nsb" +// +kubebuilder:printcolumn:name="Schedule",type=string,JSONPath=`.spec.schedule`,description="The current schedule set for the scheduled backup" +// +kubebuilder:printcolumn:name="Pause",type=string,JSONPath=`.spec.pause`,description="Whether or not the scheduled backup is paused" +// +kubebuilder:printcolumn:name="Last Triggered Backup",type=string,JSONPath=`.status.lastScheduledBackupTime`,description="The timestamp at which the last backup was triggered" +// +kubebuilder:printcolumn:name="Last Successful Backup",format=date-time,type=string,JSONPath=`.status.lastSuccessfulBackupTime`,description="The timestamp at which the last backup was successful completed" +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` -type BackupSchedule struct { +type NebulaScheduledBackup struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec BackupScheduleSpec `json:"spec,omitempty"` - Status BackupScheduleStatus `json:"status,omitempty"` + Spec ScheduledBackupSpec `json:"spec,omitempty"` + Status ScheduledBackupStatus `json:"status,omitempty"` } // +kubebuilder:object:root=true -// BackupScheduleList contains a list of BackupSchedule. -type BackupScheduleList struct { +// NebulaScheduledBackupList contains a list of NebulaScheduledBackup. 
+type NebulaScheduledBackupList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` - Items []BackupSchedule `json:"items"` + Items []NebulaScheduledBackup `json:"items"` } -// BackupScheduleSpec contains the specification for a backupSchedule of a nebula cluster backupSchedule. -type BackupScheduleSpec struct { +// ScheduledBackupSpec contains the specification for a NebulaScheduledBackup of a nebula cluster NebulaScheduledBackup. +type ScheduledBackupSpec struct { // Schedule specifies the cron string used for backup scheduling. Schedule string `json:"schedule"` // Pause means paused backupSchedule - Pause bool `json:"pause,omitempty"` - // MaxBackups is to specify how many backups we want to keep - // 0 is magic number to indicate un-limited backups. - // if MaxBackups and MaxReservedTime are set at the same time, MaxReservedTime is preferred + Pause *bool `json:"pause,omitempty"` + // MaxBackups specifies how many backups we want to keep in the remote storage bucket. + // 0 is the magic number to indicate unlimited backups. + // if both MaxBackups and MaxReservedTime are set at the same time, MaxReservedTime will be used // and MaxBackups is ignored. MaxBackups *int32 `json:"maxBackups,omitempty"` - // MaxReservedTime is to specify how long backups we want to keep. - MaxReservedTime *string `json:"maxReservedTime,omitempty"` - // BackupTemplate is the specification of the backup structure to get scheduled. + // MaxRetentionTime specifies how long we want the backups in the remote storage bucket to be kept for. + // +kubebuilder:validation:Pattern=`^([0-9]+(\.[0-9]+)?(s|m|h))+$` + MaxRetentionTime *string `json:"maxRetentionTime,omitempty"` + // BackupTemplate is the specification of the backup structure to schedule. BackupTemplate BackupSpec `json:"backupTemplate"` - // LogBackupTemplate is the specification of the log backup structure to get scheduled. 
+ // MaxSuccessfulNebulaBackupJobs specifies the maximum number of successful backup jobs to keep. Default 3. + MaxSuccessfulNebulaBackupJobs *int32 `json:"maxSuccessfulNebulaBackupJobs,omitempty"` + // MaxFailedNebulaBackupJobs specifies the maximum number of failed backup jobs to keep. Default 3 + MaxFailedNebulaBackupJobs *int32 `json:"maxFailedNebulaBackupJobs,omitempty"` +} + +// ScheduledBackupStatus represents the current status of a nebula cluster NebulaScheduledBackup. +type ScheduledBackupStatus struct { + // CurrPauseStatus represent the current pause status of the nebula scheduled backup + CurrPauseStatus *bool `json:"currPauseStatus,omitempty"` + // LastBackup represents the last backup. Used for scheduled incremental backups. Not supported for now. + //LastBackup string `json:"lastBackup,omitempty"` + // LastScheduledBackupTime represents the last time a backup job was successfully scheduled. + LastScheduledBackupTime *metav1.Time `json:"lastScheduledBackupTime,omitempty"` + // LastSuccessfulBackupTime represents the last time a backup was successfully created. + LastSuccessfulBackupTime *metav1.Time `json:"lastSuccessfulBackupTime,omitempty"` + // NumberOfSuccessfulBackups represents the total number of successful Nebula Backups run by this the Nebula Scheduled Backup + NumberOfSuccessfulBackups *int32 `json:"numberofSuccessfulBackups,omitempty"` + // NumberOfFailedBackups represents the total number of failed Nebula Backups run by this the Nebula Scheduled Backup + NumberOfFailedBackups *int32 `json:"numberofFailedBackups,omitempty"` + // MostRecentJobFailed represents if the most recent backup job failed. + MostRecentJobFailed *bool `json:"mostRecentJobFailed,omitempty"` } -// BackupScheduleStatus represents the current status of a nebula cluster backupSchedule. -type BackupScheduleStatus struct { - // LastBackup represents the last backup. 
- LastBackup string `json:"lastBackup,omitempty"` - // LastBackupTime represents the last time the backup was successfully created. - LastBackupTime *metav1.Time `json:"lastBackupTime,omitempty"` +// Defaulting implementation for ScheduledBackupStatus +func (nsb *NebulaScheduledBackup) Default() { + if nsb.Spec.MaxSuccessfulNebulaBackupJobs == nil { + nsb.Spec.MaxSuccessfulNebulaBackupJobs = &maxSuccessfulBackupJobsDef + } + if nsb.Spec.MaxFailedNebulaBackupJobs == nil { + nsb.Spec.MaxFailedNebulaBackupJobs = &maxSuccessfulFailedJobsDef + } } func init() { - SchemeBuilder.Register(&BackupSchedule{}, &BackupScheduleList{}) + SchemeBuilder.Register(&NebulaScheduledBackup{}, &NebulaScheduledBackupList{}) } diff --git a/apis/apps/v1alpha1/nebulabackup_types.go b/apis/apps/v1alpha1/nebulabackup_types.go index ffbc5df6..ce9b4869 100644 --- a/apis/apps/v1alpha1/nebulabackup_types.go +++ b/apis/apps/v1alpha1/nebulabackup_types.go @@ -34,6 +34,8 @@ type NebulaBackup struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` + Env []corev1.EnvVar `json:"env,omitempty"` + Spec BackupSpec `json:"spec,omitempty"` Status BackupStatus `json:"status,omitempty"` } @@ -51,8 +53,6 @@ type NebulaBackupList struct { type BackupConditionType string const ( - // BackupPending means the backup is pending, waiting for create backup job - BackupPending BackupConditionType = "Pending" // BackupRunning means the backup is running. BackupRunning BackupConditionType = "Running" // BackupComplete means the backup has successfully executed and the @@ -104,10 +104,10 @@ type BackupSpec struct { type BackupStatus struct { // TimeStarted is the time at which the backup was started. // +nullable - TimeStarted metav1.Time `json:"timeStarted,omitempty"` + TimeStarted *metav1.Time `json:"timeStarted,omitempty"` // TimeCompleted is the time at which the backup was completed. 
// +nullable - TimeCompleted metav1.Time `json:"timeCompleted,omitempty"` + TimeCompleted *metav1.Time `json:"timeCompleted,omitempty"` // Phase is a user readable state inferred from the underlying Backup conditions Phase BackupConditionType `json:"phase,omitempty"` // +nullable diff --git a/apis/apps/v1alpha1/nebularestore_types.go b/apis/apps/v1alpha1/nebularestore_types.go index 8fa81859..d909cde6 100644 --- a/apis/apps/v1alpha1/nebularestore_types.go +++ b/apis/apps/v1alpha1/nebularestore_types.go @@ -97,6 +97,9 @@ type BRConfig struct { BackupName string `json:"backupName"` // Concurrency is used to control the number of concurrent file downloads during data restoration. Concurrency int32 `json:"concurrency,omitempty"` + // StorageProviderType specifies the type of storage backups should be stored in (currently only s3 is supported). + // +kubebuilder:validation:Pattern=`^(s3)$` + StorageProviderType string `json:"storageProviderType,omitempty"` // StorageProvider configures where and how backups should be stored. StorageProvider `json:",inline"` } diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 50865222..13ca3e97 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -82,110 +82,6 @@ func (in *BackupCondition) DeepCopy() *BackupCondition { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BackupSchedule) DeepCopyInto(out *BackupSchedule) { - *out = *in - out.TypeMeta = in.TypeMeta - in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - in.Spec.DeepCopyInto(&out.Spec) - in.Status.DeepCopyInto(&out.Status) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupSchedule. 
-func (in *BackupSchedule) DeepCopy() *BackupSchedule { - if in == nil { - return nil - } - out := new(BackupSchedule) - in.DeepCopyInto(out) - return out -} - -// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *BackupSchedule) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BackupScheduleList) DeepCopyInto(out *BackupScheduleList) { - *out = *in - out.TypeMeta = in.TypeMeta - in.ListMeta.DeepCopyInto(&out.ListMeta) - if in.Items != nil { - in, out := &in.Items, &out.Items - *out = make([]BackupSchedule, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupScheduleList. -func (in *BackupScheduleList) DeepCopy() *BackupScheduleList { - if in == nil { - return nil - } - out := new(BackupScheduleList) - in.DeepCopyInto(out) - return out -} - -// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *BackupScheduleList) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BackupScheduleSpec) DeepCopyInto(out *BackupScheduleSpec) { - *out = *in - if in.MaxBackups != nil { - in, out := &in.MaxBackups, &out.MaxBackups - *out = new(int32) - **out = **in - } - if in.MaxReservedTime != nil { - in, out := &in.MaxReservedTime, &out.MaxReservedTime - *out = new(string) - **out = **in - } - in.BackupTemplate.DeepCopyInto(&out.BackupTemplate) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupScheduleSpec. 
-func (in *BackupScheduleSpec) DeepCopy() *BackupScheduleSpec { - if in == nil { - return nil - } - out := new(BackupScheduleSpec) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BackupScheduleStatus) DeepCopyInto(out *BackupScheduleStatus) { - *out = *in - if in.LastBackupTime != nil { - in, out := &in.LastBackupTime, &out.LastBackupTime - *out = (*in).DeepCopy() - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupScheduleStatus. -func (in *BackupScheduleStatus) DeepCopy() *BackupScheduleStatus { - if in == nil { - return nil - } - out := new(BackupScheduleStatus) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BackupSpec) DeepCopyInto(out *BackupSpec) { *out = *in @@ -221,8 +117,14 @@ func (in *BackupSpec) DeepCopy() *BackupSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *BackupStatus) DeepCopyInto(out *BackupStatus) { *out = *in - in.TimeStarted.DeepCopyInto(&out.TimeStarted) - in.TimeCompleted.DeepCopyInto(&out.TimeCompleted) + if in.TimeStarted != nil { + in, out := &in.TimeStarted, &out.TimeStarted + *out = (*in).DeepCopy() + } + if in.TimeCompleted != nil { + in, out := &in.TimeCompleted, &out.TimeCompleted + *out = (*in).DeepCopy() + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]BackupCondition, len(*in)) @@ -585,6 +487,13 @@ func (in *NebulaBackup) DeepCopyInto(out *NebulaBackup) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]v1.EnvVar, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -915,6 +824,65 @@ func (in *NebulaRestoreList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NebulaScheduledBackup) DeepCopyInto(out *NebulaScheduledBackup) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NebulaScheduledBackup. +func (in *NebulaScheduledBackup) DeepCopy() *NebulaScheduledBackup { + if in == nil { + return nil + } + out := new(NebulaScheduledBackup) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NebulaScheduledBackup) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *NebulaScheduledBackupList) DeepCopyInto(out *NebulaScheduledBackupList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]NebulaScheduledBackup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NebulaScheduledBackupList. +func (in *NebulaScheduledBackupList) DeepCopy() *NebulaScheduledBackupList { + if in == nil { + return nil + } + out := new(NebulaScheduledBackupList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NebulaScheduledBackupList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RestoreCondition) DeepCopyInto(out *RestoreCondition) { *out = *in @@ -1061,6 +1029,90 @@ func (in *SSLCertsSpec) DeepCopy() *SSLCertsSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScheduledBackupSpec) DeepCopyInto(out *ScheduledBackupSpec) { + *out = *in + if in.Pause != nil { + in, out := &in.Pause, &out.Pause + *out = new(bool) + **out = **in + } + if in.MaxBackups != nil { + in, out := &in.MaxBackups, &out.MaxBackups + *out = new(int32) + **out = **in + } + if in.MaxRetentionTime != nil { + in, out := &in.MaxRetentionTime, &out.MaxRetentionTime + *out = new(string) + **out = **in + } + in.BackupTemplate.DeepCopyInto(&out.BackupTemplate) + if in.MaxSuccessfulNebulaBackupJobs != nil { + in, out := &in.MaxSuccessfulNebulaBackupJobs, &out.MaxSuccessfulNebulaBackupJobs + *out = new(int32) + **out = **in + } + if in.MaxFailedNebulaBackupJobs != nil { + in, out := &in.MaxFailedNebulaBackupJobs, &out.MaxFailedNebulaBackupJobs + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScheduledBackupSpec. +func (in *ScheduledBackupSpec) DeepCopy() *ScheduledBackupSpec { + if in == nil { + return nil + } + out := new(ScheduledBackupSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScheduledBackupStatus) DeepCopyInto(out *ScheduledBackupStatus) { + *out = *in + if in.CurrPauseStatus != nil { + in, out := &in.CurrPauseStatus, &out.CurrPauseStatus + *out = new(bool) + **out = **in + } + if in.LastScheduledBackupTime != nil { + in, out := &in.LastScheduledBackupTime, &out.LastScheduledBackupTime + *out = (*in).DeepCopy() + } + if in.LastSuccessfulBackupTime != nil { + in, out := &in.LastSuccessfulBackupTime, &out.LastSuccessfulBackupTime + *out = (*in).DeepCopy() + } + if in.NumberOfSuccessfulBackups != nil { + in, out := &in.NumberOfSuccessfulBackups, &out.NumberOfSuccessfulBackups + *out = new(int32) + **out = **in + } + if in.NumberOfFailedBackups != nil { + in, out := &in.NumberOfFailedBackups, &out.NumberOfFailedBackups + *out = new(int32) + **out = **in + } + if in.MostRecentJobFailed != nil { + in, out := &in.MostRecentJobFailed, &out.MostRecentJobFailed + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScheduledBackupStatus. +func (in *ScheduledBackupStatus) DeepCopy() *ScheduledBackupStatus { + if in == nil { + return nil + } + out := new(ScheduledBackupStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) { *out = *in diff --git a/cmd/controller-manager/app/controller-manager.go b/cmd/controller-manager/app/controller-manager.go index abff26d4..0ec866af 100644 --- a/cmd/controller-manager/app/controller-manager.go +++ b/cmd/controller-manager/app/controller-manager.go @@ -42,6 +42,7 @@ import ( "github.com/vesoft-inc/nebula-operator/pkg/controller/nebulabackup" "github.com/vesoft-inc/nebula-operator/pkg/controller/nebulacluster" "github.com/vesoft-inc/nebula-operator/pkg/controller/nebularestore" + "github.com/vesoft-inc/nebula-operator/pkg/controller/nebulascheduledbackup" klogflag "github.com/vesoft-inc/nebula-operator/pkg/flag/klog" profileflag "github.com/vesoft-inc/nebula-operator/pkg/flag/profile" "github.com/vesoft-inc/nebula-operator/pkg/version" @@ -180,6 +181,15 @@ func Run(ctx context.Context, opts *options.Options) error { return err } + scheduledBackupReconciler, err := nebulascheduledbackup.NewBackupReconciler(mgr) + if err != nil { + return err + } + + if err := scheduledBackupReconciler.SetupWithManager(mgr); err != nil { + klog.Errorf("failed to set up ScheduledNebulaBackup controller: %v", err) + } + if opts.EnableAdmissionWebhook { decoder := admission.NewDecoder(mgr.GetScheme()) klog.Info("Registering webhooks to nebula-controller-manager") diff --git a/config/crd/bases/apps.nebula-graph.io_nebulabackups.yaml b/config/crd/bases/apps.nebula-graph.io_nebulabackups.yaml index 57b8d9e5..6ad5ed7d 100644 --- a/config/crd/bases/apps.nebula-graph.io_nebulabackups.yaml +++ b/config/crd/bases/apps.nebula-graph.io_nebulabackups.yaml @@ -67,6 +67,9 @@ spec: secretName: type: string type: object + storageProviderType: + pattern: ^(s3)$ + type: string required: - clusterName type: object @@ -81,6 +84,12 @@ spec: type: object x-kubernetes-map-type: atomic type: array + maxBackups: + format: int32 + type: integer + maxReservedTimeEpoch: + format: int64 + type: integer nodeSelector: additionalProperties: type: 
string diff --git a/config/crd/bases/apps.nebula-graph.io_nebularestores.yaml b/config/crd/bases/apps.nebula-graph.io_nebularestores.yaml index cbcc1a32..91805318 100644 --- a/config/crd/bases/apps.nebula-graph.io_nebularestores.yaml +++ b/config/crd/bases/apps.nebula-graph.io_nebularestores.yaml @@ -69,6 +69,9 @@ spec: secretName: type: string type: object + storageProviderType: + pattern: ^(s3)$ + type: string required: - clusterName type: object diff --git a/config/crd/bases/apps.nebula-graph.io_backupschedules.yaml b/config/crd/bases/apps.nebula-graph.io_nebulascheduledbackups.yaml similarity index 56% rename from config/crd/bases/apps.nebula-graph.io_backupschedules.yaml rename to config/crd/bases/apps.nebula-graph.io_nebulascheduledbackups.yaml index cef4a221..2ef51a8c 100644 --- a/config/crd/bases/apps.nebula-graph.io_backupschedules.yaml +++ b/config/crd/bases/apps.nebula-graph.io_nebulascheduledbackups.yaml @@ -5,19 +5,40 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.11.3 creationTimestamp: null - name: backupschedules.apps.nebula-graph.io + name: nebulascheduledbackups.apps.nebula-graph.io spec: group: apps.nebula-graph.io names: - kind: BackupSchedule - listKind: BackupScheduleList - plural: backupschedules + kind: NebulaScheduledBackup + listKind: NebulaScheduledBackupList + plural: nebulascheduledbackups shortNames: - - bs - singular: backupschedule + - nsb + singular: nebulascheduledbackup scope: Namespaced versions: - - name: v1alpha1 + - additionalPrinterColumns: + - description: The current schedule set for the scheduled backup + jsonPath: .spec.schedule + name: Schedule + type: string + - description: Whether or not the scheduled backup is paused + jsonPath: .spec.pause + name: Pause + type: string + - description: The timestamp at which the last backup was triggered + jsonPath: .status.lastScheduledBackupTime + name: Last Triggered Backup + type: string + - description: The timestamp at which the last backup was successful 
completed + format: date-time + jsonPath: .status.lastSuccessfulBackupTime + name: Last Successful Backup + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 schema: openAPIV3Schema: properties: @@ -53,6 +74,9 @@ spec: secretName: type: string type: object + storageProviderType: + pattern: ^(s3)$ + type: string required: - clusterName type: object @@ -67,6 +91,12 @@ spec: type: object x-kubernetes-map-type: atomic type: array + maxBackups: + format: int32 + type: integer + maxReservedTimeEpoch: + format: int64 + type: integer nodeSelector: additionalProperties: type: string @@ -77,8 +107,15 @@ spec: maxBackups: format: int32 type: integer - maxReservedTime: + maxFailedNebulaBackupJobs: + format: int32 + type: integer + maxRetentionTime: + pattern: ^([0-9]+(\.[0-9]+)?(s|m|h))+$ type: string + maxSuccessfulNebulaBackupJobs: + format: int32 + type: integer pause: type: boolean schedule: @@ -89,11 +126,22 @@ spec: type: object status: properties: - lastBackup: + currPauseStatus: + type: boolean + lastScheduledBackupTime: + format: date-time type: string - lastBackupTime: + lastSuccessfulBackupTime: format: date-time type: string + mostRecentJobFailed: + type: boolean + numberofFailedBackups: + format: int32 + type: integer + numberofSuccessfulBackups: + format: int32 + type: integer type: object type: object served: true diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 63700b43..05bbcc49 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -3,6 +3,9 @@ # It should be run by config/default resources: - bases/apps.nebula-graph.io_nebulaclusters.yaml +- bases/apps.nebula-graph.io_nebulabackups.yaml +- bases/apps.nebula-graph.io_nebularestores.yaml +- bases/apps.nebula-graph.io_nebulascheduledbackups.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index 
ee23e5cd..fbb80d9e 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -41,7 +41,7 @@ patchesStrategicMerge: # [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. # Uncomment 'CERTMANAGER' sections in crd/kustomization.yaml to enable the CA injection in the admission webhooks. # 'CERTMANAGER' needs to be enabled to use ca injection -- webhookcainjection_patch.yaml +#- webhookcainjection_patch.yaml # the following config is for teaching kustomize how to do var substitution vars: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index e5a38ebe..34d4194e 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -25,6 +25,16 @@ rules: - create - list - patch +- apiGroups: + - "" + resources: + - jobs + verbs: + - create + - delete + - get + - list + - watch - apiGroups: - "" resources: @@ -214,6 +224,32 @@ rules: - get - patch - update +- apiGroups: + - apps.nebula-graph.io + resources: + - nebulascheduledbackups + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - apps.nebula-graph.io + resources: + - nebulascheduledbackups/finalizers + verbs: + - update +- apiGroups: + - apps.nebula-graph.io + resources: + - nebulascheduledbackups/status + verbs: + - get + - patch + - update - apiGroups: - autoscaling.nebula-graph.io resources: @@ -234,6 +270,16 @@ rules: - get - patch - update +- apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - get + - list + - watch - apiGroups: - metrics.k8s.io resources: diff --git a/config/samples/nebulascheduledbackup.yaml b/config/samples/nebulascheduledbackup.yaml new file mode 100644 index 00000000..7df690bf --- /dev/null +++ b/config/samples/nebulascheduledbackup.yaml @@ -0,0 +1,28 @@ +apiVersion: v1 +kind: Secret +metadata: + name: aws-s3-secret +type: Opaque +data: + access-key: + secret-key: +--- +apiVersion: apps.nebula-graph.io/v1alpha1 +kind: NebulaScheduledBackup +metadata: + 
name: backup-kl +spec: + backupTemplate: + toolImage: reg.vesoft-inc.com/cloud-dev/br-ent:kevin-test + br: + clusterName: nebula + s3: + region: "us-west-2" + bucket: "nebula-br-test" + endpoint: "https://s3.us-west-2.amazonaws.com" + secretName: "aws-s3-secret" + schedule: "*/5 * * * *" + maxRetentionTime: "24h" + maxSuccessfulNebulaBackupJobs: 3 + maxFailedNebulaBackupJobs: 3 + maxBackups: 5 diff --git a/go.mod b/go.mod index c2d1847d..69016eb0 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/onsi/gomega v1.27.7 github.com/openkruise/kruise-api v1.3.0 github.com/pkg/errors v0.9.1 + github.com/robfig/cron v1.2.0 github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.2 diff --git a/go.sum b/go.sum index cab23830..d8be949e 100644 --- a/go.sum +++ b/go.sum @@ -365,6 +365,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= diff --git a/pkg/controller/nebulabackup/nebula_backup_control.go b/pkg/controller/nebulabackup/nebula_backup_control.go index 9a795584..c6d87ff8 100644 --- a/pkg/controller/nebulabackup/nebula_backup_control.go +++ b/pkg/controller/nebulabackup/nebula_backup_control.go @@ -17,11 +17,15 @@ limitations under the License. 
package nebulabackup import ( + "fmt" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/klog/v2" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/pkg/kube" + "github.com/vesoft-inc/nebula-operator/pkg/util/condition" "github.com/vesoft-inc/nebula-operator/pkg/util/errors" ) @@ -44,48 +48,42 @@ func NewBackupControl(clientSet kube.ClientSet, backupManager Manager) ControlIn } func (c *defaultBackupControl) SyncNebulaBackup(bp *v1alpha1.NebulaBackup) error { - phase := bp.Status.Phase - if phase == "" { - phase = v1alpha1.BackupPending + + namespace := bp.GetNamespace() + name := bp.GetName() + + if condition.IsBackupInvalid(bp) { + klog.Infof("Skipping sync because NebulaBackup [%s/%s] is invalid.", namespace, name) + return nil + } + + if condition.IsBackupComplete(bp) { + klog.Infof("Skipping sync because NebulaBackup [%s/%s] is already complete.", namespace, name) + return nil + } + + if condition.IsBackupFailed(bp) { + klog.Infof("Skipping sync because NebulaBackup [%s/%s] has already failed.", namespace, name) + return nil } - switch phase { - case v1alpha1.BackupPending: - klog.Infof("create backup job %s", bp.Name) - err := c.backupManager.Create(bp) - if err != nil && !errors.IsReconcileError(err) { - klog.Errorf("Fail to create NebulaBackup [%s/%s], %v", bp.Namespace, bp.Name, err) - if err = c.clientSet.NebulaBackup().UpdateNebulaBackupStatus(bp, &v1alpha1.BackupCondition{ - Type: v1alpha1.BackupFailed, - Status: corev1.ConditionTrue, - Reason: "CreateBackupJobFailed", - Message: err.Error(), - }, &kube.BackupUpdateStatus{ - ConditionType: v1alpha1.BackupFailed, - }); err != nil { - klog.Errorf("Fail to update the condition of NebulaBackup [%s/%s], %v", bp.Namespace, bp.Name, err) - } + err := c.backupManager.Sync(bp) + if err != nil && !errors.IsReconcileError(err) { + if apierrors.IsNotFound(err) { + return nil } - return err - case v1alpha1.BackupRunning: - 
klog.Infof("sync backup job %s", bp.Name) - err := c.backupManager.Sync(bp) - if err != nil && !errors.IsReconcileError(err) { - klog.Errorf("Fail to sync NebulaBackup [%s/%s], %v", bp.Namespace, bp.Name, err) - if err = c.clientSet.NebulaBackup().UpdateNebulaBackupStatus(bp, &v1alpha1.BackupCondition{ - Type: v1alpha1.BackupFailed, - Status: corev1.ConditionTrue, - Reason: "SyncNebulaBackupFailed", - Message: err.Error(), - }, &kube.BackupUpdateStatus{ - ConditionType: v1alpha1.BackupFailed, - }); err != nil { - klog.Errorf("Fail to update the condition of NebulaBackup [%s/%s], %v", bp.Namespace, bp.Name, err) - } + if err := c.clientSet.NebulaBackup().UpdateNebulaBackupStatus(bp, &v1alpha1.BackupCondition{ + Type: v1alpha1.BackupFailed, + Status: corev1.ConditionTrue, + Reason: err.Error(), + }, &kube.BackupUpdateStatus{ + ConditionType: v1alpha1.BackupFailed, + }); err != nil { + return fmt.Errorf("failed to update NebulaBackup [%s/%s], %v", namespace, name, err) } - return err + + return nil } - klog.Infof("sync NebulaBackup success, backup %s phase is %s", bp.Name, phase) - return nil + return err } diff --git a/pkg/controller/nebulabackup/nebula_backup_controller.go b/pkg/controller/nebulabackup/nebula_backup_controller.go index f511c803..53c77478 100644 --- a/pkg/controller/nebulabackup/nebula_backup_controller.go +++ b/pkg/controller/nebulabackup/nebula_backup_controller.go @@ -62,6 +62,8 @@ func NewBackupReconciler(mgr ctrl.Manager) (*Reconciler, error) { // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch;list +// +kubebuilder:rbac:groups="",resources=jobs,verbs=create;get;list;watch;delete +// +kubebuilder:rbac:groups="batch",resources=jobs,verbs=create;get;list;watch;delete // +kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulaclusters,verbs=get;list;watch;create;update;patch;delete // 
+kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulaclusters/status,verbs=get;update;patch // +kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulabackups,verbs=get;list;watch;create;update;patch;delete diff --git a/pkg/controller/nebulabackup/nebula_backup_manager.go b/pkg/controller/nebulabackup/nebula_backup_manager.go index bc5abd22..a627b6c2 100644 --- a/pkg/controller/nebulabackup/nebula_backup_manager.go +++ b/pkg/controller/nebulabackup/nebula_backup_manager.go @@ -18,13 +18,14 @@ package nebulabackup import ( "fmt" + "os" "time" - "github.com/aws/aws-sdk-go/aws" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" "k8s.io/utils/pointer" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" @@ -34,14 +35,11 @@ import ( ) const ( - EnvS3AccessKeyName = "S3_ACCESS_KEY" - EnvS3SecretKeyName = "S3_SECRET_KEY" + S3AccessKey = "AWS_ACCESS_KEY_ID" + S3SecretKey = "AWS_SECRET_ACCESS_KEY" ) type Manager interface { - // Create creates a backup job. - Create(backup *v1alpha1.NebulaBackup) error - // Sync implements the logic for syncing NebulaBackup. 
Sync(backup *v1alpha1.NebulaBackup) error } @@ -56,127 +54,252 @@ func NewBackupManager(clientSet kube.ClientSet) Manager { return &backupManager{clientSet: clientSet} } -func (bm *backupManager) Create(backup *v1alpha1.NebulaBackup) error { - ns := backup.GetNamespace() - if backup.Spec.BR.ClusterNamespace != nil { - ns = *backup.Spec.BR.ClusterNamespace - } +func (bm *backupManager) Sync(backup *v1alpha1.NebulaBackup) error { - nc, err := bm.clientSet.NebulaCluster().GetNebulaCluster(ns, backup.Spec.BR.ClusterName) - if err != nil { - return fmt.Errorf("get nebula cluster %s/%s err: %w", ns, backup.Spec.BR.ClusterName, err) - } + var nbCondition *v1alpha1.BackupCondition + var nbStatus *kube.BackupUpdateStatus + var backupJob *batchv1.Job + if backup.Status.TimeStarted == nil { + ns := backup.GetNamespace() + if backup.Spec.BR.ClusterNamespace != nil { + ns = *backup.Spec.BR.ClusterNamespace + } - if !nc.IsReady() { - return fmt.Errorf("nebula cluster %s/%s is not ready", ns, backup.Spec.BR.ClusterName) - } + nc, err := bm.clientSet.NebulaCluster().GetNebulaCluster(ns, backup.Spec.BR.ClusterName) + if err != nil { + return fmt.Errorf("get nebula cluster %s/%s err: %w", ns, backup.Spec.BR.ClusterName, err) + } + + if !nc.IsReady() { + return fmt.Errorf("nebula cluster %s/%s is not ready", ns, backup.Spec.BR.ClusterName) + } + + backupJob = bm.generateBackupJob(backup, fmt.Sprintf("%v:%v", nc.MetadComponent().GetPodFQDN(0), nc.MetadComponent().GetPort(v1alpha1.MetadPortNameThrift))) + if err = bm.clientSet.Job().CreateJob(backupJob); err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("create backup job err: %w", err) + } + + nbCondition = &v1alpha1.BackupCondition{ + Type: v1alpha1.BackupRunning, + Status: corev1.ConditionTrue, + Reason: "CreateBackupJobSuccess", + } + + nbStatus = &kube.BackupUpdateStatus{ + TimeStarted: &metav1.Time{Time: time.Now()}, + ConditionType: v1alpha1.BackupRunning, + } + + klog.Infof("Backup job [%s/%s] for NebulaBackup 
[%s/%s] created successfully", backupJob.Namespace, backupJob.Name, backup.Namespace, backup.Name) + + } else { + var err error + backupJob, err = bm.clientSet.Job().GetJob(backup.Namespace, backup.Name) + if err != nil { + return fmt.Errorf("get backup job %s/%s err: %w", backup.Namespace, backup.Name, err) + } - backupJob := generateBackupJob(backup, nc.GetMetadThriftConnAddress()) - if err = bm.clientSet.Job().CreateJob(backupJob); err != nil && !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("create backup job err: %w", err) + for _, condition := range backupJob.Status.Conditions { + if condition.Type == batchv1.JobComplete && condition.Status == corev1.ConditionTrue { + nbCondition = &v1alpha1.BackupCondition{ + Type: v1alpha1.BackupComplete, + Status: corev1.ConditionTrue, + Reason: "BackupComplete", + } + + nbStatus = &kube.BackupUpdateStatus{ + TimeCompleted: &metav1.Time{Time: backupJob.Status.CompletionTime.Time}, + ConditionType: v1alpha1.BackupComplete, + } + + if err := bm.clientSet.NebulaBackup().UpdateNebulaBackupStatus(backup, nbCondition, nbStatus); err != nil { + return fmt.Errorf("update nebula backup %s/%s status err: %w", backup.Namespace, backup.Name, err) + } + + return nil + } else if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue { + return fmt.Errorf("backup failed, reason: %s, message: %s", condition.Reason, condition.Message) + } + } } - if err = bm.clientSet.NebulaBackup().UpdateNebulaBackupStatus(backup, &v1alpha1.BackupCondition{ - Type: v1alpha1.BackupRunning, - Status: corev1.ConditionTrue, - Reason: "CreateBackupJobSuccess", - }, &kube.BackupUpdateStatus{ - TimeStarted: &metav1.Time{Time: time.Now()}, - ConditionType: v1alpha1.BackupRunning, - }); err != nil { + if err := bm.clientSet.NebulaBackup().UpdateNebulaBackupStatus(backup, nbCondition, nbStatus); err != nil { return fmt.Errorf("update nebula backup %s/%s status err: %w", backup.Namespace, backup.Name, err) } - return nil + return 
utilerrors.ReconcileErrorf("Waiting for backup job [%s/%s] of NebulaBackup [%s/%s] to complete", backupJob.Namespace, backupJob.Name, backup.Namespace, backup.Name) } -func (bm *backupManager) Sync(backup *v1alpha1.NebulaBackup) error { - backupJob, err := bm.clientSet.Job().GetJob(backup.Namespace, backup.Name) - if err != nil { - return fmt.Errorf("get backup job %s/%s err: %w", backup.Namespace, backup.Name, err) +func createStorageString(backup *v1alpha1.NebulaBackup) string { + var storageString string + switch backup.Spec.BR.StorageProviderType { + case "s3": + storageString = fmt.Sprintf("--storage s3://%s --s3.access_key $(%s) --s3.secret_key $(%s) --s3.region %s --s3.endpoint %s", backup.Spec.BR.S3.Bucket, S3AccessKey, S3SecretKey, backup.Spec.BR.S3.Region, backup.Spec.BR.S3.Endpoint) } + return storageString +} - if backupJob.Status.CompletionTime != nil { - if err = bm.clientSet.NebulaBackup().UpdateNebulaBackupStatus(backup, &v1alpha1.BackupCondition{ - Type: v1alpha1.BackupComplete, - Status: corev1.ConditionTrue, - Reason: "BackupComplete", - }, &kube.BackupUpdateStatus{ - TimeCompleted: &metav1.Time{Time: backupJob.Status.CompletionTime.Time}, - ConditionType: v1alpha1.BackupComplete, - }); err != nil { - return fmt.Errorf("update nebula backup %s/%s status err: %w", backup.Namespace, backup.Name, err) +func getEnvForCredentials(backup *v1alpha1.NebulaBackup) []corev1.EnvVar { + if os.Getenv(S3AccessKey) != "" && os.Getenv(S3SecretKey) != "" { + return []corev1.EnvVar{ + { + Name: S3AccessKey, + Value: os.Getenv(S3AccessKey), + }, + { + Name: S3SecretKey, + Value: os.Getenv(S3SecretKey), + }, } - return nil } - for _, condition := range backupJob.Status.Conditions { - if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue { - return fmt.Errorf("backup failed, reason: %s, message: %s", condition.Reason, condition.Message) - } + return []corev1.EnvVar{ + { + Name: S3AccessKey, + ValueFrom: &corev1.EnvVarSource{ + 
SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: backup.Spec.BR.S3.SecretName, + }, + Key: br.S3AccessKey, + }, + }, + }, + { + Name: S3SecretKey, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: backup.Spec.BR.S3.SecretName, + }, + Key: br.S3SecretKey, + }, + }, + }, } - - return utilerrors.ReconcileErrorf("waiting for backup job [%s/%s] done", backup.Namespace, backup.Name) } -func generateBackupJob(backup *v1alpha1.NebulaBackup, metaAddr string) *batchv1.Job { - bpCmdHead := "exec /usr/local/bin/br-ent backup full" - if backup.Spec.BR.BackupName != "" { - bpCmdHead = fmt.Sprintf("exec /usr/local/bin/br-ent backup incr --base %s", backup.Spec.BR.BackupName) - } +func (bm *backupManager) generateBackupJob(backup *v1alpha1.NebulaBackup, metaAddr string) *batchv1.Job { + var podSpec corev1.PodSpec + var ttlFinished *int32 + storageString := createStorageString(backup) - backupCmd := fmt.Sprintf("%s --meta %s --storage s3://%s --s3.access_key $%s --s3.secret_key $%s --s3.region %s --s3.endpoint %s", - bpCmdHead, metaAddr, backup.Spec.BR.S3.Bucket, EnvS3AccessKeyName, EnvS3SecretKeyName, backup.Spec.BR.S3.Region, backup.Spec.BR.S3.Endpoint) + credentials := getEnvForCredentials(backup) + + if len(backup.OwnerReferences) != 0 && backup.OwnerReferences[0].Kind == "NebulaScheduledBackup" && len(backup.Env) != 0 { + envtoPass := append(credentials, backup.Env...) 
+ podSpec = corev1.PodSpec{ + ImagePullSecrets: backup.Spec.ImagePullSecrets, + Containers: []corev1.Container{ + { + Name: "backup", + Image: backup.Spec.ToolImage, + ImagePullPolicy: backup.Spec.ImagePullPolicy, + Env: append(envtoPass, + corev1.EnvVar{ + Name: "META_ADDRESS", + Value: metaAddr, + }, + corev1.EnvVar{ + Name: "STORAGE_LINE", + Value: storageString, + }, + ), + Command: []string{"/bin/bash", "-c"}, + Args: []string{"/usr/local/bin/runnable/backup-cleanup-run.sh"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "backup-cleanup-volume-runnable", + MountPath: "/usr/local/bin/runnable", + }, + }, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "backup-init", + Image: backup.Spec.ToolImage, + ImagePullPolicy: backup.Spec.ImagePullPolicy, + Command: []string{"/bin/bash", "-c"}, + Args: []string{"cp /usr/local/bin/backup-cleanup.sh /usr/local/bin/runnable/backup-cleanup-run.sh; chmod +x /usr/local/bin/runnable/backup-cleanup-run.sh"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "backup-cleanup-volume", + MountPath: "/usr/local/bin/backup-cleanup.sh", + SubPath: "backup-cleanup.sh", + }, + { + Name: "backup-cleanup-volume-runnable", + MountPath: "/usr/local/bin/runnable", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "backup-cleanup-volume", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: fmt.Sprintf("backup-scripts-cm-%v", backup.OwnerReferences[0].Name)}, + }, + }, + }, + { + Name: "backup-cleanup-volume-runnable", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + NodeSelector: backup.Spec.NodeSelector, + } + } else { + bpCmdHead := "exec /usr/local/bin/br-ent backup full" + if backup.Spec.BR.BackupName != "" { + bpCmdHead = fmt.Sprintf("exec /usr/local/bin/br-ent backup incr --base %s", backup.Spec.BR.BackupName) + } + + backupCmd := 
fmt.Sprintf("%s --meta %s %s", bpCmdHead, metaAddr, storageString) + + podSpec = corev1.PodSpec{ + ImagePullSecrets: backup.Spec.ImagePullSecrets, + Containers: []corev1.Container{ + { + Name: "backup", + Image: backup.Spec.ToolImage, + ImagePullPolicy: backup.Spec.ImagePullPolicy, + Env: credentials, + Command: []string{"/bin/sh", "-ecx"}, + Args: []string{backupCmd}, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + NodeSelector: backup.Spec.NodeSelector, + } + ttlFinished = pointer.Int32(600) + } return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: backup.Name, Namespace: backup.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: backup.APIVersion, + Kind: backup.Kind, + Name: backup.Name, + UID: backup.UID, + }, + }, }, Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(1), Completions: pointer.Int32(1), BackoffLimit: pointer.Int32(0), Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - ImagePullSecrets: backup.Spec.ImagePullSecrets, - Containers: []corev1.Container{ - { - Name: "backup", - Image: backup.Spec.ToolImage, - ImagePullPolicy: backup.Spec.ImagePullPolicy, - Env: []corev1.EnvVar{ - { - Name: EnvS3AccessKeyName, - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: backup.Spec.BR.S3.SecretName, - }, - Key: br.S3AccessKey, - }, - }, - }, - { - Name: EnvS3SecretKeyName, - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: backup.Spec.BR.S3.SecretName, - }, - Key: br.S3SecretKey, - }, - }, - }, - }, - Command: []string{"/bin/sh", "-ecx"}, - Args: []string{backupCmd}, - }, - }, - RestartPolicy: corev1.RestartPolicyNever, - NodeSelector: backup.Spec.NodeSelector, - }, + Spec: podSpec, }, - TTLSecondsAfterFinished: aws.Int32(600), + TTLSecondsAfterFinished: ttlFinished, }, } } diff --git 
a/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_control.go b/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_control.go new file mode 100644 index 00000000..3577e0c4 --- /dev/null +++ b/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_control.go @@ -0,0 +1,101 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nebulascheduledbackup + +import ( + "time" + + "k8s.io/klog/v2" + + condutil "github.com/vesoft-inc/nebula-operator/pkg/util/condition" + utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/pkg/kube" +) + +type ControlInterface interface { + Sync(bp *v1alpha1.NebulaScheduledBackup) (*time.Duration, error) +} + +var _ ControlInterface = (*defaultScheduledBackupControl)(nil) + +type defaultScheduledBackupControl struct { + clientSet kube.ClientSet + scheduledBackupManager Manager +} + +func NewBackupControl(clientSet kube.ClientSet, scheduledBackupManager Manager) ControlInterface { + return &defaultScheduledBackupControl{ + clientSet: clientSet, + scheduledBackupManager: scheduledBackupManager, + } +} + +func (c *defaultScheduledBackupControl) Sync(sbp *v1alpha1.NebulaScheduledBackup) (*time.Duration, error) { + ownedNebulaBackups, err := c.clientSet.NebulaBackup().ListNebulaBackupsByUID(sbp.Namespace, sbp.UID) + if err != nil { + 
klog.Errorf("Fail to get nebula backup jobs owned by [%s/%s], err: %v", sbp.Namespace, sbp.Name, err) + return nil, err + } + + var successfulBackups, failedBackups, runningBackups []v1alpha1.NebulaBackup + for _, nebulaBackup := range ownedNebulaBackups { + if condutil.IsBackupComplete(&nebulaBackup) { + successfulBackups = append(successfulBackups, nebulaBackup) + } else if condutil.IsBackupFailed(&nebulaBackup) && condutil.IsBackupInvalid(&nebulaBackup) { + failedBackups = append(failedBackups, nebulaBackup) + } else { + runningBackups = append(runningBackups, nebulaBackup) + } + } + + err = c.scheduledBackupManager.CleanupFinishedNBs(sbp, successfulBackups, failedBackups) + if err != nil { + klog.Errorf("Fail to cleanup nebula backup jobs owned by [%s/%s], err: %v", sbp.Namespace, sbp.Name, err) + return nil, err + } + + now := metav1.Now() + + syncUpdates, NextBackupTime, err := c.scheduledBackupManager.SyncNebulaScheduledBackup(sbp, successfulBackups, failedBackups, runningBackups, &now) + if err != nil { + return nil, utilerrors.ReconcileErrorf("error syncing Nebula scheduled backup [%s/%s]. 
Err: %v", sbp.Namespace, sbp.Name, err) + } + + if syncUpdates != nil { + klog.Infof("Updating Nebula scheduled backup [%s/%s]", sbp.Namespace, sbp.Name) + err = c.clientSet.NebulaScheduledBackup().SetNebulaScheduledBackupStatus(sbp, syncUpdates) + if err != nil { + klog.Errorf("Fail to update Nebula scheduled backup [%s/%s], err: %v", sbp.Namespace, sbp.Name, err) + return nil, err + } + klog.Infof("Nebula scheduled backup [%s/%s] updated successfully.", sbp.Namespace, sbp.Name) + } else { + klog.Infof("No status updates needed for Nebula scheduled backup [%s/%s]", sbp.Namespace, sbp.Name) + } + + klog.Infof("Reconcile NebulaScheduledBackup success for Nebula scheduled backup %s", sbp.Name) + + if NextBackupTime != nil { + newReconcilDuration := NextBackupTime.Sub(now.Local()) + return &newReconcilDuration, nil + } + + return nil, nil +} diff --git a/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_controller.go b/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_controller.go new file mode 100644 index 00000000..b654d4f7 --- /dev/null +++ b/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_controller.go @@ -0,0 +1,132 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nebulascheduledbackup + +import ( + "context" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/pkg/kube" + errorsutil "github.com/vesoft-inc/nebula-operator/pkg/util/errors" +) + +const ( + defaultTimeout = 5 * time.Second + reconcileTimeOut = 10 * time.Second +) + +var _ reconcile.Reconciler = (*Reconciler)(nil) + +// Reconciler reconciles a NebulaBackup object +type Reconciler struct { + control ControlInterface + client client.Client +} + +func NewBackupReconciler(mgr ctrl.Manager) (*Reconciler, error) { + clientSet, err := kube.NewClientSet(mgr.GetConfig()) + if err != nil { + return nil, err + } + + backupMgr := NewBackupManager(clientSet) + + return &Reconciler{ + control: NewBackupControl(clientSet, backupMgr), + client: mgr.GetClient(), + }, nil +} + +// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch;list +// +kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulaclusters,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulaclusters/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulascheduledbackups,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulascheduledbackups/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps.nebula-graph.io,resources=nebulascheduledbackups/finalizers,verbs=update + +func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res reconcile.Result, retErr error) { 
+ key := req.NamespacedName.String() + subCtx, cancel := context.WithTimeout(ctx, time.Minute*1) + defer cancel() + + startTime := time.Now() + defer func() { + if retErr == nil { + if res.Requeue || res.RequeueAfter > 0 { + klog.Infof("Finished reconciling NebulaScheduledBackup [%s] (%v), result: %v", key, time.Since(startTime), res) + } else { + klog.Infof("Finished reconciling NebulaScheduledBackup [%s], spendTime: (%v)", key, time.Since(startTime)) + } + } else { + klog.Errorf("Failed to reconcile NebulaScheduledBackup [%s], spendTime: (%v)", key, time.Since(startTime)) + } + }() + + var scheduledBackup v1alpha1.NebulaScheduledBackup + if err := r.client.Get(subCtx, req.NamespacedName, &scheduledBackup); err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("Skipping because NebulaScheduledBackup [%s] has been deleted", key) + } + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + klog.Info("Start to reconcile NebulaScheduledBackup") + + // Check the current resource version + currentResourceVersion := scheduledBackup.GetResourceVersion() + klog.Infof("Current resource version: %v", currentResourceVersion) + + updatedScheduledBackup := scheduledBackup.DeepCopy() + newReconcilerDuration, err := r.syncNebulaScheduledBackup(updatedScheduledBackup) + if err != nil { + if errorsutil.IsReconcileError(err) { + klog.Infof("NebulaScheduledBackup [%s] reconcile details: %v", key, err) + return ctrl.Result{RequeueAfter: reconcileTimeOut}, err + } + klog.Errorf("NebulaScheduledBackup [%s] reconcile failed: %v", key, err) + return ctrl.Result{RequeueAfter: defaultTimeout}, err + } + + if newReconcilerDuration != nil { + // wait until next backup time to reconcile to avoid wasting resources. 
+ return ctrl.Result{Requeue: true, RequeueAfter: *newReconcilerDuration}, nil + } + return ctrl.Result{}, nil +} + +func (r *Reconciler) syncNebulaScheduledBackup(backup *v1alpha1.NebulaScheduledBackup) (*time.Duration, error) { + newReconcilerDuration, err := r.control.Sync(backup) + return newReconcilerDuration, err +} + +// SetupWithManager sets up the controller with the Manager. +func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.NebulaScheduledBackup{}). + WithOptions(controller.Options{MaxConcurrentReconciles: 5}). + Complete(r) +} diff --git a/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_manager.go b/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_manager.go new file mode 100644 index 00000000..0e9bf291 --- /dev/null +++ b/pkg/controller/nebulascheduledbackup/nebula_scheduled_backup_manager.go @@ -0,0 +1,407 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nebulascheduledbackup + +import ( + "fmt" + "sort" + "time" + + "github.com/robfig/cron" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/pkg/kube" +) + +const ( + BackupPrefix = "nsb" +) + +type Manager interface { + // CleanupFinishedNBs implements the logic for deleting all finished Nebula Backup jobs associated with the NebulaScheduledBackup + CleanupFinishedNBs(backup *v1alpha1.NebulaScheduledBackup, successfulJobs, FailedJobs []v1alpha1.NebulaBackup) error + + // SyncNebulaScheduledBackup implements the logic for syncing a NebulaScheduledBackup. + SyncNebulaScheduledBackup(backup *v1alpha1.NebulaScheduledBackup, successfulBackups, failedBackups, runningBackups []v1alpha1.NebulaBackup, now *metav1.Time) (*kube.ScheduledBackupUpdateStatus, *metav1.Time, error) +} + +var _ Manager = (*scheduledBackupManager)(nil) + +type scheduledBackupManager struct { + clientSet kube.ClientSet +} + +func NewBackupManager(clientSet kube.ClientSet) Manager { + return &scheduledBackupManager{clientSet: clientSet} +} + +func (bm *scheduledBackupManager) CleanupFinishedNBs(scheduledBackup *v1alpha1.NebulaScheduledBackup, successfulJobs, failedJobs []v1alpha1.NebulaBackup) error { + klog.V(4).Info("Cleaning up previous successful and failed backup jobs") + if pointer.Int32Deref(scheduledBackup.Spec.MaxSuccessfulNebulaBackupJobs, -1) < 0 && pointer.Int32Deref(scheduledBackup.Spec.MaxFailedNebulaBackupJobs, -1) < 0 { + klog.V(4).Infof("No limit set for either successful backup jobs or failed backup jobs for nebula scheduled backup %s/%s. 
Nothing to cleanup.", scheduledBackup.Namespace, scheduledBackup.Name) + return nil + } + + var jobsToDelete int32 + if pointer.Int32Deref(scheduledBackup.Spec.MaxSuccessfulNebulaBackupJobs, -1) >= 0 { + jobsToDelete = int32(len(successfulJobs)) - *scheduledBackup.Spec.MaxSuccessfulNebulaBackupJobs + if jobsToDelete > 0 { + err := bm.cleanupNBs(scheduledBackup, successfulJobs, jobsToDelete) + if err != nil { + return fmt.Errorf("cleanup successful backup jobs failed for nebula scheduled backup %s/%s. err: %w", scheduledBackup.Namespace, scheduledBackup.Name, err) + } + } else { + klog.V(4).Infof("Number of successful backup jobs is less than %v. Nothing to cleanup", *scheduledBackup.Spec.MaxSuccessfulNebulaBackupJobs) + } + } + + if pointer.Int32Deref(scheduledBackup.Spec.MaxFailedNebulaBackupJobs, -1) >= 0 { + jobsToDelete = int32(len(failedJobs)) - *scheduledBackup.Spec.MaxFailedNebulaBackupJobs + if jobsToDelete > 0 { + err := bm.cleanupNBs(scheduledBackup, failedJobs, jobsToDelete) + if err != nil { + return fmt.Errorf("cleanup failed backup jobs failed for nebula scheduled backup %s/%s. err: %w", scheduledBackup.Namespace, scheduledBackup.Name, err) + } + } else { + klog.V(4).Infof("Number of failed backup jobs is less than %v. 
Nothing to cleanup", *scheduledBackup.Spec.MaxFailedNebulaBackupJobs) + } + } + + return nil +} + +func (bm *scheduledBackupManager) SyncNebulaScheduledBackup(scheduledBackup *v1alpha1.NebulaScheduledBackup, successfulBackups, failedBackups, runningBackups []v1alpha1.NebulaBackup, now *metav1.Time) (*kube.ScheduledBackupUpdateStatus, *metav1.Time, error) { + // Don't do anything if cronjob is paused + if scheduledBackup.Spec.Pause != nil && *scheduledBackup.Spec.Pause { + klog.V(4).Infof("Nebula scheduled backup %s/%s is paused.", scheduledBackup.Namespace, scheduledBackup.Name) + return &kube.ScheduledBackupUpdateStatus{ + CurrPauseStatus: scheduledBackup.Spec.Pause, + }, nil, nil + } + + // Check Nebula Graph cluster status and return error if it's down. + ns := scheduledBackup.GetNamespace() + if scheduledBackup.Spec.BackupTemplate.BR.ClusterNamespace != nil { + ns = *scheduledBackup.Spec.BackupTemplate.BR.ClusterNamespace + } + + nc, err := bm.clientSet.NebulaCluster().GetNebulaCluster(ns, scheduledBackup.Spec.BackupTemplate.BR.ClusterName) + if err != nil { + return nil, nil, fmt.Errorf("get nebula cluster %s/%s err: %w", ns, scheduledBackup.Spec.BackupTemplate.BR.ClusterName, err) + } + + if !nc.IsReady() { + return nil, nil, fmt.Errorf("nebula cluster %s/%s is not ready", ns, scheduledBackup.Spec.BackupTemplate.BR.ClusterName) + } + + // Create or update backup cleanup scripts config map. 
Needed to clean up previous backups + nbConfigMap := generateBackupCleanupScriptsConfigMap(scheduledBackup) + err = bm.clientSet.ConfigMap().CreateOrUpdateConfigMap(nbConfigMap) + if err != nil { + return nil, nil, fmt.Errorf("create or update config map for backup cleanup scripts err: %w", err) + } + + // Check status of most recent job + klog.V(4).Info("Checking status of most recent job") + var lastSuccessfulTime *metav1.Time + var lastBackupFailed bool + numberOfSuccessfulBackups := pointer.Int32Deref(scheduledBackup.Status.NumberOfSuccessfulBackups, 0) + numberOfFailedBackups := pointer.Int32Deref(scheduledBackup.Status.NumberOfFailedBackups, 0) + + if len(successfulBackups) != 0 && len(failedBackups) != 0 { + sort.Sort(byBackupStartTime(successfulBackups)) + sort.Sort(byBackupStartTime(failedBackups)) + + lastSuccessfulBackup := successfulBackups[len(successfulBackups)-1] + lastFailedBackup := failedBackups[len(failedBackups)-1] + if lastSuccessfulBackup.Status.TimeStarted.After(lastFailedBackup.Status.TimeStarted.Local()) { + // Most recent backup job succeeded + lastSuccessfulTime = lastSuccessfulBackup.Status.TimeCompleted + numberOfSuccessfulBackups++ + klog.V(4).Infof("Most recent backup job %s/%s for Nebula scheduled backup %s/%s succeeded", lastSuccessfulBackup.Namespace, lastSuccessfulBackup.Name, scheduledBackup.Namespace, scheduledBackup.Name) + } else { + // Most recent backup job failed + lastBackupFailed = true + numberOfFailedBackups++ + klog.Errorf("Most recent backup job %s/%s for Nebula scheduled backup %s/%s failed", lastFailedBackup.Namespace, lastFailedBackup.Name, scheduledBackup.Namespace, scheduledBackup.Name) + } + } else if len(successfulBackups) != 0 { + lastSuccessfulBackup := successfulBackups[len(successfulBackups)-1] + lastSuccessfulTime = lastSuccessfulBackup.Status.TimeCompleted + numberOfSuccessfulBackups++ + klog.V(4).Infof("Most recent backup job %s/%s for Nebula scheduled backup %s/%s succeeded", 
lastSuccessfulBackup.Namespace, lastSuccessfulBackup.Name, scheduledBackup.Namespace, scheduledBackup.Name) + } else if len(failedBackups) != 0 { + lastFailedBackup := failedBackups[len(failedBackups)-1] + lastBackupFailed = true + numberOfFailedBackups++ + klog.Errorf("Most recent backup job %s/%s for Nebula scheduled backup %s/%s failed", lastFailedBackup.Namespace, lastFailedBackup.Name, scheduledBackup.Namespace, scheduledBackup.Name) + } else { + klog.V(4).Info("No previous successful or failed backup jobs detected.") + } + + // Calculate previous and next backup time + previousBackupTime, nextBackupTime, err := calculateRunTimes(scheduledBackup, now) + if err != nil { + return nil, nil, fmt.Errorf("failed to calculate next backup time for Nebula scheduled backup %s/%s. Err: %w", scheduledBackup.Namespace, scheduledBackup.Name, err) + } + + if previousBackupTime != nil && now.After(previousBackupTime.Local()) && (scheduledBackup.Status.CurrPauseStatus == nil || !*scheduledBackup.Status.CurrPauseStatus) { + if len(runningBackups) > 0 { + // active running backup from last iteration has not completed. Issue warning and do not start new backup + klog.Warningf("Active backup job from last scheduled run detected for Nebula scheduled backup %s/%s. Will not kick off new backup job. Please consider decreasing the backup frequency by adjusting the schedule.", scheduledBackup.Namespace, scheduledBackup.Name) + if len(runningBackups) != 1 { + // This could be because someone manually started a nebula backup and associated it to this scheduled backup. Need to issue another warning. + klog.Warningf("More than 1 active backup detected for Nebula scheduled backup %s/%s. 
Please check if the second job was manually started and associated to this crontab", scheduledBackup.Namespace, scheduledBackup.Name) + } + + for _, backup := range runningBackups { + klog.Warningf("Active backup job %v detected", backup.Name) + } + } else { + klog.V(4).Infof("Triggering new Nebula backup job for Nebula Scheduled Backup %s/%s.", scheduledBackup.Namespace, scheduledBackup.Name) + + var nbEnvVar = []corev1.EnvVar{} + + if scheduledBackup.Spec.MaxBackups != nil { + nbEnvVar = append(nbEnvVar, + corev1.EnvVar{ + Name: "NUM_BACKUPS_KEEP", + Value: fmt.Sprintf("%v", *scheduledBackup.Spec.MaxBackups), + }, + ) + } + + if scheduledBackup.Spec.MaxRetentionTime != nil { + maxBackupDuration, err := time.ParseDuration(*scheduledBackup.Spec.MaxRetentionTime) + if err != nil { + return nil, nil, fmt.Errorf("parsing maxRetentionTime for nebula scheduled backup %s/%s failed. err: %w", scheduledBackup.Namespace, scheduledBackup.Name, err) + } + + nbEnvVar = append(nbEnvVar, + corev1.EnvVar{ + Name: "RESERVED_TIME_EPOCH", + Value: fmt.Sprintf("%v", maxBackupDuration.Seconds()), + }, + ) + } + + nebulaBackup := v1alpha1.NebulaBackup{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%v", BackupPrefix, scheduledBackup.Name, previousBackupTime.Unix()), + Namespace: scheduledBackup.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *getOwnerReferenceForSubresources(scheduledBackup), + }, + }, + Env: nbEnvVar, + Spec: scheduledBackup.Spec.BackupTemplate, + } + + err = bm.clientSet.NebulaBackup().CreateNebulaBackup(&nebulaBackup) + if err != nil { + return nil, nil, fmt.Errorf("trigger nebula backup for nebula scheduled backup %s/%s failed. 
err: %w", scheduledBackup.Namespace, scheduledBackup.Name, err) + } + } + } + + return &kube.ScheduledBackupUpdateStatus{ + CurrPauseStatus: scheduledBackup.Spec.Pause, + LastScheduledBackupTime: previousBackupTime, + LastSuccessfulBackupTime: lastSuccessfulTime, + NumberOfSuccessfulBackups: &numberOfSuccessfulBackups, + NumberOfFailedBackups: &numberOfFailedBackups, + MostRecentJobFailed: &lastBackupFailed, + }, nextBackupTime, nil +} + +func (bm *scheduledBackupManager) cleanupNBs(scheduledBackup *v1alpha1.NebulaScheduledBackup, nebulaBackupJobs []v1alpha1.NebulaBackup, numToDelete int32) error { + sort.Sort(byBackupStartTime(nebulaBackupJobs)) + + for i := 0; i < int(numToDelete); i++ { + klog.V(4).Infof("Removing Nebula Backup %s/%s triggered by Nebula Scheduled Backup %s/%s", nebulaBackupJobs[i].Namespace, nebulaBackupJobs[i].Name, scheduledBackup.Namespace, scheduledBackup.Name) + err := bm.clientSet.NebulaBackup().DeleteNebulaBackup(nebulaBackupJobs[i].Namespace, nebulaBackupJobs[i].Name) + if err != nil { + return fmt.Errorf("error deleting backup job %v/%v for nebula scheduled backup %s/%s. 
err: %w", nebulaBackupJobs[i].Namespace, nebulaBackupJobs[i].Name, scheduledBackup.Namespace, scheduledBackup.Name, err) + } + } + + return nil +} + +func getBackupCleanupScriptsObjectKey(scheduledBackup *v1alpha1.NebulaScheduledBackup) client.ObjectKey { + return client.ObjectKey{ + Namespace: scheduledBackup.Namespace, + Name: fmt.Sprintf("backup-scripts-cm-%v", scheduledBackup.Name), + } +} + +func getOwnerReferenceForSubresources(scheduledBackup *v1alpha1.NebulaScheduledBackup) *metav1.OwnerReference { + return &metav1.OwnerReference{ + APIVersion: scheduledBackup.APIVersion, + Kind: scheduledBackup.Kind, + Name: scheduledBackup.Name, + UID: scheduledBackup.UID, + } +} + +func generateBackupCleanupScriptsConfigMap(scheduledBackup *v1alpha1.NebulaScheduledBackup) *corev1.ConfigMap { + nbObjectKey := getBackupCleanupScriptsObjectKey(scheduledBackup) + + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: nbObjectKey.Name, + Namespace: nbObjectKey.Namespace, + Labels: map[string]string{ + "backup-name": scheduledBackup.Name, + "backup-namespace": scheduledBackup.Namespace, + }, + OwnerReferences: []metav1.OwnerReference{ + *getOwnerReferenceForSubresources(scheduledBackup), + }, + }, + + Data: map[string]string{ + "backup-cleanup.sh": `#!/bin/bash +set -x +# Step 1: run backup +echo "Running Nebula backup..." +/usr/local/bin/br-ent backup full --meta "$META_ADDRESS" $STORAGE_LINE +echo "Nebula backup complete.\n" + +# Step 2: get current backups +echo "Getting current backup info..." 
+backup_names=($(br-ent show --s3.endpoint "$S3_ENDPOINT" $STORAGE_LINE | grep -e "._[0-9]" | awk -F '|' '{print $2}')) +backup_dates=($(br-ent show --s3.endpoint "$S3_ENDPOINT" $STORAGE_LINE | grep -e "._[0-9]" | awk -F '|' '{print $3}' | tr " " "T")) +total_backups=$(br-ent show --s3.endpoint "$S3_ENDPOINT" $STORAGE_LINE | grep -e "._[0-9]" | wc -l) +echo "Current backup info retrieved.\n" + + +# Step 3: remove backups that need to be removed +echo "Removing previous expired backups..." +if [[ -n $RESERVED_TIME_EPOCH ]] && [[ $RESERVED_TIME_EPOCH -gt 0 ]]; then + echo "Maximum retention time of ${RESERVED_TIME_EPOCH}s set. Removing all previous backups exceeding this retention time...." + now=$(date +"%s") + echo "Current time epoch: $now" + for ind in ${!backup_names[@]} + do + nb_date=$(echo "${backup_dates[ind]}" | tr "T" " " | xargs) + nb_date_epoch=$(date -d "$nb_date" +"%s") + diff=$((now - nb_date_epoch)) + echo "Backing up file ${backup_names[ind]}. Backup date: \"$nb_date ($nb_date_epoch)\". Diff: \"$diff\"" + + if [[ $diff -gt $RESERVED_TIME_EPOCH ]]; then + echo "File ${backup_names[ind]} is older than the maximum reserved time. Deleting..." + br-ent cleanup --meta "$META_ADDRESS" $STORAGE_LINE --name "${backup_names[ind]}" + fi + done + echo "All necessary previous backups cleaned up." +elif [[ -n $NUM_BACKUPS_KEEP ]] && [[ $NUM_BACKUPS_KEEP -gt 0 ]]; then + if [[ $total_backups -gt $NUM_BACKUPS_KEEP ]]; then + echo "Maximum number of backups $NUM_BACKUPS_KEEP set. Removing all backups exceeding this number starting with the oldest backup..." + num_to_del=$((total_backups - NUM_BACKUPS_KEEP)) + echo "Number of previous backups to delete: $num_to_del" + for ind in $(seq 0 $((num_to_del - 1))) + do + br-ent cleanup --meta "$META_ADDRESS" $STORAGE_LINE --name "${backup_names[ind]}" + done + echo "All necessary previous backups cleaned up." + else + echo "Current number of backups has not exceeded the maximum number of backups to be kept. 
Will leave all previous backups alone." + fi +else + echo "No max retention time or maximum number of backups set. Will leave all previous backups alone" +fi +`, + }, + } +} + +// calculateRunTimes calculates the next runtime and the most recent run time if the scheduled backup was not paused. +func calculateRunTimes(scheduledBackup *v1alpha1.NebulaScheduledBackup, now *metav1.Time) (*metav1.Time, *metav1.Time, error) { + schedule, err := cron.ParseStandard(scheduledBackup.Spec.Schedule) + if err != nil { + return nil, nil, fmt.Errorf("get schedule for nebula scheduled backup %s/%s failed. err: %w", scheduledBackup.Namespace, scheduledBackup.Name, err) + } + + earliestTime := scheduledBackup.CreationTimestamp + if scheduledBackup.Status.LastScheduledBackupTime != nil { + earliestTime = *scheduledBackup.Status.LastScheduledBackupTime + } + + nextT1 := metav1.NewTime(schedule.Next(earliestTime.Local())) + nextT2 := metav1.NewTime(schedule.Next(nextT1.Local())) + + if now.Before(&nextT1) { + return nil, &nextT1, nil + } + + if now.Before(&nextT2) { + return &nextT1, &nextT2, nil + } + + // Check for invalid cron schedule that'll slide past ParseStandard (i.e "0 0 31 2 *") + timeBetweenSchedules := nextT2.Sub(nextT1.Local()).Round(time.Second).Seconds() + if timeBetweenSchedules < 1 { + return nil, nil, fmt.Errorf("invalid schedule %v", scheduledBackup.Spec.Schedule) + } + + elapsedTime := now.Sub(nextT1.Local()).Seconds() + missedRuns := (elapsedTime / timeBetweenSchedules) + 1 + + if missedRuns > 80 { + klog.Warningf("too many missed runs (>80) for nebula scheduled backup %s/%s. 
Please check for possible clock skew", scheduledBackup.Namespace, scheduledBackup.Name) + } + + mostRecentBackupTime := metav1.NewTime(nextT1.Add(time.Duration((missedRuns-1-1)*timeBetweenSchedules) * time.Second)) + for t := schedule.Next(mostRecentBackupTime.Local()); !t.After(now.Local()); t = schedule.Next(t) { + mostRecentBackupTime = metav1.NewTime(t) + } + + nextBackupTime := metav1.NewTime(schedule.Next(mostRecentBackupTime.Local())) + + return &mostRecentBackupTime, &nextBackupTime, nil +} + +// byBackupStartTime sorts a list of nebula backups by start timestamp, using their names as a tie breaker. +type byBackupStartTime []v1alpha1.NebulaBackup + +func (bt byBackupStartTime) Len() int { + return len(bt) +} + +func (bt byBackupStartTime) Swap(i, j int) { + bt[i], bt[j] = bt[j], bt[i] +} + +func (bt byBackupStartTime) Less(i, j int) bool { + if bt[i].Status.TimeStarted == nil && bt[j].Status.TimeStarted != nil { + return false + } + if bt[i].Status.TimeStarted != nil && bt[j].Status.TimeStarted == nil { + return true + } + if bt[i].Status.TimeStarted.Equal(bt[j].Status.TimeStarted) { + return bt[i].Name < bt[j].Name + } + return bt[i].Status.TimeStarted.Before(bt[j].Status.TimeStarted) +} diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 3a9e70ae..fac6c30b 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -37,25 +37,27 @@ type ClientSet interface { NebulaCluster() NebulaCluster NebulaRestore() NebulaRestore NebulaBackup() NebulaBackup + NebulaScheduledBackup() NebulaScheduledBackup NebulaAutoscaler() NebulaAutoscaler } type clientSet struct { - nodeClient Node - secretClient Secret - cmClient ConfigMap - pvClient PersistentVolume - pvcClient PersistentVolumeClaim - podClient Pod - jobClient Job - svcClient Service - ingressClient Ingress - workloadClient Workload - deployClient Deployment - nebulaClient NebulaCluster - restoreClient NebulaRestore - backupClient NebulaBackup - autoscalerClient NebulaAutoscaler + nodeClient Node + 
secretClient Secret + cmClient ConfigMap + pvClient PersistentVolume + pvcClient PersistentVolumeClaim + podClient Pod + jobClient Job + svcClient Service + ingressClient Ingress + workloadClient Workload + deployClient Deployment + nebulaClient NebulaCluster + restoreClient NebulaRestore + backupClient NebulaBackup + scheduledBackupClient NebulaScheduledBackup + autoscalerClient NebulaAutoscaler } func NewClientSet(config *rest.Config) (ClientSet, error) { @@ -64,21 +66,22 @@ func NewClientSet(config *rest.Config) (ClientSet, error) { return nil, errors.Errorf("error building runtime client: %v", err) } return &clientSet{ - nodeClient: NewNode(cli), - secretClient: NewSecret(cli), - cmClient: NewConfigMap(cli), - pvClient: NewPV(cli), - pvcClient: NewPVC(cli), - podClient: NewPod(cli), - jobClient: NewJob(cli), - svcClient: NewService(cli), - ingressClient: NewIngress(cli), - workloadClient: NewWorkload(cli), - deployClient: NewDeployment(cli), - nebulaClient: NewNebulaCluster(cli), - restoreClient: NewNebulaRestore(cli), - backupClient: NewNebulaBackup(cli), - autoscalerClient: NewNebulaAutoscaler(cli), + nodeClient: NewNode(cli), + secretClient: NewSecret(cli), + cmClient: NewConfigMap(cli), + pvClient: NewPV(cli), + pvcClient: NewPVC(cli), + podClient: NewPod(cli), + jobClient: NewJob(cli), + svcClient: NewService(cli), + ingressClient: NewIngress(cli), + workloadClient: NewWorkload(cli), + deployClient: NewDeployment(cli), + nebulaClient: NewNebulaCluster(cli), + restoreClient: NewNebulaRestore(cli), + backupClient: NewNebulaBackup(cli), + scheduledBackupClient: NewScheduledNebulaBackup(cli), + autoscalerClient: NewNebulaAutoscaler(cli), }, nil } @@ -138,6 +141,10 @@ func (c *clientSet) NebulaBackup() NebulaBackup { return c.backupClient } +func (c *clientSet) NebulaScheduledBackup() NebulaScheduledBackup { + return c.scheduledBackupClient +} + func (c *clientSet) NebulaAutoscaler() NebulaAutoscaler { return c.autoscalerClient } diff --git 
a/pkg/kube/nebulabackup.go b/pkg/kube/nebulabackup.go index c2d3a6ed..e57fdc3e 100644 --- a/pkg/kube/nebulabackup.go +++ b/pkg/kube/nebulabackup.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -38,8 +39,11 @@ type BackupUpdateStatus struct { } type NebulaBackup interface { + CreateNebulaBackup(nb *v1alpha1.NebulaBackup) error GetNebulaBackup(namespace, name string) (*v1alpha1.NebulaBackup, error) UpdateNebulaBackupStatus(backup *v1alpha1.NebulaBackup, condition *v1alpha1.BackupCondition, newStatus *BackupUpdateStatus) error + DeleteNebulaBackup(namespace, name string) error + ListNebulaBackupsByUID(namespace string, ownerReferenceUID types.UID) ([]v1alpha1.NebulaBackup, error) } type backupClient struct { @@ -50,6 +54,17 @@ func NewNebulaBackup(cli client.Client) NebulaBackup { return &backupClient{cli: cli} } +func (c *backupClient) CreateNebulaBackup(nb *v1alpha1.NebulaBackup) error { + if err := c.cli.Create(context.TODO(), nb); err != nil { + if apierrors.IsAlreadyExists(err) { + klog.Infof("NebulaBackup %s/%s already exists", nb.Namespace, nb.Name) + return nil + } + return err + } + return nil +} + func (r *backupClient) GetNebulaBackup(namespace, name string) (*v1alpha1.NebulaBackup, error) { backup := &v1alpha1.NebulaBackup{} err := r.cli.Get(context.TODO(), types.NamespacedName{ @@ -63,6 +78,24 @@ func (r *backupClient) GetNebulaBackup(namespace, name string) (*v1alpha1.Nebula return backup, nil } +func (r *backupClient) ListNebulaBackupsByUID(namespace string, ownerReferenceUID types.UID) ([]v1alpha1.NebulaBackup, error) { + backupList := v1alpha1.NebulaBackupList{} + if err := r.cli.List(context.TODO(), &backupList, client.InNamespace(namespace)); err != nil { + return nil, err + } + + var filteredBackups []v1alpha1.NebulaBackup + for _, backup := range backupList.Items { + for 
_, ownerRef := range backup.OwnerReferences { + if ownerRef.UID == ownerReferenceUID { + filteredBackups = append(filteredBackups, backup) + } + } + } + + return filteredBackups, nil +} + func (r *backupClient) UpdateNebulaBackupStatus(backup *v1alpha1.NebulaBackup, condition *v1alpha1.BackupCondition, newStatus *BackupUpdateStatus) error { var isStatusUpdate bool var isConditionUpdate bool @@ -76,6 +109,14 @@ func (r *backupClient) UpdateNebulaBackupStatus(backup *v1alpha1.NebulaBackup, c utilruntime.HandleError(fmt.Errorf("get NebulaBackup [%s/%s] failed: %v", ns, rtName, err)) return err } + + // Make sure current resource version changes to avoid immediate reconcile if no error + updateErr := r.cli.Update(context.TODO(), backup) + if updateErr != nil { + klog.Errorf("update NebulaScheduledBackup [%s/%s] status failed: %v", ns, rtName, updateErr) + return updateErr + } + isStatusUpdate = updateBackupStatus(&backup.Status, newStatus) isConditionUpdate = condutil.UpdateNebulaBackupCondition(&backup.Status, condition) if isStatusUpdate || isConditionUpdate { @@ -91,6 +132,21 @@ func (r *backupClient) UpdateNebulaBackupStatus(backup *v1alpha1.NebulaBackup, c }) } +func (r *backupClient) DeleteNebulaBackup(namespace, name string) error { + nb, err := r.GetNebulaBackup(namespace, name) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + if err := r.cli.Delete(context.TODO(), nb); err != nil { + return err + } + klog.Infof("NebulaBackup [%s/%s] deleted successfully", namespace, name) + return nil +} + func updateBackupStatus(status *v1alpha1.BackupStatus, newStatus *BackupUpdateStatus) bool { if newStatus == nil { return false @@ -102,11 +158,11 @@ func updateBackupStatus(status *v1alpha1.BackupStatus, newStatus *BackupUpdateSt isUpdate = true } if newStatus.TimeStarted != nil { - status.TimeStarted = *newStatus.TimeStarted + status.TimeStarted = newStatus.TimeStarted isUpdate = true } if newStatus.TimeCompleted != nil { - 
status.TimeCompleted = *newStatus.TimeCompleted + status.TimeCompleted = newStatus.TimeCompleted isUpdate = true } diff --git a/pkg/kube/nebulascheduledbackup.go b/pkg/kube/nebulascheduledbackup.go new file mode 100644 index 00000000..0d699c2e --- /dev/null +++ b/pkg/kube/nebulascheduledbackup.go @@ -0,0 +1,144 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" +) + +type ScheduledBackupUpdateStatus struct { + // Used for scheduled incremental backups. Not supported for now. 
+ // LastBackup string + CurrPauseStatus *bool + LastScheduledBackupTime *metav1.Time + LastSuccessfulBackupTime *metav1.Time + NumberOfSuccessfulBackups *int32 + NumberOfFailedBackups *int32 + MostRecentJobFailed *bool +} + +type NebulaScheduledBackup interface { + GetNebulaScheduledBackup(namespace, name string) (*v1alpha1.NebulaScheduledBackup, error) + SetNebulaScheduledBackupStatus(backup *v1alpha1.NebulaScheduledBackup, newStatus *ScheduledBackupUpdateStatus) error +} + +type scheduledBackupClient struct { + cli client.Client +} + +func NewScheduledNebulaBackup(cli client.Client) NebulaScheduledBackup { + return &scheduledBackupClient{cli: cli} +} + +func (r *scheduledBackupClient) GetNebulaScheduledBackup(namespace, name string) (*v1alpha1.NebulaScheduledBackup, error) { + scheduledBackup := &v1alpha1.NebulaScheduledBackup{} + err := r.cli.Get(context.TODO(), types.NamespacedName{ + Name: name, + Namespace: namespace, + }, scheduledBackup) + if err != nil { + klog.V(4).ErrorS(err, "failed to get NebulaScheduledBackup", "namespace", namespace, "name", name) + return nil, err + } + return scheduledBackup, nil +} + +func (r *scheduledBackupClient) SetNebulaScheduledBackupStatus(backup *v1alpha1.NebulaScheduledBackup, newStatus *ScheduledBackupUpdateStatus) error { + var isStatusUpdate bool + ns := backup.GetNamespace() + rtName := backup.GetName() + + return retry.OnError(retry.DefaultRetry, func(e error) bool { return e != nil }, func() error { + if updated, err := r.GetNebulaScheduledBackup(ns, rtName); err == nil { + backup = updated.DeepCopy() + } else { + utilruntime.HandleError(fmt.Errorf("get NebulaScheduledBackup [%s/%s] failed: %v", ns, rtName, err)) + return err + } + + // Make sure current resource version changes to avoid immediate reconcile if no error + updateErr := r.cli.Update(context.TODO(), backup) + if updateErr != nil { + klog.Errorf("update NebulaScheduledBackup [%s/%s] status failed: %v", ns, rtName, updateErr) + return updateErr + } + + 
isStatusUpdate = updateScheduledBackupStatus(&backup.Status, newStatus) + if isStatusUpdate { + updateErr = r.cli.Status().Update(context.TODO(), backup) + if updateErr == nil { + klog.Infof("NebulaScheduledBackup [%s/%s] updated successfully", ns, rtName) + return nil + } + klog.Errorf("update NebulaScheduledBackup [%s/%s] status failed: %v", ns, rtName, updateErr) + return updateErr + } + return nil + }) +} + +func updateScheduledBackupStatus(status *v1alpha1.ScheduledBackupStatus, newStatus *ScheduledBackupUpdateStatus) bool { + if newStatus == nil { + return false + } + + isUpdate := false + // Used for scheduled incremental backups. Not supported for now. + /* if newStatus.LastBackup != "" { + status.LastBackup = newStatus.LastBackup + isUpdate = true + } */ + if newStatus.CurrPauseStatus != status.CurrPauseStatus { + status.CurrPauseStatus = newStatus.CurrPauseStatus + isUpdate = true + } + + if newStatus.LastScheduledBackupTime != nil { + status.LastScheduledBackupTime = newStatus.LastScheduledBackupTime + isUpdate = true + } + + if newStatus.LastSuccessfulBackupTime != nil { + status.LastSuccessfulBackupTime = newStatus.LastSuccessfulBackupTime + isUpdate = true + } + + if newStatus.NumberOfSuccessfulBackups != nil { + status.NumberOfSuccessfulBackups = newStatus.NumberOfSuccessfulBackups + } + + if newStatus.NumberOfFailedBackups != nil { + status.NumberOfFailedBackups = newStatus.NumberOfFailedBackups + } + + if newStatus.MostRecentJobFailed != nil { + status.MostRecentJobFailed = newStatus.MostRecentJobFailed + isUpdate = true + } + + return isUpdate +}