diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java index b508de76e5..07303107a4 100644 --- a/core/src/main/java/feast/core/config/FeastProperties.java +++ b/core/src/main/java/feast/core/config/FeastProperties.java @@ -172,6 +172,10 @@ public static class FeatureSetSpecStreamProperties { /* Kafka topic to receive acknowledgment from ingestion job on successful processing of new specs */ @NotBlank private String specsAckTopic = "feast-feature-set-specs-ack"; + + /* Notify jobs interval in milliseconds. + How frequently Feast will check on Pending FeatureSets and publish them to kafka. */ + @Positive private long notifyIntervalMilliseconds; } } diff --git a/core/src/main/java/feast/core/dao/FeatureSetRepository.java b/core/src/main/java/feast/core/dao/FeatureSetRepository.java index b136650dfd..38a690b0d6 100644 --- a/core/src/main/java/feast/core/dao/FeatureSetRepository.java +++ b/core/src/main/java/feast/core/dao/FeatureSetRepository.java @@ -17,6 +17,7 @@ package feast.core.dao; import feast.core.model.FeatureSet; +import feast.proto.core.FeatureSetProto; import java.util.List; import org.springframework.data.jpa.repository.JpaRepository; @@ -37,4 +38,7 @@ public interface FeatureSetRepository extends JpaRepository // find all feature sets matching the given name pattern and project pattern List findAllByNameLikeAndProject_NameLikeOrderByNameAsc( String name, String project_name); + + // find all feature sets matching given status + List findAllByStatus(FeatureSetProto.FeatureSetStatus status); } diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index 9abdfa3e45..9a2e5e6c25 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -24,6 +24,7 @@ import feast.proto.core.FeatureSetProto; import java.time.Instant; import java.util.List; +import 
java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -32,6 +33,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing; @@ -149,10 +151,24 @@ private Job startJob(String jobId) { } private void updateFeatureSets(Job job) { + Map alreadyConnected = + job.getFeatureSetJobStatuses().stream() + .collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s)); + for (FeatureSet fs : featureSets) { + if (alreadyConnected.containsKey(fs)) { + continue; + } + FeatureSetJobStatus status = new FeatureSetJobStatus(); status.setFeatureSet(fs); status.setJob(job); + if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) { + // Feature Set was already delivered to previous generation of the job + // (in other words, it exists in kafka) + // so we expect Job will ack latest version based on history from kafka topic + status.setVersion(fs.getVersion()); + } status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); job.getFeatureSetJobStatuses().add(status); } @@ -175,6 +191,7 @@ private Job updateStatus(Job job) { } job.setStatus(newStatus); + updateFeatureSets(job); return job; } diff --git a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java index 7f21c26335..f6320defdd 100644 --- a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java +++ b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java @@ -16,6 +16,7 @@ */ package feast.core.model; +import com.google.common.base.Objects; import feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus; import java.io.Serializable; import javax.persistence.*; @@ -61,4 +62,23 @@ public 
FeatureSetJobStatusKey() {} @Enumerated(EnumType.STRING) @Column(name = "delivery_status") private FeatureSetJobDeliveryStatus deliveryStatus; + + @Column(name = "version", columnDefinition = "integer default 0") + private int version; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FeatureSetJobStatus that = (FeatureSetJobStatus) o; + return version == that.version + && Objects.equal(job.getId(), that.job.getId()) + && Objects.equal(featureSet.getReference(), that.featureSet.getReference()) + && deliveryStatus == that.deliveryStatus; + } + + @Override + public int hashCode() { + return Objects.hashCode(job, featureSet, deliveryStatus, version); + } } diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 69da3e507e..19d5d65b61 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -16,6 +16,8 @@ */ package feast.core.service; +import static feast.core.model.FeatureSet.parseReference; + import com.google.protobuf.InvalidProtocolBufferException; import feast.core.config.FeastProperties; import feast.core.config.FeastProperties.JobProperties; @@ -26,6 +28,8 @@ import feast.core.model.*; import feast.proto.core.CoreServiceProto.ListStoresRequest.Filter; import feast.proto.core.CoreServiceProto.ListStoresResponse; +import feast.proto.core.FeatureSetProto; +import feast.proto.core.IngestionJobProto; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.Subscription; import java.util.ArrayList; @@ -33,14 +37,15 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import 
java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.stream.Collectors; import javax.validation.constraints.Positive; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Pair; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -49,11 +54,14 @@ @Service public class JobCoordinatorService { + private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; + private final JobRepository jobRepository; private final FeatureSetRepository featureSetRepository; private final SpecService specService; private final JobManager jobManager; private final JobProperties jobProperties; + private final KafkaTemplate specPublisher; @Autowired public JobCoordinatorService( @@ -61,12 +69,14 @@ public JobCoordinatorService( FeatureSetRepository featureSetRepository, SpecService specService, JobManager jobManager, - FeastProperties feastProperties) { + FeastProperties feastProperties, + KafkaTemplate specPublisher) { this.jobRepository = jobRepository; this.featureSetRepository = featureSetRepository; this.specService = specService; this.jobManager = jobManager; this.jobProperties = feastProperties.getJobs(); + this.specPublisher = specPublisher; } /** @@ -153,4 +163,124 @@ public Optional getJob(Source source, Store store) { // return the latest return Optional.of(jobs.get(0)); } + + @Transactional + @Scheduled(fixedDelayString = "${feast.stream.specsOptions.notifyIntervalMilliseconds}") + public void notifyJobsWhenFeatureSetUpdated() { + List pendingFeatureSets = + featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING); + + pendingFeatureSets.stream() + .filter( + 
fs -> { + List runningJobs = + fs.getJobStatuses().stream() + .filter(jobStatus -> jobStatus.getJob().isRunning()) + .collect(Collectors.toList()); + + return runningJobs.size() > 0 + && runningJobs.stream() + .allMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion()); + }) + .forEach( + fs -> { + log.info("Sending new FeatureSet {} to Ingestion", fs.getReference()); + + // Sending latest version of FeatureSet to all currently running IngestionJobs + // (there's one topic for all sets). + // All related jobs would apply new FeatureSet on the fly. + // In case kafka doesn't respond within SPEC_PUBLISHING_TIMEOUT_SECONDS we will try + // again later. + try { + specPublisher + .sendDefault(fs.getReference(), fs.toProto().getSpec()) + .get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Exception e) { + log.error( + "Error occurred while sending FeatureSetSpec to kafka. Cause {}." + + " Will retry later", + e.getMessage()); + return; + } + + // Updating delivery status for related jobs (that are currently using this + // FeatureSet). + // We now set status to IN_PROGRESS, so listenAckFromJobs would be able to + // monitor delivery progress for each new version. + fs.getJobStatuses().stream() + .filter(s -> s.getJob().isRunning()) + .forEach( + jobStatus -> { + jobStatus.setDeliveryStatus( + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + jobStatus.setVersion(fs.getVersion()); + }); + featureSetRepository.saveAndFlush(fs); + }); + } + + /** + * Listener for ACK messages coming from IngestionJob when FeatureSetSpec is installed (in + * pipeline). + * + *

Updates FeatureSetJobStatus for respective FeatureSet (selected by reference) and Job (selected + * by Id). + * + *

When all related (running) to FeatureSet jobs are updated - FeatureSet receives READY status + * + * @param record ConsumerRecord with key: FeatureSet reference and value: Ack message + */ + @KafkaListener(topics = {"${feast.stream.specsOptions.specsAckTopic}"}) + @Transactional + public void listenAckFromJobs( + ConsumerRecord record) { + String setReference = record.key(); + Pair projectAndSetName = parseReference(setReference); + FeatureSet featureSet = + featureSetRepository.findFeatureSetByNameAndProject_Name( + projectAndSetName.getSecond(), projectAndSetName.getFirst()); + if (featureSet == null) { + log.warn( + String.format("ACKListener received message for unknown FeatureSet %s", setReference)); + return; + } + + int ackVersion = record.value().getFeatureSetVersion(); + + if (featureSet.getVersion() != ackVersion) { + log.warn( + String.format( + "ACKListener received outdated ack for %s. Current %d, Received %d", + setReference, featureSet.getVersion(), ackVersion)); + return; + } + + log.info("Updating featureSet {} delivery statuses.", featureSet.getReference()); + + featureSet.getJobStatuses().stream() + .filter( + js -> + js.getJob().getId().equals(record.value().getJobName()) + && js.getVersion() == ackVersion) + .findFirst() + .ifPresent( + featureSetJobStatus -> + featureSetJobStatus.setDeliveryStatus( + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + + boolean allDelivered = + featureSet.getJobStatuses().stream() + .filter(js -> js.getJob().isRunning()) + .allMatch( + js -> + js.getDeliveryStatus() + .equals(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + + if (allDelivered) { + log.info("FeatureSet {} update is completely delivered", featureSet.getReference()); + + featureSet.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY); + featureSetRepository.saveAndFlush(featureSet); + } + } } diff --git a/core/src/main/java/feast/core/service/SpecService.java 
b/core/src/main/java/feast/core/service/SpecService.java index e60aeee57d..4365b8fd6c 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -16,7 +16,6 @@ */ package feast.core.service; -import static feast.core.model.FeatureSet.parseReference; import static feast.core.validators.Matchers.checkValidCharacters; import static feast.core.validators.Matchers.checkValidCharactersAllowAsterisk; @@ -42,21 +41,15 @@ import feast.proto.core.CoreServiceProto.UpdateStoreResponse; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetStatus; -import feast.proto.core.IngestionJobProto; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.Subscription; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.util.Pair; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -68,26 +61,21 @@ @Service public class SpecService { - private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; - private final FeatureSetRepository featureSetRepository; private final ProjectRepository projectRepository; private final StoreRepository storeRepository; private final Source defaultSource; - private final KafkaTemplate specPublisher; @Autowired public SpecService( FeatureSetRepository featureSetRepository, StoreRepository storeRepository, ProjectRepository projectRepository, - Source defaultSource, - KafkaTemplate specPublisher) { + Source defaultSource) { this.featureSetRepository 
= featureSetRepository; this.storeRepository = storeRepository; this.projectRepository = projectRepository; this.defaultSource = defaultSource; - this.specPublisher = specPublisher; } /** @@ -383,37 +371,6 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFea featureSet.incVersion(); - // Sending latest version of FeatureSet to all currently running IngestionJobs (there's one - // topic for all sets). - // All related jobs would apply new FeatureSet on the fly. - // We wait for Kafka broker to ack that the message was added to topic before actually - // committing this FeatureSet. - // In case kafka doesn't respond within SPEC_PUBLISHING_TIMEOUT_SECONDS we abort current - // transaction and return error to client. - try { - specPublisher - .sendDefault(featureSet.getReference(), featureSet.toProto().getSpec()) - .get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } catch (Exception e) { - throw io.grpc.Status.UNAVAILABLE - .withDescription( - String.format( - "Unable to publish FeatureSet to Kafka. Cause: %s", - e.getCause() != null ? e.getCause().getMessage() : "unknown")) - .withCause(e) - .asRuntimeException(); - } - - // Updating delivery status for related jobs (that are currently using this FeatureSet). - // We now set status to IN_PROGRESS, so listenAckFromJobs would be able to - // monitor delivery progress for each new version. - featureSet.getJobStatuses().stream() - .filter(s -> s.getJob().isRunning()) - .forEach( - s -> - s.setDeliveryStatus( - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); - // Persist the FeatureSet object featureSet.setStatus(FeatureSetStatus.STATUS_PENDING); project.addFeatureSet(featureSet); @@ -462,60 +419,4 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) .setStore(updateStoreRequest.getStore()) .build(); } - - /** - * Listener for ACK messages coming from IngestionJob when FeatureSetSpec is installed (in - * pipeline). - * - *

Updates FeatureSetJobStatus for respected FeatureSet (selected by reference) and Job (select - * by Id). - * - *

When all related (running) to FeatureSet jobs are updated - FeatureSet receives READY status - * - * @param record ConsumerRecord with key: FeatureSet reference and value: Ack message - */ - @KafkaListener(topics = {"${feast.stream.specsOptions.specsAckTopic}"}) - @Transactional - public void listenAckFromJobs( - ConsumerRecord record) { - String setReference = record.key(); - Pair projectAndSetName = parseReference(setReference); - FeatureSet featureSet = - featureSetRepository.findFeatureSetByNameAndProject_Name( - projectAndSetName.getSecond(), projectAndSetName.getFirst()); - if (featureSet == null) { - log.warn( - String.format("ACKListener received message for unknown FeatureSet %s", setReference)); - return; - } - - if (featureSet.getVersion() != record.value().getFeatureSetVersion()) { - log.warn( - String.format( - "ACKListener received outdated ack for %s. Current %d, Received %d", - setReference, featureSet.getVersion(), record.value().getFeatureSetVersion())); - return; - } - - featureSet.getJobStatuses().stream() - .filter(js -> js.getJob().getId().equals(record.value().getJobName())) - .findFirst() - .ifPresent( - featureSetJobStatus -> - featureSetJobStatus.setDeliveryStatus( - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); - - boolean allDelivered = - featureSet.getJobStatuses().stream() - .filter(js -> js.getJob().isRunning()) - .allMatch( - js -> - js.getDeliveryStatus() - .equals(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); - - if (allDelivered) { - featureSet.setStatus(FeatureSetStatus.STATUS_READY); - featureSetRepository.saveAndFlush(featureSet); - } - } } diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index 9f02ed302d..c34af21495 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -81,6 +81,7 @@ feast: specsOptions: specsTopic: feast-specs specsAckTopic: feast-specs-ack + notifyIntervalMilliseconds: 1000 
spring: jpa: diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index fc633ba15f..16e3627459 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -16,6 +16,8 @@ */ package feast.core.job; +import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThat; @@ -57,6 +59,7 @@ public class JobUpdateTaskTest { private Store store; private Source source; private FeatureSet featureSet1; + private FeatureSet featureSet2; @Before public void setUp() { @@ -85,7 +88,11 @@ public void setUp() { featureSet1 = FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet1")).build()); + featureSet2 = + FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet2")).build()); featureSet1.setSource(source); + featureSet2.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY); + featureSet2.setVersion(5); } Job makeJob(String extId, List featureSets, JobStatus status) { @@ -176,4 +183,55 @@ public void shouldTimeout() { Job actual = jobUpdateTask.call(); assertThat(actual, is(IsNull.nullValue())); } + + @Test + public void featureSetsShouldBeUpdated() { + Job job = makeJob("", Collections.emptyList(), JobStatus.RUNNING); + + when(jobManager.getJobStatus(job)).thenReturn(JobStatus.RUNNING); + + JobUpdateTask jobUpdateTask = + new JobUpdateTask( + Collections.singletonList(featureSet1), + source, + store, + Optional.of(job), + jobManager, + 0L); + + jobUpdateTask.call(); + + FeatureSetJobStatus expectedStatus1 = new FeatureSetJobStatus(); + expectedStatus1.setJob(job); + expectedStatus1.setFeatureSet(featureSet1); + expectedStatus1.setVersion(0); + expectedStatus1.setDeliveryStatus( + 
FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + + assertThat(job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1)); + + expectedStatus1.setDeliveryStatus(STATUS_DELIVERED); + job.getFeatureSetJobStatuses().forEach(j -> j.setDeliveryStatus(STATUS_DELIVERED)); + + JobUpdateTask jobUpdateTask2 = + new JobUpdateTask( + Arrays.asList(featureSet1, featureSet2), + source, + store, + Optional.of(job), + jobManager, + 0L); + + jobUpdateTask2.call(); + + FeatureSetJobStatus expectedStatus2 = new FeatureSetJobStatus(); + expectedStatus2.setJob(job); + expectedStatus2.setFeatureSet(featureSet2); + expectedStatus2.setVersion(featureSet2.getVersion()); + expectedStatus2.setDeliveryStatus( + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + + assertThat( + job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1, expectedStatus2)); + } } diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index caec99f228..9b8e47de75 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -17,14 +17,13 @@ package feast.core.service; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; import feast.core.config.FeastProperties; @@ -42,6 
+41,7 @@ import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetMeta; import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.IngestionJobProto; import feast.proto.core.SourceProto.KafkaSourceConfig; import feast.proto.core.SourceProto.Source; import feast.proto.core.SourceProto.SourceType; @@ -49,14 +49,20 @@ import feast.proto.core.StoreProto.Store.RedisConfig; import feast.proto.core.StoreProto.Store.StoreType; import feast.proto.core.StoreProto.Store.Subscription; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CancellationException; +import lombok.SneakyThrows; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.AsyncResult; public class JobCoordinatorServiceTest { @@ -65,8 +71,10 @@ public class JobCoordinatorServiceTest { @Mock JobManager jobManager; @Mock SpecService specService; @Mock FeatureSetRepository featureSetRepository; + @Mock private KafkaTemplate kafkaTemplate; private FeastProperties feastProperties; + private JobCoordinatorService jcs; @Before public void setUp() { @@ -75,14 +83,23 @@ public void setUp() { JobProperties jobProperties = new JobProperties(); jobProperties.setJobUpdateTimeoutSeconds(5); feastProperties.setJobs(jobProperties); + + jcs = + new JobCoordinatorService( + jobRepository, + featureSetRepository, + specService, + jobManager, + feastProperties, + kafkaTemplate); + + when(kafkaTemplate.sendDefault(any(), any())).thenReturn(new AsyncResult<>(null)); } @Test public void shouldDoNothingIfNoStoresFound() throws InvalidProtocolBufferException { when(specService.listStores(any())).thenReturn(ListStoresResponse.newBuilder().build()); - 
JobCoordinatorService jcs = - new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, feastProperties); + jcs.Poll(); verify(jobRepository, times(0)).saveAndFlush(any()); } @@ -101,9 +118,7 @@ public void shouldDoNothingIfNoMatchingFeatureSetsFound() throws InvalidProtocol when(specService.listFeatureSets( Filter.newBuilder().setProject("*").setFeatureSetName("*").build())) .thenReturn(ListFeatureSetsResponse.newBuilder().build()); - JobCoordinatorService jcs = - new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, feastProperties); + jcs.Poll(); verify(jobRepository, times(0)).saveAndFlush(any()); } @@ -178,9 +193,6 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep when(jobManager.startJob(argThat(new JobMatcher(expectedInput)))).thenReturn(expected); when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); - JobCoordinatorService jcs = - new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, feastProperties); jcs.Poll(); verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture()); List actual = jobArgCaptor.getValue(); @@ -288,9 +300,6 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { when(jobManager.startJob(argThat(new JobMatcher(expectedInput2)))).thenReturn(expected2); when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); - JobCoordinatorService jcs = - new JobCoordinatorService( - jobRepository, featureSetRepository, specService, jobManager, feastProperties); jcs.Poll(); verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture()); @@ -299,4 +308,177 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { assertThat(actual.get(0), equalTo(expected1)); assertThat(actual.get(1), equalTo(expected2)); } + + @Test + public void shouldSendPendingFeatureSetToJobs() { + FeatureSet fs1 = + TestObjectFactory.CreateFeatureSet( + "fs_1", "project", 
Collections.emptyList(), Collections.emptyList()); + fs1.setVersion(2); + + FeatureSetJobStatus status1 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 1); + FeatureSetJobStatus status2 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 1); + FeatureSetJobStatus status3 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.ABORTED, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 2); + + // spec needs to be send + fs1.getJobStatuses().addAll(ImmutableList.of(status1, status2, status3)); + + FeatureSet fs2 = + TestObjectFactory.CreateFeatureSet( + "fs_2", "project", Collections.emptyList(), Collections.emptyList()); + fs2.setVersion(5); + + // spec already sent to kafka + fs2.getJobStatuses() + .addAll( + ImmutableList.of( + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, + 5))); + + // feature set without running jobs attached + FeatureSet fs3 = + TestObjectFactory.CreateFeatureSet( + "fs_3", "project", Collections.emptyList(), Collections.emptyList()); + fs3.getJobStatuses() + .addAll( + ImmutableList.of( + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.ABORTED, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, + 5))); + + when(featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)) + .thenReturn(ImmutableList.of(fs1, fs2, fs3)); + + jcs.notifyJobsWhenFeatureSetUpdated(); + + verify(kafkaTemplate).sendDefault(eq(fs1.getReference()), any(FeatureSetSpec.class)); + verify(kafkaTemplate, never()).sendDefault(eq(fs2.getReference()), any(FeatureSetSpec.class)); + verify(kafkaTemplate, never()).sendDefault(eq(fs3.getReference()), any(FeatureSetSpec.class)); + + assertThat(status1.getVersion(), is(2)); + assertThat( + 
status1.getDeliveryStatus(), + is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + + assertThat(status2.getVersion(), is(2)); + assertThat( + status2.getDeliveryStatus(), + is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + + assertThat(status3.getVersion(), is(2)); + assertThat( + status3.getDeliveryStatus(), + is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + } + + @Test + @SneakyThrows + public void shouldNotUpdateJobStatusVersionWhenKafkaUnavailable() { + FeatureSet fsInTest = + TestObjectFactory.CreateFeatureSet( + "fs_1", "project", Collections.emptyList(), Collections.emptyList()); + fsInTest.setVersion(2); + + FeatureSetJobStatus featureSetJobStatus = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 1); + fsInTest.getJobStatuses().add(featureSetJobStatus); + + CancellationException exc = new CancellationException(); + when(kafkaTemplate.sendDefault(eq(fsInTest.getReference()), any()).get()).thenThrow(exc); + when(featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)) + .thenReturn(ImmutableList.of(fsInTest)); + + jcs.notifyJobsWhenFeatureSetUpdated(); + assertThat(featureSetJobStatus.getVersion(), is(1)); + } + + @Test + public void specAckListenerShouldDoNothingWhenMessageIsOutdated() { + FeatureSet fsInTest = + TestObjectFactory.CreateFeatureSet( + "fs", "project", Collections.emptyList(), Collections.emptyList()); + FeatureSetJobStatus j1 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1); + FeatureSetJobStatus j2 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1); + + fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2)); + + when(featureSetRepository.findFeatureSetByNameAndProject_Name( + 
fsInTest.getName(), fsInTest.getProject().getName())) + .thenReturn(fsInTest); + + jcs.listenAckFromJobs(newAckMessage("project/invalid", 0, j1.getJob().getId())); + jcs.listenAckFromJobs(newAckMessage(fsInTest.getReference(), 0, "")); + jcs.listenAckFromJobs(newAckMessage(fsInTest.getReference(), -1, j1.getJob().getId())); + + assertThat( + j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + assertThat( + j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + } + + @Test + public void specAckListenerShouldUpdateFeatureSetStatus() { + FeatureSet fsInTest = + TestObjectFactory.CreateFeatureSet( + "fs", "project", Collections.emptyList(), Collections.emptyList()); + fsInTest.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING); + + FeatureSetJobStatus j1 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1); + FeatureSetJobStatus j2 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1); + FeatureSetJobStatus j3 = + TestObjectFactory.CreateFeatureSetJobStatusWithJob( + JobStatus.ABORTED, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1); + + fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2, j3)); + + when(featureSetRepository.findFeatureSetByNameAndProject_Name( + fsInTest.getName(), fsInTest.getProject().getName())) + .thenReturn(fsInTest); + + jcs.listenAckFromJobs( + newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j1.getJob().getId())); + + assertThat( + j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)); + + jcs.listenAckFromJobs( + newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j2.getJob().getId())); + + assertThat( + 
j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + + assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_READY)); + } + + private ConsumerRecord newAckMessage( + String key, int version, String jobName) { + return new ConsumerRecord<>( + "topic", + 0, + 0, + key, + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setFeatureSetVersion(version) + .setJobName(jobName) + .build()); + } } diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index ae3e878644..dad04462ec 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -16,12 +16,9 @@ */ package feast.core.service; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; @@ -48,21 +45,16 @@ import feast.proto.core.FeatureSetProto.EntitySpec; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.FeatureSetProto.FeatureSpec; -import feast.proto.core.IngestionJobProto; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.RedisConfig; import feast.proto.core.StoreProto.Store.StoreType; import feast.proto.core.StoreProto.Store.Subscription; import feast.proto.types.ValueProto.ValueType.Enum; -import io.grpc.StatusRuntimeException; import java.sql.Date; import java.time.Instant; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.CancellationException; import java.util.stream.Collectors; -import lombok.SneakyThrows; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Before; 
import org.junit.Rule; import org.junit.Test; @@ -70,8 +62,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mock; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.scheduling.annotation.AsyncResult; import org.tensorflow.metadata.v0.BoolDomain; import org.tensorflow.metadata.v0.FeaturePresence; import org.tensorflow.metadata.v0.FeaturePresenceWithinGroup; @@ -96,8 +86,6 @@ public class SpecServiceTest { @Mock private ProjectRepository projectRepository; - @Mock private KafkaTemplate kafkaTemplate; - @Rule public final ExpectedException expectedException = ExpectedException.none(); private SpecService specService; @@ -226,11 +214,8 @@ public void setUp() throws InvalidProtocolBufferException { when(storeRepository.findById("SERVING")).thenReturn(Optional.of(store1)); when(storeRepository.findById("NOTFOUND")).thenReturn(Optional.empty()); - when(kafkaTemplate.sendDefault(any(), any())).thenReturn(new AsyncResult<>(null)); - specService = - new SpecService( - featureSetRepository, storeRepository, projectRepository, defaultSource, kafkaTemplate); + new SpecService(featureSetRepository, storeRepository, projectRepository, defaultSource); } @Test @@ -412,72 +397,6 @@ public void applyFeatureSetShouldUpdateAndSaveFeatureSetIfAlreadyExists() FeatureSet.fromProto(expected)); assertThat(applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), equalTo(2)); - verify(kafkaTemplate) - .sendDefault(eq(featureSets.get(0).getReference()), any(FeatureSetSpec.class)); - } - - @Test - @SneakyThrows - public void applyFeatureSetShouldNotWorkWithoutKafkaAck() { - FeatureSet fsInTest = featureSets.get(1); - FeatureSetProto.FeatureSet incomingFeatureSet = fsInTest.toProto(); - CancellationException exc = new CancellationException(); - when(kafkaTemplate.sendDefault(eq(fsInTest.getReference()), any()).get()).thenThrow(exc); - - incomingFeatureSet = - incomingFeatureSet - .toBuilder() - 
.setMeta(incomingFeatureSet.getMeta()) - .setSpec( - incomingFeatureSet - .getSpec() - .toBuilder() - .addFeatures( - FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)) - .build()) - .build(); - - expectedException.expect(StatusRuntimeException.class); - specService.applyFeatureSet(incomingFeatureSet); - verify(featureSetRepository, never()).saveAndFlush(ArgumentMatchers.any(FeatureSet.class)); - } - - @Test - @SneakyThrows - public void applyFeatureSetShouldUpdateDeliveryStatuses() { - FeatureSet fsInTest = featureSets.get(1); - FeatureSetJobStatus j1 = - newJob( - fsInTest, - JobStatus.RUNNING, - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED); - FeatureSetJobStatus j2 = - newJob( - fsInTest, - JobStatus.ABORTED, - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED); - - fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2)); - - FeatureSetProto.FeatureSet incomingFeatureSet = fsInTest.toProto(); - incomingFeatureSet = - incomingFeatureSet - .toBuilder() - .setMeta(incomingFeatureSet.getMeta()) - .setSpec( - incomingFeatureSet - .getSpec() - .toBuilder() - .addFeatures( - FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)) - .build()) - .build(); - - specService.applyFeatureSet(incomingFeatureSet); - assertThat( - j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); - assertThat( - j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); } @Test @@ -1037,70 +956,6 @@ public void getOrListFeatureSetShouldUseDefaultProjectIfProjectUnspecified() assertThat(listResponse.getFeatureSetsList(), equalTo(Arrays.asList(expected.toProto()))); } - @Test - public void specAckListenerShouldDoNothingWhenMessageIsOutdated() { - FeatureSet fsInTest = featureSets.get(1); - FeatureSetJobStatus j1 = - newJob( - fsInTest, - JobStatus.RUNNING, - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - FeatureSetJobStatus j2 = - newJob( - 
fsInTest, - JobStatus.RUNNING, - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - - fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2)); - - specService.listenAckFromJobs(newAckMessage("project/invalid", 0, j1.getJob().getId())); - specService.listenAckFromJobs(newAckMessage(fsInTest.getReference(), 0, "")); - specService.listenAckFromJobs(newAckMessage(fsInTest.getReference(), -1, j1.getJob().getId())); - - assertThat( - j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); - assertThat( - j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); - } - - @Test - public void specAckListenerShouldUpdateFeatureSetStatus() { - FeatureSet fsInTest = featureSets.get(1); - fsInTest.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING); - - FeatureSetJobStatus j1 = - newJob( - fsInTest, - JobStatus.RUNNING, - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - FeatureSetJobStatus j2 = - newJob( - fsInTest, - JobStatus.RUNNING, - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - FeatureSetJobStatus j3 = - newJob( - fsInTest, - JobStatus.ABORTED, - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - - fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2, j3)); - - specService.listenAckFromJobs( - newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j1.getJob().getId())); - - assertThat( - j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); - assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)); - - specService.listenAckFromJobs( - newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j2.getJob().getId())); - - assertThat( - j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); - assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_READY)); - } - private FeatureSet 
newDummyFeatureSet(String name, String project) { FeatureSpec f1 = FeatureSpec.newBuilder() @@ -1118,33 +973,6 @@ private FeatureSet newDummyFeatureSet(String name, String project) { return fs; } - private FeatureSetJobStatus newJob( - FeatureSet fs, JobStatus status, FeatureSetProto.FeatureSetJobDeliveryStatus deliveryStatus) { - Job job = new Job(); - job.setStatus(status); - job.setId(UUID.randomUUID().toString()); - - FeatureSetJobStatus featureSetJobStatus = new FeatureSetJobStatus(); - featureSetJobStatus.setJob(job); - featureSetJobStatus.setFeatureSet(fs); - featureSetJobStatus.setDeliveryStatus(deliveryStatus); - - return featureSetJobStatus; - } - - private ConsumerRecord newAckMessage( - String key, int version, String jobName) { - return new ConsumerRecord<>( - "topic", - 0, - 0, - key, - IngestionJobProto.FeatureSetSpecAck.newBuilder() - .setFeatureSetVersion(version) - .setJobName(jobName) - .build()); - } - private Store newDummyStore(String name) { // Add type to this method when we enable filtering by type Store store = new Store(); diff --git a/core/src/test/java/feast/core/service/TestObjectFactory.java b/core/src/test/java/feast/core/service/TestObjectFactory.java index 53522e09ca..99e1cf9e9f 100644 --- a/core/src/test/java/feast/core/service/TestObjectFactory.java +++ b/core/src/test/java/feast/core/service/TestObjectFactory.java @@ -16,16 +16,14 @@ */ package feast.core.service; -import feast.core.model.Entity; -import feast.core.model.Feature; -import feast.core.model.FeatureSet; -import feast.core.model.Source; +import feast.core.model.*; import feast.proto.core.FeatureSetProto; import feast.proto.core.SourceProto; import feast.proto.types.ValueProto; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; public class TestObjectFactory { @@ -73,4 +71,19 @@ public static Entity CreateEntity(String name, ValueProto.ValueType.Enum valueTy return Entity.fromProto( 
FeatureSetProto.EntitySpec.newBuilder().setName(name).setValueType(valueType).build()); } + + public static FeatureSetJobStatus CreateFeatureSetJobStatusWithJob( + JobStatus status, FeatureSetProto.FeatureSetJobDeliveryStatus deliveryStatus, int version) { + Job job = new Job(); + job.setStatus(status); + job.setId(UUID.randomUUID().toString()); + + FeatureSetJobStatus featureSetJobStatus = new FeatureSetJobStatus(); + featureSetJobStatus.setJob(job); + + featureSetJobStatus.setDeliveryStatus(deliveryStatus); + featureSetJobStatus.setVersion(version); + + return featureSetJobStatus; + } }