Skip to content

Commit

Permalink
fix(db): use upsert instead of update/insert
Browse files Browse the repository at this point in the history
Ref: SRX-5WTFDM
  • Loading branch information
AminSlk committed Dec 17, 2024
1 parent e333f59 commit e2b62f2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 96 deletions.
160 changes: 67 additions & 93 deletions pkg/db/db_deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ type Deployment struct {
// SELECT

func (h *DBHandler) DBSelectLatestDeployment(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string) (*Deployment, error) {
selectQuery := h.AdaptQuery(`SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion
FROM deployments
WHERE appName=? AND envName=?
LIMIT 1;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectLatestDeployment")
defer span.Finish()
selectQuery := h.AdaptQuery(`
SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion
FROM deployments
WHERE appName=? AND envName=?
LIMIT 1;
`)
span.SetTag("query", selectQuery)
rows, err := tx.QueryContext(
ctx,
Expand All @@ -80,11 +82,13 @@ func (h *DBHandler) DBSelectLatestDeployment(ctx context.Context, tx *sql.Tx, ap
}

func (h *DBHandler) DBSelectAllLatestDeploymentsForApplication(ctx context.Context, tx *sql.Tx, appName string) (map[string]Deployment, error) {
selectQuery := h.AdaptQuery(`SELECT created, appname, releaseVersion, envName, metadata
FROM deployments
WHERE deployments.appname = (?) AND deployments.releaseVersion IS NOT NULL;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeployments")
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeploymentsForApplication")
defer span.Finish()
selectQuery := h.AdaptQuery(`
SELECT created, appname, releaseVersion, envName, metadata
FROM deployments
WHERE deployments.appname = (?) AND deployments.releaseVersion IS NOT NULL;
`)
span.SetTag("query", selectQuery)
rows, err := tx.QueryContext(
ctx,
Expand All @@ -104,11 +108,13 @@ func (h *DBHandler) DBSelectAllLatestDeploymentsForApplication(ctx context.Conte
}

func (h *DBHandler) DBSelectAllLatestDeploymentsOnEnvironment(ctx context.Context, tx *sql.Tx, envName string) (map[string]*int64, error) {
selectQuery := h.AdaptQuery(`SELECT appname, releaseVersion
FROM deployments
WHERE deployments.envName= ?;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeployments")
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllLatestDeploymentsOnEnvironment")
defer span.Finish()
selectQuery := h.AdaptQuery(`
SELECT appname, releaseVersion
FROM deployments
WHERE deployments.envName= ?;
`)

span.SetTag("query", selectQuery)
rows, err := tx.QueryContext(
Expand All @@ -129,12 +135,14 @@ func (h *DBHandler) DBSelectAllLatestDeploymentsOnEnvironment(ctx context.Contex
}

func (h *DBHandler) DBSelectSpecificDeployment(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string, releaseVersion uint64) (*Deployment, error) {
selectQuery := h.AdaptQuery(`SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion
FROM deployments
WHERE appName=? AND envName=? and releaseVersion=?
LIMIT 1;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectSpecificDeployment")
defer span.Finish()
selectQuery := h.AdaptQuery(`
SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion
FROM deployments
WHERE appName=? AND envName=? and releaseVersion=?
LIMIT 1;
`)

span.SetTag("query", selectQuery)
rows, err := tx.QueryContext(
Expand All @@ -157,14 +165,15 @@ func (h *DBHandler) DBSelectSpecificDeployment(ctx context.Context, tx *sql.Tx,
}

func (h *DBHandler) DBSelectDeploymentHistory(ctx context.Context, tx *sql.Tx, appSelector string, envSelector string, limit int) ([]Deployment, error) {
selectQuery := h.AdaptQuery(`SELECT created, releaseVersion, appName, envname, metadata, transformereslversion
FROM deployments_history
WHERE deployments_history.appname = (?) AND deployments_history.envname = (?)
ORDER BY version DESC
LIMIT ?;
`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectDeploymentHistory")
defer span.Finish()
selectQuery := h.AdaptQuery(`
SELECT created, releaseVersion, appName, envname, metadata, transformereslversion
FROM deployments_history
WHERE deployments_history.appname = (?) AND deployments_history.envname = (?)
ORDER BY version DESC
LIMIT ?;
`)

span.SetTag("query", selectQuery)
rows, err := tx.QueryContext(
Expand Down Expand Up @@ -201,12 +210,14 @@ func (h *DBHandler) DBSelectDeploymentHistory(ctx context.Context, tx *sql.Tx, a
}

func (h *DBHandler) DBSelectDeploymentsByTransformerID(ctx context.Context, tx *sql.Tx, transformerID TransformerID, limit uint) ([]Deployment, error) {
selectQuery := h.AdaptQuery(`SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion
FROM deployments
WHERE transformereslVersion=?
LIMIT ?;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectDeploymentsByTransformerID")
defer span.Finish()
selectQuery := h.AdaptQuery(`
SELECT created, releaseVersion, appName, envName, metadata, transformereslVersion
FROM deployments
WHERE transformereslVersion=?
LIMIT ?;
`)

span.SetTag("query", selectQuery)
rows, err := tx.QueryContext(
Expand Down Expand Up @@ -240,11 +251,13 @@ func (h *DBHandler) DBSelectDeploymentsByTransformerID(ctx context.Context, tx *
}

func (h *DBHandler) DBSelectAnyDeployment(ctx context.Context, tx *sql.Tx) (*DBDeployment, error) {
selectQuery := h.AdaptQuery(`SELECT created, releaseVersion, appName, envName
FROM deployments
LIMIT 1;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAnyDeployment")
defer span.Finish()
selectQuery := h.AdaptQuery(`
SELECT created, releaseVersion, appName, envName
FROM deployments
LIMIT 1;
`)
span.SetTag("query", selectQuery)

rows, err := tx.QueryContext(
Expand Down Expand Up @@ -285,11 +298,13 @@ func (h *DBHandler) DBSelectAnyDeployment(ctx context.Context, tx *sql.Tx) (*DBD
}

func (h *DBHandler) DBSelectAllDeploymentsForApp(ctx context.Context, tx *sql.Tx, appName string) (map[string]int64, error) {
insertQuery := h.AdaptQuery(`SELECT envname, releaseVersion
FROM deployments
WHERE appName = (?) AND releaseVersion IS NOT NULL;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllDeploymentsForApp")
defer span.Finish()
insertQuery := h.AdaptQuery(`
SELECT envname, releaseVersion
FROM deployments
WHERE appName = (?) AND releaseVersion IS NOT NULL;
`)
if h == nil {
return nil, nil
}
Expand All @@ -307,6 +322,8 @@ func (h *DBHandler) DBSelectAllDeploymentsForApp(ctx context.Context, tx *sql.Tx
}

func (h *DBHandler) DBSelectAllDeploymentsForAppAtTimestamp(ctx context.Context, tx *sql.Tx, appName string, ts time.Time) (map[string]int64, error) {
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllDeploymentsForAppAtTimestamp")
defer span.Finish()
query := h.AdaptQuery(`
SELECT
deployments_history.envName,
Expand All @@ -328,8 +345,6 @@ func (h *DBHandler) DBSelectAllDeploymentsForAppAtTimestamp(ctx context.Context,
latest.latest=deployments_history.version
AND latest.appname=deployments_history.appname
AND latest.envName=deployments_history.envName;`)
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAllDeploymentsForAppAtTimestamp")
defer span.Finish()
if h == nil {
return nil, nil
}
Expand All @@ -352,21 +367,10 @@ func (h *DBHandler) DBSelectAllDeploymentsForAppAtTimestamp(ctx context.Context,
func (h *DBHandler) DBUpdateOrCreateDeployment(ctx context.Context, tx *sql.Tx, deployment Deployment) error {
span, ctx := tracer.StartSpanFromContext(ctx, "DBUpdateOrCreateDeployment")
defer span.Finish()
retrievedDeployment, err := h.DBSelectLatestDeployment(ctx, tx, deployment.App, deployment.Env)
err := h.upsertDeploymentRow(ctx, tx, deployment)
if err != nil {
return err
}
if retrievedDeployment == nil || (retrievedDeployment.Version == nil && retrievedDeployment.App == "") {
err = h.insertDeploymentRow(ctx, tx, deployment)
if err != nil {
return err
}
} else {
err = h.updateDeploymentRow(ctx, tx, deployment)
if err != nil {
return err
}
}
err = h.insertDeploymentHistoryRow(ctx, tx, deployment)
if err != nil {
return err
Expand All @@ -376,15 +380,20 @@ func (h *DBHandler) DBUpdateOrCreateDeployment(ctx context.Context, tx *sql.Tx,

// Internal functions

func (h *DBHandler) insertDeploymentRow(ctx context.Context, tx *sql.Tx, deployment Deployment) error {
insertQuery := h.AdaptQuery(`INSERT INTO deployments (created, releaseVersion, appName, envName, metadata, transformereslVersion) VALUES (?, ?, ?, ?, ?, ?);`)
span, ctx := tracer.StartSpanFromContext(ctx, "insertDeploymentRow")
func (h *DBHandler) upsertDeploymentRow(ctx context.Context, tx *sql.Tx, deployment Deployment) error {
span, ctx := tracer.StartSpanFromContext(ctx, "upsertDeploymentRow")
defer span.Finish()
upsertQuery := h.AdaptQuery(`
INSERT INTO deployments (created, releaseVersion, appName, envName, metadata, transformereslVersion)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(appName, envName)
DO UPDATE SET created = excluded.created, releaseVersion = excluded.releaseVersion, metadata = excluded.metadata, transformereslversion = excluded.transformereslversion;
`)
if h == nil {
return nil
}
if tx == nil {
return fmt.Errorf("DBWriteDeployment: no transaction provided")
return fmt.Errorf("upsertDeploymentRow: no transaction provided")
}

jsonToInsert, err := json.Marshal(deployment.Metadata)
Expand All @@ -394,13 +403,13 @@ func (h *DBHandler) insertDeploymentRow(ctx context.Context, tx *sql.Tx, deploym

now, err := h.DBReadTransactionTimestamp(ctx, tx)
if err != nil {
return fmt.Errorf("DBWriteDeployment unable to get transaction timestamp: %w", err)
return fmt.Errorf("upsertDeploymnetRow unable to get transaction timestamp: %w", err)
}
span.SetTag("query", insertQuery)
span.SetTag("query", upsertQuery)
nullVersion := NewNullInt(deployment.Version)

_, err = tx.Exec(
insertQuery,
upsertQuery,
*now,
nullVersion,
deployment.App,
Expand All @@ -414,48 +423,13 @@ func (h *DBHandler) insertDeploymentRow(ctx context.Context, tx *sql.Tx, deploym
return nil
}

func (h *DBHandler) updateDeploymentRow(ctx context.Context, tx *sql.Tx, deployment Deployment) error {
updateQuery := h.AdaptQuery(`UPDATE deployments SET created=?, releaseVersion=?, metadata=?, transformereslVersion=? WHERE appname=? AND envname=?;`)
span, ctx := tracer.StartSpanFromContext(ctx, "updateDeploymentRow")
defer span.Finish()
span.SetTag("query", updateQuery)
if h == nil {
return nil
}
if tx == nil {
return fmt.Errorf("DBWriteDeployment: no transaction provided")
}

jsonToInsert, err := json.Marshal(deployment.Metadata)
if err != nil {
return fmt.Errorf("could not marshal json data: %w", err)
}

now, err := h.DBReadTransactionTimestamp(ctx, tx)
if err != nil {
return fmt.Errorf("DBWriteDeployment unable to get transaction timestamp: %w", err)
}
nullVersion := NewNullInt(deployment.Version)

_, err = tx.Exec(
updateQuery,
*now,
nullVersion,
jsonToInsert,
deployment.TransformerID,
deployment.App,
deployment.Env)

if err != nil {
return fmt.Errorf("could not write deployment into DB. Error: %w\n", err)
}
return nil
}

func (h *DBHandler) insertDeploymentHistoryRow(ctx context.Context, tx *sql.Tx, deployment Deployment) error {
insertQuery := h.AdaptQuery(`INSERT INTO deployments_history (created, releaseVersion, appName, envName, metadata, transformereslVersion) VALUES (?, ?, ?, ?, ?, ?);`)
span, ctx := tracer.StartSpanFromContext(ctx, "insertDeploymentHistoryRow")
defer span.Finish()
insertQuery := h.AdaptQuery(`
INSERT INTO deployments_history (created, releaseVersion, appName, envName, metadata, transformereslVersion)
VALUES (?, ?, ?, ?, ?, ?);
`)
if h == nil {
return nil
}
Expand Down Expand Up @@ -485,7 +459,7 @@ func (h *DBHandler) insertDeploymentHistoryRow(ctx context.Context, tx *sql.Tx,
deployment.TransformerID)

if err != nil {
return fmt.Errorf("could not write deployment into DB. Error: %w\n", err)
return fmt.Errorf("could not write deployment_history into DB. Error: %w\n", err)
}
return nil
}
Expand Down
9 changes: 6 additions & 3 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,9 +951,12 @@ func (r *repository) ApplyTransformers(ctx context.Context, transaction *sql.Tx,

gitMutexLock.Lock()
defer gitMutexLock.Unlock()
state, err := r.StateAt(nil)
if err != nil {
return nil, &TransformerBatchApplyError{TransformerError: err, Index: -1}
if state.DBHandler.ShouldUseOtherTables() {
var err error
state, err = r.StateAt(nil)
if err != nil {
return nil, &TransformerBatchApplyError{TransformerError: err, Index: -1}
}
}
treeId, insertError := state.Filesystem.(*fs.TreeBuilderFS).Insert()
if insertError != nil {
Expand Down

0 comments on commit e2b62f2

Please sign in to comment.