Skip to content

Commit

Permalink
Fetch the job's `created` timestamp from Dataflow
Browse files Browse the repository at this point in the history
This reverts commit a37d0a6d1e563dddbe4fe2d7f9739ed157842844.
  • Loading branch information
pyalex committed Aug 31, 2020
1 parent b95f1e3 commit ddb9239
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
11 changes: 8 additions & 3 deletions job-controller/src/main/java/feast/jobcontroller/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ public Date getLastUpdated() {
}

public void preSave() {
if (created == null) {
created = new Date();
if (this.created == null) {
this.created = new Date();
}
lastUpdated = new Date();
this.lastUpdated = new Date();
}

public void setExtId(String extId) {
Expand All @@ -102,6 +102,11 @@ public void setStatus(JobStatus status) {
this.status = status;
}

/**
 * Records the job's creation timestamp and synchronizes
 * {@code lastUpdated} to the same instant, keeping the two timestamps
 * consistent when the job is reconstructed from an external source.
 *
 * @param created the creation time to record
 */
public void setCreated(Date created) {
  this.lastUpdated = created;
  this.created = created;
}

/**
 * Reports whether this job has reached a terminal status.
 *
 * @return {@code true} if the current status is terminal
 */
public boolean hasTerminated() {
  JobStatus currentStatus = getStatus();
  return currentStatus.isTerminal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.DateTime;

@Slf4j
public class DataflowJobManager implements JobManager {
Expand Down Expand Up @@ -307,6 +308,9 @@ public List<Job> listRunningJobs() {

job.setExtId(dfJob.getId());
job.setStatus(JobStatus.RUNNING);
if (dfJob.getCreateTime() != null) {
job.setCreated(DateTime.parse(dfJob.getCreateTime()).toDate());
}

return job;
})
Expand Down
4 changes: 2 additions & 2 deletions job-controller/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ feast:
workerMachineType: n1-standard-1
deadLetterTableSpec: project_id:dataset_id.table_id
kafkaConsumerProperties:
"max.poll.records": "50000"
"receive.buffer.bytes": "33554432"
"[max.poll.records]": "50000"
"[receive.buffer.bytes]": "33554432"

# Configuration options for metric collection for all ingestion jobs
metrics:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.DateTime;
import org.joda.time.LocalDateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -237,6 +239,8 @@ public void shouldRetrieveRunningJobsFromDataflow() {

Printer jsonPrinter = JsonFormat.printer();

LocalDateTime created = DateTime.now().toLocalDateTime();

when(dataflow
.projects()
.locations()
Expand All @@ -248,6 +252,7 @@ public void shouldRetrieveRunningJobsFromDataflow() {
new com.google.api.services.dataflow.model.Job()
.setLabels(ImmutableMap.of("application", "feast"))
.setId("job-2")
.setCreateTime(created.toString())
.setEnvironment(
new Environment()
.setSdkPipelineOptions(
Expand All @@ -268,7 +273,9 @@ public void shouldRetrieveRunningJobsFromDataflow() {
hasProperty("id", equalTo("kafka-to-redis")),
hasProperty("source", equalTo(source)),
hasProperty("stores", hasValue(store)),
hasProperty("extId", equalTo("job-2")))));
hasProperty("extId", equalTo("job-2")),
hasProperty("created", equalTo(created.toDate())),
hasProperty("lastUpdated", equalTo(created.toDate())))));
}

@Test
Expand Down

0 comments on commit ddb9239

Please sign in to comment.