From a9da2caadc5b72e9adbfe0ce5dc3ceb8ab97aec1 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 14 Aug 2020 14:49:19 +0800 Subject: [PATCH 1/7] blacklist -> whitelist From d7b8046a569ddf3a7b2ea0bbb85195852dd04194 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 14 Aug 2020 15:05:20 +0800 Subject: [PATCH 2/7] add whitelisted property From 75b3086f3dfb63830a15508b93bc0f8b00d860bc Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 14 Aug 2020 15:29:29 +0800 Subject: [PATCH 3/7] coordinator properties in e2e tests From d96a1b0ab8bdf4f9c19ea3034476ceb5dc531903 Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 13 Aug 2020 13:48:49 +0800 Subject: [PATCH 4/7] move labels to job & jobcoordinator --- .../core/job/ConsolidatedJobStrategy.java | 4 ++- .../feast/core/job/JobGroupingStrategy.java | 4 ++- .../feast/core/job/JobPerStoreStrategy.java | 5 ++- .../core/job/dataflow/DataflowJobManager.java | 32 +++++++++++++------ core/src/main/java/feast/core/model/Job.java | 11 +++++-- .../core/service/JobCoordinatorService.java | 23 ++++++++++--- .../feast/core/service/JobCoordinatorIT.java | 2 ++ 7 files changed, 61 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java index 41431a8786..a8b1c5d274 100644 --- a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java +++ b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java @@ -43,7 +43,8 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) { } @Override - public Job getOrCreateJob(SourceProto.Source source, Set stores) { + public Job getOrCreateJob( + SourceProto.Source source, Set stores, Map labels) { return jobRepository .findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( source, null, JobStatus.getTerminalStates()) @@ -55,6 +56,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set store .setStores( stores.stream() .collect(Collectors.toMap(StoreProto.Store::getName, s -> s))) + .setLabels(labels) .build()); } diff --git a/core/src/main/java/feast/core/job/JobGroupingStrategy.java b/core/src/main/java/feast/core/job/JobGroupingStrategy.java index ddd1a70294..a8cd0d3cd7 100644 --- a/core/src/main/java/feast/core/job/JobGroupingStrategy.java +++ b/core/src/main/java/feast/core/job/JobGroupingStrategy.java @@ -19,6 +19,7 @@ import feast.core.model.Job; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; +import java.util.Map; import java.util.Set; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; @@ -29,7 +30,8 @@ */ public interface JobGroupingStrategy { /** Get the non terminated ingestion job ingesting for given source and stores. */ - Job getOrCreateJob(SourceProto.Source source, Set stores); + Job getOrCreateJob( + SourceProto.Source source, Set stores, Map labels); /** Create unique JobId that would be used as key in communications with JobRunner */ String createJobId(Job job); /* Distribute given sources and stores across jobs. One yielded Pair - one created Job **/ diff --git a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java index 6f6b25e8db..f967cefac0 100644 --- a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java +++ b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java @@ -23,6 +23,7 @@ import feast.proto.core.StoreProto; import java.time.Instant; import java.util.ArrayList; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -42,7 +43,8 @@ public JobPerStoreStrategy(JobRepository jobRepository) { } @Override - public Job getOrCreateJob(SourceProto.Source source, Set stores) { + public Job getOrCreateJob( + SourceProto.Source source, Set stores, Map labels) { ArrayList storesList = Lists.newArrayList(stores); if (storesList.size() != 1) { throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy"); @@ -60,6 +62,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set store .setStores( stores.stream() .collect(Collectors.toMap(StoreProto.Store::getName, s -> s))) + .setLabels(labels) .build()); } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index d536a84710..1506905c7a 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -94,12 +94,6 @@ public static DataflowJobManager of( Map jobSelector, Dataflow dataflow) { - // Retrieve labels to extend them with jobSelector - Map jobLabels = new HashMap<>(runnerConfigOptions.getLabelsMap()); - // Merge Job Selector Labels into runner options - jobSelector.forEach(jobLabels::put); - runnerConfigOptions = runnerConfigOptions.toBuilder().putAllLabels(jobLabels).build(); - defaultOptions = new DataflowRunnerConfig(runnerConfigOptions); this.dataflow = dataflow; this.metrics = metricsProperties; @@ -130,7 +124,11 @@ public Job startJob(Job job) { try { String extId = submitDataflowJob( - job.getId(), job.getSource(), new HashSet<>(job.getStores().values()), false); + job.getId(), + job.getSource(), + new HashSet<>(job.getStores().values()), + job.getLabels(), + false); job.setExtId(extId); return job; @@ -315,9 +313,13 @@ public List listRunningJobs() { } private String submitDataflowJob( - String jobName, SourceProto.Source source, Set sinks, boolean update) { + String jobName, + SourceProto.Source source, + Set sinks, + Map labels, + boolean update) { try { - ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, update); + ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, labels, update); DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions); String jobId = waitForJobToRun(pipelineResult); return jobId; @@ -328,7 +330,11 @@ private String submitDataflowJob( } private ImportOptions getPipelineOptions( - String jobName, SourceProto.Source source, Set sinks, boolean update) + String jobName, + SourceProto.Source source, + Set sinks, + Map labels, + boolean update) throws IOException, IllegalAccessException { ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); @@ -347,6 +353,12 @@ private ImportOptions getPipelineOptions( pipelineOptions.setJobName(jobName); pipelineOptions.setFilesToStage( detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader())); + + // Merge common labels with job's labels + Map mergedLabels = new HashMap<>(defaultOptions.getLabels()); + labels.forEach(mergedLabels::put); + pipelineOptions.setLabels(mergedLabels); + if (metrics.isEnabled()) { pipelineOptions.setMetricsExporterType(metrics.getType()); if (metrics.getType().equals("statsd")) { diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 7257d750f2..d67d16cba6 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -53,10 +53,14 @@ public JobStatus getStatus() { public abstract Map getFeatureSetDeliveryStatuses(); + // Job's labels + public abstract Map getLabels(); + public static Builder builder() { return new AutoValue_Job.Builder() .setFeatureSetDeliveryStatuses(new HashMap<>()) - .setStores(new HashMap<>()); + .setStores(new HashMap<>()) + .setLabels(new HashMap<>()); } @AutoValue.Builder @@ -70,6 +74,8 @@ public interface Builder { Builder setFeatureSetDeliveryStatuses( Map statuses); + Builder setLabels(Map labels); + Job build(); } @@ -164,12 +170,13 @@ public IngestionJobProto.IngestionJob toProto() { return ingestJob; } - public Job cloneWithId(String newJobId) { + public Job cloneWithIdAndLabels(String newJobId, Map labels) { return Job.builder() .setSource(this.getSource()) .setFeatureSetDeliveryStatuses(new HashMap<>(this.getFeatureSetDeliveryStatuses())) .setStores(new HashMap<>(this.getStores())) .setId(newJobId) + .setLabels(labels) .build(); } diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 0e7e67d97f..1e51a751ab 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -59,6 +59,7 @@ public class JobCoordinatorService { private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; + private final String VERSION_LABEL = "FeastVersion"; private final JobRepository jobRepository; private final SpecService specService; @@ -68,6 +69,8 @@ public class JobCoordinatorService { private final KafkaTemplate specPublisher; private final List featureSetSubscriptions; private final List whitelistedStores; + private final Map jobLabels; + private final String currentVersion; @Autowired public JobCoordinatorService( @@ -88,6 +91,9 @@ public JobCoordinatorService( .map(JobProperties.CoordinatorProperties.FeatureSetSelector::toSubscription) .collect(Collectors.toList()); this.whitelistedStores = feastProperties.getJobs().getCoordinator().getWhitelistedStores(); + this.currentVersion = feastProperties.getVersion(); + this.jobLabels = new HashMap<>(feastProperties.getJobs().getCoordinator().getJobSelector()); + this.jobLabels.put(VERSION_LABEL, this.currentVersion); } /** @@ -161,7 +167,7 @@ List makeJobUpdateTasks(Iterable>> sourceToStor Source source = mapping.getKey(); Set stores = mapping.getValue(); - Job job = groupingStrategy.getOrCreateJob(source, stores); + Job job = groupingStrategy.getOrCreateJob(source, stores, this.jobLabels); if (job.isDeployed()) { if (!job.isRunning()) { @@ -177,7 +183,7 @@ List makeJobUpdateTasks(Iterable>> sourceToStor // it would make sense to spawn clone of current job // and terminate old version on the next Poll. // Both jobs should be in the same consumer group and not conflict with each other - job = job.cloneWithId(groupingStrategy.createJobId(job)); + job = job.cloneWithIdAndLabels(groupingStrategy.createJobId(job), this.jobLabels); job.addAllStores(stores); isSafeToStopJobs = false; @@ -214,8 +220,9 @@ List makeJobUpdateTasks(Iterable>> sourceToStor /** * Decides whether we need to upgrade (restart) given job. Since we send updated FeatureSets to * IngestionJob via Kafka, and there's only one source per job (if it change - new job would be - * created) the only things that can cause upgrade here are stores: new stores can be added, or - * existing stores will change subscriptions. + * created) main trigger that can cause upgrade here are stores: new stores can be added, or + * existing stores will change subscriptions. Another trigger is release of new version: current + * version is being compared with job's version stored in labels. * * @param job {@link Job} to check * @param stores Set of {@link Store} new version of stores (vs current version job.getStores()) @@ -227,6 +234,10 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { return true; } + if (!this.currentVersion.equals(job.getLabels().get(VERSION_LABEL))) { + return true; + } + return false; } @@ -257,7 +268,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { // Add featureSet to allocated job if not allocated before for (Pair> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) { - Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight()); + Job job = + groupingStrategy.getOrCreateJob( + jobArgs.getLeft(), jobArgs.getRight(), Collections.emptyMap()); if (!job.isRunning()) { continue; } diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index 557f043fe8..e0a6735528 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -60,6 +60,7 @@ "feast.jobs.coordinator.feature-set-selector[0].project=default", "feast.jobs.coordinator.whitelisted-stores[0]=test-store", "feast.jobs.coordinator.whitelisted-stores[1]=new-store", + "feast.version=1.0.0" }) public class JobCoordinatorIT extends BaseIT { @Autowired private FakeJobManager jobManager; @@ -212,6 +213,7 @@ public void shouldSendNewSpec() { ImmutableMap.of( DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore())) .setId("some-running-id") + .setLabels(ImmutableMap.of("version", "1.0.0")) .build(); jobManager.startJob(job); From ff73df755ca956dc10af42b497dc63bca24a558e Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 13 Aug 2020 14:06:14 +0800 Subject: [PATCH 5/7] it test for version update --- .../feast/core/service/JobCoordinatorIT.java | 28 +++++++++++++++++++ .../service/JobCoordinatorServiceTest.java | 2 ++ 2 files changed, 30 insertions(+) diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index e0a6735528..8ba0a7459c 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -22,6 +22,7 @@ import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; import static org.hamcrest.collection.IsCollectionWithSize.hasSize; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.hamcrest.collection.IsMapContaining.hasEntry; import static org.hamcrest.collection.IsMapWithSize.aMapWithSize; import static org.hamcrest.core.AllOf.allOf; @@ -189,6 +190,33 @@ public void shouldNotCreateJobForUnwantedFeatureSet() { assertThat(jobManager.getAllJobs(), hasSize(0)); } + @Test + @SneakyThrows + public void shouldRestartJobWithOldVersion() { + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test")); + + Job job = + Job.builder() + .setSource(DataGenerator.getDefaultSource()) + .setStores( + ImmutableMap.of( + DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore())) + .setId("some-running-id") + .setLabels(ImmutableMap.of("version", "0.9.9")) + .build(); + + jobManager.startJob(job); + jobRepository.add(job); + + await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED)); + + Job replacement = jobRepository.findByStatus(JobStatus.RUNNING).get(0); + assertThat(replacement.getSource(), equalTo(job.getSource())); + assertThat(replacement.getStores(), equalTo(job.getStores())); + assertThat(replacement.getLabels(), hasEntry("version", "1.0.0")); + } + @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @Nested class SpecNotificationFlow extends SequentialFlow { diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index faaca2bb63..a6f206c0f3 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -85,9 +85,11 @@ public void setUp() { coordinatorProperties.setFeatureSetSelector(ImmutableList.of(selector)); coordinatorProperties.setWhitelistedStores( ImmutableList.of("test-store", "test", "test-1", "test-2", "normal-store")); + coordinatorProperties.setJobSelector(ImmutableMap.of("application", "feast")); jobProperties.setCoordinator(coordinatorProperties); feastProperties.setJobs(jobProperties); + feastProperties.setVersion("1.0.0"); TestUtil.setupAuditLogger(); From 92964f23a39b98116980d3dd5b4c849a978e590c Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 14 Aug 2020 15:20:17 +0800 Subject: [PATCH 6/7] version label constant --- .../main/java/feast/core/service/JobCoordinatorService.java | 2 +- core/src/test/java/feast/core/service/JobCoordinatorIT.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 1e51a751ab..46e58528dd 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -59,7 +59,7 @@ public class JobCoordinatorService { private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; - private final String VERSION_LABEL = "FeastVersion"; + public static final String VERSION_LABEL = "FeastVersion"; private final JobRepository jobRepository; private final SpecService specService; diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index 8ba0a7459c..7f9233f832 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -203,7 +203,7 @@ public void shouldRestartJobWithOldVersion() { ImmutableMap.of( DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore())) .setId("some-running-id") - .setLabels(ImmutableMap.of("version", "0.9.9")) + .setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "0.9.9")) .build(); jobManager.startJob(job); @@ -214,7 +214,7 @@ public void shouldRestartJobWithOldVersion() { Job replacement = jobRepository.findByStatus(JobStatus.RUNNING).get(0); assertThat(replacement.getSource(), equalTo(job.getSource())); assertThat(replacement.getStores(), equalTo(job.getStores())); - assertThat(replacement.getLabels(), hasEntry("version", "1.0.0")); + assertThat(replacement.getLabels(), hasEntry(JobCoordinatorService.VERSION_LABEL, "1.0.0")); } @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -241,7 +241,7 @@ public void shouldSendNewSpec() { ImmutableMap.of( DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore())) .setId("some-running-id") - .setLabels(ImmutableMap.of("version", "1.0.0")) + .setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "1.0.0")) .build(); jobManager.startJob(job); From 209a65df3c3040a5c60a06d37d2d67ebef3b7afa Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 14 Aug 2020 16:32:28 +0800 Subject: [PATCH 7/7] fix version label --- .../src/main/java/feast/core/service/JobCoordinatorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 46e58528dd..c2e17048fc 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -59,7 +59,7 @@ public class JobCoordinatorService { private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; - public static final String VERSION_LABEL = "FeastVersion"; + public static final String VERSION_LABEL = "feast_version"; private final JobRepository jobRepository; private final SpecService specService;