Skip to content

Commit

Permalink
refactor: keep request state in respective classes
Browse files Browse the repository at this point in the history
(cherry picked from commit c26778b)
  • Loading branch information
romansmirnov authored and github-actions[bot] committed Mar 22, 2022
1 parent 00d3420 commit 52c9bba
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway.impl.job;

import io.camunda.zeebe.gateway.impl.broker.PartitionIdIterator;

public class InflightActivateJobsRequestState {

private final PartitionIdIterator iterator;
private int remainingAmount;
private boolean pollPrevPartition;
private boolean resourceExhaustedWasPresent;

public InflightActivateJobsRequestState(
final PartitionIdIterator iterator, final int remainingAmount) {
this.iterator = iterator;
this.remainingAmount = remainingAmount;
}

private boolean hasNextPartition() {
return iterator.hasNext();
}

public int getCurrentPartition() {
return iterator.getCurrentPartitionId();
}

public int getNextPartition() {
return pollPrevPartition ? iterator.getCurrentPartitionId() : iterator.next();
}

public int getRemainingAmount() {
return remainingAmount;
}

public void setRemainingAmount(int remainingAmount) {
this.remainingAmount = remainingAmount;
}

public boolean wasResourceExhaustedPresent() {
return resourceExhaustedWasPresent;
}

public void setResourceExhaustedWasPresent(final boolean resourceExhaustedWasPresent) {
this.resourceExhaustedWasPresent = resourceExhaustedWasPresent;
}

public void setPollPrevPartition(boolean pollPrevPartition) {
this.pollPrevPartition = pollPrevPartition;
}

public boolean shouldActivateJobs() {
return remainingAmount > 0 && (pollPrevPartition || hasNextPartition());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,7 @@ private void activateJobsUnchecked(
final int partitionsCount = topology.getPartitionsCount();
activateJobsHandler.activateJobs(
partitionsCount,
request.getRequest(),
request.getMaxJobsToActivate(),
request.getType(),
request,
response -> onResponse(request, response),
error -> onError(state, request, error),
(remainingAmount, containedResourceExhaustedResponse) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
*/
package io.camunda.zeebe.gateway.impl.job;

import static io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler.toInflightActivateJobsRequest;

import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.RequestMapper;
import io.camunda.zeebe.gateway.ResponseMapper;
import io.camunda.zeebe.gateway.cmd.BrokerErrorException;
import io.camunda.zeebe.gateway.cmd.BrokerRejectionException;
Expand All @@ -17,9 +18,7 @@
import io.camunda.zeebe.gateway.impl.broker.PartitionIdIterator;
import io.camunda.zeebe.gateway.impl.broker.RequestDispatchStrategy;
import io.camunda.zeebe.gateway.impl.broker.RoundRobinDispatchStrategy;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
Expand Down Expand Up @@ -57,14 +56,12 @@ public void accept(ActorControl actor) {
public void activateJobs(
final ActivateJobsRequest request,
final ServerStreamObserver<ActivateJobsResponse> responseObserver) {
final BrokerClusterState topology = brokerClient.getTopologyManager().getTopology();
final var topology = topologyManager.getTopology();
if (topology != null) {
final int partitionsCount = topology.getPartitionsCount();
final var inflightRequest = toInflightActivateJobsRequest(request, responseObserver);
activateJobs(
partitionsCount,
RequestMapper.toActivateJobsRequest(request),
request.getMaxJobsToActivate(),
request.getType(),
topology.getPartitionsCount(),
inflightRequest,
responseObserver::onNext,
responseObserver::onError,
(remainingAmount, resourceExhaustedWasPresent) -> responseObserver.onCompleted());
Expand All @@ -73,109 +70,73 @@ public void activateJobs(

public void activateJobs(
final int partitionsCount,
final BrokerActivateJobsRequest request,
final int maxJobsToActivate,
final String type,
final InflightActivateJobsRequest request,
final Consumer<ActivateJobsResponse> onResponse,
final Consumer<Throwable> onError,
final BiConsumer<Integer, Boolean> onCompleted) {
activateJobs(
request,
partitionIdIteratorForType(type, partitionsCount),
maxJobsToActivate,
type,
onResponse,
onError,
onCompleted);
}
final var jobType = request.getType();
final var maxJobsToActivate = request.getMaxJobsToActivate();
final var partitionIterator = partitionIdIteratorForType(jobType, partitionsCount);

private void activateJobs(
final BrokerActivateJobsRequest request,
final PartitionIdIterator partitionIdIterator,
final int remainingAmount,
final String jobType,
final Consumer<ActivateJobsResponse> onResponse,
final Consumer<Throwable> onError,
final BiConsumer<Integer, Boolean> onCompleted) {
activateJobs(
request,
partitionIdIterator,
remainingAmount,
jobType,
onResponse,
onError,
onCompleted,
false,
false);
final var requestState =
new InflightActivateJobsRequestState(partitionIterator, maxJobsToActivate);
final var delegate = new ResponseObserverDelegate(onResponse, onError, onCompleted);

activateJobs(request, requestState, delegate);
}

private void activateJobs(
final BrokerActivateJobsRequest request,
final PartitionIdIterator partitionIdIterator,
final int remainingAmount,
final String jobType,
final Consumer<ActivateJobsResponse> onResponse,
final Consumer<Throwable> onError,
final BiConsumer<Integer, Boolean> onCompleted,
final boolean pollPrevPartition,
final boolean resourceExhaustedWasPresent) {
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState requestState,
final ResponseObserverDelegate delegate) {

if (remainingAmount > 0 && (pollPrevPartition || partitionIdIterator.hasNext())) {
final int partitionId =
pollPrevPartition
? partitionIdIterator.getCurrentPartitionId()
: partitionIdIterator.next();
if (requestState.shouldActivateJobs()) {
final var brokerRequest = request.getRequest();
final var partitionId = requestState.getNextPartition();
final var remainingAmount = requestState.getRemainingAmount();

// partitions to check and jobs to activate left
request.setPartitionId(partitionId);
request.setMaxJobsToActivate(remainingAmount);
brokerRequest.setPartitionId(partitionId);
brokerRequest.setMaxJobsToActivate(remainingAmount);

brokerClient
.sendRequest(request)
.sendRequest(brokerRequest)
.whenComplete(
(response, error) -> {
(brokerResponse, error) -> {
if (error == null) {
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(
response.getKey(), response.getResponse());
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);
final int jobsCount = grpcResponse.getJobsCount();
if (jobsCount > 0) {
onResponse.accept(grpcResponse);
delegate.onResponse(grpcResponse);
}

activateJobs(
request,
partitionIdIterator,
remainingAmount - jobsCount,
jobType,
onResponse,
onError,
onCompleted,
response.getResponse().getTruncated(),
resourceExhaustedWasPresent);
final var remainingJobsToActivate = requestState.getRemainingAmount() - jobsCount;
final var shouldPollCurrentPartitionAgain = response.getTruncated();

requestState.setRemainingAmount(remainingJobsToActivate);
requestState.setPollPrevPartition(shouldPollCurrentPartitionAgain);
activateJobs(request, requestState, delegate);
} else {
final boolean wasResourceExhausted = wasResourceExhausted(error);
if (isRejection(error)) {
onError.accept(error);
delegate.onError(error);
return;
} else if (!wasResourceExhausted) {
logErrorResponse(partitionIdIterator, jobType, error);
logErrorResponse(requestState.getCurrentPartition(), request.getType(), error);
}

activateJobs(
request,
partitionIdIterator,
remainingAmount,
jobType,
onResponse,
onError,
onCompleted,
false,
wasResourceExhausted);
requestState.setResourceExhaustedWasPresent(wasResourceExhausted);
requestState.setPollPrevPartition(false);
activateJobs(request, requestState, delegate);
}
});
} else {
// enough jobs activated or no more partitions left to check
onCompleted.accept(remainingAmount, resourceExhaustedWasPresent);
final var remainingAmount = requestState.getRemainingAmount();
final var resourceExhaustedWasPresent = requestState.wasResourceExhaustedPresent();
delegate.onCompleted(remainingAmount, resourceExhaustedWasPresent);
}
}

Expand All @@ -192,13 +153,9 @@ private boolean wasResourceExhausted(final Throwable error) {
return false;
}

private void logErrorResponse(
final PartitionIdIterator partitionIdIterator, final String jobType, final Throwable error) {
private void logErrorResponse(final int partition, final String jobType, final Throwable error) {
Loggers.GATEWAY_LOGGER.warn(
"Failed to activate jobs for type {} from partition {}",
jobType,
partitionIdIterator.getCurrentPartitionId(),
error);
"Failed to activate jobs for type {} from partition {}", jobType, partition, error);
}

private PartitionIdIterator partitionIdIteratorForType(
Expand All @@ -209,4 +166,32 @@ private PartitionIdIterator partitionIdIteratorForType(
return new PartitionIdIterator(
nextPartitionSupplier.determinePartition(), partitionsCount, topologyManager);
}

private static final class ResponseObserverDelegate {

private final Consumer<ActivateJobsResponse> onResponseDelegate;
private final Consumer<Throwable> onErrorDelegate;
private final BiConsumer<Integer, Boolean> onCompletedDelegate;

private ResponseObserverDelegate(
final Consumer<ActivateJobsResponse> onResponseDelegate,
final Consumer<Throwable> onErrorDelegate,
final BiConsumer<Integer, Boolean> onCompletedDelegate) {
this.onResponseDelegate = onResponseDelegate;
this.onErrorDelegate = onErrorDelegate;
this.onCompletedDelegate = onCompletedDelegate;
}

public void onResponse(final ActivateJobsResponse response) {
onResponseDelegate.accept(response);
}

public void onError(final Throwable t) {
onErrorDelegate.accept(t);
}

public void onCompleted(final int remainingAmount, final boolean resourceExhaustedWasPresent) {
onCompletedDelegate.accept(remainingAmount, resourceExhaustedWasPresent);
}
}
}

0 comments on commit 52c9bba

Please sign in to comment.