diff --git a/database/migrations/postgres/1734008644556351_separate_deployments_history.up.sql b/database/migrations/postgres/1734008644556351_separate_deployments_history.up.sql new file mode 100644 index 000000000..815bdd68e --- /dev/null +++ b/database/migrations/postgres/1734008644556351_separate_deployments_history.up.sql @@ -0,0 +1,68 @@ +-- rename deployments table to deployments_history if it doesn't exist +DO $$ +BEGIN + IF EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = 'deployments' + ) AND NOT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = 'deployments_history' + ) THEN + ALTER TABLE deployments RENAME TO deployments_history; + END IF; +END +$$; +-- create deployments table if it doesn't exist +DROP INDEX IF EXISTS deployments_idx; +DROP INDEX IF EXISTS deployments_version_idx; +CREATE TABLE IF NOT EXISTS deployments( + created timestamp without time zone, + releaseversion bigint, + appname varchar NOT NULL, + envname varchar NOT NULL, + metadata varchar, + transformereslversion integer DEFAULT 0, + PRIMARY KEY(appname,envname), + CONSTRAINT fk_deployments_transformer_id FOREIGN key(transformereslversion) REFERENCES event_sourcing_light(eslversion) +); +CREATE INDEX IF NOT EXISTS deployments_idx ON deployments USING btree ("appname","envname"); +CREATE INDEX IF NOT EXISTS deployments_version_idx ON deployments USING btree ("releaseversion","appname","envname"); + +-- insert data into deployments table from deployments_history table if there's no data inside it +DO $$ +BEGIN + IF EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = 'deployments' + ) AND NOT EXISTS ( + SELECT 1 FROM deployments LIMIT 1 + ) THEN + INSERT INTO deployments (releaseVersion, created, appName, envname, metadata, transformereslversion) + SELECT DISTINCT + deployments_history.releaseVersion, + deployments_history.created, + deployments_history.appName, + deployments_history.envname, + deployments_history.metadata, + deployments_history.transformereslversion + FROM ( + SELECT + MAX(version) AS latestDeployment, + appname, + envname + FROM + "deployments_history" + GROUP BY + appname, envname) AS latest + JOIN + deployments_history AS deployments_history + ON + latest.latestDeployment=deployments_history.version + AND latest.envname=deployments_history.envname + AND latest.appname=deployments_history.appname; + END IF; +END +$$; + +-- Remove all_deployments table +DROP TABLE IF EXISTS all_deployments; diff --git a/database/migrations/sqlite/1734008644556351_separate_deployments_history.up.sql b/database/migrations/sqlite/1734008644556351_separate_deployments_history.up.sql new file mode 100644 index 000000000..524665fe4 --- /dev/null +++ b/database/migrations/sqlite/1734008644556351_separate_deployments_history.up.sql @@ -0,0 +1,29 @@ +CREATE TABLE IF NOT EXISTS deployments_history +( + created TIMESTAMP, + releaseVersion BIGINT NULL, + appName VARCHAR, + envName VARCHAR, + metadata VARCHAR, + transformereslVersion INTEGER DEFAULT 0, + version INTEGER PRIMARY KEY AUTOINCREMENT, + FOREIGN KEY(transformereslVersion) REFERENCES event_sourcing_light(eslVersion) +); + +INSERT INTO deployments_history (created, releaseversion, appname, envname, metadata, transformereslversion) +SELECT created, releaseversion, appname, envname, metadata, transformereslversion +FROM deployments +ORDER BY version; +DROP TABLE IF EXISTS deployments; +DROP TABLE IF EXISTS all_deployments; +CREATE TABLE IF NOT EXISTS deployments +( + created TIMESTAMP, + releaseVersion BIGINT NULL, + appName VARCHAR, + envName VARCHAR, + metadata VARCHAR, + transformereslVersion INTEGER DEFAULT 0, + PRIMARY KEY (appname, envname) + FOREIGN KEY(transformereslVersion) REFERENCES event_sourcing_light(eslVersion) +); diff --git a/pkg/db/db.go b/pkg/db/db.go index db8685f8c..faf5d4a37 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -928,30 +928,6 @@ func (h *DBHandler) DBSelectAllApplications(ctx context.Context, transaction *sq return &resultGo, nil } -type DBDeployment struct { - Created time.Time - ReleaseVersion *int64 - App string - Env string - TransformerID TransformerID - Metadata string // json -} - -type Deployment struct { - Created time.Time - App string - Env string - Version *int64 - Metadata DeploymentMetadata - TransformerID TransformerID -} - -type DeploymentMetadata struct { - DeployedByName string - DeployedByEmail string - CiLink string -} - type EnvironmentLock struct { EslVersion EslVersion Created time.Time @@ -1026,7 +1002,6 @@ func (h *DBHandler) RunCustomMigrations( ctx context.Context, getAllAppsFun GetAllAppsFun, writeAllDeploymentsFun WriteAllDeploymentsFun, - writeAllCurrentlyDeployedFun WriteAllDeploymentsFun, writeAllReleasesFun WriteAllReleasesFun, writeAllEnvLocksFun WriteAllEnvLocksFun, writeAllAppLocksFun WriteAllAppLocksFun, @@ -1049,10 +1024,6 @@ func (h *DBHandler) RunCustomMigrations( if err != nil { return err } - err = h.RunCustomMigrationAllDeployments(ctx, writeAllCurrentlyDeployedFun) //TODO: Merge with RunCustomMigrationDeployments - if err != nil { - return err - } err = h.RunCustomMigrationReleases(ctx, getAllAppsFun, writeAllReleasesFun) if err != nil { return err @@ -1080,440 +1051,6 @@ func (h *DBHandler) RunCustomMigrations( return nil } -func (h *DBHandler) DBSelectLatestDeployment(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string) (*Deployment, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectLatestDeployment") - defer span.Finish() - - selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion" + - " FROM deployments " + - " WHERE appName=? AND envName=? " + - " ORDER BY version DESC " + - " LIMIT 1;")) - span.SetTag("query", selectQuery) - rows, err := tx.QueryContext( - ctx, - selectQuery, - appSelector, - envSelector, - ) - if err != nil { - return nil, fmt.Errorf("could not select deployment from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) - } - }(rows) - return processDeployment(rows) -} - -func (h *DBHandler) DBSelectAllLatestDeploymentsForApplication(ctx context.Context, tx *sql.Tx, appName string) (map[string]Deployment, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeployments") - defer span.Finish() - - selectQuery := h.AdaptQuery( - ` - SELECT - deployments.created, - deployments.appname, - deployments.releaseVersion, - deployments.envName, - deployments.metadata - FROM ( - SELECT - MAX(version) AS latest, - appname, - envname - FROM - deployments - GROUP BY - envName, appname - ) AS latest - JOIN - deployments AS deployments - ON - latest.latest=deployments.version - AND latest.appname=deployments.appname - AND latest.envName=deployments.envName - WHERE deployments.appname = (?) AND deployments.releaseVersion IS NOT NULL ;`) - - span.SetTag("query", selectQuery) - rows, err := tx.QueryContext( - ctx, - selectQuery, - appName, - ) - if err != nil { - return nil, fmt.Errorf("could not select deployment from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) - } - }(rows) - return processAllLatestDeploymentsForApp(rows) -} - -func (h *DBHandler) DBSelectAllLatestDeploymentsOnEnvironment(ctx context.Context, tx *sql.Tx, envName string) (map[string]*int64, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeployments") - defer span.Finish() - - selectQuery := h.AdaptQuery( - `SELECT - deployments.appname, - deployments.releaseVersion -FROM ( - SELECT - MAX(version) AS latest, - appname, - envname - FROM - deployments - GROUP BY - envName, appname) AS latest -JOIN - deployments AS deployments -ON - latest.latest=deployments.version - AND latest.appname=deployments.appname - AND latest.envName=deployments.envName -WHERE deployments.envName= ?; -`) - - span.SetTag("query", selectQuery) - rows, err := tx.QueryContext( - ctx, - selectQuery, - envName, - ) - if err != nil { - return nil, fmt.Errorf("could not select deployment from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) - } - }(rows) - return processAllLatestDeployments(rows) -} - -func processAllLatestDeployments(rows *sql.Rows) (map[string]*int64, error) { - result := make(map[string]*int64) - for rows.Next() { - var releaseVersion sql.NullInt64 - var appName string - err := rows.Scan(&appName, &releaseVersion) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) - } - - if releaseVersion.Valid { - result[appName] = &releaseVersion.Int64 - } - } - err := rows.Close() - if err != nil { - return nil, fmt.Errorf("deployments: row closing error: %v\n", err) - } - err = rows.Err() - if err != nil { - return nil, fmt.Errorf("deployments: row has error: %v\n", err) - } - return result, nil -} - -func processAllLatestDeploymentsForApp(rows *sql.Rows) (map[string]Deployment, error) { - result := make(map[string]Deployment) - for rows.Next() { - var curr = Deployment{ - Created: time.Time{}, - Env: "", - App: "", - Version: nil, - Metadata: DeploymentMetadata{ - DeployedByName: "", - DeployedByEmail: "", - CiLink: "", - }, - TransformerID: 0, - } - var releaseVersion sql.NullInt64 - var jsonMetadata string - err := rows.Scan(&curr.Created, &curr.App, &releaseVersion, &curr.Env, &jsonMetadata) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) - } - err = json.Unmarshal(([]byte)(jsonMetadata), &curr.Metadata) - if err != nil { - return nil, fmt.Errorf("Error during json unmarshal in deployments. Error: %w. Data: %s\n", err, jsonMetadata) - } - if releaseVersion.Valid { - curr.Version = &releaseVersion.Int64 - } - result[curr.Env] = curr - } - err := rows.Close() - if err != nil { - return nil, fmt.Errorf("deployments: row closing error: %v\n", err) - } - err = rows.Err() - if err != nil { - return nil, fmt.Errorf("deployments: row has error: %v\n", err) - } - return result, nil -} - -func (h *DBHandler) DBSelectSpecificDeployment(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string, releaseVersion uint64) (*Deployment, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectSpecificDeployment") - defer span.Finish() - - selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion" + - " FROM deployments " + - " WHERE appName=? AND envName=? and releaseVersion=?" + - " ORDER BY version DESC " + - " LIMIT 1;")) - span.SetTag("query", selectQuery) - rows, err := tx.QueryContext( - ctx, - selectQuery, - appSelector, - envSelector, - releaseVersion, - ) - if err != nil { - return nil, fmt.Errorf("could not select deployment from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) - } - }(rows) - return processDeployment(rows) -} - -func processDeployment(rows *sql.Rows) (*Deployment, error) { - var releaseVersion sql.NullInt64 - var row = &DBDeployment{ - Created: time.Time{}, - ReleaseVersion: nil, - App: "", - Env: "", - Metadata: "", - TransformerID: 0, - } - //exhaustruct:ignore - var resultJson = DeploymentMetadata{} - if rows.Next() { - err := rows.Scan(&row.Created, &releaseVersion, &row.App, &row.Env, &row.Metadata, &row.TransformerID) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) - } - if releaseVersion.Valid { - row.ReleaseVersion = &releaseVersion.Int64 - } - - err = json.Unmarshal(([]byte)(row.Metadata), &resultJson) - if err != nil { - return nil, fmt.Errorf("Error during json unmarshal in deployments. Error: %w. Data: %s\n", err, row.Metadata) - } - } - err := rows.Close() - if err != nil { - return nil, fmt.Errorf("deployments: row closing error: %v\n", err) - } - err = rows.Err() - if err != nil { - return nil, fmt.Errorf("deployments: row has error: %v\n", err) - } - return &Deployment{ - Created: row.Created, - App: row.App, - Env: row.Env, - Version: row.ReleaseVersion, - Metadata: resultJson, - TransformerID: row.TransformerID, - }, nil -} - -func (h *DBHandler) DBSelectDeploymentHistory(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string, limit int) ([]Deployment, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectDeploymentHistory") - defer span.Finish() - - selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion" + - " FROM deployments " + - " WHERE appName=? AND envName=? " + - " ORDER BY version DESC " + - " LIMIT ?;")) - span.SetTag("query", selectQuery) - rows, err := tx.QueryContext( - ctx, - selectQuery, - appSelector, - envSelector, - limit, - ) - if err != nil { - return nil, fmt.Errorf("could not select deployment history from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) - } - }(rows) - - result := make([]Deployment, 0) - - for rows.Next() { - row, err := h.processSingleDeploymentRow(ctx, rows) - if err != nil { - return nil, err - } - result = append(result, *row) - } - err = closeRows(rows) - if err != nil { - return nil, err - } - return result, nil -} - -func (h *DBHandler) DBSelectDeploymentsByTransformerID(ctx context.Context, tx *sql.Tx, transformerID TransformerID, limit uint) ([]Deployment, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectDeploymentsByTransformerID") - defer span.Finish() - - selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion" + - " FROM deployments " + - " WHERE transformereslVersion=? " + - " ORDER BY version DESC " + - " LIMIT ?;")) - - span.SetTag("query", selectQuery) - rows, err := tx.QueryContext( - ctx, - selectQuery, - transformerID, - limit, - ) - if err != nil { - return nil, fmt.Errorf("could not select deployments by transformer id from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) - } - }(rows) - deployments := make([]Deployment, 0) - for rows.Next() { - row, err := h.processSingleDeploymentRow(ctx, rows) - if err != nil { - return nil, err - } - deployments = append(deployments, *row) - } - err = closeRows(rows) - if err != nil { - return nil, err - } - return deployments, nil -} - -func (h *DBHandler) DBSelectAnyDeployment(ctx context.Context, tx *sql.Tx) (*DBDeployment, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAnyDeployment") - defer span.Finish() - selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT created, releaseVersion, appName, envName" + - " FROM deployments " + - " LIMIT 1;")) - span.SetTag("query", selectQuery) - - rows, err := tx.QueryContext( - ctx, - selectQuery, - ) - if err != nil { - return nil, fmt.Errorf("could not select any deployments from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("deployments row could not be closed: %v", err) - } - }(rows) - //exhaustruct:ignore - var row = &DBDeployment{} - if rows.Next() { - var releaseVersion sql.NullInt64 - err := rows.Scan(&row.Created, &releaseVersion, &row.App, &row.Env) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) - } - if releaseVersion.Valid { - row.ReleaseVersion = &releaseVersion.Int64 - } - } else { - row = nil - } - err = closeRows(rows) - if err != nil { - return nil, err - } - return row, nil -} - -func (h *DBHandler) DBAllDeploymentsContainsData(ctx context.Context, tx *sql.Tx) (bool, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBAllDeploymentsContainsData") - defer span.Finish() - selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT eslVersion, created, appName, json" + - " FROM all_deployments " + - " LIMIT 1;")) - span.SetTag("query", selectQuery) - - rows, err := tx.QueryContext( - ctx, - selectQuery, - ) - if err != nil { - return false, fmt.Errorf("could not select any all_deployments from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("all_deployments row could not be closed: %v", err) - } - }(rows) - //exhaustruct:ignore - result := rows.Next() - - err = closeRows(rows) - if err != nil { - return false, err - } - return result, nil -} - type DBApp struct { EslVersion EslVersion App string @@ -1783,169 +1320,6 @@ func processAppRow(ctx context.Context, rows *sql.Rows) (*DBAppWithMetaData, err return row, nil } -// DBWriteDeployment writes one deployment, meaning "what should be deployed" -func (h *DBHandler) DBWriteDeployment(ctx context.Context, tx *sql.Tx, deployment Deployment, skipOverview bool) error { - span, ctx := tracer.StartSpanFromContext(ctx, "DBWriteDeployment") - defer span.Finish() - if h == nil { - return nil - } - if tx == nil { - return fmt.Errorf("DBWriteDeployment: no transaction provided") - } - - jsonToInsert, err := json.Marshal(deployment.Metadata) - if err != nil { - return fmt.Errorf("could not marshal json data: %w", err) - } - - insertQuery := h.AdaptQuery( - "INSERT INTO deployments (created, releaseVersion, appName, envName, metadata, transformereslVersion) VALUES (?, ?, ?, ?, ?, ?);") - - now, err := h.DBReadTransactionTimestamp(ctx, tx) - if err != nil { - return fmt.Errorf("DBWriteDeployment unable to get transaction timestamp: %w", err) - } - span.SetTag("query", insertQuery) - nullVersion := NewNullInt(deployment.Version) - - _, err = tx.Exec( - insertQuery, - *now, - nullVersion, - deployment.App, - deployment.Env, - jsonToInsert, - deployment.TransformerID) - - if err != nil { - return fmt.Errorf("could not write deployment into DB. Error: %w\n", err) - } - return nil -} - -// DBSelectAllDeploymentsForApp Returns most recent version of deployments for app with name 'appName' -func (h *DBHandler) DBSelectAllDeploymentsForApp(ctx context.Context, tx *sql.Tx, appName string) (*AllDeploymentsForApp, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllDeploymentsForApp") - defer span.Finish() - if h == nil { - return nil, nil - } - if tx == nil { - return nil, fmt.Errorf("DBSelectAllDeploymentsForApp: no transaction provided") - } - - insertQuery := h.AdaptQuery( - "SELECT eslVersion, created, appName, json FROM all_deployments WHERE appName = (?) ORDER BY eslVersion DESC LIMIT 1;") - - span.SetTag("query", insertQuery) - rows, err := tx.Query( - insertQuery, - appName, - ) - - return h.processAllDeploymentRow(ctx, err, rows) -} - -// DBSelectAllDeploymentsForAppAtTimestamp Returns most recent version of deployments for app with name 'appName' at timestamp ts -func (h *DBHandler) DBSelectAllDeploymentsForAppAtTimestamp(ctx context.Context, tx *sql.Tx, appName string, ts time.Time) (*AllDeploymentsForApp, error) { - span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllDeploymentsForAppAtTimestamp") - defer span.Finish() - if h == nil { - return nil, nil - } - if tx == nil { - return nil, fmt.Errorf("DBSelectAllDeploymentsForAppAtTimestamp: no transaction provided") - } - - insertQuery := h.AdaptQuery( - "SELECT eslVersion, created, appName, json " + - "FROM all_deployments " + - "WHERE appName = (?) AND created <= (?) " + - "ORDER BY eslVersion " + - "DESC LIMIT 1;") - - span.SetTag("query", insertQuery) - rows, err := tx.Query( - insertQuery, - appName, - ts, - ) - - return h.processAllDeploymentRow(ctx, err, rows) -} - -// DBUpdateAllDeploymentsForApp Updates table entry for application with name 'appName' with 'envName' -> 'version' -func (h *DBHandler) DBUpdateAllDeploymentsForApp(ctx context.Context, tx *sql.Tx, appName, envName string, version int64) error { - span, ctx := tracer.StartSpanFromContext(ctx, "DBUpdateAllDeploymentsForApp") - defer span.Finish() - if h == nil { - return nil - } - if tx == nil { - return fmt.Errorf("DBUpdateAllDeploymentsForApp: no transaction provided") - } - - //Get current deployment information for app - currentAllDeployments, err := h.DBSelectAllDeploymentsForApp(ctx, tx, appName) - if err != nil { - return fmt.Errorf("could not read current all deployments for app: '%s': %v", appName, err) - } - - //Update current deployment layout with envName -> version - var deploymentsMap map[string]int64 - var previousVersion EslVersion - if currentAllDeployments == nil { //New app - deploymentsMap = map[string]int64{} - previousVersion = 0 - } else { - deploymentsMap = currentAllDeployments.Deployments - previousVersion = currentAllDeployments.Version - - } - deploymentsMap[envName] = version - - //Insert new information into the db - err = h.DBWriteAllDeploymentsForApp(ctx, tx, int(previousVersion), appName, deploymentsMap) - if err != nil { - return fmt.Errorf("could not write all deployments for app: '%s': %v", appName, err) - } - return nil -} - -// DBClearAllDeploymentsForApp Clears all deployments map. Used when undeploying an application -func (h *DBHandler) DBClearAllDeploymentsForApp(ctx context.Context, tx *sql.Tx, appName string) error { - span, ctx := tracer.StartSpanFromContext(ctx, "DBUpdateAllDeploymentsForApp") - defer span.Finish() - if h == nil { - return nil - } - if tx == nil { - return fmt.Errorf("DBClearAllDeploymentsForApp: no transaction provided") - } - - //Get current deployment information for app - currentAllDeployments, err := h.DBSelectAllDeploymentsForApp(ctx, tx, appName) - if err != nil { - return fmt.Errorf("could not read current all deployments for app: '%s': %v", appName, err) - } - - deploymentsMap := map[string]int64{} //EmptyMap - var previousVersion EslVersion - if currentAllDeployments == nil { - previousVersion = 0 - } else { - previousVersion = currentAllDeployments.Version - } - - //Insert new information into the db - err = h.DBWriteAllDeploymentsForApp(ctx, tx, int(previousVersion), appName, deploymentsMap) - if err != nil { - return fmt.Errorf("could not write all deployments for app: '%s': %v", appName, err) - } - return nil -} - // CUSTOM MIGRATIONS func (h *DBHandler) RunCustomMigrationReleases(ctx context.Context, getAllAppsFun GetAllAppsFun, writeAllReleasesFun WriteAllReleasesFun) error { @@ -2013,27 +1387,6 @@ func (h *DBHandler) RunCustomMigrationDeployments(ctx context.Context, getAllDep }) } -func (h *DBHandler) RunCustomMigrationAllDeployments(ctx context.Context, writeAllDeployments WriteAllDeploymentsFun) error { - span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationDeployments") - defer span.Finish() - - return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { - needsMigrating, err := h.needsAllDeploymentsMigrations(ctx, transaction) - if err != nil { - return err - } - if !needsMigrating { - return nil - } - err = writeAllDeployments(ctx, transaction, h) - if err != nil { - return fmt.Errorf("could not get current deployments to run custom migrations: %v", err) - } - - return nil - }) -} - func (h *DBHandler) needsDeploymentsMigrations(ctx context.Context, transaction *sql.Tx) (bool, error) { l := logger.FromContext(ctx).Sugar() allAppsDb, err := h.DBSelectAnyDeployment(ctx, transaction) @@ -2047,19 +1400,6 @@ func (h *DBHandler) needsDeploymentsMigrations(ctx context.Context, transaction return true, nil } -func (h *DBHandler) needsAllDeploymentsMigrations(ctx context.Context, transaction *sql.Tx) (bool, error) { - l := logger.FromContext(ctx).Sugar() - contains, err := h.DBAllDeploymentsContainsData(ctx, transaction) - if err != nil { - return true, err - } - if contains { - l.Warnf("There are already all_deployments in the DB - skipping migrations") - return !contains, nil - } - return !contains, nil -} - type AllApplicationsJson struct { Apps []string `json:"apps"` } @@ -2239,7 +1579,6 @@ func (h *DBHandler) NeedsMigrations(ctx context.Context) (bool, error) { (*DBHandler).needsTeamLocksMigrations, (*DBHandler).needsCommitEventsMigrations, (*DBHandler).needsEnvironmentsMigrations, - (*DBHandler).needsAllDeploymentsMigrations, } for i := range checkFunctions { f := checkFunctions[i] @@ -4559,46 +3898,6 @@ func (h *DBHandler) processSingleDeploymentAttemptsRow(ctx context.Context, rows } -// processSingleDeploymentRow only processes the row. It assumes that there is an element ready to be processed in rows. -func (h *DBHandler) processSingleDeploymentRow(ctx context.Context, rows *sql.Rows) (*Deployment, error) { - var row = &DBDeployment{ - Created: time.Time{}, - ReleaseVersion: nil, - App: "", - Env: "", - Metadata: "", - TransformerID: 0, - } - var releaseVersion sql.NullInt64 - //exhaustruct:ignore - var resultJson = DeploymentMetadata{} - - err := rows.Scan(&row.Created, &releaseVersion, &row.App, &row.Env, &row.Metadata, &row.TransformerID) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) - } - if releaseVersion.Valid { - row.ReleaseVersion = &releaseVersion.Int64 - } - - err = json.Unmarshal(([]byte)(row.Metadata), &resultJson) - if err != nil { - return nil, fmt.Errorf("Error during json unmarshal in deployments. Error: %w. Data: %s\n", err, row.Metadata) - } - - return &Deployment{ - Created: row.Created, - App: row.App, - Env: row.Env, - Version: row.ReleaseVersion, - Metadata: resultJson, - TransformerID: row.TransformerID, - }, nil -} - // Environments type DBAllEnvironments struct { @@ -5339,96 +4638,6 @@ func (h *DBHandler) DBReadLastFailedEslEvents(ctx context.Context, tx *sql.Tx, l return failedEsls, nil } -type AllDeploymentsForApp struct { - Version EslVersion `json:"version"` - AppName string `json:"appName"` - Deployments map[string]int64 `json:"deployments"` //Maps environment name to release version -} - -func (h *DBHandler) DBWriteAllDeploymentsForApp(ctx context.Context, tx *sql.Tx, prev int, appName string, environmentDeployments map[string]int64) error { - span, ctx := tracer.StartSpanFromContext(ctx, "DBWriteAllDeploymentsForApp") - defer span.Finish() - - if h == nil { - return nil - } - if tx == nil { - return fmt.Errorf("attempting to write to the all_deployments table without a transaction") - } - - insertQuery := h.AdaptQuery( - "INSERT INTO all_deployments (eslVersion, created, appName, json) VALUES (?, ?, ?, ?);", - ) - now, err := h.DBReadTransactionTimestamp(ctx, tx) - if err != nil { - return fmt.Errorf("DBWriteAllDeploymentsForApp unable to get transaction timestamp: %w", err) - } - jsonDeployments, err := json.Marshal(environmentDeployments) - if err != nil { - return fmt.Errorf("could not marshall deployments for app: '%s': %v\n", appName, err) - } - - span.SetTag("query", insertQuery) - _, err = tx.Exec( - insertQuery, - prev+1, - *now, - appName, - jsonDeployments, - ) - if err != nil { - return fmt.Errorf("DBWriteAllDeploymentsForApp error executing query: %w", err) - } - return nil -} - -type DBAllDeploymentsForAppRow struct { - EslVersion EslVersion - Created time.Time - AppName string - DeploymentsJson string -} - -func (h *DBHandler) processAllDeploymentRow(ctx context.Context, err error, rows *sql.Rows) (*AllDeploymentsForApp, error) { - if err != nil { - return nil, fmt.Errorf("could not query all_deployments table from DB. Error: %w\n", err) - } - defer func(rows *sql.Rows) { - err := rows.Close() - if err != nil { - logger.FromContext(ctx).Sugar().Warnf("all_deployments: row could not be closed: %v", err) - } - }(rows) - //exhaustruct:ignore - var row = &DBAllDeploymentsForAppRow{} - //exhaustruct:ignore - var deployments = &AllDeploymentsForApp{} - if rows.Next() { - - err := rows.Scan(&row.EslVersion, &row.Created, &row.AppName, &row.DeploymentsJson) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, fmt.Errorf("Error scanning oldest_deployments row from DB. Error: %w\n", err) - } - err = json.Unmarshal([]byte(row.DeploymentsJson), &deployments.Deployments) - if err != nil { - return nil, fmt.Errorf("Error unmarshalling all deployments. Error: %w\n", err) - } - deployments.AppName = row.AppName - deployments.Version = row.EslVersion - } else { - row = nil - deployments = nil - } - err = closeRows(rows) - if err != nil { - return nil, err - } - return deployments, nil -} - func (h *DBHandler) DBReadTransactionTimestamp(ctx context.Context, tx *sql.Tx) (*time.Time, error) { if h == nil { return nil, nil diff --git a/pkg/db/db_deployments.go b/pkg/db/db_deployments.go new file mode 100644 index 000000000..f5eb714e5 --- /dev/null +++ b/pkg/db/db_deployments.go @@ -0,0 +1,655 @@ +/*This file is part of kuberpult. + +Kuberpult is free software: you can redistribute it and/or modify +it under the terms of the Expat(MIT) License as published by +the Free Software Foundation. + +Kuberpult is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +MIT License for more details. + +You should have received a copy of the MIT License +along with kuberpult. If not, see . + +Copyright freiheit.com*/ + +package db + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "github.com/freiheit-com/kuberpult/pkg/logger" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "time" +) + +type DBDeployment struct { + Created time.Time + ReleaseVersion *int64 + App string + Env string + TransformerID TransformerID + Metadata string // json +} + +type DeploymentMetadata struct { + DeployedByName string + DeployedByEmail string + CiLink string +} + +type Deployment struct { + Created time.Time + App string + Env string + Version *int64 + Metadata DeploymentMetadata + TransformerID TransformerID +} + +// SELECT + +func (h *DBHandler) DBSelectLatestDeployment(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string) (*Deployment, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectLatestDeployment") + defer span.Finish() + selectQuery := h.AdaptQuery(` + SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion + FROM deployments + WHERE appName=? AND envName=? + LIMIT 1; + `) + span.SetTag("query", selectQuery) + rows, err := tx.QueryContext( + ctx, + selectQuery, + appSelector, + envSelector, + ) + if err != nil { + return nil, fmt.Errorf("could not select deployment for app %s on env %s from DB. Error: %w\n", appSelector, envSelector, err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) + } + }(rows) + return processDeployment(rows) +} + +func (h *DBHandler) DBSelectAllLatestDeploymentsForApplication(ctx context.Context, tx *sql.Tx, appName string) (map[string]Deployment, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeploymentsForApplication") + defer span.Finish() + selectQuery := h.AdaptQuery(` + SELECT created, appname, releaseVersion, envName, metadata + FROM deployments + WHERE deployments.appname = (?) AND deployments.releaseVersion IS NOT NULL; + `) + span.SetTag("query", selectQuery) + rows, err := tx.QueryContext( + ctx, + selectQuery, + appName, + ) + if err != nil { + return nil, fmt.Errorf("could not select deployment of app %s from DB. Error: %w\n", appName, err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) + } + }(rows) + return processAllLatestDeploymentsForApp(rows) +} + +func (h *DBHandler) DBSelectAllLatestDeploymentsOnEnvironment(ctx context.Context, tx *sql.Tx, envName string) (map[string]*int64, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeploymentsOnEnvironment") + defer span.Finish() + selectQuery := h.AdaptQuery(` + SELECT appname, releaseVersion + FROM deployments + WHERE deployments.envName= ?; + `) + + span.SetTag("query", selectQuery) + rows, err := tx.QueryContext( + ctx, + selectQuery, + envName, + ) + if err != nil { + return nil, fmt.Errorf("could not select deployment for env %s from DB. Error: %w\n", envName, err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) + } + }(rows) + return processAllLatestDeployments(rows) +} + +func (h *DBHandler) DBSelectSpecificDeployment(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string, releaseVersion uint64) (*Deployment, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectSpecificDeployment") + defer span.Finish() + selectQuery := h.AdaptQuery(` + SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion + FROM deployments + WHERE appName=? AND envName=? and releaseVersion=? + LIMIT 1; + `) + + span.SetTag("query", selectQuery) + rows, err := tx.QueryContext( + ctx, + selectQuery, + appSelector, + envSelector, + releaseVersion, + ) + if err != nil { + return nil, fmt.Errorf("could not select deployment for app %s on env %s for version %v from DB. Error: %w\n", appSelector, envSelector, releaseVersion, err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) + } + }(rows) + return processDeployment(rows) +} + +func (h *DBHandler) DBSelectDeploymentHistory(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string, limit int) ([]Deployment, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectDeploymentHistory") + defer span.Finish() + selectQuery := h.AdaptQuery(` + SELECT created, releaseVersion, appName, envname, metadata, transformereslversion + FROM deployments_history + WHERE deployments_history.appname = (?) AND deployments_history.envname = (?) + ORDER BY version DESC + LIMIT ?; + `) + + span.SetTag("query", selectQuery) + rows, err := tx.QueryContext( + ctx, + selectQuery, + appSelector, + envSelector, + limit, + ) + if err != nil { + return nil, fmt.Errorf("could not select deployment history of app %s in env %s from DB. Error: %w\n", appSelector, envSelector, err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) + } + }(rows) + + result := make([]Deployment, 0) + + for rows.Next() { + row, err := h.processSingleDeploymentRow(ctx, rows) + if err != nil { + return nil, err + } + result = append(result, *row) + } + err = closeRows(rows) + if err != nil { + return nil, err + } + return result, nil +} + +func (h *DBHandler) DBSelectDeploymentsByTransformerID(ctx context.Context, tx *sql.Tx, transformerID TransformerID, limit uint) ([]Deployment, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectDeploymentsByTransformerID") + defer span.Finish() + selectQuery := h.AdaptQuery(` + SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion + FROM deployments + WHERE transformereslVersion=? + LIMIT ?; + `) + + span.SetTag("query", selectQuery) + rows, err := tx.QueryContext( + ctx, + selectQuery, + transformerID, + limit, + ) + if err != nil { + return nil, fmt.Errorf("could not select deployments by transformer id from DB. Error: %w\n", err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments: row closing error: %v", err) + } + }(rows) + deployments := make([]Deployment, 0) + for rows.Next() { + row, err := h.processSingleDeploymentRow(ctx, rows) + if err != nil { + return nil, err + } + deployments = append(deployments, *row) + } + err = closeRows(rows) + if err != nil { + return nil, err + } + return deployments, nil +} + +func (h *DBHandler) DBSelectAnyDeployment(ctx context.Context, tx *sql.Tx) (*DBDeployment, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAnyDeployment") + defer span.Finish() + selectQuery := h.AdaptQuery(` + SELECT created, releaseVersion, appName, envName + FROM deployments + LIMIT 1; + `) + span.SetTag("query", selectQuery) + + rows, err := tx.QueryContext( + ctx, + selectQuery, + ) + if err != nil { + return nil, fmt.Errorf("could not select any deployments from DB. Error: %w\n", err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments row could not be closed: %v", err) + } + }(rows) + //exhaustruct:ignore + var row = &DBDeployment{} + if rows.Next() { + var releaseVersion sql.NullInt64 + err := rows.Scan(&row.Created, &releaseVersion, &row.App, &row.Env) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) + } + if releaseVersion.Valid { + row.ReleaseVersion = &releaseVersion.Int64 + } + } else { + row = nil + } + err = closeRows(rows) + if err != nil { + return nil, err + } + return row, nil +} + +func (h *DBHandler) DBSelectAllDeploymentsForApp(ctx context.Context, tx *sql.Tx, appName string) (map[string]int64, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllDeploymentsForApp") + defer span.Finish() + insertQuery := h.AdaptQuery(` + SELECT envname, releaseVersion + FROM deployments + WHERE appName = (?) AND releaseVersion IS NOT NULL; + `) + if h == nil { + return nil, nil + } + if tx == nil { + return nil, fmt.Errorf("DBSelectAllDeploymentsForApp: no transaction provided") + } + + span.SetTag("query", insertQuery) + rows, err := tx.Query( + insertQuery, + appName, + ) + + return h.processAllDeploymentRow(ctx, err, rows) +} + +func (h *DBHandler) DBSelectAllDeploymentsForAppAtTimestamp(ctx context.Context, tx *sql.Tx, appName string, ts time.Time) (map[string]int64, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllDeploymentsForAppAtTimestamp") + defer span.Finish() + query := h.AdaptQuery(` + SELECT + deployments_history.envName, + deployments_history.releaseVersion + FROM ( + SELECT + MAX(version) AS latest, + appname, + envname + FROM + deployments_history + WHERE deployments_history.appname = (?) AND created <= (?) AND deployments_history.releaseVersion IS NOT NULL + GROUP BY + envName, appname + ) AS latest + JOIN + deployments_history AS deployments_history + ON + latest.latest=deployments_history.version + AND latest.appname=deployments_history.appname + AND latest.envName=deployments_history.envName;`) + if h == nil { + return nil, nil + } + if tx == nil { + return nil, fmt.Errorf("DBSelectAllDeploymentsForAppAtTimestamp: no transaction provided") + } + + span.SetTag("query", query) + rows, err := tx.Query( + query, + appName, + ts, + ) + + return h.processAllDeploymentRow(ctx, err, rows) +} + +// UPDATE, DELETE, INSERT + +func (h *DBHandler) DBUpdateOrCreateDeployment(ctx context.Context, tx *sql.Tx, deployment Deployment) error { + span, ctx := tracer.StartSpanFromContext(ctx, "DBUpdateOrCreateDeployment") + defer span.Finish() + err := h.upsertDeploymentRow(ctx, tx, deployment) + if err != nil { + return err + } + err = h.insertDeploymentHistoryRow(ctx, tx, deployment) + if err != nil { + return err + } + return nil +} + +// Internal functions + +func (h *DBHandler) upsertDeploymentRow(ctx context.Context, tx *sql.Tx, deployment Deployment) error { + span, ctx := tracer.StartSpanFromContext(ctx, "upsertDeploymentRow") + defer span.Finish() + upsertQuery := h.AdaptQuery(` + INSERT INTO deployments (created, releaseVersion, appName, envName, metadata, transformereslVersion) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(appName, envName) + DO UPDATE SET created = excluded.created, releaseVersion = excluded.releaseVersion, metadata = excluded.metadata, transformereslversion = excluded.transformereslversion; + `) + if h == nil { + return nil + } + if tx == nil { + return fmt.Errorf("upsertDeploymentRow: no transaction provided") + } + + jsonToInsert, err := json.Marshal(deployment.Metadata) + if err != nil { + return fmt.Errorf("could not marshal json data: %w", err) + } + + now, err := h.DBReadTransactionTimestamp(ctx, tx) + if err != nil { + return fmt.Errorf("upsertDeploymnetRow unable to get transaction timestamp: %w", err) + } + span.SetTag("query", upsertQuery) + nullVersion := NewNullInt(deployment.Version) + + _, err = tx.Exec( + upsertQuery, + *now, + nullVersion, + deployment.App, + deployment.Env, + jsonToInsert, + deployment.TransformerID) + + if err != nil { + return fmt.Errorf("could not write deployment into DB. Error: %w\n", err) + } + return nil +} + +func (h *DBHandler) insertDeploymentHistoryRow(ctx context.Context, tx *sql.Tx, deployment Deployment) error { + span, ctx := tracer.StartSpanFromContext(ctx, "insertDeploymentHistoryRow") + defer span.Finish() + insertQuery := h.AdaptQuery(` + INSERT INTO deployments_history (created, releaseVersion, appName, envName, metadata, transformereslVersion) + VALUES (?, ?, ?, ?, ?, ?); + `) + if h == nil { + return nil + } + if tx == nil { + return fmt.Errorf("DBWriteDeployment: no transaction provided") + } + + jsonToInsert, err := json.Marshal(deployment.Metadata) + if err != nil { + return fmt.Errorf("could not marshal json data: %w", err) + } + + now, err := h.DBReadTransactionTimestamp(ctx, tx) + if err != nil { + return fmt.Errorf("DBWriteDeployment unable to get transaction timestamp: %w", err) + } + span.SetTag("query", insertQuery) + nullVersion := NewNullInt(deployment.Version) + + _, err = tx.Exec( + insertQuery, + *now, + nullVersion, + deployment.App, + deployment.Env, + jsonToInsert, + deployment.TransformerID) + + if err != nil { + return fmt.Errorf("could not write deployment_history into DB. Error: %w\n", err) + } + return nil +} + +// process Rows + +func processDeployment(rows *sql.Rows) (*Deployment, error) { + var releaseVersion sql.NullInt64 + var row = &DBDeployment{ + Created: time.Time{}, + ReleaseVersion: nil, + App: "", + Env: "", + Metadata: "", + TransformerID: 0, + } + //exhaustruct:ignore + var resultJson = DeploymentMetadata{} + if rows.Next() { + err := rows.Scan(&row.Created, &releaseVersion, &row.App, &row.Env, &row.Metadata, &row.TransformerID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) + } + if releaseVersion.Valid { + row.ReleaseVersion = &releaseVersion.Int64 + } + + err = json.Unmarshal(([]byte)(row.Metadata), &resultJson) + if err != nil { + return nil, fmt.Errorf("Error during json unmarshal in deployments. Error: %w. Data: %s\n", err, row.Metadata) + } + } + err := rows.Close() + if err != nil { + return nil, fmt.Errorf("deployments: row closing error: %v\n", err) + } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("deployments: row has error: %v\n", err) + } + return &Deployment{ + Created: row.Created, + App: row.App, + Env: row.Env, + Version: row.ReleaseVersion, + Metadata: resultJson, + TransformerID: row.TransformerID, + }, nil +} + +func processAllLatestDeploymentsForApp(rows *sql.Rows) (map[string]Deployment, error) { + result := make(map[string]Deployment) + for rows.Next() { + var curr = Deployment{ + Created: time.Time{}, + Env: "", + App: "", + Version: nil, + Metadata: DeploymentMetadata{ + DeployedByName: "", + DeployedByEmail: "", + CiLink: "", + }, + TransformerID: 0, + } + var releaseVersion sql.NullInt64 + var jsonMetadata string + err := rows.Scan(&curr.Created, &curr.App, &releaseVersion, &curr.Env, &jsonMetadata) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) + } + err = json.Unmarshal(([]byte)(jsonMetadata), &curr.Metadata) + if err != nil { + return nil, fmt.Errorf("Error during json unmarshal in deployments. Error: %w. Data: %s\n", err, jsonMetadata) + } + if releaseVersion.Valid { + curr.Version = &releaseVersion.Int64 + } + result[curr.Env] = curr + } + err := rows.Close() + if err != nil { + return nil, fmt.Errorf("deployments: row closing error: %v\n", err) + } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("deployments: row has error: %v\n", err) + } + return result, nil +} + +func processAllLatestDeployments(rows *sql.Rows) (map[string]*int64, error) { + result := make(map[string]*int64) + for rows.Next() { + var releaseVersion sql.NullInt64 + var appName string + err := rows.Scan(&appName, &releaseVersion) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) + } + + if releaseVersion.Valid { + result[appName] = &releaseVersion.Int64 + } + } + err := rows.Close() + if err != nil { + return nil, fmt.Errorf("deployments: row closing error: %v\n", err) + } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("deployments: row has error: %v\n", err) + } + return result, nil +} + +func (h *DBHandler) processSingleDeploymentRow(ctx context.Context, rows *sql.Rows) (*Deployment, error) { + var row = &DBDeployment{ + Created: time.Time{}, + ReleaseVersion: nil, + App: "", + Env: "", + Metadata: "", + TransformerID: 0, + } + var releaseVersion sql.NullInt64 + //exhaustruct:ignore + var resultJson = DeploymentMetadata{} + + err := rows.Scan(&row.Created, &releaseVersion, &row.App, &row.Env, &row.Metadata, &row.TransformerID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("Error scanning deployments row from DB. Error: %w\n", err) + } + if releaseVersion.Valid { + row.ReleaseVersion = &releaseVersion.Int64 + } + + err = json.Unmarshal(([]byte)(row.Metadata), &resultJson) + if err != nil { + return nil, fmt.Errorf("Error during json unmarshal in deployments. Error: %w. Data: %s\n", err, row.Metadata) + } + + return &Deployment{ + Created: row.Created, + App: row.App, + Env: row.Env, + Version: row.ReleaseVersion, + Metadata: resultJson, + TransformerID: row.TransformerID, + }, nil +} + +func (h *DBHandler) processAllDeploymentRow(ctx context.Context, err error, rows *sql.Rows) (map[string]int64, error) { + if err != nil { + return nil, fmt.Errorf("could not query deployments table from DB. Error: %w\n", err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("deployments: row could not be closed: %v", err) + } + }(rows) + deployments := make(map[string]int64) + for rows.Next() { + var rowVersion int64 + var rowEnv string + err := rows.Scan(&rowEnv, &rowVersion) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("Error scanning oldest_deployments row from DB. Error: %w\n", err) + } + deployments[rowEnv] = rowVersion + } + err = closeRows(rows) + if err != nil { + return nil, err + } + return deployments, nil +} diff --git a/pkg/db/db_test.go b/pkg/db/db_test.go index d3e6f9d65..8fac7574a 100644 --- a/pkg/db/db_test.go +++ b/pkg/db/db_test.go @@ -853,12 +853,12 @@ func TestReadWriteDeployment(t *testing.T) { return err } - err = dbHandler.DBWriteDeployment(ctx, transaction, Deployment{ + err = dbHandler.DBUpdateOrCreateDeployment(ctx, transaction, Deployment{ App: tc.App, Env: tc.Env, Version: tc.VersionToDeploy, TransformerID: 0, - }, false) + }) if err != nil { return err } @@ -988,7 +988,7 @@ func TestReadAllLatestDeploymentForApplication(t *testing.T) { } for _, deployment := range tc.SetupDeployments { - err := dbHandler.DBWriteDeployment(ctx, transaction, *deployment, false) + err := dbHandler.DBUpdateOrCreateDeployment(ctx, transaction, *deployment) if err != nil { return err } @@ -1099,7 +1099,7 @@ func TestReadAllLatestDeployment(t *testing.T) { } for _, deployment := range tc.SetupDeployments { - err := dbHandler.DBWriteDeployment(ctx, transaction, *deployment, false) + err := dbHandler.DBUpdateOrCreateDeployment(ctx, transaction, *deployment) if err != nil { return err } @@ -1322,8 +1322,21 @@ func TestAllDeployments(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.DBWriteMigrationsTransformer(ctx, transaction) + if err != nil { + return err + } for _, d := range tc.data { - err := dbHandler.DBUpdateAllDeploymentsForApp(ctx, transaction, tc.AppName, d.EnvName, int64(d.Version)) + newVersion := int64(d.Version) + deployment := Deployment{ + Created: time.Now(), + App: tc.AppName, + Env: d.EnvName, + Version: &newVersion, + Metadata: DeploymentMetadata{}, + TransformerID: 0, + } + err := dbHandler.DBUpdateOrCreateDeployment(ctx, transaction, deployment) if err != nil { t.Fatalf("Error updating all deployments: %v\n", err) } @@ -1332,7 +1345,7 @@ func TestAllDeployments(t *testing.T) { if err != nil { t.Fatalf("Error reading from all deployments: %v\n", err) } - if diff := cmp.Diff(tc.expected, result.Deployments); diff != "" { + if diff := cmp.Diff(tc.expected, result); diff != "" { t.Fatalf("mismatch result (-want, +got):\n%s", diff) } return nil diff --git a/pkg/db/transactions.go b/pkg/db/transactions.go index 6df9fe4cf..41c2fd0fd 100644 --- a/pkg/db/transactions.go +++ b/pkg/db/transactions.go @@ -163,7 +163,7 @@ func withTransactionAllOptions[T any](h *DBHandler, ctx context.Context, opts tr func (h *DBHandler) BeginTransaction(ctx context.Context, readonly bool) (*sql.Tx, error) { return h.DB.BeginTx(ctx, &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, + Isolation: sql.LevelReadCommitted, // Otherwise we would get pq: could not serialize access due to concurrent update error while creating releases in parallel ReadOnly: readonly, }) } diff --git a/services/cd-service/pkg/cmd/server.go b/services/cd-service/pkg/cmd/server.go index 2e0286655..df8651cf1 100755 --- a/services/cd-service/pkg/cmd/server.go +++ b/services/cd-service/pkg/cmd/server.go @@ -364,7 +364,6 @@ func RunServer() { ctx, repo.State().GetAppsAndTeams, repo.State().WriteCurrentlyDeployed, - repo.State().WriteAllCurrentlyDeployed, repo.State().WriteAllReleases, repo.State().WriteCurrentEnvironmentLocks, repo.State().WriteCurrentApplicationLocks, diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index 0b49ea85b..33479510b 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -71,6 +71,8 @@ type contextKey string const DdMetricsKey contextKey = "ddMetrics" +var gitMutexLock sync.Mutex + // A Repository provides a multiple reader / single writer access to a git repository. type Repository interface { Apply(ctx context.Context, transformers ...Transformer) error @@ -947,6 +949,17 @@ func (r *repository) ApplyTransformers(ctx context.Context, transaction *sql.Tx, return nil, &TransformerBatchApplyError{TransformerError: fmt.Errorf("%s: %w", "failure in afterTransform", err), Index: -1} } + gitSpan, ctx := tracer.StartSpanFromContext(ctx, "GitCommitCreation") + defer gitSpan.Finish() + gitMutexLock.Lock() + defer gitMutexLock.Unlock() + if state.DBHandler.ShouldUseOtherTables() { + var err error + state, err = r.StateAt(nil) + if err != nil { + return nil, &TransformerBatchApplyError{TransformerError: err, Index: -1} + } + } treeId, insertError := state.Filesystem.(*fs.TreeBuilderFS).Insert() if insertError != nil { return nil, &TransformerBatchApplyError{TransformerError: insertError, Index: -1} @@ -1944,7 +1957,7 @@ func (s *State) GetAllDeploymentsForAppFromDB(ctx context.Context, transaction * if result == nil { return map[string]int64{}, nil } - return result.Deployments, nil + return result, nil } func (s *State) GetAllDeploymentsForAppFromDBAtTimestamp(ctx context.Context, transaction *sql.Tx, appName string, ts time.Time) (map[string]int64, error) { @@ -1955,7 +1968,7 @@ func (s *State) GetAllDeploymentsForAppFromDBAtTimestamp(ctx context.Context, tr if result == nil { return map[string]int64{}, nil } - return result.Deployments, nil + return result, nil } func (s *State) GetAllDeploymentsForAppFromManifest(ctx context.Context, appName string) (map[string]int64, error) { @@ -2200,7 +2213,7 @@ func (s *State) WriteCurrentlyDeployed(ctx context.Context, transaction *sql.Tx, CiLink: "", }, } - err = dbHandler.DBWriteDeployment(ctx, transaction, deployment, true) + err = dbHandler.DBUpdateOrCreateDeployment(ctx, transaction, deployment) if err != nil { return fmt.Errorf("error writing Deployment to DB for app %s in env %s: %w", deployment.App, deployment.Env, err) } @@ -2209,39 +2222,6 @@ func (s *State) WriteCurrentlyDeployed(ctx context.Context, transaction *sql.Tx, return nil } -// WriteAllCurrentlyDeployed writes all active deployments for al apps -func (s *State) WriteAllCurrentlyDeployed(ctx context.Context, transaction *sql.Tx, dbHandler *db.DBHandler) error { - ddSpan, ctx := tracer.StartSpanFromContext(ctx, "WriteAllCurrentlyDeployed") - defer ddSpan.Finish() - _, envNames, err := s.GetEnvironmentConfigsSortedFromManifest() // this is intentional, when doing custom migrations (which is where this function is called), we want to read from the manifest repo explicitly - if err != nil { - return err - } - apps, err := s.GetApplicationsFromFile() - if err != nil { - return err - } - - for _, appName := range apps { - deploymentsForApp := map[string]int64{} - for _, envName := range envNames { - var version *uint64 - version, err = s.GetEnvironmentApplicationVersionFromManifest(envName, appName) - if err != nil { - return fmt.Errorf("could not get version of app %s in env %s", appName, envName) - } - if version != nil { - deploymentsForApp[envName] = int64(*version) - } - } - err = dbHandler.DBWriteAllDeploymentsForApp(ctx, transaction, 0, appName, deploymentsForApp) - if err != nil { - return fmt.Errorf("error writing all deployments to DB for app %s: %w", appName, err) - } - } - return nil -} - // WriteCurrentEnvironmentLocks gets all locks on any environment in manifest and writes them to the DB func (s *State) WriteCurrentEnvironmentLocks(ctx context.Context, transaction *sql.Tx, dbHandler *db.DBHandler) error { ddSpan, ctx := tracer.StartSpanFromContext(ctx, "WriteCurrentEnvironmentLocks") diff --git a/services/cd-service/pkg/repository/transformer.go b/services/cd-service/pkg/repository/transformer.go index 698f1c801..4dda6a24e 100644 --- a/services/cd-service/pkg/repository/transformer.go +++ b/services/cd-service/pkg/repository/transformer.go @@ -1549,7 +1549,7 @@ func (u *UndeployApplication) Transform( deployment.Version = nil deployment.Metadata.DeployedByName = user.Name deployment.Metadata.DeployedByEmail = user.Email - err = state.DBHandler.DBWriteDeployment(ctx, transaction, *deployment, false) + err = state.DBHandler.DBUpdateOrCreateDeployment(ctx, transaction, *deployment) if err != nil { return "", err } @@ -1632,10 +1632,6 @@ func (u *UndeployApplication) Transform( return "", fmt.Errorf("UndeployApplication: could not clear releases for app '%s': %v", u.Application, err) } - err = state.DBHandler.DBClearAllDeploymentsForApp(ctx, transaction, u.Application) - if err != nil { - return "", fmt.Errorf("UndeployApplication: could not clear all deployments for app '%s': %v", u.Application, err) - } allEnvs, err := state.DBHandler.DBSelectAllEnvironments(ctx, transaction) if err != nil { return "", fmt.Errorf("UndeployApplication: could not get all environments: %v", err) @@ -3082,14 +3078,10 @@ func (c *DeployApplicationVersion) Transform( CiLink: c.CiLink, }, } - err = state.DBHandler.DBWriteDeployment(ctx, transaction, newDeployment, c.SkipOverview) + err = state.DBHandler.DBUpdateOrCreateDeployment(ctx, transaction, newDeployment) if err != nil { return "", fmt.Errorf("could not write deployment for %v - %v", newDeployment, err) } - err = state.DBHandler.DBUpdateAllDeploymentsForApp(ctx, transaction, c.Application, c.Environment, int64(c.Version)) - if err != nil { - return "", fmt.Errorf("could not write oldest deployment for %v - %v", newDeployment, err) - } } else { //Check if there is a version of target app already deployed on target environment if _, err := fs.Lstat(versionFile); err == nil { diff --git a/services/cd-service/pkg/repository/transformer_db_test.go b/services/cd-service/pkg/repository/transformer_db_test.go index 0677ec385..e83ab4b4a 100644 --- a/services/cd-service/pkg/repository/transformer_db_test.go +++ b/services/cd-service/pkg/repository/transformer_db_test.go @@ -3391,7 +3391,7 @@ func TestUndeployDBState(t *testing.T) { t.Fatal(err) } - if len(allDeployments.Deployments) != 0 { + if len(allDeployments) != 0 { t.Fatal("No deployments expected, but found some.") } return nil diff --git a/services/cd-service/pkg/service/commit_deployment.go b/services/cd-service/pkg/service/commit_deployment.go index 8b9450617..018ae830d 100644 --- a/services/cd-service/pkg/service/commit_deployment.go +++ b/services/cd-service/pkg/service/commit_deployment.go @@ -39,28 +39,16 @@ func (s *CommitDeploymentServer) GetCommitDeploymentInfo(ctx context.Context, in commitDeploymentStatus := make(map[string]*api.AppCommitDeploymentStatus, 0) var jsonCommitEventsMetadata []byte var jsonAllEnvironmentsMetadata []byte - applicationReleases := make(map[string][]byte, 0) + applicationReleases := make(map[string]map[string]uint64, 0) allApplicationReleasesQuery := ` -SELECT - deployments.appname, - deployments.json -FROM ( - SELECT - MAX(eslVersion) AS latest, - appname - FROM - "public"."all_deployments" - GROUP BY - appname) AS latest -JOIN - "public".all_deployments AS deployments -ON - latest.latest=deployments.eslVersion - AND latest.appname=deployments.appname; +SELECT appname, envname, releaseVersion +FROM deployments +WHERE releaseVersion IS NOT NULL; ` span, ctx := tracer.StartSpanFromContext(ctx, "GetCommitDeploymentInfo") defer span.Finish() span.SetTag("commit_id", in.CommitId) + span.SetTag("query", allApplicationReleasesQuery) err := s.DBHandler.WithTransaction(ctx, true, func(ctx context.Context, transaction *sql.Tx) error { // Get the latest new-release event for the commit @@ -94,12 +82,16 @@ ON for rows.Next() { var appName string - var appRelease []byte - err = rows.Scan(&appName, &appRelease) + var envName string + var appRelease uint64 + err = rows.Scan(&appName, &envName, &appRelease) if err != nil { return err } - applicationReleases[appName] = appRelease + if _, ok := applicationReleases[appName]; !ok { + applicationReleases[appName] = make(map[string]uint64, 0) + } + applicationReleases[appName][envName] = appRelease } return nil }) @@ -129,15 +121,11 @@ ON }, nil } -func getCommitDeploymentInfoForApp(ctx context.Context, h *db.DBHandler, commitReleaseNumber uint64, app string, environments []string, appDeployments []byte) (*api.AppCommitDeploymentStatus, error) { +func getCommitDeploymentInfoForApp(ctx context.Context, h *db.DBHandler, commitReleaseNumber uint64, app string, environments []string, environmentReleases map[string]uint64) (*api.AppCommitDeploymentStatus, error) { span, _ := tracer.StartSpanFromContext(ctx, "getCommitDeploymentInfoForApp") defer span.Finish() span.SetTag("app", app) - environmentReleases, err := getEnvironmentReleases(appDeployments) - if err != nil { - return nil, fmt.Errorf("Could not get environment releases from all_deployments metadata: %v", err) - } commitStatus := getCommitStatus(commitReleaseNumber, environmentReleases, environments) return &api.AppCommitDeploymentStatus{ DeploymentStatus: commitStatus, @@ -197,12 +185,3 @@ func getAllEnvironments(jsonInput []byte) ([]string, error) { } return environments, nil } - -func getEnvironmentReleases(jsonInput []byte) (map[string]uint64, error) { - releases := map[string]uint64{} - err := json.Unmarshal(jsonInput, &releases) - if err != nil { - return nil, err - } - return releases, nil -} diff --git a/services/cd-service/pkg/service/commit_deployment_test.go b/services/cd-service/pkg/service/commit_deployment_test.go index 6192c185f..f32639d92 100644 --- a/services/cd-service/pkg/service/commit_deployment_test.go +++ b/services/cd-service/pkg/service/commit_deployment_test.go @@ -96,50 +96,6 @@ func TestGetAllEnvironments(t *testing.T) { }) } } - -func TestGetEnvironmentReleases(t *testing.T) { - tcs := []struct { - name string - deploymentsJson []byte - expected map[string]uint64 - }{ - { - name: "One environment", - deploymentsJson: []byte(`{"dev":1}`), - expected: map[string]uint64{"dev": 1}, - }, - { - name: "Two environments", - deploymentsJson: []byte(`{"dev":2, "staging":2}`), - expected: map[string]uint64{"dev": 2, "staging": 2}, - }, - { - name: "No environments", - deploymentsJson: []byte(`{}`), - expected: map[string]uint64{}, - }, - } - for _, tc := range tcs { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - environments, err := getEnvironmentReleases(tc.deploymentsJson) - if err != nil { - t.Fatalf("Error getting all environments: %v", err) - } - if len(environments) != len(tc.expected) { - t.Fatalf("Expected %d environments, got %d", len(tc.expected), len(environments)) - } - for env, release := range environments { - if release != tc.expected[env] { - t.Fatalf("Expected %d, got %d", tc.expected[env], release) - } - } - - }) - } -} - func TestGetCommitStatus(t *testing.T) { tcs := []struct { name string diff --git a/services/manifest-repo-export-service/pkg/repository/transformer_test.go b/services/manifest-repo-export-service/pkg/repository/transformer_test.go index 725c3f5ab..89f248ab1 100644 --- a/services/manifest-repo-export-service/pkg/repository/transformer_test.go +++ b/services/manifest-repo-export-service/pkg/repository/transformer_test.go @@ -828,12 +828,12 @@ func TestReleaseTrain(t *testing.T) { } var v int64 v = 1 - err = dbHandler.DBWriteDeployment(ctx, transaction, db.Deployment{ + err = dbHandler.DBUpdateOrCreateDeployment(ctx, transaction, db.Deployment{ App: appName, Env: "production", Version: &v, TransformerID: 5, - }, false) + }) if err != nil { return err } diff --git a/services/manifest-repo-export-service/pkg/service/version_test.go b/services/manifest-repo-export-service/pkg/service/version_test.go index 63436c320..329239bc7 100644 --- a/services/manifest-repo-export-service/pkg/service/version_test.go +++ b/services/manifest-repo-export-service/pkg/service/version_test.go @@ -246,11 +246,11 @@ func TestVersion(t *testing.T) { return err } var version int64 = 1 - err = dbHandler.DBWriteDeployment(ctx, transaction, db.Deployment{ + err = dbHandler.DBUpdateOrCreateDeployment(ctx, transaction, db.Deployment{ App: "test", Env: "development", Version: &version, - }, false) + }) err = repo.Apply(ctx, transaction, tc.Setup...) if err != nil { return err