diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index b6362e4929..cc32aa958b 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -30,6 +30,7 @@ import org.springframework.context.ApplicationContext import org.springframework.context.annotation.Primary import org.springframework.stereotype.Component import rx.Observable +import javax.annotation.Nonnull /** * Intended for performing red/black Orca deployments which do not share the @@ -192,6 +193,13 @@ class DualExecutionRepository( ).distinct { it.id } } + override fun retrievePipelineConfigIdsForApplication(application: String): List<String> { + return ( + primary.retrievePipelineConfigIdsForApplication(application) + + previous.retrievePipelineConfigIdsForApplication(application) + ).distinct() + } + override fun retrievePipelinesForPipelineConfigId( pipelineConfigId: String, criteria: ExecutionCriteria @@ -202,6 +210,26 @@ class DualExecutionRepository( ).distinct { it.id } } + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull application: String, + @Nonnull pipelineConfigIds: List<String>, + @Nonnull criteria: ExecutionCriteria + ): List<String> { + return primary.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) + + previous.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) + } + + override fun retrievePipelineExecutionDetailsForApplication( + @Nonnull application: String, + pipelineConfigIds: List<String>, + queryTimeoutSeconds: Int + ): Collection<PipelineExecution> { + return ( + primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) + + previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) + ).distinctBy { it.id } + } + override fun retrievePipelinesForPipelineConfigIdsBetweenBuildTimeBoundary( pipelineConfigIds: MutableList<String>, buildTimeStartBoundary: Long, diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index e9e1c5976a..4cfe657aef 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -92,6 +92,21 @@ Observable<PipelineExecution> retrieve( Observable<PipelineExecution> retrievePipelinesForPipelineConfigId( @Nonnull String pipelineConfigId, @Nonnull ExecutionCriteria criteria); + @Nonnull + Collection<String> retrievePipelineConfigIdsForApplication(@Nonnull String application); + + @Nonnull + Collection<String> retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull String application, + @Nonnull List<String> pipelineConfigIds, + @Nonnull ExecutionCriteria criteria); + + @Nonnull + Collection<PipelineExecution> retrievePipelineExecutionDetailsForApplication( + @Nonnull String application, + @Nonnull List<String> pipelineConfigIds, + int queryTimeoutSeconds); + /** * Returns executions in the time boundary. Redis impl does not respect pageSize or offset params, * and returns all executions. Sql impl respects these params.
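For reviewers, the intended call sequence of the three new ExecutionRepository methods is: config ids first, then filtered execution ids, then full execution bodies. A minimal Kotlin sketch of that flow — not part of the change itself; the application name, page size, and timeout below are illustrative only:

import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria

fun fetchRecentExecutions(repository: ExecutionRepository): Collection<PipelineExecution> {
    val criteria = ExecutionCriteria().setPageSize(5) // keep at most 5 executions per pipeline
    // 1. pipeline config ids that actually have executions stored for this application
    val configIds = repository.retrievePipelineConfigIdsForApplication("myapp")
    // 2. execution ids that survive the criteria (page size, statuses)
    val executionIds = repository.retrieveAndFilterPipelineExecutionIdsForApplication(
        "myapp", configIds.toList(), criteria
    )
    // 3. full execution bodies, bounded by an explicit query timeout (seconds)
    return repository.retrievePipelineExecutionDetailsForApplication(
        "myapp", executionIds.toList(), 60
    )
}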
diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 4395c9f2bf..96ee40242d 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -23,10 +23,11 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria +import rx.Observable import java.lang.System.currentTimeMillis import java.time.Instant import java.util.concurrent.ConcurrentHashMap -import rx.Observable +import javax.annotation.Nonnull class InMemoryExecutionRepository : ExecutionRepository { @@ -276,6 +277,34 @@ class InMemoryExecutionRepository : ExecutionRepository { ) } + override fun retrievePipelineConfigIdsForApplication(application: String): List<String> { + return pipelines.values + .filter { it.application == application } + .map { it.pipelineConfigId } + .distinct() + } + + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull application: String, + @Nonnull pipelineConfigIds: List<String>, + @Nonnull criteria: ExecutionCriteria + ): List<String> { + return pipelines.values + .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } + .applyCriteria(criteria) + .map { it.id } + } + + override fun retrievePipelineExecutionDetailsForApplication( + application: String, + pipelineConfigIds: List<String>, + queryTimeoutSeconds: Int + ): Collection<PipelineExecution> { + return pipelines.values + .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } + .distinctBy { it.id } + } + override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(ORCHESTRATION, correlationId) } diff --git a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy index e3cbc3e37f..cb64a15a3a 100644 --- a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy +++ b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy @@ -69,7 +69,7 @@ interface Front50Service { List<Map<String, Object>> getPipelines(@Path("applicationName") String applicationName, @Query("refresh") boolean refresh) @GET("/pipelines/{applicationName}") - List<Map<String, Object>> getPipelines(@Path("applicationName") String applicationName, @Query("refresh") boolean refresh, @Query("enabledPipelines") boolean enabledPipelines) + List<Map<String, Object>> getPipelines(@Path("applicationName") String applicationName, @Query("refresh") boolean refresh, @Query("pipelineStateFilter") String pipelineStateFilter) @GET("/pipelines/{pipelineId}/get") Map getPipeline(@Path("pipelineId") String pipelineId) diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index 70a405e42f..bfc6070edb 100644 ---
a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -479,6 +479,37 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List<String> idsToDelet return currentObservable; } + @Override + public @Nonnull List<String> retrievePipelineConfigIdsForApplication( + @Nonnull String application) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository implementation. + return List.of(); + } + + @Override + public @Nonnull List<String> retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull String application, + @Nonnull List<String> pipelineConfigIds, + @Nonnull ExecutionCriteria criteria) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository implementation. + return List.of(); + } + + @Override + public @Nonnull List<PipelineExecution> retrievePipelineExecutionDetailsForApplication( + @Nonnull String application, + @Nonnull List<String> pipelineExecutionIds, + int queryTimeoutSeconds) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository implementation. + return List.of(); + } + /* * There is no guarantee that the returned results will be sorted. * @param limit and the param @offset are only implemented in SqlExecutionRepository diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 1ef18fb8e1..abd49437d5 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -53,8 +53,6 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException import de.huxhorn.sulky.ulid.SpinULID -import java.lang.System.currentTimeMillis -import java.security.SecureRandom import java.time.Duration import org.jooq.DSLContext import org.jooq.DatePart @@ -80,7 +78,33 @@ import org.jooq.impl.DSL.value import org.slf4j.LoggerFactory import rx.Observable import java.io.ByteArrayOutputStream +import java.lang.System.currentTimeMillis import java.nio.charset.StandardCharsets +import java.security.SecureRandom +import java.util.stream.Collectors.toList +import kotlin.collections.Collection +import kotlin.collections.Iterable +import kotlin.collections.Iterator +import kotlin.collections.List +import kotlin.collections.Map +import kotlin.collections.MutableList +import kotlin.collections.chunked +import kotlin.collections.distinct +import kotlin.collections.firstOrNull +import kotlin.collections.forEach +import kotlin.collections.isEmpty +import kotlin.collections.isNotEmpty +import kotlin.collections.listOf +import kotlin.collections.map +import kotlin.collections.mapOf +import kotlin.collections.mutableListOf +import kotlin.collections.mutableMapOf +import kotlin.collections.plus +import kotlin.collections.set +import kotlin.collections.toList +import kotlin.collections.toMutableList +import kotlin.collections.toMutableMap +import kotlin.collections.toTypedArray /** * A generic SQL
[ExecutionRepository]. @@ -427,6 +451,129 @@ class SqlExecutionRepository( ) } + override fun retrievePipelineConfigIdsForApplication(application: String): List<String> = + withPool(poolName) { + return jooq.selectDistinct(field("config_id")) + .from(PIPELINE.tableName) + .where(field("application").eq(application)) + .fetch(0, String::class.java) + } + + /** + * This function currently supports the following ExecutionCriteria: + * 'limit', a.k.a. page size, and + * 'statuses'. + * + * It returns the ids of all pipeline executions that satisfy the above ExecutionCriteria, which it + * finds by executing one of the following queries: + * - If the execution criteria does not contain any statuses: + * SELECT config_id, id + FROM pipelines force index (`pipeline_application_idx`) + WHERE application = "myapp" + ORDER BY + config_id; + * - If the execution criteria contains statuses: + * SELECT config_id, id + FROM pipelines force index (`pipeline_application_status_starttime_idx`) + WHERE ( + application = "myapp" and + status in ("status1", "status2") + ) + ORDER BY + config_id; + * It then applies the 'limit' criteria to the result set obtained above. We observed load issues in the db + * when the limit was calculated in the query itself, so that logic now lives in the code below to ease the + * burden on the db in such circumstances. + */ + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + application: String, + pipelineConfigIds: List<String>, + criteria: ExecutionCriteria + ): List<String> { + + // baseQueryPredicate for the flow where there are no statuses in the execution criteria + var baseQueryPredicate = field("application").eq(application) + .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray())) + + var table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx") + else PIPELINE.tableName + // baseQueryPredicate for the flow with statuses + if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) { + val statusStrings = criteria.statuses.map { it.toString() } + baseQueryPredicate = baseQueryPredicate + .and(field("status").`in`(*statusStrings.toTypedArray())) + + table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_status_starttime_idx") + else PIPELINE.tableName + } + + val finalResult: MutableList<String> = mutableListOf() + + withPool(poolName) { + val baseQuery = jooq.select(field("config_id"), field("id")) + .from(table) + .where(baseQueryPredicate) + .orderBy(field("config_id")) + .fetch().intoGroups("config_id", "id") + + baseQuery.forEach { + val count = it.value.size + if (criteria.pageSize < count) { + finalResult.addAll(it.value + .stream() + .skip((count - criteria.pageSize).toLong()) + .collect(toList()) as List<String> + ) + } else { + finalResult.addAll(it.value as List<String>) + } + } + } + return finalResult + } + + /** + * It executes the following query to get execution details for n executions at a time in a specific application + * + * SELECT id, body, compressed_body, compression_type, `partition` + FROM pipelines force index (`pipeline_application_idx`) + left outer join + pipelines_compressed_executions + using (`id`) + WHERE ( + application = "" and + id in ('id1', 'id2', 'id3') + ); + * + * It then gets all the stage information for all the executions returned from the above query.
+ */ + override fun retrievePipelineExecutionDetailsForApplication( + application: String, + pipelineExecutions: List<String>, + queryTimeoutSeconds: Int + ): Collection<PipelineExecution> { + withPool(poolName) { + val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) + .from( + if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx") + else PIPELINE.tableName + ) + .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id")) + .where( + field("application").eq(application) + .and(field("id").`in`(*pipelineExecutions.toTypedArray())) + ) + .queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever + .fetch() + + log.debug("getting stage information for all the executions found so far") + return ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq) + } + } + override fun retrievePipelinesForPipelineConfigId( pipelineConfigId: String, criteria: ExecutionCriteria diff --git a/orca-web/orca-web.gradle b/orca-web/orca-web.gradle index 1226a50311..88417858c8 100644 --- a/orca-web/orca-web.gradle +++ b/orca-web/orca-web.gradle @@ -94,9 +94,19 @@ dependencies { testImplementation("io.strikt:strikt-core") testImplementation("io.mockk:mockk") testImplementation("org.apache.groovy:groovy-json") + testImplementation("com.nhaarman:mockito-kotlin") + testImplementation("io.spinnaker.kork:kork-sql-test") + testImplementation("org.testcontainers:mysql") testImplementation ("com.squareup.retrofit2:retrofit-mock") } +sourceSets { + main { + java { srcDirs = [] } // no source dirs for the java compiler + groovy { srcDirs = ["src/main/java", "src/main/groovy"] } // compile everything in src/ with groovy + } +} + test { //The Implementation-Version is set in the MANIFEST.MF for the JAR produced via testing so that //assertions can be made against the version (see orca-plugins-test, for example).
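The optimization is opt-in and tunable via the new TaskControllerConfigurationProperties class introduced later in this diff. A short Kotlin sketch of switching it on programmatically, modeled on the new TaskControllerTest fixture; the config fragment in the trailing comment is illustrative:

import com.netflix.spinnaker.config.TaskControllerConfigurationProperties

// Kotlin callers go through the hand-written setter; the class defines
// setOptimizeExecutionRetrieval explicitly because the lombok-generated
// setters are not visible to the Kotlin tests.
val props = TaskControllerConfigurationProperties().apply {
    optimizeExecutionRetrieval = true
}
// At runtime the same fields bind from config under the "tasks.controller"
// prefix, e.g. an illustrative orca.yml fragment (values are the defaults):
//   tasks:
//     controller:
//       optimizeExecutionRetrieval: true
//       maxExecutionRetrievalThreads: 4          # keep below the db connection pool size
//       maxNumberOfPipelineExecutionsToProcess: 50
//       executionRetrievalTimeoutSeconds: 60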
diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index fb4747788d..6e65a01824 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -18,7 +18,9 @@ package com.netflix.spinnaker.orca.controllers import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.annotations.VisibleForTesting +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.config.TaskControllerConfigurationProperties import com.netflix.spinnaker.kork.web.exceptions.NotFoundException import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder import com.netflix.spinnaker.orca.api.pipeline.models.* @@ -34,9 +36,8 @@ import com.netflix.spinnaker.orca.util.ExpressionUtils import com.netflix.spinnaker.security.AuthenticatedRequest import groovy.transform.InheritConstructors import groovy.util.logging.Slf4j -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value import org.springframework.http.HttpStatus +import org.springframework.lang.Nullable import org.springframework.security.access.prepost.PostAuthorize import org.springframework.security.access.prepost.PostFilter import org.springframework.security.access.prepost.PreAuthorize @@ -47,6 +48,10 @@ import rx.schedulers.Schedulers import java.nio.charset.Charset import java.time.Clock import java.time.ZoneOffset +import java.util.concurrent.Callable +import java.util.concurrent.Executors +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -60,46 +65,44 @@ import static java.time.temporal.ChronoUnit.DAYS @Slf4j @RestController class TaskController { - @Autowired(required = false) Front50Service front50Service - - @Autowired ExecutionRepository executionRepository - - @Autowired ExecutionRunner executionRunner - - @Autowired CompoundExecutionOperator executionOperator - - @Autowired Collection<StageDefinitionBuilder> stageBuilders - - @Autowired ContextParameterProcessor contextParameterProcessor - - @Autowired ExpressionUtils expressionUtils - - @Autowired ObjectMapper mapper - - @Autowired Registry registry - - @Autowired StageDefinitionBuilderFactory stageDefinitionBuilderFactory - - @Value('${tasks.days-of-execution-history:14}') - int daysOfExecutionHistory - - @Value('${tasks.number-of-old-pipeline-executions-to-include:2}') - int numberOfOldPipelineExecutionsToInclude - - @Value('${tasks.exclude-execution-of-disabled-pipelines:false}') - Boolean excludeExecutionsOfDisabledPipelines - - Clock clock = Clock.systemUTC() + TaskControllerConfigurationProperties configurationProperties + Clock clock + + TaskController(@Nullable Front50Service front50Service, + ExecutionRepository executionRepository, + ExecutionRunner executionRunner, + CompoundExecutionOperator executionOperator, + Collection<StageDefinitionBuilder> stageBuilders, + ContextParameterProcessor contextParameterProcessor, + ExpressionUtils expressionUtils, + ObjectMapper mapper, + Registry registry, + StageDefinitionBuilderFactory stageDefinitionBuilderFactory, + TaskControllerConfigurationProperties configurationProperties + ) { + this.front50Service = front50Service + this.executionRepository =
executionRepository + this.executionRunner = executionRunner + this.executionOperator = executionOperator + this.stageBuilders = stageBuilders + this.contextParameterProcessor = contextParameterProcessor + this.expressionUtils = expressionUtils + this.mapper = mapper + this.registry = registry + this.stageDefinitionBuilderFactory = stageDefinitionBuilderFactory + this.configurationProperties = configurationProperties + this.clock = Clock.systemUTC() + } @PreAuthorize("hasPermission(#application, 'APPLICATION', 'READ')") @RequestMapping(value = "/applications/{application}/tasks", method = RequestMethod.GET) @@ -118,7 +121,7 @@ class TaskController { clock .instant() .atZone(ZoneOffset.UTC) - .minusDays(daysOfExecutionHistory) + .minusDays(this.configurationProperties.getDaysOfExecutionHistory()) .toInstant() ) @@ -586,11 +589,9 @@ class TaskController { @PreAuthorize("hasPermission(#application, 'APPLICATION', 'READ')") @RequestMapping(value = "/applications/{application}/pipelines", method = RequestMethod.GET) List<PipelineExecution> getPipelinesForApplication(@PathVariable String application, - @RequestParam(value = "limit", defaultValue = "5") - int limit, - @RequestParam(value = "statuses", required = false) - String statuses, - @RequestParam(value = "expand", defaultValue = "true") Boolean expand) { + @RequestParam(value = "limit", defaultValue = "5") int limit, + @RequestParam(value = "statuses", required = false) String statuses, + @RequestParam(value = "expand", defaultValue = "true") Boolean expand) { if (!front50Service) { throw new UnsupportedOperationException("Cannot lookup pipelines, front50 has not been enabled. Fix this by setting front50.enabled: true") } @@ -598,26 +599,41 @@ class TaskController { if (!limit) { return [] } - statuses = statuses ?: ExecutionStatus.values()*.toString().join(",") def executionCriteria = new ExecutionCriteria( pageSize: limit, statuses: (statuses.split(",") as Collection) ) - def pipelineConfigIds = front50Service.getPipelines(application, false, excludeExecutionsOfDisabledPipelines)*.id as List<String> + // get all relevant pipeline and strategy configs from front50 + def pipelineConfigIds = front50Service.getPipelines(application, false, this.configurationProperties.excludeExecutionsOfDisabledPipelines ?
"enabled" : "all")*.id as List + log.debug("received ${pipelineConfigIds.size()} pipelines for application: $application from front50") def strategyConfigIds = front50Service.getStrategies(application)*.id as List - def allIds = pipelineConfigIds + strategyConfigIds + log.debug("received ${strategyConfigIds.size()} strategies for application: $application from front50") - def allPipelines = rx.Observable.merge(allIds.collect { - executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) - }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) + def allFront50PipelineConfigIds = pipelineConfigIds + strategyConfigIds + + List allPipelineExecutions = [] + if (this.configurationProperties.getOptimizeExecutionRetrieval()) { + allPipelineExecutions.addAll( + optimizedGetPipelineExecutions(application, allFront50PipelineConfigIds, executionCriteria) + ) + } else { + allPipelineExecutions = rx.Observable.merge(allFront50PipelineConfigIds.collect { + log.debug("processing pipeline config id: $it") + executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) + }).subscribeOn(Schedulers.io()).toList().toBlocking().single() + } + + allPipelineExecutions.sort(startTimeOrId) if (!expand) { - unexpandPipelineExecutions(allPipelines) + log.debug("unexpanding pipeline executions") + unexpandPipelineExecutions(allPipelineExecutions) } - return filterPipelinesByHistoryCutoff(allPipelines, limit) + log.debug("filtering pipelines by history") + return filterPipelinesByHistoryCutoff(allPipelineExecutions, limit) } private static void validateSearchForPipelinesByTriggerParameters(long triggerTimeStartBoundary, long triggerTimeEndBoundary, int startIndex, int size) { @@ -666,7 +682,7 @@ class TaskController { private List filterPipelinesByHistoryCutoff(List pipelines, int limit) { // TODO-AJ The eventual goal is to return `allPipelines` without the need to group + filter below (WIP) - def cutoffTime = clock.instant().minus(daysOfExecutionHistory, DAYS).toEpochMilli() + def cutoffTime = clock.instant().minus(this.configurationProperties.getDaysOfExecutionHistory(), DAYS).toEpochMilli() def pipelinesSatisfyingCutoff = [] pipelines.groupBy { @@ -677,8 +693,9 @@ class TaskController { !it.startTime || it.startTime > cutoffTime } if (!recentPipelines && sortedPipelinesGroup) { - // no pipeline executions within `daysOfExecutionHistory` so include the first `numberOfOldPipelineExecutionsToInclude` - def upperBounds = Math.min(sortedPipelinesGroup.size(), numberOfOldPipelineExecutionsToInclude) - 1 + // no pipeline executions within `this.configurationProperties.getDaysOfExecutionHistory()` so include + // the first `this.configurationProperties.numberOfOldPipelineExecutionsToInclude()` + def upperBounds = Math.min(sortedPipelinesGroup.size(), this.getConfigurationProperties().getNumberOfOldPipelineExecutionsToInclude()) - 1 recentPipelines = sortedPipelinesGroup[0..upperBounds] } @@ -853,6 +870,113 @@ class TaskController { return false } + /** + * this optimized flow speeds up the execution retrieval process for all pipelines in an application. It + * does it in three steps: + *

+ * 1. It compares the list of pipeline config ids obtained from front50 with what is stored in the orca db itself. + * Rationale: We can ignore those pipeline config ids that have no executions. The absence of a pipeline config + * id from the orca db indicates exactly that. So to reduce the number of config ids to process, we + * intersect the result obtained from front50 and orca db, which gives us the reduced list. + * Note: this could be further optimized by cutting front50 out from the picture completely. + * But I do not know what other side-effects that may cause, hence I am going ahead with the above logic. + * + *

+ * 2. It then uses the list of pipeline config ids obtained from step 1 and gets all the valid executions + * associated with each one of them. The valid executions are found after applying the execution criteria. + * + *

+ * 3. It then processes n pipeline executions at a time to retrieve the complete execution details. In addition, + * we make use of a configured thread pool to process multiple batches of n executions in parallel. + */ + private List<PipelineExecution> optimizedGetPipelineExecutions(String application, + List<String> front50PipelineConfigIds, ExecutionCriteria executionCriteria) { + List<PipelineExecution> finalResult = [] + log.info("running optimized execution retrieval process with: " + + "${this.configurationProperties.getMaxExecutionRetrievalThreads()} threads and processing" + + " ${this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()} pipeline executions at a time") + + List<String> commonPipelineConfigIdsInFront50AndOrca + try { + List<String> allOrcaPipelineConfigIds = executionRepository.retrievePipelineConfigIdsForApplication(application) + log.info("found ${allOrcaPipelineConfigIds.size()} pipeline config ids for application: $application in orca") + commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaPipelineConfigIds) + log.info("found ${commonPipelineConfigIdsInFront50AndOrca.size()} pipeline config ids that are common in orca " + + "and front50 for application: $application. " + + "Saved ${front50PipelineConfigIds.size() - commonPipelineConfigIdsInFront50AndOrca.size()} extra pipeline " + + "config id queries") + } catch (Exception e) { + log.warn("retrieving pipeline config ids from orca db failed. using the result obtained from front50 ", e) + commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds + } + + if (commonPipelineConfigIdsInFront50AndOrca.size() == 0) { + log.info("no pipeline config ids found.") + return finalResult + } + + // get complete list of executions based on the execution criteria + log.info("filtering pipeline executions based on the execution criteria: " + + "limit: ${executionCriteria.getPageSize()}, statuses: ${executionCriteria.getStatuses()}") + List<String> filteredPipelineExecutionIds = executionRepository.retrieveAndFilterPipelineExecutionIdsForApplication( + application, + commonPipelineConfigIdsInFront50AndOrca, + executionCriteria + ) + if (filteredPipelineExecutionIds.size() == 0) { + log.info("no pipeline executions found") + return finalResult + } + + // need to define a new executor service since we want a dedicated set of threads to be made available for every + // new request for performance reasons + ExecutorService executorService = Executors.newFixedThreadPool( + this.configurationProperties.getMaxExecutionRetrievalThreads(), + new ThreadFactoryBuilder() + .setNameFormat("application-" + application + "-%d") + .build()) + + try { + List<Future<Collection<PipelineExecution>>> futures = new ArrayList<>(filteredPipelineExecutionIds.size()) + log.info("processing ${filteredPipelineExecutionIds.size()} pipeline executions") + + // process a chunk of the executions at a time + filteredPipelineExecutionIds + .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) + .each { List<String> chunkedExecutions -> + futures.add(executorService.submit({ + try { + List<PipelineExecution> result = executionRepository.retrievePipelineExecutionDetailsForApplication( + application, + chunkedExecutions, + this.configurationProperties.getExecutionRetrievalTimeoutSeconds() + ) + log.debug("completed execution retrieval for ${result.size()} executions") + return result + } catch (Exception e) { // handle exceptions such as query timeouts etc.
+ log.error("error occurred while retrieving these executions: ${chunkedExecutions.toString()} " + + "for application: ${application}.", e) + // in case of errors, this will return partial results. We are going with this best-effort approach + // because the UI keeps refreshing the executions view frequently. Hence, the user will eventually see + // these executions via one of the subsequent calls. Partial data is better than an exception at this + // point since the latter will result in a UI devoid of any executions. + // + return [] + } + } as Callable<Collection<PipelineExecution>>)) + } + + futures.each { Future<Collection<PipelineExecution>> future -> finalResult.addAll(future.get()) } + return finalResult + } finally { + try { + executorService.shutdownNow() // attempt to shut down the executor service + } catch (Exception e) { + log.error("shutting down the executor service failed", e) + } + } + } + @InheritConstructors @ResponseStatus(HttpStatus.NOT_IMPLEMENTED) private static class FeatureNotEnabledException extends RuntimeException {} diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java new file mode 100644 index 0000000000..47ee3584a9 --- /dev/null +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -0,0 +1,82 @@ +/* + * Copyright 2021 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.config; + +import com.netflix.spinnaker.orca.controllers.TaskController; +import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties("tasks.controller") +@Data +public class TaskControllerConfigurationProperties { + /** + * flag to enable speeding up execution retrieval. This is applicable for the {@link + * TaskController#getPipelinesForApplication(String, int, String, Boolean)} endpoint. Only valid + * for {@link SqlExecutionRepository} currently. The implementation for all the other execution + * repositories needs to be added. + */ + boolean optimizeExecutionRetrieval = false; + + /** + * only applicable if optimizeExecutionRetrieval = true. It specifies how many threads should + * process the queries to retrieve the executions. Needs to be tuned appropriately since this has + * the potential to exhaust the connection pool size for the database. + */ + int maxExecutionRetrievalThreads = 4; + + /** + * only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline executions + * should be processed at a time. 150 worked with an orca sql db that contained lots of pipelines + * and executions for a single application (about 1200 pipelines and 1500 executions with each + * execution of size >= 1 MB).
50 is kept as the default since the majority of + * applications have far fewer executions. + * + *

It can be further tuned, depending on your setup, since 150 executions work well for some + * applications but a higher number may be appropriate for others. + */ + int maxNumberOfPipelineExecutionsToProcess = 50; + + /** + * only applicable if optimizeExecutionRetrieval = true. It specifies the max time after which the + * execution retrieval query will time out. + */ + int executionRetrievalTimeoutSeconds = 60; + + /** Moved here; the earlier definition was in the {@link TaskController} class. */ + int daysOfExecutionHistory = 14; + + /** Moved here; the earlier definition was in the {@link TaskController} class. */ + int numberOfOldPipelineExecutionsToInclude = 2; + + boolean excludeExecutionsOfDisabledPipelines = false; + + public boolean getOptimizeExecutionRetrieval() { + return this.optimizeExecutionRetrieval; + } + + // need to set this explicitly so that it works in kotlin tests + public void setOptimizeExecutionRetrieval(boolean optimizeExecutionRetrieval) { + this.optimizeExecutionRetrieval = optimizeExecutionRetrieval; + } + + public int getDaysOfExecutionHistory() { + return this.daysOfExecutionHistory; + } +} diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy index 8dee3526af..74020cf33b 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy @@ -19,16 +19,19 @@ package com.netflix.spinnaker.orca.controllers import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.collect.Collections2 import com.netflix.spectator.api.NoopRegistry +import com.netflix.spinnaker.config.TaskControllerConfigurationProperties import com.netflix.spinnaker.kork.artifacts.model.Artifact -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus +import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType import com.netflix.spinnaker.orca.front50.Front50Service import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator import com.netflix.spinnaker.orca.pipeline.ExecutionRunner +import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory import com.netflix.spinnaker.orca.pipeline.model.* import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor +import com.netflix.spinnaker.orca.util.ExpressionUtils import groovy.json.JsonSlurper import org.springframework.http.MediaType import org.springframework.mock.web.MockHttpServletResponse @@ -60,25 +63,26 @@ class TaskControllerSpec extends Specification { def registry = new NoopRegistry() def clock = Clock.fixed(Instant.now(), UTC) - int daysOfExecutionHistory = 14 - int numberOfOldPipelineExecutionsToInclude = 2 + def taskControllerConfigurationProperties = new TaskControllerConfigurationProperties() + int daysOfExecutionHistory = taskControllerConfigurationProperties.getDaysOfExecutionHistory() ObjectMapper objectMapper = OrcaObjectMapper.newInstance() void setup() { mockMvc = MockMvcBuilders.standaloneSetup( - new TaskController( - front50Service: front50Service, - executionRepository: executionRepository, - executionRunner: executionRunner, - daysOfExecutionHistory:
daysOfExecutionHistory, - numberOfOldPipelineExecutionsToInclude: numberOfOldPipelineExecutionsToInclude, - clock: clock, - mapper: mapper, - registry: registry, - contextParameterProcessor: new ContextParameterProcessor(), - executionOperator: executionOperator - ) + new TaskController( + front50Service, + executionRepository, + executionRunner, + executionOperator, + List.of(Mock(StageDefinitionBuilder)), + new ContextParameterProcessor(), + Mock(ExpressionUtils), + mapper, + registry, + Mock(StageDefinitionBuilderFactory), + taskControllerConfigurationProperties + ) ).build() } @@ -247,6 +251,8 @@ class TaskControllerSpec extends Specification { front50Service.getPipelines(app, false) >> [[id: "1"], [id: "2"]] front50Service.getStrategies(app) >> [] + executionRepository.retrievePipelineConfigIdsForApplication(app) >> { return List.of( '2')} + when: def response = mockMvc.perform(get("/applications/$app/pipelines")).andReturn().response List results = new ObjectMapper().readValue(response.contentAsString, List) diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt new file mode 100644 index 0000000000..c1fcea4705 --- /dev/null +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt @@ -0,0 +1,260 @@ +/* + * Copyright 2021 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netflix.spinnaker.orca.controllers + +import com.fasterxml.jackson.core.type.TypeReference +import com.netflix.spectator.api.NoopRegistry +import com.netflix.spinnaker.config.ExecutionCompressionProperties +import com.netflix.spinnaker.config.TaskControllerConfigurationProperties +import com.netflix.spinnaker.kork.sql.config.RetryProperties +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution +import com.netflix.spinnaker.orca.front50.Front50Service +import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper +import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor +import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository +import com.nhaarman.mockito_kotlin.mock +import dev.minutest.junit.JUnit5Minutests +import dev.minutest.rootContext +import org.jooq.exception.DataAccessException +import org.jooq.impl.DSL.field +import org.jooq.impl.DSL.table +import org.junit.Assert.assertThrows +import org.junit.jupiter.api.assertThrows +import org.mockito.Mockito +import org.springframework.test.web.servlet.MockMvc +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get +import org.springframework.test.web.servlet.setup.MockMvcBuilders +import strikt.api.expectCatching +import strikt.api.expectThat +import strikt.assertions.isA +import strikt.assertions.isEqualTo +import strikt.assertions.isFailure +import java.time.Clock +import java.time.Instant +import java.time.ZoneId +import java.time.temporal.ChronoUnit + +class TaskControllerTest : JUnit5Minutests { + data class Fixture(val optimizeExecution: Boolean) { + + private val clock: Clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()) + val database: SqlTestUtil.TestDatabase = SqlTestUtil.initTcMysqlDatabase()!! 
+ + private val executionRepository: SqlExecutionRepository = SqlExecutionRepository( + partitionName = "test", + jooq = database.context, + mapper = OrcaObjectMapper.getInstance(), + retryProperties = RetryProperties(), + compressionProperties = ExecutionCompressionProperties(), + pipelineRefEnabled = false + ) + + private val taskControllerConfigurationProperties: TaskControllerConfigurationProperties = TaskControllerConfigurationProperties() + .apply { + optimizeExecutionRetrieval = optimizeExecution + } + + private val daysOfExecutionHistory: Long = taskControllerConfigurationProperties.daysOfExecutionHistory.toLong() + + private val front50Service: Front50Service = mock() + + private val taskController: TaskController = TaskController( + front50Service, + executionRepository, + mock(), + mock(), + listOf(mock()), + ContextParameterProcessor(), + mock(), + OrcaObjectMapper.getInstance(), + NoopRegistry(), + mock(), + taskControllerConfigurationProperties + ) + + val subject: MockMvc = MockMvcBuilders.standaloneSetup(taskController).build() + + fun setup() { + database.context + .insertInto(table("pipelines"), + listOf( + field("config_id"), + field("id"), + field("application"), + field("build_time"), + field("start_time"), + field("body"), + field("status") + )) + .values( + listOf( + "1", + "1-exec-id-1", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(120, ChronoUnit.MINUTES).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(120, ChronoUnit.HOURS).toEpochMilli(), + "{\"id\": \"1-exec-id-1\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"1\"}", + "SUCCEEDED" + ) + ) + .values( + listOf( + "1", + "1-exec-id-2", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(115, ChronoUnit.MINUTES).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(115, ChronoUnit.MINUTES).toEpochMilli(), + "{\"id\": \"1-exec-id-2\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"1\"}", + "TERMINAL" + ) + ) + .values( + listOf( + "1", + "1-exec-id-3", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(114, ChronoUnit.MINUTES).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(114, ChronoUnit.MINUTES).toEpochMilli(), + "{\"id\": \"1-exec-id-3\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"1\"}", + "RUNNING" + ) + ) + .values( + listOf( + "2", + "2-exec-id-1", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + "{\"id\": \"2-exec-id-1\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"2\"}", + "NOT_STARTED" + ) + ) + .values( + listOf( + "3", + "3-exec-id-1", + "test-app-2", + clock.instant().minus(daysOfExecutionHistory + 1, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory + 1, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + "{\"id\": \"3-exec-id-1\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"3\"}", + "STOPPED" + ) + ) + .execute() + Mockito.`when`(front50Service.getPipelines("test-app", false,"all")) + .thenReturn( + listOf( + mapOf("id" to "1"), + mapOf("id" to "2")) + ) + + Mockito.`when`(front50Service.getStrategies("test-app")) + .thenReturn(listOf()) + } + + fun cleanUp() { + SqlTestUtil.cleanupDb(database.context) + } + } + + fun tests() 
= rootContext<Fixture> { + context("execution retrieval without optimization") { + fixture { + Fixture(false) + } + + before { setup() } + after { cleanUp() } + + test("retrieve executions with limit = 2 & expand = false") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<PipelineExecution>>() {}) + val expectedOutput = listOf("1-exec-id-2", "1-exec-id-3", "2-exec-id-1") + expectThat(results.size).isEqualTo(3) + results.forEach { + assert(it.id in expectedOutput) + } + } + + test("retrieve executions with limit = 2 & expand = false with statuses") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get( + "/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED") + ).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<PipelineExecution>>() {}) + val expectedOutput = listOf("1-exec-id-3", "2-exec-id-1") + expectThat(results.size).isEqualTo(2) + results.forEach { + assert(it.id in expectedOutput) + } + } + } + + context("execution retrieval with optimization") { + fixture { + Fixture(true) + } + + before { setup() } + after { cleanUp() } + + test("retrieve executions with limit = 2 & expand = false") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<PipelineExecution>>() {}) + val expectedOutput = listOf("1-exec-id-2", "1-exec-id-3", "2-exec-id-1") + expectThat(results.size).isEqualTo(3) + results.forEach { + assert(it.id in expectedOutput) + } + } + + test("retrieve executions with limit = 2 & expand = false with statuses") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get( + "/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED") + ).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<PipelineExecution>>() {}) + val expectedOutput = listOf("1-exec-id-3", "2-exec-id-1") + expectThat(results.size).isEqualTo(2) + results.forEach { + assert(it.id in expectedOutput) + } + } + } + + context("test query having explicit query timeouts") { + fixture { + Fixture(true) + } + + before { setup() } + after { cleanUp() } + + test("it returns a DataAccessException on query timeout") { + expectCatching { + database.context.select(field("sleep(10)")).queryTimeout(1).execute() + } + .isFailure() + .isA<DataAccessException>() + } + } + } +}
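A closing note for reviewers on the per-pipeline 'limit' semantics in retrieveAndFilterPipelineExecutionIdsForApplication: because rows come back grouped by config_id, the stream skip/collect logic keeps only the trailing pageSize ids of each group. A self-contained Kotlin sketch of that behaviour with hypothetical ids; keeping the tail yields the newest executions under the assumption that ids (time-sortable ULIDs) arrive in ascending order:

// Equivalent of the skip(count - pageSize) logic in SqlExecutionRepository:
// for each config_id, keep only the trailing `pageSize` execution ids.
// takeLast returns the whole list when a group has fewer than `pageSize` entries.
fun applyPageSizePerConfigId(idsByConfigId: Map<String, List<String>>, pageSize: Int): List<String> =
    idsByConfigId.values.flatMap { ids -> ids.takeLast(pageSize) }

fun main() {
    val grouped = mapOf(
        "config-1" to listOf("e1", "e2", "e3"), // three executions: keep the newest two
        "config-2" to listOf("e4")              // fewer than pageSize: keep all
    )
    println(applyPageSizePerConfigId(grouped, pageSize = 2)) // [e2, e3, e4]
}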