Skip to content

Commit

Permalink
Add upgrade API (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 authored Jun 14, 2022
1 parent e0abde2 commit b202912
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesResponse;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersResponse;
import java.util.concurrent.CompletionStage;

public interface ResourceClusterRouteHandler {
Expand All @@ -41,6 +43,8 @@ public interface ResourceClusterRouteHandler {

CompletionStage<ScaleResourceResponse> scale(final ScaleResourceRequest request);

CompletionStage<UpgradeClusterContainersResponse> upgrade(final UpgradeClusterContainersRequest request);

CompletionStage<GetResourceClusterScaleRulesResponse> createSingleScaleRule(
CreateResourceClusterScaleRuleRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesResponse;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersResponse;
import io.mantisrx.server.master.config.ConfigurationProvider;
import java.time.Duration;
import java.util.Optional;
Expand Down Expand Up @@ -94,6 +96,14 @@ public CompletionStage<ScaleResourceResponse> scale(ScaleResourceRequest request
return response;
}

@Override
public CompletionStage<UpgradeClusterContainersResponse> upgrade(UpgradeClusterContainersRequest request) {
CompletionStage<UpgradeClusterContainersResponse> response =
ask(this.resourceClustersHostManagerActor, request, timeout)
.thenApply(UpgradeClusterContainersResponse.class::cast);
return response;
}

@Override
public CompletionStage<GetResourceClusterScaleRulesResponse> createSingleScaleRule(
CreateResourceClusterScaleRuleRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesResponse;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersResponse;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.server.master.resourcecluster.ClusterID;
Expand All @@ -66,12 +68,20 @@
* /api/v1/resourceClusters/{}/getAvailableTaskExecutors (GET)
* /api/v1/resourceClusters/{}/getUnregisteredTaskExecutors (GET)
* /api/v1/resourceClusters/{}/scaleSku (POST)
* /api/v1/resourceClusters/{}/upgrade (POST)
* <p>
* <p>
* /api/v1/resourceClusters/{}/scaleRule (POST)
* /api/v1/resourceClusters/{}/scaleRules (GET, POST)
* <p>
* /api/v1/resourceClusters/{}/taskExecutors/{}/getTaskExecutorState (GET)
*
* [Notes]
* To upgrade cluster containers: each container running task executor is using docker image tag based image version.
* In regular case the upgrade is to refresh the container to re-deploy with latest digest associated with the image
* tag (e.g. latest).
* If multiple image digest versions need to be ran/hosted at the same time, it is recommended to create a separate
* sku id in addition to the existing sku(s).
*/
@Slf4j
public class ResourceClustersNonLeaderRedirectRoute extends BaseRoute {
Expand Down Expand Up @@ -138,6 +148,15 @@ protected Route constructRoutes() {
post(() -> scaleClusterSku(clusterName))
))
),
// /{}/upgrade
path(
PathMatchers.segment().slash("upgrade"),
(clusterName) -> pathEndOrSingleSlash(() -> concat(

// POST
post(() -> upgradeCluster(clusterName))
))
),
// /{}/getResourceOverview
path(
PathMatchers.segment().slash("getResourceOverview"),
Expand Down Expand Up @@ -286,7 +305,7 @@ private Route deleteResourceClusterInstanceRoute(String clusterId) {

private Route scaleClusterSku(String clusterName) {
return entity(Jackson.unmarshaller(ScaleResourceRequest.class), skuScaleRequest -> {
log.info("POST api/v1/resourceClusters/{}/actions/scaleSku {}", clusterName, skuScaleRequest);
log.info("POST api/v1/resourceClusters/{}/scaleSku {}", clusterName, skuScaleRequest);
final CompletionStage<ScaleResourceResponse> response =
this.resourceClusterRouteHandler.scale(skuScaleRequest);

Expand All @@ -302,6 +321,25 @@ private Route scaleClusterSku(String clusterName) {
});
}


private Route upgradeCluster(String clusterName) {
return entity(Jackson.unmarshaller(UpgradeClusterContainersRequest.class), upgradeRequest -> {
log.info("POST api/v1/resourceClusters/{}/upgrade {}", clusterName, upgradeRequest);
final CompletionStage<UpgradeClusterContainersResponse> response =
this.resourceClusterRouteHandler.upgrade(upgradeRequest);

return completeAsync(
response,
resp -> complete(
StatusCodes.ACCEPTED,
resp,
Jackson.marshaller()),
Endpoints.RESOURCE_CLUSTERS,
HttpRequestMetrics.HttpVerb.POST
);
});
}

private Route createSingleScaleRule(String clusterName) {
return entity(Jackson.unmarshaller(CreateResourceClusterScaleRuleRequest.class), scaleRuleReq -> {
log.info("POST api/v1/resourceClusters/{}/scaleRule {}", clusterName, scaleRuleReq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesResponse;
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleSpec;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.resourceprovider.InMemoryOnlyResourceClusterStorageProvider;
import io.mantisrx.master.resourcecluster.resourceprovider.ResourceClusterProvider;
import io.mantisrx.master.resourcecluster.resourceprovider.ResourceClusterStorageProvider;
Expand Down Expand Up @@ -96,6 +97,9 @@ public Receive createReceive() {
.match(GetResourceClusterScaleRulesRequest.class, this::onGetResourceClusterScaleRulesRequest)
.match(ScaleResourceRequest.class, this::onScaleResourceClusterRequest)

// Upgrade section
.match(UpgradeClusterContainersRequest.class, this::onUpgradeClusterContainersRequest)

.build();
}

Expand Down Expand Up @@ -314,4 +318,13 @@ private void onScaleResourceClusterRequest(ScaleResourceRequest req) {

pipe(this.resourceClusterProvider.scaleResource(req), getContext().dispatcher()).to(getSender());
}

private void onUpgradeClusterContainersRequest(UpgradeClusterContainersRequest req) {
log.info("Entering onScaleResourceClusterRequest: " + req);
// [Notes] for scaling-up the request can go straight into provider to increase desire size.
// FOr scaling-down the decision requires getting idle hosts first.

pipe(this.resourceClusterProvider.upgradeContainerResource(req), getContext().dispatcher()).to(getSender());

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.master.resourcecluster.proto;

import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class UpgradeClusterContainersRequest {
String clusterId;

String region;

String optionalSkuId;

MantisResourceClusterEnvType optionalEnvType;

int optionalBatchMaxSize;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.master.resourcecluster.proto;

import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Value;

@Value
public class UpgradeClusterContainersResponse extends BaseResponse {
String clusterId;

String region;

String optionalSkuId;

MantisResourceClusterEnvType optionalEnvType;

@Builder
@JsonCreator
public UpgradeClusterContainersResponse(
@JsonProperty("requestId") final long requestId,
@JsonProperty("responseCode") final ResponseCode responseCode,
@JsonProperty("message") final String message,
@JsonProperty("clusterId") final String clusterId,
@JsonProperty("region") final String region,
@JsonProperty("optionalSkuId") String optionalSkuId,
@JsonProperty("optionalEnvType") MantisResourceClusterEnvType optionalEnvType) {
super(requestId, responseCode, message);
this.clusterId = clusterId;
this.optionalSkuId = optionalSkuId;
this.region = region;
this.optionalEnvType = optionalEnvType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterProvisionSubmissionResponse;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

Expand All @@ -37,6 +39,11 @@ public CompletionStage<ScaleResourceResponse> scaleResource(ScaleResourceRequest
return CompletableFuture.completedFuture(null);
}

@Override
public CompletionStage<UpgradeClusterContainersResponse> upgradeContainerResource(UpgradeClusterContainersRequest request) {
return CompletableFuture.completedFuture(null);
}

@Override
public ResourceClusterResponseHandler getResponseHandler() {
return new NoopResourceClusterResponseHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterProvisionSubmissionResponse;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersResponse;
import java.util.concurrent.CompletionStage;

/**
Expand Down Expand Up @@ -50,5 +52,14 @@ CompletionStage<ResourceClusterProvisionSubmissionResponse> provisionClusterIfNo
*/
CompletionStage<ScaleResourceResponse> scaleResource(ScaleResourceRequest scaleRequest);

/**
* To upgrade cluster containers: each container running task executor is using docker image tag based image version.
* In regular case the upgrade is to refresh the container to re-deploy with latest digest associated with the image
* tag (e.g. latest).
* If multiple image digest versions need to be ran/hosted at the same time, it is recommended to create a separate
* sku id in addition to the existing sku(s).
*/
CompletionStage<UpgradeClusterContainersResponse> upgradeContainerResource(UpgradeClusterContainersRequest request);

ResourceClusterResponseHandler getResponseHandler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterProvisionSubmissionResponse;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersResponse;
import java.util.concurrent.CompletionStage;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -72,6 +74,12 @@ public CompletionStage<ScaleResourceResponse> scaleResource(ScaleResourceRequest
return providerImpl.scaleResource(scaleRequest);
}

@Override
public CompletionStage<UpgradeClusterContainersResponse> upgradeContainerResource(
UpgradeClusterContainersRequest request) {
return providerImpl.upgradeContainerResource(request);
}

@Override
public ResourceClusterResponseHandler getResponseHandler() {
return providerImpl.getResponseHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.mantisrx.master.api.akka.route.handlers.ResourceClusterRouteHandlerAkkaImpl;
import io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode;
import io.mantisrx.master.resourcecluster.ResourceClustersHostManagerActor;
import io.mantisrx.master.resourcecluster.proto.MantisResourceClusterEnvType;
import io.mantisrx.master.resourcecluster.proto.MantisResourceClusterSpec;
import io.mantisrx.master.resourcecluster.proto.ProvisionResourceClusterRequest;
import io.mantisrx.master.resourcecluster.proto.ResourceClusterAPIProto.DeleteResourceClusterResponse;
Expand All @@ -47,6 +48,8 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesResponse;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersRequest;
import io.mantisrx.master.resourcecluster.proto.UpgradeClusterContainersResponse;
import io.mantisrx.master.resourcecluster.resourceprovider.InMemoryOnlyResourceClusterStorageProvider;
import io.mantisrx.master.resourcecluster.resourceprovider.NoopResourceClusterResponseHandler;
import io.mantisrx.master.resourcecluster.resourceprovider.ResourceClusterProvider;
Expand Down Expand Up @@ -290,6 +293,32 @@ public void testResourceClusterScaleRulesRoutes() throws IOException {
GetResourceClusterScaleRulesResponse.class));
}

@Test
public void testResourceClusterUpgradeRoutes() throws IOException {
UpgradeClusterContainersRequest createRuleReq1 = UpgradeClusterContainersRequest.builder()
.clusterId(CLUSTER_ID)
.region("us-east-1")
.optionalBatchMaxSize(50)
.optionalSkuId("large")
.optionalEnvType(MantisResourceClusterEnvType.Prod)
.build();

testRoute.run(
HttpRequest.POST(getResourceClusterUpgradeEndpoint(CLUSTER_ID))
.withEntity(HttpEntities.create(
ContentTypes.APPLICATION_JSON,
Jackson.toJson(createRuleReq1))))
.assertStatusCode(StatusCodes.ACCEPTED)
.assertEntityAs(Jackson.unmarshaller(UpgradeClusterContainersResponse.class),
UpgradeClusterContainersResponse.builder()
.responseCode(ResponseCode.SUCCESS)
.clusterId(createRuleReq1.getClusterId())
.region(createRuleReq1.getRegion())
.optionalSkuId(createRuleReq1.getOptionalSkuId())
.optionalEnvType(createRuleReq1.getOptionalEnvType())
.build());
}

final String getResourceClusterEndpoint() {
return "/api/v1/resourceClusters";
}
Expand All @@ -312,6 +341,12 @@ final String getResourceClusterScaleRuleEndpoint(String clusterId) {
clusterId);
}

final String getResourceClusterUpgradeEndpoint(String clusterId) {
return String.format(
"/api/v1/resourceClusters/%s/upgrade",
clusterId);
}

public static class UnitTestResourceProviderAdapter implements ResourceClusterProvider {

private ResourceClusterProvider injectedProvider;
Expand Down Expand Up @@ -353,6 +388,20 @@ public CompletionStage<ScaleResourceResponse> scaleResource(ScaleResourceRequest
.build());
}

@Override
public CompletionStage<UpgradeClusterContainersResponse> upgradeContainerResource(
UpgradeClusterContainersRequest request) {
return CompletableFuture.completedFuture(
UpgradeClusterContainersResponse.builder()
.message("test scale resp")
.region(request.getRegion())
.optionalSkuId(request.getOptionalSkuId())
.clusterId(request.getClusterId())
.optionalEnvType(request.getOptionalEnvType())
.responseCode(ResponseCode.SUCCESS)
.build());
}

@Override
public ResourceClusterResponseHandler getResponseHandler() {
if (this.injectedProvider != null) return this.injectedProvider.getResponseHandler();
Expand Down
Loading

0 comments on commit b202912

Please sign in to comment.