From 5bbc9366c4ca1f077aec7ec7edd5673c000a5789 Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" <12751435+giautm@users.noreply.github.com> Date: Sun, 5 May 2024 17:07:28 +0700 Subject: [PATCH] controllers: support migrate down command (#167) --- api/v1alpha1/atlasmigration_types.go | 19 +- api/v1alpha1/zz_generated.deepcopy.go | 40 +++ charts/atlas-operator/templates/crds/crd.yaml | 20 ++ .../bases/db.atlasgo.io_atlasmigrations.yaml | 20 ++ .../migration/mysql_migrate_down.yaml | 21 ++ controllers/atlasmigration_controller.go | 276 ++++++++++------ controllers/atlasmigration_controller_test.go | 300 ++++++++++++++++-- controllers/atlasschema_controller_test.go | 2 +- controllers/common.go | 1 + controllers/testhelper.go | 53 +++- scripts/integration-tests.sh | 30 ++ 11 files changed, 654 insertions(+), 128 deletions(-) create mode 100644 config/integration/migration/mysql_migrate_down.yaml diff --git a/api/v1alpha1/atlasmigration_types.go b/api/v1alpha1/atlasmigration_types.go index 13cba958..b8740c0d 100644 --- a/api/v1alpha1/atlasmigration_types.go +++ b/api/v1alpha1/atlasmigration_types.go @@ -50,8 +50,10 @@ type ( Conditions []metav1.Condition `json:"conditions,omitempty"` // LastAppliedVersion is the version of the most recent successful versioned migration. LastAppliedVersion string `json:"lastAppliedVersion,omitempty"` - //LastDeploymentURL is the Deployment URL of the most recent successful versioned migration. + // LastDeploymentURL is the Deployment URL of the most recent successful versioned migration. LastDeploymentURL string `json:"lastDeploymentUrl,omitempty"` + // ApprovalURL is the URL to approve the migration. + ApprovalURL string `json:"approvalUrl,omitempty"` // ObservedHash is the hash of the most recent successful versioned migration. ObservedHash string `json:"observed_hash"` // LastApplied is the unix timestamp of the most recent successful versioned migration. @@ -80,6 +82,8 @@ type ( // ExecOrder controls how Atlas computes and executes pending migration files to the database. // +kubebuilder:default=linear ExecOrder MigrateExecOrder `json:"execOrder,omitempty"` + // ProtectedFlows defines the protected flows of a deployment. + ProtectedFlows *ProtectFlows `json:"protectedFlows,omitempty"` } // Cloud defines the Atlas Cloud configuration. Cloud struct { @@ -106,6 +110,19 @@ type ( Name string `json:"name,omitempty"` Tag string `json:"tag,omitempty"` } + // ProtectedFlows defines the protected flows of a deployment. + ProtectFlows struct { + MigrateDown *DeploymentFlow `json:"migrateDown,omitempty"` + } + // DeploymentFlow defines the flow of a deployment. + DeploymentFlow struct { + // Allow allows the flow to be executed. + // +kubebuilder:default=false + Allow bool `json:"allow,omitempty"` + // AutoApprove allows the flow to be automatically approved. + // +kubebuilder:default=false + AutoApprove bool `json:"autoApprove,omitempty"` + } ) // ExecOrder controls how Atlas computes and executes pending migration files to the database. 
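Note on the new API fields: the two booleans on DeploymentFlow are deliberately coupled, and the invalid combinations are rejected later in extractData (see the controller diff below). Remote directories are reviewed in Atlas Cloud, so auto-approve is rejected for them; local and ConfigMap directories have no review flow, so allowing migrate-down requires auto-approve. A minimal sketch of the combined policy, where the helper name downPolicy is illustrative and not part of the patch:

// downPolicy mirrors the validation rules that extractData applies to
// ProtectedFlows.MigrateDown. Illustrative only; assumes `import "errors"`
// and the DeploymentFlow type above.
func downPolicy(f *DeploymentFlow, remoteDir bool) (bool, error) {
	if f == nil {
		// Default: down migrations stay blocked.
		return false, nil
	}
	if remoteDir && f.Allow && f.AutoApprove {
		// Remote directories are approved in Atlas Cloud, never in-cluster.
		return false, errors.New("cannot auto-approve migrate-down for remote directory")
	}
	if !remoteDir && f.Allow && !f.AutoApprove {
		// Local directories have no review flow, so approval must be automatic.
		return false, errors.New("cannot allow migrate-down without auto-approve")
	}
	return f.Allow, nil
}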
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index f97008a3..18428f8a 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -90,6 +90,11 @@ func (in *AtlasMigrationSpec) DeepCopyInto(out *AtlasMigrationSpec) { in.Cloud.DeepCopyInto(&out.Cloud) in.Dir.DeepCopyInto(&out.Dir) in.DevURLFrom.DeepCopyInto(&out.DevURLFrom) + if in.ProtectedFlows != nil { + in, out := &in.ProtectedFlows, &out.ProtectedFlows + *out = new(ProtectFlows) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AtlasMigrationSpec. @@ -309,6 +314,21 @@ func (in *Credentials) DeepCopy() *Credentials { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeploymentFlow) DeepCopyInto(out *DeploymentFlow) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentFlow. +func (in *DeploymentFlow) DeepCopy() *DeploymentFlow { + if in == nil { + return nil + } + out := new(DeploymentFlow) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Diff) DeepCopyInto(out *Diff) { *out = *in @@ -407,6 +427,26 @@ func (in *Policy) DeepCopy() *Policy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ProtectFlows) DeepCopyInto(out *ProtectFlows) { + *out = *in + if in.MigrateDown != nil { + in, out := &in.MigrateDown, &out.MigrateDown + *out = new(DeploymentFlow) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProtectFlows. +func (in *ProtectFlows) DeepCopy() *ProtectFlows { + if in == nil { + return nil + } + out := new(ProtectFlows) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Remote) DeepCopyInto(out *Remote) { *out = *in diff --git a/charts/atlas-operator/templates/crds/crd.yaml b/charts/atlas-operator/templates/crds/crd.yaml index 4b54cb1a..ef5ec9b7 100644 --- a/charts/atlas-operator/templates/crds/crd.yaml +++ b/charts/atlas-operator/templates/crds/crd.yaml @@ -258,6 +258,23 @@ spec: - linear-skip - non-linear type: string + protectedFlows: + description: ProtectedFlows defines the protected flows of a deployment. + properties: + migrateDown: + description: DeploymentFlow defines the flow of a deployment. + properties: + allow: + default: false + description: Allow allows the flow to be executed. + type: boolean + autoApprove: + default: false + description: AutoApprove allows the flow to be automatically + approved. + type: boolean + type: object + type: object revisionsSchema: description: RevisionsSchema defines the schema that revisions table resides in @@ -297,6 +314,9 @@ spec: status: description: AtlasMigrationStatus defines the observed state of AtlasMigration properties: + approvalUrl: + description: ApprovalURL is the URL to approve the migration. + type: string conditions: description: Conditions represent the latest available observations of an object's state. 
diff --git a/config/crd/bases/db.atlasgo.io_atlasmigrations.yaml b/config/crd/bases/db.atlasgo.io_atlasmigrations.yaml index a8818bcd..40156135 100644 --- a/config/crd/bases/db.atlasgo.io_atlasmigrations.yaml +++ b/config/crd/bases/db.atlasgo.io_atlasmigrations.yaml @@ -273,6 +273,23 @@ spec: - linear-skip - non-linear type: string + protectedFlows: + description: ProtectedFlows defines the protected flows of a deployment. + properties: + migrateDown: + description: DeploymentFlow defines the flow of a deployment. + properties: + allow: + default: false + description: Allow allows the flow to be executed. + type: boolean + autoApprove: + default: false + description: AutoApprove allows the flow to be automatically + approved. + type: boolean + type: object + type: object revisionsSchema: description: RevisionsSchema defines the schema that revisions table resides in @@ -312,6 +329,9 @@ spec: status: description: AtlasMigrationStatus defines the observed state of AtlasMigration properties: + approvalUrl: + description: ApprovalURL is the URL to approve the migration. + type: string conditions: description: Conditions represent the latest available observations of an object's state. diff --git a/config/integration/migration/mysql_migrate_down.yaml b/config/integration/migration/mysql_migrate_down.yaml new file mode 100644 index 00000000..8c17709d --- /dev/null +++ b/config/integration/migration/mysql_migrate_down.yaml @@ -0,0 +1,21 @@ +# Copyright 2024 The Atlas Operator Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: db.atlasgo.io/v1alpha1 +kind: AtlasMigration +spec: + protectedFlows: + migrateDown: + allow: true + autoApprove: true diff --git a/controllers/atlasmigration_controller.go b/controllers/atlasmigration_controller.go index d65e6618..c479269d 100644 --- a/controllers/atlasmigration_controller.go +++ b/controllers/atlasmigration_controller.go @@ -12,21 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -/* - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/
-
 package controllers
 
 import (
@@ -82,10 +67,13 @@ type (
 		URL             *url.URL
 		DevURL          string
 		Dir             migrate.Dir
+		DirLatest       migrate.Dir
 		Cloud           *cloud
 		RevisionsSchema string
 		Baseline        string
 		ExecOrder       string
+		MigrateDown     bool
+		ObservedHash    string
 	}
 	cloud struct {
 		URL       string
@@ -144,16 +132,10 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 		r.recordErrEvent(res, err)
 		return result(err)
 	}
-	hash, err := data.hash()
-	if err != nil {
-		res.SetNotReady("CalculatingHash", err.Error())
-		r.recordErrEvent(res, err)
-		return result(err)
-	}
 	// We need to update the ready condition immediately before doing
 	// any heavy jobs if the hash is different from the last applied.
 	// This is to ensure that other tools know we are still applying the changes.
-	if res.IsReady() && res.IsHashModified(hash) {
+	if res.IsReady() && res.IsHashModified(data.ObservedHash) {
 		res.SetNotReady("Reconciling", "Current migration data has changed")
 		return ctrl.Result{Requeue: true}, nil
 	}
@@ -165,39 +147,21 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 	// TODO(giautm): Create DevDB and run linter for new migration
 	// files before applying it to the target database.
 
-	// Create a working directory for the Atlas CLI
-	// The working directory contains the atlas.hcl config
-	// and the migrations directory (if any)
-	wd, err := atlasexec.NewWorkingDir(
-		atlasexec.WithAtlasHCL(data.render),
-		atlasexec.WithMigrations(data.Dir),
-	)
-	if err != nil {
-		res.SetNotReady("ReadingMigrationData", err.Error())
-		r.recordErrEvent(res, err)
-		return result(err)
+	if data.DevURL == "" {
+		// The user has not specified a URL for the dev-db;
+		// spin up a dev-db and get the connection string.
+		data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL)
+		if err != nil {
+			res.SetNotReady("GettingDevDB", err.Error())
+			return result(err)
+		}
 	}
-	defer wd.Close()
 	// Reconcile given resource
-	status, err := r.reconcile(ctx, wd.Path(), data.EnvName)
+	err = r.reconcile(ctx, data, res)
 	if err != nil {
-		res.SetNotReady("Migrating", strings.TrimSpace(err.Error()))
 		r.recordErrEvent(res, err)
-		return result(err)
-	}
-	if data.Dir != nil {
-		// Compress the migration directory then store it in the secret
-		// for later use when atlas runs the migration down.
-		if err := r.storeDirState(ctx, res, data.Dir); err != nil {
-			res.SetNotReady("StoringDirState", err.Error())
-			r.recordErrEvent(res, err)
-			return result(err)
-		}
 	}
-	status.ObservedHash = hash
-	res.SetReady(*status)
-	r.recorder.Eventf(res, corev1.EventTypeNormal, "Applied", "Version %s applied", status.LastAppliedVersion)
-	return ctrl.Result{}, nil
+	return result(err)
 }
 
 func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) {
@@ -208,7 +172,7 @@ func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.
 		},
 	}
 	if err := r.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil {
-		return nil, err
+		return nil, client.IgnoreNotFound(err)
 	}
 	return extractDirFromSecret(secret)
 }
@@ -223,8 +187,6 @@ func (r *AtlasMigrationReconciler) storeDirState(ctx context.Context, obj client
 	if err != nil {
 		return err
 	}
-	// Set the namespace of the secret to the same as the resource
-	secret.Namespace = obj.GetNamespace()
 	switch err := r.Create(ctx, secret); {
 	case err == nil:
 		return nil
@@ -273,48 +235,152 @@ func (r *AtlasMigrationReconciler) watchRefs(res *dbv1alpha1.AtlasMigration) {
 	}
 }
 
+const (
+	StatePending  = "PENDING_USER"
+	StateApproved = "APPROVED"
+	StateAborted  = "ABORTED"
+	StateApplied  = "APPLIED"
+)
+
 // Reconcile the given AtlasMigration resource.
-func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, wd, envName string) (_ *dbv1alpha1.AtlasMigrationStatus, _ error) {
-	c, err := r.atlasClient(wd)
+func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migrationData, res *dbv1alpha1.AtlasMigration) error {
+	log := ctrl.Log.WithName("atlas_migration.reconcile")
+	// Create a working directory for the Atlas CLI
+	// The working directory contains the atlas.hcl config
+	// and the migrations directory (if any)
+	wd, err := atlasexec.NewWorkingDir(
+		atlasexec.WithAtlasHCL(data.render),
+		atlasexec.WithMigrations(data.Dir),
+	)
 	if err != nil {
-		return nil, err
+		res.SetNotReady("ReadingMigrationData", err.Error())
+		return err
 	}
+	defer wd.Close()
+	c, err := r.atlasClient(wd.Path())
+	if err != nil {
+		return err
+	}
+	log.Info("reconciling migration", "env", data.EnvName)
 	// Check if there are any pending migration files
-	status, err := c.MigrateStatus(ctx, &atlasexec.MigrateStatusParams{Env: envName})
+	status, err := c.MigrateStatus(ctx, &atlasexec.MigrateStatusParams{Env: data.EnvName})
 	if err != nil {
+		res.SetNotReady("Migrating", err.Error())
 		if isChecksumErr(err) {
-			return nil, err
+			return err
 		}
-		return nil, transient(err)
+		return transient(err)
 	}
-	if len(status.Pending) == 0 {
+	switch {
+	case len(status.Pending) == 0 && len(status.Applied) > 0 && len(status.Available) < len(status.Applied):
+		if !data.MigrateDown {
+			res.SetNotReady("Migrating", "Migrate down is not allowed")
+			return fmt.Errorf("migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade")
+		}
+		// The downgrade is allowed; roll back to the last available version.
+		last := status.Available[len(status.Available)-1]
+		log.Info("downgrading to the last available version", "version", last.Version)
+		params := &atlasexec.MigrateDownParams{
+			Env:       data.EnvName,
+			ToVersion: last.Version,
+			Context: &atlasexec.DeployRunContext{
+				TriggerType:    atlasexec.TriggerTypeKubernetes,
+				TriggerVersion: dbv1alpha1.VersionFromContext(ctx),
+			},
+		}
+		// Atlas needs all versions to be present in the directory
+		// to downgrade to a specific version.
+		switch {
+		case data.Cloud != nil && data.Cloud.RemoteDir != nil:
+			// Use the `latest` tag of the remote directory to fetch all versions.
+			params.DirURL = fmt.Sprintf("atlas://%s", data.Cloud.RemoteDir.Name)
+		case data.DirLatest != nil:
+			// Copy the dir-state from the latest deployment to a different location
+			// (to avoid conflicts with the current migration directory),
+			// then use it to downgrade.
+ current := fmt.Sprintf("migrations-%s", status.Current) + if err = wd.CopyFS(current, data.DirLatest); err != nil { + return err + } + params.DirURL = fmt.Sprintf("file://%s", current) + default: + return fmt.Errorf("unable to downgrade, no dir-state found") + } + run, err := c.MigrateDown(ctx, params) + if err != nil { + res.SetNotReady("Migrating", err.Error()) + if !isSQLErr(err) { + err = transient(err) + } + return err + } + switch run.Status { + case StatePending: + res.SetNotReady("ApprovalPending", "Deployment is waiting for approval") + res.Status.ApprovalURL = run.URL + return transient(fmt.Errorf("plan approval pending, review here: %s", run.URL)) + case StateAborted: + res.SetNotReady("PlanRejected", "Deployment is aborted") + res.Status.ApprovalURL = run.URL + // Migration is aborted, no need to reapply + return fmt.Errorf("plan rejected, review here: %s", run.URL) + case StateApplied, StateApproved: + res.SetReady(dbv1alpha1.AtlasMigrationStatus{ + ObservedHash: data.ObservedHash, + ApprovalURL: run.URL, + LastApplied: run.Start.Unix(), + LastAppliedVersion: run.Target, + LastDeploymentURL: run.URL, + }) + r.recordApplied(res, run.Target) + } + case len(status.Pending) == 0: + log.Info("no pending migrations") + // No pending migrations var lastApplied int64 if len(status.Applied) > 0 { lastApplied = status.Applied[len(status.Applied)-1].ExecutedAt.Unix() } - return &dbv1alpha1.AtlasMigrationStatus{ + res.SetReady(dbv1alpha1.AtlasMigrationStatus{ + ObservedHash: data.ObservedHash, LastApplied: lastApplied, LastAppliedVersion: status.Current, - }, nil - } - // Execute Atlas CLI migrate command - report, err := c.MigrateApply(ctx, &atlasexec.MigrateApplyParams{ - Env: envName, - Context: &atlasexec.DeployRunContext{ - TriggerType: atlasexec.TriggerTypeKubernetes, - TriggerVersion: dbv1alpha1.VersionFromContext(ctx), - }, - }) - if err != nil { - if !isSQLErr(err) { - err = transient(err) + }) + r.recordApplied(res, status.Current) + default: + log.Info("applying pending migrations", "count", len(status.Pending)) + // There are pending migrations + // Execute Atlas CLI migrate command + report, err := c.MigrateApply(ctx, &atlasexec.MigrateApplyParams{ + Env: data.EnvName, + Context: &atlasexec.DeployRunContext{ + TriggerType: atlasexec.TriggerTypeKubernetes, + TriggerVersion: dbv1alpha1.VersionFromContext(ctx), + }, + }) + if err != nil { + res.SetNotReady("Migrating", err.Error()) + if !isSQLErr(err) { + err = transient(err) + } + return err } - return nil, err + res.SetReady(dbv1alpha1.AtlasMigrationStatus{ + ObservedHash: data.ObservedHash, + LastApplied: report.End.Unix(), + LastAppliedVersion: report.Target, + }) + r.recordApplied(res, report.Target) } - return &dbv1alpha1.AtlasMigrationStatus{ - LastApplied: report.End.Unix(), - LastAppliedVersion: report.Target, - }, nil + if data.Dir != nil { + // Compress the migration directory then store it in the secret + // for later use when atlas runs the migration down. 
+ if err = r.storeDirState(ctx, res, data.Dir); err != nil { + res.SetNotReady("StoringDirState", err.Error()) + return err + } + } + return nil } // Extract migration data from the given resource @@ -327,6 +393,7 @@ func (r *AtlasMigrationReconciler) extractData(ctx context.Context, res *dbv1alp RevisionsSchema: s.RevisionsSchema, Baseline: s.Baseline, ExecOrder: string(s.ExecOrder), + MigrateDown: false, } ) if env := s.EnvName; env != "" { @@ -341,6 +408,14 @@ func (r *AtlasMigrationReconciler) extractData(ctx context.Context, res *dbv1alp if c.TokenFrom.SecretKeyRef == nil { return nil, errors.New("cannot use remote directory without Atlas Cloud token") } + if f := s.ProtectedFlows; f != nil { + if d := f.MigrateDown; d != nil { + if d.Allow && d.AutoApprove { + return nil, errors.New("cannot auto-approve migrate-down for remote directory") + } + data.MigrateDown = d.Allow + } + } token, err := getSecretValue(ctx, r, res.Namespace, c.TokenFrom.SecretKeyRef) if err != nil { return nil, err @@ -351,20 +426,32 @@ func (r *AtlasMigrationReconciler) extractData(ctx context.Context, res *dbv1alp URL: c.URL, RemoteDir: &d.Remote, } - case d.ConfigMapRef != nil: - if d.Local != nil { + case d.Local != nil || d.ConfigMapRef != nil: + if d.Local != nil && d.ConfigMapRef != nil { return nil, errors.New("cannot use both configmaps and local directory") } - cfgMap, err := getConfigMap(ctx, r, res.Namespace, d.ConfigMapRef) - if err != nil { - return nil, err + if f := s.ProtectedFlows; f != nil { + if d := f.MigrateDown; d != nil { + if d.Allow && !d.AutoApprove { + return nil, errors.New("cannot allow migrate-down without auto-approve") + } + // Allow migrate-down only if the flow is allowed and auto-approved + data.MigrateDown = d.Allow && d.AutoApprove + } + } + files := d.Local + if files == nil { + cfgMap, err := getConfigMap(ctx, r, res.Namespace, d.ConfigMapRef) + if err != nil { + return nil, err + } + files = cfgMap.Data } - data.Dir, err = memDir(cfgMap.Data) + data.Dir, err = memDir(files) if err != nil { return nil, err } - case d.Local != nil: - data.Dir, err = memDir(d.Local) + data.DirLatest, err = r.readDirState(ctx, res) if err != nil { return nil, err } @@ -379,9 +466,17 @@ func (r *AtlasMigrationReconciler) extractData(ctx context.Context, res *dbv1alp return nil, err } } + data.ObservedHash, err = hashMigrationData(data) + if err != nil { + return nil, err + } return data, nil } +func (r *AtlasMigrationReconciler) recordApplied(res *dbv1alpha1.AtlasMigration, ver string) { + r.recorder.Eventf(res, corev1.EventTypeNormal, "Applied", "Version %s applied", ver) +} + func (r *AtlasMigrationReconciler) recordErrEvent(res *dbv1alpha1.AtlasMigration, err error) { reason := "Error" if isTransient(err) { @@ -391,7 +486,7 @@ func (r *AtlasMigrationReconciler) recordErrEvent(res *dbv1alpha1.AtlasMigration } // Calculate the hash of the given data -func (d *migrationData) hash() (string, error) { +func hashMigrationData(d *migrationData) (string, error) { h := sha256.New() h.Write([]byte(d.URL.String())) if c := d.Cloud; c != nil { @@ -478,8 +573,9 @@ func newSecretObject(obj client.Object, dir migrate.Dir, labels map[string]strin } return &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: makeKeyLatest(obj.GetName()), - Labels: labels, + Name: makeKeyLatest(obj.GetName()), + Namespace: obj.GetNamespace(), + Labels: labels, OwnerReferences: []metav1.OwnerReference{ // Set the owner reference to the given object // This will ensure that the secret is deleted when the owner is deleted. 
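Note on the detection logic: a down migration is recognized when nothing is pending, at least one version has been applied, and the directory now offers fewer versions than the database has applied; in other words, the desired state is behind the deployed state. A condensed sketch of that predicate, where the helper name needsDown is illustrative and not part of the patch:

// needsDown reports whether the database must be rolled back: no pending
// files, at least one applied revision, and fewer files in the migration
// directory than revisions recorded in the database.
func needsDown(status *atlasexec.MigrateStatus) bool {
	return len(status.Pending) == 0 &&
		len(status.Applied) > 0 &&
		len(status.Available) < len(status.Applied)
}

When the predicate holds, Atlas still needs a directory that contains the removed versions, which is why the controller falls back to either the remote directory's `latest` tag or the dir-state secret written by the previous deployment.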
diff --git a/controllers/atlasmigration_controller_test.go b/controllers/atlasmigration_controller_test.go index 27f07c8b..447d25cc 100644 --- a/controllers/atlasmigration_controller_test.go +++ b/controllers/atlasmigration_controller_test.go @@ -53,7 +53,7 @@ func TestReconcile_Notfound(t *testing.T) { obj := &dbv1alpha1.AtlasMigration{ ObjectMeta: migrationObjmeta(), } - _, run := newRunner(NewAtlasMigrationReconciler, nil) + _, run := newRunner(NewAtlasMigrationReconciler, nil, nil) // Nope when the object is not found run(obj, func(result ctrl.Result, err error) { require.NoError(t, err) @@ -85,7 +85,7 @@ func TestMigration_ConfigMap(t *testing.T) { 20230412003626_create_foo.sql h1:8C7Hz48VGKB0trI2BsK5FWpizG6ttcm9ep+tX32y0Tw=`, }, }) - }) + }, nil) assert := func(except ctrl.Result, ready bool, reason, msg, version string) { t.Helper() reconcile(obj, func(result ctrl.Result, err error) { @@ -169,7 +169,7 @@ func TestMigration_Local(t *testing.T) { h, reconcile := newRunner(NewAtlasMigrationReconciler, func(cb *fake.ClientBuilder) { cb.WithStatusSubresource(obj) cb.WithObjects(obj) - }) + }, nil) assert := func(except ctrl.Result, ready bool, reason, msg, version string) { t.Helper() reconcile(obj, func(result ctrl.Result, err error) { @@ -259,6 +259,247 @@ func TestMigration_Local(t *testing.T) { }, h.events()) } +func TestMigration_MigrateDown_Remote_Protected(t *testing.T) { + var ( + meta = migrationObjmeta() + obj = &dbv1alpha1.AtlasMigration{ + ObjectMeta: meta, + Spec: dbv1alpha1.AtlasMigrationSpec{ + TargetSpec: v1alpha1.TargetSpec{ + URL: "sqlite://file?mode=memory", + }, + Cloud: v1alpha1.Cloud{ + TokenFrom: v1alpha1.TokenFrom{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: "token", + LocalObjectReference: corev1.LocalObjectReference{ + Name: "my-secret", + }, + }, + }, + }, + Dir: v1alpha1.Dir{ + Remote: v1alpha1.Remote{ + Name: "my-dir", + Tag: "v1", + }, + }, + }, + Status: v1alpha1.AtlasMigrationStatus{ + Conditions: []metav1.Condition{ + {Type: "Ready", Status: metav1.ConditionFalse}, + }, + }, + } + ) + mockExec := &mockAtlasExec{} + mockExec.status.res = &atlasexec.MigrateStatus{ + Current: "2", + Applied: []*atlasexec.Revision{ + {Version: "1"}, + {Version: "2"}, + }, + Available: []atlasexec.File{ + // Only the first migration is available. + // This happens when the migration is downgraded. 
+ {Version: "1", Name: "1.sql"}, + }, + } + h, reconcile := newRunner(NewAtlasMigrationReconciler, func(cb *fake.ClientBuilder) { + cb.WithStatusSubresource(obj) + cb.WithObjects( + obj, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + `token`: []byte(`my-token`), + }, + }) + }, mockExec) + assert := func(except ctrl.Result, ready bool, reason, msg, version, approvalURL, deploymentURL string) { + t.Helper() + reconcile(obj, func(result ctrl.Result, err error) { + require.NoError(t, err) + require.EqualValues(t, except, result) + res := &dbv1alpha1.AtlasMigration{ObjectMeta: meta} + h.get(t, res) + require.Len(t, res.Status.Conditions, 1) + require.Equal(t, ready, res.IsReady()) + require.Equal(t, reason, res.Status.Conditions[0].Reason) + require.Contains(t, res.Status.Conditions[0].Message, msg) + require.Equal(t, version, res.Status.LastAppliedVersion) + require.Equal(t, approvalURL, res.Status.ApprovalURL) + require.Equal(t, deploymentURL, res.Status.LastDeploymentURL) + }) + } + // No changes because the migration down is not allowed + assert(ctrl.Result{}, false, "Migrating", "Migrate down is not allowed", "", "", "") + + obj = &dbv1alpha1.AtlasMigration{ + ObjectMeta: meta, + Spec: dbv1alpha1.AtlasMigrationSpec{ + ProtectedFlows: &dbv1alpha1.ProtectFlows{ + MigrateDown: &dbv1alpha1.DeploymentFlow{ + Allow: true, + AutoApprove: true, + }, + }, + }, + } + h.patch(t, obj) + // Unable to migrate down with auto-approve + assert(ctrl.Result{}, false, "ReadingMigrationData", "cannot auto-approve migrate-down for remote directory", "", "", "") + + // Refresh the object, and disable auto-approve + h.client.Get(context.Background(), client.ObjectKeyFromObject(obj), obj) + obj.Spec.ProtectedFlows.MigrateDown.AutoApprove = false + h.client.Update(context.Background(), obj) + + mockExec.down.res = &atlasexec.MigrateDown{ + Current: "2", + Target: "1", + Status: StatePending, + URL: "THIS_IS_DEPLOYMENT_URL", + } + // Reconcile again + assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "Deployment is waiting for approval", "", "THIS_IS_DEPLOYMENT_URL", "") + assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "Deployment is waiting for approval", "", "THIS_IS_DEPLOYMENT_URL", "") + + mockExec.down.res = &atlasexec.MigrateDown{ + Current: "2", + Target: "1", + Status: StateApproved, + URL: "THIS_IS_DEPLOYMENT_URL", + } + // The plan is approved, and the migration should be applied + assert(ctrl.Result{}, true, "Applied", "", "1", "THIS_IS_DEPLOYMENT_URL", "THIS_IS_DEPLOYMENT_URL") + + // Check the events generated by the controller + require.Equal(t, []string{ + "Warning Error migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade", + "Warning Error cannot auto-approve migrate-down for remote directory", + "Warning TransientErr plan approval pending, review here: THIS_IS_DEPLOYMENT_URL", + "Warning TransientErr plan approval pending, review here: THIS_IS_DEPLOYMENT_URL", + "Normal Applied Version 1 applied", + }, h.events()) +} + +func TestMigration_MigrateDown_Local(t *testing.T) { + var ( + meta = migrationObjmeta() + obj = &dbv1alpha1.AtlasMigration{ + ObjectMeta: meta, + Spec: dbv1alpha1.AtlasMigrationSpec{ + TargetSpec: v1alpha1.TargetSpec{ + URL: "sqlite://file?mode=memory", + }, + Dir: v1alpha1.Dir{ + Local: map[string]string{ + "1.sql": "CREATE TABLE t1 (id INT);", + "atlas.sum": `h1:NIfJIuMahN58AEbN26mlFN1UfIH5YYAPLVish2vrYA0= +1.sql 
h1:0qg7r5sBBfy1rYGVxtli7zUY58RKN5V9gk8tBlLQVDU= +`, + }, + }, + }, + Status: v1alpha1.AtlasMigrationStatus{ + Conditions: []metav1.Condition{ + {Type: "Ready", Status: metav1.ConditionFalse}, + }, + }, + } + ) + latestDir := must(memDir(map[string]string{ + "1.sql": "CREATE TABLE t1 (id INT);", + "2.sql": "CREATE TABLE t2 (id INT);", + "atlas.sum": `h1:8Ehl3NpkxXEFgRtAWgLEM/zYnb4XFCptf24UF4ceFAE= +1.sql h1:0qg7r5sBBfy1rYGVxtli7zUY58RKN5V9gk8tBlLQVDU= +2.sql h1:PXVCtjN2/kqx0z4CPUGz6Oz+1FHbN7TAXi+j+nimLho= +`, + })) + mockExec := &mockAtlasExec{} + mockExec.status.res = &atlasexec.MigrateStatus{ + Current: "2", + Applied: []*atlasexec.Revision{ + {Version: "1"}, + {Version: "2"}, + }, + Available: []atlasexec.File{ + // Only the first migration is available. + // This happens when the migration is downgraded. + {Version: "1", Name: "1.sql"}, + }, + } + h, reconcile := newRunner(NewAtlasMigrationReconciler, func(cb *fake.ClientBuilder) { + cb.WithStatusSubresource(obj) + cb.WithObjects( + obj, + must(newSecretObject(obj, latestDir, nil)), + ) + }, mockExec) + assert := func(except ctrl.Result, ready bool, reason, msg, version, approvalURL, deploymentURL string) { + t.Helper() + reconcile(obj, func(result ctrl.Result, err error) { + require.NoError(t, err) + require.EqualValues(t, except, result) + res := &dbv1alpha1.AtlasMigration{ObjectMeta: meta} + h.get(t, res) + require.Len(t, res.Status.Conditions, 1) + require.Equal(t, ready, res.IsReady()) + require.Equal(t, reason, res.Status.Conditions[0].Reason) + require.Contains(t, res.Status.Conditions[0].Message, msg) + require.Equal(t, version, res.Status.LastAppliedVersion) + require.Equal(t, approvalURL, res.Status.ApprovalURL) + require.Equal(t, deploymentURL, res.Status.LastDeploymentURL) + }) + } + + mockExec.down.res = &atlasexec.MigrateDown{ + Current: "2", + Target: "1", + Status: StateApplied, + URL: "", + } + // No changes because the migration down is not allowed + assert(ctrl.Result{}, false, "Migrating", "Migrate down is not allowed", "", "", "") + + h.patch(t, &dbv1alpha1.AtlasMigration{ + ObjectMeta: meta, + Spec: dbv1alpha1.AtlasMigrationSpec{ + ProtectedFlows: &dbv1alpha1.ProtectFlows{ + MigrateDown: &dbv1alpha1.DeploymentFlow{ + Allow: true, + }, + }, + }, + }) + // Unable to migrate down without auto-approve + assert(ctrl.Result{}, false, "ReadingMigrationData", "cannot allow migrate-down without auto-approve", "", "", "") + + h.patch(t, &dbv1alpha1.AtlasMigration{ + ObjectMeta: meta, + Spec: dbv1alpha1.AtlasMigrationSpec{ + ProtectedFlows: &dbv1alpha1.ProtectFlows{ + MigrateDown: &dbv1alpha1.DeploymentFlow{ + Allow: true, + AutoApprove: true, + }, + }, + }, + }) + // Migrate down should be successful + assert(ctrl.Result{}, true, "Applied", "", "1", "", "") + + // Check the events generated by the controller + require.Equal(t, []string{ + "Warning Error migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade", + "Warning Error cannot allow migrate-down without auto-approve", + "Normal Applied Version 1 applied", + }, h.events()) +} + func TestReconcile_Diff(t *testing.T) { tt := migrationCliTest(t) tt.initDefaultAtlasMigration() @@ -436,7 +677,7 @@ func TestReconcile_reconcile(t *testing.T) { tt := migrationCliTest(t) tt.initDefaultMigrationDir() - md, err := tt.r.extractData(context.Background(), &v1alpha1.AtlasMigration{ + res := &v1alpha1.AtlasMigration{ ObjectMeta: migrationObjmeta(), Spec: v1alpha1.AtlasMigrationSpec{ TargetSpec: v1alpha1.TargetSpec{URL: tt.dburl}, @@ -444,19 +685,12 @@ func 
TestReconcile_reconcile(t *testing.T) { ConfigMapRef: &corev1.LocalObjectReference{Name: "my-configmap"}, }, }, - }) - require.NoError(t, err) - wd, err := atlasexec.NewWorkingDir( - atlasexec.WithAtlasHCL(md.render), - atlasexec.WithMigrations(md.Dir), - ) + } + md, err := tt.r.extractData(context.Background(), res) require.NoError(t, err) - defer func() { - require.NoError(t, wd.Close()) - }() - status, err := tt.r.reconcile(context.Background(), wd.Path(), "test") + err = tt.r.reconcile(context.Background(), md, res) require.NoError(t, err) - require.EqualValues(t, "20230412003626", status.LastAppliedVersion) + require.EqualValues(t, "20230412003626", res.Status.LastAppliedVersion) } func TestReconcile_reconciling(t *testing.T) { @@ -473,6 +707,7 @@ func TestReconcile_reconciling(t *testing.T) { }, Spec: v1alpha1.AtlasMigrationSpec{ TargetSpec: v1alpha1.TargetSpec{URL: tt.dburl}, + EnvName: "test", Dir: v1alpha1.Dir{ Local: map[string]string{ "1.sql": "bar", @@ -494,33 +729,27 @@ func TestReconcile_reconciling(t *testing.T) { func TestReconcile_reconcile_upToDate(t *testing.T) { tt := migrationCliTest(t) tt.initDefaultMigrationDir() - tt.k8s.put(&dbv1alpha1.AtlasMigration{ + res := &dbv1alpha1.AtlasMigration{ ObjectMeta: migrationObjmeta(), Status: dbv1alpha1.AtlasMigrationStatus{ LastAppliedVersion: "20230412003626", }, - }) + } + tt.k8s.put(res) md, err := tt.r.extractData(context.Background(), &v1alpha1.AtlasMigration{ ObjectMeta: migrationObjmeta(), Spec: v1alpha1.AtlasMigrationSpec{ TargetSpec: v1alpha1.TargetSpec{URL: tt.dburl}, + EnvName: "test", Dir: v1alpha1.Dir{ ConfigMapRef: &corev1.LocalObjectReference{Name: "my-configmap"}, }, }, }) require.NoError(t, err) - wd, err := atlasexec.NewWorkingDir( - atlasexec.WithAtlasHCL(md.render), - atlasexec.WithMigrations(md.Dir), - ) - require.NoError(t, err) - defer func() { - require.NoError(t, wd.Close()) - }() - status, err := tt.r.reconcile(context.Background(), wd.Path(), "test") + err = tt.r.reconcile(context.Background(), md, res) require.NoError(t, err) - require.EqualValues(t, "20230412003626", status.LastAppliedVersion) + require.EqualValues(t, "20230412003626", res.Status.LastAppliedVersion) } func TestReconcile_reconcile_baseline(t *testing.T) { @@ -529,28 +758,31 @@ func TestReconcile_reconcile_baseline(t *testing.T) { tt.addMigrationScript("20230412003627_create_bar.sql", "CREATE TABLE bar (id INT PRIMARY KEY);") tt.addMigrationScript("20230412003628_create_baz.sql", "CREATE TABLE baz (id INT PRIMARY KEY);") - md, err := tt.r.extractData(context.Background(), &v1alpha1.AtlasMigration{ + res := &v1alpha1.AtlasMigration{ ObjectMeta: migrationObjmeta(), Spec: v1alpha1.AtlasMigrationSpec{ TargetSpec: v1alpha1.TargetSpec{URL: tt.dburl}, + EnvName: "test", Dir: v1alpha1.Dir{ ConfigMapRef: &corev1.LocalObjectReference{Name: "my-configmap"}, }, Baseline: "20230412003627", }, - }) + } + md, err := tt.r.extractData(context.Background(), res) require.NoError(t, err) + err = tt.r.reconcile(context.Background(), md, res) + require.NoError(t, err) + require.EqualValues(t, "20230412003628", res.Status.LastAppliedVersion) + wd, err := atlasexec.NewWorkingDir( atlasexec.WithAtlasHCL(md.render), atlasexec.WithMigrations(md.Dir), ) require.NoError(t, err) - defer func() { + t.Cleanup(func() { require.NoError(t, wd.Close()) - }() - status, err := tt.r.reconcile(context.Background(), wd.Path(), "test") - require.NoError(t, err) - require.EqualValues(t, "20230412003628", status.LastAppliedVersion) + }) cli, err := tt.r.atlasClient(wd.Path()) 
require.NoError(t, err) report, err := cli.MigrateStatus(context.Background(), &atlasexec.MigrateStatusParams{ diff --git a/controllers/atlasschema_controller_test.go b/controllers/atlasschema_controller_test.go index 21de4f1f..023d3552 100644 --- a/controllers/atlasschema_controller_test.go +++ b/controllers/atlasschema_controller_test.go @@ -112,7 +112,7 @@ func TestReconcile_Reconcile(t *testing.T) { h, reconcile := newRunner(NewAtlasSchemaReconciler, func(cb *fake.ClientBuilder) { cb.WithStatusSubresource(obj) cb.WithObjects(obj) - }) + }, nil) assert := func(except ctrl.Result, ready bool, reason, msg string) { t.Helper() reconcile(obj, func(result ctrl.Result, err error) { diff --git a/controllers/common.go b/controllers/common.go index febcedf4..014baa90 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -45,6 +45,7 @@ type ( // AtlasExec is the interface for the Atlas SDK. AtlasExec interface { MigrateApply(ctx context.Context, params *atlasexec.MigrateApplyParams) (*atlasexec.MigrateApply, error) + MigrateDown(ctx context.Context, params *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) MigrateLint(ctx context.Context, params *atlasexec.MigrateLintParams) (*atlasexec.SummaryReport, error) MigrateStatus(ctx context.Context, params *atlasexec.MigrateStatusParams) (*atlasexec.MigrateStatus, error) diff --git a/controllers/testhelper.go b/controllers/testhelper.go index bc9cb0ed..792081d4 100644 --- a/controllers/testhelper.go +++ b/controllers/testhelper.go @@ -43,14 +43,58 @@ type ( recorder *record.FakeRecorder scheme *runtime.Scheme } + mockCmd[T any] struct { + res *T + err error + } + mockAtlasExec struct { + apply mockCmd[atlasexec.MigrateApply] + down mockCmd[atlasexec.MigrateDown] + lint mockCmd[atlasexec.SummaryReport] + status mockCmd[atlasexec.MigrateStatus] + schemaApply mockCmd[atlasexec.SchemaApply] + schemaInspect mockCmd[string] + } ) +var _ AtlasExec = &mockAtlasExec{} + +// SchemaApply implements AtlasExec. +func (m *mockAtlasExec) SchemaApply(ctx context.Context, params *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error) { + return m.schemaApply.res, m.schemaApply.err +} + +// SchemaInspect implements AtlasExec. +func (m *mockAtlasExec) SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error) { + return *m.schemaInspect.res, m.schemaInspect.err +} + +// MigrateApply implements AtlasExec. +func (m *mockAtlasExec) MigrateApply(context.Context, *atlasexec.MigrateApplyParams) (*atlasexec.MigrateApply, error) { + return m.apply.res, m.apply.err +} + +// MigrateDown implements AtlasExec. +func (m *mockAtlasExec) MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) { + return m.down.res, m.down.err +} + +// MigrateLint implements AtlasExec. +func (m *mockAtlasExec) MigrateLint(ctx context.Context, params *atlasexec.MigrateLintParams) (*atlasexec.SummaryReport, error) { + return m.lint.res, m.lint.err +} + +// MigrateStatus implements AtlasExec. +func (m *mockAtlasExec) MigrateStatus(context.Context, *atlasexec.MigrateStatusParams) (*atlasexec.MigrateStatus, error) { + return m.status.res, m.status.err +} + var globalAtlasMock = func(dir string) (AtlasExec, error) { return atlasexec.NewClient(dir, "atlas") } // newRunner returns a runner that can be used to test a reconcile.Reconciler. 
-func newRunner[T reconcile.Reconciler](fn func(Manager, AtlasExecFn, bool) T, modify func(*fake.ClientBuilder)) (*helper, runner) {
+func newRunner[T reconcile.Reconciler](fn func(Manager, AtlasExecFn, bool) T, modify func(*fake.ClientBuilder), mock *mockAtlasExec) (*helper, runner) {
 	scheme := runtime.NewScheme()
 	clientgoscheme.AddToScheme(scheme)
 	dbv1alpha1.AddToScheme(scheme)
@@ -64,7 +108,12 @@ func newRunner[T reconcile.Reconciler](fn func(Manager, AtlasExecFn, bool) T, mo
 		client:   c,
 		recorder: r,
 		scheme:   scheme,
-	}, globalAtlasMock, true)
+	}, func(s string) (AtlasExec, error) {
+		if mock == nil {
+			return globalAtlasMock(s)
+		}
+		return mock, nil
+	}, true)
 	h := &helper{client: c, recorder: r}
 	return h, func(obj client.Object, fn check) {
 		fn(a.Reconcile(context.Background(), request(obj)))
diff --git a/scripts/integration-tests.sh b/scripts/integration-tests.sh
index 92bb79fa..d66f5f56 100755
--- a/scripts/integration-tests.sh
+++ b/scripts/integration-tests.sh
@@ -92,6 +92,7 @@ echo ""
 echo "Expect the devdb deployment is scaled to 0"
 kubectl get deployment atlasschema-mysql-atlas-dev-db \
   -o=jsonpath='{.spec.replicas}' | grep -q '0'
+kubectl delete -f ./schema
 
 cd ./migration
 
@@ -111,3 +112,32 @@ echo ""
 echo "Expected the new column to be present"
 mysql_exec "SHOW COLUMNS FROM myapp.posts LIKE 'created_at';" | grep -q 'created_at'
 echo ""
+
+mysql_reset "Test migrate down"
+k8s_dircfg ./mysql-migrations migration-dir
+kubectl delete -f ./mysql_migration.yaml
+kubectl apply -f ./mysql_migration.yaml
+kubectl wait --timeout=120s --for=condition=ready \
+  AtlasMigration/atlasmigration-sample
+if ! mysql_exec "SHOW COLUMNS FROM myapp.posts LIKE 'created_at';" | grep -q 'created_at'; then
+  echo "The column created_at should be present at this point"
+  exit 1
+fi
+atlas migrate rm --dir=file://./mysql-migrations
+echo "Removed the last migration, which adds the column created_at"
+k8s_dircfg ./mysql-migrations migration-dir
+# Expect the migration to fail
+kubectl wait --timeout=120s \
+  --for=jsonpath='{.status.conditions[*].message}'="Migrate down is not allowed" \
+  AtlasMigration/atlasmigration-sample
+# Patch the migration to allow the down migration
+kubectl patch AtlasMigration/atlasmigration-sample \
+  --type merge --patch-file ./mysql_migrate_down.yaml
+kubectl wait --timeout=120s --for=condition=ready \
+  AtlasMigration/atlasmigration-sample
+if mysql_exec "SHOW COLUMNS FROM myapp.posts LIKE 'created_at';" | grep -q 'created_at'; then
+  echo "The column created_at should not be present at this point"
+  exit 1
+else
+  echo "The column created_at is not present"
+fi
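Note on the test harness: the new third argument to newRunner lets a test swap the real Atlas CLI for the in-memory mock above, while passing nil keeps the previous behavior of shelling out to atlas. A minimal wiring sketch, where the test body is illustrative and not part of the patch:

func TestWithMock(t *testing.T) {
	// Route every Atlas call to canned results instead of the real CLI.
	mock := &mockAtlasExec{}
	mock.status.res = &atlasexec.MigrateStatus{Current: "1"}
	_, reconcile := newRunner(NewAtlasMigrationReconciler, nil, mock)
	// With no objects seeded, reconciling an absent resource is a no-op;
	// once objects exist, the controller's MigrateStatus call returns the
	// canned value above.
	reconcile(&dbv1alpha1.AtlasMigration{ObjectMeta: migrationObjmeta()}, func(result ctrl.Result, err error) {
		require.NoError(t, err)
	})
}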