Skip to content

Commit

Permalink
fix: move custom migrations to manifest-export
Browse files Browse the repository at this point in the history
By "custom migrations" we mean those db migrations that can not be solved by plain SQL,
so they are implemented in go.

This provides the basic setup to move the custom migrations to the manifest-export-service.
* It creates a new endpoint to accept migrations, however the endpoint is not called yet.
* It provides the setup like reading/writing to the custom_migration_cutoff table

However, this change does not include actually moving the migrations from cd- to export-service.
This will be done in a separate PR.

Ref: SRX-V6RVYF
  • Loading branch information
sven-urbanski-freiheit-com committed Dec 11, 2024
1 parent 1da7225 commit 87806f0
Show file tree
Hide file tree
Showing 13 changed files with 588 additions and 33 deletions.
2 changes: 2 additions & 0 deletions charts/kuberpult/templates/manifest-repo-export-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ spec:
value: {{ .Values.argocd.generateFiles | quote }}
- name: KUBERPULT_NETWORK_TIMEOUT_SECONDS
value: {{ .Values.manifestRepoExport.networkTimeoutSeconds | quote }}
- name: KUBERPULT_VERSION
value: {{ $.Chart.AppVersion | quote}}
- name: LOG_FORMAT
value: {{ .Values.log.format | quote }}
- name: LOG_LEVEL
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS custom_migration_cutoff
(
migration_done_at TIMESTAMP NOT NULL,
kuberpult_version varchar(100) PRIMARY KEY -- the version as it appears on GitHub, e.g. "1.2.3"
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS custom_migration_cutoff
(
migration_done_at TIMESTAMP NOT NULL,
kuberpult_version varchar(100) PRIMARY KEY -- the version as it appears on GitHub, e.g. "1.2.3"
);
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ services:
- KUBERPULT_DB_MAX_OPEN_CONNECTIONS=5
- KUBERPULT_DB_MAX_IDLE_CONNECTIONS=1
- KUBERPULT_MINIMIZE_EXPORTED_DATA=true
- KUBERPULT_VERSION=v0.1.2
volumes:
- ./services/cd-service:/kp/kuberpult
- ./database:/kp/database
Expand Down
18 changes: 18 additions & 0 deletions pkg/api/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,24 @@ service VersionService {
rpc GetManifests (GetManifestsRequest) returns (GetManifestsResponse) {}
}

service MigrationService {
rpc EnsureCustomMigrationApplied (EnsureCustomMigrationAppliedRequest) returns (EnsureCustomMigrationAppliedResponse) {}
}

message EnsureCustomMigrationAppliedRequest {
KuberpultVersion version = 1;
}

message EnsureCustomMigrationAppliedResponse {
bool migrationsApplied = 1;
}

message KuberpultVersion {
int32 major = 1;
int32 minor = 2;
int32 patch = 3;
}

service OverviewService {
rpc GetOverview (GetOverviewRequest) returns (GetOverviewResponse) {}
rpc StreamOverview (GetOverviewRequest) returns (stream GetOverviewResponse) {}
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2924,7 +2924,7 @@ func (h *DBHandler) needsCommitEventsMigrations(ctx context.Context, transaction
return true, nil
}

// NeedsMigrations: Checks if we need migrations for any table.
// NeedsMigrations checks if we need migrations for any table.
func (h *DBHandler) NeedsMigrations(ctx context.Context) (bool, error) {
span, ctx := tracer.StartSpanFromContext(ctx, "NeedsMigrations")
defer span.Finish()
Expand Down
23 changes: 22 additions & 1 deletion services/manifest-repo-export-service/pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmd
import (
"context"
"database/sql"
"github.com/freiheit-com/kuberpult/services/manifest-repo-export-service/pkg/migrations"
"strconv"
"time"

Expand Down Expand Up @@ -185,6 +186,16 @@ func Run(ctx context.Context) error {
}
argoCdGenerateFiles := argoCdGenerateFilesString == "true"

kuberpultVersionRaw, err := valid.ReadEnvVar("KUBERPULT_VERSION")
if err != nil {
return err
}
logger.FromContext(ctx).Info("startup", zap.String("kuberpultVersion", kuberpultVersionRaw))
kuberpultVersion, err := migrations.ParseKuberpultVersion(kuberpultVersionRaw)
if err != nil {
return err
}

var dbCfg db.DBConfig
if dbOption == "postgreSQL" {
dbCfg = db.DBConfig{
Expand Down Expand Up @@ -231,6 +242,16 @@ func Run(ctx context.Context) error {
}
}

migrationServer := &service.MigrationServer{DBHandler: dbHandler}
log.Infof("Running Custom Migrations")
_, err = migrationServer.EnsureCustomMigrationApplied(ctx, &api.EnsureCustomMigrationAppliedRequest{
Version: kuberpultVersion,
})
if err != nil {
return fmt.Errorf("error running custom migrations: %w", err)
}
log.Infof("Finished Custom Migrations successfully")

cfg := repository.RepositoryConfig{
URL: gitUrl,
Path: "./repository",
Expand All @@ -250,7 +271,6 @@ func Run(ctx context.Context) error {

DBHandler: dbHandler,
}

repo, err := repository.New(ctx, cfg)
if err != nil {
return fmt.Errorf("repository.new failed %v", err)
Expand All @@ -272,6 +292,7 @@ func Run(ctx context.Context) error {
Register: func(srv *grpc.Server) {
api.RegisterVersionServiceServer(srv, &service.VersionServiceServer{Repository: repo})
api.RegisterGitServiceServer(srv, &service.GitServer{Repository: repo, Config: cfg, PageSize: 10})
api.RegisterMigrationServiceServer(srv, migrationServer)
reflection.Register(srv)
},
},
Expand Down
123 changes: 123 additions & 0 deletions services/manifest-repo-export-service/pkg/migrations/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*This file is part of kuberpult.
Kuberpult is free software: you can redistribute it and/or modify
it under the terms of the Expat(MIT) License as published by
the Free Software Foundation.
Kuberpult is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
MIT License for more details.
You should have received a copy of the MIT License
along with kuberpult. If not, see <https://directory.fsf.org/wiki/License:Expat>.
Copyright freiheit.com*/

package migrations

import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/freiheit-com/kuberpult/pkg/api/v1"
"github.com/freiheit-com/kuberpult/pkg/db"
"github.com/freiheit-com/kuberpult/pkg/logger"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

type OnErrFunc = func(err error) error

// StartSpanFromContext is the same as tracer.StartSpanFromContext, but also returns an onError function that tags the span as error
// You should call the onErrorFunc when the span should be marked as failed.
func StartSpanFromContext(ctx context.Context, name string) (tracer.Span, context.Context, OnErrFunc) {
mySpan, ctx := tracer.StartSpanFromContext(ctx, name)
onErr := func(err error) error {
if err == nil {
return nil
}
mySpan.Finish(tracer.WithError(err))
return err
}
return mySpan, ctx, onErr
}

func DBReadCustomMigrationCutoff(h *db.DBHandler, ctx context.Context, transaction *sql.Tx, requestedVersion *api.KuberpultVersion) (*api.KuberpultVersion, error) {
span, ctx, onErr := StartSpanFromContext(ctx, "DBReadCustomMigrationCutoff")
defer span.Finish()

requestedVersionString := FormatKuberpultVersion(requestedVersion)

selectQuery := h.AdaptQuery(`
SELECT kuberpult_version
FROM custom_migration_cutoff
WHERE kuberpult_version=?
LIMIT 1;`)
span.SetTag("query", selectQuery)
span.SetTag("requestedVersion", requestedVersionString)
rows, err := transaction.QueryContext(
ctx,
selectQuery,
requestedVersionString,
)
if err != nil {
return nil, onErr(fmt.Errorf("could not query cutoff table from DB. Error: %w\n", err))
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
logger.FromContext(ctx).Sugar().Warnf("migration_cutoff: row closing error: %v", err)
}
}(rows)

if !rows.Next() {
return nil, nil
}
var rawVersion string
err = rows.Scan(&rawVersion)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, onErr(fmt.Errorf("migration_cutoff: Error scanning row from DB. Error: %w\n", err))
}
err = rows.Close()
if err != nil {
return nil, onErr(fmt.Errorf("migration_cutoff: row closing error: %v\n", err))
}
err = rows.Err()
if err != nil {
return nil, onErr(fmt.Errorf("migration_cutoff: row has error: %v\n", err))
}

var kuberpultVersion *api.KuberpultVersion
kuberpultVersion, err = ParseKuberpultVersion(rawVersion)
if err != nil {
return nil, onErr(fmt.Errorf("migration_cutoff: Error parsing kuberpult version. Error: %w", err))
}
return kuberpultVersion, nil
}

func DBWriteCustomMigrationCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx, kuberpultVersion *api.KuberpultVersion) error {
span, ctx, onErr := StartSpanFromContext(ctx, "DBWriteCustomMigrationCutoff")
defer span.Finish()

timestamp, err := h.DBReadTransactionTimestamp(ctx, tx)
if err != nil {
return onErr(fmt.Errorf("DBWriteCustomMigrationCutoff: Error reading transaction timestamp from DB. Error: %w", err))
}

insertQuery := h.AdaptQuery("INSERT INTO custom_migration_cutoff (migration_done_at, kuberpult_version) VALUES (?, ?);")
span.SetTag("query", insertQuery)

_, err = tx.Exec(
insertQuery,
timestamp,
FormatKuberpultVersion(kuberpultVersion),
)
if err != nil {
return onErr(fmt.Errorf("could not write to cutoff table from DB. Error: %w\n", err))
}
return nil
}
Loading

0 comments on commit 87806f0

Please sign in to comment.