From 783ec62e4c6abae0e02dc83b6f2e4088a4818d62 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 15 Nov 2024 09:11:23 -0800 Subject: [PATCH] Add Lineage Export Async endpoint using Search (#18553) * Add Lineage Export Async endpoint using Search * Lineage api (#18593) * add api for lineage export * update API call for lineage async * use JsonUtils instead of objectMapper to read lineage search response --------- Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com> Co-authored-by: Sweta Agarwalla <105535990+sweta1308@users.noreply.github.com> Co-authored-by: sonikashah --- .run/Template JUnit.run.xml | 13 - openmetadata-service/pom.xml | 5 + .../service/jdbi3/LineageRepository.java | 229 ++++++++++++++++++ .../resources/lineage/LineageResource.java | 58 ++++- .../sdk/exception/CSVExportException.java | 30 +++ .../LineageProvider/LineageProvider.tsx | 24 +- .../main/resources/ui/src/rest/lineageAPI.ts | 25 ++ 7 files changed, 355 insertions(+), 29 deletions(-) delete mode 100644 .run/Template JUnit.run.xml create mode 100644 openmetadata-spec/src/main/java/org/openmetadata/sdk/exception/CSVExportException.java diff --git a/.run/Template JUnit.run.xml b/.run/Template JUnit.run.xml deleted file mode 100644 index a76cd50c6bd1..000000000000 --- a/.run/Template JUnit.run.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - \ No newline at end of file diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml index 53fdc85b8cf3..a16a0e2af7c1 100644 --- a/openmetadata-service/pom.xml +++ b/openmetadata-service/pom.xml @@ -577,6 +577,11 @@ commons-csv 1.12.0 + + com.opencsv + opencsv + 5.9 + com.onelogin java-saml diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 55a6d0136472..abad6c7a7d69 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -30,7 +30,10 @@ import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS; import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT; +import com.fasterxml.jackson.databind.JsonNode; +import com.opencsv.CSVWriter; import java.io.IOException; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -66,6 +69,7 @@ import org.openmetadata.schema.type.csv.CsvDocumentation; import org.openmetadata.schema.type.csv.CsvFile; import org.openmetadata.schema.type.csv.CsvHeader; +import org.openmetadata.sdk.exception.CSVExportException; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.EntityNotFoundException; @@ -254,6 +258,231 @@ public final String exportCsv( return CsvUtil.formatCsv(csvFile); } + public final String exportCsvAsync( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + String entityType, + boolean deleted) + throws IOException { + Response response = + Entity.getSearchRepository() + .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + + try { + String jsonResponse = JsonUtils.pojoToJson(response.getEntity()); + JsonNode rootNode = JsonUtils.readTree(jsonResponse); + + Map entityMap = new HashMap<>(); + JsonNode nodes = rootNode.path("nodes"); + for (JsonNode node : nodes) { + String id = node.path("id").asText(); + entityMap.put(id, node); + } + + StringWriter csvContent = new StringWriter(); + CSVWriter csvWriter = new CSVWriter(csvContent); + String[] headers = { + "fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain", + "toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain", + "fromChildEntityFQN", "toChildEntityFQN" + }; + csvWriter.writeNext(headers); + JsonNode edges = rootNode.path("edges"); + for (JsonNode edge : edges) { + String fromEntityId = edge.path("fromEntity").path("id").asText(); + String toEntityId = edge.path("toEntity").path("id").asText(); + + JsonNode fromEntity = entityMap.getOrDefault(fromEntityId, null); + JsonNode toEntity = entityMap.getOrDefault(toEntityId, null); + + Map baseRow = new HashMap<>(); + baseRow.put("fromEntityFQN", getText(fromEntity, "fullyQualifiedName")); + baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name")); + baseRow.put("fromServiceType", getText(fromEntity, "serviceType")); + baseRow.put("fromOwners", getOwners(fromEntity.path("owners"))); + baseRow.put("fromDomain", getText(fromEntity, "domain")); + + baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName")); + baseRow.put("toServiceName", getText(toEntity.path("service"), "name")); + baseRow.put("toServiceType", getText(toEntity, "serviceType")); + baseRow.put("toOwners", getOwners(toEntity.path("owners"))); + baseRow.put("toDomain", getText(toEntity, "domain")); + + List fromChildFQNs = new ArrayList<>(); + List toChildFQNs = new ArrayList<>(); + + extractChildEntities(fromEntity, fromChildFQNs); + extractChildEntities(toEntity, toChildFQNs); + + JsonNode columns = edge.path("columns"); + if (columns.isArray() && !columns.isEmpty()) { + for (JsonNode columnMapping : columns) { + JsonNode fromColumns = columnMapping.path("fromColumns"); + String toColumn = columnMapping.path("toColumn").asText(); + + for (JsonNode fromColumn : fromColumns) { + String fromChildFQN = fromColumn.asText(); + String toChildFQN = toColumn; + writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN); + } + } + } else if (!fromChildFQNs.isEmpty() || !toChildFQNs.isEmpty()) { + if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) { + for (String fromChildFQN : fromChildFQNs) { + for (String toChildFQN : toChildFQNs) { + writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN); + } + } + } else if (!fromChildFQNs.isEmpty()) { + for (String fromChildFQN : fromChildFQNs) { + writeCsvRow(csvWriter, baseRow, fromChildFQN, ""); + } + } else { + for (String toChildFQN : toChildFQNs) { + writeCsvRow(csvWriter, baseRow, "", toChildFQN); + } + } + } else { + writeCsvRow(csvWriter, baseRow, "", ""); + } + } + csvWriter.close(); + return csvContent.toString(); + } catch (IOException e) { + throw CSVExportException.byMessage("Failed to export lineage data to CSV", e.getMessage()); + } + } + + private static void writeCsvRow( + CSVWriter csvWriter, Map baseRow, String fromChildFQN, String toChildFQN) { + String[] row = { + baseRow.get("fromEntityFQN"), + baseRow.get("fromServiceName"), + baseRow.get("fromServiceType"), + baseRow.get("fromOwners"), + baseRow.get("fromDomain"), + baseRow.get("toEntityFQN"), + baseRow.get("toServiceName"), + baseRow.get("toServiceType"), + baseRow.get("toOwners"), + baseRow.get("toDomain"), + fromChildFQN, + toChildFQN + }; + csvWriter.writeNext(row); + } + + private static String getText(JsonNode node, String fieldName) { + if (node != null && node.has(fieldName)) { + JsonNode fieldNode = node.get(fieldName); + return fieldNode.isNull() ? "" : fieldNode.asText(); + } + return ""; + } + + private static String getOwners(JsonNode ownersNode) { + if (ownersNode != null && ownersNode.isArray()) { + List ownersList = new ArrayList<>(); + for (JsonNode owner : ownersNode) { + String ownerName = getText(owner, "name"); + if (!ownerName.isEmpty()) { + ownersList.add(ownerName); + } + } + return String.join(";", ownersList); + } + return ""; + } + + private static void extractChildEntities(JsonNode entityNode, List childFQNs) { + if (entityNode == null) { + return; + } + String entityType = getText(entityNode, "entityType"); + switch (entityType) { + case TABLE: + extractColumns(entityNode.path("columns"), childFQNs); + break; + case DASHBOARD: + extractCharts(entityNode.path("charts"), childFQNs); + break; + case SEARCH_INDEX: + extractFields(entityNode.path("fields"), childFQNs); + break; + case CONTAINER: + extractContainers(entityNode.path("children"), childFQNs); + extractColumns(entityNode.path("dataModel").path("columns"), childFQNs); + break; + case TOPIC: + extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs); + break; + case DASHBOARD_DATA_MODEL: + extractColumns(entityNode.path("columns"), childFQNs); + break; + default: + break; + } + } + + private static void extractColumns(JsonNode columnsNode, List childFQNs) { + if (columnsNode != null && columnsNode.isArray()) { + for (JsonNode column : columnsNode) { + if (column != null) { + String columnFQN = getText(column, "fullyQualifiedName"); + childFQNs.add(columnFQN); + extractColumns(column.path("children"), childFQNs); + } + } + } + } + + private static void extractCharts(JsonNode chartsNode, List childFQNs) { + if (chartsNode != null && chartsNode.isArray()) { + for (JsonNode chart : chartsNode) { + String chartFQN = getText(chart, "fullyQualifiedName"); + childFQNs.add(chartFQN); + } + } + } + + private static void extractFields(JsonNode fieldsNode, List childFQNs) { + if (fieldsNode != null && fieldsNode.isArray()) { + for (JsonNode field : fieldsNode) { + if (field != null) { + String fieldFQN = getText(field, "fullyQualifiedName"); + childFQNs.add(fieldFQN); + extractFields(field.path("children"), childFQNs); + } + } + } + } + + private static void extractContainers(JsonNode containersNode, List childFQNs) { + if (containersNode != null && containersNode.isArray()) { + for (JsonNode container : containersNode) { + if (container != null) { + String containerFQN = getText(container, "fullyQualifiedName"); + childFQNs.add(containerFQN); + extractContainers(container.path("children"), childFQNs); + } + } + } + } + + private static void extractSchemaFields(JsonNode schemaFieldsNode, List childFQNs) { + if (schemaFieldsNode != null && schemaFieldsNode.isArray()) { + for (JsonNode field : schemaFieldsNode) { + if (field != null) { + String fieldFQN = getText(field, "fullyQualifiedName"); + childFQNs.add(fieldFQN); + extractSchemaFields(field.path("children"), childFQNs); + } + } + } + } + private String getStringOrNull(HashMap map, String key) { return nullOrEmpty(map.get(key)) ? "" : map.get(key).toString(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java index 2a751153dfdb..b5ace8f9ab59 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutorService; import javax.json.JsonPatch; import javax.validation.Valid; import javax.validation.constraints.Max; @@ -60,6 +61,10 @@ import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.security.policyevaluator.OperationContext; import org.openmetadata.service.security.policyevaluator.ResourceContextInterface; +import org.openmetadata.service.util.AsyncService; +import org.openmetadata.service.util.CSVExportMessage; +import org.openmetadata.service.util.CSVExportResponse; +import org.openmetadata.service.util.WebsocketNotificationHandler; @Path("/v1/lineage") @Tag( @@ -273,10 +278,61 @@ public String exportLineage( boolean deleted, @Parameter(description = "entity type") @QueryParam("type") String entityType) throws IOException { - + Entity.getSearchRepository() + .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } + @GET + @Path("/exportAsync") + @Produces(MediaType.APPLICATION_JSON) + @Operation( + operationId = "exportLineage", + summary = "Export lineage", + responses = { + @ApiResponse( + responseCode = "200", + description = "search response", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = CSVExportMessage.class))) + }) + public Response exportLineageAsync( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "fqn") @QueryParam("fqn") String fqn, + @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth") + int downstreamDepth, + @Parameter( + description = + "Elasticsearch query that will be combined with the query_string query generator from the `query` argument") + @QueryParam("query_filter") + String queryFilter, + @Parameter(description = "Filter documents by deleted param. By default deleted is false") + @QueryParam("includeDeleted") + boolean deleted, + @Parameter(description = "entity type") @QueryParam("type") String entityType) { + String jobId = UUID.randomUUID().toString(); + ExecutorService executorService = AsyncService.getInstance().getExecutorService(); + executorService.submit( + () -> { + try { + String csvData = + dao.exportCsvAsync( + fqn, upstreamDepth, downstreamDepth, queryFilter, entityType, deleted); + WebsocketNotificationHandler.sendCsvExportCompleteNotification( + jobId, securityContext, csvData); + } catch (Exception e) { + WebsocketNotificationHandler.sendCsvExportFailedNotification( + jobId, securityContext, e.getMessage()); + } + }); + CSVExportResponse response = new CSVExportResponse(jobId, "Export initiated successfully."); + return Response.accepted().entity(response).type(MediaType.APPLICATION_JSON).build(); + } + @PUT @Operation( operationId = "addLineageEdge", diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/exception/CSVExportException.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/exception/CSVExportException.java new file mode 100644 index 000000000000..e546b53c55f5 --- /dev/null +++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/exception/CSVExportException.java @@ -0,0 +1,30 @@ +package org.openmetadata.sdk.exception; + +import javax.ws.rs.core.Response; + +public class CSVExportException extends WebServiceException { + private static final String BY_NAME_MESSAGE = "CSVExport Exception [%s] due to [%s]."; + private static final String ERROR_TYPE = "CSV_EXPORT_ERROR"; + + public CSVExportException(String message) { + super(Response.Status.BAD_REQUEST, ERROR_TYPE, message); + } + + public CSVExportException(Response.Status status, String message) { + super(status, ERROR_TYPE, message); + } + + public static CSVExportException byMessage( + String name, String errorMessage, Response.Status status) { + return new CSVExportException(status, buildMessageByName(name, errorMessage)); + } + + public static CSVExportException byMessage(String name, String errorMessage) { + return new CSVExportException( + Response.Status.BAD_REQUEST, buildMessageByName(name, errorMessage)); + } + + private static String buildMessageByName(String name, String errorMessage) { + return String.format(BY_NAME_MESSAGE, name, errorMessage); + } +} diff --git a/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx b/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx index 49d25f6274e0..256cf6489ffb 100644 --- a/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx @@ -80,6 +80,7 @@ import { useApplicationStore } from '../../hooks/useApplicationStore'; import useCustomLocation from '../../hooks/useCustomLocation/useCustomLocation'; import { useFqn } from '../../hooks/useFqn'; import { + exportLineageAsync, getDataQualityLineage, getLineageDataByFQN, updateLineageEdge, @@ -96,7 +97,6 @@ import { getChildMap, getClassifiedEdge, getConnectedNodesEdges, - getExportData, getLayoutedElements, getLineageEdge, getLineageEdgeForAPI, @@ -338,20 +338,14 @@ const LineageProvider = ({ children }: LineageProviderProps) => { const exportLineageData = useCallback( async (_: string) => { - if ( - entityType === EntityType.PIPELINE || - entityType === EntityType.STORED_PROCEDURE - ) { - // Since pipeline is an edge, we cannot create a tree, hence we take the nodes from the lineage response - // to get the export data. - return getExportData(entityLineage.nodes ?? []); - } - - const { exportResult } = getChildMap(entityLineage, decodedFqn); - - return exportResult; + return exportLineageAsync( + decodedFqn, + entityType, + lineageConfig, + queryFilter + ); }, - [entityType, decodedFqn, entityLineage] + [entityType, decodedFqn, lineageConfig, queryFilter] ); const onExportClick = useCallback(() => { @@ -361,7 +355,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => { onExport: exportLineageData, }); } - }, [entityType, decodedFqn, entityLineage]); + }, [entityType, decodedFqn, lineageConfig, queryFilter]); const loadChildNodesHandler = useCallback( async (node: SourceType, direction: EdgeTypeEnum) => { diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts index 1b3ea71ee529..f661cdcfa6ae 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts @@ -11,6 +11,7 @@ * limitations under the License. */ +import { CSVExportResponse } from '../components/Entity/EntityExportModalProvider/EntityExportModalProvider.interface'; import { LineageConfig } from '../components/Entity/EntityLineage/EntityLineage.interface'; import { EntityLineageResponse } from '../components/Lineage/Lineage.interface'; import { AddLineage } from '../generated/api/lineage/addLineage'; @@ -22,6 +23,30 @@ export const updateLineageEdge = async (edge: AddLineage) => { return response.data; }; +export const exportLineageAsync = async ( + fqn: string, + entityType: string, + config?: LineageConfig, + queryFilter?: string +) => { + const { upstreamDepth = 1, downstreamDepth = 1 } = config ?? {}; + const response = await APIClient.get( + `/lineage/exportAsync`, + { + params: { + fqn, + type: entityType, + upstreamDepth, + downstreamDepth, + query_filter: queryFilter, + includeDeleted: false, + }, + } + ); + + return response.data; +}; + export const getLineageDataByFQN = async ( fqn: string, entityType: string,