diff --git a/.run/Template JUnit.run.xml b/.run/Template JUnit.run.xml
deleted file mode 100644
index a76cd50c6bd1..000000000000
--- a/.run/Template JUnit.run.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml
index 53fdc85b8cf3..a16a0e2af7c1 100644
--- a/openmetadata-service/pom.xml
+++ b/openmetadata-service/pom.xml
@@ -577,6 +577,11 @@
commons-csv
1.12.0
+
+ com.opencsv
+ opencsv
+ 5.9
+
com.onelogin
java-saml
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java
index 55a6d0136472..abad6c7a7d69 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java
@@ -30,7 +30,10 @@
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.opencsv.CSVWriter;
import java.io.IOException;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -66,6 +69,7 @@
import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
+import org.openmetadata.sdk.exception.CSVExportException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
@@ -254,6 +258,231 @@ public final String exportCsv(
return CsvUtil.formatCsv(csvFile);
}
+ public final String exportCsvAsync(
+ String fqn,
+ int upstreamDepth,
+ int downstreamDepth,
+ String queryFilter,
+ String entityType,
+ boolean deleted)
+ throws IOException {
+ Response response =
+ Entity.getSearchRepository()
+ .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
+
+ try {
+ String jsonResponse = JsonUtils.pojoToJson(response.getEntity());
+ JsonNode rootNode = JsonUtils.readTree(jsonResponse);
+
+ Map entityMap = new HashMap<>();
+ JsonNode nodes = rootNode.path("nodes");
+ for (JsonNode node : nodes) {
+ String id = node.path("id").asText();
+ entityMap.put(id, node);
+ }
+
+ StringWriter csvContent = new StringWriter();
+ CSVWriter csvWriter = new CSVWriter(csvContent);
+ String[] headers = {
+ "fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain",
+ "toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain",
+ "fromChildEntityFQN", "toChildEntityFQN"
+ };
+ csvWriter.writeNext(headers);
+ JsonNode edges = rootNode.path("edges");
+ for (JsonNode edge : edges) {
+ String fromEntityId = edge.path("fromEntity").path("id").asText();
+ String toEntityId = edge.path("toEntity").path("id").asText();
+
+ JsonNode fromEntity = entityMap.getOrDefault(fromEntityId, null);
+ JsonNode toEntity = entityMap.getOrDefault(toEntityId, null);
+
+ Map baseRow = new HashMap<>();
+ baseRow.put("fromEntityFQN", getText(fromEntity, "fullyQualifiedName"));
+ baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name"));
+ baseRow.put("fromServiceType", getText(fromEntity, "serviceType"));
+ baseRow.put("fromOwners", getOwners(fromEntity.path("owners")));
+ baseRow.put("fromDomain", getText(fromEntity, "domain"));
+
+ baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName"));
+ baseRow.put("toServiceName", getText(toEntity.path("service"), "name"));
+ baseRow.put("toServiceType", getText(toEntity, "serviceType"));
+ baseRow.put("toOwners", getOwners(toEntity.path("owners")));
+ baseRow.put("toDomain", getText(toEntity, "domain"));
+
+ List fromChildFQNs = new ArrayList<>();
+ List toChildFQNs = new ArrayList<>();
+
+ extractChildEntities(fromEntity, fromChildFQNs);
+ extractChildEntities(toEntity, toChildFQNs);
+
+ JsonNode columns = edge.path("columns");
+ if (columns.isArray() && !columns.isEmpty()) {
+ for (JsonNode columnMapping : columns) {
+ JsonNode fromColumns = columnMapping.path("fromColumns");
+ String toColumn = columnMapping.path("toColumn").asText();
+
+ for (JsonNode fromColumn : fromColumns) {
+ String fromChildFQN = fromColumn.asText();
+ String toChildFQN = toColumn;
+ writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
+ }
+ }
+ } else if (!fromChildFQNs.isEmpty() || !toChildFQNs.isEmpty()) {
+ if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) {
+ for (String fromChildFQN : fromChildFQNs) {
+ for (String toChildFQN : toChildFQNs) {
+ writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
+ }
+ }
+ } else if (!fromChildFQNs.isEmpty()) {
+ for (String fromChildFQN : fromChildFQNs) {
+ writeCsvRow(csvWriter, baseRow, fromChildFQN, "");
+ }
+ } else {
+ for (String toChildFQN : toChildFQNs) {
+ writeCsvRow(csvWriter, baseRow, "", toChildFQN);
+ }
+ }
+ } else {
+ writeCsvRow(csvWriter, baseRow, "", "");
+ }
+ }
+ csvWriter.close();
+ return csvContent.toString();
+ } catch (IOException e) {
+ throw CSVExportException.byMessage("Failed to export lineage data to CSV", e.getMessage());
+ }
+ }
+
+ private static void writeCsvRow(
+ CSVWriter csvWriter, Map baseRow, String fromChildFQN, String toChildFQN) {
+ String[] row = {
+ baseRow.get("fromEntityFQN"),
+ baseRow.get("fromServiceName"),
+ baseRow.get("fromServiceType"),
+ baseRow.get("fromOwners"),
+ baseRow.get("fromDomain"),
+ baseRow.get("toEntityFQN"),
+ baseRow.get("toServiceName"),
+ baseRow.get("toServiceType"),
+ baseRow.get("toOwners"),
+ baseRow.get("toDomain"),
+ fromChildFQN,
+ toChildFQN
+ };
+ csvWriter.writeNext(row);
+ }
+
+ private static String getText(JsonNode node, String fieldName) {
+ if (node != null && node.has(fieldName)) {
+ JsonNode fieldNode = node.get(fieldName);
+ return fieldNode.isNull() ? "" : fieldNode.asText();
+ }
+ return "";
+ }
+
+ private static String getOwners(JsonNode ownersNode) {
+ if (ownersNode != null && ownersNode.isArray()) {
+ List ownersList = new ArrayList<>();
+ for (JsonNode owner : ownersNode) {
+ String ownerName = getText(owner, "name");
+ if (!ownerName.isEmpty()) {
+ ownersList.add(ownerName);
+ }
+ }
+ return String.join(";", ownersList);
+ }
+ return "";
+ }
+
+ private static void extractChildEntities(JsonNode entityNode, List childFQNs) {
+ if (entityNode == null) {
+ return;
+ }
+ String entityType = getText(entityNode, "entityType");
+ switch (entityType) {
+ case TABLE:
+ extractColumns(entityNode.path("columns"), childFQNs);
+ break;
+ case DASHBOARD:
+ extractCharts(entityNode.path("charts"), childFQNs);
+ break;
+ case SEARCH_INDEX:
+ extractFields(entityNode.path("fields"), childFQNs);
+ break;
+ case CONTAINER:
+ extractContainers(entityNode.path("children"), childFQNs);
+ extractColumns(entityNode.path("dataModel").path("columns"), childFQNs);
+ break;
+ case TOPIC:
+ extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs);
+ break;
+ case DASHBOARD_DATA_MODEL:
+ extractColumns(entityNode.path("columns"), childFQNs);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private static void extractColumns(JsonNode columnsNode, List childFQNs) {
+ if (columnsNode != null && columnsNode.isArray()) {
+ for (JsonNode column : columnsNode) {
+ if (column != null) {
+ String columnFQN = getText(column, "fullyQualifiedName");
+ childFQNs.add(columnFQN);
+ extractColumns(column.path("children"), childFQNs);
+ }
+ }
+ }
+ }
+
+ private static void extractCharts(JsonNode chartsNode, List childFQNs) {
+ if (chartsNode != null && chartsNode.isArray()) {
+ for (JsonNode chart : chartsNode) {
+ String chartFQN = getText(chart, "fullyQualifiedName");
+ childFQNs.add(chartFQN);
+ }
+ }
+ }
+
+ private static void extractFields(JsonNode fieldsNode, List childFQNs) {
+ if (fieldsNode != null && fieldsNode.isArray()) {
+ for (JsonNode field : fieldsNode) {
+ if (field != null) {
+ String fieldFQN = getText(field, "fullyQualifiedName");
+ childFQNs.add(fieldFQN);
+ extractFields(field.path("children"), childFQNs);
+ }
+ }
+ }
+ }
+
+ private static void extractContainers(JsonNode containersNode, List childFQNs) {
+ if (containersNode != null && containersNode.isArray()) {
+ for (JsonNode container : containersNode) {
+ if (container != null) {
+ String containerFQN = getText(container, "fullyQualifiedName");
+ childFQNs.add(containerFQN);
+ extractContainers(container.path("children"), childFQNs);
+ }
+ }
+ }
+ }
+
+ private static void extractSchemaFields(JsonNode schemaFieldsNode, List childFQNs) {
+ if (schemaFieldsNode != null && schemaFieldsNode.isArray()) {
+ for (JsonNode field : schemaFieldsNode) {
+ if (field != null) {
+ String fieldFQN = getText(field, "fullyQualifiedName");
+ childFQNs.add(fieldFQN);
+ extractSchemaFields(field.path("children"), childFQNs);
+ }
+ }
+ }
+ }
+
private String getStringOrNull(HashMap map, String key) {
return nullOrEmpty(map.get(key)) ? "" : map.get(key).toString();
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java
index 2a751153dfdb..b5ace8f9ab59 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
@@ -60,6 +61,10 @@
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
+import org.openmetadata.service.util.AsyncService;
+import org.openmetadata.service.util.CSVExportMessage;
+import org.openmetadata.service.util.CSVExportResponse;
+import org.openmetadata.service.util.WebsocketNotificationHandler;
@Path("/v1/lineage")
@Tag(
@@ -273,10 +278,61 @@ public String exportLineage(
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {
-
+ Entity.getSearchRepository()
+ .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
+ @GET
+ @Path("/exportAsync")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Operation(
+ operationId = "exportLineage",
+ summary = "Export lineage",
+ responses = {
+ @ApiResponse(
+ responseCode = "200",
+ description = "search response",
+ content =
+ @Content(
+ mediaType = "application/json",
+ schema = @Schema(implementation = CSVExportMessage.class)))
+ })
+ public Response exportLineageAsync(
+ @Context UriInfo uriInfo,
+ @Context SecurityContext securityContext,
+ @Parameter(description = "fqn") @QueryParam("fqn") String fqn,
+ @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
+ @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
+ int downstreamDepth,
+ @Parameter(
+ description =
+ "Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
+ @QueryParam("query_filter")
+ String queryFilter,
+ @Parameter(description = "Filter documents by deleted param. By default deleted is false")
+ @QueryParam("includeDeleted")
+ boolean deleted,
+ @Parameter(description = "entity type") @QueryParam("type") String entityType) {
+ String jobId = UUID.randomUUID().toString();
+ ExecutorService executorService = AsyncService.getInstance().getExecutorService();
+ executorService.submit(
+ () -> {
+ try {
+ String csvData =
+ dao.exportCsvAsync(
+ fqn, upstreamDepth, downstreamDepth, queryFilter, entityType, deleted);
+ WebsocketNotificationHandler.sendCsvExportCompleteNotification(
+ jobId, securityContext, csvData);
+ } catch (Exception e) {
+ WebsocketNotificationHandler.sendCsvExportFailedNotification(
+ jobId, securityContext, e.getMessage());
+ }
+ });
+ CSVExportResponse response = new CSVExportResponse(jobId, "Export initiated successfully.");
+ return Response.accepted().entity(response).type(MediaType.APPLICATION_JSON).build();
+ }
+
@PUT
@Operation(
operationId = "addLineageEdge",
diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/exception/CSVExportException.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/exception/CSVExportException.java
new file mode 100644
index 000000000000..e546b53c55f5
--- /dev/null
+++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/exception/CSVExportException.java
@@ -0,0 +1,30 @@
+package org.openmetadata.sdk.exception;
+
+import javax.ws.rs.core.Response;
+
+public class CSVExportException extends WebServiceException {
+ private static final String BY_NAME_MESSAGE = "CSVExport Exception [%s] due to [%s].";
+ private static final String ERROR_TYPE = "CSV_EXPORT_ERROR";
+
+ public CSVExportException(String message) {
+ super(Response.Status.BAD_REQUEST, ERROR_TYPE, message);
+ }
+
+ public CSVExportException(Response.Status status, String message) {
+ super(status, ERROR_TYPE, message);
+ }
+
+ public static CSVExportException byMessage(
+ String name, String errorMessage, Response.Status status) {
+ return new CSVExportException(status, buildMessageByName(name, errorMessage));
+ }
+
+ public static CSVExportException byMessage(String name, String errorMessage) {
+ return new CSVExportException(
+ Response.Status.BAD_REQUEST, buildMessageByName(name, errorMessage));
+ }
+
+ private static String buildMessageByName(String name, String errorMessage) {
+ return String.format(BY_NAME_MESSAGE, name, errorMessage);
+ }
+}
diff --git a/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx b/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx
index 49d25f6274e0..256cf6489ffb 100644
--- a/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx
+++ b/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx
@@ -80,6 +80,7 @@ import { useApplicationStore } from '../../hooks/useApplicationStore';
import useCustomLocation from '../../hooks/useCustomLocation/useCustomLocation';
import { useFqn } from '../../hooks/useFqn';
import {
+ exportLineageAsync,
getDataQualityLineage,
getLineageDataByFQN,
updateLineageEdge,
@@ -96,7 +97,6 @@ import {
getChildMap,
getClassifiedEdge,
getConnectedNodesEdges,
- getExportData,
getLayoutedElements,
getLineageEdge,
getLineageEdgeForAPI,
@@ -338,20 +338,14 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
const exportLineageData = useCallback(
async (_: string) => {
- if (
- entityType === EntityType.PIPELINE ||
- entityType === EntityType.STORED_PROCEDURE
- ) {
- // Since pipeline is an edge, we cannot create a tree, hence we take the nodes from the lineage response
- // to get the export data.
- return getExportData(entityLineage.nodes ?? []);
- }
-
- const { exportResult } = getChildMap(entityLineage, decodedFqn);
-
- return exportResult;
+ return exportLineageAsync(
+ decodedFqn,
+ entityType,
+ lineageConfig,
+ queryFilter
+ );
},
- [entityType, decodedFqn, entityLineage]
+ [entityType, decodedFqn, lineageConfig, queryFilter]
);
const onExportClick = useCallback(() => {
@@ -361,7 +355,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
onExport: exportLineageData,
});
}
- }, [entityType, decodedFqn, entityLineage]);
+ }, [entityType, decodedFqn, lineageConfig, queryFilter]);
const loadChildNodesHandler = useCallback(
async (node: SourceType, direction: EdgeTypeEnum) => {
diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts
index 1b3ea71ee529..f661cdcfa6ae 100644
--- a/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/rest/lineageAPI.ts
@@ -11,6 +11,7 @@
* limitations under the License.
*/
+import { CSVExportResponse } from '../components/Entity/EntityExportModalProvider/EntityExportModalProvider.interface';
import { LineageConfig } from '../components/Entity/EntityLineage/EntityLineage.interface';
import { EntityLineageResponse } from '../components/Lineage/Lineage.interface';
import { AddLineage } from '../generated/api/lineage/addLineage';
@@ -22,6 +23,30 @@ export const updateLineageEdge = async (edge: AddLineage) => {
return response.data;
};
+export const exportLineageAsync = async (
+ fqn: string,
+ entityType: string,
+ config?: LineageConfig,
+ queryFilter?: string
+) => {
+ const { upstreamDepth = 1, downstreamDepth = 1 } = config ?? {};
+ const response = await APIClient.get(
+ `/lineage/exportAsync`,
+ {
+ params: {
+ fqn,
+ type: entityType,
+ upstreamDepth,
+ downstreamDepth,
+ query_filter: queryFilter,
+ includeDeleted: false,
+ },
+ }
+ );
+
+ return response.data;
+};
+
export const getLineageDataByFQN = async (
fqn: string,
entityType: string,