Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JobUpdateTask cleanups #650

Merged
merged 7 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 17 additions & 43 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package feast.core.job;

import feast.core.FeatureSetProto;
import feast.core.SourceProto;
import feast.core.StoreProto;
import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
Expand Down Expand Up @@ -52,29 +49,29 @@
@Getter
public class JobUpdateTask implements Callable<Job> {

private final List<FeatureSetProto.FeatureSet> featureSets;
private final SourceProto.Source sourceSpec;
private final StoreProto.Store store;
private final List<FeatureSet> featureSets;
private final Source source;
private final Store store;
private final Optional<Job> currentJob;
private final JobManager jobManager;
private final long jobUpdateTimeoutSeconds;
private final String runnerType;
private final String runnerName;

public JobUpdateTask(
List<FeatureSetProto.FeatureSet> featureSets,
SourceProto.Source sourceSpec,
StoreProto.Store store,
List<FeatureSet> featureSets,
Source source,
Store store,
Optional<Job> currentJob,
JobManager jobManager,
long jobUpdateTimeoutSeconds) {

this.featureSets = featureSets;
this.sourceSpec = sourceSpec;
this.source = source;
this.store = store;
this.currentJob = currentJob;
this.jobManager = jobManager;
this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds;
this.runnerType = jobManager.getRunnerType().toString();
this.runnerName = jobManager.getRunnerType().toString();
Copy link
Member

@woop woop Apr 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runnerName as we have it above refers to the runnerType right? Should we deduplicate that from the configuration (we could do that at the configuration level as well)?

feast:
  jobs:
    polling_interval_milliseconds: 60000
    job_update_timeout_seconds: 240

    active_runner: my_direct_runner

    runners:
      - name: my_direct_runner
        type: DirectRunner
        options: {}

https://github.com/gojek/feast/blame/ded7ca59a2bd5b40715af73fdaa4e4f19ad0b915/core/src/main/java/feast/core/config/FeastProperties.java#L95

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea with the configuration was to provide a more forward compatible configuration schema, but we could just as easily use type there instead of name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, from your example config, jobManager.getRunnerType().toString() will be "DirectRunner" (whereas jobManager.getRunnerType().name() would be "DIRECT").

In JobUpdateTask this value is being used only for log messages.

}

@Override
Expand All @@ -97,7 +94,6 @@ public Job call() {
try {
return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
Source source = Source.fromProto(sourceSpec);
log.warn("Unable to start job for source {} and sink {}: {}", source, store, e.getMessage());
return null;
} finally {
Expand All @@ -109,15 +105,12 @@ boolean featureSetsChangedFor(Job job) {
Set<String> existingFeatureSetsPopulatedByJob =
job.getFeatureSets().stream().map(FeatureSet::getId).collect(Collectors.toSet());
Set<String> newFeatureSetsPopulatedByJob =
featureSets.stream()
.map(fs -> FeatureSet.fromProto(fs).getId())
.collect(Collectors.toSet());
featureSets.stream().map(FeatureSet::getId).collect(Collectors.toSet());

return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
}

private Job createJob() {
Source source = Source.fromProto(sourceSpec);
String jobId = createJobId(source.getId(), store.getName());
return startJob(jobId);
}
Expand All @@ -127,15 +120,9 @@ private Job startJob(String jobId) {

Job job =
new Job(
jobId,
"",
jobManager.getRunnerType(),
Source.fromProto(sourceSpec),
Store.fromProto(store),
featureSetsFromProto(featureSets),
JobStatus.PENDING);
jobId, "", jobManager.getRunnerType(), source, store, featureSets, JobStatus.PENDING);
try {
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerType);
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);

job = jobManager.startJob(job);
var extId = job.getExtId();
Expand All @@ -145,13 +132,13 @@ private Job startJob(String jobId) {
}

var auditMessage = "Job submitted to runner %s with ext id %s.";
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerType, extId);
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId);

return job;
} catch (Exception e) {
log.error(e.getMessage());
var auditMessage = "Job failed to be submitted to runner %s. Job status changed to ERROR.";
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerType);
logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName);

job.setStatus(JobStatus.ERROR);
return job;
Expand All @@ -160,9 +147,9 @@ private Job startJob(String jobId) {

/** Update the given job */
private Job updateJob(Job job) {
job.setFeatureSets(featureSetsFromProto(featureSets));
job.setStore(Store.fromProto(store));
logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerType);
job.setFeatureSets(featureSets);
job.setStore(store);
logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName);
return jobManager.updateJob(job);
}

Expand All @@ -185,19 +172,6 @@ String createJobId(String sourceId, String storeName) {
return jobId.replaceAll("_", "-");
}

// TODO: Either put in a util somewhere, or stop using proto types in domain logic altogether
private static List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) {
return protos.stream()
.map(
fsp ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fsp.getSpec())
.setMeta(fsp.getMeta())
.build()))
.collect(Collectors.toList());
}

private void logAudit(Action action, Job job, String detail, Object... args) {
AuditLogger.log(Resource.JOB, job.getId(), action, detail, args);
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ public boolean hasTerminated() {
return getStatus().isTerminal();
}

public boolean isRunning() {
return getStatus() == JobStatus.RUNNING;
}

public void updateMetrics(List<Metrics> newMetrics) {
metrics.clear();
metrics.addAll(newMetrics);
Expand Down
117 changes: 60 additions & 57 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import feast.core.job.JobUpdateTask;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import java.util.ArrayList;
Expand All @@ -45,6 +44,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.validation.constraints.Positive;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -55,11 +55,11 @@
@Service
public class JobCoordinatorService {

private JobRepository jobRepository;
private FeatureSetRepository featureSetRepository;
private SpecService specService;
private JobManager jobManager;
private JobProperties jobProperties;
private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;

@Autowired
public JobCoordinatorService(
Expand Down Expand Up @@ -90,54 +90,56 @@ public JobCoordinatorService(
@Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}")
public void Poll() throws InvalidProtocolBufferException {
log.info("Polling for new jobs...");
@Positive long updateTimeout = jobProperties.getJobUpdateTimeoutSeconds();
List<JobUpdateTask> jobUpdateTasks = new ArrayList<>();
ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
for (StoreProto.Store store : listStoresResponse.getStoreList()) {
Set<FeatureSetProto.FeatureSet> featureSets = new HashSet<>();
for (Subscription subscription : store.getSubscriptionsList()) {
featureSets.addAll(
new ArrayList<>(
specService
.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.setProject(subscription.getProject())
.build())
.getFeatureSetsList()));
}
if (!featureSets.isEmpty()) {
featureSets.stream()
.collect(Collectors.groupingBy(fs -> fs.getSpec().getSource()))
.entrySet()
.stream()
.forEach(
kv -> {
Optional<Job> originalJob =
getJob(Source.fromProto(kv.getKey()), Store.fromProto(store));
jobUpdateTasks.add(
new JobUpdateTask(
kv.getValue(),
kv.getKey(),
store,
originalJob,
jobManager,
jobProperties.getJobUpdateTimeoutSeconds()));
});

for (StoreProto.Store storeSpec : listStoresResponse.getStoreList()) {
Set<FeatureSet> featureSets = new HashSet<>();
Store store = Store.fromProto(storeSpec);

for (Subscription subscription : store.getSubscriptions()) {
var featureSetSpecs =
specService
.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.setProject(subscription.getProject())
.build())
.getFeatureSetsList();
featureSets.addAll(featureSetsFromProto(featureSetSpecs));
}

featureSets.stream()
.collect(Collectors.groupingBy(FeatureSet::getSource))
.forEach(
(source, setsForSource) -> {
Optional<Job> originalJob = getJob(source, store);
jobUpdateTasks.add(
new JobUpdateTask(
setsForSource, source, store, originalJob, jobManager, updateTimeout));
});
}
if (jobUpdateTasks.size() == 0) {
if (jobUpdateTasks.isEmpty()) {
log.info("No jobs found.");
return;
}

log.info("Creating/Updating {} jobs...", jobUpdateTasks.size());
ExecutorService executorService = Executors.newFixedThreadPool(jobUpdateTasks.size());
startOrUpdateJobs(jobUpdateTasks);

log.info("Updating feature set status");
updateFeatureSetStatuses(jobUpdateTasks);
}

void startOrUpdateJobs(List<JobUpdateTask> tasks) {
ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
ExecutorCompletionService<Job> ecs = new ExecutorCompletionService<>(executorService);
jobUpdateTasks.forEach(ecs::submit);
tasks.forEach(ecs::submit);

int completedTasks = 0;
while (completedTasks < jobUpdateTasks.size()) {
while (completedTasks < tasks.size()) {
try {
Job job = ecs.take().get();
if (job != null) {
Expand All @@ -148,27 +150,23 @@ public void Poll() throws InvalidProtocolBufferException {
}
completedTasks++;
}

log.info("Updating feature set status");
updateFeatureSetStatuses(jobUpdateTasks);
executorService.shutdown();
}

// TODO: make this more efficient
private void updateFeatureSetStatuses(List<JobUpdateTask> jobUpdateTasks) {
Set<FeatureSet> ready = new HashSet<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really related to this PR per say, but would it be possible for us to remove this updateFeatureSetStatuses call and use the real job state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're asking: rather than cache feature set statuses in bulk, could status be returned on-demand by checking job state for one given feature set, when requested? I guess the team will have more context on the design.

As it stands, I'd have to get my head more into functional consideration of what it's doing beyond the mechanical refactoring I've done to this class, but it seems like there could be more to look at later:

  1. In the Poll() loop we are:

    • At the beginning, doing getJob for the source/store combo of all feature sets that are subscribed to
    • Then, doing the startOrUpdate of tasks for all those, and waiting on all those tasks
    • Throwing away the results of waiting (updated Job references), because the startOrUpdate returns void
    • Doing getJob again for each of the tasks, in the updateFeatureSetStatuses

    It seems like this second round of DB lookups could be avoided by keeping and using the Jobs we just got back. Unless I'm missing something, if it's trying to avoid a race condition or something.

  2. getJob does jobRepository.findBySourceIdAndStoreNameOrderByLastUpdatedDesc returning all matching records, and then takes the first (most recent) one client-side instead of having SQL do it. And this happens in loops, 2x per polling loop as noted in 1.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracking your comments here: #664

and then takes the first (most recent) one client-side instead of having SQL do it.

That's a very low hanging fruit.

Set<FeatureSet> pending = new HashSet<>();
for (JobUpdateTask jobUpdateTask : jobUpdateTasks) {
Optional<Job> job =
getJob(
Source.fromProto(jobUpdateTask.getSourceSpec()),
Store.fromProto(jobUpdateTask.getStore()));
if (job.isPresent()) {
if (job.get().getStatus() == JobStatus.RUNNING) {
ready.addAll(job.get().getFeatureSets());
} else {
pending.addAll(job.get().getFeatureSets());
}
}
for (JobUpdateTask task : jobUpdateTasks) {
getJob(task.getSource(), task.getStore())
.ifPresent(
job -> {
if (job.isRunning()) {
ready.addAll(job.getFeatureSets());
} else {
pending.addAll(job.getFeatureSets());
}
});
}
ready.removeAll(pending);
ready.forEach(
Expand Down Expand Up @@ -196,4 +194,9 @@ public Optional<Job> getJob(Source source, Store store) {
// return the latest
return Optional.of(jobs.get(0));
}

// TODO: Put in a util somewhere?
private static List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) {
return protos.stream().map(FeatureSet::fromProto).collect(Collectors.toList());
}
}
Loading