diff --git a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java index 41431a8786..a8b1c5d274 100644 --- a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java +++ b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java @@ -43,7 +43,8 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) { } @Override - public Job getOrCreateJob(SourceProto.Source source, Set stores) { + public Job getOrCreateJob( + SourceProto.Source source, Set stores, Map labels) { return jobRepository .findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( source, null, JobStatus.getTerminalStates()) @@ -55,6 +56,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set store .setStores( stores.stream() .collect(Collectors.toMap(StoreProto.Store::getName, s -> s))) + .setLabels(labels) .build()); } diff --git a/core/src/main/java/feast/core/job/JobGroupingStrategy.java b/core/src/main/java/feast/core/job/JobGroupingStrategy.java index ddd1a70294..a8cd0d3cd7 100644 --- a/core/src/main/java/feast/core/job/JobGroupingStrategy.java +++ b/core/src/main/java/feast/core/job/JobGroupingStrategy.java @@ -19,6 +19,7 @@ import feast.core.model.Job; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; +import java.util.Map; import java.util.Set; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; @@ -29,7 +30,8 @@ */ public interface JobGroupingStrategy { /** Get the non terminated ingestion job ingesting for given source and stores. */ - Job getOrCreateJob(SourceProto.Source source, Set stores); + Job getOrCreateJob( + SourceProto.Source source, Set stores, Map labels); /** Create unique JobId that would be used as key in communications with JobRunner */ String createJobId(Job job); /* Distribute given sources and stores across jobs. One yielded Pair - one created Job **/ diff --git a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java index 6f6b25e8db..f967cefac0 100644 --- a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java +++ b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java @@ -23,6 +23,7 @@ import feast.proto.core.StoreProto; import java.time.Instant; import java.util.ArrayList; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -42,7 +43,8 @@ public JobPerStoreStrategy(JobRepository jobRepository) { } @Override - public Job getOrCreateJob(SourceProto.Source source, Set stores) { + public Job getOrCreateJob( + SourceProto.Source source, Set stores, Map labels) { ArrayList storesList = Lists.newArrayList(stores); if (storesList.size() != 1) { throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy"); @@ -60,6 +62,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set store .setStores( stores.stream() .collect(Collectors.toMap(StoreProto.Store::getName, s -> s))) + .setLabels(labels) .build()); } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index d536a84710..1506905c7a 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -94,12 +94,6 @@ public static DataflowJobManager of( Map jobSelector, Dataflow dataflow) { - // Retrieve labels to extend them with jobSelector - Map jobLabels = new HashMap<>(runnerConfigOptions.getLabelsMap()); - // Merge Job Selector Labels into runner options - jobSelector.forEach(jobLabels::put); - runnerConfigOptions = runnerConfigOptions.toBuilder().putAllLabels(jobLabels).build(); - defaultOptions = new DataflowRunnerConfig(runnerConfigOptions); this.dataflow = dataflow; this.metrics = metricsProperties; @@ -130,7 +124,11 @@ public Job startJob(Job job) { try { String extId = submitDataflowJob( - job.getId(), job.getSource(), new HashSet<>(job.getStores().values()), false); + job.getId(), + job.getSource(), + new HashSet<>(job.getStores().values()), + job.getLabels(), + false); job.setExtId(extId); return job; @@ -315,9 +313,13 @@ public List listRunningJobs() { } private String submitDataflowJob( - String jobName, SourceProto.Source source, Set sinks, boolean update) { + String jobName, + SourceProto.Source source, + Set sinks, + Map labels, + boolean update) { try { - ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, update); + ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, labels, update); DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions); String jobId = waitForJobToRun(pipelineResult); return jobId; @@ -328,7 +330,11 @@ private String submitDataflowJob( } private ImportOptions getPipelineOptions( - String jobName, SourceProto.Source source, Set sinks, boolean update) + String jobName, + SourceProto.Source source, + Set sinks, + Map labels, + boolean update) throws IOException, IllegalAccessException { ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); @@ -347,6 +353,12 @@ private ImportOptions getPipelineOptions( pipelineOptions.setJobName(jobName); pipelineOptions.setFilesToStage( detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader())); + + // Merge common labels with job's labels + Map mergedLabels = new HashMap<>(defaultOptions.getLabels()); + labels.forEach(mergedLabels::put); + pipelineOptions.setLabels(mergedLabels); + if (metrics.isEnabled()) { pipelineOptions.setMetricsExporterType(metrics.getType()); if (metrics.getType().equals("statsd")) { diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 7257d750f2..d67d16cba6 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -53,10 +53,14 @@ public JobStatus getStatus() { public abstract Map getFeatureSetDeliveryStatuses(); + // Job's labels + public abstract Map getLabels(); + public static Builder builder() { return new AutoValue_Job.Builder() .setFeatureSetDeliveryStatuses(new HashMap<>()) - .setStores(new HashMap<>()); + .setStores(new HashMap<>()) + .setLabels(new HashMap<>()); } @AutoValue.Builder @@ -70,6 +74,8 @@ public interface Builder { Builder setFeatureSetDeliveryStatuses( Map statuses); + Builder setLabels(Map labels); + Job build(); } @@ -164,12 +170,13 @@ public IngestionJobProto.IngestionJob toProto() { return ingestJob; } - public Job cloneWithId(String newJobId) { + public Job cloneWithIdAndLabels(String newJobId, Map labels) { return Job.builder() .setSource(this.getSource()) .setFeatureSetDeliveryStatuses(new HashMap<>(this.getFeatureSetDeliveryStatuses())) .setStores(new HashMap<>(this.getStores())) .setId(newJobId) + .setLabels(labels) .build(); } diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 0e7e67d97f..c2e17048fc 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -59,6 +59,7 @@ public class JobCoordinatorService { private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; + public static final String VERSION_LABEL = "feast_version"; private final JobRepository jobRepository; private final SpecService specService; @@ -68,6 +69,8 @@ public class JobCoordinatorService { private final KafkaTemplate specPublisher; private final List featureSetSubscriptions; private final List whitelistedStores; + private final Map jobLabels; + private final String currentVersion; @Autowired public JobCoordinatorService( @@ -88,6 +91,9 @@ public JobCoordinatorService( .map(JobProperties.CoordinatorProperties.FeatureSetSelector::toSubscription) .collect(Collectors.toList()); this.whitelistedStores = feastProperties.getJobs().getCoordinator().getWhitelistedStores(); + this.currentVersion = feastProperties.getVersion(); + this.jobLabels = new HashMap<>(feastProperties.getJobs().getCoordinator().getJobSelector()); + this.jobLabels.put(VERSION_LABEL, this.currentVersion); } /** @@ -161,7 +167,7 @@ List makeJobUpdateTasks(Iterable>> sourceToStor Source source = mapping.getKey(); Set stores = mapping.getValue(); - Job job = groupingStrategy.getOrCreateJob(source, stores); + Job job = groupingStrategy.getOrCreateJob(source, stores, this.jobLabels); if (job.isDeployed()) { if (!job.isRunning()) { @@ -177,7 +183,7 @@ List makeJobUpdateTasks(Iterable>> sourceToStor // it would make sense to spawn clone of current job // and terminate old version on the next Poll. // Both jobs should be in the same consumer group and not conflict with each other - job = job.cloneWithId(groupingStrategy.createJobId(job)); + job = job.cloneWithIdAndLabels(groupingStrategy.createJobId(job), this.jobLabels); job.addAllStores(stores); isSafeToStopJobs = false; @@ -214,8 +220,9 @@ List makeJobUpdateTasks(Iterable>> sourceToStor /** * Decides whether we need to upgrade (restart) given job. Since we send updated FeatureSets to * IngestionJob via Kafka, and there's only one source per job (if it change - new job would be - * created) the only things that can cause upgrade here are stores: new stores can be added, or - * existing stores will change subscriptions. + * created) main trigger that can cause upgrade here are stores: new stores can be added, or + * existing stores will change subscriptions. Another trigger is release of new version: current + * version is being compared with job's version stored in labels. * * @param job {@link Job} to check * @param stores Set of {@link Store} new version of stores (vs current version job.getStores()) @@ -227,6 +234,10 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { return true; } + if (!this.currentVersion.equals(job.getLabels().get(VERSION_LABEL))) { + return true; + } + return false; } @@ -257,7 +268,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { // Add featureSet to allocated job if not allocated before for (Pair> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) { - Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight()); + Job job = + groupingStrategy.getOrCreateJob( + jobArgs.getLeft(), jobArgs.getRight(), Collections.emptyMap()); if (!job.isRunning()) { continue; } diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index 557f043fe8..7f9233f832 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -22,6 +22,7 @@ import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; import static org.hamcrest.collection.IsCollectionWithSize.hasSize; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.hamcrest.collection.IsMapContaining.hasEntry; import static org.hamcrest.collection.IsMapWithSize.aMapWithSize; import static org.hamcrest.core.AllOf.allOf; @@ -60,6 +61,7 @@ "feast.jobs.coordinator.feature-set-selector[0].project=default", "feast.jobs.coordinator.whitelisted-stores[0]=test-store", "feast.jobs.coordinator.whitelisted-stores[1]=new-store", + "feast.version=1.0.0" }) public class JobCoordinatorIT extends BaseIT { @Autowired private FakeJobManager jobManager; @@ -188,6 +190,33 @@ public void shouldNotCreateJobForUnwantedFeatureSet() { assertThat(jobManager.getAllJobs(), hasSize(0)); } + @Test + @SneakyThrows + public void shouldRestartJobWithOldVersion() { + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test")); + + Job job = + Job.builder() + .setSource(DataGenerator.getDefaultSource()) + .setStores( + ImmutableMap.of( + DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore())) + .setId("some-running-id") + .setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "0.9.9")) + .build(); + + jobManager.startJob(job); + jobRepository.add(job); + + await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED)); + + Job replacement = jobRepository.findByStatus(JobStatus.RUNNING).get(0); + assertThat(replacement.getSource(), equalTo(job.getSource())); + assertThat(replacement.getStores(), equalTo(job.getStores())); + assertThat(replacement.getLabels(), hasEntry(JobCoordinatorService.VERSION_LABEL, "1.0.0")); + } + @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @Nested class SpecNotificationFlow extends SequentialFlow { @@ -212,6 +241,7 @@ public void shouldSendNewSpec() { ImmutableMap.of( DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore())) .setId("some-running-id") + .setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "1.0.0")) .build(); jobManager.startJob(job); diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index faaca2bb63..a6f206c0f3 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -85,9 +85,11 @@ public void setUp() { coordinatorProperties.setFeatureSetSelector(ImmutableList.of(selector)); coordinatorProperties.setWhitelistedStores( ImmutableList.of("test-store", "test", "test-1", "test-2", "normal-store")); + coordinatorProperties.setJobSelector(ImmutableMap.of("application", "feast")); jobProperties.setCoordinator(coordinatorProperties); feastProperties.setJobs(jobProperties); + feastProperties.setVersion("1.0.0"); TestUtil.setupAuditLogger();