Skip to content

Commit

Permalink
core: Refactor JobUpdateTask to use domain model types
Browse files Browse the repository at this point in the history
And factor out things that don't differ between its test cases.

JobCoordinatorService unmarshals protos to model types eagerly as they
come off the wire, so we're left dealing with models everywhere else.
  • Loading branch information
ches committed Apr 28, 2020
1 parent 15cc078 commit ded7ca5
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 223 deletions.
60 changes: 17 additions & 43 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package feast.core.job;

import feast.core.FeatureSetProto;
import feast.core.SourceProto;
import feast.core.StoreProto;
import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
Expand Down Expand Up @@ -52,29 +49,29 @@
@Getter
public class JobUpdateTask implements Callable<Job> {

private final List<FeatureSetProto.FeatureSet> featureSets;
private final SourceProto.Source sourceSpec;
private final StoreProto.Store store;
private final List<FeatureSet> featureSets;
private final Source source;
private final Store store;
private final Optional<Job> currentJob;
private final JobManager jobManager;
private final long jobUpdateTimeoutSeconds;
private final String runnerType;
private final String runnerName;

public JobUpdateTask(
List<FeatureSetProto.FeatureSet> featureSets,
SourceProto.Source sourceSpec,
StoreProto.Store store,
List<FeatureSet> featureSets,
Source source,
Store store,
Optional<Job> currentJob,
JobManager jobManager,
long jobUpdateTimeoutSeconds) {

this.featureSets = featureSets;
this.sourceSpec = sourceSpec;
this.source = source;
this.store = store;
this.currentJob = currentJob;
this.jobManager = jobManager;
this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds;
this.runnerType = jobManager.getRunnerType().toString();
this.runnerName = jobManager.getRunnerType().toString();
}

@Override
Expand All @@ -97,7 +94,6 @@ public Job call() {
try {
return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
Source source = Source.fromProto(sourceSpec);
log.warn("Unable to start job for source {} and sink {}: {}", source, store, e.getMessage());
return null;
} finally {
Expand All @@ -109,15 +105,12 @@ boolean featureSetsChangedFor(Job job) {
Set<String> existingFeatureSetsPopulatedByJob =
job.getFeatureSets().stream().map(FeatureSet::getId).collect(Collectors.toSet());
Set<String> newFeatureSetsPopulatedByJob =
featureSets.stream()
.map(fs -> FeatureSet.fromProto(fs).getId())
.collect(Collectors.toSet());
featureSets.stream().map(FeatureSet::getId).collect(Collectors.toSet());

return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
}

private Job createJob() {
Source source = Source.fromProto(sourceSpec);
String jobId = createJobId(source.getId(), store.getName());
return startJob(jobId);
}
Expand All @@ -127,15 +120,9 @@ private Job startJob(String jobId) {

Job job =
new Job(
jobId,
"",
jobManager.getRunnerType(),
Source.fromProto(sourceSpec),
Store.fromProto(store),
featureSetsFromProto(featureSets),
JobStatus.PENDING);
jobId, "", jobManager.getRunnerType(), source, store, featureSets, JobStatus.PENDING);
try {
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerType);
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);

job = jobManager.startJob(job);
var extId = job.getExtId();
Expand All @@ -145,13 +132,13 @@ private Job startJob(String jobId) {
}

var auditMessage = "Job submitted to runner %s with ext id %s.";
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerType, extId);
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId);

return job;
} catch (Exception e) {
log.error(e.getMessage());
var auditMessage = "Job failed to be submitted to runner %s. Job status changed to ERROR.";
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerType);
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName);

job.setStatus(JobStatus.ERROR);
return job;
Expand All @@ -160,9 +147,9 @@ private Job startJob(String jobId) {

/** Update the given job */
private Job updateJob(Job job) {
job.setFeatureSets(featureSetsFromProto(featureSets));
job.setStore(Store.fromProto(store));
logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerType);
job.setFeatureSets(featureSets);
job.setStore(store);
logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName);
return jobManager.updateJob(job);
}

Expand All @@ -185,19 +172,6 @@ String createJobId(String sourceId, String storeName) {
return jobId.replaceAll("_", "-");
}

// TODO: Either put in a util somewhere, or stop using proto types in domain logic altogether
private static List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) {
return protos.stream()
.map(
fsp ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fsp.getSpec())
.setMeta(fsp.getMeta())
.build()))
.collect(Collectors.toList());
}

private void logAudit(Action action, Job job, String detail, Object... args) {
AuditLogger.log(Resource.JOB, job.getId(), action, detail, args);
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ public boolean hasTerminated() {
return getStatus().isTerminal();
}

public boolean isRunning() {
return getStatus() == JobStatus.RUNNING;
}

public void updateMetrics(List<Metrics> newMetrics) {
metrics.clear();
metrics.addAll(newMetrics);
Expand Down
117 changes: 60 additions & 57 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import feast.core.job.JobUpdateTask;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import java.util.ArrayList;
Expand All @@ -45,6 +44,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.validation.constraints.Positive;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -55,11 +55,11 @@
@Service
public class JobCoordinatorService {

private JobRepository jobRepository;
private FeatureSetRepository featureSetRepository;
private SpecService specService;
private JobManager jobManager;
private JobProperties jobProperties;
private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;

@Autowired
public JobCoordinatorService(
Expand Down Expand Up @@ -90,54 +90,56 @@ public JobCoordinatorService(
@Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}")
public void Poll() throws InvalidProtocolBufferException {
log.info("Polling for new jobs...");
@Positive long updateTimeout = jobProperties.getJobUpdateTimeoutSeconds();
List<JobUpdateTask> jobUpdateTasks = new ArrayList<>();
ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
for (StoreProto.Store store : listStoresResponse.getStoreList()) {
Set<FeatureSetProto.FeatureSet> featureSets = new HashSet<>();
for (Subscription subscription : store.getSubscriptionsList()) {
featureSets.addAll(
new ArrayList<>(
specService
.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.setProject(subscription.getProject())
.build())
.getFeatureSetsList()));
}
if (!featureSets.isEmpty()) {
featureSets.stream()
.collect(Collectors.groupingBy(fs -> fs.getSpec().getSource()))
.entrySet()
.stream()
.forEach(
kv -> {
Optional<Job> originalJob =
getJob(Source.fromProto(kv.getKey()), Store.fromProto(store));
jobUpdateTasks.add(
new JobUpdateTask(
kv.getValue(),
kv.getKey(),
store,
originalJob,
jobManager,
jobProperties.getJobUpdateTimeoutSeconds()));
});

for (StoreProto.Store storeSpec : listStoresResponse.getStoreList()) {
Set<FeatureSet> featureSets = new HashSet<>();
Store store = Store.fromProto(storeSpec);

for (Subscription subscription : store.getSubscriptions()) {
var featureSetSpecs =
specService
.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.setProject(subscription.getProject())
.build())
.getFeatureSetsList();
featureSets.addAll(featureSetsFromProto(featureSetSpecs));
}

featureSets.stream()
.collect(Collectors.groupingBy(FeatureSet::getSource))
.forEach(
(source, setsForSource) -> {
Optional<Job> originalJob = getJob(source, store);
jobUpdateTasks.add(
new JobUpdateTask(
setsForSource, source, store, originalJob, jobManager, updateTimeout));
});
}
if (jobUpdateTasks.size() == 0) {
if (jobUpdateTasks.isEmpty()) {
log.info("No jobs found.");
return;
}

log.info("Creating/Updating {} jobs...", jobUpdateTasks.size());
ExecutorService executorService = Executors.newFixedThreadPool(jobUpdateTasks.size());
startOrUpdateJobs(jobUpdateTasks);

log.info("Updating feature set status");
updateFeatureSetStatuses(jobUpdateTasks);
}

void startOrUpdateJobs(List<JobUpdateTask> tasks) {
ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
ExecutorCompletionService<Job> ecs = new ExecutorCompletionService<>(executorService);
jobUpdateTasks.forEach(ecs::submit);
tasks.forEach(ecs::submit);

int completedTasks = 0;
while (completedTasks < jobUpdateTasks.size()) {
while (completedTasks < tasks.size()) {
try {
Job job = ecs.take().get();
if (job != null) {
Expand All @@ -148,27 +150,23 @@ public void Poll() throws InvalidProtocolBufferException {
}
completedTasks++;
}

log.info("Updating feature set status");
updateFeatureSetStatuses(jobUpdateTasks);
executorService.shutdown();
}

// TODO: make this more efficient
private void updateFeatureSetStatuses(List<JobUpdateTask> jobUpdateTasks) {
Set<FeatureSet> ready = new HashSet<>();
Set<FeatureSet> pending = new HashSet<>();
for (JobUpdateTask jobUpdateTask : jobUpdateTasks) {
Optional<Job> job =
getJob(
Source.fromProto(jobUpdateTask.getSourceSpec()),
Store.fromProto(jobUpdateTask.getStore()));
if (job.isPresent()) {
if (job.get().getStatus() == JobStatus.RUNNING) {
ready.addAll(job.get().getFeatureSets());
} else {
pending.addAll(job.get().getFeatureSets());
}
}
for (JobUpdateTask task : jobUpdateTasks) {
getJob(task.getSource(), task.getStore())
.ifPresent(
job -> {
if (job.isRunning()) {
ready.addAll(job.getFeatureSets());
} else {
pending.addAll(job.getFeatureSets());
}
});
}
ready.removeAll(pending);
ready.forEach(
Expand Down Expand Up @@ -196,4 +194,9 @@ public Optional<Job> getJob(Source source, Store store) {
// return the latest
return Optional.of(jobs.get(0));
}

// TODO: Put in a util somewhere?
private static List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) {
return protos.stream().map(FeatureSet::fromProto).collect(Collectors.toList());
}
}
Loading

0 comments on commit ded7ca5

Please sign in to comment.