fix(lineage): logging reduction and fixes #9878

Merged: 5 commits, Feb 21, 2024
@@ -6,6 +6,7 @@

 import com.google.common.collect.ImmutableSet;
 import com.linkedin.common.urn.Urn;
+import com.linkedin.datahub.graphql.generated.AndFilterInput;
 import com.linkedin.datahub.graphql.generated.EntityType;
 import com.linkedin.datahub.graphql.generated.FacetFilterInput;
 import com.linkedin.datahub.graphql.generated.LineageDirection;
@@ -92,9 +93,14 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment

     final int start = input.getStart() != null ? input.getStart() : DEFAULT_START;
     final int count = input.getCount() != null ? input.getCount() : DEFAULT_COUNT;
-    final List<FacetFilterInput> filters =
-        input.getFilters() != null ? input.getFilters() : new ArrayList<>();
-    final Integer maxHops = getMaxHops(filters);
+    final List<AndFilterInput> filters =
+        input.getOrFilters() != null ? input.getOrFilters() : new ArrayList<>();
+    final List<FacetFilterInput> facetFilters =
+        filters.stream()
+            .map(AndFilterInput::getAnd)
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    final Integer maxHops = getMaxHops(facetFilters);
 
     @Nullable
     final Long startTimeMillis =
@@ -117,7 +123,8 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
         start,
         count);
 
-    final Filter filter = ResolverUtils.buildFilter(filters, input.getOrFilters());
+    final Filter filter =
+        ResolverUtils.buildFilter(input.getFilters(), input.getOrFilters());
     SearchFlags searchFlags = null;
     com.linkedin.datahub.graphql.generated.SearchFlags inputFlags = input.getSearchFlags();
     if (inputFlags != null) {
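Reviewer note on the two hunks above: getMaxHops used to scan the flat legacy filters list and now scans facet filters flattened out of the nested orFilters. And because the local filters variable now holds the or-filters, forwarding it to buildFilter would have applied them twice, so the second hunk passes the raw input.getFilters() through instead. Below is a self-contained sketch of the flattening step; the FacetFilter/AndFilter records are hypothetical stand-ins for the generated FacetFilterInput/AndFilterInput types, not DataHub code.

import java.util.List;
import java.util.stream.Collectors;

public class FlattenOrFiltersSketch {
  // Hypothetical stand-ins: each AndFilter is a conjunction ("and") of facet
  // filters, and a request carries a disjunction (a list) of such conjunctions.
  record FacetFilter(String field, String value) {}
  record AndFilter(List<FacetFilter> and) {}

  public static void main(String[] args) {
    List<AndFilter> orFilters =
        List.of(
            new AndFilter(List.of(new FacetFilter("degree", "1"))),
            new AndFilter(List.of(new FacetFilter("platform", "snowflake"))));

    // Same shape as the resolver change: flatten the nested conjunctions so a
    // single linear scan (getMaxHops in the PR) can see every facet filter.
    List<FacetFilter> facetFilters =
        orFilters.stream()
            .map(AndFilter::and)
            .flatMap(List::stream)
            .collect(Collectors.toList());

    System.out.println(facetFilters.size()); // prints 2
  }
}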
@@ -302,6 +302,7 @@ public LineageResponse getLineage(
             exploreMultiplePaths);
     for (LineageRelationship oneHopRelnship : oneHopRelationships) {
       if (result.containsKey(oneHopRelnship.getEntity())) {
+        log.debug("Urn encountered again during graph walk {}", oneHopRelnship.getEntity());
         result.put(
             oneHopRelnship.getEntity(),
             mergeLineageRelationships(result.get(oneHopRelnship.getEntity()), oneHopRelnship));
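Reviewer note: the added debug line surfaces urns reached along more than one hop path (each revisit triggers a mergeLineageRelationships call), replacing the far noisier per-hit and per-path logging removed in the hunks below.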
@@ -553,26 +554,6 @@ public static void addEdgeToPaths(
     addEdgeToPaths(existingPaths, parentUrn, null, childUrn);
   }
 
-  /**
-   * Utility method to log paths to the debug log.
-   *
-   * @param paths
-   * @param message
-   */
-  private static void logPaths(UrnArrayArray paths, String message) {
-    if (log.isDebugEnabled()) {
-      log.debug("xxxxxxxxxx");
-      log.debug(message);
-      log.debug("---------");
-      if (paths != null) {
-        paths.forEach(path -> log.debug("{}", path));
-      } else {
-        log.debug("EMPTY");
-      }
-      log.debug("xxxxxxxxxx");
-    }
-  }
-
   private static boolean containsCycle(final UrnArray path) {
     Set<Urn> urnSet = path.stream().collect(Collectors.toUnmodifiableSet());
     // path contains a cycle if any urn is repeated twice
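The containsCycle context above is cut off at the hunk boundary; from the visible lines and comment, the check is simply that deduplicating the path's urns shrinks it. A minimal runnable sketch under that assumption, with String urns standing in for Urn/UrnArray (the final comparison is inferred, not shown in the diff):

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ContainsCycleSketch {
  // A path has a cycle iff some urn repeats, i.e. the distinct-urn set is
  // strictly smaller than the path itself. (Inferred from the diff context.)
  static boolean containsCycle(List<String> path) {
    Set<String> urnSet = path.stream().collect(Collectors.toUnmodifiableSet());
    return urnSet.size() < path.size();
  }

  public static void main(String[] args) {
    System.out.println(containsCycle(List.of("urn:a", "urn:b")));          // false
    System.out.println(containsCycle(List.of("urn:a", "urn:b", "urn:a"))); // true
  }
}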
@@ -587,8 +568,6 @@ public static boolean addEdgeToPaths(
     boolean edgeAdded = false;
     // Collect all full-paths to this child node. This is what will be returned.
     UrnArrayArray pathsToParent = existingPaths.get(parentUrn);
-    logPaths(pathsToParent, String.format("Paths to Parent: %s, Child: %s", parentUrn, childUrn));
-    logPaths(existingPaths.get(childUrn), String.format("Existing Paths to Child: %s", childUrn));
     if (pathsToParent != null && !pathsToParent.isEmpty()) {
       // If there are existing paths to this parent node, then we attempt
       // to append the child to each of the existing paths (lengthen it).
@@ -630,7 +609,6 @@ public static boolean addEdgeToPaths(
       existingPaths.get(childUrn).add(pathToChild);
       edgeAdded = true;
     }
-    logPaths(existingPaths.get(childUrn), String.format("New paths to Child: %s", childUrn));
     return edgeAdded;
   }

@@ -655,7 +633,6 @@ private static List<LineageRelationship> extractRelationships(
       for (SearchHit hit : hits) {
         index++;
         final Map<String, Object> document = hit.getSourceAsMap();
-        log.debug("{}: hit: {}", index, document);
         final Urn sourceUrn =
             UrnUtils.getUrn(((Map<String, Object>) document.get(SOURCE)).get("urn").toString());
         final Urn destinationUrn =
@@ -808,7 +785,6 @@ private static List<LineageRelationship> extractRelationships(
       }
       List<LineageRelationship> result = new ArrayList<>(lineageRelationshipMap.values());
       log.debug("Number of lineage relationships in list: {}", result.size());
-      log.debug("Result: {}", result);
       return result;
     } catch (Exception e) {
       // This exception handler merely exists to log the exception at an appropriate point and
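For readers following the logging removals: addEdgeToPaths maintains, per urn, every known path from the search root to that urn; an edge (parent, child) either lengthens each known path to the parent by one hop or, failing that, seeds a direct two-node path. A minimal sketch under those assumptions, using plain Java collections in place of UrnArray/UrnArrayArray and omitting the optional intermediate urn passed as null in the overload above (illustrative only, not the actual implementation):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AddEdgeToPathsSketch {
  // existingPaths maps each urn to every known root-to-urn path.
  static boolean addEdgeToPaths(
      Map<String, List<List<String>>> existingPaths, String parent, String child) {
    boolean edgeAdded = false;
    List<List<String>> pathsToParent = existingPaths.get(parent);
    List<List<String>> pathsToChild =
        existingPaths.computeIfAbsent(child, k -> new ArrayList<>());
    if (pathsToParent != null && !pathsToParent.isEmpty()) {
      // Lengthen every known path to the parent by one hop, skipping any
      // extension that would revisit a urn (i.e. create a cycle).
      for (List<String> pathToParent : pathsToParent) {
        if (pathToParent.contains(child)) {
          continue;
        }
        List<String> pathToChild = new ArrayList<>(pathToParent);
        pathToChild.add(child);
        pathsToChild.add(pathToChild);
        edgeAdded = true;
      }
    } else {
      // No known path to the parent yet: record the direct two-node path.
      pathsToChild.add(new ArrayList<>(List.of(parent, child)));
      edgeAdded = true;
    }
    return edgeAdded;
  }

  public static void main(String[] args) {
    Map<String, List<List<String>>> paths = new HashMap<>();
    addEdgeToPaths(paths, "urn:a", "urn:b");
    addEdgeToPaths(paths, "urn:b", "urn:c");
    System.out.println(paths.get("urn:c")); // [[urn:a, urn:b, urn:c]]
  }
}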
@@ -230,7 +230,7 @@ elasticsearch:
       timeoutSeconds: ${ELASTICSEARCH_SEARCH_GRAPH_TIMEOUT_SECONDS:50} # graph dao timeout seconds
       batchSize: ${ELASTICSEARCH_SEARCH_GRAPH_BATCH_SIZE:1000} # graph dao batch size
       maxResult: ${ELASTICSEARCH_SEARCH_GRAPH_MAX_RESULT:10000} # graph dao max result size
-      enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:true}
+      enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:false}
 
 # TODO: Kafka topic convention
 kafka:
@@ -315,7 +315,7 @@ systemUpdate:
     backOffFactor: ${BOOTSTRAP_SYSTEM_UPDATE_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s
     waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true}
   dataJobNodeCLL:
-    enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:true}
+    enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:false}
     batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:200}
   browsePathsV2:
     enabled: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_ENABLED:true}
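Deployment note: both defaults in this file flip from true to false, making multi-path lineage search and the dataJobNodeCLL bootstrap step opt-in; operators who relied on the previous behavior would presumably set ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH=true and BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED=true explicitly.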
smoke-test/tests/lineage/test_lineage.py: 66 changes (32 additions, 34 deletions)
@@ -8,11 +8,8 @@
 import pytest
 from datahub.cli.cli_utils import get_url_and_token
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.graph.client import (
-    DatahubClientConfig,
-    DataHubGraph,
-    get_default_graph,
-)
+from datahub.ingestion.graph.client import DataHubGraph  # get_default_graph,
+from datahub.ingestion.graph.client import DatahubClientConfig
 from datahub.metadata.schema_classes import (
     AuditStampClass,
     ChangeAuditStampsClass,
@@ -959,32 +956,33 @@ def ingest_multipath_metadata(
     wait_for_writes_to_sync()
 
 
-@pytest.mark.dependency(depends=["test_healthchecks"])
-def test_simple_lineage_multiple_paths(
-    ingest_multipath_metadata,
-    chart_urn_fixture,
-    intermediates_fixture,
-    destination_urn_fixture,
-):
-    chart_urn = chart_urn_fixture
-    intermediates = intermediates_fixture
-    destination_urn = destination_urn_fixture
-    results = search_across_lineage(
-        get_default_graph(),
-        chart_urn,
-        direction="UPSTREAM",
-        convert_schema_fields_to_datasets=True,
-    )
-    assert destination_urn in [
-        x["entity"]["urn"] for x in results["searchAcrossLineage"]["searchResults"]
-    ]
-    for search_result in results["searchAcrossLineage"]["searchResults"]:
-        if search_result["entity"]["urn"] == destination_urn:
-            assert (
-                len(search_result["paths"]) == 2
-            )  # 2 paths from the chart to the dataset
-            for path in search_result["paths"]:
-                assert len(path["path"]) == 3
-                assert path["path"][-1]["urn"] == destination_urn
-                assert path["path"][0]["urn"] == chart_urn
-                assert path["path"][1]["urn"] in intermediates
+# TODO: Reenable once fixed
+# @pytest.mark.dependency(depends=["test_healthchecks"])
+# def test_simple_lineage_multiple_paths(
+#     ingest_multipath_metadata,
+#     chart_urn_fixture,
+#     intermediates_fixture,
+#     destination_urn_fixture,
+# ):
+#     chart_urn = chart_urn_fixture
+#     intermediates = intermediates_fixture
+#     destination_urn = destination_urn_fixture
+#     results = search_across_lineage(
+#         get_default_graph(),
+#         chart_urn,
+#         direction="UPSTREAM",
+#         convert_schema_fields_to_datasets=True,
+#     )
+#     assert destination_urn in [
+#         x["entity"]["urn"] for x in results["searchAcrossLineage"]["searchResults"]
+#     ]
+#     for search_result in results["searchAcrossLineage"]["searchResults"]:
+#         if search_result["entity"]["urn"] == destination_urn:
+#             assert (
+#                 len(search_result["paths"]) == 2
+#             )  # 2 paths from the chart to the dataset
+#             for path in search_result["paths"]:
+#                 assert len(path["path"]) == 3
+#                 assert path["path"][-1]["urn"] == destination_urn
+#                 assert path["path"][0]["urn"] == chart_urn
+#                 assert path["path"][1]["urn"] in intermediates
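Reviewer note: the parked test asserts exactly the multi-path behavior (two distinct three-hop paths from the chart to the destination) that the enableMultiPathSearch default flip above disables, which presumably motivates the TODO rather than an outright deletion.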