Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Lineage Export Async endpoint using Search #18553

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions .run/Template JUnit.run.xml

This file was deleted.

5 changes: 5 additions & 0 deletions openmetadata-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,11 @@
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.9</version>
</dependency>
<dependency>
<groupId>com.onelogin</groupId>
<artifactId>java-saml</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT;

import com.fasterxml.jackson.databind.JsonNode;
import com.opencsv.CSVWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -66,6 +69,7 @@
import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
import org.openmetadata.sdk.exception.CSVExportException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
Expand Down Expand Up @@ -254,6 +258,231 @@ public final String exportCsv(
return CsvUtil.formatCsv(csvFile);
}

public final String exportCsvAsync(
    String fqn,
    int upstreamDepth,
    int downstreamDepth,
    String queryFilter,
    String entityType,
    boolean deleted)
    throws IOException {
  // Runs a lineage search rooted at `fqn` and flattens the resulting graph into a
  // CSV document: one row per edge, expanded per column mapping (or per child
  // entity when no explicit column lineage exists).
  Response response =
      Entity.getSearchRepository()
          .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);

  try {
    String jsonResponse = JsonUtils.pojoToJson(response.getEntity());
    JsonNode rootNode = JsonUtils.readTree(jsonResponse);

    // Index nodes by id so edge endpoints can be resolved in O(1).
    Map<String, JsonNode> entityMap = new HashMap<>();
    for (JsonNode node : rootNode.path("nodes")) {
      entityMap.put(node.path("id").asText(), node);
    }

    StringWriter csvContent = new StringWriter();
    // try-with-resources ensures the writer is closed even if a row fails to write.
    try (CSVWriter csvWriter = new CSVWriter(csvContent)) {
      String[] headers = {
        "fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain",
        "toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain",
        "fromChildEntityFQN", "toChildEntityFQN"
      };
      csvWriter.writeNext(headers);

      for (JsonNode edge : rootNode.path("edges")) {
        String fromEntityId = edge.path("fromEntity").path("id").asText();
        String toEntityId = edge.path("toEntity").path("id").asText();

        JsonNode fromEntity = entityMap.get(fromEntityId);
        JsonNode toEntity = entityMap.get(toEntityId);
        if (fromEntity == null || toEntity == null) {
          // The edge references a node absent from the search response (e.g. pruned
          // by depth or filters); skip it rather than fail the export with an NPE.
          continue;
        }

        // Columns shared by every row emitted for this edge.
        Map<String, String> baseRow = new HashMap<>();
        baseRow.put("fromEntityFQN", getText(fromEntity, "fullyQualifiedName"));
        baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name"));
        baseRow.put("fromServiceType", getText(fromEntity, "serviceType"));
        baseRow.put("fromOwners", getOwners(fromEntity.path("owners")));
        baseRow.put("fromDomain", getText(fromEntity, "domain"));

        baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName"));
        baseRow.put("toServiceName", getText(toEntity.path("service"), "name"));
        baseRow.put("toServiceType", getText(toEntity, "serviceType"));
        baseRow.put("toOwners", getOwners(toEntity.path("owners")));
        baseRow.put("toDomain", getText(toEntity, "domain"));

        List<String> fromChildFQNs = new ArrayList<>();
        List<String> toChildFQNs = new ArrayList<>();
        extractChildEntities(fromEntity, fromChildFQNs);
        extractChildEntities(toEntity, toChildFQNs);

        JsonNode columns = edge.path("columns");
        if (columns.isArray() && !columns.isEmpty()) {
          // Explicit column-level lineage: one row per (fromColumn, toColumn) pair.
          for (JsonNode columnMapping : columns) {
            JsonNode fromColumns = columnMapping.path("fromColumns");
            String toColumn = columnMapping.path("toColumn").asText();
            for (JsonNode fromColumn : fromColumns) {
              writeCsvRow(csvWriter, baseRow, fromColumn.asText(), toColumn);
            }
          }
        } else if (!fromChildFQNs.isEmpty() || !toChildFQNs.isEmpty()) {
          // No explicit column lineage: emit the cross product of child entities,
          // or one-sided rows when only one endpoint has children.
          if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) {
            for (String fromChildFQN : fromChildFQNs) {
              for (String toChildFQN : toChildFQNs) {
                writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
              }
            }
          } else if (!fromChildFQNs.isEmpty()) {
            for (String fromChildFQN : fromChildFQNs) {
              writeCsvRow(csvWriter, baseRow, fromChildFQN, "");
            }
          } else {
            for (String toChildFQN : toChildFQNs) {
              writeCsvRow(csvWriter, baseRow, "", toChildFQN);
            }
          }
        } else {
          // Entity-level lineage only.
          writeCsvRow(csvWriter, baseRow, "", "");
        }
      }
    }
    return csvContent.toString();
  } catch (IOException e) {
    throw CSVExportException.byMessage("Failed to export lineage data to CSV", e.getMessage());
  }
}

private static void writeCsvRow(
    CSVWriter csvWriter, Map<String, String> baseRow, String fromChildFQN, String toChildFQN) {
  // Emits one CSV record: the ten shared endpoint columns (in header order)
  // followed by the per-row child FQNs.
  List<String> record = new ArrayList<>(12);
  for (String column :
      new String[] {
        "fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain",
        "toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain"
      }) {
    record.add(baseRow.get(column));
  }
  record.add(fromChildFQN);
  record.add(toChildFQN);
  csvWriter.writeNext(record.toArray(new String[0]));
}

private static String getText(JsonNode node, String fieldName) {
  // Null-safe field accessor: returns "" when the node is null, the field is
  // missing, or the field value is an explicit JSON null.
  if (node == null) {
    return "";
  }
  JsonNode field = node.get(fieldName);
  return (field == null || field.isNull()) ? "" : field.asText();
}

private static String getOwners(JsonNode ownersNode) {
  // Joins the non-empty owner names with ';'; returns "" for null or non-array input.
  if (ownersNode == null || !ownersNode.isArray()) {
    return "";
  }
  List<String> names = new ArrayList<>();
  for (JsonNode owner : ownersNode) {
    String name = getText(owner, "name");
    if (!name.isEmpty()) {
      names.add(name);
    }
  }
  return String.join(";", names);
}

private static void extractChildEntities(JsonNode entityNode, List<String> childFQNs) {
  // Collects the FQNs of an entity's child assets (columns, charts, fields, ...)
  // into childFQNs, dispatching on entityType; unknown types contribute nothing.
  if (entityNode == null) {
    return;
  }
  switch (getText(entityNode, "entityType")) {
    case TABLE:
    case DASHBOARD_DATA_MODEL:
      // Tables and dashboard data models both expose children as "columns".
      extractColumns(entityNode.path("columns"), childFQNs);
      break;
    case DASHBOARD:
      extractCharts(entityNode.path("charts"), childFQNs);
      break;
    case SEARCH_INDEX:
      extractFields(entityNode.path("fields"), childFQNs);
      break;
    case CONTAINER:
      extractContainers(entityNode.path("children"), childFQNs);
      extractColumns(entityNode.path("dataModel").path("columns"), childFQNs);
      break;
    case TOPIC:
      extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs);
      break;
    default:
      break;
  }
}

private static void extractColumns(JsonNode columnsNode, List<String> childFQNs) {
  // Depth-first collection of column FQNs, recursing into nested child columns.
  if (columnsNode == null || !columnsNode.isArray()) {
    return;
  }
  for (JsonNode column : columnsNode) {
    if (column == null) {
      continue;
    }
    childFQNs.add(getText(column, "fullyQualifiedName"));
    extractColumns(column.path("children"), childFQNs);
  }
}

private static void extractCharts(JsonNode chartsNode, List<String> childFQNs) {
  // Collects chart FQNs for a dashboard. The per-element null guard matches the
  // other extract* helpers and avoids appending an empty-string FQN for a null
  // array element (getText(null, ...) returns "").
  if (chartsNode != null && chartsNode.isArray()) {
    for (JsonNode chart : chartsNode) {
      if (chart != null) {
        childFQNs.add(getText(chart, "fullyQualifiedName"));
      }
    }
  }
}

private static void extractFields(JsonNode fieldsNode, List<String> childFQNs) {
  // Depth-first collection of search-index field FQNs, recursing into children.
  if (fieldsNode == null || !fieldsNode.isArray()) {
    return;
  }
  for (JsonNode field : fieldsNode) {
    if (field != null) {
      childFQNs.add(getText(field, "fullyQualifiedName"));
      extractFields(field.path("children"), childFQNs);
    }
  }
}

private static void extractContainers(JsonNode containersNode, List<String> childFQNs) {
  // Depth-first collection of container FQNs, recursing into child containers.
  if (containersNode == null || !containersNode.isArray()) {
    return;
  }
  for (JsonNode container : containersNode) {
    if (container != null) {
      childFQNs.add(getText(container, "fullyQualifiedName"));
      extractContainers(container.path("children"), childFQNs);
    }
  }
}

private static void extractSchemaFields(JsonNode schemaFieldsNode, List<String> childFQNs) {
  // Depth-first collection of topic schema-field FQNs, recursing into children.
  if (schemaFieldsNode == null || !schemaFieldsNode.isArray()) {
    return;
  }
  for (JsonNode field : schemaFieldsNode) {
    if (field != null) {
      childFQNs.add(getText(field, "fullyQualifiedName"));
      extractSchemaFields(field.path("children"), childFQNs);
    }
  }
}

// Null/empty-safe lookup: returns "" when the value is absent or empty (per
// nullOrEmpty), otherwise its string form. The bounded wildcard replaces the raw
// HashMap type (source-compatible for raw and typed callers), and the value is
// fetched once instead of twice.
private String getStringOrNull(HashMap<String, ?> map, String key) {
  Object value = map.get(key);
  return nullOrEmpty(value) ? "" : value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
Expand Down Expand Up @@ -60,6 +61,10 @@
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.CSVExportMessage;
import org.openmetadata.service.util.CSVExportResponse;
import org.openmetadata.service.util.WebsocketNotificationHandler;

@Path("/v1/lineage")
@Tag(
Expand Down Expand Up @@ -273,10 +278,61 @@ public String exportLineage(
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {

Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}

@GET
@Path("/exportAsync")
@Produces(MediaType.APPLICATION_JSON)
@Operation(
    // operationId must be unique per OpenAPI spec; "exportLineage" is already used
    // by the synchronous export endpoint.
    operationId = "exportLineageAsync",
    summary = "Export lineage asynchronously",
    responses = {
      @ApiResponse(
          responseCode = "202",
          description =
              "Export job accepted; the CSV result is delivered via websocket notification",
          content =
              @Content(
                  mediaType = "application/json",
                  schema = @Schema(implementation = CSVExportMessage.class)))
    })
public Response exportLineageAsync(
    @Context UriInfo uriInfo,
    @Context SecurityContext securityContext,
    @Parameter(description = "fqn") @QueryParam("fqn") String fqn,
    @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
    @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
        int downstreamDepth,
    @Parameter(
            description =
                "Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
        @QueryParam("query_filter")
        String queryFilter,
    @Parameter(description = "Filter documents by deleted param. By default deleted is false")
        @QueryParam("includeDeleted")
        boolean deleted,
    @Parameter(description = "entity type") @QueryParam("type") String entityType) {
  // Fire-and-forget: the export runs on the shared async executor and the result
  // (or failure) is pushed to the caller over websocket, keyed by jobId.
  String jobId = UUID.randomUUID().toString();
  ExecutorService executorService = AsyncService.getInstance().getExecutorService();
  executorService.submit(
      () -> {
        try {
          String csvData =
              dao.exportCsvAsync(
                  fqn, upstreamDepth, downstreamDepth, queryFilter, entityType, deleted);
          WebsocketNotificationHandler.sendCsvExportCompleteNotification(
              jobId, securityContext, csvData);
        } catch (Exception e) {
          WebsocketNotificationHandler.sendCsvExportFailedNotification(
              jobId, securityContext, e.getMessage());
        }
      });
  CSVExportResponse response = new CSVExportResponse(jobId, "Export initiated successfully.");
  return Response.accepted().entity(response).type(MediaType.APPLICATION_JSON).build();
}

@PUT
@Operation(
operationId = "addLineageEdge",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.openmetadata.sdk.exception;

import javax.ws.rs.core.Response;

/**
 * Raised when exporting entity data to CSV fails. Carries the {@code CSV_EXPORT_ERROR}
 * error type and defaults to HTTP 400 (BAD_REQUEST) unless a status is supplied.
 */
public class CSVExportException extends WebServiceException {
  private static final String BY_NAME_MESSAGE = "CSVExport Exception [%s] due to [%s].";
  private static final String ERROR_TYPE = "CSV_EXPORT_ERROR";

  public CSVExportException(String message) {
    this(Response.Status.BAD_REQUEST, message);
  }

  public CSVExportException(Response.Status status, String message) {
    super(status, ERROR_TYPE, message);
  }

  /** Builds an exception whose message is formatted from a name and an error detail. */
  public static CSVExportException byMessage(
      String name, String errorMessage, Response.Status status) {
    return new CSVExportException(status, buildMessageByName(name, errorMessage));
  }

  /** Same as {@link #byMessage(String, String, Response.Status)} with BAD_REQUEST. */
  public static CSVExportException byMessage(String name, String errorMessage) {
    return byMessage(name, errorMessage, Response.Status.BAD_REQUEST);
  }

  private static String buildMessageByName(String name, String errorMessage) {
    return String.format(BY_NAME_MESSAGE, name, errorMessage);
  }
}
Loading
Loading