diff --git a/kayenta-orca/src/main/java/com/netflix/kayenta/config/OrcaCompositeHealthContributor.java b/kayenta-orca/src/main/java/com/netflix/kayenta/config/OrcaCompositeHealthContributor.java new file mode 100644 index 000000000..9bdd609e4 --- /dev/null +++ b/kayenta-orca/src/main/java/com/netflix/kayenta/config/OrcaCompositeHealthContributor.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 JPMorgan Chase & Co + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.kayenta.config; + +import java.util.*; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.jetbrains.annotations.NotNull; +import org.springframework.boot.actuate.health.*; + +public class OrcaCompositeHealthContributor implements CompositeHealthContributor { + + private final StatusAggregator statusAggregator; + private final Map> contributors; + + public OrcaCompositeHealthContributor( + StatusAggregator statusAggregator, HealthContributorRegistry healthContributorRegistry) { + this.statusAggregator = statusAggregator; + + this.contributors = new LinkedHashMap<>(); + healthContributorRegistry.forEach( + contributor -> + contributors.put( + contributor.getName(), + NamedContributor.of(contributor.getName(), contributor.getContributor()))); + } + + @Override + public HealthContributor getContributor(String name) { + return contributors.get(name).getContributor(); + } + + @Override + public Stream> stream() { + return CompositeHealthContributor.super.stream(); + } + + @NotNull + @Override + public Iterator> iterator() { + return contributors.values().iterator(); + } + + @Override + public void forEach(Consumer> action) { + CompositeHealthContributor.super.forEach(action); + } + + @Override + public Spliterator> spliterator() { + return CompositeHealthContributor.super.spliterator(); + } + + public Status status() { + Set statuses = + this.contributors.values().stream() + .map(contributor -> ((HealthIndicator) contributor.getContributor()).getHealth(false)) + .map(Health::getStatus) + .collect(Collectors.toSet()); + + return this.statusAggregator.getAggregateStatus(statuses); + } +} diff --git a/kayenta-orca/src/main/java/com/netflix/kayenta/orca/controllers/PipelineController.java b/kayenta-orca/src/main/java/com/netflix/kayenta/orca/controllers/PipelineController.java index 16d7761e0..3ff9f7298 100644 --- a/kayenta-orca/src/main/java/com/netflix/kayenta/orca/controllers/PipelineController.java +++ b/kayenta-orca/src/main/java/com/netflix/kayenta/orca/controllers/PipelineController.java @@ -20,6 +20,7 @@ import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE; import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.kayenta.config.OrcaCompositeHealthContributor; import com.netflix.spinnaker.kork.discovery.DiscoveryStatusChangeEvent; import com.netflix.spinnaker.kork.discovery.RemoteStatusChangedEvent; import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus; @@ -31,12 +32,9 @@ import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.actuate.health.CompositeHealthIndicator; -import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthAggregator; -import org.springframework.boot.actuate.health.HealthIndicator; -import org.springframework.boot.actuate.health.HealthIndicatorRegistry; +import org.springframework.boot.actuate.health.HealthContributorRegistry; import org.springframework.boot.actuate.health.Status; +import org.springframework.boot.actuate.health.StatusAggregator; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -57,7 +55,7 @@ public class PipelineController { private final ExecutionRepository executionRepository; private final ObjectMapper kayentaObjectMapper; private final ConfigurableApplicationContext context; - private final HealthIndicator healthIndicator; + private final OrcaCompositeHealthContributor orcaCompositeHealthContributor; private final ScheduledAnnotationBeanPostProcessor postProcessor; private Boolean upAtLeastOnce = false; @@ -67,22 +65,24 @@ public PipelineController( ExecutionRepository executionRepository, ObjectMapper kayentaObjectMapper, ConfigurableApplicationContext context, - HealthIndicatorRegistry healthIndicators, - HealthAggregator healthAggregator, + HealthContributorRegistry healthContributorRegistry, + StatusAggregator statusAggregator, ScheduledAnnotationBeanPostProcessor postProcessor) { this.executionLauncher = executionLauncher; this.executionRepository = executionRepository; this.kayentaObjectMapper = kayentaObjectMapper; this.context = context; - this.healthIndicator = new CompositeHealthIndicator(healthAggregator, healthIndicators); + this.orcaCompositeHealthContributor = + new OrcaCompositeHealthContributor(statusAggregator, healthContributorRegistry); this.postProcessor = postProcessor; } + // TODO(duftler): Expose /inservice and /outofservice endpoints. @Scheduled(initialDelay = 10000, fixedDelay = 5000) void startOrcaQueueProcessing() { if (!upAtLeastOnce) { - Health health = healthIndicator.health(); - if (health.getStatus() == Status.UP) { + Status status = orcaCompositeHealthContributor.status(); + if (status == Status.UP) { upAtLeastOnce = true; context.publishEvent( new RemoteStatusChangedEvent(new DiscoveryStatusChangeEvent(STARTING, UP))); @@ -92,7 +92,7 @@ void startOrcaQueueProcessing() { } else { log.warn( "Health indicators are still reporting DOWN; not starting orca queue processing yet: {}", - health); + status); } } }