diff --git a/pkg/db/db.go b/pkg/db/db.go index 4e42c31f0..cceb0f9bc 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -5026,6 +5026,50 @@ func (h *DBHandler) DBSelectLatestDeploymentAttempt(ctx context.Context, tx *sql return h.processDeploymentAttemptsRow(ctx, rows, err) } +func (h *DBHandler) DBSelectLatestDeploymentAttemptOfAllApps(ctx context.Context, tx *sql.Tx, environmentName string) ([]*QueuedDeployment, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectLatestDeploymentAttemptOfAllApps") + defer span.Finish() + + if h == nil { + return nil, nil + } + if tx == nil { + return nil, fmt.Errorf("DBSelectLatestDeploymentAttemptOfAllApps: no transaction provided") + } + query := h.AdaptQuery( + ` + SELECT DISTINCT + deployment_attempts.eslversion, + deployment_attempts.created, + deployment_attempts.envName, + deployment_attempts.appName, + deployment_attempts.queuedReleaseVersion + FROM ( + SELECT + MAX(eslversion) AS latestRelease, + appname, + envName + FROM + "deployment_attempts" + GROUP BY + envname, appname) AS latest + JOIN + deployment_attempts AS deployment_attempts + ON + latest.latestRelease=deployment_attempts.eslVersion + AND latest.envName=deployment_attempts.envName + AND latest.appname=deployment_attempts.appname + WHERE deployment_attempts.envName=? + ORDER BY deployment_attempts.eslversion DESC; + `) + span.SetTag("query", query) + rows, err := tx.QueryContext( + ctx, + query, + environmentName) + return h.processDeploymentAttemptsRows(ctx, rows, err) +} + func (h *DBHandler) DBWriteDeploymentAttempt(ctx context.Context, tx *sql.Tx, envName, appName string, version *int64, skipOverview bool) error { span, ctx := tracer.StartSpanFromContext(ctx, "DBWriteDeploymentAttempt") defer span.Finish() @@ -5109,6 +5153,31 @@ func (h *DBHandler) dbWriteDeploymentAttemptInternal(ctx context.Context, tx *sq return nil } +func (h *DBHandler) processDeploymentAttemptsRows(ctx context.Context, rows *sql.Rows, err error) ([]*QueuedDeployment, error) { + if err != nil { + return nil, fmt.Errorf("could not query deployment attempts table from DB. Error: %w\n", err) + } + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("row closing error: %v", err) + } + }(rows) + var results []*QueuedDeployment + for rows.Next() { + row, err := h.processSingleDeploymentAttemptsRow(ctx, rows) + if err != nil { + return nil, err + } + results = append(results, row) + } + err = closeRows(rows) + if err != nil { + return nil, err + } + return results, nil +} + func (h *DBHandler) processDeploymentAttemptsRow(ctx context.Context, rows *sql.Rows, err error) (*QueuedDeployment, error) { if err != nil { return nil, fmt.Errorf("could not query deployment attempts table from DB. Error: %w\n", err) diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index c162f60d6..33e1b6e30 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -1681,12 +1681,49 @@ func (s *State) GetQueuedVersionFromDB(ctx context.Context, transaction *sql.Tx, return v, nil } +func (s *State) GetQueuedVersionAllAppsFromDB(ctx context.Context, transaction *sql.Tx, environment string) (map[string]*uint64, error) { + queuedDeployments, err := s.DBHandler.DBSelectLatestDeploymentAttemptOfAllApps(ctx, transaction, environment) + result := map[string]*uint64{} + if err != nil || queuedDeployments == nil { + return result, err + } + for _, queuedDeployment := range queuedDeployments { + var v *uint64 + if queuedDeployment.Version != nil { + parsedInt := uint64(*queuedDeployment.Version) + v = &parsedInt + } else { + v = nil + } + result[queuedDeployment.App] = v + } + return result, nil +} + func (s *State) GetQueuedVersion(ctx context.Context, transaction *sql.Tx, environment string, application string) (*uint64, error) { if s.DBHandler.ShouldUseOtherTables() { return s.GetQueuedVersionFromDB(ctx, transaction, environment, application) } return s.GetQueuedVersionFromManifest(environment, application) } +func (s *State) GetQueuedVersionOfAllApps(ctx context.Context, transaction *sql.Tx, environment string) (map[string]*uint64, error) { + if s.DBHandler.ShouldUseOtherTables() { + return s.GetQueuedVersionAllAppsFromDB(ctx, transaction, environment) + } + result := map[string]*uint64{} + apps, err := s.GetEnvironmentApplications(ctx, transaction, environment) + if err != nil { + return result, fmt.Errorf("environment applications for %q not found: %v", environment, err.Error()) + } + for _, appName := range apps { + version, err := s.GetQueuedVersionFromManifest(environment, appName) + if err != nil { + return result, err + } + result[appName] = version + } + return result, nil +} func (s *State) GetQueuedVersionFromManifest(environment string, application string) (*uint64, error) { return s.readSymlink(environment, application, queueFileName) @@ -3292,3 +3329,33 @@ func (s *State) ProcessQueue(ctx context.Context, transaction *sql.Tx, fs billy. } return queueDeploymentMessage, nil } + +func (s *State) ProcessQueueAllApps(ctx context.Context, transaction *sql.Tx, fs billy.Filesystem, environment string) (string, error) { + queuedVersions, err := s.GetQueuedVersionOfAllApps(ctx, transaction, environment) + if err != nil { + return "", err + } + queueDeploymentMessage := "" + for application, queuedVersion := range queuedVersions { + if queuedVersion == nil { + continue + } + + currentlyDeployedVersion, err := s.GetEnvironmentApplicationVersion(ctx, transaction, environment, application) + if err != nil { + return "", err + } + + if currentlyDeployedVersion != nil && *queuedVersion == *currentlyDeployedVersion { + err = s.DeleteQueuedVersion(ctx, transaction, environment, application, false) + if err != nil { + return "", err + } + if queueDeploymentMessage != "" { + queueDeploymentMessage += "\n" + } + queueDeploymentMessage += fmt.Sprintf("deleted queued version %d because it was already deployed. app=%q env=%q", *queuedVersion, application, environment) + } + } + return queueDeploymentMessage, nil +} diff --git a/services/cd-service/pkg/repository/transformer.go b/services/cd-service/pkg/repository/transformer.go index 6bf8cbb35..9deb2205e 100644 --- a/services/cd-service/pkg/repository/transformer.go +++ b/services/cd-service/pkg/repository/transformer.go @@ -2255,20 +2255,10 @@ func (c *DeleteEnvironmentLock) Transform( return "", err } } - apps, err := s.GetEnvironmentApplications(ctx, transaction, c.Environment) - if err != nil { - return "", fmt.Errorf("environment applications for %q not found: %v", c.Environment, err.Error()) - } - additionalMessageFromDeployment := "" - for _, appName := range apps { - queueMessage, err := s.ProcessQueue(ctx, transaction, fs, c.Environment, appName) - if err != nil { - return "", err - } - if queueMessage != "" { - additionalMessageFromDeployment = additionalMessageFromDeployment + "\n" + queueMessage - } + additionalMessageFromDeployment, err := s.ProcessQueueAllApps(ctx, transaction, fs, c.Environment) + if err != nil { + return "", err } GaugeEnvLockMetric(ctx, state, transaction, c.Environment) return fmt.Sprintf("Deleted lock %q on environment %q%s", c.LockId, c.Environment, additionalMessageFromDeployment), nil