Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

atlasmigration: store migrations.tar.gz in k8s' secrets #175

Merged
merged 8 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions charts/atlas-operator/templates/manager-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ rules:
- configmaps
- secrets
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- ""
Expand Down
3 changes: 3 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ rules:
- configmaps
- secrets
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- ""
Expand Down
106 changes: 105 additions & 1 deletion controllers/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ limitations under the License.
package controllers

import (
"bytes"
"compress/gzip"
"context"
"crypto/sha256"
"encoding/hex"
Expand All @@ -40,6 +42,8 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand All @@ -54,7 +58,7 @@ import (
"github.com/ariga/atlas-operator/controllers/watch"
)

//+kubebuilder:rbac:groups=core,resources=configmaps;secrets,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=configmaps;secrets,verbs=create;update;delete;get;list;watch
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
//+kubebuilder:rbac:groups=db.atlasgo.io,resources=atlasmigrations,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=db.atlasgo.io,resources=atlasmigrations/finalizers,verbs=update
Expand Down Expand Up @@ -173,12 +177,57 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
r.recordErrEvent(res, err)
return result(err)
}
if data.Dir != nil {
// Compress the migration directory then store it in the secret
// for later use when atlas runs the migration down.
if err := r.storeDirState(ctx, res, data.Dir); err != nil {
res.SetNotReady("StoringDirState", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
}
status.ObservedHash = hash
res.SetReady(*status)
r.recorder.Eventf(res, corev1.EventTypeNormal, "Applied", "Version %s applied", status.LastAppliedVersion)
return ctrl.Result{}, nil
}

func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: makeKeyLatest(obj.GetName()),
Namespace: obj.GetNamespace(),
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil {
return nil, err
}
return extractDirFromSecret(secret)
}

func (r *AtlasMigrationReconciler) storeDirState(ctx context.Context, obj client.Object, dir migrate.Dir) error {
var labels = make(map[string]string, len(obj.GetLabels())+1)
for k, v := range obj.GetLabels() {
labels[k] = v
}
labels["name"] = obj.GetName()
secret, err := newSecretObject(obj, dir, labels)
if err != nil {
return err
}
// Set the namespace of the secret to the same as the resource
secret.Namespace = obj.GetNamespace()
switch err := r.Create(ctx, secret); {
case err == nil:
return nil
case apierrors.IsAlreadyExists(err):
// Update the secret if it already exists
return r.Update(ctx, secret)
default:
return err
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *AtlasMigrationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -388,3 +437,58 @@ func (c *cloud) hasRemoteDir() bool {
}
return c.RemoteDir != nil && c.RemoteDir.Name != ""
}

func makeKeyLatest(resName string) string {
// Inspired by the helm chart key format
const storageKey = "io.atlasgo.db.v1"
return fmt.Sprintf("%s.%s.latest", storageKey, resName)
}

func newSecretObject(obj client.Object, dir migrate.Dir, labels map[string]string) (*corev1.Secret, error) {
const owner = "atlasgo.io"
if labels == nil {
labels = map[string]string{}
}
labels["owner"] = owner
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
if err := migrate.ArchiveDirTo(w, dir); err != nil {
return nil, err
}
// Close the gzip writer to flush the buffer
if err := w.Close(); err != nil {
return nil, err
}
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: makeKeyLatest(obj.GetName()),
Labels: labels,
OwnerReferences: []metav1.OwnerReference{
// Set the owner reference to the given object
// This will ensure that the secret is deleted when the owner is deleted.
*metav1.NewControllerRef(obj, obj.GetObjectKind().GroupVersionKind()),
},
},
Type: "atlasgo.io/db.v1",
Data: map[string][]byte{
// k8s already encodes the tarball in base64
// so we don't need to encode it again.
"migrations.tar.gz": buf.Bytes(),
},
}, nil
}

func extractDirFromSecret(sec *corev1.Secret) (migrate.Dir, error) {
if sec.Type != "atlasgo.io/db.v1" {
return nil, fmt.Errorf("invalid secret type, got %q", sec.Type)
}
tarball, ok := sec.Data["migrations.tar.gz"]
if !ok {
return nil, errors.New("migrations.tar.gz not found")
}
r, err := gzip.NewReader(bytes.NewReader(tarball))
if err != nil {
return nil, err
}
return migrate.UnarchiveDirFrom(r)
}
59 changes: 55 additions & 4 deletions controllers/atlasmigration_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestMigration_Local(t *testing.T) {
require.Equal(t, version, res.Status.LastAppliedVersion)
})
}
newDir := func(dir map[string]string) {
updateDir := func(dir map[string]string) {
t.Helper()
h.patch(t, &dbv1alpha1.AtlasMigration{
ObjectMeta: meta,
Expand All @@ -193,26 +194,49 @@ func TestMigration_Local(t *testing.T) {
},
})
}
assertDir := func(dirMap map[string]string) {
t.Helper()
// Check the content of the tarball
h.get(t, &dbv1alpha1.AtlasMigration{ObjectMeta: meta})
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: makeKeyLatest("atlas-migration"),
Namespace: "default",
},
}
h.get(t, secret)
dir, err := extractDirFromSecret(secret)
require.NoError(t, err)
require.NotNil(t, dir)
// It should contain the same files as the local directory
testContent(t, dirMap, dir)
}
// First reconcile
assert(ctrl.Result{Requeue: true}, false, "Reconciling", "Reconciling", "")
// Second reconcile
assert(ctrl.Result{}, true, "Applied", "", "20230412003626")
assertDir(obj.Spec.Dir.Local)
// Third reconcile, should not change the status
assert(ctrl.Result{}, true, "Applied", "", "20230412003626")
// Update the migration script
newDir(map[string]string{
newDir := map[string]string{
"20230412003626_create_foo.sql": "CREATE TABLE foo (id INT PRIMARY KEY);",
"20230808132722_add-boo.sql": "CREATE TABLE boo (id INT PRIMARY KEY);",
"atlas.sum": `h1:zgFwhjzwhLZr82YtR4+PijDiVYNxwr18C3EqZtG4wyE=
20230412003626_create_foo.sql h1:8C7Hz48VGKB0trI2BsK5FWpizG6ttcm9ep+tX32y0Tw=
20230808132722_add-boo.sql h1:tD/Qak7Q4n0bp9wO8bjWYhRRcgp+oYcUDQIumztpYpg=`,
})
}
updateDir(newDir)
// Fourth reconcile, should change the status to Reconciling
assert(ctrl.Result{Requeue: true}, false, "Reconciling", "Current migration data has changed", "20230412003626")
// The content should not change during the migration
assertDir(obj.Spec.Dir.Local)
// Fifth reconcile, should change the status to Applied
assert(ctrl.Result{}, true, "Applied", "", "20230808132722")
// The content should change to the new directory
assertDir(newDir)
// Update the migration script with bad SQL
newDir(map[string]string{
updateDir(map[string]string{
"20230412003626_create_foo.sql": "CREATE TABLE foo (id INT PRIMARY KEY);",
"20230808132722_add-boo.sql": "CREATE TABLE boo (id INT PRIMARY KEY);",
"20230808140359_bad-sql.sql": "SYNTAX ERROR",
Expand All @@ -225,6 +249,8 @@ func TestMigration_Local(t *testing.T) {
assert(ctrl.Result{Requeue: true}, false, "Reconciling", "Current migration data has changed", "20230808132722")
// Seventh reconcile, should change the status to Failed
assert(ctrl.Result{}, false, "Migrating", `"SYNTAX ERROR" from version "20230808140359"`, "20230808132722")
// The content should not change when the migration fails
assertDir(newDir)
// Check the events generated by the controller
require.Equal(t, []string{
"Normal Applied Version 20230412003626 applied",
Expand Down Expand Up @@ -315,6 +341,31 @@ func TestReconcile_LocalMigrationDir(t *testing.T) {

status := tt.status()
require.EqualValues(tt, "20230412003626", status.LastAppliedVersion)

fsDir, err := getSecretValue(context.Background(), tt.r, "default", &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: makeKeyLatest("atlas-migration"),
},
Key: "migrations.tar.gz",
})
require.NoError(t, err)
require.NotNil(t, fsDir)

// Check the content of the tarball
dir, err := tt.r.readDirState(context.Background(), am)
require.NoError(t, err)
require.NotNil(t, dir)
// It should contain the same files as the local directory
testContent(t, am.Spec.Dir.Local, dir)
}

func testContent(t *testing.T, files map[string]string, dir fs.FS) {
t.Helper()
for f, c := range files {
foo, err := fs.ReadFile(dir, f)
require.NoError(t, err)
require.EqualValues(t, c, string(foo))
}
}

func TestReconcile_LocalMigrationDir_ConfigMap(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions controllers/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"github.com/ariga/atlas-operator/controllers/watch"
)

//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps;secrets,verbs=get;list;watch
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=create;update;delete;get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps;secrets,verbs=create;get;list;watch
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
//+kubebuilder:rbac:groups=core,resources=pods,verbs=create;delete;get;list;watch
//+kubebuilder:rbac:groups=db.atlasgo.io,resources=atlasschemas,verbs=get;list;watch;create;update;patch;delete
Expand Down
14 changes: 7 additions & 7 deletions controllers/devdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ type (
recorder record.EventRecorder
prewarm bool
}
resourceOwner interface {
metav1.Object
runtime.Object
}
)

func newDevDB(mgr Manager, r record.EventRecorder, prewarm bool) *devDBReconciler {
Expand All @@ -74,7 +70,7 @@ func newDevDB(mgr Manager, r record.EventRecorder, prewarm bool) *devDBReconcile
}

// cleanUp clean up any resources created by the controller
func (r *devDBReconciler) cleanUp(ctx context.Context, sc resourceOwner) {
func (r *devDBReconciler) cleanUp(ctx context.Context, sc client.Object) {
// If prewarmDevDB is false, scale down the deployment to 0
if !r.prewarm {
deploy := &appsv1.Deployment{}
Expand Down Expand Up @@ -107,7 +103,7 @@ func (r *devDBReconciler) cleanUp(ctx context.Context, sc resourceOwner) {

// devURL returns the URL of the dev database for the given target URL.
// It creates a dev database if it does not exist.
func (r *devDBReconciler) devURL(ctx context.Context, sc resourceOwner, targetURL url.URL) (string, error) {
func (r *devDBReconciler) devURL(ctx context.Context, sc client.Object, targetURL url.URL) (string, error) {
drv := driver(targetURL.Scheme)
if drv == "sqlite" {
return "sqlite://db?mode=memory", nil
Expand All @@ -133,7 +129,11 @@ func (r *devDBReconciler) devURL(ctx context.Context, sc resourceOwner, targetUR
if err != nil {
return "", err
}
ctrl.SetControllerReference(sc, deploy, r.scheme)
// Set the owner reference to the given object
// This will ensure that the deployment is deleted when the owner is deleted.
if err := ctrl.SetControllerReference(sc, deploy, r.scheme); err != nil {
return "", err
}
if err := r.Create(ctx, deploy); err != nil {
return "", transient(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22
toolchain go1.22.0

require (
ariga.io/atlas v0.21.2-0.20240418081819-02b3f6239b04
ariga.io/atlas v0.21.2-0.20240424054305-cf871e4a7c1e
ariga.io/atlas-go-sdk v0.5.4-0.20240419053913-865efa3e6f8f
github.com/stretchr/testify v1.9.0
golang.org/x/mod v0.17.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ariga.io/atlas v0.21.2-0.20240418081819-02b3f6239b04 h1:YF3qiqtnhn+y4tfhZKTfZKfizpjqHYt7rWPUb+eA4ZA=
ariga.io/atlas v0.21.2-0.20240418081819-02b3f6239b04/go.mod h1:VPlcXdd4w2KqKnH54yEZcry79UAhpaWaxEsmn5JRNoE=
ariga.io/atlas v0.21.2-0.20240424054305-cf871e4a7c1e h1:3iOt1DD4c6IGRjxcm+C5IeI+pHo7HnqMoN14xjVjxsE=
ariga.io/atlas v0.21.2-0.20240424054305-cf871e4a7c1e/go.mod h1:VPlcXdd4w2KqKnH54yEZcry79UAhpaWaxEsmn5JRNoE=
ariga.io/atlas-go-sdk v0.5.4-0.20240419053913-865efa3e6f8f h1:KlImyCEkKSsoFVOzxOnsraDzdt95g0Sqewt8sjJcbrY=
ariga.io/atlas-go-sdk v0.5.4-0.20240419053913-865efa3e6f8f/go.mod h1:9Q+/04PVyJHUse1lEE9Kp6E18xj/6mIzaUTcWYSjSnQ=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
Expand Down
Loading