Skip to content

Commit

Permalink
fix daos container - speeds up tests twice
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Nov 3, 2023
1 parent 3aacc39 commit 5587fb2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 33 deletions.
53 changes: 35 additions & 18 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
public interface OpenLineageDao extends BaseDao {
String DEFAULT_SOURCE_NAME = "default";
String DEFAULT_NAMESPACE_OWNER = "anonymous";
ModelDaos daos = new ModelDaos();

@SqlUpdate(
"INSERT INTO lineage_events ("
Expand Down Expand Up @@ -150,6 +149,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
}

default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) {
ModelDaos daos = new ModelDaos();
daos.initBaseDao(this);
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

Expand All @@ -165,10 +165,10 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map

Dataset dataset = event.getDataset();
List<DatasetRecord> datasetOutputs = new ArrayList<>();
DatasetRecord record = upsertLineageDataset(dataset, now, null, false);
DatasetRecord record = upsertLineageDataset(daos, dataset, now, null, false);
datasetOutputs.add(record);
insertDatasetFacets(dataset, record, null, null, now);
insertOutputDatasetFacets(dataset, record, null, null, now);
insertDatasetFacets(daos, dataset, record, null, null, now);
insertOutputDatasetFacets(daos, dataset, record, null, null, now);

daos.getDatasetDao()
.updateVersion(
Expand All @@ -181,6 +181,7 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map
}

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
ModelDaos daos = new ModelDaos();
daos.initBaseDao(this);
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

Expand Down Expand Up @@ -240,7 +241,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUpsertBuilder.runStateType(getRunState(event.getEventType())).runStateTime(now);
}
run = daos.getRunDao().upsert(runUpsertBuilder.build());
insertRunFacets(event, runUuid, now);
insertRunFacets(daos, event, runUuid, now);
bag.setRun(run);

if (event.getEventType() != null) {
Expand All @@ -256,17 +257,17 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
}
}

insertJobFacets(event, job.getUuid(), runUuid, now);
insertJobFacets(daos, event, job.getUuid(), runUuid, now);

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetInputs = null;
if (event.getInputs() != null) {
datasetInputs = new ArrayList<>();
for (Dataset dataset : event.getInputs()) {
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true);
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true);
datasetInputs.add(record);
insertDatasetFacets(dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(dataset, record, runUuid, event.getEventType(), now);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
}
bag.setInputs(Optional.ofNullable(datasetInputs));
Expand All @@ -276,10 +277,10 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
if (event.getOutputs() != null) {
datasetOutputs = new ArrayList<>();
for (Dataset dataset : event.getOutputs()) {
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false);
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false);
datasetOutputs.add(record);
insertDatasetFacets(dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(dataset, record, runUuid, event.getEventType(), now);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
}

Expand All @@ -303,7 +304,7 @@ private static Instant getNominalEndTime(LineageEvent event) {
.orElse(null);
}

private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) {
private void insertRunFacets(ModelDaos daos, LineageEvent event, UUID runUuid, Instant now) {
// Add ...
Optional.ofNullable(event.getRun().getFacets())
.ifPresent(
Expand All @@ -313,7 +314,8 @@ private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) {
runUuid, now, event.getEventType(), event.getRun().getFacets()));
}

private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) {
private void insertJobFacets(
ModelDaos daos, LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) {
// Add ...
Optional.ofNullable(event.getJob().getFacets())
.ifPresent(
Expand All @@ -324,7 +326,12 @@ private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Ins
}

private void insertDatasetFacets(
Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) {
ModelDaos daos,
Dataset dataset,
DatasetRecord record,
UUID runUuid,
String eventType,
Instant now) {
// Facets ...
Optional.ofNullable(dataset.getFacets())
.ifPresent(
Expand All @@ -340,7 +347,12 @@ private void insertDatasetFacets(
}

private void insertInputDatasetFacets(
Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) {
ModelDaos daos,
Dataset dataset,
DatasetRecord record,
UUID runUuid,
String eventType,
Instant now) {
// InputFacets ...
Optional.ofNullable(dataset.getInputFacets())
.ifPresent(
Expand All @@ -356,7 +368,12 @@ private void insertInputDatasetFacets(
}

private void insertOutputDatasetFacets(
Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) {
ModelDaos daos,
Dataset dataset,
DatasetRecord record,
UUID runUuid,
String eventType,
Instant now) {
// OutputFacets ...
Optional.ofNullable(dataset.getOutputFacets())
.ifPresent(
Expand Down Expand Up @@ -640,7 +657,7 @@ default JobType getJobType(Job job) {
}

default DatasetRecord upsertLineageDataset(
Dataset ds, Instant now, UUID runUuid, boolean isInput) {
ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) {
NamespaceRow dsNamespace =
daos.getNamespaceDao()
.upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER);
Expand Down
28 changes: 14 additions & 14 deletions api/src/main/java/marquez/db/models/ModelDaos.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@
* exactly once.
*/
public final class ModelDaos {
private static NamespaceDao namespaceDao = null;
private static DatasetSymlinkDao datasetSymlinkDao = null;
private static DatasetDao datasetDao = null;
private static SourceDao sourceDao = null;
private static DatasetVersionDao datasetVersionDao = null;
private static DatasetFieldDao datasetFieldDao = null;
private static RunDao runDao = null;
private static DatasetFacetsDao datasetFacetsDao = null;
private static ColumnLineageDao columnLineageDao = null;
private static JobDao jobDao = null;
private static JobFacetsDao jobFacetsDao = null;
private static RunArgsDao runArgsDao = null;
private static RunStateDao runStateDao = null;
private static RunFacetsDao runFacetsDao = null;
private NamespaceDao namespaceDao = null;
private DatasetSymlinkDao datasetSymlinkDao = null;
private DatasetDao datasetDao = null;
private SourceDao sourceDao = null;
private DatasetVersionDao datasetVersionDao = null;
private DatasetFieldDao datasetFieldDao = null;
private RunDao runDao = null;
private DatasetFacetsDao datasetFacetsDao = null;
private ColumnLineageDao columnLineageDao = null;
private JobDao jobDao = null;
private JobFacetsDao jobFacetsDao = null;
private RunArgsDao runArgsDao = null;
private RunStateDao runStateDao = null;
private RunFacetsDao runFacetsDao = null;
private BaseDao baseDao;

public void initBaseDao(BaseDao baseDao) {
Expand Down
1 change: 0 additions & 1 deletion api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class LineageServiceTest {
private static LineageDao lineageDao;
private static LineageService lineageService;
private static OpenLineageDao openLineageDao;

private static DatasetDao datasetDao;
private static JobDao jobDao;

Expand Down

0 comments on commit 5587fb2

Please sign in to comment.