Skip to content

Commit

Permalink
Add Lineage Export Async endpoint using Search (#18553)
Browse files Browse the repository at this point in the history
* Add Lineage Export Async endpoint using Search

* Lineage api (#18593)

* add api for lineage export

* update API call for lineage async

* use JsonUtils instead of objectMapper to read lineage search response

---------

Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>
Co-authored-by: Sweta Agarwalla <105535990+sweta1308@users.noreply.github.com>
Co-authored-by: sonikashah <sonikashah94@gmail.com>
  • Loading branch information
4 people authored Nov 15, 2024
1 parent 71f9363 commit 783ec62
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 29 deletions.
13 changes: 0 additions & 13 deletions .run/Template JUnit.run.xml

This file was deleted.

5 changes: 5 additions & 0 deletions openmetadata-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,11 @@
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.9</version>
</dependency>
<dependency>
<groupId>com.onelogin</groupId>
<artifactId>java-saml</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT;

import com.fasterxml.jackson.databind.JsonNode;
import com.opencsv.CSVWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -66,6 +69,7 @@
import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
import org.openmetadata.sdk.exception.CSVExportException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
Expand Down Expand Up @@ -254,6 +258,231 @@ public final String exportCsv(
return CsvUtil.formatCsv(csvFile);
}

/**
 * Exports the lineage graph of {@code fqn} as CSV text by running the lineage search and
 * flattening the result: one row per edge, expanded per column-level mapping when present.
 *
 * @param fqn fully qualified name of the root entity
 * @param upstreamDepth how many hops to traverse upstream
 * @param downstreamDepth how many hops to traverse downstream
 * @param queryFilter extra Elasticsearch filter applied to the search (may be null/empty)
 * @param entityType entity type of the root entity
 * @param deleted whether to include deleted entities
 * @return the complete CSV document as a string
 * @throws CSVExportException wrapping any IO failure while building the CSV
 */
public final String exportCsvAsync(
    String fqn,
    int upstreamDepth,
    int downstreamDepth,
    String queryFilter,
    String entityType,
    boolean deleted)
    throws IOException {
  // NOTE: searchLineage takes (deleted, entityType) in the opposite order of this
  // method's own parameters — keep the call site in sync.
  Response response =
      Entity.getSearchRepository()
          .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);

  try {
    String jsonResponse = JsonUtils.pojoToJson(response.getEntity());
    JsonNode rootNode = JsonUtils.readTree(jsonResponse);

    // Index every node by id so edges can resolve their endpoints.
    Map<String, JsonNode> entityMap = new HashMap<>();
    for (JsonNode node : rootNode.path("nodes")) {
      entityMap.put(node.path("id").asText(), node);
    }

    StringWriter csvContent = new StringWriter();
    String[] headers = {
      "fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain",
      "toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain",
      "fromChildEntityFQN", "toChildEntityFQN"
    };
    // try-with-resources guarantees the writer is flushed/closed even when a row
    // fails mid-export (the previous version leaked the writer on any exception).
    try (CSVWriter csvWriter = new CSVWriter(csvContent)) {
      csvWriter.writeNext(headers);
      for (JsonNode edge : rootNode.path("edges")) {
        // An endpoint may be absent from "nodes"; null endpoints are tolerated
        // everywhere below (the previous version threw NPE on a missing node).
        JsonNode fromEntity = entityMap.get(edge.path("fromEntity").path("id").asText());
        JsonNode toEntity = entityMap.get(edge.path("toEntity").path("id").asText());

        Map<String, String> baseRow = buildLineageBaseRow(fromEntity, toEntity);

        List<String> fromChildFQNs = new ArrayList<>();
        List<String> toChildFQNs = new ArrayList<>();
        extractChildEntities(fromEntity, fromChildFQNs);
        extractChildEntities(toEntity, toChildFQNs);

        JsonNode columns = edge.path("columns");
        if (columns.isArray() && !columns.isEmpty()) {
          // Explicit column-level lineage: one row per (fromColumn, toColumn) pair.
          for (JsonNode columnMapping : columns) {
            String toColumn = columnMapping.path("toColumn").asText();
            for (JsonNode fromColumn : columnMapping.path("fromColumns")) {
              writeCsvRow(csvWriter, baseRow, fromColumn.asText(), toColumn);
            }
          }
        } else if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) {
          // No explicit mapping: emit the cross product of child entities.
          for (String fromChildFQN : fromChildFQNs) {
            for (String toChildFQN : toChildFQNs) {
              writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
            }
          }
        } else if (!fromChildFQNs.isEmpty()) {
          for (String fromChildFQN : fromChildFQNs) {
            writeCsvRow(csvWriter, baseRow, fromChildFQN, "");
          }
        } else if (!toChildFQNs.isEmpty()) {
          for (String toChildFQN : toChildFQNs) {
            writeCsvRow(csvWriter, baseRow, "", toChildFQN);
          }
        } else {
          // Entity-level lineage only — no child assets on either side.
          writeCsvRow(csvWriter, baseRow, "", "");
        }
      }
    }
    return csvContent.toString();
  } catch (IOException e) {
    throw CSVExportException.byMessage("Failed to export lineage data to CSV", e.getMessage());
  }
}

/**
 * Builds the ten shared entity-level columns for an edge. Tolerates a missing (null)
 * endpoint node: all of its columns resolve to "".
 */
private static Map<String, String> buildLineageBaseRow(JsonNode fromEntity, JsonNode toEntity) {
  Map<String, String> baseRow = new HashMap<>();
  baseRow.put("fromEntityFQN", getText(fromEntity, "fullyQualifiedName"));
  baseRow.put(
      "fromServiceName", getText(fromEntity == null ? null : fromEntity.path("service"), "name"));
  baseRow.put("fromServiceType", getText(fromEntity, "serviceType"));
  baseRow.put("fromOwners", getOwners(fromEntity == null ? null : fromEntity.path("owners")));
  baseRow.put("fromDomain", getText(fromEntity, "domain"));
  baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName"));
  baseRow.put(
      "toServiceName", getText(toEntity == null ? null : toEntity.path("service"), "name"));
  baseRow.put("toServiceType", getText(toEntity, "serviceType"));
  baseRow.put("toOwners", getOwners(toEntity == null ? null : toEntity.path("owners")));
  baseRow.put("toDomain", getText(toEntity, "domain"));
  return baseRow;
}

/**
 * Emits one CSV record: the ten shared entity-level columns from {@code baseRow}, followed
 * by the per-edge child (column/field) FQNs.
 */
private static void writeCsvRow(
    CSVWriter csvWriter, Map<String, String> baseRow, String fromChildFQN, String toChildFQN) {
  String[] entityColumns = {
    "fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain",
    "toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain"
  };
  List<String> record = new ArrayList<>(entityColumns.length + 2);
  for (String column : entityColumns) {
    record.add(baseRow.get(column));
  }
  record.add(fromChildFQN);
  record.add(toChildFQN);
  csvWriter.writeNext(record.toArray(new String[0]));
}

/**
 * Null-safe text lookup: returns "" when the node is null, the field is missing, or the
 * field's value is JSON null; otherwise the field's text form.
 */
private static String getText(JsonNode node, String fieldName) {
  if (node == null) {
    return "";
  }
  JsonNode fieldNode = node.get(fieldName);
  return (fieldNode == null || fieldNode.isNull()) ? "" : fieldNode.asText();
}

/**
 * Joins the "name" of each owner entry with ';'. Returns "" when the owners node is absent,
 * not an array, or contains no named owners.
 */
private static String getOwners(JsonNode ownersNode) {
  if (ownersNode == null || !ownersNode.isArray()) {
    return "";
  }
  StringBuilder joined = new StringBuilder();
  for (JsonNode owner : ownersNode) {
    String ownerName = getText(owner, "name");
    if (ownerName.isEmpty()) {
      continue; // unnamed owners are skipped, same as before
    }
    if (joined.length() > 0) {
      joined.append(';');
    }
    joined.append(ownerName);
  }
  return joined.toString();
}

/**
 * Collects the FQNs of an entity's child assets (columns, charts, fields, …) into
 * {@code childFQNs}, dispatching on the entity's type. Unknown types contribute nothing.
 */
private static void extractChildEntities(JsonNode entityNode, List<String> childFQNs) {
  if (entityNode == null) {
    return;
  }
  switch (getText(entityNode, "entityType")) {
      // Tables and dashboard data models both expose children as "columns".
    case TABLE:
    case DASHBOARD_DATA_MODEL:
      extractColumns(entityNode.path("columns"), childFQNs);
      break;
    case DASHBOARD:
      extractCharts(entityNode.path("charts"), childFQNs);
      break;
    case SEARCH_INDEX:
      extractFields(entityNode.path("fields"), childFQNs);
      break;
    case CONTAINER:
      // Containers have both nested child containers and data-model columns.
      extractContainers(entityNode.path("children"), childFQNs);
      extractColumns(entityNode.path("dataModel").path("columns"), childFQNs);
      break;
    case TOPIC:
      extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs);
      break;
    default:
      break;
  }
}

/** Depth-first collection of column FQNs, descending into nested (struct) columns. */
private static void extractColumns(JsonNode columnsNode, List<String> childFQNs) {
  if (columnsNode == null || !columnsNode.isArray()) {
    return;
  }
  for (JsonNode column : columnsNode) {
    if (column == null) {
      continue;
    }
    childFQNs.add(getText(column, "fullyQualifiedName"));
    extractColumns(column.path("children"), childFQNs);
  }
}

/** Collects each chart's FQN; charts are flat, so no recursion is needed. */
private static void extractCharts(JsonNode chartsNode, List<String> childFQNs) {
  if (chartsNode == null || !chartsNode.isArray()) {
    return;
  }
  for (JsonNode chart : chartsNode) {
    childFQNs.add(getText(chart, "fullyQualifiedName"));
  }
}

/** Depth-first collection of search-index field FQNs, descending into child fields. */
private static void extractFields(JsonNode fieldsNode, List<String> childFQNs) {
  if (fieldsNode == null || !fieldsNode.isArray()) {
    return;
  }
  for (JsonNode field : fieldsNode) {
    if (field == null) {
      continue;
    }
    childFQNs.add(getText(field, "fullyQualifiedName"));
    extractFields(field.path("children"), childFQNs);
  }
}

/** Depth-first collection of container FQNs, descending into nested child containers. */
private static void extractContainers(JsonNode containersNode, List<String> childFQNs) {
  if (containersNode == null || !containersNode.isArray()) {
    return;
  }
  for (JsonNode container : containersNode) {
    if (container == null) {
      continue;
    }
    childFQNs.add(getText(container, "fullyQualifiedName"));
    extractContainers(container.path("children"), childFQNs);
  }
}

/** Depth-first collection of topic schema-field FQNs, descending into child fields. */
private static void extractSchemaFields(JsonNode schemaFieldsNode, List<String> childFQNs) {
  if (schemaFieldsNode == null || !schemaFieldsNode.isArray()) {
    return;
  }
  for (JsonNode schemaField : schemaFieldsNode) {
    if (schemaField == null) {
      continue;
    }
    childFQNs.add(getText(schemaField, "fullyQualifiedName"));
    extractSchemaFields(schemaField.path("children"), childFQNs);
  }
}

/**
 * Defensive map lookup: returns "" when the value for {@code key} is null/empty, otherwise
 * its string form. Previously called {@code map.get(key)} twice; a single lookup avoids the
 * redundant access (and any inconsistency if the map were mutated between the two calls).
 * NOTE(review): the raw {@code HashMap} parameter type is kept for source compatibility
 * with existing callers; consider migrating callers to {@code Map<String, ?>}.
 */
private String getStringOrNull(HashMap map, String key) {
  Object value = map.get(key);
  return nullOrEmpty(value) ? "" : value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
Expand Down Expand Up @@ -60,6 +61,10 @@
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.CSVExportMessage;
import org.openmetadata.service.util.CSVExportResponse;
import org.openmetadata.service.util.WebsocketNotificationHandler;

@Path("/v1/lineage")
@Tag(
Expand Down Expand Up @@ -273,10 +278,61 @@ public String exportLineage(
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {

Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}

@GET
@Path("/exportAsync")
@Produces(MediaType.APPLICATION_JSON)
@Operation(
    // Must be unique across the spec: "exportLineage" is already used by the
    // synchronous export endpoint above.
    operationId = "exportLineageAsync",
    summary = "Export lineage async",
    responses = {
      @ApiResponse(
          // Matches the actual Response.accepted() status returned below.
          responseCode = "202",
          description = "CSV export job initiated; result is delivered over websocket",
          content =
              @Content(
                  mediaType = "application/json",
                  schema = @Schema(implementation = CSVExportMessage.class)))
    })
public Response exportLineageAsync(
    @Context UriInfo uriInfo,
    @Context SecurityContext securityContext,
    @Parameter(description = "fqn") @QueryParam("fqn") String fqn,
    @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
    @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
        int downstreamDepth,
    @Parameter(
            description =
                "Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
        @QueryParam("query_filter")
        String queryFilter,
    @Parameter(description = "Filter documents by deleted param. By default deleted is false")
        @QueryParam("includeDeleted")
        boolean deleted,
    @Parameter(description = "entity type") @QueryParam("type") String entityType) {
  // Kick off the export on a background thread and return immediately with a job id;
  // completion or failure is pushed to the client via websocket notification.
  String jobId = UUID.randomUUID().toString();
  ExecutorService executorService = AsyncService.getInstance().getExecutorService();
  executorService.submit(
      () -> {
        try {
          String csvData =
              dao.exportCsvAsync(
                  fqn, upstreamDepth, downstreamDepth, queryFilter, entityType, deleted);
          WebsocketNotificationHandler.sendCsvExportCompleteNotification(
              jobId, securityContext, csvData);
        } catch (Exception e) {
          // Surface the failure to the client rather than letting it die silently
          // inside the executor.
          WebsocketNotificationHandler.sendCsvExportFailedNotification(
              jobId, securityContext, e.getMessage());
        }
      });
  CSVExportResponse response = new CSVExportResponse(jobId, "Export initiated successfully.");
  // 202 Accepted: the work continues asynchronously after this response.
  return Response.accepted().entity(response).type(MediaType.APPLICATION_JSON).build();
}

@PUT
@Operation(
operationId = "addLineageEdge",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.openmetadata.sdk.exception;

import javax.ws.rs.core.Response;

/**
 * Web-service exception raised when a CSV export fails. Carries the fixed error type
 * {@code CSV_EXPORT_ERROR}; defaults to HTTP 400 (BAD_REQUEST) unless an explicit
 * status is supplied.
 */
public class CSVExportException extends WebServiceException {
  private static final String BY_NAME_MESSAGE = "CSVExport Exception [%s] due to [%s].";
  private static final String ERROR_TYPE = "CSV_EXPORT_ERROR";

  public CSVExportException(String message) {
    super(Response.Status.BAD_REQUEST, ERROR_TYPE, message);
  }

  public CSVExportException(Response.Status status, String message) {
    super(status, ERROR_TYPE, message);
  }

  /** Builds an exception whose message names the failing operation and its cause. */
  public static CSVExportException byMessage(
      String name, String errorMessage, Response.Status status) {
    return new CSVExportException(status, buildMessageByName(name, errorMessage));
  }

  /** Same as the three-argument overload, defaulting the status to 400 BAD_REQUEST. */
  public static CSVExportException byMessage(String name, String errorMessage) {
    return byMessage(name, errorMessage, Response.Status.BAD_REQUEST);
  }

  private static String buildMessageByName(String name, String errorMessage) {
    return String.format(BY_NAME_MESSAGE, name, errorMessage);
  }
}
Loading

0 comments on commit 783ec62

Please sign in to comment.