From 7e3638ac8ceaf5304cec4d1af0173e2f9e29e39a Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 30 Jun 2020 12:02:43 +0300 Subject: [PATCH 1/6] fix repositories inconsistency --- .../main/java/feast/core/service/JobCoordinatorService.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index c7862a386e..24575d93aa 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -378,7 +378,8 @@ public void notifyJobsWhenFeatureSetUpdated() { // FeatureSet). // We now set status to IN_PROGRESS, so listenAckFromJobs would be able to // monitor delivery progress for each new version. - fs.getJobStatuses().stream() + Set jobStatuses = fs.getJobStatuses(); + jobStatuses.stream() .filter(s -> s.getJob().isRunning()) .forEach( jobStatus -> { @@ -386,7 +387,8 @@ public void notifyJobsWhenFeatureSetUpdated() { FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); jobStatus.setVersion(fs.getVersion()); }); - featureSetRepository.saveAndFlush(fs); + jobStatusRepository.saveAll(jobStatuses); + jobStatusRepository.flush(); }); } From 1d190d896a92d92449ba841cd91414724f12626c Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 30 Jun 2020 12:35:47 +0300 Subject: [PATCH 2/6] more stable featureSetJobStatus creation --- .../core/service/JobCoordinatorService.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 24575d93aa..5f3af180e3 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -241,8 +241,10 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { * @param featureSet featureSet {@link FeatureSet} to find jobs and allocate */ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { - Set toAdd = new HashSet<>(); - Set existing = featureSet.getJobStatuses(); + Map current = new HashMap<>(); + Map existing = + featureSet.getJobStatuses().stream() + .collect(Collectors.toMap(FeatureSetJobStatus::getId, s -> s)); Stream> jobArgsStream = getAllStores().stream() @@ -265,14 +267,16 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { status.setJob(job); status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - toAdd.add(status); + current.put(status.getId(), status); } - Set toDelete = Sets.difference(existing, toAdd); - toAdd = Sets.difference(toAdd, existing); + Set toDelete = + Sets.difference(existing.keySet(), current.keySet()); + Set toAdd = + Sets.difference(current.keySet(), existing.keySet()); - jobStatusRepository.deleteAll(toDelete); - jobStatusRepository.saveAll(toAdd); + jobStatusRepository.deleteAll(toDelete.stream().map(existing::get).collect(Collectors.toSet())); + jobStatusRepository.saveAll(toAdd.stream().map(current::get).collect(Collectors.toSet())); jobStatusRepository.flush(); return featureSet; } From 3ad2cf28d71f891a256eb426171c95f53a9e80f5 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 30 Jun 2020 12:42:41 +0300 Subject: [PATCH 3/6] populate Id manually --- core/src/main/java/feast/core/model/FeatureSetJobStatus.java | 3 +++ .../main/java/feast/core/service/JobCoordinatorService.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java index 79a3b51da4..8f1c7c304c 100644 --- a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java +++ b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java @@ -21,6 +21,8 @@ import java.io.Serializable; import javax.persistence.*; import javax.persistence.Entity; + +import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -37,6 +39,7 @@ public class FeatureSetJobStatus { @Embeddable @EqualsAndHashCode + @AllArgsConstructor public static class FeatureSetJobStatusKey implements Serializable { public FeatureSetJobStatusKey() {} diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 5f3af180e3..621742b1f4 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -267,7 +267,7 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { status.setJob(job); status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - current.put(status.getId(), status); + current.put(new FeatureSetJobStatus.FeatureSetJobStatusKey(job.getId(), featureSet.getId()), status); } Set toDelete = From f09345f9af159ba4f5402b704a3c078fc9feca36 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 30 Jun 2020 12:49:19 +0300 Subject: [PATCH 4/6] lint --- .../main/java/feast/core/service/JobCoordinatorService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 621742b1f4..59cc619fa6 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -267,7 +267,8 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { status.setJob(job); status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - current.put(new FeatureSetJobStatus.FeatureSetJobStatusKey(job.getId(), featureSet.getId()), status); + current.put( + new FeatureSetJobStatus.FeatureSetJobStatusKey(job.getId(), featureSet.getId()), status); } Set toDelete = From 57a238271e7a618df64553727575a6006e80231b Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 30 Jun 2020 12:51:42 +0300 Subject: [PATCH 5/6] lint --- core/src/main/java/feast/core/model/FeatureSetJobStatus.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java index 8f1c7c304c..a7281a552b 100644 --- a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java +++ b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java @@ -21,7 +21,6 @@ import java.io.Serializable; import javax.persistence.*; import javax.persistence.Entity; - import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; From ca88e0a7cebd7da2bce6a83ca24b6d1ba9e05133 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 30 Jun 2020 13:01:02 +0300 Subject: [PATCH 6/6] fix TestUtil --- core/src/test/java/feast/core/util/TestUtil.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/feast/core/util/TestUtil.java b/core/src/test/java/feast/core/util/TestUtil.java index 5b3bf6b10c..b93aaa8cd4 100644 --- a/core/src/test/java/feast/core/util/TestUtil.java +++ b/core/src/test/java/feast/core/util/TestUtil.java @@ -141,6 +141,7 @@ public static FeatureSetJobStatus CreateFeatureSetJobStatusWithJob( featureSetJobStatus.setDeliveryStatus(deliveryStatus); featureSetJobStatus.setVersion(version); + featureSetJobStatus.setId(new FeatureSetJobStatus.FeatureSetJobStatusKey(job.getId(), 0)); return featureSetJobStatus; }