Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ingestion Job management API for Feast Core #548

Merged
merged 66 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
1e1abb1
Added protobuf definitions for a Job Management API
Mar 11, 2020
bd04076
Added query methods to JobRepository to query jobs by store and featu…
Mar 12, 2020
643f042
Added hashCode() & equals() to Job model compare and hash jobs
Mar 12, 2020
f4b417e
Added toIngestionProto() to Job object to convert Job model to ingest…
Mar 16, 2020
9d62a38
Added query methods to FeatureSetRepository to query by exact (name, …
Mar 16, 2020
32a0bb5
Added listJobs() to JobService to handle request list ingestion jobs …
Mar 16, 2020
4f55012
Added missing filter field in ListIngestionJobsRequest protobuf
Mar 17, 2020
b3aa72a
Added code to setup mockups for testing JobService
Mar 17, 2020
06b84df
Revert "Added hashCode() & equals() to Job model compare and hash jobs"
Mar 17, 2020
c4b901a
Added JobServiceTest unit tests for JobService's listJobs()
Mar 17, 2020
cd0bde2
Added stopJobs() to JobService to handle requests to stop jobs
Mar 18, 2020
c59035f
Added getTransitionalStates() to JobStatus to return collection of tr…
Mar 20, 2020
898045c
Changed stopJobs() to throw unsupported error on transitional job sta…
Mar 20, 2020
f4711cd
Moved conversion of JobStatus to IngestionJobStatus proto to JobStatus.
Mar 20, 2020
fe744ea
Make findFeatureSet() match only one featureset.
Mar 20, 2020
a79f8b4
Added matchFeatureSets() to SpecService to match Feature Sets from Re…
Mar 20, 2020
5b25929
Refactor JobService listJobs() to use SpecService to provide features…
Mar 20, 2020
1997bb1
Revert findBy methods added to FeatureSetRepository as no longer used.
Mar 20, 2020
f320e38
Added restartJob() to JobManagers to restart ingestion/import jobs
Mar 20, 2020
0ec5afb
Added restartJob() to JobService to restart ingestion jobs
Mar 23, 2020
83cc4c7
Update job model (due to new extId) when restartJob() in JobService
Mar 23, 2020
cf6a9c4
Use assertThat() & equalTo() instead of assertEquals() in JobService
Mar 23, 2020
8088b33
Revert "Use assertThat() & equalTo() instead of assertEquals() in Job…
Mar 23, 2020
bdc2803
Use hamcrest’s assertThat() and equalTo() instead of Junit's assertEq…
Mar 23, 2020
fafcd11
Hook up JobService list, stop, restart job methods to CoreServiceImpl.
Mar 23, 2020
7a97e1f
Throw InvalidArgumentException instead when calling listJobs() with i…
Mar 23, 2020
d277f93
Fixed typos in javadocs: InvalidArgumentException should be IllegalAr…
Mar 23, 2020
255946b
Fixed in findByFeatureSetsIn() query in JobRepository
Mar 23, 2020
09e9ddc
Renamed toIngestionProto() methods to toProto() to follow code conven…
Mar 23, 2020
c85da17
Make JobService's listJobs() a transaction to prevent DB data race co…
Mar 23, 2020
c876e40
Moved matchFeatureSets() in SpecService to private method in JobService
Mar 23, 2020
1799c3a
Make the map that maps between JobStatus and IngestionJobStatus static
Mar 23, 2020
1bead61
Make JobService's listJobs() to return all ingestion jobs on empty fi…
Mar 25, 2020
19994ea
Fixed issue where the jobManager map that JobService built used wrong…
Mar 25, 2020
ff73b5f
Fix issue where actual Job Status is not synced with database.
Mar 26, 2020
c207f1a
Log stopJob() & restartJob() operations to make debugging easier
Mar 26, 2020
c64039a
Use Runner.name() instead of runner.toString() to build JobManager map
Mar 27, 2020
ed4dae9
Move documentation on JobService operations to CoreService protobuf d…
Mar 28, 2020
24c09be
Added IngestJob to python sdk as native representation of IngestionJo…
Mar 30, 2020
2930388
Make empty filter on JobService's listJobs() select all ingestion jobs
Mar 30, 2020
5cc8abe
Added bindings for Job management API to python sdk client.
Mar 30, 2020
d86cb66
Fixed __connect_core() to connect to Feast CoreService when calling o…
Mar 30, 2020
d9874f2
Auto reload IngestJob.status and IngestJob.external_id on get property.
Mar 30, 2020
19620b2
Added IngestJob.wait() to wait for job status to transtion
Mar 30, 2020
6b47ecb
Added basic job api e2e test to exercise job api
Mar 30, 2020
f3471ac
Reorder the operations e2etest to make sure that jobs are running aft…
Mar 30, 2020
1707401
Added e2e test for all types exercising job api
Mar 31, 2020
9999db5
Fixed typo in function arguments
Mar 31, 2020
f012623
Added unit tests for Ingestion Job API additions in python sdk
Mar 31, 2020
2e819b2
Rename "ingestion" to "ingest" for more consistent naming
Mar 31, 2020
ba08c8a
Disable support for restarting Job in a terminal state due to possibl…
Apr 3, 2020
5fb774c
Added __str__ and __repr__ to IngestJob to render ingestjob in human …
Apr 3, 2020
80ef3c9
Added FeatureSetRef to represent references to featursets
Apr 4, 2020
d3c9a3d
Admend client's list_ingest_jobs() to accept feature references directly
Apr 6, 2020
0cfc32a
Fixed typo in IngestJob.store property
Apr 6, 2020
a540157
Fixed issue with FeatureSetRef.from_str not converting version to int
Apr 6, 2020
355b571
Make the grpc error message more apparent on stop_ingest_job() and re…
Apr 6, 2020
05b33a6
Added feast ingest-job list, describe, stop, restart to CLI
Apr 6, 2020
3b8198a
Rename Job to RetrievalJob to prevent confusion with IngestJob
Apr 6, 2020
19bd141
Updated e2e tests to use FeatureSetRef in list_ingest_jobs()
Apr 6, 2020
9f6cf6a
Fixed due e2e tests to cater to new limitations on stop_ingest_job()
Apr 6, 2020
87054b9
Increase timeout on test_all_types_ingest_jobs() e2e test.
Apr 6, 2020
1848dd4
Configure IngestJob.wait() to backoff with a exponentially larger wai…
Apr 7, 2020
146fb2b
Added print statements to debug e2e test failure.
Apr 7, 2020
0fa6b93
Revert "Added print statements to debug e2e test failure."
Apr 7, 2020
66d3f77
Fixed issue of test waiting for aborted job to become running causing…
Apr 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/java/feast/core/dao/JobRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.dao;

import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import java.util.Collection;
Expand All @@ -29,4 +30,10 @@ public interface JobRepository extends JpaRepository<Job, String> {
List<Job> findByStatusNotIn(Collection<JobStatus> statuses);

List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);

// find jobs by feast store name
List<Job> findByStoreName(String storeName);

// find jobs by featureset
List<Job> findByFeatureSetsIn(List<FeatureSet> featureSets);
}
81 changes: 80 additions & 1 deletion core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.grpc;

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
Expand All @@ -30,21 +31,29 @@
import feast.core.CoreServiceProto.GetFeatureSetResponse;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.ListIngestionJobsRequest;
import feast.core.CoreServiceProto.ListIngestionJobsResponse;
import feast.core.CoreServiceProto.ListProjectsRequest;
import feast.core.CoreServiceProto.ListProjectsResponse;
import feast.core.CoreServiceProto.ListStoresRequest;
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.CoreServiceProto.RestartIngestionJobRequest;
import feast.core.CoreServiceProto.RestartIngestionJobResponse;
import feast.core.CoreServiceProto.StopIngestionJobRequest;
import feast.core.CoreServiceProto.StopIngestionJobResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.exception.RetrievalException;
import feast.core.grpc.interceptors.MonitoringInterceptor;
import feast.core.model.Project;
import feast.core.service.AccessManagementService;
import feast.core.service.JobService;
import feast.core.service.SpecService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
Expand All @@ -57,11 +66,16 @@ public class CoreServiceImpl extends CoreServiceImplBase {

private SpecService specService;
private AccessManagementService accessManagementService;
private JobService jobService;

@Autowired
public CoreServiceImpl(SpecService specService, AccessManagementService accessManagementService) {
public CoreServiceImpl(
SpecService specService,
AccessManagementService accessManagementService,
JobService jobService) {
this.specService = specService;
this.accessManagementService = accessManagementService;
this.jobService = jobService;
}

@Override
Expand Down Expand Up @@ -192,4 +206,69 @@ public void listProjects(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void listIngestionJobs(
ListIngestionJobsRequest request,
StreamObserver<ListIngestionJobsResponse> responseObserver) {
try {
ListIngestionJobsResponse response = this.jobService.listJobs(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (InvalidArgumentException e) {
log.error("Recieved an invalid request on calling listIngestionJobs method:", e);
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.error("Unexpected exception on calling listIngestionJobs method:", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void restartIngestionJob(
RestartIngestionJobRequest request,
StreamObserver<RestartIngestionJobResponse> responseObserver) {
try {
RestartIngestionJobResponse response = this.jobService.restartJob(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (NoSuchElementException e) {
log.error(
"Attempted to restart an nonexistent job on calling restartIngestionJob method:", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (UnsupportedOperationException e) {
log.error("Recieved an unsupported request on calling restartIngestionJob method:", e);
responseObserver.onError(
Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.error("Unexpected exception on calling restartIngestionJob method:", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void stopIngestionJob(
StopIngestionJobRequest request, StreamObserver<StopIngestionJobResponse> responseObserver) {
try {
StopIngestionJobResponse response = this.jobService.stopJob(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (NoSuchElementException e) {
log.error("Attempted to stop an nonexistent job on calling stopIngestionJob method:", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (UnsupportedOperationException e) {
log.error("Recieved an unsupported request on calling stopIngestionJob method:", e);
responseObserver.onError(
Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.error("Unexpected exception on calling stopIngestionJob method:", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/feast/core/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public interface JobManager {
*/
void abortJob(String extId);

/**
* Restart an job. If job is an terminated state, will simply start the job. Might cause data to
* be lost during when restarting running jobs in some implementations. Refer to on docs the
* specific implementation.
*
* @param job job to restart
* @return the restarted job
*/
Job restartJob(Job job);

/**
* Get status of a job given runner-specific job ID.
*
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ public void abortJob(String dataflowJobId) {
}
}

/**
* Restart a restart dataflow job. Dataflow should ensure continuity between during the restart,
* so no data should be lost during the restart operation.
*
* @param job job to restart
* @return the restarted job
*/
@Override
public Job restartJob(Job job) {
JobStatus status = job.getStatus();
if (JobStatus.getTerminalState().contains(status)) {
// job yet not running: just start job
return this.startJob(job);
} else {
// job is running - updating the job without changing the job has
// the effect of restarting the job
return this.updateJob(job);
}
}

/**
* Get status of a dataflow job with given id and try to map it into Feast's JobStatus.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOExcept
return ImportJob.runPipeline(pipelineOptions);
}

/**
* Restart a direct runner job. Note that some data will be temporarily lost during when
* restarting running direct runner jobs. See {#link {@link #updateJob(Job)} for more info.
*
* @param job job to restart
* @return the restarted job
*/
@Override
public Job restartJob(Job job) {
JobStatus status = job.getStatus();
if (JobStatus.getTerminalState().contains(status)) {
// job yet not running: just start job
return this.startJob(job);
} else {
// job is running - updating the job without changing the job has
// the effect of restarting the job.
return this.updateJob(job);
}
}

/**
* Gets the state of the direct runner job. Direct runner jobs only have 2 states: RUNNING and
* ABORTED.
Expand Down
45 changes: 44 additions & 1 deletion core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@
*/
package feast.core.model;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto;
import feast.core.IngestionJobProto;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.*;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
import javax.persistence.ManyToOne;
import javax.persistence.OneToMany;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -102,4 +118,31 @@ public void updateMetrics(List<Metrics> newMetrics) {
public String getSinkName() {
return store.getName();
}

/**
* Convert a job model to ingestion job proto
*
* @return Ingestion Job proto derieved from the given job
*/
public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferException {

// convert featuresets of job to protos
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : this.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}

// build ingestion job proto with job data
IngestionJobProto.IngestionJob ingestJob =
IngestionJobProto.IngestionJob.newBuilder()
.setId(this.getId())
.setExternalId(this.getExtId())
.setStatus(this.getStatus().toProto())
.addAllFeatureSets(featureSetProtos)
.setSource(this.getSource().toProto())
.setStore(this.getStore().toProto())
.build();

return ingestJob;
}
}
37 changes: 37 additions & 0 deletions core/src/main/java/feast/core/model/JobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package feast.core.model;

import feast.core.IngestionJobProto.IngestionJobStatus;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

public enum JobStatus {
/** Job status is not known. */
Expand Down Expand Up @@ -64,4 +66,39 @@ public enum JobStatus {
public static Collection<JobStatus> getTerminalState() {
return TERMINAL_STATE;
}

private static final Collection<JobStatus> TRANSITIONAL_STATES =
Collections.unmodifiableList(Arrays.asList(PENDING, ABORTING, SUSPENDING));

/**
* Get Transitional Job Status states. Transitionals states are assigned to jobs that
* transitioning to a more stable state (ie SUSPENDED, ABORTED etc.)
*
* @return Collection of transitional Job Status states.
*/
public static final Collection<JobStatus> getTransitionalStates() {
return TRANSITIONAL_STATES;
}

private static final Map<JobStatus, IngestionJobStatus> INGESTION_JOB_STATUS_MAP =
Map.of(
JobStatus.UNKNOWN, IngestionJobStatus.UNKNOWN,
JobStatus.PENDING, IngestionJobStatus.PENDING,
JobStatus.RUNNING, IngestionJobStatus.RUNNING,
JobStatus.COMPLETED, IngestionJobStatus.COMPLETED,
JobStatus.ABORTING, IngestionJobStatus.ABORTING,
JobStatus.ABORTED, IngestionJobStatus.ABORTED,
JobStatus.ERROR, IngestionJobStatus.ERROR,
JobStatus.SUSPENDING, IngestionJobStatus.SUSPENDING,
JobStatus.SUSPENDED, IngestionJobStatus.SUSPENDED);

/**
* Convert a Job Status to Ingestion Job Status proto
*
* @return IngestionJobStatus proto derieved from this job status
*/
public IngestionJobStatus toProto() {
// maps job models job status to ingestion job status
return INGESTION_JOB_STATUS_MAP.get(this);
}
}
Loading