Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIN-160 [summary lineage] added LineagePreprocessor to add and delete connectionProcess assets #3794

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ public final class Constants {

public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS};

public static final String PROCESS_ENTITY_TYPE = "Process";

public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess";
public static final String PARENT_CONNECTION_PROCESS_QUALIFIED_NAME = "parentConnectionProcessQualifiedName";

/**
* The homeId field is used when saving into Atlas a copy of an object that is being imported from another
* repository. The homeId will be set to a String that identifies the other repository. The specific format
Expand Down Expand Up @@ -269,6 +274,7 @@ public final class Constants {

public static final String NAME = "name";
public static final String QUALIFIED_NAME = "qualifiedName";
public static final String CONNECTION_QUALIFIED_NAME = "connectionQualifiedName";
public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName";
public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size";
public static final String INDEX_SEARCH_TYPES_MAX_QUERY_STR_LENGTH = "atlas.graph.index.search.types.max-query-str-length";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class EntityLineageService implements AtlasLineageService {

private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
private static final String CONNECTION_PROCESS_INPUTS_EDGE = "__ConnectionProcess.inputs";
private static final String CONNECTION_PROCESS_OUTPUTS_EDGE = "__ConnectionProcess.outputs";
private static final String COLUMNS = "columns";
private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000;
Expand Down Expand Up @@ -177,8 +179,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand

RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes());
AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry);
boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet);
EntityValidationResult entityValidationResult = validateEntityTypeAndCheckIfDataSet(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, entityValidationResult);
appendLineageOnDemandPayload(ret, lineageOnDemandRequest);
// filtering out on-demand relations which has input & output nodes within the limit
cleanupRelationsOnDemand(ret);
Expand All @@ -203,20 +205,44 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR
return ret;
}

private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException {
public class EntityValidationResult {
public final boolean isProcess;
public final boolean isDataSet;
public final boolean isConnection;
public final boolean isConnectionProcess;

public EntityValidationResult(boolean isProcess, boolean isDataSet, boolean isConnection, boolean isConnectionProcess) {
this.isProcess = isProcess;
this.isDataSet = isDataSet;
this.isConnection = isConnection;
this.isConnectionProcess = isConnectionProcess;
}
}


private EntityValidationResult validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException {
String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName);
}
boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE);
boolean isConnectionProcess = false;
boolean isDataSet = false;
boolean isConnection = false;
if (!isProcess) {
boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
isConnectionProcess = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_PROCESS_ENTITY_TYPE);
if(!isConnectionProcess){
isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
isConnection = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_ENTITY_TYPE);
if(!isConnection){
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
}
}
}
}
return !isProcess;
return new EntityValidationResult(isProcess, isDataSet, isConnection, isConnectionProcess);
}

private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) {
Expand Down Expand Up @@ -281,7 +307,7 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) {
}
}

private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException {
private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, EntityValidationResult entityValidationResult) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand");

LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext);
Expand All @@ -298,25 +324,25 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
AtomicInteger inputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger outputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger traversalOrder = new AtomicInteger(1);
if (isDataSet) {
if (entityValidationResult.isConnection || entityValidationResult.isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder);
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder);
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult);
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader);
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder);
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_INPUTS_EDGE:CONNECTION_PROCESS_INPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder);
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_OUTPUTS_EDGE:CONNECTION_PROCESS_OUTPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult);
}
}
RequestContext.get().endMetricRecord(metricRecorder);
Expand All @@ -329,7 +355,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal
baseEntityHeader.setFinishTime(traversalOrder.get());
}

private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException {
AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT;
int nextLevel = isInput ? level - 1: level + 1;

Expand Down Expand Up @@ -360,11 +386,12 @@ private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isI
ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains));
}

traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder);
EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(inGuid);
traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1);
}
}

private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException {
if (isEntityTraversalLimitReached(entitiesTraversed))
return;
if (depth != 0) { // base condition of recursion for depth
Expand All @@ -374,8 +401,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
// keep track of visited vertices to avoid circular loop
visitedVertices.add(getId(datasetVertex));

Iterator<AtlasEdge> incomingEdges = null;
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn");
Iterator<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();
if(entityValidationResult.isDataSet){
incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();
} else if (entityValidationResult.isConnection) {
incomingEdges = datasetVertex.getEdges(IN, isInput ? CONNECTION_PROCESS_OUTPUTS_EDGE : CONNECTION_PROCESS_INPUTS_EDGE).iterator();
}
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn);

while (incomingEdges.hasNext()) {
Expand Down Expand Up @@ -403,7 +435,12 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
}

AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut");
Iterator<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
Iterator<AtlasEdge> outgoingEdges = null;
if(entityValidationResult.isDataSet){
outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
} else if (entityValidationResult.isConnection) {
outgoingEdges = processVertex.getEdges(OUT, isInput ? CONNECTION_PROCESS_INPUTS_EDGE : CONNECTION_PROCESS_OUTPUTS_EDGE).iterator();
}
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut);

while (outgoingEdges.hasNext()) {
Expand Down Expand Up @@ -434,7 +471,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
setEntityLimitReachedFlag(isInput, ret);
}
if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) {
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth
EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(getGuid(entityVertex));
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); // execute inner depth
AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex));
traversedEntity.setFinishTime(traversalOrder.get());
}
Expand Down Expand Up @@ -465,7 +503,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line


AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid).isDataSet;
// Get the neighbors for the current node
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);
int currentDepth = 0;
Expand Down Expand Up @@ -493,7 +531,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
if (Objects.isNull(currentVertex))
throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID);

boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID);
boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID).isDataSet;
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);
continue;
Expand Down Expand Up @@ -1538,7 +1576,7 @@ private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHead
String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex);
String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex);
String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE) || edge.getLabel().equalsIgnoreCase(CONNECTION_PROCESS_INPUTS_EDGE);

if (!entities.containsKey(inGuid)) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
import org.apache.atlas.repository.store.graph.v2.preprocessor.AssetPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.lineage.LineagePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor;
Expand Down Expand Up @@ -1925,6 +1930,9 @@ public List<PreProcessor> getPreProcessor(String typeName) {
case STAKEHOLDER_TITLE_ENTITY_TYPE:
preProcessors.add(new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever));
break;

case PROCESS_ENTITY_TYPE:
preProcessors.add(new LineagePreProcessor(typeRegistry, entityRetriever, graph, this));
}

// The default global pre-processor for all AssetTypes
Expand Down
Loading
Loading