JobUpdateTask cleanups #650

Merged
merged 7 commits into from
Apr 30, 2020
175 changes: 67 additions & 108 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Expand Up @@ -16,9 +16,6 @@
*/
package feast.core.job;

import feast.core.FeatureSetProto;
import feast.core.SourceProto;
import feast.core.StoreProto;
import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
Expand Down Expand Up @@ -52,168 +49,130 @@
@Getter
public class JobUpdateTask implements Callable<Job> {

private final List<FeatureSetProto.FeatureSet> featureSets;
private final SourceProto.Source sourceSpec;
private final StoreProto.Store store;
private final List<FeatureSet> featureSets;
private final Source source;
private final Store store;
private final Optional<Job> currentJob;
private JobManager jobManager;
private long jobUpdateTimeoutSeconds;
private final JobManager jobManager;
private final long jobUpdateTimeoutSeconds;
private final String runnerName;

public JobUpdateTask(
List<FeatureSetProto.FeatureSet> featureSets,
SourceProto.Source sourceSpec,
StoreProto.Store store,
List<FeatureSet> featureSets,
Source source,
Store store,
Optional<Job> currentJob,
JobManager jobManager,
long jobUpdateTimeoutSeconds) {

this.featureSets = featureSets;
this.sourceSpec = sourceSpec;
this.source = source;
this.store = store;
this.currentJob = currentJob;
this.jobManager = jobManager;
this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds;
this.runnerName = jobManager.getRunnerType().toString();
Member

@woop woop Apr 29, 2020


The runnerName as we have it above refers to the runnerType, right? Should we deduplicate that from the configuration (we could do that at the configuration level as well)?

feast:
  jobs:
    polling_interval_milliseconds: 60000
    job_update_timeout_seconds: 240

    active_runner: my_direct_runner

    runners:
      - name: my_direct_runner
        type: DirectRunner
        options: {}

https://github.com/gojek/feast/blame/ded7ca59a2bd5b40715af73fdaa4e4f19ad0b915/core/src/main/java/feast/core/config/FeastProperties.java#L95

Member


The idea with the configuration was to provide a more forward-compatible configuration schema, but we could just as easily use type there instead of name.

Member Author


Yes, from your example config, jobManager.getRunnerType().toString() will be "DirectRunner" (whereas jobManager.getRunnerType().name() would be "DIRECT").

In JobUpdateTask this value is being used only for log messages.
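
To illustrate the distinction discussed above: a Java enum's name() always returns the constant's identifier, while toString() may be overridden to return a display string. The sketch below uses a hypothetical RunnerType enum (a stand-in, not the actual Feast class), with display strings assumed from the example config in this thread.

```java
class RunnerNameDemo {
  /** Hypothetical stand-in for the runner type enum; not the actual Feast class. */
  enum RunnerType {
    DATAFLOW("DataflowRunner"),
    DIRECT("DirectRunner");

    private final String humanName;

    RunnerType(String humanName) {
      this.humanName = humanName;
    }

    // toString() is overridable; name() is final and returns the identifier.
    @Override
    public String toString() {
      return humanName;
    }
  }

  public static void main(String[] args) {
    RunnerType type = RunnerType.DIRECT;
    System.out.println(type.name()); // prints "DIRECT"
    System.out.println(type.toString()); // prints "DirectRunner"
  }
}
```

Because the value is only used for log messages here, either form would work; the overridden toString() simply matches what users write in the configuration.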

}

@Override
public Job call() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Member Author

@ches ches Apr 25, 2020


@zhilingc I'm trying to understand why this is used: JobUpdateTask is a Callable and its instances are all dispatched on separate threads by JobCoordinatorService, so it seems redundant to have the tasks then use another thread again where they invoke a job manager (that will [usually] do network I/O). If anything, it would seem that futures should be at the layer of the job manager instead. call() submits and then blocks on them before returning.

One more question for you is about the parameters of helper methods like startJob and updateJob: most of them except the job are instance state of the JobUpdateTask itself, and they were able to be final, so it seems like there's no need for passing them around through arguments. I removed them in the process of working out what call() actually needed to touch. Am I missing anything, or were they just a vestige of some refactoring in your initial work?

Collaborator


After reading through the code I'm not sure what my past self was thinking tbqh :( You're right, it's completely redundant.

As for the second point, you're right. I must've missed it.
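
As a rough sketch of the first point in this thread: if the task is already dispatched on a pool thread, it can call the job manager directly, and the caller that owns the pool can apply the timeout through Future.get. Everything below is hypothetical and simplified (submitToRunner stands in for a blocking job manager call, and the pool plays the role of the coordinator); it is one possible arrangement, not the code in this PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class DirectCallDemo {
  /** Hypothetical stand-in for a job manager call that would do blocking I/O. */
  static String submitToRunner(String jobId) {
    return jobId + "-submitted";
  }

  /** The task runs on a pool thread already, so it invokes the manager directly. */
  static Callable<String> updateTask(String jobId) {
    return () -> submitToRunner(jobId);
  }

  public static void main(String[] args) throws Exception {
    // The dispatcher owns both the thread pool and the timeout.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Future<String> result = pool.submit(updateTask("source-to-store"));
      // Bounded wait; throws TimeoutException if the task takes too long.
      System.out.println(result.get(5, TimeUnit.SECONDS));
    } finally {
      pool.shutdownNow();
    }
  }
}
```

The trade-off is where the timeout lives: keeping an executor inside call() lets each task enforce its own timeout, while the arrangement sketched here leaves that to whoever dispatches the tasks.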

Source source = Source.fromProto(sourceSpec);
Future<Job> submittedJob;
if (currentJob.isPresent()) {
Set<String> existingFeatureSetsPopulatedByJob =
currentJob.get().getFeatureSets().stream()
.map(FeatureSet::getId)
.collect(Collectors.toSet());
Set<String> newFeatureSetsPopulatedByJob =
featureSets.stream()
.map(fs -> FeatureSet.fromProto(fs).getId())
.collect(Collectors.toSet());
if (existingFeatureSetsPopulatedByJob.size() == newFeatureSetsPopulatedByJob.size()
&& existingFeatureSetsPopulatedByJob.containsAll(newFeatureSetsPopulatedByJob)) {
ches marked this conversation as resolved.
Job job = currentJob.get();
JobStatus newJobStatus = jobManager.getJobStatus(job);
if (newJobStatus != job.getStatus()) {
AuditLogger.log(
Resource.JOB,
job.getId(),
Action.STATUS_CHANGE,
"Job status updated: changed from %s to %s",
job.getStatus(),
newJobStatus);
}
job.setStatus(newJobStatus);
return job;

if (currentJob.isEmpty()) {
submittedJob = executorService.submit(this::createJob);
} else {
Job job = currentJob.get();

if (featureSetsChangedFor(job)) {
submittedJob = executorService.submit(() -> updateJob(job));
} else {
submittedJob =
executorService.submit(() -> updateJob(currentJob.get(), featureSets, store));
return updateStatus(job);
}
} else {
String jobId = createJobId(source.getId(), store.getName());
submittedJob = executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store));
}

Job job = null;
try {
job = submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS);
return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("Unable to start job for source {} and sink {}: {}", source, store, e.getMessage());
return null;
} finally {
executorService.shutdownNow();
}
return job;
}

boolean featureSetsChangedFor(Job job) {
Set<String> existingFeatureSetsPopulatedByJob =
job.getFeatureSets().stream().map(FeatureSet::getId).collect(Collectors.toSet());
Set<String> newFeatureSetsPopulatedByJob =
featureSets.stream().map(FeatureSet::getId).collect(Collectors.toSet());

return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
}

private Job createJob() {
String jobId = createJobId(source.getId(), store.getName());
return startJob(jobId);
}

/** Start or update the job to ingest data to the sink. */
private Job startJob(
String jobId,
List<FeatureSetProto.FeatureSet> featureSetProtos,
SourceProto.Source source,
StoreProto.Store sinkSpec) {

List<FeatureSet> featureSets =
featureSetProtos.stream()
.map(
fsp ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fsp.getSpec())
.setMeta(fsp.getMeta())
.build()))
.collect(Collectors.toList());
private Job startJob(String jobId) {

Job job =
new Job(
jobId,
"",
jobManager.getRunnerType(),
Source.fromProto(source),
Store.fromProto(sinkSpec),
featureSets,
JobStatus.PENDING);
jobId, "", jobManager.getRunnerType(), source, store, featureSets, JobStatus.PENDING);
try {
AuditLogger.log(
Resource.JOB,
jobId,
Action.SUBMIT,
"Building graph and submitting to %s",
jobManager.getRunnerType().toString());
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);

job = jobManager.startJob(job);
if (job.getExtId().isEmpty()) {
var extId = job.getExtId();
if (extId.isEmpty()) {
throw new RuntimeException(
String.format("Could not submit job: \n%s", "unable to retrieve job external id"));
}

AuditLogger.log(
Resource.JOB,
jobId,
Action.STATUS_CHANGE,
"Job submitted to runner %s with ext id %s.",
jobManager.getRunnerType().toString(),
job.getExtId());
var auditMessage = "Job submitted to runner %s with ext id %s.";
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId);

return job;
} catch (Exception e) {
log.error(e.getMessage());
AuditLogger.log(
Resource.JOB,
jobId,
Action.STATUS_CHANGE,
"Job failed to be submitted to runner %s. Job status changed to ERROR.",
jobManager.getRunnerType().toString());
var auditMessage = "Job failed to be submitted to runner %s. Job status changed to ERROR.";
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName);

job.setStatus(JobStatus.ERROR);
return job;
}
}

/** Update the given job */
private Job updateJob(
Job job, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store store) {
job.setFeatureSets(
featureSets.stream()
.map(
fs ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fs.getSpec())
.setMeta(fs.getMeta())
.build()))
.collect(Collectors.toList()));
job.setStore(feast.core.model.Store.fromProto(store));
AuditLogger.log(
Resource.JOB,
job.getId(),
Action.UPDATE,
"Updating job %s for runner %s",
job.getId(),
jobManager.getRunnerType().toString());
private Job updateJob(Job job) {
job.setFeatureSets(featureSets);
job.setStore(store);
logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName);
return jobManager.updateJob(job);
}

private Job updateStatus(Job job) {
JobStatus currentStatus = job.getStatus();
JobStatus newStatus = jobManager.getJobStatus(job);
if (newStatus != currentStatus) {
var auditMessage = "Job status updated: changed from %s to %s";
logAudit(Action.STATUS_CHANGE, job, auditMessage, currentStatus, newStatus);
}

job.setStatus(newStatus);
return job;
}

String createJobId(String sourceId, String storeName) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String sourceIdTrunc = sourceId.split("/")[0].toLowerCase();
String jobId = String.format("%s-to-%s", sourceIdTrunc, storeName) + dateSuffix;
return jobId.replaceAll("_", "-");
}

private void logAudit(Action action, Job job, String detail, Object... args) {
AuditLogger.log(Resource.JOB, job.getId(), action, detail, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,15 @@ public void abortJob(String dataflowJobId) {
}

/**
* Restart a restart dataflow job. Dataflow should ensure continuity between during the restart,
* so no data should be lost during the restart operation.
* Restart a Dataflow job. Dataflow should ensure continuity such that no data should be lost
* during the restart operation.
*
* @param job job to restart
* @return the restarted job
*/
@Override
public Job restartJob(Job job) {
JobStatus status = job.getStatus();
if (JobStatus.getTerminalState().contains(status)) {
if (job.getStatus().isTerminal()) {
// job yet not running: just start job
return this.startJob(job);
} else {
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOExcept
*/
@Override
public Job restartJob(Job job) {
JobStatus status = job.getStatus();
if (JobStatus.getTerminalState().contains(status)) {
if (job.getStatus().isTerminal()) {
// job yet not running: just start job
return this.startJob(job);
} else {
8 changes: 8 additions & 0 deletions core/src/main/java/feast/core/model/Job.java
Expand Up @@ -111,6 +111,14 @@ public Job(
this.status = jobStatus;
}

public boolean hasTerminated() {
return getStatus().isTerminal();
}

public boolean isRunning() {
return getStatus() == JobStatus.RUNNING;
}

public void updateMetrics(List<Metrics> newMetrics) {
metrics.clear();
metrics.addAll(newMetrics);
38 changes: 22 additions & 16 deletions core/src/main/java/feast/core/model/JobStatus.java
Expand Up @@ -17,10 +17,8 @@
package feast.core.model;

import feast.core.IngestionJobProto.IngestionJobStatus;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public enum JobStatus {
/** Job status is not known. */
Expand Down Expand Up @@ -53,33 +51,41 @@ public enum JobStatus {
/** job has been suspended */
SUSPENDED;

private static final Collection<JobStatus> TERMINAL_STATE =
Collections.unmodifiableList(Arrays.asList(COMPLETED, ABORTED, ERROR));
private static final Set<JobStatus> TERMINAL_STATES = Set.of(COMPLETED, ABORTED, ERROR);

/**
* Get a collection of terminal job state.
* Get the set of terminal job states.
*
* <p>Terminal job state is final and will not change to any other state.
* <p>A terminal job state is final and will not change to any other state.
*
* @return collection of terminal job state.
* @return set of terminal job states.
*/
public static Collection<JobStatus> getTerminalState() {
return TERMINAL_STATE;
public static Set<JobStatus> getTerminalStates() {
return TERMINAL_STATES;
Comment on lines -66 to +64
Member Author

@ches ches Apr 25, 2020


Got a little carried away, JobStatus changes are further than I meant to go for now… but I started touching JobCoordinatorService for filter(job -> !job.hasTerminated()) and I ended up here…

These semantically are/should be Sets I think. I'm not sure I see value in these static getters wrapping constants, after this PR their only remaining use is in a few tests. Considered removing them, but thought I'd leave it to discuss first.

Collaborator


Makes sense.
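
For readers following the thread above, here is a minimal sketch of the pattern being described: the terminal states held in a Set, an instance predicate on the enum, and a job-level helper used in a stream filter. The Status enum and Job class are trimmed-down, hypothetical copies for illustration, and activeJobIds is an invented helper name rather than anything in JobCoordinatorService.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class JobStatusDemo {
  /** Trimmed-down, hypothetical copy of the status enum, for illustration only. */
  enum Status {
    PENDING, RUNNING, COMPLETED, ABORTED, ERROR;

    // A Set states the intent: membership tests, no duplicates, no ordering.
    private static final Set<Status> TERMINAL_STATES = Set.of(COMPLETED, ABORTED, ERROR);

    boolean isTerminal() {
      return TERMINAL_STATES.contains(this);
    }
  }

  /** Hypothetical minimal job model that delegates to its status. */
  static class Job {
    final String id;
    final Status status;

    Job(String id, Status status) {
      this.id = id;
      this.status = status;
    }

    boolean hasTerminated() {
      return status.isTerminal();
    }
  }

  /** Returns the ids of jobs that have not reached a terminal state, in input order. */
  static List<String> activeJobIds(List<Job> jobs) {
    return jobs.stream()
        .filter(job -> !job.hasTerminated())
        .map(job -> job.id)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Job> jobs =
        List.of(
            new Job("a", Status.RUNNING),
            new Job("b", Status.ERROR),
            new Job("c", Status.PENDING));
    System.out.println(activeJobIds(jobs));
  }
}
```

With the predicate on the enum itself, callers no longer need the static getter at all, which is the point raised about its remaining uses being limited to tests.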

}

private static final Collection<JobStatus> TRANSITIONAL_STATES =
Collections.unmodifiableList(Arrays.asList(PENDING, ABORTING, SUSPENDING));
private static final Set<JobStatus> TRANSITIONAL_STATES = Set.of(PENDING, ABORTING, SUSPENDING);

/**
* Get Transitional Job Status states. Transitionals states are assigned to jobs that
* Get Transitional Job Status states. Transitional states are assigned to jobs that are
* transitioning to a more stable state (ie SUSPENDED, ABORTED etc.)
*
* @return Collection of transitional Job Status states.
* @return set of transitional Job Status states.
*/
public static final Collection<JobStatus> getTransitionalStates() {
public static Set<JobStatus> getTransitionalStates() {
return TRANSITIONAL_STATES;
}

/** @return true if this {@code JobStatus} is a terminal state. */
public boolean isTerminal() {
return getTerminalStates().contains(this);
}

/** @return true if this {@code JobStatus} is a transitional state. */
public boolean isTransitional() {
return getTransitionalStates().contains(this);
}

private static final Map<JobStatus, IngestionJobStatus> INGESTION_JOB_STATUS_MAP =
Map.of(
JobStatus.UNKNOWN, IngestionJobStatus.UNKNOWN,
Expand All @@ -95,7 +101,7 @@ public static final Collection<JobStatus> getTransitionalStates() {
/**
* Convert a Job Status to Ingestion Job Status proto
*
* @return IngestionJobStatus proto derieved from this job status
* @return IngestionJobStatus proto derived from this job status
*/
public IngestionJobStatus toProto() {
// maps job models job status to ingestion job status