Skip to content

Commit

Permalink
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
Browse files Browse the repository at this point in the history
…hub-fork into master+ing-510-lookml-cll
  • Loading branch information
sid-acryl committed May 30, 2024
2 parents fcd9957 + 522d4a5 commit 808ded1
Show file tree
Hide file tree
Showing 257 changed files with 2,652 additions and 1,447 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.13.1'
ext.openLineageVersion = '1.14.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.datahub.graphql.analytics.resolver.GetMetadataAnalyticsResolver;
import com.linkedin.datahub.graphql.analytics.resolver.IsAnalyticsEnabledResolver;
import com.linkedin.datahub.graphql.analytics.service.AnalyticsService;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.AccessToken;
import com.linkedin.datahub.graphql.generated.AccessTokenMetadata;
Expand Down Expand Up @@ -394,7 +395,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -2900,7 +2900,7 @@ private <T, K> DataLoader<K, DataFetcherResult<T>> createDataLoader(
DataLoaderOptions.newOptions().setBatchLoaderContextProvider(contextProvider);
return DataLoader.newDataLoader(
(keys, context) ->
CompletableFuture.supplyAsync(
GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
log.debug(
Expand All @@ -2919,7 +2919,9 @@ private <T, K> DataLoader<K, DataFetcherResult<T>> createDataLoader(
String.format("Failed to retrieve entities of type %s", graphType.name()),
e);
}
}),
},
graphType.getClass().getSimpleName(),
"batchLoad"),
loaderOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static graphql.schema.idl.RuntimeWiring.*;

import com.linkedin.datahub.graphql.exception.DataHubDataFetcherExceptionHandler;
import com.linkedin.datahub.graphql.instrumentation.DataHubFieldComplexityCalculator;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
Expand Down Expand Up @@ -80,7 +81,9 @@ private GraphQLEngine(
List<Instrumentation> instrumentations = new ArrayList<>(3);
instrumentations.add(new TracingInstrumentation());
instrumentations.add(new MaxQueryDepthInstrumentation(graphQLQueryDepthLimit));
instrumentations.add(new MaxQueryComplexityInstrumentation(graphQLQueryComplexityLimit));
instrumentations.add(
new MaxQueryComplexityInstrumentation(
graphQLQueryComplexityLimit, new DataHubFieldComplexityCalculator()));
ChainedInstrumentation chainedInstrumentation = new ChainedInstrumentation(instrumentations);
_graphQL =
new GraphQL.Builder(graphQLSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.common.SubTypes;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
Expand All @@ -26,7 +27,7 @@ public class SubTypesResolver implements DataFetcher<CompletableFuture<SubTypes>
@Override
@Nullable
public CompletableFuture<SubTypes> get(DataFetchingEnvironment environment) throws Exception {
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
final QueryContext context = environment.getContext();
SubTypes subType = null;
Expand All @@ -50,6 +51,8 @@ public CompletableFuture<SubTypes> get(DataFetchingEnvironment environment) thro
"Failed to fetch aspect " + _aspectName + " for urn " + urnStr + " ", e);
}
return subType;
});
},
this.getClass().getSimpleName(),
"get");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.codec.JacksonDataCodec;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.AspectParams;
import com.linkedin.datahub.graphql.generated.AspectRenderSpec;
import com.linkedin.datahub.graphql.generated.Entity;
Expand Down Expand Up @@ -48,7 +49,7 @@ private boolean shouldReturnAspect(AspectSpec aspectSpec, AspectParams params) {
@Override
public CompletableFuture<List<RawAspect>> get(DataFetchingEnvironment environment)
throws Exception {
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
List<RawAspect> results = new ArrayList<>();

Expand Down Expand Up @@ -111,6 +112,8 @@ public CompletableFuture<List<RawAspect>> get(DataFetchingEnvironment environmen
}
});
return results;
});
},
this.getClass().getSimpleName(),
"get");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.linkedin.datahub.graphql.concurrency;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class GraphQLConcurrencyUtils {
private GraphQLConcurrencyUtils() {}

private static ExecutorService graphQLExecutorService = null;

public static ExecutorService getExecutorService() {
return GraphQLConcurrencyUtils.graphQLExecutorService;
}

public static void setExecutorService(ExecutorService executorService) {
GraphQLConcurrencyUtils.graphQLExecutorService = executorService;
}

public static <T> CompletableFuture<T> supplyAsync(
Supplier<T> supplier, String caller, String task) {
MetricUtils.counter(
MetricRegistry.name(
GraphQLConcurrencyUtils.class.getSimpleName(), "supplyAsync", caller, task))
.inc();
if (GraphQLConcurrencyUtils.graphQLExecutorService == null) {
return CompletableFuture.supplyAsync(supplier);
} else {
return CompletableFuture.supplyAsync(
supplier, GraphQLConcurrencyUtils.graphQLExecutorService);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.datahub.graphql.concurrency;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public class GraphQLWorkerPoolThreadFactory implements ThreadFactory {

private static final AtomicLong THREAD_INIT_NUMBER = new AtomicLong();
public static final String GRAPHQL_THREAD_POOL_GROUP_NAME = "graphQLThreadGroup";
public static final ThreadGroup GRAPHQL_THREAD_POOL_GROUP =
new ThreadGroup(GRAPHQL_THREAD_POOL_GROUP_NAME);

private static long nextThreadNum() {
return THREAD_INIT_NUMBER.getAndIncrement();
}

private long stackSize;

public GraphQLWorkerPoolThreadFactory(long stackSize) {
this.stackSize = stackSize;
}

@Override
public final Thread newThread(Runnable runnable) {

return new Thread(
GRAPHQL_THREAD_POOL_GROUP, runnable, "GraphQLWorkerThread-" + nextThreadNum(), stackSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.datahub.graphql.instrumentation;

import graphql.analysis.FieldComplexityCalculator;
import graphql.analysis.FieldComplexityEnvironment;
import graphql.language.Field;
import graphql.language.FragmentSpread;
import graphql.language.Selection;
import graphql.language.SelectionSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataHubFieldComplexityCalculator implements FieldComplexityCalculator {

private static final String COUNT_ARG = "count";
private static final String INPUT_ARG = "input";
private static final String SEARCH_RESULTS_FIELD = "searchResults";
private static final String ENTITY_FIELD = "entity";
private static final String SEARCH_RESULT_FIELDS_FIELD = "searchResultFields";
private static final String GRAPHQL_QUERY_TYPE = "Query";

@SuppressWarnings("rawtypes")
@Override
public int calculate(FieldComplexityEnvironment environment, int childComplexity) {
int complexity = 1;
Map<String, Object> args = environment.getArguments();
if (args.containsKey(INPUT_ARG)) {
Map<String, Object> input = (Map<String, Object>) args.get(INPUT_ARG);
if (input.containsKey(COUNT_ARG) && (Integer) input.get(COUNT_ARG) > 1) {
Integer count = (Integer) input.get(COUNT_ARG);
Field field = environment.getField();
complexity += countRecursiveLineageComplexity(count, field);
}
}
if (GRAPHQL_QUERY_TYPE.equals(environment.getParentType().getName())) {
log.info(
"Query complexity for query: {} is {}",
environment.getField().getName(),
complexity + childComplexity);
}
return complexity + childComplexity;
}

private int countRecursiveLineageComplexity(Integer count, Field field) {
List<Selection> subFields = field.getSelectionSet().getSelections();
Optional<FragmentSpread> searchResultsFieldsField =
subFields.stream()
.filter(selection -> selection instanceof Field)
.map(selection -> (Field) selection)
.filter(subField -> SEARCH_RESULTS_FIELD.equals(subField.getName()))
.map(Field::getSelectionSet)
.map(SelectionSet::getSelections)
.flatMap(List::stream)
.filter(selection -> selection instanceof Field)
.map(selection -> (Field) selection)
.filter(subField -> ENTITY_FIELD.equals(subField.getName()))
.map(Field::getSelectionSet)
.map(SelectionSet::getSelections)
.flatMap(List::stream)
.filter(selection -> selection instanceof FragmentSpread)
.map(selection -> (FragmentSpread) selection)
.filter(subField -> SEARCH_RESULT_FIELDS_FIELD.equals(subField.getName()))
.findFirst();
if (searchResultsFieldsField.isPresent()) {
// This fragment includes 2 lineage queries, we account for this additional complexity by
// multiplying
// by the count of entities attempting to be returned
return 2 * count;
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.AuthenticatedUser;
import com.linkedin.datahub.graphql.generated.CorpUser;
Expand Down Expand Up @@ -49,7 +50,7 @@ public MeResolver(final EntityClient entityClient, final FeatureFlags featureFla
@Override
public CompletableFuture<AuthenticatedUser> get(DataFetchingEnvironment environment) {
final QueryContext context = environment.getContext();
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
// 1. Get currently logged in user profile.
Expand All @@ -75,6 +76,7 @@ public CompletableFuture<AuthenticatedUser> get(DataFetchingEnvironment environm
platformPrivileges.setManageIngestion(canManageIngestion(context));
platformPrivileges.setManageSecrets(canManageSecrets(context));
platformPrivileges.setManageTokens(canManageTokens(context));
platformPrivileges.setViewTests(canViewTests(context));
platformPrivileges.setManageTests(canManageTests(context));
platformPrivileges.setManageGlossaries(canManageGlossaries(context));
platformPrivileges.setManageUserCredentials(canManageUserCredentials(context));
Expand All @@ -99,7 +101,9 @@ public CompletableFuture<AuthenticatedUser> get(DataFetchingEnvironment environm
} catch (URISyntaxException | RemoteInvocationException e) {
throw new RuntimeException("Failed to fetch authenticated user!", e);
}
});
},
this.getClass().getSimpleName(),
"get");
}

/** Returns true if the authenticated user has privileges to view analytics. */
Expand Down Expand Up @@ -130,6 +134,12 @@ private boolean canGeneratePersonalAccessToken(final QueryContext context) {
PoliciesConfig.GENERATE_PERSONAL_ACCESS_TOKENS_PRIVILEGE);
}

/** Returns true if the authenticated user has privileges to view tests. */
private boolean canViewTests(final QueryContext context) {
return isAuthorized(
context.getAuthorizer(), context.getActorUrn(), PoliciesConfig.VIEW_TESTS_PRIVILEGE);
}

/** Returns true if the authenticated user has privileges to manage (add or remove) tests. */
private boolean canManageTests(final QueryContext context) {
return isAuthorized(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.AssertionResultType;
import com.linkedin.datahub.graphql.generated.AssertionRunEvent;
Expand Down Expand Up @@ -40,7 +41,7 @@ public AssertionRunEventResolver(final EntityClient client) {

@Override
public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment environment) {
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
final QueryContext context = environment.getContext();

Expand Down Expand Up @@ -102,7 +103,9 @@ public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment e
} catch (RemoteInvocationException e) {
throw new RuntimeException("Failed to retrieve Assertion Run Events from GMS", e);
}
});
},
this.getClass().getSimpleName(),
"get");
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
Expand Down Expand Up @@ -38,7 +39,7 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
throws Exception {
final QueryContext context = environment.getContext();
final Urn assertionUrn = Urn.createFromString(environment.getArgument("urn"));
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {

// 1. check the entity exists. If not, return false.
Expand Down Expand Up @@ -75,7 +76,9 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
}
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
});
},
this.getClass().getSimpleName(),
"get");
}

/** Determine whether the current user is allowed to remove an assertion. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityAssertionsResult;
Expand Down Expand Up @@ -45,7 +46,7 @@ public EntityAssertionsResolver(final EntityClient entityClient, final GraphClie

@Override
public CompletableFuture<EntityAssertionsResult> get(DataFetchingEnvironment environment) {
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
final QueryContext context = environment.getContext();

Expand Down Expand Up @@ -102,7 +103,9 @@ public CompletableFuture<EntityAssertionsResult> get(DataFetchingEnvironment env
} catch (URISyntaxException | RemoteInvocationException e) {
throw new RuntimeException("Failed to retrieve Assertion Run Events from GMS", e);
}
});
},
this.getClass().getSimpleName(),
"get");
}

private boolean assertionExists(
Expand Down
Loading

0 comments on commit 808ded1

Please sign in to comment.