diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java index 516d6fa2d3137..b44da1c2f832c 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/policy/ListPoliciesResolver.java @@ -40,23 +40,15 @@ public CompletableFuture get(final DataFetchingEnvironment e final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount(); final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery(); - return CompletableFuture.supplyAsync(() -> { - try { - // First, get all policy Urns. - final PolicyFetcher.PolicyFetchResult policyFetchResult = - _policyFetcher.fetchPolicies(start, count, query, context.getAuthentication()); - - // Now that we have entities we can bind this to a result. - final ListPoliciesResult result = new ListPoliciesResult(); - result.setStart(start); - result.setCount(count); - result.setTotal(policyFetchResult.getTotal()); - result.setPolicies(mapEntities(policyFetchResult.getPolicies())); - return result; - } catch (Exception e) { - throw new RuntimeException("Failed to list policies", e); - } - }); + return _policyFetcher.fetchPolicies(start, query, count, context.getAuthentication()) + .thenApply(policyFetchResult -> { + final ListPoliciesResult result = new ListPoliciesResult(); + result.setStart(start); + result.setCount(count); + result.setTotal(policyFetchResult.getTotal()); + result.setPolicies(mapEntities(policyFetchResult.getPolicies())); + return result; + }); } throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator."); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index a69c6008fea47..dff9a22de8efd 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -381,7 +381,7 @@ public SearchResult searchAcrossEntities( @Nonnull @Override public ScrollResult scrollAcrossEntities(@Nonnull List entities, @Nonnull String input, - @Nullable Filter filter, @Nullable String scrollId, @Nonnull String keepAlive, int count, + @Nullable Filter filter, @Nullable String scrollId, @Nullable String keepAlive, int count, @Nullable SearchFlags searchFlags, @Nonnull Authentication authentication) throws RemoteInvocationException { final SearchFlags finalFlags = searchFlags != null ? searchFlags : new SearchFlags().setFulltext(true); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java index 94b8d57efcc16..c99e4a94feb29 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java @@ -147,15 +147,23 @@ public SearchResult searchAcrossEntities(@Nonnull List entities, @Nonnul return result; } + /** + * If no entities are provided, fallback to the list of non-empty entities + * @param inputEntities the requested entities + * @return some entities to search + */ private List getEntitiesToSearch(@Nonnull List inputEntities) { List nonEmptyEntities; List lowercaseEntities = inputEntities.stream().map(String::toLowerCase).collect(Collectors.toList()); - try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "getNonEmptyEntities").time()) { - nonEmptyEntities = _entityDocCountCache.getNonEmptyEntities(); - } - if (!inputEntities.isEmpty()) { - nonEmptyEntities = nonEmptyEntities.stream().filter(lowercaseEntities::contains).collect(Collectors.toList()); + + if (lowercaseEntities.isEmpty()) { + try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "getNonEmptyEntities").time()) { + nonEmptyEntities = _entityDocCountCache.getNonEmptyEntities(); + } + } else { + nonEmptyEntities = lowercaseEntities; } + return nonEmptyEntities; } diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java index f8b28f6c182a7..f8f99475de23e 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java @@ -72,11 +72,13 @@ public DataHubAuthorizer( final EntityClient entityClient, final int delayIntervalSeconds, final int refreshIntervalSeconds, - final AuthorizationMode mode) { + final AuthorizationMode mode, + final int policyFetchSize) { _systemAuthentication = Objects.requireNonNull(systemAuthentication); _mode = Objects.requireNonNull(mode); _policyEngine = new PolicyEngine(systemAuthentication, Objects.requireNonNull(entityClient)); - _policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, readWriteLock.writeLock()); + _policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, + readWriteLock.writeLock(), policyFetchSize); _refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS); } @@ -244,29 +246,28 @@ static class PolicyRefreshRunnable implements Runnable { private final PolicyFetcher _policyFetcher; private final Map> _policyCache; private final Lock writeLock; + private final int count; @Override public void run() { try { // Populate new cache and swap. Map> newCache = new HashMap<>(); + Integer total = null; + String scrollId = null; - int start = 0; - int count = 30; - int total = 30; - - while (start < total) { + while (total == null || scrollId != null) { try { final PolicyFetcher.PolicyFetchResult - policyFetchResult = _policyFetcher.fetchPolicies(start, count, _systemAuthentication); + policyFetchResult = _policyFetcher.fetchPolicies(count, scrollId, _systemAuthentication); addPoliciesToCache(newCache, policyFetchResult.getPolicies()); total = policyFetchResult.getTotal(); - start = start + count; + scrollId = policyFetchResult.getScrollId(); } catch (Exception e) { log.error( - "Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: {}, count: {}", start, count, e); + "Failed to retrieve policy urns! Skipping updating policy cache until next refresh. count: {}, scrollId: {}", count, scrollId, e); return; } } diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java index 92d12bad41c9f..c06da4d245f91 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/PolicyFetcher.java @@ -8,8 +8,8 @@ import com.linkedin.metadata.query.SearchFlags; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.query.filter.SortOrder; +import com.linkedin.metadata.search.ScrollResult; import com.linkedin.metadata.search.SearchEntity; -import com.linkedin.metadata.search.SearchResult; import com.linkedin.policy.DataHubPolicyInfo; import com.linkedin.r2.RemoteInvocationException; import java.net.URISyntaxException; @@ -18,11 +18,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.Value; import lombok.extern.slf4j.Slf4j; +import javax.annotation.Nullable; + import static com.linkedin.metadata.Constants.DATAHUB_POLICY_INFO_ASPECT_NAME; import static com.linkedin.metadata.Constants.POLICY_ENTITY_NAME; @@ -38,22 +41,53 @@ public class PolicyFetcher { private static final SortCriterion POLICY_SORT_CRITERION = new SortCriterion().setField("lastUpdatedTimestamp").setOrder(SortOrder.DESCENDING); - public PolicyFetchResult fetchPolicies(int start, int count, Authentication authentication) - throws RemoteInvocationException, URISyntaxException { - return fetchPolicies(start, count, "", authentication); + /** + * This is to provide a scroll implementation using the start/count api. It is not efficient + * and the scroll native functions should be used instead. This does fix a failure to fetch + * policies when deep pagination happens where there are >10k policies. + * Exists primarily to prevent breaking change to the graphql api. + */ + @Deprecated + public CompletableFuture fetchPolicies(int start, String query, int count, Authentication authentication) { + return CompletableFuture.supplyAsync(() -> { + try { + PolicyFetchResult result = PolicyFetchResult.EMPTY; + String scrollId = ""; + int fetchedResults = 0; + + while (PolicyFetchResult.EMPTY.equals(result) && scrollId != null) { + PolicyFetchResult tmpResult = fetchPolicies(query, count, scrollId.isEmpty() ? null : scrollId, authentication); + fetchedResults += tmpResult.getPolicies().size(); + scrollId = tmpResult.getScrollId(); + if (fetchedResults > start) { + result = tmpResult; + } + } + + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to list policies", e); + } + }); } - public PolicyFetchResult fetchPolicies(int start, int count, String query, Authentication authentication) + public PolicyFetchResult fetchPolicies(int count, @Nullable String scrollId, Authentication authentication) + throws RemoteInvocationException, URISyntaxException { + return fetchPolicies("", count, scrollId, authentication); + } + + public PolicyFetchResult fetchPolicies(String query, int count, @Nullable String scrollId, Authentication authentication) throws RemoteInvocationException, URISyntaxException { - log.debug(String.format("Batch fetching policies. start: %s, count: %s ", start, count)); - // First fetch all policy urns from start - start + count - SearchResult result = - _entityClient.search(POLICY_ENTITY_NAME, query, null, POLICY_SORT_CRITERION, start, count, authentication, - new SearchFlags().setFulltext(true)); + log.debug(String.format("Batch fetching policies. count: %s, scroll: %s", count, scrollId)); + + // First fetch all policy urns + ScrollResult result = _entityClient.scrollAcrossEntities(List.of(POLICY_ENTITY_NAME), query, null, scrollId, + null, count, new SearchFlags().setSkipCache(true).setSkipAggregates(true) + .setSkipHighlighting(true).setFulltext(true), authentication); List policyUrns = result.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList()); if (policyUrns.isEmpty()) { - return new PolicyFetchResult(Collections.emptyList(), 0); + return PolicyFetchResult.EMPTY; } // Fetch DataHubPolicyInfo aspects for each urn @@ -64,7 +98,7 @@ public PolicyFetchResult fetchPolicies(int start, int count, String query, Authe .filter(Objects::nonNull) .map(this::extractPolicy) .filter(Objects::nonNull) - .collect(Collectors.toList()), result.getNumEntities()); + .collect(Collectors.toList()), result.getNumEntities(), result.getScrollId()); } private Policy extractPolicy(EntityResponse entityResponse) { @@ -82,6 +116,10 @@ private Policy extractPolicy(EntityResponse entityResponse) { public static class PolicyFetchResult { List policies; int total; + @Nullable + String scrollId; + + public static final PolicyFetchResult EMPTY = new PolicyFetchResult(Collections.emptyList(), 0, null); } @Value diff --git a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java index 24ecfa6fefc85..babb1c5d00ee8 100644 --- a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java +++ b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/DataHubAuthorizerTest.java @@ -22,6 +22,7 @@ import com.linkedin.entity.EnvelopedAspectMap; import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.query.SearchFlags; +import com.linkedin.metadata.search.ScrollResult; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchResult; @@ -35,6 +36,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; + import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -89,30 +92,58 @@ public void setupTest() throws Exception { final EnvelopedAspectMap childDomainPolicyAspectMap = new EnvelopedAspectMap(); childDomainPolicyAspectMap.put(DATAHUB_POLICY_INFO_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(childDomainPolicy.data()))); - final SearchResult policySearchResult = new SearchResult(); - policySearchResult.setNumEntities(3); - policySearchResult.setEntities( - new SearchEntityArray( - ImmutableList.of( - new SearchEntity().setEntity(activePolicyUrn), - new SearchEntity().setEntity(inactivePolicyUrn), - new SearchEntity().setEntity(parentDomainPolicyUrn), - new SearchEntity().setEntity(childDomainPolicyUrn) - ) - ) - ); - - when(_entityClient.search(eq("dataHubPolicy"), eq(""), isNull(), any(), anyInt(), anyInt(), any(), - eq(new SearchFlags().setFulltext(true)))).thenReturn(policySearchResult); - when(_entityClient.batchGetV2(eq(POLICY_ENTITY_NAME), - eq(ImmutableSet.of(activePolicyUrn, inactivePolicyUrn, parentDomainPolicyUrn, childDomainPolicyUrn)), eq(null), any())).thenReturn( - ImmutableMap.of( - activePolicyUrn, new EntityResponse().setUrn(activePolicyUrn).setAspects(activeAspectMap), - inactivePolicyUrn, new EntityResponse().setUrn(inactivePolicyUrn).setAspects(inactiveAspectMap), - parentDomainPolicyUrn, new EntityResponse().setUrn(parentDomainPolicyUrn).setAspects(parentDomainPolicyAspectMap), - childDomainPolicyUrn, new EntityResponse().setUrn(childDomainPolicyUrn).setAspects(childDomainPolicyAspectMap) - ) - ); + final ScrollResult policySearchResult1 = new ScrollResult() + .setScrollId("1") + .setNumEntities(4) + .setEntities( + new SearchEntityArray( + ImmutableList.of(new SearchEntity().setEntity(activePolicyUrn)))); + + final ScrollResult policySearchResult2 = new ScrollResult() + .setScrollId("2") + .setNumEntities(4) + .setEntities( + new SearchEntityArray( + ImmutableList.of(new SearchEntity().setEntity(inactivePolicyUrn)))); + + final ScrollResult policySearchResult3 = new ScrollResult() + .setScrollId("3") + .setNumEntities(4) + .setEntities( + new SearchEntityArray( + ImmutableList.of(new SearchEntity().setEntity(parentDomainPolicyUrn)))); + + final ScrollResult policySearchResult4 = new ScrollResult() + .setNumEntities(4) + .setEntities( + new SearchEntityArray( + ImmutableList.of( + new SearchEntity().setEntity(childDomainPolicyUrn)))); + + when(_entityClient.scrollAcrossEntities(eq(List.of("dataHubPolicy")), eq(""), isNull(), any(), isNull(), + anyInt(), eq(new SearchFlags().setFulltext(true).setSkipAggregates(true).setSkipHighlighting(true).setSkipCache(true)), any())) + .thenReturn(policySearchResult1) + .thenReturn(policySearchResult2) + .thenReturn(policySearchResult3) + .thenReturn(policySearchResult4); + + when(_entityClient.batchGetV2(eq(POLICY_ENTITY_NAME), any(), eq(null), any())).thenAnswer(args -> { + Set inputUrns = args.getArgument(1); + Urn urn = inputUrns.stream().findFirst().get(); + + switch (urn.toString()) { + case "urn:li:dataHubPolicy:0": + return Map.of(activePolicyUrn, new EntityResponse().setUrn(activePolicyUrn).setAspects(activeAspectMap)); + case "urn:li:dataHubPolicy:1": + return Map.of(inactivePolicyUrn, new EntityResponse().setUrn(inactivePolicyUrn).setAspects(inactiveAspectMap)); + case "urn:li:dataHubPolicy:2": + return Map.of(parentDomainPolicyUrn, new EntityResponse().setUrn(parentDomainPolicyUrn).setAspects(parentDomainPolicyAspectMap)); + case "urn:li:dataHubPolicy:3": + return Map.of(childDomainPolicyUrn, new EntityResponse().setUrn(childDomainPolicyUrn).setAspects(childDomainPolicyAspectMap)); + default: + throw new IllegalStateException(); + } + }); final List userUrns = ImmutableList.of(Urn.createFromString("urn:li:corpuser:user3"), Urn.createFromString("urn:li:corpuser:user4")); final List groupUrns = ImmutableList.of(Urn.createFromString("urn:li:corpGroup:group3"), Urn.createFromString("urn:li:corpGroup:group4")); @@ -146,7 +177,8 @@ childDomainPolicyUrn, new EntityResponse().setUrn(childDomainPolicyUrn).setAspec _entityClient, 10, 10, - DataHubAuthorizer.AuthorizationMode.DEFAULT + DataHubAuthorizer.AuthorizationMode.DEFAULT, + 1 // force pagination logic ); _dataHubAuthorizer.init(Collections.emptyMap(), createAuthorizerContext(systemAuthentication, _entityClient)); _dataHubAuthorizer.invalidateCache(); diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index 91b10a75c922e..e9113d339e81d 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -39,6 +39,7 @@ authorization: defaultAuthorizer: enabled: ${AUTH_POLICIES_ENABLED:true} cacheRefreshIntervalSecs: ${POLICY_CACHE_REFRESH_INTERVAL_SECONDS:120} + cachePolicyFetchSize: ${POLICY_CACHE_FETCH_SIZE:1000} # Enables authorization of reads, writes, and deletes on REST APIs. Defaults to false for backwards compatibility, but should become true down the road restApiAuthorization: ${REST_API_AUTHORIZATION_ENABLED:false} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java index 5b298a453547a..663234e2519fa 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java @@ -32,6 +32,9 @@ public class DataHubAuthorizerFactory { @Value("${authorization.defaultAuthorizer.cacheRefreshIntervalSecs}") private Integer policyCacheRefreshIntervalSeconds; + @Value("${authorization.defaultAuthorizer.cachePolicyFetchSize}") + private Integer policyCacheFetchSize; + @Value("${authorization.defaultAuthorizer.enabled:true}") private Boolean policiesEnabled; @@ -44,6 +47,6 @@ protected DataHubAuthorizer getInstance() { : DataHubAuthorizer.AuthorizationMode.ALLOW_ALL; return new DataHubAuthorizer(systemAuthentication, entityClient, 10, - policyCacheRefreshIntervalSeconds, mode); + policyCacheRefreshIntervalSeconds, mode, policyCacheFetchSize); } } diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java index b9661ec75e1b1..84d0ed6b9594d 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -241,7 +241,7 @@ public SearchResult searchAcrossEntities(@Nonnull List entities, @Nonnul */ @Nonnull ScrollResult scrollAcrossEntities(@Nonnull List entities, @Nonnull String input, - @Nullable Filter filter, @Nullable String scrollId, @Nonnull String keepAlive, int count, @Nullable SearchFlags searchFlags, + @Nullable Filter filter, @Nullable String scrollId, @Nullable String keepAlive, int count, @Nullable SearchFlags searchFlags, @Nonnull Authentication authentication) throws RemoteInvocationException; diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index 47a00e711a935..2716e27518fcc 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -482,11 +482,11 @@ public SearchResult searchAcrossEntities(@Nonnull List entities, @Nonnul @Nonnull @Override public ScrollResult scrollAcrossEntities(@Nonnull List entities, @Nonnull String input, - @Nullable Filter filter, @Nullable String scrollId, @Nonnull String keepAlive, int count, + @Nullable Filter filter, @Nullable String scrollId, @Nullable String keepAlive, int count, @Nullable SearchFlags searchFlags, @Nonnull Authentication authentication) throws RemoteInvocationException { final EntitiesDoScrollAcrossEntitiesRequestBuilder requestBuilder = - ENTITIES_REQUEST_BUILDERS.actionScrollAcrossEntities().inputParam(input).countParam(count).keepAliveParam(keepAlive); + ENTITIES_REQUEST_BUILDERS.actionScrollAcrossEntities().inputParam(input).countParam(count); if (entities != null) { requestBuilder.entitiesParam(new StringArray(entities)); @@ -500,6 +500,9 @@ public ScrollResult scrollAcrossEntities(@Nonnull List entities, @Nonnul if (searchFlags != null) { requestBuilder.searchFlagsParam(searchFlags); } + if (keepAlive != null) { + requestBuilder.keepAliveParam(keepAlive); + } return sendClientRequest(requestBuilder, authentication).getEntity(); } diff --git a/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js b/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js index 9559435ff01c8..8d689c7e2303c 100644 --- a/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js +++ b/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js @@ -81,7 +81,7 @@ describe("create and manage group", () => { cy.focused().type(expected_name); cy.get(".ant-select-item-option").contains(expected_name, { matchCase: false }).click(); cy.focused().blur(); - cy.contains(expected_name).should("have.length", 1); + cy.contains(expected_name, { matchCase: false }).should("have.length", 1); cy.get('[role="dialog"] button').contains("Done").click(); cy.waitTextVisible("Owners Added"); cy.contains(expected_name, { matchCase: false }).should("be.visible");