Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ODD Profiler support #1060

Merged
merged 16 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ venv/
Thumbs.db
.DS_Store
.gradle
.fleet
*.iml
*.ipr
*.iws
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
spring-webflux = '5.3.19'
reactor-extra = '3.4.8'
micrometer-registry-prometheus = '1.9.0'
ingestion-contract-server = '0.1.16'
ingestion-contract-server = '0.1.18'
oddrn-generator-java = '0.1.10'
apache-collections = '4.4'
apache-lang = '3.12.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import org.opendatadiscovery.oddplatform.ingestion.contract.model.CompactDataEntityList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataEntityList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSourceList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList;
import org.opendatadiscovery.oddplatform.service.DataEntityService;
import org.opendatadiscovery.oddplatform.service.DataSourceIngestionService;
import org.opendatadiscovery.oddplatform.service.ingestion.IngestionService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -62,4 +64,14 @@ public Mono<ResponseEntity<CompactDataEntityList>> getDataEntitiesByDEGOddrn(@No
final ServerWebExchange exchange) {
return dataEntityService.listEntitiesWithinDEG(degOddrn).map(ResponseEntity::ok);
}

/**
 * Ingests a batch of dataset statistics and acknowledges it with {@code 201 Created}.
 *
 * <p>The incoming payload is handed to {@code ingestionService.ingestStats}; whatever that call
 * emits is discarded and only its completion is awaited before the response is produced.
 * The {@code exchange} argument is not used by this handler.
 *
 * @param datasetStatisticsList publisher of the statistics payload to ingest
 * @param exchange              current server exchange (unused)
 * @return a Mono emitting an empty-bodied {@code 201 Created} response after ingestion completes
 */
@Override
public Mono<ResponseEntity<Void>> postDataSetStatsList(
    @Valid final Mono<DatasetStatisticsList> datasetStatisticsList,
    final ServerWebExchange exchange
) {
    final ResponseEntity<Void> created = ResponseEntity.status(HttpStatus.CREATED).build();

    return datasetStatisticsList
        .flatMap(statistics -> ingestionService.ingestStats(statistics))
        .thenReturn(created);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo;

public record LabelDto(LabelPojo pojo, Boolean external) {
public record LabelDto(LabelPojo pojo, boolean hasExternalRelations) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opendatadiscovery.oddplatform.dto;

/**
 * Origin of a relation between a label and a dataset field.
 *
 * <p>The constant names are exchanged with the database as plain strings: queries compare the
 * {@code LABEL_TO_DATASET_FIELD.ORIGIN} column against {@code toString()} of a constant, and
 * stored values are parsed back with {@code valueOf}. Renaming a constant therefore requires
 * a matching data migration.
 */
public enum LabelOrigin {
// Filter-only value: repository lookups treat it (like null) as "do not filter by origin".
ALL,
// The only origin that is NOT counted as external when computing "has external relations".
INTERNAL,
// NOTE(review): presumably relations created from ingested metadata - confirm against the ingestion code.
EXTERNAL,
// NOTE(review): presumably relations created from ingested dataset statistics - confirm.
EXTERNAL_STATISTICS
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@Mapper(config = MapperConfig.class)
public interface LabelMapper {
@Mapping(source = "dto.pojo", target = ".")
@Mapping(source = "dto.hasExternalRelations", target = "external")
Label mapToLabel(final LabelDto dto);

Label mapToLabel(final LabelPojo pojo);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface ReactiveDatasetFieldRepository extends ReactiveCRUDRepository<DatasetFieldPojo> {
Expand All @@ -13,5 +15,7 @@ public interface ReactiveDatasetFieldRepository extends ReactiveCRUDRepository<D

Mono<Map<String, DatasetFieldPojo>> getExistingFieldsByOddrnAndType(final List<DatasetFieldPojo> fields);

Flux<DatasetFieldPojo> getExistingFieldsByOddrn(final Collection<String> oddrns);

Mono<Long> getDataEntityIdByDatasetFieldId(final long datasetFieldId);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -28,6 +29,7 @@
import org.opendatadiscovery.oddplatform.service.activity.ActivityLog;
import org.opendatadiscovery.oddplatform.service.activity.ActivityParameter;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static java.util.function.Function.identity;
Expand Down Expand Up @@ -90,6 +92,15 @@ public Mono<Map<String, DatasetFieldPojo>> getExistingFieldsByOddrnAndType(final
.collect(Collectors.toMap(DatasetFieldPojo::getOddrn, identity()));
}

/**
 * Loads the dataset fields whose ODDRN is contained in the given collection.
 *
 * @param oddrns ODDRNs to look up; {@code null} or empty yields an empty result
 * @return a Flux of the matching {@code DatasetFieldPojo} rows, empty when nothing matches
 *         or when there is nothing to look up
 */
@Override
public Flux<DatasetFieldPojo> getExistingFieldsByOddrn(final Collection<String> oddrns) {
    // Nothing to match: skip the database round trip instead of issuing a query
    // with an empty IN list, which can never return a row.
    if (oddrns == null || oddrns.isEmpty()) {
        return Flux.empty();
    }

    final SelectConditionStep<DatasetFieldRecord> selectConditionStep = DSL
        .selectFrom(DATASET_FIELD)
        .where(DATASET_FIELD.ODDRN.in(oddrns));

    return jooqReactiveOperations.flux(selectConditionStep).map(r -> r.into(DatasetFieldPojo.class));
}

@Override
public Mono<Long> getDataEntityIdByDatasetFieldId(final long datasetFieldId) {
final var query = DSL.select(DATA_ENTITY.ID)
Expand Down Expand Up @@ -133,7 +144,7 @@ private DatasetFieldDto mapRecordToDatasetFieldDto(final Record datasetFieldReco
.extractAggRelation(datasetFieldRecord, "labels", LabelPojo.class);
return DatasetFieldDto.builder()
.datasetFieldPojo(pojo)
.labels(labels.stream().map(l -> new LabelDto(l, null)).toList())
.labels(labels.stream().map(l -> new LabelDto(l, false)).toList())
.parentFieldId(parentFieldId)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opendatadiscovery.oddplatform.dto.DatasetFieldDto;
import org.opendatadiscovery.oddplatform.dto.DatasetStructureDto;
import org.opendatadiscovery.oddplatform.dto.LabelDto;
import org.opendatadiscovery.oddplatform.dto.LabelOrigin;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetVersionPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo;
Expand All @@ -32,7 +33,9 @@
import reactor.core.publisher.Mono;

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.jooq.impl.DSL.countDistinct;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.jsonArrayAgg;
Expand Down Expand Up @@ -67,7 +70,7 @@ public ReactiveDatasetVersionRepositoryImpl(final JooqReactiveOperations jooqRea
public Mono<DatasetStructureDto> getDatasetVersion(final long datasetVersionId) {
final List<Field<?>> selectFields = Stream.of(DATASET_VERSION.fields(), DATASET_FIELD.fields())
.flatMap(Arrays::stream)
.collect(Collectors.toList());
.collect(toList());

final SelectHavingStep<Record> selectHavingStep = DSL
.select(selectFields)
Expand All @@ -86,9 +89,7 @@ public Mono<DatasetStructureDto> getDatasetVersion(final long datasetVersionId)

return jooqReactiveOperations
.flux(selectHavingStep)
.collect(Collectors
.groupingBy(this::extractDatasetVersion,
Collectors.mapping(this::extractDatasetFieldDto, Collectors.toList())))
.collect(groupingBy(this::extractDatasetVersion, mapping(this::extractDatasetFieldDto, toList())))
.flatMap(m -> m.entrySet().stream()
.findFirst()
.map(e -> Mono.just(DatasetStructureDto.builder()
Expand All @@ -112,7 +113,7 @@ public Mono<DatasetStructureDto> getLatestDatasetVersion(final long datasetId) {

final List<Field<?>> selectFields = Stream.of(DATASET_VERSION.fields(), DATASET_FIELD.fields())
.flatMap(Arrays::stream)
.collect(Collectors.toList());
.collect(toList());

final SelectHavingStep<Record> selectHavingStep = DSL
.select(selectFields)
Expand All @@ -133,9 +134,8 @@ public Mono<DatasetStructureDto> getLatestDatasetVersion(final long datasetId) {

return jooqReactiveOperations
.flux(selectHavingStep)
.collect(Collectors
.groupingBy(this::extractDatasetVersion,
Collectors.mapping(this::extractDatasetFieldDto, Collectors.toList())))
.collect(groupingBy(this::extractDatasetVersion,
mapping(this::extractDatasetFieldDto, toList())))
.flatMap(m -> m.entrySet().stream()
.findFirst()
.map(e -> Mono.just(DatasetStructureDto.builder()
Expand Down Expand Up @@ -211,9 +211,9 @@ public Mono<Map<Long, List<DatasetFieldPojo>>> getDatasetVersionFields(final Set
.where(DATASET_STRUCTURE.DATASET_VERSION_ID.in(dataVersionPojoIds));

return jooqReactiveOperations.flux(vidToFieldsSelect)
.collect(Collectors.groupingBy(
.collect(groupingBy(
r -> r.get(DATASET_STRUCTURE.DATASET_VERSION_ID),
mapping(this::extractDatasetField, Collectors.toList())));
mapping(this::extractDatasetField, toList())));
}

private DatasetVersionPojo extractDatasetVersion(final Record datasetVersionRecord) {
Expand All @@ -234,11 +234,17 @@ private DatasetFieldDto extractDatasetFieldDto(final Record datasetVersionRecord

private List<LabelDto> extractLabels(final Record record) {
final Set<LabelPojo> labels = jooqRecordHelper.extractAggRelation(record, LABELS, LabelPojo.class);
final Map<Long, LabelToDatasetFieldPojo> relations =
jooqRecordHelper.extractAggRelation(record, LABEL_RELATIONS, LabelToDatasetFieldPojo.class).stream()
.collect(Collectors.toMap(LabelToDatasetFieldPojo::getLabelId, identity()));

final Map<Long, LabelToDatasetFieldPojo> relations = jooqRecordHelper
.extractAggRelation(record, LABEL_RELATIONS, LabelToDatasetFieldPojo.class)
.stream()
.collect(Collectors.toMap(LabelToDatasetFieldPojo::getLabelId, identity()));

return labels.stream()
.map(labelPojo -> new LabelDto(labelPojo, relations.get(labelPojo.getId()).getExternal()))
.map(labelPojo -> new LabelDto(
labelPojo,
!LabelOrigin.INTERNAL.equals(LabelOrigin.valueOf(relations.get(labelPojo.getId()).getOrigin()))
))
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Collection;
import java.util.List;
import org.opendatadiscovery.oddplatform.dto.LabelDto;
import org.opendatadiscovery.oddplatform.dto.LabelOrigin;
import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelToDatasetFieldPojo;
import org.opendatadiscovery.oddplatform.utils.Page;
Expand All @@ -18,7 +19,7 @@ public interface ReactiveLabelRepository extends ReactiveCRUDRepository<LabelPoj

Flux<LabelPojo> listByNames(final Collection<String> names);

Flux<LabelToDatasetFieldPojo> listLabelRelations(final Collection<Long> datasetFieldIds);
Flux<LabelToDatasetFieldPojo> listLabelRelations(final Collection<Long> datasetFieldIds, final LabelOrigin origin);

Flux<LabelToDatasetFieldPojo> createRelations(final Collection<LabelToDatasetFieldPojo> pojos);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.jooq.Condition;
import org.jooq.DeleteResultStep;
import org.jooq.Field;
import org.jooq.InsertResultStep;
import org.jooq.InsertSetStep;
import org.jooq.Record;
Expand All @@ -14,6 +16,7 @@
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.opendatadiscovery.oddplatform.dto.LabelDto;
import org.opendatadiscovery.oddplatform.dto.LabelOrigin;
import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelToDatasetFieldPojo;
import org.opendatadiscovery.oddplatform.model.tables.records.LabelRecord;
Expand All @@ -33,7 +36,7 @@
public class ReactiveLabelRepositoryImpl
extends ReactiveAbstractSoftDeleteCRUDRepository<LabelRecord, LabelPojo>
implements ReactiveLabelRepository {
private static final String EXTERNAL_FIELD = "external";
private static final String HAS_EXTERNAL_RELATIONS_FIELD = "has_external_relations";

public ReactiveLabelRepositoryImpl(final JooqReactiveOperations jooqReactiveOperations,
final JooqQueryHelper jooqQueryHelper) {
Expand All @@ -43,7 +46,9 @@ public ReactiveLabelRepositoryImpl(final JooqReactiveOperations jooqReactiveOper
@Override
public Mono<LabelDto> getDto(final long id) {
final var query = DSL.select(LABEL.fields())
.select(DSL.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.EXTERNAL), false).as(EXTERNAL_FIELD))
.select(DSL
.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.ORIGIN.ne(LabelOrigin.INTERNAL.toString())), false)
.as(HAS_EXTERNAL_RELATIONS_FIELD))
.from(LABEL)
.leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.LABEL_ID.eq(LABEL.ID))
.where(idCondition(id))
Expand All @@ -56,7 +61,9 @@ public Mono<LabelDto> getDto(final long id) {
@Override
public Mono<List<LabelDto>> listDatasetFieldDtos(final Long datasetFieldId) {
final var query = DSL.select(LABEL.fields())
.select(DSL.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.EXTERNAL), false).as(EXTERNAL_FIELD))
.select(DSL
.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.ORIGIN.ne(LabelOrigin.INTERNAL.toString())), false)
.as(HAS_EXTERNAL_RELATIONS_FIELD))
.from(LABEL)
.leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.LABEL_ID.eq(LABEL.ID))
.where(addSoftDeleteFilter(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(datasetFieldId)))
Expand All @@ -80,7 +87,9 @@ public Mono<Page<LabelDto>> pageDto(final int page, final int size, final String
final var cteSelect = DSL.with(labelCte.getName())
.as(select)
.select(labelCte.fields())
.select(DSL.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.EXTERNAL), false).as(EXTERNAL_FIELD))
.select(DSL
.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.ORIGIN.ne(LabelOrigin.INTERNAL.toString())), false)
.as(HAS_EXTERNAL_RELATIONS_FIELD))
.from(labelCte.getName())
.leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.LABEL_ID.eq(labelCte.field(LABEL.ID)))
.groupBy(labelCte.fields());
Expand All @@ -105,16 +114,22 @@ public Flux<LabelPojo> listByNames(final Collection<String> names) {
}

@Override
public Flux<LabelToDatasetFieldPojo> listLabelRelations(final Collection<Long> datasetFieldIds) {
public Flux<LabelToDatasetFieldPojo> listLabelRelations(final Collection<Long> datasetFieldIds,
final LabelOrigin origin) {
if (CollectionUtils.isEmpty(datasetFieldIds)) {
return Flux.just();
}
final var query = DSL.select(LABEL_TO_DATASET_FIELD.fields())

var query = DSL.select(LABEL_TO_DATASET_FIELD.fields())
.from(LABEL_TO_DATASET_FIELD)
.join(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID))
.where(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.in(datasetFieldIds).and(LABEL.IS_DELETED.isFalse()));
return jooqReactiveOperations.flux(query)
.map(r -> r.into(LabelToDatasetFieldPojo.class));

if (origin != null && !origin.equals(LabelOrigin.ALL)) {
query = query.and(LABEL_TO_DATASET_FIELD.ORIGIN.eq(origin.toString()));
}

return jooqReactiveOperations.flux(query).map(r -> r.into(LabelToDatasetFieldPojo.class));
}

@Override
Expand Down Expand Up @@ -169,9 +184,12 @@ public Flux<LabelToDatasetFieldPojo> createRelations(final Collection<LabelToDat
}

private LabelDto mapLabel(final Record jooqRecord) {
return new LabelDto(
jooqRecord.into(LabelPojo.class),
jooqRecord.get(EXTERNAL_FIELD, Boolean.class)
);
final Boolean hasExternalRelations = jooqRecord.get(HAS_EXTERNAL_RELATIONS_FIELD, Boolean.class);

if (hasExternalRelations == null) {
throw new IllegalStateException("hasExternalRelations field cannot be null");
}

return new LabelDto(jooqRecord.into(LabelPojo.class), hasExternalRelations);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.opendatadiscovery.oddplatform.service;

import java.util.List;
import java.util.Map;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetField;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldUpdateFormData;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import reactor.core.publisher.Mono;

Expand All @@ -11,4 +13,6 @@ Mono<DataSetField> updateDatasetField(final long datasetFieldId,
final DatasetFieldUpdateFormData datasetFieldUpdateFormData);

Mono<List<DatasetFieldPojo>> createOrUpdateDatasetFields(final List<DatasetFieldPojo> fields);

Mono<Void> updateStatistics(final Map<String, DataSetFieldStat> stats);
}
Loading