From b8f74d2aa6bb047836c6f57cd8da20ce1f38aabd Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Mon, 17 Oct 2022 08:30:56 +0300 Subject: [PATCH 01/14] Mock API endpoint --- .gitignore | 1 + gradle/libs.versions.toml | 2 +- .../oddplatform/controller/IngestionController.java | 10 ++++++++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index c367c6238..b86d7668e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ venv/ Thumbs.db .DS_Store .gradle +.fleet *.iml *.ipr *.iws diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5b1038f84..266d64f83 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ spring-webflux = '5.3.19' reactor-extra = '3.4.8' micrometer-registry-prometheus = '1.9.0' -ingestion-contract-server = '0.1.16' +ingestion-contract-server = '0.1.18' oddrn-generator-java = '0.1.10' apache-collections = '4.4' apache-lang = '3.12.0' diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java index 1c1dd1619..8bee93d96 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java @@ -9,9 +9,11 @@ import org.opendatadiscovery.oddplatform.ingestion.contract.model.CompactDataEntityList; import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataEntityList; import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSourceList; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList; import org.opendatadiscovery.oddplatform.service.DataEntityService; import org.opendatadiscovery.oddplatform.service.DataSourceIngestionService; import org.opendatadiscovery.oddplatform.service.ingestion.IngestionService; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; @@ -62,4 +64,12 @@ public Mono> getDataEntitiesByDEGOddrn(@No final ServerWebExchange exchange) { return dataEntityService.listEntitiesWithinDEG(degOddrn).map(ResponseEntity::ok); } + + @Override + public Mono> postDataSetStatsList( + @Valid final Mono datasetStatisticsList, + final ServerWebExchange exchange + ) { + return Mono.just(ResponseEntity.status(HttpStatus.CREATED).build()); + } } From e4ce608c2ae6109b80b2e5787cf4d13cef9da2cf Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Mon, 17 Oct 2022 09:40:16 +0300 Subject: [PATCH 02/14] Implementation --- .../controller/IngestionController.java | 4 ++- .../ReactiveAbstractCRUDRepository.java | 1 + .../ReactiveDatasetFieldRepository.java | 4 +++ .../ReactiveDatasetFieldRepositoryImpl.java | 11 ++++++ .../service/DatasetFieldService.java | 4 +++ .../service/DatasetFieldServiceImpl.java | 34 ++++++++++++++++++- .../service/ingestion/IngestionService.java | 3 ++ .../ingestion/IngestionServiceImpl.java | 18 ++++++++++ 8 files changed, 77 insertions(+), 2 deletions(-) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java index 8bee93d96..64a90b8e8 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/controller/IngestionController.java @@ -70,6 +70,8 @@ public Mono> postDataSetStatsList( @Valid final Mono datasetStatisticsList, final ServerWebExchange exchange ) { - return Mono.just(ResponseEntity.status(HttpStatus.CREATED).build()); + return datasetStatisticsList + .flatMap(ingestionService::ingestStats) + .thenReturn(ResponseEntity.status(HttpStatus.CREATED).build()); } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveAbstractCRUDRepository.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveAbstractCRUDRepository.java index d125a391b..12da9830d 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveAbstractCRUDRepository.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveAbstractCRUDRepository.java @@ -214,6 +214,7 @@ protected Flux updateMany(final List records) { final Map, Field> fields = Arrays .stream(recordTable.fields()) .filter(f -> !nonUpdatableFields.contains(f)) + // TODO: extract logic of map into collect, no Pair model is needed here .map(r -> Pair.of(r, table.field(r.getName()))) .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepository.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepository.java index a0933c88d..5030b0720 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepository.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepository.java @@ -1,9 +1,11 @@ package org.opendatadiscovery.oddplatform.repository.reactive; +import java.util.Collection; import java.util.List; import java.util.Map; import org.opendatadiscovery.oddplatform.dto.DatasetFieldDto; import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface ReactiveDatasetFieldRepository extends ReactiveCRUDRepository { @@ -13,5 +15,7 @@ public interface ReactiveDatasetFieldRepository extends ReactiveCRUDRepository> getExistingFieldsByOddrnAndType(final List fields); + Flux getExistingFieldsByOddrn(final Collection oddrns); + Mono getDataEntityIdByDatasetFieldId(final long datasetFieldId); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepositoryImpl.java index 15df349cd..a6bfa21d4 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetFieldRepositoryImpl.java @@ -1,6 +1,7 @@ package org.opendatadiscovery.oddplatform.repository.reactive; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +29,7 @@ import org.opendatadiscovery.oddplatform.service.activity.ActivityLog; import org.opendatadiscovery.oddplatform.service.activity.ActivityParameter; import org.springframework.stereotype.Repository; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static java.util.function.Function.identity; @@ -90,6 +92,15 @@ public Mono> getExistingFieldsByOddrnAndType(final .collect(Collectors.toMap(DatasetFieldPojo::getOddrn, identity())); } + @Override + public Flux getExistingFieldsByOddrn(final Collection oddrns) { + final SelectConditionStep selectConditionStep = DSL + .selectFrom(DATASET_FIELD) + .where(DATASET_FIELD.ODDRN.in(oddrns)); + + return jooqReactiveOperations.flux(selectConditionStep).map(r -> r.into(DatasetFieldPojo.class)); + } + @Override public Mono getDataEntityIdByDatasetFieldId(final long datasetFieldId) { final var query = DSL.select(DATA_ENTITY.ID) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java index 7b53b3c48..a39a8de17 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java @@ -1,8 +1,10 @@ package org.opendatadiscovery.oddplatform.service; import java.util.List; +import java.util.Map; import org.opendatadiscovery.oddplatform.api.contract.model.DataSetField; import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldUpdateFormData; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat; import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo; import reactor.core.publisher.Mono; @@ -11,4 +13,6 @@ Mono updateDatasetField(final long datasetFieldId, final DatasetFieldUpdateFormData datasetFieldUpdateFormData); Mono> createOrUpdateDatasetFields(final List fields); + + Mono updateStatistics(final Map stats); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java index 77f9bf80a..79da973e1 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java @@ -1,28 +1,34 @@ package org.opendatadiscovery.oddplatform.service; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; +import org.jooq.JSONB; import org.opendatadiscovery.oddplatform.annotation.ReactiveTransactional; import org.opendatadiscovery.oddplatform.api.contract.model.DataSetField; import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldUpdateFormData; import org.opendatadiscovery.oddplatform.dto.DataEntityFilledField; import org.opendatadiscovery.oddplatform.dto.DatasetFieldDto; import org.opendatadiscovery.oddplatform.dto.LabelDto; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat; import org.opendatadiscovery.oddplatform.mapper.DatasetFieldApiMapper; import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo; import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelToDatasetFieldPojo; import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveDatasetFieldRepository; import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveLabelRepository; import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveSearchEntrypointRepository; +import org.opendatadiscovery.oddplatform.utils.JSONSerDeUtils; import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static org.opendatadiscovery.oddplatform.dto.DataEntityFilledField.DATASET_FIELD_LABELS; @@ -102,6 +108,32 @@ public Mono> createOrUpdateDatasetFields(final List updateStatistics(final Map stats) { + return reactiveDatasetFieldRepository.getExistingFieldsByOddrn(stats.keySet()) + .collectList() + .map(fields -> { + final List fieldsToUpdate = new ArrayList<>(); + + for (final DatasetFieldPojo field : fields) { + final DataSetFieldStat stat = stats.get(field.getOddrn()); + if (stat == null) { + log.error("Unexpected behaviour while building an update object for datasetField {}", + field.getOddrn()); + + continue; + } + + field.setStats(JSONB.jsonb(JSONSerDeUtils.serializeJson(stat))); + fieldsToUpdate.add(field); + } + + return fieldsToUpdate; + }) + .flatMapMany(reactiveDatasetFieldRepository::bulkUpdate) + .then(); + } + private Mono> updateDatasetFieldLabels(final long datasetFieldId, final DatasetFieldUpdateFormData datasetFieldUpdateFormData) { final Set names = new HashSet<>(datasetFieldUpdateFormData.getLabelNames()); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionService.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionService.java index a05588ee1..7acfcbb60 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionService.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionService.java @@ -1,8 +1,11 @@ package org.opendatadiscovery.oddplatform.service.ingestion; import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataEntityList; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList; import reactor.core.publisher.Mono; public interface IngestionService { Mono ingest(final DataEntityList dataEntityList); + + Mono ingestStats(final DatasetStatisticsList datasetStatisticsList); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java index 57305ca54..6defca116 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java @@ -26,6 +26,9 @@ import org.opendatadiscovery.oddplatform.exception.NotFoundException; import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataEntity; import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataEntityList; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetStatistics; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList; import org.opendatadiscovery.oddplatform.mapper.ingestion.IngestionMapper; import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityPojo; import org.opendatadiscovery.oddplatform.model.tables.pojos.DataQualityTestRelationsPojo; @@ -34,6 +37,7 @@ import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo; import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveDataEntityRepository; import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveDataSourceRepository; +import org.opendatadiscovery.oddplatform.service.DatasetFieldService; import org.opendatadiscovery.oddplatform.service.ingestion.processor.IngestionProcessorChain; import org.opendatadiscovery.oddplatform.service.metric.MetricService; import org.springframework.stereotype.Service; @@ -50,6 +54,7 @@ public class IngestionServiceImpl implements IngestionService { private final IngestionProcessorChain ingestionProcessorChain; private final MetricService metricService; + private final DatasetFieldService datasetFieldService; private final ReactiveDataEntityRepository dataEntityRepository; private final ReactiveDataSourceRepository dataSourceRepository; @@ -69,6 +74,19 @@ public Mono ingest(final DataEntityList dataEntityList) { .then(); } + @Override + public Mono ingestStats(final DatasetStatisticsList datasetStatisticsList) { + final Map statistics = datasetStatisticsList.getItems() + .stream() + .map(DataSetStatistics::getFields) + .reduce(new HashMap<>(), (acc, fields) -> { + acc.putAll(fields); + return acc; + }); + + return datasetFieldService.updateStatistics(statistics); + } + private Mono persistDataEntities(final long dataSourceId, final List dataEntities) { final Map ingestionDtoMap = dataEntities.stream() From 808e2de716cbd1daef4f9cfa27e6a184d384b64f Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Mon, 17 Oct 2022 12:49:15 +0300 Subject: [PATCH 03/14] Add int stats --- odd-platform-specification/components.yaml | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/odd-platform-specification/components.yaml b/odd-platform-specification/components.yaml index e9ff079f8..ec1f0a821 100644 --- a/odd-platform-specification/components.yaml +++ b/odd-platform-specification/components.yaml @@ -1074,6 +1074,8 @@ components: $ref: '#/components/schemas/BooleanFieldStat' number_stats: $ref: '#/components/schemas/NumberFieldStat' + integer_stats: + $ref: '#/components/schemas/IntegerFieldStat' string_stats: $ref: '#/components/schemas/StringFieldStat' binary_stats: @@ -1136,6 +1138,33 @@ components: - nulls_count - unique_count + IntegerFieldStat: + type: object + properties: + low_value: + type: integer + format: int64 + high_value: + type: integer + format: int64 + mean_value: + type: integer + format: int64 + median_value: + type: integer + format: int64 + nulls_count: + type: integer + format: int64 + unique_count: + type: integer + format: int64 + required: + - low_value + - high_value + - nulls_count + - unique_count + StringFieldStat: type: object properties: From 4d5490a170c572b57c21d87911966058ec9ae75e Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Mon, 17 Oct 2022 15:54:40 +0300 Subject: [PATCH 04/14] Fix tests --- .../api/ingestion/utils/IngestionModelMapper.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/utils/IngestionModelMapper.java b/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/utils/IngestionModelMapper.java index 8ca4cda93..0c6108552 100644 --- a/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/utils/IngestionModelMapper.java +++ b/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/api/ingestion/utils/IngestionModelMapper.java @@ -12,6 +12,7 @@ import org.opendatadiscovery.oddplatform.api.contract.model.DataSetFieldType; import org.opendatadiscovery.oddplatform.api.contract.model.DataSource; import org.opendatadiscovery.oddplatform.api.contract.model.DateTimeFieldStat; +import org.opendatadiscovery.oddplatform.api.contract.model.IntegerFieldStat; import org.opendatadiscovery.oddplatform.api.contract.model.Label; import org.opendatadiscovery.oddplatform.api.contract.model.MetadataField; import org.opendatadiscovery.oddplatform.api.contract.model.MetadataFieldOrigin; @@ -49,6 +50,15 @@ public static DataSetField buildExpectedDataSetField( .isNullable(field.getType().getIsNullable()); final DataSetFieldStat stats = new DataSetFieldStat() + .integerStats( + new IntegerFieldStat() + .highValue(field.getStats().getIntegerStats().getHighValue()) + .lowValue(field.getStats().getIntegerStats().getLowValue()) + .meanValue(field.getStats().getIntegerStats().getMeanValue()) + .medianValue(field.getStats().getIntegerStats().getMedianValue()) + .nullsCount(field.getStats().getIntegerStats().getNullsCount()) + .uniqueCount(field.getStats().getIntegerStats().getUniqueCount()) + ) .stringStats( new StringFieldStat() .avgLength(field.getStats().getStringStats().getAvgLength()) From 8e84b8afa11b7627bd9c06f409491b77659af08d Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Thu, 20 Oct 2022 15:14:28 +0300 Subject: [PATCH 05/14] Do not update stats in the Ingestion API if null was given --- .../oddplatform/service/DatasetFieldServiceImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java index 79da973e1..9961ba10e 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java @@ -98,6 +98,12 @@ public Mono> createOrUpdateDatasetFields(final List Date: Wed, 26 Oct 2022 00:39:39 +0300 Subject: [PATCH 06/14] Refactoring label external -> origin --- .../oddplatform/dto/LabelDto.java | 2 +- .../oddplatform/dto/LabelOrigin.java | 8 ++++ .../oddplatform/mapper/LabelMapper.java | 6 +++ .../ReactiveDatasetVersionRepositoryImpl.java | 15 ++++-- .../reactive/ReactiveLabelRepository.java | 3 +- .../reactive/ReactiveLabelRepositoryImpl.java | 25 ++++++---- .../service/DatasetFieldServiceImpl.java | 48 +++++++++---------- .../service/ReactiveLabelServiceImpl.java | 5 +- .../ingestion/LabelIngestionServiceImpl.java | 25 ++++------ .../V0_0_54__external_statistics_labels.sql | 13 +++++ .../mapper/DatasetFieldApiMapperTest.java | 3 +- .../mapper/DatasetVersionMapperTest.java | 3 +- ...eactiveDatasetFieldRepositoryImplTest.java | 3 +- ...ctiveDatasetVersionRepositoryImplTest.java | 3 +- .../service/DatasetFieldServiceImplTest.java | 12 +++-- 15 files changed, 109 insertions(+), 65 deletions(-) create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelOrigin.java create mode 100644 odd-platform-api/src/main/resources/db/migration/V0_0_54__external_statistics_labels.sql diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelDto.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelDto.java index 307cdc1dd..4aaef804f 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelDto.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelDto.java @@ -2,5 +2,5 @@ import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo; -public record LabelDto(LabelPojo pojo, Boolean external) { +public record LabelDto(LabelPojo pojo, LabelOrigin origin) { } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelOrigin.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelOrigin.java new file mode 100644 index 000000000..488d82e41 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/LabelOrigin.java @@ -0,0 +1,8 @@ +package org.opendatadiscovery.oddplatform.dto; + +public enum LabelOrigin { + ALL, + INTERNAL, + EXTERNAL, + EXTERNAL_STATISTICS +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/LabelMapper.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/LabelMapper.java index 2e4237b33..5f5ca0d3f 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/LabelMapper.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/LabelMapper.java @@ -8,12 +8,14 @@ import org.opendatadiscovery.oddplatform.api.contract.model.LabelsResponse; import org.opendatadiscovery.oddplatform.api.contract.model.PageInfo; import org.opendatadiscovery.oddplatform.dto.LabelDto; +import org.opendatadiscovery.oddplatform.dto.LabelOrigin; import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo; import org.opendatadiscovery.oddplatform.utils.Page; @Mapper(config = MapperConfig.class) public interface LabelMapper { @Mapping(source = "dto.pojo", target = ".") + @Mapping(source = "dto.origin", target = "external") Label mapToLabel(final LabelDto dto); Label mapToLabel(final LabelPojo pojo); @@ -22,6 +24,10 @@ public interface LabelMapper { LabelPojo applyToPojo(@MappingTarget final LabelPojo pojo, final LabelFormData form); + default Boolean isExternal(final LabelOrigin origin) { + return !LabelOrigin.INTERNAL.equals(origin); + } + default LabelsResponse mapToLabelResponse(final Page page) { return new LabelsResponse() .items(page.getData().stream().map(this::mapToLabel).toList()) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetVersionRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetVersionRepositoryImpl.java index 6e8bf8703..363f146b8 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetVersionRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveDatasetVersionRepositoryImpl.java @@ -19,6 +19,7 @@ import org.opendatadiscovery.oddplatform.dto.DatasetFieldDto; import org.opendatadiscovery.oddplatform.dto.DatasetStructureDto; import org.opendatadiscovery.oddplatform.dto.LabelDto; +import org.opendatadiscovery.oddplatform.dto.LabelOrigin; import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo; import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetVersionPojo; import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo; @@ -234,11 +235,17 @@ private DatasetFieldDto extractDatasetFieldDto(final Record datasetVersionRecord private List extractLabels(final Record record) { final Set labels = jooqRecordHelper.extractAggRelation(record, LABELS, LabelPojo.class); - final Map relations = - jooqRecordHelper.extractAggRelation(record, LABEL_RELATIONS, LabelToDatasetFieldPojo.class).stream() - .collect(Collectors.toMap(LabelToDatasetFieldPojo::getLabelId, identity())); + + final Map relations = jooqRecordHelper + .extractAggRelation(record, LABEL_RELATIONS, LabelToDatasetFieldPojo.class) + .stream() + .collect(Collectors.toMap(LabelToDatasetFieldPojo::getLabelId, identity())); + return labels.stream() - .map(labelPojo -> new LabelDto(labelPojo, relations.get(labelPojo.getId()).getExternal())) + .map(labelPojo -> new LabelDto( + labelPojo, + LabelOrigin.valueOf(relations.get(labelPojo.getId()).getOrigin()) + )) .toList(); } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepository.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepository.java index 490a57441..bf6d6a8b7 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepository.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepository.java @@ -3,6 +3,7 @@ import java.util.Collection; import java.util.List; import org.opendatadiscovery.oddplatform.dto.LabelDto; +import org.opendatadiscovery.oddplatform.dto.LabelOrigin; import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo; import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelToDatasetFieldPojo; import org.opendatadiscovery.oddplatform.utils.Page; @@ -18,7 +19,7 @@ public interface ReactiveLabelRepository extends ReactiveCRUDRepository listByNames(final Collection names); - Flux listLabelRelations(final Collection datasetFieldIds); + Flux listLabelRelations(final Collection datasetFieldIds, final LabelOrigin origin); Flux createRelations(final Collection pojos); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepositoryImpl.java index a569fb659..bcf1c18a1 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveLabelRepositoryImpl.java @@ -14,6 +14,7 @@ import org.jooq.Table; import org.jooq.impl.DSL; import org.opendatadiscovery.oddplatform.dto.LabelDto; +import org.opendatadiscovery.oddplatform.dto.LabelOrigin; import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelPojo; import org.opendatadiscovery.oddplatform.model.tables.pojos.LabelToDatasetFieldPojo; import org.opendatadiscovery.oddplatform.model.tables.records.LabelRecord; @@ -33,7 +34,7 @@ public class ReactiveLabelRepositoryImpl extends ReactiveAbstractSoftDeleteCRUDRepository implements ReactiveLabelRepository { - private static final String EXTERNAL_FIELD = "external"; + private static final String ORIGIN_FIELD = "origin"; public ReactiveLabelRepositoryImpl(final JooqReactiveOperations jooqReactiveOperations, final JooqQueryHelper jooqQueryHelper) { @@ -43,7 +44,7 @@ public ReactiveLabelRepositoryImpl(final JooqReactiveOperations jooqReactiveOper @Override public Mono getDto(final long id) { final var query = DSL.select(LABEL.fields()) - .select(DSL.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.EXTERNAL), false).as(EXTERNAL_FIELD)) + .select(DSL.coalesce(LABEL_TO_DATASET_FIELD.ORIGIN).as(ORIGIN_FIELD)) .from(LABEL) .leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.LABEL_ID.eq(LABEL.ID)) .where(idCondition(id)) @@ -56,7 +57,7 @@ public Mono getDto(final long id) { @Override public Mono> listDatasetFieldDtos(final Long datasetFieldId) { final var query = DSL.select(LABEL.fields()) - .select(DSL.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.EXTERNAL), false).as(EXTERNAL_FIELD)) + .select(DSL.coalesce(LABEL_TO_DATASET_FIELD.ORIGIN).as(ORIGIN_FIELD)) .from(LABEL) .leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.LABEL_ID.eq(LABEL.ID)) .where(addSoftDeleteFilter(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(datasetFieldId))) @@ -80,7 +81,7 @@ public Mono> pageDto(final int page, final int size, final String final var cteSelect = DSL.with(labelCte.getName()) .as(select) .select(labelCte.fields()) - .select(DSL.coalesce(DSL.boolOr(LABEL_TO_DATASET_FIELD.EXTERNAL), false).as(EXTERNAL_FIELD)) + .select(DSL.coalesce(LABEL_TO_DATASET_FIELD.ORIGIN).as(ORIGIN_FIELD)) .from(labelCte.getName()) .leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.LABEL_ID.eq(labelCte.field(LABEL.ID))) .groupBy(labelCte.fields()); @@ -105,16 +106,22 @@ public Flux listByNames(final Collection names) { } @Override - public Flux listLabelRelations(final Collection datasetFieldIds) { + public Flux listLabelRelations(final Collection datasetFieldIds, + final LabelOrigin origin) { if (CollectionUtils.isEmpty(datasetFieldIds)) { return Flux.just(); } - final var query = DSL.select(LABEL_TO_DATASET_FIELD.fields()) + + var query = DSL.select(LABEL_TO_DATASET_FIELD.fields()) .from(LABEL_TO_DATASET_FIELD) .join(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)) .where(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.in(datasetFieldIds).and(LABEL.IS_DELETED.isFalse())); - return jooqReactiveOperations.flux(query) - .map(r -> r.into(LabelToDatasetFieldPojo.class)); + + if (origin != null && !origin.equals(LabelOrigin.ALL)) { + query = query.and(LABEL_TO_DATASET_FIELD.ORIGIN.eq(origin.toString())); + } + + return jooqReactiveOperations.flux(query).map(r -> r.into(LabelToDatasetFieldPojo.class)); } @Override @@ -171,7 +178,7 @@ public Flux createRelations(final Collection updateDatasetField(final long datasetFieldId, .switchIfEmpty(Mono.error( new IllegalArgumentException(String.format("DatasetField not found by id = %s", datasetFieldId)))) .flatMap(dto -> updateDescription(datasetFieldId, datasetFieldUpdateFormData, dto)) - .zipWith(Mono.defer(() -> updateDatasetFieldLabels(datasetFieldId, datasetFieldUpdateFormData))) + .zipWith(Mono.defer(() -> updateInternalDatasetFieldLabels(datasetFieldId, datasetFieldUpdateFormData))) .map(tuple -> { final DatasetFieldDto dto = tuple.getT1(); final List labels = tuple.getT2(); @@ -64,7 +64,7 @@ public Mono updateDatasetField(final long datasetFieldId, .ignoreElement().thenReturn(dto)) .flatMap(dto -> { final List internalLabels = dto.getLabels().stream() - .filter(l -> !l.external()) + .filter(l -> l.origin().equals(LabelOrigin.INTERNAL)) .toList(); if (CollectionUtils.isEmpty(internalLabels)) { return dataEntityFilledService @@ -99,7 +99,6 @@ public Mono> createOrUpdateDatasetFields(final List> createOrUpdateDatasetFields(final List updateStatistics(final Map stats) { + // TODO: also handle labels + + reactiveDatasetFieldRepository + .getExistingFieldsByOddrn(stats.keySet()) + .collect(Collectors.toMap(DatasetFieldPojo::getOddrn, identity())); + return reactiveDatasetFieldRepository.getExistingFieldsByOddrn(stats.keySet()) .collectList() .map(fields -> { @@ -140,22 +145,22 @@ public Mono updateStatistics(final Map stats) { .then(); } - private Mono> updateDatasetFieldLabels(final long datasetFieldId, - final DatasetFieldUpdateFormData datasetFieldUpdateFormData) { + private Mono> updateInternalDatasetFieldLabels( + final long datasetFieldId, + final DatasetFieldUpdateFormData datasetFieldUpdateFormData + ) { final Set names = new HashSet<>(datasetFieldUpdateFormData.getLabelNames()); - return getCurrentRelations(List.of(datasetFieldId)).zipWith(getUpdatedRelations(names, datasetFieldId)) - .flatMap((function( - (current, updated) -> { - if (labelsAreTheSame(current, updated)) { - return reactiveLabelRepository.listDatasetFieldDtos(datasetFieldId); - } - final List currentInternalRelations = current.stream() - .filter(pojo -> !pojo.getExternal()) - .toList(); - return labelService.updateDatasetFieldLabels(datasetFieldId, currentInternalRelations, updated); + return reactiveLabelRepository.listLabelRelations(List.of(datasetFieldId), LabelOrigin.INTERNAL) + .collectList() + .zipWith(getUpdatedRelations(names, datasetFieldId)) + .flatMap((function((current, updated) -> { + if (labelsAreTheSame(current, updated)) { + return reactiveLabelRepository.listDatasetFieldDtos(datasetFieldId); } - ))); + + return labelService.updateDatasetFieldLabels(datasetFieldId, current, updated); + }))); } @NotNull @@ -181,18 +186,13 @@ private Mono updateDescription(final long datasetFieldId, return Mono.just(dto); } - private Mono> getCurrentRelations(final Collection datasetFieldIds) { - return reactiveLabelRepository.listLabelRelations(datasetFieldIds) - .collectList(); - } - private Mono> getUpdatedRelations(final Set labelNames, final long datasetFieldId) { return labelService.getOrCreateLabelsByName(labelNames) .map(pojo -> new LabelToDatasetFieldPojo() .setLabelId(pojo.getId()) .setDatasetFieldId(datasetFieldId) - .setExternal(false)) + .setOrigin(LabelOrigin.INTERNAL.toString())) .collectList(); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ReactiveLabelServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ReactiveLabelServiceImpl.java index f7ed697c3..e72a62e6a 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ReactiveLabelServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ReactiveLabelServiceImpl.java @@ -9,6 +9,7 @@ import org.opendatadiscovery.oddplatform.api.contract.model.LabelFormData; import org.opendatadiscovery.oddplatform.api.contract.model.LabelsResponse; import org.opendatadiscovery.oddplatform.dto.LabelDto; +import org.opendatadiscovery.oddplatform.dto.LabelOrigin; import org.opendatadiscovery.oddplatform.dto.activity.ActivityEventTypeDto; import org.opendatadiscovery.oddplatform.exception.NotFoundException; import org.opendatadiscovery.oddplatform.mapper.LabelMapper; @@ -82,7 +83,7 @@ public Flux