Skip to content

Commit

Permalink
solve techdebt: move communication with job to JobCoordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
pyalex committed Jun 18, 2020
1 parent e343c2f commit fbb5eb8
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 299 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public static class FeatureSetSpecStreamProperties {

/* Kafka topic to receive acknowledgment from ingestion job on successful processing of new specs */
@NotBlank private String specsAckTopic = "feast-feature-set-specs-ack";

/* Notify jobs interval in milliseconds.
How frequently Feast will check on pending FeatureSets and publish them to Kafka. */
@Positive private long notifyIntervalMilliseconds;
}
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.core.dao;

import feast.core.model.FeatureSet;
import feast.proto.core.FeatureSetProto;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;

Expand All @@ -37,4 +38,7 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>
// find all feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAsc(
String name, String project_name);

// find all feature sets matching given status
List<FeatureSet> findAllByStatus(FeatureSetProto.FeatureSetStatus status);
}
17 changes: 17 additions & 0 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import feast.proto.core.FeatureSetProto;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
Expand Down Expand Up @@ -149,10 +151,24 @@ private Job startJob(String jobId) {
}

private void updateFeatureSets(Job job) {
Map<FeatureSet, FeatureSetJobStatus> alreadyConnected =
job.getFeatureSetJobStatuses().stream()
.collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s));

for (FeatureSet fs : featureSets) {
if (alreadyConnected.containsKey(fs)) {
continue;
}

FeatureSetJobStatus status = new FeatureSetJobStatus();
status.setFeatureSet(fs);
status.setJob(job);
if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) {
// Feature Set was already delivered to a previous generation of the job
// (in other words, it already exists in Kafka),
// so we expect the job to ack the latest version based on the history in the Kafka topic
status.setVersion(fs.getVersion());
}
status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
job.getFeatureSetJobStatuses().add(status);
}
Expand All @@ -175,6 +191,7 @@ private Job updateStatus(Job job) {
}

job.setStatus(newStatus);
updateFeatureSets(job);
return job;
}

Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/feast/core/model/FeatureSetJobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.model;

import com.google.common.base.Objects;
import feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus;
import java.io.Serializable;
import javax.persistence.*;
Expand Down Expand Up @@ -61,4 +62,23 @@ public FeatureSetJobStatusKey() {}
@Enumerated(EnumType.STRING)
@Column(name = "delivery_status")
private FeatureSetJobDeliveryStatus deliveryStatus;

@Column(name = "version", columnDefinition = "integer default 0")
private int version;

/**
 * Two statuses are equal when they link the same job (by id) to the same feature set
 * (by reference) with identical delivery status and spec version.
 */
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  FeatureSetJobStatus other = (FeatureSetJobStatus) o;
  // Cheap primitive/enum checks first; identity of the linked entities is compared
  // through their business keys (job id, feature set reference), not object identity.
  return this.version == other.version
      && this.deliveryStatus == other.deliveryStatus
      && Objects.equal(this.job.getId(), other.job.getId())
      && Objects.equal(this.featureSet.getReference(), other.featureSet.getReference());
}

/**
 * Hashes the same properties that {@link #equals(Object)} compares — the job id and the
 * feature set reference rather than the full entities — so that equal objects always share
 * a hash code, as the Object.hashCode() contract requires. Hashing the whole Job/FeatureSet
 * entities would break that guarantee whenever their hashCode is not id/reference based.
 */
@Override
public int hashCode() {
  return Objects.hashCode(
      job == null ? null : job.getId(),
      featureSet == null ? null : featureSet.getReference(),
      deliveryStatus,
      version);
}
}
140 changes: 135 additions & 5 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package feast.core.service;

import static feast.core.model.FeatureSet.parseReference;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.config.FeastProperties;
import feast.core.config.FeastProperties.JobProperties;
Expand All @@ -26,21 +28,24 @@
import feast.core.model.*;
import feast.proto.core.CoreServiceProto.ListStoresRequest.Filter;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.StoreProto;
import feast.proto.core.StoreProto.Store.Subscription;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import javax.validation.constraints.Positive;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -49,24 +54,29 @@
@Service
public class JobCoordinatorService {

private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5;

private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;
private final KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher;

/**
 * Creates the coordinator with all collaborators injected by Spring.
 *
 * @param jobRepository persistence for ingestion {@code Job} entities
 * @param featureSetRepository persistence for {@code FeatureSet} entities
 * @param specService service used to look up stores/feature sets
 * @param jobManager backend responsible for starting/updating ingestion jobs
 * @param feastProperties application config; only the jobs section is retained
 * @param specPublisher Kafka template used to publish FeatureSetSpec messages to jobs
 */
@Autowired
public JobCoordinatorService(
    JobRepository jobRepository,
    FeatureSetRepository featureSetRepository,
    SpecService specService,
    JobManager jobManager,
    FeastProperties feastProperties,
    KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
  // Plain field assignments; only the jobs sub-config is kept from feastProperties.
  this.specPublisher = specPublisher;
  this.jobProperties = feastProperties.getJobs();
  this.jobManager = jobManager;
  this.specService = specService;
  this.featureSetRepository = featureSetRepository;
  this.jobRepository = jobRepository;
}

/**
Expand Down Expand Up @@ -153,4 +163,124 @@ public Optional<Job> getJob(Source source, Store store) {
// return the latest
return Optional.of(jobs.get(0));
}

/**
 * Periodically scans for FeatureSets in STATUS_PENDING and publishes their latest spec to the
 * Kafka specs topic so every running ingestion job can apply the update on the fly.
 *
 * <p>A pending FeatureSet is published only when at least one related job is running and every
 * running job is still on an older spec version. After a successful publish, the delivery
 * status of each running job is reset to STATUS_IN_PROGRESS so the ack listener can track
 * acknowledgements for the new version.
 */
@Transactional
@Scheduled(fixedDelayString = "${feast.stream.specsOptions.notifyIntervalMilliseconds}")
public void notifyJobsWhenFeatureSetUpdated() {
  List<FeatureSet> pendingFeatureSets =
      featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING);

  pendingFeatureSets.stream()
      .filter(
          fs -> {
            List<FeatureSetJobStatus> runningJobs =
                fs.getJobStatuses().stream()
                    .filter(jobStatus -> jobStatus.getJob().isRunning())
                    .collect(Collectors.toList());

            // Publish only when some job is running and none has seen this version yet.
            return !runningJobs.isEmpty()
                && runningJobs.stream()
                    .allMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion());
          })
      .forEach(
          fs -> {
            log.info("Sending new FeatureSet {} to Ingestion", fs.getReference());

            // Sending latest version of FeatureSet to all currently running IngestionJobs
            // (there's one topic for all sets).
            // All related jobs would apply the new FeatureSet on the fly.
            // In case Kafka doesn't respond within SPEC_PUBLISHING_TIMEOUT_SECONDS we will
            // try again on the next scheduled run.
            try {
              specPublisher
                  .sendDefault(fs.getReference(), fs.toProto().getSpec())
                  .get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (Exception e) {
              // Pass the Throwable to SLF4J so the full stack trace is logged; logging only
              // e.getMessage() made the failure cause undiagnosable.
              log.error(
                  "Error occurred while sending FeatureSetSpec to kafka. Will retry later", e);
              return;
            }

            // Updating delivery status for related jobs (that are currently using this
            // FeatureSet).
            // We now set status to IN_PROGRESS, so listenAckFromJobs would be able to
            // monitor delivery progress for each new version.
            fs.getJobStatuses().stream()
                .filter(s -> s.getJob().isRunning())
                .forEach(
                    jobStatus -> {
                      jobStatus.setDeliveryStatus(
                          FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
                      jobStatus.setVersion(fs.getVersion());
                    });
            featureSetRepository.saveAndFlush(fs);
          });
}

/**
 * Listener for ACK messages coming from IngestionJob when a FeatureSetSpec is installed (in
 * pipeline).
 *
 * <p>Updates FeatureSetJobStatus for the respective FeatureSet (selected by reference) and Job
 * (selected by id).
 *
 * <p>When all running jobs related to the FeatureSet have acknowledged the current version,
 * the FeatureSet receives READY status.
 *
 * @param record ConsumerRecord with key: FeatureSet reference and value: Ack message
 */
@KafkaListener(topics = {"${feast.stream.specsOptions.specsAckTopic}"})
@Transactional
public void listenAckFromJobs(
    ConsumerRecord<String, IngestionJobProto.FeatureSetSpecAck> record) {
  String setReference = record.key();
  Pair<String, String> projectAndSetName = parseReference(setReference);
  FeatureSet featureSet =
      featureSetRepository.findFeatureSetByNameAndProject_Name(
          projectAndSetName.getSecond(), projectAndSetName.getFirst());
  if (featureSet == null) {
    // Parameterized logging avoids the String.format cost when the level is disabled.
    log.warn("ACKListener received message for unknown FeatureSet {}", setReference);
    return;
  }

  int ackVersion = record.value().getFeatureSetVersion();

  // Acks for any version other than the current one are stale; ignore them.
  if (featureSet.getVersion() != ackVersion) {
    log.warn(
        "ACKListener received outdated ack for {}. Current {}, Received {}",
        setReference,
        featureSet.getVersion(),
        ackVersion);
    return;
  }

  log.info("Updating featureSet {} delivery statuses.", featureSet.getReference());

  // Mark the acking job's status DELIVERED for this version (at most one matching entry).
  featureSet.getJobStatuses().stream()
      .filter(
          js ->
              js.getJob().getId().equals(record.value().getJobName())
                  && js.getVersion() == ackVersion)
      .findFirst()
      .ifPresent(
          featureSetJobStatus ->
              featureSetJobStatus.setDeliveryStatus(
                  FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));

  // Enum comparison with == is null-safe, unlike calling equals() on a possibly-null status.
  // NOTE(review): allMatch returns true for an empty stream, so a FeatureSet with no running
  // jobs would be marked READY immediately — confirm this is intended.
  boolean allDelivered =
      featureSet.getJobStatuses().stream()
          .filter(js -> js.getJob().isRunning())
          .allMatch(
              js ->
                  js.getDeliveryStatus()
                      == FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED);

  if (allDelivered) {
    log.info("FeatureSet {} update is completely delivered", featureSet.getReference());

    featureSet.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY);
    featureSetRepository.saveAndFlush(featureSet);
  }
}
}
Loading

0 comments on commit fbb5eb8

Please sign in to comment.