diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index 57b2494736..25ce386d40 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -16,9 +16,6 @@ */ package feast.core.job; -import feast.core.FeatureSetProto; -import feast.core.SourceProto; -import feast.core.StoreProto; import feast.core.log.Action; import feast.core.log.AuditLogger; import feast.core.log.Resource; @@ -52,29 +49,29 @@ @Getter public class JobUpdateTask implements Callable<Job> { - private final List<FeatureSetProto.FeatureSet> featureSets; - private final SourceProto.Source sourceSpec; - private final StoreProto.Store store; + private final List<FeatureSet> featureSets; + private final Source source; + private final Store store; private final Optional<Job> currentJob; private final JobManager jobManager; private final long jobUpdateTimeoutSeconds; - private final String runnerType; + private final String runnerName; public JobUpdateTask( - List<FeatureSetProto.FeatureSet> featureSets, - SourceProto.Source sourceSpec, - StoreProto.Store store, + List<FeatureSet> featureSets, + Source source, + Store store, Optional<Job> currentJob, JobManager jobManager, long jobUpdateTimeoutSeconds) { this.featureSets = featureSets; - this.sourceSpec = sourceSpec; + this.source = source; this.store = store; this.currentJob = currentJob; this.jobManager = jobManager; this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds; - this.runnerType = jobManager.getRunnerType().toString(); + this.runnerName = jobManager.getRunnerType().toString(); } @Override @@ -97,7 +94,6 @@ public Job call() { try { return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - Source source = Source.fromProto(sourceSpec); log.warn("Unable to start job for source {} and sink {}: {}", source, store, e.getMessage()); return null; } finally { @@ -109,15 +105,12 @@ boolean featureSetsChangedFor(Job job) {
Set<String> existingFeatureSetsPopulatedByJob = job.getFeatureSets().stream().map(FeatureSet::getId).collect(Collectors.toSet()); Set<String> newFeatureSetsPopulatedByJob = - featureSets.stream() - .map(fs -> FeatureSet.fromProto(fs).getId()) - .collect(Collectors.toSet()); + featureSets.stream().map(FeatureSet::getId).collect(Collectors.toSet()); return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob); } private Job createJob() { - Source source = Source.fromProto(sourceSpec); String jobId = createJobId(source.getId(), store.getName()); return startJob(jobId); } @@ -127,15 +120,9 @@ private Job startJob(String jobId) { Job job = new Job( - jobId, - "", - jobManager.getRunnerType(), - Source.fromProto(sourceSpec), - Store.fromProto(store), - featureSetsFromProto(featureSets), - JobStatus.PENDING); + jobId, "", jobManager.getRunnerType(), source, store, featureSets, JobStatus.PENDING); try { - logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerType); + logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName); job = jobManager.startJob(job); var extId = job.getExtId(); @@ -145,13 +132,13 @@ private Job startJob(String jobId) { } var auditMessage = "Job submitted to runner %s with ext id %s."; - logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerType, extId); + logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId); return job; } catch (Exception e) { log.error(e.getMessage()); var auditMessage = "Job failed to be submitted to runner %s. 
Job status changed to ERROR."; - logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerType); + logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName); job.setStatus(JobStatus.ERROR); return job; @@ -160,9 +147,9 @@ private Job startJob(String jobId) { /** Update the given job */ private Job updateJob(Job job) { - job.setFeatureSets(featureSetsFromProto(featureSets)); - job.setStore(Store.fromProto(store)); - logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerType); + job.setFeatureSets(featureSets); + job.setStore(store); + logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName); return jobManager.updateJob(job); } @@ -185,19 +172,6 @@ String createJobId(String sourceId, String storeName) { return jobId.replaceAll("_", "-"); } - // TODO: Either put in a util somewhere, or stop using proto types in domain logic altogether - private static List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) { - return protos.stream() - .map( - fsp -> - FeatureSet.fromProto( - FeatureSetProto.FeatureSet.newBuilder() - .setSpec(fsp.getSpec()) - .setMeta(fsp.getMeta()) - .build())) - .collect(Collectors.toList()); - } - private void logAudit(Action action, Job job, String detail, Object...
args) { AuditLogger.log(Resource.JOB, job.getId(), action, detail, args); } diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 93bdff76bf..fc801f76a4 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -115,6 +115,10 @@ public boolean hasTerminated() { return getStatus().isTerminal(); } + public boolean isRunning() { + return getStatus() == JobStatus.RUNNING; + } + public void updateMetrics(List newMetrics) { metrics.clear(); metrics.addAll(newMetrics); diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 340fc98b99..6f366be508 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -32,7 +32,6 @@ import feast.core.job.JobUpdateTask; import feast.core.model.FeatureSet; import feast.core.model.Job; -import feast.core.model.JobStatus; import feast.core.model.Source; import feast.core.model.Store; import java.util.ArrayList; @@ -45,6 +44,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; +import javax.validation.constraints.Positive; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -55,11 +55,11 @@ @Service public class JobCoordinatorService { - private JobRepository jobRepository; - private FeatureSetRepository featureSetRepository; - private SpecService specService; - private JobManager jobManager; - private JobProperties jobProperties; + private final JobRepository jobRepository; + private final FeatureSetRepository featureSetRepository; + private final SpecService specService; + private final JobManager jobManager; + private final JobProperties jobProperties; @Autowired 
public JobCoordinatorService( @@ -90,54 +90,56 @@ public JobCoordinatorService( @Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}") public void Poll() throws InvalidProtocolBufferException { log.info("Polling for new jobs..."); + @Positive long updateTimeout = jobProperties.getJobUpdateTimeoutSeconds(); List<JobUpdateTask> jobUpdateTasks = new ArrayList<>(); ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build()); - for (StoreProto.Store store : listStoresResponse.getStoreList()) { - Set<FeatureSetProto.FeatureSet> featureSets = new HashSet<>(); - for (Subscription subscription : store.getSubscriptionsList()) { - featureSets.addAll( - new ArrayList<>( - specService - .listFeatureSets( - ListFeatureSetsRequest.Filter.newBuilder() - .setFeatureSetName(subscription.getName()) - .setFeatureSetVersion(subscription.getVersion()) - .setProject(subscription.getProject()) - .build()) - .getFeatureSetsList())); - } - if (!featureSets.isEmpty()) { - featureSets.stream() - .collect(Collectors.groupingBy(fs -> fs.getSpec().getSource())) - .entrySet() - .stream() - .forEach( - kv -> { - Optional<Job> originalJob = - getJob(Source.fromProto(kv.getKey()), Store.fromProto(store)); - jobUpdateTasks.add( - new JobUpdateTask( - kv.getValue(), - kv.getKey(), - store, - originalJob, - jobManager, - jobProperties.getJobUpdateTimeoutSeconds())); - }); + + for (StoreProto.Store storeSpec : listStoresResponse.getStoreList()) { + Set<FeatureSet> featureSets = new HashSet<>(); + Store store = Store.fromProto(storeSpec); + + for (Subscription subscription : store.getSubscriptions()) { + var featureSetSpecs = + specService + .listFeatureSets( + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(subscription.getName()) + .setFeatureSetVersion(subscription.getVersion()) + .setProject(subscription.getProject()) + .build()) + .getFeatureSetsList(); + featureSets.addAll(featureSetsFromProto(featureSetSpecs)); } + + featureSets.stream() + 
.collect(Collectors.groupingBy(FeatureSet::getSource)) + .forEach( + (source, setsForSource) -> { + Optional<Job> originalJob = getJob(source, store); + jobUpdateTasks.add( + new JobUpdateTask( + setsForSource, source, store, originalJob, jobManager, updateTimeout)); + }); } - if (jobUpdateTasks.size() == 0) { + if (jobUpdateTasks.isEmpty()) { log.info("No jobs found."); return; } log.info("Creating/Updating {} jobs...", jobUpdateTasks.size()); - ExecutorService executorService = Executors.newFixedThreadPool(jobUpdateTasks.size()); + startOrUpdateJobs(jobUpdateTasks); + + log.info("Updating feature set status"); + updateFeatureSetStatuses(jobUpdateTasks); + } + + void startOrUpdateJobs(List<JobUpdateTask> tasks) { + ExecutorService executorService = Executors.newFixedThreadPool(tasks.size()); ExecutorCompletionService<Job> ecs = new ExecutorCompletionService<>(executorService); - jobUpdateTasks.forEach(ecs::submit); + tasks.forEach(ecs::submit); int completedTasks = 0; - while (completedTasks < jobUpdateTasks.size()) { + while (completedTasks < tasks.size()) { try { Job job = ecs.take().get(); if (job != null) { @@ -148,27 +150,23 @@ public void Poll() throws InvalidProtocolBufferException { } completedTasks++; } - - log.info("Updating feature set status"); - updateFeatureSetStatuses(jobUpdateTasks); + executorService.shutdown(); } // TODO: make this more efficient private void updateFeatureSetStatuses(List<JobUpdateTask> jobUpdateTasks) { Set<FeatureSet> ready = new HashSet<>(); Set<FeatureSet> pending = new HashSet<>(); - for (JobUpdateTask jobUpdateTask : jobUpdateTasks) { - Optional<Job> job = - getJob( - Source.fromProto(jobUpdateTask.getSourceSpec()), - Store.fromProto(jobUpdateTask.getStore())); - if (job.isPresent()) { - if (job.get().getStatus() == JobStatus.RUNNING) { - ready.addAll(job.get().getFeatureSets()); - } else { - pending.addAll(job.get().getFeatureSets()); - } - } + for (JobUpdateTask task : jobUpdateTasks) { + getJob(task.getSource(), task.getStore()) + .ifPresent( + job -> { + if (job.isRunning()) { + 
ready.addAll(job.getFeatureSets()); + } else { + pending.addAll(job.getFeatureSets()); + } + }); } ready.removeAll(pending); ready.forEach( @@ -196,4 +194,9 @@ public Optional<Job> getJob(Source source, Store store) { // return the latest return Optional.of(jobs.get(0)); } + + // TODO: Put in a util somewhere? + private static List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) { + return protos.stream().map(FeatureSet::fromProto).collect(Collectors.toList()); + } } diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index c62c7d2198..8d179baebb 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -49,6 +49,7 @@ import org.mockito.Mock; public class JobUpdateTaskTest { + private static final Runner RUNNER = Runner.DATAFLOW; private static final FeatureSetProto.FeatureSet.Builder fsBuilder = FeatureSetProto.FeatureSet.newBuilder().setMeta(FeatureSetMeta.newBuilder()); @@ -57,82 +58,61 @@ public class JobUpdateTaskTest { @Mock private JobManager jobManager; - private StoreProto.Store store; - private SourceProto.Source source; - private FeatureSetProto.FeatureSet featureSet1; + private Store store; + private Source source; + private FeatureSet featureSet1; @Before public void setUp() { initMocks(this); - when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); + when(jobManager.getRunnerType()).thenReturn(RUNNER); store = - StoreProto.Store.newBuilder() - .setName("test") - .setType(StoreType.REDIS) - .setRedisConfig(RedisConfig.newBuilder().build()) - .addSubscriptions( - Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) - .build(); + Store.fromProto( + StoreProto.Store.newBuilder() + .setName("test") + .setType(StoreType.REDIS) + .setRedisConfig(RedisConfig.newBuilder().build()) + .addSubscriptions( + Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) + 
.build()); source = - SourceProto.Source.newBuilder() - .setType(SourceType.KAFKA) - .setKafkaSourceConfig( - KafkaSourceConfig.newBuilder() - .setTopic("topic") - .setBootstrapServers("servers:9092") - .build()) - .build(); - - featureSet1 = fsBuilder.setSpec(specBuilder.setName("featureSet1").setSource(source)).build(); + Source.fromProto( + SourceProto.Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setTopic("topic") + .setBootstrapServers("servers:9092") + .build()) + .build()); + + featureSet1 = + FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet1")).build()); + featureSet1.setSource(source); + } + + Job makeJob(String extId, List<FeatureSet> featureSets, JobStatus status) { + return new Job("job", extId, RUNNER, source, store, featureSets, status); + } + + JobUpdateTask makeTask(List<FeatureSet> featureSets, Optional<Job> currentJob) { + return new JobUpdateTask(featureSets, source, store, currentJob, jobManager, 100L); + } @Test public void shouldUpdateJobIfPresent() { - FeatureSetProto.FeatureSet featureSet2 = - fsBuilder.setSpec(specBuilder.setName("featureSet2")).build(); - List<FeatureSet> existingFeatureSetsPopulatedByJob = - Collections.singletonList(FeatureSet.fromProto(featureSet1)); - List<FeatureSet> newFeatureSetsPopulatedByJob = - Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)); - - Job originalJob = - new Job( - "job", - "old_ext", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - existingFeatureSetsPopulatedByJob, - JobStatus.RUNNING); - JobUpdateTask jobUpdateTask = - new JobUpdateTask( - Arrays.asList(featureSet1, featureSet2), - source, - store, - Optional.of(originalJob), - jobManager, - 100L); - Job submittedJob = - new Job( - "job", - "old_ext", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - newFeatureSetsPopulatedByJob, - JobStatus.RUNNING); - - Job expected = - new Job( - "job", - "new_ext", - Runner.DATAFLOW, - 
Source.fromProto(source), - Store.fromProto(store), - newFeatureSetsPopulatedByJob, - JobStatus.PENDING); + FeatureSet featureSet2 = + FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet2")).build()); + List<FeatureSet> existingFeatureSetsPopulatedByJob = Collections.singletonList(featureSet1); + List<FeatureSet> newFeatureSetsPopulatedByJob = Arrays.asList(featureSet1, featureSet2); + + Job originalJob = makeJob("old_ext", existingFeatureSetsPopulatedByJob, JobStatus.RUNNING); + JobUpdateTask jobUpdateTask = makeTask(newFeatureSetsPopulatedByJob, Optional.of(originalJob)); + Job submittedJob = makeJob("old_ext", newFeatureSetsPopulatedByJob, JobStatus.RUNNING); + + Job expected = makeJob("new_ext", newFeatureSetsPopulatedByJob, JobStatus.PENDING); when(jobManager.updateJob(submittedJob)).thenReturn(expected); Job actual = jobUpdateTask.call(); @@ -141,31 +121,12 @@ public void shouldUpdateJobIfPresent() { @Test public void shouldCreateJobIfNotPresent() { - JobUpdateTask jobUpdateTask = - spy( - new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 100L)); + var featureSets = Collections.singletonList(featureSet1); + JobUpdateTask jobUpdateTask = spy(makeTask(featureSets, Optional.empty())); doReturn("job").when(jobUpdateTask).createJobId("KAFKA/servers:9092/topic", "test"); - Job expectedInput = - new Job( - "job", - "", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.PENDING); - - Job expected = - new Job( - "job", - "ext", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.RUNNING); + Job expectedInput = makeJob("", featureSets, JobStatus.PENDING); + Job expected = makeJob("ext", featureSets, JobStatus.PENDING); when(jobManager.startJob(expectedInput)).thenReturn(expected); @@ -175,18 +136,9 @@ public void 
shouldUpdateJobStatusIfNotCreateOrUpdate() { - Job originalJob = - new Job( - "job", - "ext", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.RUNNING); - JobUpdateTask jobUpdateTask = - new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.of(originalJob), jobManager, 100L); + var featureSets = Collections.singletonList(featureSet1); + Job originalJob = makeJob("ext", featureSets, JobStatus.RUNNING); + JobUpdateTask jobUpdateTask = makeTask(featureSets, Optional.of(originalJob)); when(jobManager.getJobStatus(originalJob)).thenReturn(JobStatus.ABORTING); Job updated = jobUpdateTask.call(); @@ -196,31 +148,12 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { @Test public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { - JobUpdateTask jobUpdateTask = - spy( - new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 100L)); + var featureSets = Collections.singletonList(featureSet1); + JobUpdateTask jobUpdateTask = spy(makeTask(featureSets, Optional.empty())); doReturn("job").when(jobUpdateTask).createJobId("KAFKA/servers:9092/topic", "test"); - Job expectedInput = - new Job( - "job", - "", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.PENDING); - - Job expected = - new Job( - "job", - "", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.ERROR); + Job expectedInput = makeJob("", featureSets, JobStatus.PENDING); + Job expected = makeJob("", featureSets, JobStatus.ERROR); when(jobManager.startJob(expectedInput)) .thenThrow(new RuntimeException("Something went wrong")); @@ -231,10 +164,13 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { @Test public void shouldTimeout() { + var featureSets = 
Collections.singletonList(featureSet1); + var timeoutSeconds = 0L; JobUpdateTask jobUpdateTask = spy( new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 0L)); + featureSets, source, store, Optional.empty(), jobManager, timeoutSeconds)); + Job actual = jobUpdateTask.call(); assertThat(actual, is(IsNull.nullValue())); }