Skip to content

Commit

Permalink
deletes: 'undelete' job on subsequent OL event
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Sep 12, 2022
1 parent 00226b2 commit 2c9f898
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.25.0...HEAD)
* Add job/dataset soft delete API [`#2032`](https://github.com/MarquezProject/marquez/pull/2032)

## [0.25.0](https://github.com/MarquezProject/marquez/compare/0.24.0...0.25.0) - 2022-08-08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ BEGIN
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, description,
current_version_uuid, namespace_name, current_job_context_uuid,
current_location, current_inputs, symlink_target_uuid, parent_job_uuid,
parent_job_uuid_string)
parent_job_uuid_string, is_hidden)
SELECT NEW.uuid,
NEW.type,
NEW.created_at,
Expand All @@ -50,7 +50,8 @@ BEGIN
NEW.current_inputs,
NEW.symlink_target_uuid,
NEW.parent_job_uuid,
COALESCE(NEW.parent_job_uuid::char(36), '')
COALESCE(NEW.parent_job_uuid::char(36), ''),
false
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
DO UPDATE SET updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
Expand All @@ -60,8 +61,9 @@ BEGIN
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if null. otherwise, keep the old value
symlink_target_uuid = COALESCE(jobs.symlink_target_uuid,
EXCLUDED.symlink_target_uuid)
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
EXCLUDED.symlink_target_uuid),
is_hidden = false
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
-- version in case of insert
RETURNING uuid, symlink_target_uuid, (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
INTO job_uuid, new_symlink_target_uuid, old_symlink_target_uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public class OpenLineageServiceIntegrationTest {
public static final ZoneId TIMEZONE = ZoneId.of("America/Los_Angeles");
public static final String DATASET_NAME = "theDataset";
private RunService runService;

private JobService jobService;
private OpenLineageDao openLineageDao;

private JobDao jobDao;
private DatasetDao datasetDao;
private DatasetVersionDao datasetVersionDao;
private ArgumentCaptor<JobInputUpdate> runInputListener;
Expand Down Expand Up @@ -134,7 +138,9 @@ public ExpectedResults(
public void setup(Jdbi jdbi) throws SQLException {
openLineageDao = jdbi.onDemand(OpenLineageDao.class);
datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class);
jobDao = jdbi.onDemand(JobDao.class);
runService = mock(RunService.class);
jobService = new JobService(jobDao, runService);
runInputListener = ArgumentCaptor.forClass(JobInputUpdate.class);
doNothing().when(runService).notify(runInputListener.capture());
runOutputListener = ArgumentCaptor.forClass(JobOutputUpdate.class);
Expand All @@ -146,19 +152,18 @@ public void setup(Jdbi jdbi) throws SQLException {
jdbi.onDemand(NamespaceDao.class)
.upsertNamespaceRow(UUID.randomUUID(), Instant.now(), NAMESPACE, "me");
JobRow job =
jdbi.onDemand(JobDao.class)
.upsertJob(
UUID.randomUUID(),
JobType.BATCH,
Instant.now(),
namespace.getUuid(),
NAMESPACE,
"parentJob",
"description",
null,
null,
null,
null);
jobDao.upsertJob(
UUID.randomUUID(),
JobType.BATCH,
Instant.now(),
namespace.getUuid(),
NAMESPACE,
"parentJob",
"description",
null,
null,
null,
null);
Map<String, String> runArgsMap = new HashMap<>();
RunArgsRow argsRow =
jdbi.onDemand(RunArgsDao.class)
Expand Down Expand Up @@ -382,6 +387,41 @@ public void testDatasetVersionUpdatedOnRunCompletion()
.contains(dsVersion1Id);
}

@Test
void testJobIsNotHiddenAfterSubsequentOLEvent() throws ExecutionException, InterruptedException {
String name = "aNotHiddenJob";

LineageEvent.LineageEventBuilder builder =
LineageEvent.builder()
.eventType("COMPLETE")
.job(LineageEvent.Job.builder().name(name).namespace(NAMESPACE).build())
.eventTime(Instant.now().atZone(TIMEZONE))
.inputs(Collections.emptyList())
.outputs(Collections.emptyList());

lineageService
.createAsync(
builder
.run(new LineageEvent.Run(UUID.randomUUID().toString(), RunFacet.builder().build()))
.build())
.get();

assertThat(jobService.findJobByName(NAMESPACE, name)).isNotEmpty();

jobService.delete(NAMESPACE, name);

assertThat(jobService.findJobByName(NAMESPACE, name)).isEmpty();

lineageService
.createAsync(
builder
.run(new LineageEvent.Run(UUID.randomUUID().toString(), RunFacet.builder().build()))
.build())
.get();

assertThat(jobService.findJobByName(NAMESPACE, name)).isNotEmpty();
}

private void checkExists(LineageEvent.Dataset ds) {
DatasetService datasetService = new DatasetService(openLineageDao, runService);

Expand Down

0 comments on commit 2c9f898

Please sign in to comment.