Skip to content

Commit

Permalink
Apply formatting after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
sundargates committed Oct 6, 2021
1 parent 0c0f9bb commit 7b00765
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package io.mantisrx.runtime.descriptor;

import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Singular;

import java.util.Map;

@Builder
@EqualsAndHashCode
public class DeploymentStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@

package io.mantisrx.runtime.descriptor;

import java.io.IOException;
import java.util.*;
import java.util.function.Function;

import io.mantisrx.runtime.JobConstraints;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnore;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.*;
import java.util.function.Function;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package io.mantisrx.runtime.descriptor;

import java.io.IOException;
import java.util.List;

import io.mantisrx.runtime.JobConstraints;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import lombok.Builder;
import lombok.Singular;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@

package io.mantisrx.runtime.descriptor;

import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.junit.Assert.*;

import java.util.HashMap;
import java.util.Map;
import org.junit.Test;

public class SchedulingInfoTest {
@Test
public void shouldRequireInheritInstanceCheck() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,21 @@

package io.mantisrx.master.jobcluster;

import static akka.pattern.PatternsCS.ask;
import static io.mantisrx.master.StringConstants.MANTIS_MASTER_USER;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.CLIENT_ERROR;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.SERVER_ERROR;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.SUCCESS;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;

import io.mantisrx.shaded.com.google.common.collect.Lists;
import com.mantisrx.common.utils.LabelUtils;
import com.netflix.fenzo.triggers.CronTrigger;
import com.netflix.fenzo.triggers.TriggerOperator;
Expand Down Expand Up @@ -53,18 +61,18 @@
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.EnableJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.JobClustersManagerInitializeResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.KillJobResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.ListArchivedWorkersRequest;
Expand Down Expand Up @@ -98,7 +106,6 @@
import io.mantisrx.master.jobcluster.proto.JobClusterProto.JobStartedEvent;
import io.mantisrx.master.jobcluster.proto.JobClusterProto.KillJobRequest;
import io.mantisrx.master.jobcluster.proto.JobProto;

import io.mantisrx.runtime.JobConstraints;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.server.core.JobCompletedReason;
Expand All @@ -115,32 +122,21 @@
import io.mantisrx.server.master.domain.SLA;
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.persistence.exceptions.JobClusterAlreadyExistsException;

import io.mantisrx.server.master.scheduler.MantisScheduler;
import io.mantisrx.server.master.scheduler.WorkerEvent;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import static akka.pattern.PatternsCS.ask;
import static io.mantisrx.master.StringConstants.MANTIS_MASTER_USER;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.CLIENT_ERROR;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.SERVER_ERROR;
import static io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode.SUCCESS;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;


/**
* Actor responsible for handling all operations related to one Job Cluster.
Expand Down Expand Up @@ -866,7 +862,7 @@ public void onJobClusterUpdate(final UpdateJobClusterRequest request) {
.build();


try {
try {
updateAndSaveJobCluster(jobCluster);
sender.tell(new UpdateJobClusterResponse(request.requestId, SUCCESS, name
+ " Job cluster updated"), getSelf());
Expand Down Expand Up @@ -1117,7 +1113,7 @@ private Observable<MantisJobMetadataView> getFilteredNonTerminalJobList(ListJobC
* @param request
* @return
*/

private Observable<MantisJobMetadataView> getFilteredTerminalJobList(ListJobCriteria request, Set<JobId> jobIdSet) {
if(logger.isTraceEnabled()) { logger.trace("JobClusterActor:getFilteredTerminalJobList"); }

Expand Down Expand Up @@ -1325,7 +1321,7 @@ public void onJobSubmit(final SubmitJobRequest request) {
logger.error("Exception submitting job {} from {}", request.getClusterName(), request.getSubmitter(), e);
numJobSubmissionFailures.increment();
sender.tell(new SubmitJobResponse(request.requestId, CLIENT_ERROR, e.getMessage(), empty()), getSelf());
}
}
}

public void onGetJobDefinitionUpdatedFromJobActorResponse(GetJobDefinitionUpdatedFromJobActorResponse request) {
Expand Down Expand Up @@ -1783,21 +1779,21 @@ public void onGetJobDetailsRequest(GetJobDetailsRequest req) {
} else {
// Could be a terminated job
Optional<CompletedJob> completedJob = jobManager.getCompletedJob(req.getJobId());

if(completedJob.isPresent()) {
if(logger.isDebugEnabled()) { logger.debug("Found Job {} in completed state ", req.getJobId()); }
try {
Optional<IMantisJobMetadata> jobMetaOp = jobStore.getArchivedJob(req.getJobId().getId());
if(jobMetaOp.isPresent()) {
response = new GetJobDetailsResponse(req.requestId, SUCCESS, "", jobMetaOp);

} else {
response = new GetJobDetailsResponse(req.requestId, CLIENT_ERROR_NOT_FOUND, "Job " + req.getJobId() + " not found", empty());
}
} catch (Exception e) {
logger.warn("Exception {} reading Job {} from Storage ", e.getMessage(), req.getJobId());
response = new GetJobDetailsResponse(req.requestId, CLIENT_ERROR, "Exception reading Job " + req.getJobId() + " " + e.getMessage(), empty());

}
} else {
logger.warn("No such job {} ", req.getJobId());
Expand Down Expand Up @@ -1920,8 +1916,8 @@ public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest request) {
}


// enforce max.
} else {
// enforce max.
} else {
List<JobInfo> listOfJobs = new ArrayList<>(activeJobsCount + acceptedJobsCount);
listOfJobs.addAll(jobManager.getActiveJobsList());
listOfJobs.addAll(jobManager.getAcceptedJobsList());
Expand Down Expand Up @@ -2066,7 +2062,7 @@ public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) {
new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE,
jobClusterMetadata.getJobClusterDefinition().getName(), name+" SLA update")
);
} catch(IllegalArgumentException e) {
} catch(IllegalArgumentException e) {
logger.error("Invalid arguement job cluster not updated ", e);
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf());

Expand Down Expand Up @@ -2203,7 +2199,7 @@ private void updateAndSaveJobCluster(IJobClusterMetadata jobCluster) throws Exce
if(logger.isTraceEnabled()) { logger.trace("Entering JobClusterActor:updateAndSaveJobCluster {}", jobCluster.getJobClusterDefinition().getName()); }
jobStore.updateJobCluster(jobCluster);
jobClusterMetadata = jobCluster;
// enable cluster if
// enable cluster if
if(!jobClusterMetadata.isDisabled()) {
getContext().become(initializedBehavior);
}
Expand Down Expand Up @@ -2258,9 +2254,9 @@ private Optional<JobDefinition> getLastSubmittedJobDefinition(final List<Complet

/**
* 2 cases this can occur
* 1. Graceful shutdown : Where the job cluster actor requests the job actor to terminate. In this case we simply clear the pending
* 1. Graceful shutdown : Where the job cluster actor requests the job actor to terminate. In this case we simply clear the pending
* delete jobs map
*
*
* 2. Unexpected shutdown : The job actor terminated unexpectedly in which case we need to relaunch the actor.
* @param terminatedEvent Event describing a job actor was terminated
*/
Expand Down Expand Up @@ -2833,7 +2829,7 @@ int activeJobsCount() {

return this.activeJobsMap.size();
}

Optional<CompletedJob> getCompletedJob(JobId jId) {
return completedJobsCache.getCompletedJob(jId);
}
Expand Down Expand Up @@ -2868,7 +2864,7 @@ Optional<JobInfo> getJobInfoForNonTerminalJob(JobId jId) {
} else if(this.terminatingJobsMap.containsKey(jId)) {
if(logger.isDebugEnabled() ) { logger.debug("Found {} in terminating state", jId); }
return of(terminatingJobsMap.get(jId));
}
}
return empty();
}

Expand All @@ -2879,7 +2875,7 @@ Optional<JobInfo> getJobInfoForNonTerminalJob(String jobId) {
}
return empty();
}

Optional<JobInfo> getJobInfoByUniqueId(final String uniqueId) {
return this.getAllNonTerminalJobsList().stream().filter((jobInfo) -> {
String unq = jobInfo.jobDefinition.getJobSla().getUserProvidedType();
Expand Down Expand Up @@ -3291,7 +3287,7 @@ public boolean containsKey(JobId jobId) {
return completedJobs.containsKey(jobId);
}
}

static class CronManager {
private static final TriggerOperator triggerOperator;
private static final Logger logger = LoggerFactory.getLogger(CronManager.class);
Expand Down Expand Up @@ -3322,7 +3318,7 @@ static class CronManager {
initCron();
}
}

private void initCron() throws Exception{
if(cronSpec == null || triggerId != null) {
return;
Expand All @@ -3336,9 +3332,9 @@ private void initCron() throws Exception{
} catch (IllegalArgumentException e) {
throw new SchedulerException(e.getMessage(), e);
}

}

private void destroyCron() {
try {
if (triggerId != null) {
Expand All @@ -3356,16 +3352,16 @@ boolean isCronActive() {
return isCronActive;
}
}

public static class CronTriggerAction implements Action1<ActorRef> {

@Override
public void call(ActorRef jobClusterActor) {

jobClusterActor.tell(new JobClusterProto.TriggerCronRequest(), ActorRef.noSender());

}

}


Expand Down
Loading

0 comments on commit 7b00765

Please sign in to comment.