diff --git a/core/pom.xml b/core/pom.xml index 9f38638a2c..8bc1a9b6b3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -189,6 +189,11 @@ google-cloud-bigquery 1.48.0 + + org.apache.flink + flink-clients_2.11 + 1.5.5 + @@ -249,5 +254,11 @@ 2.23.0 test + + com.squareup.okhttp3 + mockwebserver + 3.11.0 + test + diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java index b8a37921b7..04b7bd8686 100644 --- a/core/src/main/java/feast/core/config/JobConfig.java +++ b/core/src/main/java/feast/core/config/JobConfig.java @@ -26,28 +26,37 @@ import com.timgroup.statsd.StatsDClient; import feast.core.job.JobManager; import feast.core.job.JobMonitor; -import feast.core.job.NoopJobManager; import feast.core.job.NoopJobMonitor; +import feast.core.job.Runner; import feast.core.job.StatsdMetricPusher; import feast.core.job.dataflow.DataflowJobConfig; import feast.core.job.dataflow.DataflowJobManager; import feast.core.job.dataflow.DataflowJobMonitor; +import feast.core.job.direct.DirectRunnerJobManager; +import feast.core.job.flink.FlinkJobConfig; +import feast.core.job.flink.FlinkJobManager; +import feast.core.job.flink.FlinkJobMonitor; +import feast.core.job.flink.FlinkRestApi; import java.io.IOException; import java.security.GeneralSecurityException; +import java.util.List; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CustomCommandLine; +import org.apache.flink.configuration.GlobalConfiguration; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.web.client.RestTemplate; -/** - * Beans for job management - */ +/** Beans for job management */ @Slf4j @Configuration public class JobConfig { /** * Get configuration for dataflow connection + * * @param projectId * @param location * @return 
DataflowJobConfig @@ -59,89 +68,133 @@ public DataflowJobConfig getDataflowJobConfig( return new DataflowJobConfig(projectId, location); } + @Bean + public FlinkJobConfig getFlinkJobConfig( + @Value("${feast.jobs.flink.configDir}") String flinkConfigDir, + @Value("${feast.jobs.flink.masterUrl}") String flinkMasterUrl) { + return new FlinkJobConfig(flinkMasterUrl, flinkConfigDir); + } + /** * Get a JobManager according to the runner type and dataflow configuration. + * * @param runnerType runner type: one of [DataflowRunner, DirectRunner, FlinkRunner] * @param dfConfig dataflow job configuration * @return JobManager */ @Bean - public JobManager getJobManager(@Value("${feast.jobs.runner}") String runnerType, - DataflowJobConfig dfConfig) { - if ("DataflowRunner".equals(runnerType)) { - if (Strings.isNullOrEmpty(dfConfig.getLocation()) || - Strings.isNullOrEmpty(dfConfig.getProjectId())) { - log.error("Project and location of the Dataflow runner is not configured"); - throw new IllegalStateException( - "Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner."); - } - try { - GoogleCredential credential = GoogleCredential.getApplicationDefault() - .createScoped(DataflowScopes.all()); - Dataflow dataflow = new Dataflow(GoogleNetHttpTransport.newTrustedTransport(), - JacksonFactory.getDefaultInstance(), credential); + public JobManager getJobManager( + @Value("${feast.jobs.runner}") String runnerType, + DataflowJobConfig dfConfig, + FlinkJobConfig flinkConfig, + ImportJobDefaults defaults) + throws Exception { + + Runner runner = Runner.fromString(runnerType); + + switch (runner) { + case DATAFLOW: + if (Strings.isNullOrEmpty(dfConfig.getLocation()) + || Strings.isNullOrEmpty(dfConfig.getProjectId())) { + log.error("Project and location of the Dataflow runner is not configured"); + throw new IllegalStateException( + "Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner."); + } + try { + 
GoogleCredential credential = + GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all()); + Dataflow dataflow = + new Dataflow( + GoogleNetHttpTransport.newTrustedTransport(), + JacksonFactory.getDefaultInstance(), + credential); - return new DataflowJobManager(dataflow, - dfConfig.getProjectId(), - dfConfig.getLocation()); - } catch (IOException e) { - throw new IllegalStateException( - "Unable to find credential required for Dataflow monitoring API", e); - } catch (GeneralSecurityException e) { - throw new IllegalStateException("Security exception while connecting to Dataflow API", e); - } catch (Exception e) { - throw new IllegalStateException("Unable to initialize DataflowJobManager", e); - } + return new DataflowJobManager( + dataflow, dfConfig.getProjectId(), dfConfig.getLocation(), defaults); + } catch (IOException e) { + throw new IllegalStateException( + "Unable to find credential required for Dataflow monitoring API", e); + } catch (GeneralSecurityException e) { + throw new IllegalStateException("Security exception while connecting to Dataflow API", e); + } catch (Exception e) { + throw new IllegalStateException("Unable to initialize DataflowJobManager", e); + } + case FLINK: + org.apache.flink.configuration.Configuration configuration = + GlobalConfiguration.loadConfiguration(flinkConfig.getConfigDir()); + List> customCommandLines = + CliFrontend.loadCustomCommandLines(configuration, flinkConfig.getConfigDir()); + CliFrontend flinkCli = new CliFrontend(configuration, customCommandLines); + FlinkRestApi flinkRestApi = + new FlinkRestApi(new RestTemplate(), flinkConfig.getMasterUrl()); + return new FlinkJobManager(flinkCli, flinkConfig, flinkRestApi, defaults); + case DIRECT: + return new DirectRunnerJobManager(defaults); + default: + throw new IllegalArgumentException("Unsupported runner: " + runnerType); } - return new NoopJobManager(); } /** * Get a Job Monitor given the runner type and dataflow configuration. 
+ * * @param runnerType runner type: one of [DataflowRunner, DirectRunner, FlinkRunner] * @param dfConfig dataflow job configuration * @return JobMonitor */ @Bean - public JobMonitor getJobMonitor(@Value("${feast.jobs.runner}") String runnerType, - DataflowJobConfig dfConfig) { - if ("DataflowRunner".equals(runnerType)) { - if (Strings.isNullOrEmpty(dfConfig.getLocation()) || - Strings.isNullOrEmpty(dfConfig.getProjectId())) { - log.warn( - "Project and location of the Dataflow runner is not configured, will not do job monitoring"); - return new NoopJobMonitor(); - } - try { - GoogleCredential credential = GoogleCredential.getApplicationDefault() - .createScoped(DataflowScopes.all()); - Dataflow dataflow = new Dataflow(GoogleNetHttpTransport.newTrustedTransport(), - JacksonFactory.getDefaultInstance(), credential); + public JobMonitor getJobMonitor( + @Value("${feast.jobs.runner}") String runnerType, + DataflowJobConfig dfConfig, + FlinkJobConfig flinkJobConfig) + throws Exception { - return new DataflowJobMonitor(dataflow, - dfConfig.getProjectId(), - dfConfig.getLocation()); - } catch (IOException e) { - log.error("Unable to find credential required for Dataflow monitoring API: {}", - e.getMessage()); - } catch (GeneralSecurityException e) { - log.error("Security exception while "); - } catch (Exception e) { - log.error("Unable to initialize DataflowJobMonitor", e); - } - } + Runner runner = Runner.fromString(runnerType); + + switch (runner) { + case DATAFLOW: + if (Strings.isNullOrEmpty(dfConfig.getLocation()) + || Strings.isNullOrEmpty(dfConfig.getProjectId())) { + log.warn( + "Project and location of the Dataflow runner is not configured, will not do job monitoring"); + return new NoopJobMonitor(); + } + try { + GoogleCredential credential = + GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all()); + Dataflow dataflow = + new Dataflow( + GoogleNetHttpTransport.newTrustedTransport(), + JacksonFactory.getDefaultInstance(), + credential); - 
// Default to no monitoring - return new NoopJobMonitor(); + return new DataflowJobMonitor(dataflow, dfConfig.getProjectId(), dfConfig.getLocation()); + } catch (IOException e) { + log.error( + "Unable to find credential required for Dataflow monitoring API: {}", e.getMessage()); + } catch (GeneralSecurityException e) { + log.error("Security exception while "); + } catch (Exception e) { + log.error("Unable to initialize DataflowJobMonitor", e); + } + case FLINK: + FlinkRestApi flinkRestApi = + new FlinkRestApi(new RestTemplate(), flinkJobConfig.getMasterUrl()); + return new FlinkJobMonitor(flinkRestApi); + case DIRECT: + default: + return new NoopJobMonitor(); + } } /** * Get metrics pusher to statsd + * * @param statsDClient * @return StatsdMetricPusher */ @Bean - public StatsdMetricPusher getStatsdMetricPusher(StatsDClient statsDClient){ + public StatsdMetricPusher getStatsdMetricPusher(StatsDClient statsDClient) { return new StatsdMetricPusher(statsDClient); } } diff --git a/core/src/main/java/feast/core/grpc/JobServiceImpl.java b/core/src/main/java/feast/core/grpc/JobServiceImpl.java index 43d1cc4c67..47b76061b0 100644 --- a/core/src/main/java/feast/core/grpc/JobServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/JobServiceImpl.java @@ -19,32 +19,36 @@ import com.google.protobuf.Empty; import feast.core.JobServiceGrpc; -import feast.core.JobServiceProto.JobServiceTypes.*; +import feast.core.JobServiceProto.JobServiceTypes.AbortJobRequest; +import feast.core.JobServiceProto.JobServiceTypes.AbortJobResponse; +import feast.core.JobServiceProto.JobServiceTypes.GetJobRequest; +import feast.core.JobServiceProto.JobServiceTypes.GetJobResponse; +import feast.core.JobServiceProto.JobServiceTypes.JobDetail; +import feast.core.JobServiceProto.JobServiceTypes.ListJobsResponse; +import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobRequest; +import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobResponse; import 
feast.core.exception.JobExecutionException; -import feast.core.service.JobExecutionService; import feast.core.service.JobManagementService; import feast.core.validators.SpecValidator; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.lognet.springboot.grpc.GRpcService; import org.springframework.beans.factory.annotation.Autowired; -import java.util.List; - /** Implementation of the feast job GRPC service. */ @Slf4j @GRpcService public class JobServiceImpl extends JobServiceGrpc.JobServiceImplBase { - @Autowired private JobExecutionService jobExecutionService; - @Autowired private JobManagementService jobManagementService; @Autowired private SpecValidator validator; /** * submit a job to the runner by providing an import spec. + * * @param request ImportJobRequest object containing an import spec * @param responseObserver */ @@ -53,8 +57,9 @@ public void submitJob( SubmitImportJobRequest request, StreamObserver responseObserver) { try { validator.validateImportSpec(request.getImportSpec()); + String jobID = jobManagementService.submitJob(request.getImportSpec(), request.getName()); SubmitImportJobResponse response = - jobExecutionService.submitJob(request.getImportSpec(), request.getName()); + SubmitImportJobResponse.newBuilder().setJobId(jobID).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } catch (IllegalArgumentException e) { @@ -68,6 +73,7 @@ public void submitJob( /** * Abort a job given its feast-internal job id + * * @param request AbortJobRequest object containing feast job id * @param responseObserver */ @@ -86,6 +92,7 @@ public void abortJob(AbortJobRequest request, StreamObserver r /** * List all jobs previously submitted to the system. 
+ * * @param request Empty request * @param responseObserver */ @@ -104,6 +111,7 @@ public void listJobs(Empty request, StreamObserver responseObs /** * Get a single job previously submitted to the system by id + * * @param request GetJobRequest object containing a feast-internal job id * @param responseObserver */ diff --git a/core/src/main/java/feast/core/job/JobManager.java b/core/src/main/java/feast/core/job/JobManager.java index 94ac132b13..7dd098debe 100644 --- a/core/src/main/java/feast/core/job/JobManager.java +++ b/core/src/main/java/feast/core/job/JobManager.java @@ -17,7 +17,18 @@ package feast.core.job; +import feast.specs.ImportSpecProto.ImportSpec; + public interface JobManager { + + /** + * Submit an ingestion job into runner + * @param importSpec import spec of the ingestion job to run + * @param jobName name of the job + * @return extId runner specific job ID. + */ + String submitJob(ImportSpec importSpec, String jobName); + /** * abort a job given runner-specific job ID. * diff --git a/core/src/main/java/feast/core/job/JobMonitor.java b/core/src/main/java/feast/core/job/JobMonitor.java index 42d1ed9faa..723bce6353 100644 --- a/core/src/main/java/feast/core/job/JobMonitor.java +++ b/core/src/main/java/feast/core/job/JobMonitor.java @@ -27,10 +27,10 @@ public interface JobMonitor { /** * Get status of a job given runner-specific job ID. * - * @param runnerJobId runner specific job id. + * @param job job. * @return job status. */ - JobStatus getJobStatus(String runnerJobId); + JobStatus getJobStatus(JobInfo job); /** * Get metrics of a job. 
package feast.core.job;

/**
 * Supported ingestion job runners, keyed by the Apache Beam runner name as it
 * appears in configuration (the value of the "feast.jobs.runner" property).
 */
public enum Runner {
  DATAFLOW("DataflowRunner"),
  FLINK("FlinkRunner"),
  DIRECT("DirectRunner");

  // Beam runner class name as written in configuration, e.g. "DataflowRunner".
  private final String name;

  Runner(String name) {
    this.name = name;
  }

  /** Returns the configuration-facing Beam runner name, e.g. "FlinkRunner". */
  public String getName() {
    return name;
  }

  /**
   * Parse a Beam runner name into a Runner constant.
   *
   * @param runner runner name, e.g. "FlinkRunner"; comparison is case-sensitive
   * @return the matching Runner
   * @throws IllegalArgumentException if the name matches no known runner
   */
  public static Runner fromString(String runner) {
    for(Runner r: Runner.values()) {
      if (r.getName().equals(runner)) {
        return r;
      }
    }
    throw new IllegalArgumentException("Unknown value: " + runner);
  }
}
which is not in terminal state and push it to statsd. */ @@ -103,9 +102,13 @@ public void pollStatusAndMetrics() { continue; } List metrics = jobMonitor.getJobMetrics(job); + if (metrics == null) { + continue; + } + job.setMetrics(metrics); statsdMetricPusher.pushMetrics(metrics); + jobInfoRepository.save(job); } - jobInfoRepository.saveAll(nonTerminalJobs); } } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 0c92abe76a..61fb5d87b5 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -17,27 +17,38 @@ package feast.core.job.dataflow; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.Job; import com.google.common.base.Strings; -import feast.core.job.JobManager; +import feast.core.config.ImportJobDefaults; +import feast.core.job.direct.DirectRunnerJobManager; +import feast.specs.ImportSpecProto.ImportSpec; import lombok.extern.slf4j.Slf4j; -import static com.google.common.base.Preconditions.checkNotNull; - @Slf4j -public class DataflowJobManager implements JobManager { +public class DataflowJobManager extends DirectRunnerJobManager { private final String projectId; private final String location; private final Dataflow dataflow; + private ImportJobDefaults defaults; - public DataflowJobManager(Dataflow dataflow, String projectId, String location) { + public DataflowJobManager( + Dataflow dataflow, String projectId, String location, ImportJobDefaults importJobDefaults) { + super(importJobDefaults); checkNotNull(projectId); checkNotNull(location); this.projectId = projectId; this.location = location; this.dataflow = dataflow; + this.defaults = importJobDefaults; + } + + @Override + public String submitJob(ImportSpec importSpec, String jobId) { + return 
super.submitJob(importSpec, jobId); } @Override @@ -46,9 +57,9 @@ public void abortJob(String dataflowJobId) { Job job = dataflow.projects().locations().jobs().get(projectId, location, dataflowJobId).execute(); Job content = new Job(); - if (job.getType().equals("JOB_TYPE_BATCH")){ + if (job.getType().equals(DataflowJobType.JOB_TYPE_BATCH.toString())) { content.setRequestedState(DataflowJobState.JOB_STATE_CANCELLED.toString()); - } else if (job.getType().equals("JOB_TYPE_STREAMING")) { + } else if (job.getType().equals(DataflowJobType.JOB_TYPE_STREAMING.toString())) { content.setRequestedState(DataflowJobState.JOB_STATE_DRAINING.toString()); } dataflow diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobMonitor.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobMonitor.java index e0f8035630..90252c3862 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobMonitor.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobMonitor.java @@ -24,6 +24,7 @@ import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import feast.core.job.JobMonitor; +import feast.core.job.Runner; import feast.core.model.JobInfo; import feast.core.model.JobStatus; import feast.core.model.Metrics; @@ -55,17 +56,21 @@ public DataflowJobMonitor(Dataflow dataflow, String projectId, String location) /** * Get status of a dataflow job with given id and try to map it into Feast's JobStatus. * - * @param dataflowJobId dataflow job id. + * @param jobInfo dataflow job id. * @return status of the job, or return {@link JobStatus#UNKNOWN} if error happens. 
*/ - public JobStatus getJobStatus(String dataflowJobId) { + public JobStatus getJobStatus(JobInfo jobInfo) { + if (!Runner.DATAFLOW.getName().equals(jobInfo.getRunner())) { + return jobInfo.getStatus(); + } + try { - Job job = dataflow.projects().locations().jobs().get(projectId, location, dataflowJobId) + Job job = dataflow.projects().locations().jobs().get(projectId, location, jobInfo.getExtId()) .execute(); return jobStateMaper.map(job.getCurrentState()); } catch (Exception e) { log.error("Unable to retrieve status of a dataflow job with id : {}\ncause: {}", - dataflowJobId, e.getMessage()); + jobInfo.getExtId(), e.getMessage()); } return JobStatus.UNKNOWN; } @@ -78,6 +83,10 @@ public JobStatus getJobStatus(String dataflowJobId) { * @return list of feast-related metrics. Or return an empty list if error happens. */ public List getJobMetrics(JobInfo job) { + if (!Runner.DATAFLOW.getName().equals(job.getRunner())) { + return null; + } + String dataflowJobId = job.getExtId(); try { JobMetrics jobMetrics = dataflow.projects().locations().jobs() diff --git a/core/src/main/java/feast/core/job/NoopJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobType.java similarity index 73% rename from core/src/main/java/feast/core/job/NoopJobManager.java rename to core/src/main/java/feast/core/job/dataflow/DataflowJobType.java index 4075969667..fc3edfd42f 100644 --- a/core/src/main/java/feast/core/job/NoopJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobType.java @@ -15,13 +15,9 @@ * */ -package feast.core.job; +package feast.core.job.dataflow; -/* - * Job manager that does nothing. Direct runner jobs cannot be - * cancelled or stopped. 
- */ -public class NoopJobManager implements JobManager { - @Override - public void abortJob(String extId) {} +public enum DataflowJobType { + JOB_TYPE_BATCH, + JOB_TYPE_STREAMING } diff --git a/core/src/main/java/feast/core/service/JobExecutionService.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java similarity index 53% rename from core/src/main/java/feast/core/service/JobExecutionService.java rename to core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index d734a8aae8..38acbecb5a 100644 --- a/core/src/main/java/feast/core/service/JobExecutionService.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -15,22 +15,16 @@ * */ -package feast.core.service; +package feast.core.job.direct; -import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobResponse; +import com.google.common.annotations.VisibleForTesting; import feast.core.config.ImportJobDefaults; -import feast.core.dao.JobInfoRepository; import feast.core.exception.JobExecutionException; -import feast.core.log.Action; -import feast.core.log.AuditLogger; -import feast.core.log.Resource; -import feast.core.model.JobInfo; -import feast.core.model.JobStatus; +import feast.core.job.JobManager; import feast.core.util.TypeConversion; import feast.specs.ImportSpecProto.ImportSpec; import java.io.BufferedReader; import java.io.InputStreamReader; -import java.time.Instant; import java.util.ArrayList; import java.util.Base64; import java.util.List; @@ -38,103 +32,45 @@ import java.util.Optional; import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; @Slf4j -@Service -public class JobExecutionService { +public class DirectRunnerJobManager implements JobManager { - public static final String JOB_PREFIX_DEFAULT = "feastimport"; private static final int SLEEP_MS = 10; private static final Pattern JOB_EXT_ID_PREFIX_REGEX = 
Pattern.compile(".*FeastImportJobId:.*"); - private JobInfoRepository jobInfoRepository; - private ImportJobDefaults defaults; + protected ImportJobDefaults defaults; - @Autowired - public JobExecutionService(JobInfoRepository jobInfoRepository, ImportJobDefaults defaults) { - this.jobInfoRepository = jobInfoRepository; - this.defaults = defaults; + public DirectRunnerJobManager(ImportJobDefaults importJobDefaults) { + this.defaults = importJobDefaults; } - /** - * Submits a job defined by the importSpec to the runner and writes details about the job to the - * core database. - * - * @param importSpec job import spec - * @param jobPrefix prefix for job name - * @return response with feast-internal job id - */ - public SubmitImportJobResponse submitJob(ImportSpec importSpec, String jobPrefix) { - String dateSuffix = String.valueOf(Instant.now().toEpochMilli()); - String jobId = jobPrefix.isEmpty() ? JOB_PREFIX_DEFAULT + dateSuffix : jobPrefix + dateSuffix; + @Override + public String submitJob(ImportSpec importSpec, String jobId) { ProcessBuilder pb = getProcessBuilder(importSpec, jobId); log.info(String.format("Executing command: %s", String.join(" ", pb.command()))); - AuditLogger.log( - Resource.JOB, - jobId, - Action.SUBMIT, - "Building graph and submitting to %s", - defaults.getRunner()); + try { - JobInfo jobInfo = new JobInfo(jobId, "", defaults.getRunner(), importSpec, JobStatus.PENDING); - jobInfoRepository.saveAndFlush(jobInfo); Process p = pb.start(); - String jobExtId = runProcess(p); - if (jobExtId.isEmpty()) { - throw new RuntimeException( - String.format("Could not submit job: \n%s", "unable to retrieve job external id")); - } - updateJobExtId(jobId, jobExtId); - AuditLogger.log( - Resource.JOB, - jobId, - Action.STATUS_CHANGE, - "Job submitted to runner %s with runner id %s.", - defaults.getRunner(), - jobExtId); - return SubmitImportJobResponse.newBuilder().setJobId(jobId).build(); + return runProcess(p); } catch (Exception e) { - 
updateJobStatus(jobId, JobStatus.ERROR); - AuditLogger.log( - Resource.JOB, - jobId, - Action.STATUS_CHANGE, - "Job failed to be submitted to runner %s. Job status changed to ERROR.", - defaults.getRunner()); + log.error("Error submitting job", e); throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e); } } - /** - * Update a given job's status - */ - public void updateJobStatus(String jobId, JobStatus status) { - Optional jobRecordOptional = jobInfoRepository.findById(jobId); - if (jobRecordOptional.isPresent()) { - JobInfo jobRecord = jobRecordOptional.get(); - jobRecord.setStatus(status); - jobInfoRepository.saveAndFlush(jobRecord); - } - } - - /** - * Update a given job's external id - */ - public void updateJobExtId(String jobId, String jobExtId) { - Optional jobRecordOptional = jobInfoRepository.findById(jobId); - if (jobRecordOptional.isPresent()) { - JobInfo jobRecord = jobRecordOptional.get(); - jobRecord.setExtId(jobExtId); - jobInfoRepository.saveAndFlush(jobRecord); - } + @Override + public void abortJob(String extId) { + throw new UnsupportedOperationException("Unable to abort a job running in direct runner"); } /** * Builds the command to execute the ingestion job * + * @param importSpec + * @param jobId * @return configured ProcessBuilder */ + @VisibleForTesting public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) { Map options = TypeConversion.convertJsonStringToMap(defaults.getImportJobOptions()); @@ -149,23 +85,21 @@ public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) { commands.add(option("coreApiUri", defaults.getCoreApiUri())); commands.add(option("errorsStoreType", defaults.getErrorsStoreType())); commands.add(option("errorsStoreOptions", defaults.getErrorsStoreOptions())); + options.forEach((k, v) -> commands.add(option(k, v))); return new ProcessBuilder(commands); } - private String option(String key, String value) { - return String.format("--%s=%s", key, 
value); - } - /** * Run the given process and extract the job id from the output logs * * @param p Process * @return job id */ + @VisibleForTesting public String runProcess(Process p) { try (BufferedReader outputStream = - new BufferedReader(new InputStreamReader(p.getInputStream())); + new BufferedReader(new InputStreamReader(p.getInputStream())); BufferedReader errorsStream = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { String extId = ""; @@ -189,4 +123,8 @@ public String runProcess(Process p) { throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e); } } + + private String option(String key, String value) { + return String.format("--%s=%s", key, value); + } } diff --git a/core/src/main/java/feast/core/job/flink/FlinkJob.java b/core/src/main/java/feast/core/job/flink/FlinkJob.java new file mode 100644 index 0000000000..dc514c6606 --- /dev/null +++ b/core/src/main/java/feast/core/job/flink/FlinkJob.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package feast.core.job.flink; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@NoArgsConstructor +@AllArgsConstructor +@Setter +@Getter +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkJob { + + /** job ID */ + String jid; + + /** job name */ + String name; + + /** state */ + String state; +} diff --git a/core/src/main/java/feast/core/job/flink/FlinkJobConfig.java b/core/src/main/java/feast/core/job/flink/FlinkJobConfig.java new file mode 100644 index 0000000000..f26b2c4be0 --- /dev/null +++ b/core/src/main/java/feast/core/job/flink/FlinkJobConfig.java @@ -0,0 +1,36 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package feast.core.job.flink; + +import lombok.Value; + +@Value +public class FlinkJobConfig { + + /** + * Flink's job master URL + * e.g: localhost:8081 + */ + String masterUrl; + + /** + * Directory containing flink-conf.yaml + * e.g.: /etc/flink/conf + */ + String configDir; +} diff --git a/core/src/main/java/feast/core/job/flink/FlinkJobList.java b/core/src/main/java/feast/core/job/flink/FlinkJobList.java new file mode 100644 index 0000000000..af9776d643 --- /dev/null +++ b/core/src/main/java/feast/core/job/flink/FlinkJobList.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.core.job.flink; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.util.List; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@NoArgsConstructor +@Setter +@Getter +@JsonIgnoreProperties(ignoreUnknown = true) +public class FlinkJobList { + + /** List of flink job. 
*/ + List jobs; +} diff --git a/core/src/main/java/feast/core/job/flink/FlinkJobManager.java b/core/src/main/java/feast/core/job/flink/FlinkJobManager.java new file mode 100644 index 0000000000..1ec16ec17d --- /dev/null +++ b/core/src/main/java/feast/core/job/flink/FlinkJobManager.java @@ -0,0 +1,106 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.core.job.flink; + +import feast.core.config.ImportJobDefaults; +import feast.core.job.JobManager; +import feast.core.util.TypeConversion; +import feast.specs.ImportSpecProto.ImportSpec; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.client.cli.CliFrontend; + +@Slf4j +public class FlinkJobManager implements JobManager { + + private final CliFrontend flinkCli; + private final ImportJobDefaults defaults; + private final String masterUrl; + private final FlinkRestApi flinkRestApis; + + public FlinkJobManager( + CliFrontend flinkCli, + FlinkJobConfig config, + FlinkRestApi flinkRestApi, + ImportJobDefaults defaults) { + this.flinkCli = flinkCli; + this.defaults = defaults; + this.masterUrl = config.getMasterUrl(); + this.flinkRestApis = flinkRestApi; + } + + @Override + public String submitJob(ImportSpec importSpec, String jobId) { + flinkCli.parseParameters(createRunArgs(importSpec, jobId)); + + return getFlinkJobId(jobId); + } + + 
@Override + public void abortJob(String extId) { + flinkCli.parseParameters(createStopArgs(extId)); + } + + private String getFlinkJobId(String jobId) { + FlinkJobList jobList = flinkRestApis.getJobsOverview(); + for (FlinkJob job : jobList.getJobs()) { + if (jobId.equals(job.getName())) { + return job.getJid(); + } + } + log.warn("Unable to find job: {}", jobId); + return ""; + } + + private String[] createRunArgs(ImportSpec importSpec, String jobId) { + Map options = + TypeConversion.convertJsonStringToMap(defaults.getImportJobOptions()); + List commands = new ArrayList<>(); + commands.add("run"); + commands.add("-d"); + commands.add("-m"); + commands.add(masterUrl); + commands.add(defaults.getExecutable()); + commands.add(option("jobName", jobId)); + commands.add(option("runner", defaults.getRunner())); + commands.add( + option("importSpecBase64", Base64.getEncoder().encodeToString(importSpec.toByteArray()))); + commands.add(option("coreApiUri", defaults.getCoreApiUri())); + commands.add(option("errorsStoreType", defaults.getErrorsStoreType())); + commands.add(option("errorsStoreOptions", defaults.getErrorsStoreOptions())); + + options.forEach((k, v) -> commands.add(option(k, v))); + return commands.toArray(new String[] {}); + } + + private String[] createStopArgs(String extId) { + List commands = new ArrayList<>(); + commands.add("cancel"); + commands.add("-m"); + commands.add(masterUrl); + commands.add(extId); + return commands.toArray(new String[] {}); + } + + private String option(String key, String value) { + return String.format("--%s=%s", key, value); + } +} diff --git a/core/src/main/java/feast/core/job/flink/FlinkJobMapper.java b/core/src/main/java/feast/core/job/flink/FlinkJobMapper.java new file mode 100644 index 0000000000..5bbcac29ef --- /dev/null +++ b/core/src/main/java/feast/core/job/flink/FlinkJobMapper.java @@ -0,0 +1,56 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you 
package feast.core.job.flink;

import feast.core.model.JobStatus;
import java.util.EnumMap;
import java.util.Map;

/** Translates Flink job state names into feast {@link JobStatus} values. */
public class FlinkJobMapper {

  // EnumMap rather than HashMap: keys are enum constants, and EnumMap gives a
  // compact array-backed table with no hashing.
  private static final Map<FlinkJobState, JobStatus> FLINK_TO_FEAST_JOB_STATE_MAP =
      new EnumMap<>(FlinkJobState.class);

  static {
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.CREATED, JobStatus.PENDING);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.RUNNING, JobStatus.RUNNING);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.FINISHED, JobStatus.COMPLETED);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.RESTARTING, JobStatus.RUNNING);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.CANCELLING, JobStatus.ABORTING);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.CANCELED, JobStatus.ABORTED);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.FAILING, JobStatus.ERROR);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.FAILED, JobStatus.ERROR);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.SUSPENDING, JobStatus.SUSPENDING);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.SUSPENDED, JobStatus.SUSPENDED);
    FLINK_TO_FEAST_JOB_STATE_MAP.put(FlinkJobState.RECONCILING, JobStatus.PENDING);
  }

  /**
   * Map a string containing Flink's JobState into Feast's JobStatus.
   *
   * @param jobState Flink job state name, e.g. {@code "RUNNING"}
   * @return the corresponding feast JobStatus
   * @throws IllegalArgumentException if jobState is not a recognized Flink
   *     state, or is a state with no feast mapping
   */
  public JobStatus map(String jobState) {
    FlinkJobState state = FlinkJobState.valueOf(jobState);
    // Single lookup instead of the former containsKey + get pair.
    JobStatus status = FLINK_TO_FEAST_JOB_STATE_MAP.get(state);
    if (status == null) {
      throw new IllegalArgumentException("Unknown job state: " + jobState);
    }
    return status;
  }
}
package feast.core.job.flink;

import feast.core.job.JobMonitor;
import feast.core.job.Runner;
import feast.core.model.JobInfo;
import feast.core.model.JobStatus;
import feast.core.model.Metrics;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/** Polls Flink's REST API to track the status of feast jobs running on Flink. */
@Slf4j
public class FlinkJobMonitor implements JobMonitor {

  private final FlinkRestApi flinkRestApi;
  private final FlinkJobMapper mapper;

  public FlinkJobMonitor(FlinkRestApi flinkRestApi) {
    this.flinkRestApi = flinkRestApi;
    this.mapper = new FlinkJobMapper();
  }

  /**
   * Determine the job's current status from the cluster's job overview.
   *
   * @param jobInfo feast job record; its ext id is matched against Flink job ids
   * @return the mapped status for a Flink job found in the overview; the job's
   *     stored status when the job does not belong to the Flink runner;
   *     UNKNOWN when the ext id is absent from the overview
   */
  @Override
  public JobStatus getJobStatus(JobInfo jobInfo) {
    if (!Runner.FLINK.getName().equals(jobInfo.getRunner())) {
      // Not a Flink job; leave whatever status is already recorded.
      return jobInfo.getStatus();
    }
    for (FlinkJob job : flinkRestApi.getJobsOverview().getJobs()) {
      if (jobInfo.getExtId().equals(job.getJid())) {
        return toFeastStatus(job.getState());
      }
    }
    return JobStatus.UNKNOWN;
  }

  /**
   * Metrics collection is not implemented for Flink yet.
   *
   * @return an empty list for Flink jobs; null for jobs of other runners
   */
  @Override
  public List<Metrics> getJobMetrics(JobInfo job) {
    if (!Runner.FLINK.getName().equals(job.getRunner())) {
      return null;
    }
    // TODO: metrics for flink
    return Collections.emptyList();
  }

  /** Translate a Flink state name, degrading to UNKNOWN on unrecognized states. */
  private JobStatus toFeastStatus(String state) {
    try {
      return mapper.map(state);
    } catch (IllegalArgumentException e) {
      log.error("Unknown job state: " + state);
      return JobStatus.UNKNOWN;
    }
  }
}
/**
 * Possible states of a Flink job, as reported by Flink's REST API.
 *
 * <p>Constant order is preserved from the original declaration.
 */
public enum FlinkJobState {

  /** Job is newly created. */
  CREATED,

  /** Job is running. */
  RUNNING,

  /** Job completed successfully. */
  FINISHED,

  /** Job is reset and restarting. */
  RESTARTING,

  /** Job is being cancelled. */
  CANCELLING,

  /** Job has been cancelled. */
  CANCELED,

  /** Job has failed and is awaiting cleanup. */
  FAILING,

  /** Job has failed with a non-recoverable failure. */
  FAILED,

  /** Job is being suspended and is awaiting cleanup. */
  SUSPENDING,

  /** Job has been suspended. */
  SUSPENDED,

  /** Job is reconciling, waiting for task execution to recover state. */
  RECONCILING
}
package feast.core.job.flink;

import java.net.URI;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;

/** Thin client for the subset of Flink's monitoring REST API that feast uses. */
@Slf4j
public class FlinkRestApi {

  private static final String SCHEME = "http";
  private static final String JOB_OVERVIEW_PATH = "jobs/overview";

  private final RestTemplate restTemplate;
  private final URI jobsOverviewUri;

  /**
   * @param restTemplate HTTP client used for all requests
   * @param masterUrl host:port of the Flink job master, e.g. {@code localhost:8081}
   * @throws Exception if a valid URI cannot be built from masterUrl
   */
  public FlinkRestApi(RestTemplate restTemplate, String masterUrl) throws Exception {
    this.restTemplate = restTemplate;
    this.jobsOverviewUri =
        new URI(String.format("%s://%s/%s", SCHEME, masterUrl, JOB_OVERVIEW_PATH));
  }

  /**
   * Fetch the cluster's job overview.
   *
   * <p>Never returns null, and the returned list's job collection is never
   * null; any transport or deserialization failure degrades to an empty list.
   *
   * @return the job overview, or an empty FlinkJobList on any error
   */
  public FlinkJobList getJobsOverview() {
    try {
      FlinkJobList jobList = restTemplate.getForObject(jobsOverviewUri, FlinkJobList.class);
      if (jobList == null) {
        // Bug fix: the previous code dereferenced the null response
        // (jobList.setJobs(...)), throwing an NPE that was then swallowed by
        // the catch block below. Substitute a fresh, empty response instead.
        jobList = new FlinkJobList();
      }
      if (jobList.getJobs() == null) {
        jobList.setJobs(Collections.emptyList());
      }
      return jobList;
    } catch (Exception e) {
      log.error("Unable to get job overview from {}: ", jobsOverviewUri, e);
      FlinkJobList empty = new FlinkJobList();
      empty.setJobs(Collections.emptyList());
      return empty;
    }
  }
}
*/ UNKNOWN, - /** - * Import job is submitted to runner and currently pending for executing - */ + /** Import job is submitted to runner and currently pending for executing */ PENDING, - /** - * Import job is currently running in the runner - */ + /** Import job is currently running in the runner */ RUNNING, - /** - * Runner’s reported the import job has completed (applicable to batch job) - */ + /** Runner’s reported the import job has completed (applicable to batch job) */ COMPLETED, - /** - * When user sent abort command, but it's still running - */ + /** When user sent abort command, but it's still running */ ABORTING, - /** - * User initiated abort job - */ + /** User initiated abort job */ ABORTED, /** - * Runner’s reported that the import job failed to run or there is a failure during job submission. + * Runner’s reported that the import job failed to run or there is a failure during job + * submission. */ - ERROR; + ERROR, + + /** job has been suspended and waiting for cleanup */ + SUSPENDING, + + /** job has been suspended */ + SUSPENDED; - private static final Collection TERMINAL_STATE = Collections.unmodifiableList( - Arrays.asList(COMPLETED, ABORTED, ERROR)); + private static final Collection TERMINAL_STATE = + Collections.unmodifiableList(Arrays.asList(COMPLETED, ABORTED, ERROR)); /** * Get a collection of terminal job state. * - *

Terminal job state is final and will not change to any other state.

+ *

Terminal job state is final and will not change to any other state. * * @return collection of terminal job state. */ - public static Collection getTerminalState(){ + public static Collection getTerminalState() { return TERMINAL_STATE; } } diff --git a/core/src/main/java/feast/core/service/JobManagementService.java b/core/src/main/java/feast/core/service/JobManagementService.java index 01257e33a7..02fa40b280 100644 --- a/core/src/main/java/feast/core/service/JobManagementService.java +++ b/core/src/main/java/feast/core/service/JobManagementService.java @@ -19,39 +19,50 @@ import com.google.common.base.Strings; import feast.core.JobServiceProto.JobServiceTypes.JobDetail; +import feast.core.config.ImportJobDefaults; import feast.core.dao.JobInfoRepository; import feast.core.dao.MetricsRepository; +import feast.core.exception.JobExecutionException; import feast.core.exception.RetrievalException; import feast.core.job.JobManager; +import feast.core.job.Runner; import feast.core.log.Action; import feast.core.log.AuditLogger; import feast.core.log.Resource; import feast.core.model.JobInfo; import feast.core.model.JobStatus; import feast.core.model.Metrics; +import feast.specs.ImportSpecProto.ImportSpec; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - @Slf4j @Service public class JobManagementService { - @Autowired private JobInfoRepository jobInfoRepository; - @Autowired private MetricsRepository metricsRepository; - @Autowired private JobManager jobManager; + private static final String JOB_PREFIX_DEFAULT = "feastimport"; + private static final String UNKNOWN_EXT_JOB_ID = ""; + + private JobInfoRepository 
jobInfoRepository; + private MetricsRepository metricsRepository; + private JobManager jobManager; + private ImportJobDefaults defaults; + @Autowired public JobManagementService( JobInfoRepository jobInfoRepository, MetricsRepository metricsRepository, - JobManager jobManager) { + JobManager jobManager, + ImportJobDefaults defaults) { this.jobInfoRepository = jobInfoRepository; this.metricsRepository = metricsRepository; this.jobManager = jobManager; + this.defaults = defaults; } /** @@ -85,6 +96,64 @@ public JobDetail getJob(String id) { return jobDetailBuilder.build(); } + /** + * Submit ingestion job to runner. + * + * @param importSpec import spec of the ingestion job + * @param namePrefix name prefix of the ingestion job + * @return feast job ID. + */ + public String submitJob(ImportSpec importSpec, String namePrefix) { + String jobId = createJobId(namePrefix); + boolean isDirectRunner = Runner.DIRECT.getName().equals(defaults.getRunner()); + try { + if (!isDirectRunner) { + JobInfo jobInfo = + new JobInfo(jobId, UNKNOWN_EXT_JOB_ID, defaults.getRunner(), importSpec, JobStatus.PENDING); + jobInfoRepository.save(jobInfo); + } + + AuditLogger.log( + Resource.JOB, + jobId, + Action.SUBMIT, + "Building graph and submitting to %s", + defaults.getRunner()); + + String extId = jobManager.submitJob(importSpec, jobId); + if (extId.isEmpty()) { + throw new RuntimeException( + String.format("Could not submit job: \n%s", "unable to retrieve job external id")); + } + + AuditLogger.log( + Resource.JOB, + jobId, + Action.STATUS_CHANGE, + "Job submitted to runner %s with ext id %s.", + defaults.getRunner(), + extId); + + if (isDirectRunner) { + JobInfo jobInfo = + new JobInfo(jobId, extId, defaults.getRunner(), importSpec, JobStatus.COMPLETED); + jobInfoRepository.save(jobInfo); + } else { + updateJobExtId(jobId, extId); + } + return jobId; + } catch (Exception e) { + updateJobStatus(jobId, JobStatus.ERROR); + AuditLogger.log( + Resource.JOB, + jobId, + Action.STATUS_CHANGE, + 
"Job failed to be submitted to runner %s. Job status changed to ERROR.", + defaults.getRunner()); + throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e); + } + } + /** * Drain the given job. If this is successful, the job will start the draining process. When the * draining process is complete, the job will be cleaned up and removed. @@ -108,4 +177,39 @@ public void abortJob(String id) { AuditLogger.log(Resource.JOB, id, Action.ABORT, "Triggering draining of job"); jobInfoRepository.saveAndFlush(job); } + + /** + * Update a given job's status + * + * @param jobId + * @param status + */ + void updateJobStatus(String jobId, JobStatus status) { + Optional jobRecordOptional = jobInfoRepository.findById(jobId); + if (jobRecordOptional.isPresent()) { + JobInfo jobRecord = jobRecordOptional.get(); + jobRecord.setStatus(status); + jobInfoRepository.save(jobRecord); + } + } + + /** + * Update a given job's external id + * + * @param jobId + * @param jobExtId + */ + void updateJobExtId(String jobId, String jobExtId) { + Optional jobRecordOptional = jobInfoRepository.findById(jobId); + if (jobRecordOptional.isPresent()) { + JobInfo jobRecord = jobRecordOptional.get(); + jobRecord.setExtId(jobExtId); + jobInfoRepository.save(jobRecord); + } + } + + private String createJobId(String namePrefix) { + String dateSuffix = String.valueOf(Instant.now().toEpochMilli()); + return namePrefix.isEmpty() ? 
JOB_PREFIX_DEFAULT + dateSuffix : namePrefix + dateSuffix; + } } diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties index ac8fb3534f..ef95352d24 100644 --- a/core/src/main/resources/application.properties +++ b/core/src/main/resources/application.properties @@ -26,6 +26,9 @@ feast.jobs.errorsStoreOptions=${JOB_ERRORS_STORE_OPTIONS:{}} feast.jobs.dataflow.projectId = ${DATAFLOW_PROJECT_ID:} feast.jobs.dataflow.location = ${DATAFLOW_LOCATION:} +feast.jobs.flink.configDir = ${FLINK_CONF_DIR:/etc/flink/flink-1.5.5/conf} +feast.jobs.flink.masterUrl = ${FLINK_MASTER_URL:localhost:8081} + feast.jobs.monitor.period = ${JOB_MONITOR_PERIOD_MS:5000} feast.jobs.monitor.initialDelay = ${JOB_MONITOR_INITIAL_DELAY_MS:60000} diff --git a/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java b/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java index a7ee2bec70..d352044d16 100644 --- a/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java +++ b/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java @@ -20,10 +20,10 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.Lists; import feast.core.dao.JobInfoRepository; import feast.core.dao.MetricsRepository; import feast.core.model.JobInfo; @@ -39,22 +39,17 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; - public class ScheduledJobMonitorTest { ScheduledJobMonitor scheduledJobMonitor; - @Mock - JobMonitor jobMonitor; + @Mock JobMonitor jobMonitor; - @Mock - StatsdMetricPusher stasdMetricPusher; + @Mock StatsdMetricPusher stasdMetricPusher; - @Mock - JobInfoRepository jobInfoRepository; + @Mock JobInfoRepository jobInfoRepository; - @Mock - MetricsRepository metricsRepository; + 
@Mock MetricsRepository metricsRepository; @Before public void setUp() throws Exception { @@ -64,65 +59,71 @@ public void setUp() throws Exception { @Test public void getJobStatus_shouldUpdateJobInfoForRunningJob() { - JobInfo job = new JobInfo("jobId", "extId1", "Streaming", "DataflowRunner", "", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - JobStatus.RUNNING, - ""); - - when(jobInfoRepository.findByStatusNotIn((Collection)any(Collection.class))).thenReturn( - Arrays.asList(job)); - when(jobMonitor.getJobStatus("extId1")).thenReturn(JobStatus.COMPLETED); + JobInfo job = + new JobInfo( + "jobId", + "extId1", + "Streaming", + "DataflowRunner", + "", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + JobStatus.RUNNING, + ""); + + when(jobInfoRepository.findByStatusNotIn((Collection) any(Collection.class))) + .thenReturn(Collections.singletonList(job)); + when(jobMonitor.getJobStatus(job)).thenReturn(JobStatus.COMPLETED); scheduledJobMonitor.getJobStatus(); - ArgumentCaptor> argCaptor = ArgumentCaptor.forClass(Iterable.class); - verify(jobInfoRepository).saveAll(argCaptor.capture()); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(JobInfo.class); + verify(jobInfoRepository).save(argCaptor.capture()); - List jobInfos = Lists.newArrayList(argCaptor.getValue()); - assertThat(jobInfos.size(), equalTo(1)); - assertThat(jobInfos.get(0).getStatus(), equalTo(JobStatus.COMPLETED)); + JobInfo jobInfos = argCaptor.getValue(); + assertThat(jobInfos.getStatus(), equalTo(JobStatus.COMPLETED)); } @Test public void getJobStatus_shouldNotUpdateJobInfoForTerminalJob() { - when(jobInfoRepository.findByStatusNotIn((Collection)any(Collection.class))).thenReturn( - Collections.emptyList()); + when(jobInfoRepository.findByStatusNotIn((Collection) any(Collection.class))) + .thenReturn(Collections.emptyList()); scheduledJobMonitor.getJobStatus(); - ArgumentCaptor> argCaptor = ArgumentCaptor.forClass(Iterable.class); - 
verify(jobInfoRepository).saveAll(argCaptor.capture()); - - List jobInfos = Lists.newArrayList(argCaptor.getValue()); - assertThat(jobInfos.size(), equalTo(0)); + verify(jobInfoRepository, never()).save(any(JobInfo.class)); } @Test public void getJobMetrics_shouldPushToStatsDMetricPusherAndSaveNewMetricToDb() { - JobInfo job = new JobInfo("jobId", "extId1", "Streaming", "DataflowRunner", "", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - JobStatus.RUNNING, - ""); + JobInfo job = + new JobInfo( + "jobId", + "extId1", + "Streaming", + "DataflowRunner", + "", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + JobStatus.RUNNING, + ""); Metrics metric1 = new Metrics(job, "metric1", 1); Metrics metric2 = new Metrics(job, "metric2", 2); List metrics = Arrays.asList(metric1, metric2); - when(jobInfoRepository.findByStatusNotIn((Collection)any(Collection.class))).thenReturn( - Arrays.asList(job)); + when(jobInfoRepository.findByStatusNotIn((Collection) any(Collection.class))) + .thenReturn(Arrays.asList(job)); when(jobMonitor.getJobMetrics(job)).thenReturn(metrics); scheduledJobMonitor.getJobMetrics(); verify(stasdMetricPusher).pushMetrics(metrics); - ArgumentCaptor> argCaptor = ArgumentCaptor.forClass(Iterable.class); - verify(jobInfoRepository).saveAll(argCaptor.capture()); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(JobInfo.class); + verify(jobInfoRepository).save(argCaptor.capture()); assertThat(job.getMetrics(), equalTo(metrics)); } - -} \ No newline at end of file +} diff --git a/core/src/test/java/feast/core/service/JobExecutionServiceTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java similarity index 59% rename from core/src/test/java/feast/core/service/JobExecutionServiceTest.java rename to core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index ad29bb8b9e..678b76c104 100644 --- 
a/core/src/test/java/feast/core/service/JobExecutionServiceTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -15,42 +15,37 @@ * */ -package feast.core.service; +package feast.core.job.dataflow; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -import static org.mockito.internal.verification.VerificationModeFactory.times; +import com.google.api.services.dataflow.Dataflow; import com.google.common.collect.Lists; import feast.core.config.ImportJobDefaults; -import feast.core.dao.JobInfoRepository; -import feast.core.model.JobInfo; -import feast.core.model.JobStatus; import feast.specs.ImportSpecProto.ImportSpec; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Optional; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; -public class JobExecutionServiceTest { +public class DataflowJobManagerTest { + + @Rule public final ExpectedException expectedException = ExpectedException.none(); + + @Mock Dataflow dataflow; - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - @Mock - JobInfoRepository jobInfoRepository; private ImportJobDefaults defaults; + private DataflowJobManager dfJobManager; @Before public void setUp() { @@ -58,25 +53,25 @@ public void setUp() { defaults = new ImportJobDefaults( "localhost:8080", - "DirectRunner", + "DataflowRunner", "{\"key\":\"value\"}", "ingestion.jar", "STDOUT", "{}"); + dfJobManager = new DataflowJobManager(dataflow, "project", "location", defaults); } @Test public void 
shouldBuildProcessBuilderWithCorrectOptions() { - JobExecutionService jobExecutionService = new JobExecutionService(jobInfoRepository, defaults); ImportSpec importSpec = ImportSpec.newBuilder().setType("file").build(); - ProcessBuilder pb = jobExecutionService.getProcessBuilder(importSpec, "test"); + ProcessBuilder pb = dfJobManager.getProcessBuilder(importSpec, "test"); List expected = Lists.newArrayList( "java", "-jar", "ingestion.jar", "--jobName=test", - "--runner=DirectRunner", + "--runner=DataflowRunner", "--importSpecBase64=CgRmaWxl", "--coreApiUri=localhost:8080", "--errorsStoreType=STDOUT", @@ -85,38 +80,6 @@ public void shouldBuildProcessBuilderWithCorrectOptions() { assertThat(pb.command(), equalTo(expected)); } - @Test - public void shouldUpdateJobStatusIfExists() { - JobInfo jobInfo = new JobInfo(); - when(jobInfoRepository.findById("jobid")).thenReturn(Optional.of(jobInfo)); - - ArgumentCaptor jobInfoArgumentCaptor = ArgumentCaptor.forClass(JobInfo.class); - JobExecutionService jobExecutionService = new JobExecutionService(jobInfoRepository, defaults); - jobExecutionService.updateJobStatus("jobid", JobStatus.PENDING); - - verify(jobInfoRepository, times(1)).saveAndFlush(jobInfoArgumentCaptor.capture()); - - JobInfo jobInfoUpdated = new JobInfo(); - jobInfoUpdated.setStatus(JobStatus.PENDING); - assertThat(jobInfoArgumentCaptor.getValue(), equalTo(jobInfoUpdated)); - } - - @Test - public void shouldUpdateJobExtIdIfExists() { - JobInfo jobInfo = new JobInfo(); - when(jobInfoRepository.findById("jobid")).thenReturn(Optional.of(jobInfo)); - - ArgumentCaptor jobInfoArgumentCaptor = ArgumentCaptor.forClass(JobInfo.class); - JobExecutionService jobExecutionService = new JobExecutionService(jobInfoRepository, defaults); - jobExecutionService.updateJobExtId("jobid", "extid"); - - verify(jobInfoRepository, times(1)).saveAndFlush(jobInfoArgumentCaptor.capture()); - - JobInfo jobInfoUpdated = new JobInfo(); - jobInfoUpdated.setExtId("extid"); - 
assertThat(jobInfoArgumentCaptor.getValue(), equalTo(jobInfoUpdated)); - } - @Test public void shouldRunProcessAndGetJobIdIfNoError() throws IOException { Process process = Mockito.mock(Process.class); @@ -130,8 +93,7 @@ public void shouldRunProcessAndGetJobIdIfNoError() throws IOException { when(process.getErrorStream()).thenReturn(errorStream); when(process.exitValue()).thenReturn(0); when(process.isAlive()).thenReturn(true).thenReturn(false); - JobExecutionService jobExecutionService = new JobExecutionService(jobInfoRepository, defaults); - String jobId = jobExecutionService.runProcess(process); + String jobId = dfJobManager.runProcess(process); assertThat(jobId, equalTo("1231231231")); } @@ -149,7 +111,6 @@ public void shouldThrowRuntimeExceptionIfErrorOccursInProcess() { when(process.exitValue()).thenReturn(1); when(process.isAlive()).thenReturn(true).thenReturn(false); expectedException.expect(RuntimeException.class); - JobExecutionService jobExecutionService = new JobExecutionService(jobInfoRepository, defaults); - jobExecutionService.runProcess(process); + dfJobManager.runProcess(process); } } diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobMonitorTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobMonitorTest.java index 61ad74930d..3e1d3f89bf 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobMonitorTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobMonitorTest.java @@ -28,6 +28,8 @@ import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs; import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get; import com.google.api.services.dataflow.model.Job; +import feast.core.job.Runner; +import feast.core.model.JobInfo; import feast.core.model.JobStatus; import java.io.IOException; import org.junit.Before; @@ -66,7 +68,10 @@ public void getJobStatus_shouldReturnCorrectJobStatusForValidDataflowJobState() 
when(job.getCurrentState()).thenReturn(DataflowJobState.JOB_STATE_RUNNING.toString()); when(jobService.get(projectId, location, jobId)).thenReturn(getOp); - assertThat(monitor.getJobStatus(jobId), equalTo(JobStatus.RUNNING)); + JobInfo jobInfo = mock(JobInfo.class); + when(jobInfo.getExtId()).thenReturn(jobId); + when(jobInfo.getRunner()).thenReturn(Runner.DATAFLOW.getName()); + assertThat(monitor.getJobStatus(jobInfo), equalTo(JobStatus.RUNNING)); } @Test @@ -79,7 +84,10 @@ public void getJobStatus_shouldReturnUnknownStateForInvalidDataflowJobState() th when(job.getCurrentState()).thenReturn("Random String"); when(jobService.get(projectId, location, jobId)).thenReturn(getOp); - assertThat(monitor.getJobStatus(jobId), equalTo(JobStatus.UNKNOWN)); + JobInfo jobInfo = mock(JobInfo.class); + when(jobInfo.getExtId()).thenReturn(jobId); + when(jobInfo.getRunner()).thenReturn(Runner.DATAFLOW.getName()); + assertThat(monitor.getJobStatus(jobInfo), equalTo(JobStatus.UNKNOWN)); } @Test @@ -88,6 +96,9 @@ public void getJobStatus_shouldReturnUnknownStateWhenExceptionHappen() throws IO when(jobService.get(projectId, location, jobId)).thenThrow(new RuntimeException("some thing wrong")); - assertThat(monitor.getJobStatus(jobId), equalTo(JobStatus.UNKNOWN)); + JobInfo jobInfo = mock(JobInfo.class); + when(jobInfo.getExtId()).thenReturn(jobId); + when(jobInfo.getRunner()).thenReturn(Runner.DATAFLOW.getName()); + assertThat(monitor.getJobStatus(jobInfo), equalTo(JobStatus.UNKNOWN)); } } \ No newline at end of file diff --git a/core/src/test/java/feast/core/job/flink/FlinkJobManagerTest.java b/core/src/test/java/feast/core/job/flink/FlinkJobManagerTest.java new file mode 100644 index 0000000000..cd892696fb --- /dev/null +++ b/core/src/test/java/feast/core/job/flink/FlinkJobManagerTest.java @@ -0,0 +1,107 @@ +package feast.core.job.flink; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.verify; +import 
static org.mockito.Mockito.when; + +import feast.core.config.ImportJobDefaults; +import feast.specs.ImportSpecProto.ImportSpec; +import java.util.Collections; +import org.apache.flink.client.cli.CliFrontend; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class FlinkJobManagerTest { + @Mock private CliFrontend flinkCli; + @Mock private FlinkRestApi flinkRestApi; + + private FlinkJobConfig config; + private ImportJobDefaults defaults; + private FlinkJobManager flinkJobManager; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + config = new FlinkJobConfig("localhost:8081", "/etc/flink/conf"); + defaults = + new ImportJobDefaults( + "localhost:8080", + "FlinkRunner", + "{\"key\":\"value\"}", + "ingestion.jar", + "stderr", + "{}"); + + flinkJobManager = new FlinkJobManager(flinkCli, config, flinkRestApi, defaults); + } + + @Test + public void shouldPassCorrectArgumentForSubmittingJob() { + FlinkJobList response = new FlinkJobList(); + response.setJobs(Collections.singletonList(new FlinkJob("1234", "job1", "RUNNING"))); + when(flinkRestApi.getJobsOverview()).thenReturn(response); + + ImportSpec importSpec = ImportSpec.newBuilder().setType("file").build(); + String jobName = "importjob"; + flinkJobManager.submitJob(importSpec, jobName); + String[] expected = + new String[] { + "run", + "-d", + "-m", + config.getMasterUrl(), + defaults.getExecutable(), + "--jobName=" + jobName, + "--runner=FlinkRunner", + "--importSpecBase64=CgRmaWxl", + "--coreApiUri=" + defaults.getCoreApiUri(), + "--errorsStoreType=" + defaults.getErrorsStoreType(), + "--errorsStoreOptions=" + defaults.getErrorsStoreOptions(), + "--key=value" + }; + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String[].class); + verify(flinkCli).parseParameters(argumentCaptor.capture()); + + String[] actual = argumentCaptor.getValue(); + 
assertThat(actual, equalTo(expected)); + } + + @Test + public void shouldReturnFlinkJobId() { + FlinkJobList response = new FlinkJobList(); + String flinkJobId = "1234"; + String jobName = "importjob"; + response.setJobs(Collections.singletonList(new FlinkJob(flinkJobId, jobName, "RUNNING"))); + when(flinkRestApi.getJobsOverview()).thenReturn(response); + + ImportSpec importSpec = ImportSpec.newBuilder().setType("file").build(); + String jobId = flinkJobManager.submitJob(importSpec, jobName); + + assertThat(jobId, equalTo(flinkJobId)); + } + + @Test + public void shouldPassCorrectArgumentForStoppingJob() { + String jobId = "1234"; + + flinkJobManager.abortJob(jobId); + + String[] expected = new String[]{ + "cancel", + "-m", + config.getMasterUrl(), + jobId + }; + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String[].class); + verify(flinkCli).parseParameters(argumentCaptor.capture()); + + String[] actual = argumentCaptor.getValue(); + assertThat(actual, equalTo(expected)); + } +} diff --git a/core/src/test/java/feast/core/job/flink/FlinkRestApiTest.java b/core/src/test/java/feast/core/job/flink/FlinkRestApiTest.java new file mode 100644 index 0000000000..023791d2ee --- /dev/null +++ b/core/src/test/java/feast/core/job/flink/FlinkRestApiTest.java @@ -0,0 +1,113 @@ +package feast.core.job.flink; + +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Arrays; +import okhttp3.HttpUrl; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.web.client.RestTemplate; + +public class FlinkRestApiTest { + FlinkRestApi flinkRestApi; + MockWebServer mockWebServer; + + String host; + int port; + + @Before + public 
void setUp() throws Exception { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + + port = mockWebServer.getPort(); + host = mockWebServer.getHostName(); + + flinkRestApi = new FlinkRestApi(new RestTemplate(), String.format("%s:%d", host, port)); + } + + @Test + public void shouldSendCorrectRequest() throws InterruptedException { + MockResponse response = new MockResponse(); + response.setResponseCode(200); + mockWebServer.enqueue(response); + + flinkRestApi.getJobsOverview(); + + RecordedRequest recordedRequest = mockWebServer.takeRequest(); + HttpUrl requestUrl = recordedRequest.getRequestUrl(); + + assertThat(requestUrl.host(), equalTo(host)); + assertThat(requestUrl.port(), equalTo(port)); + assertThat(requestUrl.encodedPath(), equalTo("/jobs/overview")); + } + + @Test + public void shouldReturnEmptyJobListForEmptyBody() { + MockResponse response = new MockResponse(); + response.setResponseCode(200); + mockWebServer.enqueue(response); + + FlinkJobList jobList = flinkRestApi.getJobsOverview(); + assertThat(jobList.getJobs().size(), equalTo(0)); + } + + @Test + public void shouldReturnEmptyJobListForEmptyJsonResponse() { + mockWebServer.enqueue(createMockResponse(200, "[]")); + + FlinkJobList jobList = flinkRestApi.getJobsOverview(); + assertThat(jobList.getJobs().size(), equalTo(0)); + + mockWebServer.enqueue(createMockResponse(200, "{}")); + + jobList = flinkRestApi.getJobsOverview(); + assertThat(jobList.getJobs().size(), equalTo(0)); + + mockWebServer.enqueue(createMockResponse(200, "{jobs: []}")); + + jobList = flinkRestApi.getJobsOverview(); + assertThat(jobList.getJobs().size(), equalTo(0)); + } + + @Test + public void shouldReturnCorrectResultForValidResponse() throws JsonProcessingException { + FlinkJobList jobList = new FlinkJobList(); + FlinkJob job1 = new FlinkJob("1234", "job1", "RUNNING"); + FlinkJob job2 = new FlinkJob("5678", "job2", "RUNNING"); + FlinkJob job3 = new FlinkJob("1111", "job3", "RUNNING"); + + 
jobList.setJobs(Arrays.asList(job1, job2, job3)); + + mockWebServer.enqueue( createMockResponse(200, createResponseBody(jobList))); + + FlinkJobList actual = flinkRestApi.getJobsOverview(); + + assertThat(actual.getJobs().size(), equalTo(3)); + } + + @After + public void tearDown() throws Exception { + mockWebServer.shutdown(); + } + + private String createResponseBody(FlinkJobList jobList) throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsString(jobList); + } + + private MockResponse createMockResponse(int statusCode, String body) { + MockResponse response = new MockResponse(); + response.setHeader("Content-Type", "application/json"); + response.setResponseCode(statusCode); + response.setBody(body); + return response; + } +} diff --git a/core/src/test/java/feast/core/service/JobManagementServiceTest.java b/core/src/test/java/feast/core/service/JobManagementServiceTest.java index 06228c28f5..709cf365b7 100644 --- a/core/src/test/java/feast/core/service/JobManagementServiceTest.java +++ b/core/src/test/java/feast/core/service/JobManagementServiceTest.java @@ -17,15 +17,28 @@ package feast.core.service; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + import com.google.common.collect.Lists; import com.google.protobuf.Timestamp; import feast.core.JobServiceProto.JobServiceTypes.JobDetail; +import feast.core.config.ImportJobDefaults; import feast.core.dao.JobInfoRepository; import feast.core.dao.MetricsRepository; import feast.core.exception.RetrievalException; import feast.core.job.JobManager; import feast.core.model.JobInfo; import feast.core.model.JobStatus; +import java.time.Instant; +import java.util.Collections; +import java.util.Date; +import java.util.List; 
+import java.util.Optional; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -33,28 +46,19 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import java.time.Instant; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Optional; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - public class JobManagementServiceTest { + @Rule public final ExpectedException exception = ExpectedException.none(); @Mock private JobInfoRepository jobInfoRepository; @Mock private MetricsRepository metricsRepository; @Mock private JobManager jobManager; - - @Rule public final ExpectedException exception = ExpectedException.none(); + private ImportJobDefaults defaults; @Before public void setUp() { initMocks(this); + defaults = + new ImportJobDefaults( + "localhost:8433", "DirectRunner", "", "/feast-import.jar", "stderr", ""); } @Test @@ -89,7 +93,7 @@ public void shouldListAllJobDetails() { jobInfo2.setLastUpdated(Date.from(Instant.ofEpochSecond(1))); when(jobInfoRepository.findAll()).thenReturn(Lists.newArrayList(jobInfo1, jobInfo2)); JobManagementService jobManagementService = - new JobManagementService(jobInfoRepository, metricsRepository, jobManager); + new JobManagementService(jobInfoRepository, metricsRepository, jobManager, defaults); List actual = jobManagementService.listJobs(); List expected = Lists.newArrayList( @@ -126,7 +130,7 @@ public void shouldReturnDetailOfRequestedJobId() { jobInfo1.setLastUpdated(Date.from(Instant.ofEpochSecond(1))); when(jobInfoRepository.findById("job1")).thenReturn(Optional.of(jobInfo1)); JobManagementService jobManagementService = - new JobManagementService(jobInfoRepository, metricsRepository, jobManager); + new JobManagementService(jobInfoRepository, metricsRepository, 
jobManager, defaults); JobDetail actual = jobManagementService.getJob("job1"); JobDetail expected = JobDetail.newBuilder() @@ -142,7 +146,7 @@ public void shouldReturnDetailOfRequestedJobId() { public void shouldThrowErrorIfJobIdNotFoundWhenGettingJob() { when(jobInfoRepository.findById("job1")).thenReturn(Optional.empty()); JobManagementService jobManagementService = - new JobManagementService(jobInfoRepository, metricsRepository, jobManager); + new JobManagementService(jobInfoRepository, metricsRepository, jobManager, defaults); exception.expect(RetrievalException.class); exception.expectMessage("Unable to retrieve job with id job1"); jobManagementService.getJob("job1"); @@ -152,7 +156,7 @@ public void shouldThrowErrorIfJobIdNotFoundWhenGettingJob() { public void shouldThrowErrorIfJobIdNotFoundWhenAbortingJob() { when(jobInfoRepository.findById("job1")).thenReturn(Optional.empty()); JobManagementService jobManagementService = - new JobManagementService(jobInfoRepository, metricsRepository, jobManager); + new JobManagementService(jobInfoRepository, metricsRepository, jobManager, defaults); exception.expect(RetrievalException.class); exception.expectMessage("Unable to retrieve job with id job1"); jobManagementService.abortJob("job1"); @@ -164,7 +168,7 @@ public void shouldThrowErrorIfJobInTerminalStateWhenAbortingJob() { job.setStatus(JobStatus.COMPLETED); when(jobInfoRepository.findById("job1")).thenReturn(Optional.of(job)); JobManagementService jobManagementService = - new JobManagementService(jobInfoRepository, metricsRepository, jobManager); + new JobManagementService(jobInfoRepository, metricsRepository, jobManager, defaults); exception.expect(IllegalStateException.class); exception.expectMessage("Unable to stop job already in terminal state"); jobManagementService.abortJob("job1"); @@ -177,10 +181,44 @@ public void shouldUpdateJobAfterAborting() { job.setExtId("extId1"); when(jobInfoRepository.findById("job1")).thenReturn(Optional.of(job)); 
JobManagementService jobManagementService = - new JobManagementService(jobInfoRepository, metricsRepository, jobManager); + new JobManagementService(jobInfoRepository, metricsRepository, jobManager, defaults); jobManagementService.abortJob("job1"); ArgumentCaptor jobCapture = ArgumentCaptor.forClass(JobInfo.class); verify(jobInfoRepository).saveAndFlush(jobCapture.capture()); assertThat(jobCapture.getValue().getStatus(), equalTo(JobStatus.ABORTING)); } + + @Test + public void shouldUpdateJobStatusIfExists() { + JobInfo jobInfo = new JobInfo(); + when(jobInfoRepository.findById("jobid")).thenReturn(Optional.of(jobInfo)); + + ArgumentCaptor jobInfoArgumentCaptor = ArgumentCaptor.forClass(JobInfo.class); + JobManagementService jobExecutionService = + new JobManagementService(jobInfoRepository, metricsRepository, jobManager, defaults); + jobExecutionService.updateJobStatus("jobid", JobStatus.PENDING); + + verify(jobInfoRepository, times(1)).save(jobInfoArgumentCaptor.capture()); + + JobInfo jobInfoUpdated = new JobInfo(); + jobInfoUpdated.setStatus(JobStatus.PENDING); + assertThat(jobInfoArgumentCaptor.getValue(), equalTo(jobInfoUpdated)); + } + + @Test + public void shouldUpdateJobExtIdIfExists() { + JobInfo jobInfo = new JobInfo(); + when(jobInfoRepository.findById("jobid")).thenReturn(Optional.of(jobInfo)); + + ArgumentCaptor jobInfoArgumentCaptor = ArgumentCaptor.forClass(JobInfo.class); + JobManagementService jobExecutionService = + new JobManagementService(jobInfoRepository, metricsRepository, jobManager, defaults); + jobExecutionService.updateJobExtId("jobid", "extid"); + + verify(jobInfoRepository, times(1)).save(jobInfoArgumentCaptor.capture()); + + JobInfo jobInfoUpdated = new JobInfo(); + jobInfoUpdated.setExtId("extid"); + assertThat(jobInfoArgumentCaptor.getValue(), equalTo(jobInfoUpdated)); + } }