Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connections) Add Connection entity type and graphql endpoints #10550

Merged
merged 2 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ private Constants() {}
public static final String PROPERTIES_SCHEMA_FILE = "properties.graphql";
public static final String FORMS_SCHEMA_FILE = "forms.graphql";
public static final String INCIDENTS_SCHEMA_FILE = "incident.graphql";
public static final String CONNECTIONS_SCHEMA_FILE = "connection.graphql";
public static final String BROWSE_PATH_DELIMITER = "/";
public static final String BROWSE_PATH_V2_DELIMITER = "␟";
public static final String VERSION_STAMP_FIELD_NAME = "versionStamp";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.linkedin.datahub.graphql.generated.DashboardStatsSummary;
import com.linkedin.datahub.graphql.generated.DashboardUserUsageCounts;
import com.linkedin.datahub.graphql.generated.DataFlow;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.DataHubView;
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
Expand Down Expand Up @@ -129,6 +130,7 @@
import com.linkedin.datahub.graphql.resolvers.chart.BrowseV2Resolver;
import com.linkedin.datahub.graphql.resolvers.chart.ChartStatsSummaryResolver;
import com.linkedin.datahub.graphql.resolvers.config.AppConfigResolver;
import com.linkedin.datahub.graphql.resolvers.connection.UpsertConnectionResolver;
import com.linkedin.datahub.graphql.resolvers.container.ContainerEntitiesResolver;
import com.linkedin.datahub.graphql.resolvers.container.ParentContainersResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardStatsSummaryResolver;
Expand Down Expand Up @@ -306,6 +308,7 @@
import com.linkedin.datahub.graphql.types.chart.ChartType;
import com.linkedin.datahub.graphql.types.common.mappers.OperationMapper;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.datahub.graphql.types.connection.DataHubConnectionType;
import com.linkedin.datahub.graphql.types.container.ContainerType;
import com.linkedin.datahub.graphql.types.corpgroup.CorpGroupType;
import com.linkedin.datahub.graphql.types.corpuser.CorpUserType;
Expand Down Expand Up @@ -355,6 +358,7 @@
import com.linkedin.metadata.config.ViewsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand Down Expand Up @@ -439,6 +443,7 @@ public class GmsGraphQLEngine {
private final ERModelRelationshipService erModelRelationshipService;
private final FormService formService;
private final RestrictedService restrictedService;
private ConnectionService connectionService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -472,6 +477,7 @@ public class GmsGraphQLEngine {
private final GlossaryTermType glossaryTermType;
private final GlossaryNodeType glossaryNodeType;
private final AspectType aspectType;
private final DataHubConnectionType connectionType;
private final ContainerType containerType;
private final DomainType domainType;
private final NotebookType notebookType;
Expand Down Expand Up @@ -558,6 +564,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.dataProductService = args.dataProductService;
this.formService = args.formService;
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -588,6 +595,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.glossaryTermType = new GlossaryTermType(entityClient);
this.glossaryNodeType = new GlossaryNodeType(entityClient);
this.aspectType = new AspectType(entityClient);
this.connectionType = new DataHubConnectionType(entityClient, secretService);
this.containerType = new ContainerType(entityClient);
this.domainType = new DomainType(entityClient);
this.notebookType = new NotebookType(entityClient);
Expand Down Expand Up @@ -636,6 +644,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
dataJobType,
glossaryTermType,
glossaryNodeType,
connectionType,
containerType,
notebookType,
domainType,
Expand Down Expand Up @@ -753,6 +762,7 @@ public void configureRuntimeWiring(final RuntimeWiring.Builder builder) {
configureRoleResolvers(builder);
configureBusinessAttributeResolver(builder);
configureBusinessAttributeAssociationResolver(builder);
configureConnectionResolvers(builder);
}

private void configureOrganisationRoleResolvers(RuntimeWiring.Builder builder) {
Expand Down Expand Up @@ -803,6 +813,7 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(LINEAGE_SCHEMA_FILE))
.addSchema(fileBasedSchema(PROPERTIES_SCHEMA_FILE))
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
Expand Down Expand Up @@ -3015,4 +3026,29 @@ private void configureBusinessAttributeAssociationResolver(final RuntimeWiring.B
.getBusinessAttribute()
.getUrn())));
}

private void configureConnectionResolvers(final RuntimeWiring.Builder builder) {
builder.type(
"Mutation",
typeWiring ->
typeWiring.dataFetcher(
"upsertConnection",
new UpsertConnectionResolver(connectionService, secretService)));
builder.type(
"Query",
typeWiring -> typeWiring.dataFetcher("connection", getResolver(this.connectionType)));
builder.type(
"DataHubConnection",
typeWiring ->
typeWiring.dataFetcher(
"platform",
new LoadableTypeResolver<>(
this.dataPlatformType,
(env) -> {
final DataHubConnection connection = env.getSource();
return connection.getPlatform() != null
? connection.getPlatform().getUrn()
: null;
})));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.linkedin.metadata.config.ViewsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class GmsGraphQLEngineArgs {
int graphQLQueryDepthLimit;
boolean graphQLQueryIntrospectionEnabled;
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.DataHubConnectionDetails;
import com.linkedin.datahub.graphql.generated.DataHubJsonConnection;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.metadata.Constants;
import io.datahubproject.metadata.services.SecretService;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class ConnectionMapper {
/**
* Maps a GMS encrypted connection details object into the decrypted form returned by the GraphQL
* API.
*
* <p>Returns null if the Entity does not have the required aspects: dataHubConnectionDetails or
* dataPlatformInstance.
*/
@Nullable
public static DataHubConnection map(
@Nonnull final QueryContext context,
@Nonnull final EntityResponse entityResponse,
@Nonnull final SecretService secretService) {
// If the connection does not exist, simply return null
if (!hasAspects(entityResponse)) {
return null;
}

final DataHubConnection result = new DataHubConnection();
final Urn entityUrn = entityResponse.getUrn();
final EnvelopedAspectMap aspects = entityResponse.getAspects();

result.setUrn(entityUrn.toString());
result.setType(EntityType.DATAHUB_CONNECTION);

final EnvelopedAspect envelopedAssertionInfo =
aspects.get(Constants.DATAHUB_CONNECTION_DETAILS_ASPECT_NAME);
if (envelopedAssertionInfo != null) {
result.setDetails(
mapConnectionDetails(
context,
new com.linkedin.connection.DataHubConnectionDetails(
envelopedAssertionInfo.getValue().data()),
secretService));
}
final EnvelopedAspect envelopedPlatformInstance =
aspects.get(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
if (envelopedPlatformInstance != null) {
final DataMap data = envelopedPlatformInstance.getValue().data();
result.setPlatform(mapPlatform(new DataPlatformInstance(data)));
}
return result;
}

private static DataHubConnectionDetails mapConnectionDetails(
@Nonnull final QueryContext context,
@Nonnull final com.linkedin.connection.DataHubConnectionDetails gmsDetails,
@Nonnull final SecretService secretService) {
final DataHubConnectionDetails result = new DataHubConnectionDetails();
result.setType(
com.linkedin.datahub.graphql.generated.DataHubConnectionDetailsType.valueOf(
gmsDetails.getType().toString()));
if (gmsDetails.hasJson() && ConnectionUtils.canManageConnections(context)) {
result.setJson(mapJsonConnectionDetails(gmsDetails.getJson(), secretService));
}
if (gmsDetails.hasName()) {
result.setName(gmsDetails.getName());
}
return result;
}

private static DataHubJsonConnection mapJsonConnectionDetails(
@Nonnull final com.linkedin.connection.DataHubJsonConnection gmsJsonConnection,
@Nonnull final SecretService secretService) {
final DataHubJsonConnection result = new DataHubJsonConnection();
// Decrypt the BLOB!
result.setBlob(secretService.decrypt(gmsJsonConnection.getEncryptedBlob()));
return result;
}

private static DataPlatform mapPlatform(final DataPlatformInstance platformInstance) {
// Set dummy platform to be resolved.
final DataPlatform partialPlatform = new DataPlatform();
partialPlatform.setUrn(platformInstance.getPlatform().toString());
return partialPlatform;
}

private static boolean hasAspects(@Nonnull final EntityResponse response) {
return response.hasAspects()
&& response.getAspects().containsKey(Constants.DATAHUB_CONNECTION_DETAILS_ASPECT_NAME)
&& response.getAspects().containsKey(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
}

private ConnectionMapper() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import com.datahub.authorization.AuthUtil;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.metadata.authorization.PoliciesConfig;
import javax.annotation.Nonnull;

/** Utilities for working with DataHub Connections. */
public class ConnectionUtils {

/**
* Returns true if the user is able to read and or write connection between DataHub and external
* platforms.
*/
public static boolean canManageConnections(@Nonnull QueryContext context) {
return AuthUtil.isAuthorized(
context.getAuthorizer(),
context.getActorUrn(),
PoliciesConfig.MANAGE_CONNECTIONS_PRIVILEGE);
}

private ConnectionUtils() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;

import com.datahub.authentication.Authentication;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.connection.DataHubConnectionDetailsType;
import com.linkedin.connection.DataHubJsonConnection;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.UpsertDataHubConnectionInput;
import com.linkedin.entity.EntityResponse;
import com.linkedin.metadata.connection.ConnectionService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.services.SecretService;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class UpsertConnectionResolver implements DataFetcher<CompletableFuture<DataHubConnection>> {

private final ConnectionService _connectionService;
private final SecretService _secretService;

public UpsertConnectionResolver(
@Nonnull final ConnectionService connectionService,
@Nonnull final SecretService secretService) {
_connectionService =
Objects.requireNonNull(connectionService, "connectionService cannot be null");
_secretService = Objects.requireNonNull(secretService, "secretService cannot be null");
}

@Override
public CompletableFuture<DataHubConnection> get(final DataFetchingEnvironment environment)
throws Exception {

final QueryContext context = environment.getContext();
final UpsertDataHubConnectionInput input =
bindArgument(environment.getArgument("input"), UpsertDataHubConnectionInput.class);
final Authentication authentication = context.getAuthentication();

return CompletableFuture.supplyAsync(
() -> {
if (!ConnectionUtils.canManageConnections(context)) {
throw new AuthorizationException(
"Unauthorized to upsert Connection. Please contact your DataHub administrator for more information.");
}

try {
final Urn connectionUrn =
_connectionService.upsertConnection(
context.getOperationContext(),
input.getId(),
UrnUtils.getUrn(input.getPlatformUrn()),
DataHubConnectionDetailsType.valueOf(input.getType().toString()),
input.getJson() != null
// Encrypt payload
? new DataHubJsonConnection()
.setEncryptedBlob(_secretService.encrypt(input.getJson().getBlob()))
: null,
input.getName());

final EntityResponse connectionResponse =
_connectionService.getConnectionEntityResponse(
context.getOperationContext(), connectionUrn);
return ConnectionMapper.map(context, connectionResponse, _secretService);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to upsert a Connection from input %s", input), e);
}
});
}
}
Loading
Loading