diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index 04aab0cff6..25ce386d40 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -16,9 +16,6 @@ */ package feast.core.job; -import feast.core.FeatureSetProto; -import feast.core.SourceProto; -import feast.core.StoreProto; import feast.core.log.Action; import feast.core.log.AuditLogger; import feast.core.log.Resource; @@ -52,134 +49,96 @@ @Getter public class JobUpdateTask implements Callable { - private final List featureSets; - private final SourceProto.Source sourceSpec; - private final StoreProto.Store store; + private final List featureSets; + private final Source source; + private final Store store; private final Optional currentJob; - private JobManager jobManager; - private long jobUpdateTimeoutSeconds; + private final JobManager jobManager; + private final long jobUpdateTimeoutSeconds; + private final String runnerName; public JobUpdateTask( - List featureSets, - SourceProto.Source sourceSpec, - StoreProto.Store store, + List featureSets, + Source source, + Store store, Optional currentJob, JobManager jobManager, long jobUpdateTimeoutSeconds) { this.featureSets = featureSets; - this.sourceSpec = sourceSpec; + this.source = source; this.store = store; this.currentJob = currentJob; this.jobManager = jobManager; this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds; + this.runnerName = jobManager.getRunnerType().toString(); } @Override public Job call() { ExecutorService executorService = Executors.newSingleThreadExecutor(); - Source source = Source.fromProto(sourceSpec); Future submittedJob; - if (currentJob.isPresent()) { - Set existingFeatureSetsPopulatedByJob = - currentJob.get().getFeatureSets().stream() - .map(FeatureSet::getId) - .collect(Collectors.toSet()); - Set newFeatureSetsPopulatedByJob = - featureSets.stream() - .map(fs -> FeatureSet.fromProto(fs).getId()) - .collect(Collectors.toSet()); - if (existingFeatureSetsPopulatedByJob.size() == newFeatureSetsPopulatedByJob.size() - && existingFeatureSetsPopulatedByJob.containsAll(newFeatureSetsPopulatedByJob)) { - Job job = currentJob.get(); - JobStatus newJobStatus = jobManager.getJobStatus(job); - if (newJobStatus != job.getStatus()) { - AuditLogger.log( - Resource.JOB, - job.getId(), - Action.STATUS_CHANGE, - "Job status updated: changed from %s to %s", - job.getStatus(), - newJobStatus); - } - job.setStatus(newJobStatus); - return job; + + if (currentJob.isEmpty()) { + submittedJob = executorService.submit(this::createJob); + } else { + Job job = currentJob.get(); + + if (featureSetsChangedFor(job)) { + submittedJob = executorService.submit(() -> updateJob(job)); } else { - submittedJob = - executorService.submit(() -> updateJob(currentJob.get(), featureSets, store)); + return updateStatus(job); } - } else { - String jobId = createJobId(source.getId(), store.getName()); - submittedJob = executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store)); } - Job job = null; try { - job = submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS); + return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.warn("Unable to start job for source {} and sink {}: {}", source, store, e.getMessage()); + return null; + } finally { executorService.shutdownNow(); } - return job; + } + + boolean featureSetsChangedFor(Job job) { + Set existingFeatureSetsPopulatedByJob = + job.getFeatureSets().stream().map(FeatureSet::getId).collect(Collectors.toSet()); + Set newFeatureSetsPopulatedByJob = + featureSets.stream().map(FeatureSet::getId).collect(Collectors.toSet()); + + return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob); + } + + private Job createJob() { + String jobId = createJobId(source.getId(), store.getName()); + return startJob(jobId); } /** Start or update the job to ingest data to the sink. */ - private Job startJob( - String jobId, - List featureSetProtos, - SourceProto.Source source, - StoreProto.Store sinkSpec) { - - List featureSets = - featureSetProtos.stream() - .map( - fsp -> - FeatureSet.fromProto( - FeatureSetProto.FeatureSet.newBuilder() - .setSpec(fsp.getSpec()) - .setMeta(fsp.getMeta()) - .build())) - .collect(Collectors.toList()); + private Job startJob(String jobId) { + Job job = new Job( - jobId, - "", - jobManager.getRunnerType(), - Source.fromProto(source), - Store.fromProto(sinkSpec), - featureSets, - JobStatus.PENDING); + jobId, "", jobManager.getRunnerType(), source, store, featureSets, JobStatus.PENDING); try { - AuditLogger.log( - Resource.JOB, - jobId, - Action.SUBMIT, - "Building graph and submitting to %s", - jobManager.getRunnerType().toString()); + logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName); job = jobManager.startJob(job); - if (job.getExtId().isEmpty()) { + var extId = job.getExtId(); + if (extId.isEmpty()) { throw new RuntimeException( String.format("Could not submit job: \n%s", "unable to retrieve job external id")); } - AuditLogger.log( - Resource.JOB, - jobId, - Action.STATUS_CHANGE, - "Job submitted to runner %s with ext id %s.", - jobManager.getRunnerType().toString(), - job.getExtId()); + var auditMessage = "Job submitted to runner %s with ext id %s."; + logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId); return job; } catch (Exception e) { log.error(e.getMessage()); - AuditLogger.log( - Resource.JOB, - jobId, - Action.STATUS_CHANGE, - "Job failed to be submitted to runner %s. Job status changed to ERROR.", - jobManager.getRunnerType().toString()); + var auditMessage = "Job failed to be submitted to runner %s. Job status changed to ERROR."; + logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName); job.setStatus(JobStatus.ERROR); return job; @@ -187,33 +146,33 @@ private Job startJob( } /** Update the given job */ - private Job updateJob( - Job job, List featureSets, StoreProto.Store store) { - job.setFeatureSets( - featureSets.stream() - .map( - fs -> - FeatureSet.fromProto( - FeatureSetProto.FeatureSet.newBuilder() - .setSpec(fs.getSpec()) - .setMeta(fs.getMeta()) - .build())) - .collect(Collectors.toList())); - job.setStore(feast.core.model.Store.fromProto(store)); - AuditLogger.log( - Resource.JOB, - job.getId(), - Action.UPDATE, - "Updating job %s for runner %s", - job.getId(), - jobManager.getRunnerType().toString()); + private Job updateJob(Job job) { + job.setFeatureSets(featureSets); + job.setStore(store); + logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName); return jobManager.updateJob(job); } + private Job updateStatus(Job job) { + JobStatus currentStatus = job.getStatus(); + JobStatus newStatus = jobManager.getJobStatus(job); + if (newStatus != currentStatus) { + var auditMessage = "Job status updated: changed from %s to %s"; + logAudit(Action.STATUS_CHANGE, job, auditMessage, currentStatus, newStatus); + } + + job.setStatus(newStatus); + return job; + } + String createJobId(String sourceId, String storeName) { String dateSuffix = String.valueOf(Instant.now().toEpochMilli()); String sourceIdTrunc = sourceId.split("/")[0].toLowerCase(); String jobId = String.format("%s-to-%s", sourceIdTrunc, storeName) + dateSuffix; return jobId.replaceAll("_", "-"); } + + private void logAudit(Action action, Job job, String detail, Object... args) { + AuditLogger.log(Resource.JOB, job.getId(), action, detail, args); + } } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 880dd6c146..db9a7f9070 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -193,16 +193,15 @@ public void abortJob(String dataflowJobId) { } /** - * Restart a restart dataflow job. Dataflow should ensure continuity between during the restart, - * so no data should be lost during the restart operation. + * Restart a Dataflow job. Dataflow should ensure continuity such that no data should be lost + * during the restart operation. * * @param job job to restart * @return the restarted job */ @Override public Job restartJob(Job job) { - JobStatus status = job.getStatus(); - if (JobStatus.getTerminalState().contains(status)) { + if (job.getStatus().isTerminal()) { // job yet not running: just start job return this.startJob(job); } else { diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 9b3a8473e4..2adedbefd9 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -166,8 +166,7 @@ public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOExcept */ @Override public Job restartJob(Job job) { - JobStatus status = job.getStatus(); - if (JobStatus.getTerminalState().contains(status)) { + if (job.getStatus().isTerminal()) { // job yet not running: just start job return this.startJob(job); } else { diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 95bcd79e6c..fc801f76a4 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -111,6 +111,14 @@ public Job( this.status = jobStatus; } + public boolean hasTerminated() { + return getStatus().isTerminal(); + } + + public boolean isRunning() { + return getStatus() == JobStatus.RUNNING; + } + public void updateMetrics(List newMetrics) { metrics.clear(); metrics.addAll(newMetrics); diff --git a/core/src/main/java/feast/core/model/JobStatus.java b/core/src/main/java/feast/core/model/JobStatus.java index 86aa512933..1d86900e2c 100644 --- a/core/src/main/java/feast/core/model/JobStatus.java +++ b/core/src/main/java/feast/core/model/JobStatus.java @@ -17,10 +17,8 @@ package feast.core.model; import feast.core.IngestionJobProto.IngestionJobStatus; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.Map; +import java.util.Set; public enum JobStatus { /** Job status is not known. */ @@ -53,33 +51,41 @@ public enum JobStatus { /** job has been suspended */ SUSPENDED; - private static final Collection TERMINAL_STATE = - Collections.unmodifiableList(Arrays.asList(COMPLETED, ABORTED, ERROR)); + private static final Set TERMINAL_STATES = Set.of(COMPLETED, ABORTED, ERROR); /** - * Get a collection of terminal job state. + * Get the set of terminal job states. * - *

Terminal job state is final and will not change to any other state. + *

A terminal job state is final and will not change to any other state. * - * @return collection of terminal job state. + * @return set of terminal job states. */ - public static Collection getTerminalState() { - return TERMINAL_STATE; + public static Set getTerminalStates() { + return TERMINAL_STATES; } - private static final Collection TRANSITIONAL_STATES = - Collections.unmodifiableList(Arrays.asList(PENDING, ABORTING, SUSPENDING)); + private static final Set TRANSITIONAL_STATES = Set.of(PENDING, ABORTING, SUSPENDING); /** - * Get Transitional Job Status states. Transitionals states are assigned to jobs that + * Get Transitional Job Status states. Transitional states are assigned to jobs that are * transitioning to a more stable state (ie SUSPENDED, ABORTED etc.) * - * @return Collection of transitional Job Status states. + * @return set of transitional Job Status states. */ - public static final Collection getTransitionalStates() { + public static Set getTransitionalStates() { return TRANSITIONAL_STATES; } + /** @return true if this {@code JobStatus} is a terminal state. */ + public boolean isTerminal() { + return getTerminalStates().contains(this); + } + + /** @return true if this {@code JobStatus} is a transitional state. */ + public boolean isTransitional() { + return getTransitionalStates().contains(this); + } + private static final Map INGESTION_JOB_STATUS_MAP = Map.of( JobStatus.UNKNOWN, IngestionJobStatus.UNKNOWN, @@ -95,7 +101,7 @@ public static final Collection getTransitionalStates() { /** * Convert a Job Status to Ingestion Job Status proto * - * @return IngestionJobStatus proto derieved from this job status + * @return IngestionJobStatus proto derived from this job status */ public IngestionJobStatus toProto() { // maps job models job status to ingestion job status diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index b4ed341edc..6f366be508 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -32,7 +32,6 @@ import feast.core.job.JobUpdateTask; import feast.core.model.FeatureSet; import feast.core.model.Job; -import feast.core.model.JobStatus; import feast.core.model.Source; import feast.core.model.Store; import java.util.ArrayList; @@ -45,6 +44,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; +import javax.validation.constraints.Positive; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -55,11 +55,11 @@ @Service public class JobCoordinatorService { - private JobRepository jobRepository; - private FeatureSetRepository featureSetRepository; - private SpecService specService; - private JobManager jobManager; - private JobProperties jobProperties; + private final JobRepository jobRepository; + private final FeatureSetRepository featureSetRepository; + private final SpecService specService; + private final JobManager jobManager; + private final JobProperties jobProperties; @Autowired public JobCoordinatorService( @@ -90,54 +90,56 @@ public JobCoordinatorService( @Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}") public void Poll() throws InvalidProtocolBufferException { log.info("Polling for new jobs..."); + @Positive long updateTimeout = jobProperties.getJobUpdateTimeoutSeconds(); List jobUpdateTasks = new ArrayList<>(); ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build()); - for (StoreProto.Store store : listStoresResponse.getStoreList()) { - Set featureSets = new HashSet<>(); - for (Subscription subscription : store.getSubscriptionsList()) { - featureSets.addAll( - new ArrayList<>( - specService - .listFeatureSets( - ListFeatureSetsRequest.Filter.newBuilder() - .setFeatureSetName(subscription.getName()) - .setFeatureSetVersion(subscription.getVersion()) - .setProject(subscription.getProject()) - .build()) - .getFeatureSetsList())); - } - if (!featureSets.isEmpty()) { - featureSets.stream() - .collect(Collectors.groupingBy(fs -> fs.getSpec().getSource())) - .entrySet() - .stream() - .forEach( - kv -> { - Optional originalJob = - getJob(Source.fromProto(kv.getKey()), Store.fromProto(store)); - jobUpdateTasks.add( - new JobUpdateTask( - kv.getValue(), - kv.getKey(), - store, - originalJob, - jobManager, - jobProperties.getJobUpdateTimeoutSeconds())); - }); + + for (StoreProto.Store storeSpec : listStoresResponse.getStoreList()) { + Set featureSets = new HashSet<>(); + Store store = Store.fromProto(storeSpec); + + for (Subscription subscription : store.getSubscriptions()) { + var featureSetSpecs = + specService + .listFeatureSets( + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(subscription.getName()) + .setFeatureSetVersion(subscription.getVersion()) + .setProject(subscription.getProject()) + .build()) + .getFeatureSetsList(); + featureSets.addAll(featureSetsFromProto(featureSetSpecs)); } + + featureSets.stream() + .collect(Collectors.groupingBy(FeatureSet::getSource)) + .forEach( + (source, setsForSource) -> { + Optional originalJob = getJob(source, store); + jobUpdateTasks.add( + new JobUpdateTask( + setsForSource, source, store, originalJob, jobManager, updateTimeout)); + }); } - if (jobUpdateTasks.size() == 0) { + if (jobUpdateTasks.isEmpty()) { log.info("No jobs found."); return; } log.info("Creating/Updating {} jobs...", jobUpdateTasks.size()); - ExecutorService executorService = Executors.newFixedThreadPool(jobUpdateTasks.size()); + startOrUpdateJobs(jobUpdateTasks); + + log.info("Updating feature set status"); + updateFeatureSetStatuses(jobUpdateTasks); + } + + void startOrUpdateJobs(List tasks) { + ExecutorService executorService = Executors.newFixedThreadPool(tasks.size()); ExecutorCompletionService ecs = new ExecutorCompletionService<>(executorService); - jobUpdateTasks.forEach(ecs::submit); + tasks.forEach(ecs::submit); int completedTasks = 0; - while (completedTasks < jobUpdateTasks.size()) { + while (completedTasks < tasks.size()) { try { Job job = ecs.take().get(); if (job != null) { @@ -148,27 +150,23 @@ public void Poll() throws InvalidProtocolBufferException { } completedTasks++; } - - log.info("Updating feature set status"); - updateFeatureSetStatuses(jobUpdateTasks); + executorService.shutdown(); } // TODO: make this more efficient private void updateFeatureSetStatuses(List jobUpdateTasks) { Set ready = new HashSet<>(); Set pending = new HashSet<>(); - for (JobUpdateTask jobUpdateTask : jobUpdateTasks) { - Optional job = - getJob( - Source.fromProto(jobUpdateTask.getSourceSpec()), - Store.fromProto(jobUpdateTask.getStore())); - if (job.isPresent()) { - if (job.get().getStatus() == JobStatus.RUNNING) { - ready.addAll(job.get().getFeatureSets()); - } else { - pending.addAll(job.get().getFeatureSets()); - } - } + for (JobUpdateTask task : jobUpdateTasks) { + getJob(task.getSource(), task.getStore()) + .ifPresent( + job -> { + if (job.isRunning()) { + ready.addAll(job.getFeatureSets()); + } else { + pending.addAll(job.getFeatureSets()); + } + }); } ready.removeAll(pending); ready.forEach( @@ -189,14 +187,16 @@ public Optional getJob(Source source, Store store) { List jobs = jobRepository.findBySourceIdAndStoreNameOrderByLastUpdatedDesc( source.getId(), store.getName()); - jobs = - jobs.stream() - .filter(job -> !JobStatus.getTerminalState().contains(job.getStatus())) - .collect(Collectors.toList()); - if (jobs.size() == 0) { + jobs = jobs.stream().filter(job -> !job.hasTerminated()).collect(Collectors.toList()); + if (jobs.isEmpty()) { return Optional.empty(); } // return the latest return Optional.of(jobs.get(0)); } + + // TODO: Put in a util somewhere? + private static List featureSetsFromProto(List protos) { + return protos.stream().map(FeatureSet::fromProto).collect(Collectors.toList()); + } } diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java index c8fc5caf5e..33c118999c 100644 --- a/core/src/main/java/feast/core/service/JobService.java +++ b/core/src/main/java/feast/core/service/JobService.java @@ -159,6 +159,7 @@ public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request // check job exists Optional getJob = this.jobRepository.findById(request.getId()); if (getJob.isEmpty()) { + // FIXME: if getJob.isEmpty then constructing this error message will always throw an error... throw new NoSuchElementException( "Attempted to stop nonexistent job with id: " + getJob.get().getId()); } @@ -166,9 +167,7 @@ public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request // check job status is valid for restarting Job job = getJob.get(); JobStatus status = job.getStatus(); - if (JobStatus.getTransitionalStates().contains(status) - || JobStatus.getTerminalState().contains(status) - || status.equals(JobStatus.UNKNOWN)) { + if (status.isTransitional() || status.isTerminal() || status == JobStatus.UNKNOWN) { throw new UnsupportedOperationException( "Restarting a job with a transitional, terminal or unknown status is unsupported"); } @@ -209,11 +208,10 @@ public StopIngestionJobResponse stopJob(StopIngestionJobRequest request) // check job status is valid for stopping Job job = getJob.get(); JobStatus status = job.getStatus(); - if (JobStatus.getTerminalState().contains(status)) { + if (status.isTerminal()) { // do nothing - job is already stopped return StopIngestionJobResponse.newBuilder().build(); - } else if (JobStatus.getTransitionalStates().contains(status) - || status.equals(JobStatus.UNKNOWN)) { + } else if (status.isTransitional() || status == JobStatus.UNKNOWN) { throw new UnsupportedOperationException( "Stopping a job with a transitional or unknown status is unsupported"); } diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index 5faf446a94..8d179baebb 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -40,6 +40,8 @@ import feast.core.model.Source; import feast.core.model.Store; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Optional; import org.hamcrest.core.IsNull; import org.junit.Before; @@ -47,95 +49,71 @@ import org.mockito.Mock; public class JobUpdateTaskTest { + private static final Runner RUNNER = Runner.DATAFLOW; + + private static final FeatureSetProto.FeatureSet.Builder fsBuilder = + FeatureSetProto.FeatureSet.newBuilder().setMeta(FeatureSetMeta.newBuilder()); + private static final FeatureSetSpec.Builder specBuilder = + FeatureSetSpec.newBuilder().setProject("project1").setVersion(1); @Mock private JobManager jobManager; - private StoreProto.Store store; - private SourceProto.Source source; + private Store store; + private Source source; + private FeatureSet featureSet1; @Before public void setUp() { initMocks(this); + when(jobManager.getRunnerType()).thenReturn(RUNNER); + store = - StoreProto.Store.newBuilder() - .setName("test") - .setType(StoreType.REDIS) - .setRedisConfig(RedisConfig.newBuilder().build()) - .addSubscriptions( - Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) - .build(); + Store.fromProto( + StoreProto.Store.newBuilder() + .setName("test") + .setType(StoreType.REDIS) + .setRedisConfig(RedisConfig.newBuilder().build()) + .addSubscriptions( + Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) + .build()); source = - SourceProto.Source.newBuilder() - .setType(SourceType.KAFKA) - .setKafkaSourceConfig( - KafkaSourceConfig.newBuilder() - .setTopic("topic") - .setBootstrapServers("servers:9092") - .build()) - .build(); + Source.fromProto( + SourceProto.Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setTopic("topic") + .setBootstrapServers("servers:9092") + .build()) + .build()); + + featureSet1 = + FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet1")).build()); + featureSet1.setSource(source); + } + + Job makeJob(String extId, List featureSets, JobStatus status) { + return new Job("job", extId, RUNNER, source, store, featureSets, status); + } + + JobUpdateTask makeTask(List featureSets, Optional currentJob) { + return new JobUpdateTask(featureSets, source, store, currentJob, jobManager, 100L); } @Test public void shouldUpdateJobIfPresent() { - FeatureSetProto.FeatureSet featureSet1 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - FeatureSetSpec.newBuilder() - .setSource(source) - .setProject("project1") - .setName("featureSet1") - .setVersion(1)) - .setMeta(FeatureSetMeta.newBuilder()) - .build(); - FeatureSetProto.FeatureSet featureSet2 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - FeatureSetSpec.newBuilder() - .setSource(source) - .setProject("project1") - .setName("featureSet2") - .setVersion(1)) - .setMeta(FeatureSetMeta.newBuilder()) - .build(); - Job originalJob = - new Job( - "job", - "old_ext", - Runner.DATAFLOW, - feast.core.model.Source.fromProto(source), - feast.core.model.Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.RUNNING); - JobUpdateTask jobUpdateTask = - new JobUpdateTask( - Arrays.asList(featureSet1, featureSet2), - source, - store, - Optional.of(originalJob), - jobManager, - 100L); - Job submittedJob = - new Job( - "job", - "old_ext", - Runner.DATAFLOW, - feast.core.model.Source.fromProto(source), - feast.core.model.Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), - JobStatus.RUNNING); + FeatureSet featureSet2 = + FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet2")).build()); + List existingFeatureSetsPopulatedByJob = Collections.singletonList(featureSet1); + List newFeatureSetsPopulatedByJob = Arrays.asList(featureSet1, featureSet2); + + Job originalJob = makeJob("old_ext", existingFeatureSetsPopulatedByJob, JobStatus.RUNNING); + JobUpdateTask jobUpdateTask = makeTask(newFeatureSetsPopulatedByJob, Optional.of(originalJob)); + Job submittedJob = makeJob("old_ext", newFeatureSetsPopulatedByJob, JobStatus.RUNNING); - Job expected = - new Job( - "job", - "new_ext", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), - JobStatus.PENDING); + Job expected = makeJob("new_ext", newFeatureSetsPopulatedByJob, JobStatus.PENDING); when(jobManager.updateJob(submittedJob)).thenReturn(expected); - when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); Job actual = jobUpdateTask.call(); assertThat(actual, equalTo(expected)); @@ -143,43 +121,13 @@ public void shouldUpdateJobIfPresent() { @Test public void shouldCreateJobIfNotPresent() { - FeatureSetProto.FeatureSet featureSet1 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - FeatureSetSpec.newBuilder() - .setSource(source) - .setProject("project1") - .setName("featureSet1") - .setVersion(1)) - .setMeta(FeatureSetMeta.newBuilder()) - .build(); - JobUpdateTask jobUpdateTask = - spy( - new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 100L)); + var featureSets = Collections.singletonList(featureSet1); + JobUpdateTask jobUpdateTask = spy(makeTask(featureSets, Optional.empty())); doReturn("job").when(jobUpdateTask).createJobId("KAFKA/servers:9092/topic", "test"); - Job expectedInput = - new Job( - "job", - "", - Runner.DATAFLOW, - feast.core.model.Source.fromProto(source), - feast.core.model.Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.PENDING); + Job expectedInput = makeJob("", featureSets, JobStatus.PENDING); + Job expected = makeJob("ext", featureSets, JobStatus.PENDING); - Job expected = - new Job( - "job", - "ext", - Runner.DATAFLOW, - feast.core.model.Source.fromProto(source), - feast.core.model.Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.RUNNING); - - when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); when(jobManager.startJob(expectedInput)).thenReturn(expected); Job actual = jobUpdateTask.call(); @@ -188,83 +136,25 @@ public void shouldCreateJobIfNotPresent() { @Test public void shouldUpdateJobStatusIfNotCreateOrUpdate() { - FeatureSetProto.FeatureSet featureSet1 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - FeatureSetSpec.newBuilder() - .setSource(source) - .setProject("project1") - .setName("featureSet1") - .setVersion(1)) - .setMeta(FeatureSetMeta.newBuilder()) - .build(); - Job originalJob = - new Job( - "job", - "ext", - Runner.DATAFLOW, - feast.core.model.Source.fromProto(source), - feast.core.model.Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.RUNNING); - JobUpdateTask jobUpdateTask = - new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.of(originalJob), jobManager, 100L); + var featureSets = Collections.singletonList(featureSet1); + Job originalJob = makeJob("ext", featureSets, JobStatus.RUNNING); + JobUpdateTask jobUpdateTask = makeTask(featureSets, Optional.of(originalJob)); when(jobManager.getJobStatus(originalJob)).thenReturn(JobStatus.ABORTING); - Job expected = - new Job( - "job", - "ext", - Runner.DATAFLOW, - Source.fromProto(source), - Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.ABORTING); - Job actual = jobUpdateTask.call(); + Job updated = jobUpdateTask.call(); - assertThat(actual, equalTo(expected)); + assertThat(updated.getStatus(), equalTo(JobStatus.ABORTING)); } @Test public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { - FeatureSetProto.FeatureSet featureSet1 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - FeatureSetSpec.newBuilder() - .setSource(source) - .setProject("project1") - .setName("featureSet1") - .setVersion(1)) - .setMeta(FeatureSetMeta.newBuilder()) - .build(); - JobUpdateTask jobUpdateTask = - spy( - new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 100L)); + var featureSets = Collections.singletonList(featureSet1); + JobUpdateTask jobUpdateTask = spy(makeTask(featureSets, Optional.empty())); doReturn("job").when(jobUpdateTask).createJobId("KAFKA/servers:9092/topic", "test"); - Job expectedInput = - new Job( - "job", - "", - Runner.DATAFLOW, - feast.core.model.Source.fromProto(source), - feast.core.model.Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.PENDING); - - Job expected = - new Job( - "job", - "", - Runner.DATAFLOW, - feast.core.model.Source.fromProto(source), - feast.core.model.Store.fromProto(store), - Arrays.asList(FeatureSet.fromProto(featureSet1)), - JobStatus.ERROR); + Job expectedInput = makeJob("", featureSets, JobStatus.PENDING); + Job expected = makeJob("", featureSets, JobStatus.ERROR); - when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); when(jobManager.startJob(expectedInput)) .thenThrow(new RuntimeException("Something went wrong")); @@ -274,21 +164,13 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { @Test public void shouldTimeout() { - FeatureSetProto.FeatureSet featureSet1 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - FeatureSetSpec.newBuilder() - .setSource(source) - .setProject("project1") - .setName("featureSet1") - .setVersion(1)) - .setMeta(FeatureSetMeta.newBuilder()) - .build(); - + var featureSets = Collections.singletonList(featureSet1); + var timeoutSeconds = 0L; JobUpdateTask jobUpdateTask = spy( new JobUpdateTask( - Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 0L)); + featureSets, source, store, Optional.empty(), jobManager, timeoutSeconds)); + Job actual = jobUpdateTask.call(); assertThat(actual, is(IsNull.nullValue())); } diff --git a/core/src/test/java/feast/core/model/JobStatusTest.java b/core/src/test/java/feast/core/model/JobStatusTest.java new file mode 100644 index 0000000000..f5c8839386 --- /dev/null +++ b/core/src/test/java/feast/core/model/JobStatusTest.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.model; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +public class JobStatusTest { + + @Test + public void isTerminalReturnsTrueForJobStatusWithTerminalState() { + JobStatus.getTerminalStates() + .forEach( + status -> { + assertThat(status.isTerminal(), is(true)); + assertThat(status.isTransitional(), is(false)); + }); + } + + @Test + public void isTransitionalReturnsTrueForJobStatusWithTransitionalState() { + JobStatus.getTransitionalStates() + .forEach( + status -> { + assertThat(status.isTransitional(), is(true)); + assertThat(status.isTerminal(), is(false)); + }); + } +} diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java index 6f34205bbf..3d527a90a8 100644 --- a/core/src/test/java/feast/core/service/JobServiceTest.java +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -341,10 +341,9 @@ public void testStopJobForId() { } @Test - public void testStopAlreadyStop() { + public void testStopAlreadyStopped() { // check that stop jobs does not trying to stop jobs that are not already stopped - List doNothingStatuses = new ArrayList<>(); - doNothingStatuses.addAll(JobStatus.getTerminalState()); + List doNothingStatuses = new ArrayList<>(JobStatus.getTerminalStates()); JobStatus prevStatus = this.job.getStatus(); for (JobStatus status : doNothingStatuses) {