Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data quality): update models, add assertions cli with snowflake integration #10602

Merged
merged 11 commits into from
May 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ private Constants() {}
public static final String LINEAGE_SCHEMA_FILE = "lineage.graphql";
public static final String PROPERTIES_SCHEMA_FILE = "properties.graphql";
public static final String FORMS_SCHEMA_FILE = "forms.graphql";
public static final String ASSERTIONS_SCHEMA_FILE = "assertions.graphql";
public static final String INCIDENTS_SCHEMA_FILE = "incident.graphql";
public static final String CONNECTIONS_SCHEMA_FILE = "connection.graphql";
public static final String BROWSE_PATH_DELIMITER = "/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@
import com.linkedin.datahub.graphql.resolvers.assertion.AssertionRunEventResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.DeleteAssertionResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.EntityAssertionsResolver;
import com.linkedin.datahub.graphql.resolvers.auth.*;
import com.linkedin.datahub.graphql.resolvers.auth.CreateAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.auth.DebugAccessResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenMetadataResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.auth.ListAccessTokensResolver;
import com.linkedin.datahub.graphql.resolvers.auth.RevokeAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowsePathsResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowseResolver;
import com.linkedin.datahub.graphql.resolvers.browse.EntityBrowsePathsResolver;
Expand Down Expand Up @@ -814,6 +819,7 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(PROPERTIES_SCHEMA_FILE))
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(ASSERTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment e
&& AssertionResultType.SUCCESS.equals(
runEvent.getResult().getType()))
.count()));
result.setErrored(
Math.toIntExact(
runEvents.stream()
.filter(
runEvent ->
AssertionRunStatus.COMPLETE.equals(runEvent.getStatus())
&& runEvent.getResult() != null
&& AssertionResultType.ERROR.equals(
runEvent.getResult().getType()))
.count()));
result.setRunEvents(runEvents);
return result;
} catch (RemoteInvocationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;

import com.linkedin.assertion.AssertionAction;
import com.linkedin.assertion.AssertionActions;
import com.linkedin.assertion.AssertionInfo;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.GlobalTags;
Expand All @@ -10,24 +12,40 @@
import com.linkedin.data.DataMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.AssertionActionType;
import com.linkedin.datahub.graphql.generated.AssertionSource;
import com.linkedin.datahub.graphql.generated.AssertionSourceType;
import com.linkedin.datahub.graphql.generated.AssertionStdAggregation;
import com.linkedin.datahub.graphql.generated.AssertionStdOperator;
import com.linkedin.datahub.graphql.generated.AssertionStdParameter;
import com.linkedin.datahub.graphql.generated.AssertionStdParameterType;
import com.linkedin.datahub.graphql.generated.AssertionStdParameters;
import com.linkedin.datahub.graphql.generated.AssertionType;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DatasetAssertionInfo;
import com.linkedin.datahub.graphql.generated.DatasetAssertionScope;
import com.linkedin.datahub.graphql.generated.DateInterval;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FieldAssertionInfo;
import com.linkedin.datahub.graphql.generated.FixedIntervalSchedule;
import com.linkedin.datahub.graphql.generated.FreshnessAssertionInfo;
import com.linkedin.datahub.graphql.generated.SchemaAssertionCompatibility;
import com.linkedin.datahub.graphql.generated.SchemaAssertionField;
import com.linkedin.datahub.graphql.generated.SchemaAssertionInfo;
import com.linkedin.datahub.graphql.generated.SchemaFieldRef;
import com.linkedin.datahub.graphql.generated.SqlAssertionInfo;
import com.linkedin.datahub.graphql.generated.VolumeAssertionInfo;
import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper;
import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper;
import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaFieldMapper;
import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaMetadataMapper;
import com.linkedin.datahub.graphql.types.tag.mappers.GlobalTagsMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.metadata.Constants;
import com.linkedin.schema.SchemaField;
import java.util.Collections;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand All @@ -48,6 +66,14 @@ public static Assertion map(@Nullable QueryContext context, final EntityResponse
result.setInfo(
mapAssertionInfo(context, new AssertionInfo(envelopedAssertionInfo.getValue().data())));
}

final EnvelopedAspect envelopedAssertionActions =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please remove?

aspects.get(Constants.ASSERTION_ACTIONS_ASPECT_NAME);
if (envelopedAssertionActions != null) {
result.setActions(
mapAssertionActions(new AssertionActions(envelopedAssertionActions.getValue().data())));
}

final EnvelopedAspect envelopedPlatformInstance =
aspects.get(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
if (envelopedPlatformInstance != null) {
Expand Down Expand Up @@ -83,20 +109,93 @@ private static com.linkedin.datahub.graphql.generated.Status mapStatus(Status st
return result;
}

private static com.linkedin.datahub.graphql.generated.AssertionInfo mapAssertionInfo(
public static com.linkedin.datahub.graphql.generated.AssertionInfo mapAssertionInfo(
@Nullable QueryContext context, final AssertionInfo gmsAssertionInfo) {
final com.linkedin.datahub.graphql.generated.AssertionInfo assertionInfo =
new com.linkedin.datahub.graphql.generated.AssertionInfo();
assertionInfo.setType(AssertionType.valueOf(gmsAssertionInfo.getType().name()));

if (gmsAssertionInfo.hasLastUpdated()) {
assertionInfo.setLastUpdated(
new AuditStamp(
gmsAssertionInfo.getLastUpdated().getTime(),
gmsAssertionInfo.getLastUpdated().getActor().toString()));
}
if (gmsAssertionInfo.hasDatasetAssertion()) {
DatasetAssertionInfo datasetAssertion =
mapDatasetAssertionInfo(context, gmsAssertionInfo.getDatasetAssertion());
assertionInfo.setDatasetAssertion(datasetAssertion);
}
assertionInfo.setDescription(gmsAssertionInfo.getDescription());
// Description
if (gmsAssertionInfo.hasDescription()) {
assertionInfo.setDescription(gmsAssertionInfo.getDescription());
}
// FRESHNESS Assertions
if (gmsAssertionInfo.hasFreshnessAssertion()) {
FreshnessAssertionInfo freshnessAssertionInfo =
FreshnessAssertionMapper.mapFreshnessAssertionInfo(
context, gmsAssertionInfo.getFreshnessAssertion());
assertionInfo.setFreshnessAssertion(freshnessAssertionInfo);
}
// VOLUME Assertions
if (gmsAssertionInfo.hasVolumeAssertion()) {
VolumeAssertionInfo volumeAssertionInfo =
VolumeAssertionMapper.mapVolumeAssertionInfo(
context, gmsAssertionInfo.getVolumeAssertion());
assertionInfo.setVolumeAssertion(volumeAssertionInfo);
}
// SQL Assertions
if (gmsAssertionInfo.hasSqlAssertion()) {
SqlAssertionInfo sqlAssertionInfo =
SqlAssertionMapper.mapSqlAssertionInfo(gmsAssertionInfo.getSqlAssertion());
assertionInfo.setSqlAssertion(sqlAssertionInfo);
}
// FIELD Assertions
if (gmsAssertionInfo.hasFieldAssertion()) {
FieldAssertionInfo fieldAssertionInfo =
FieldAssertionMapper.mapFieldAssertionInfo(context, gmsAssertionInfo.getFieldAssertion());
assertionInfo.setFieldAssertion(fieldAssertionInfo);
}
// SCHEMA Assertions
if (gmsAssertionInfo.hasSchemaAssertion()) {
SchemaAssertionInfo schemaAssertionInfo =
mapSchemaAssertionInfo(context, gmsAssertionInfo.getSchemaAssertion());
assertionInfo.setSchemaAssertion(schemaAssertionInfo);
}
// Source Type
if (gmsAssertionInfo.hasSource()) {
assertionInfo.setSource(mapSource(gmsAssertionInfo.getSource()));
}
return assertionInfo;
}

private static com.linkedin.datahub.graphql.generated.AssertionActions mapAssertionActions(
final AssertionActions gmsAssertionActions) {
final com.linkedin.datahub.graphql.generated.AssertionActions result =
new com.linkedin.datahub.graphql.generated.AssertionActions();
if (gmsAssertionActions.hasOnFailure()) {
result.setOnFailure(
gmsAssertionActions.getOnFailure().stream()
.map(AssertionMapper::mapAssertionAction)
.collect(Collectors.toList()));
}
if (gmsAssertionActions.hasOnSuccess()) {
result.setOnSuccess(
gmsAssertionActions.getOnSuccess().stream()
.map(AssertionMapper::mapAssertionAction)
.collect(Collectors.toList()));
}
return result;
}

private static com.linkedin.datahub.graphql.generated.AssertionAction mapAssertionAction(
final AssertionAction gmsAssertionAction) {
final com.linkedin.datahub.graphql.generated.AssertionAction result =
new com.linkedin.datahub.graphql.generated.AssertionAction();
result.setType(AssertionActionType.valueOf(gmsAssertionAction.getType().toString()));
return result;
}

private static DatasetAssertionInfo mapDatasetAssertionInfo(
@Nullable QueryContext context,
final com.linkedin.assertion.DatasetAssertionInfo gmsDatasetAssertion) {
Expand Down Expand Up @@ -152,7 +251,7 @@ private static SchemaFieldRef mapDatasetSchemaField(final Urn schemaFieldUrn) {
return new SchemaFieldRef(schemaFieldUrn.toString(), schemaFieldUrn.getEntityKey().get(1));
}

private static AssertionStdParameters mapParameters(
protected static AssertionStdParameters mapParameters(
final com.linkedin.assertion.AssertionStdParameters params) {
final AssertionStdParameters result = new AssertionStdParameters();
if (params.hasValue()) {
Expand All @@ -175,5 +274,61 @@ private static AssertionStdParameter mapParameter(
return result;
}

private AssertionMapper() {}
protected static FixedIntervalSchedule mapFixedIntervalSchedule(
com.linkedin.assertion.FixedIntervalSchedule gmsFixedIntervalSchedule) {
FixedIntervalSchedule fixedIntervalSchedule = new FixedIntervalSchedule();
fixedIntervalSchedule.setUnit(DateInterval.valueOf(gmsFixedIntervalSchedule.getUnit().name()));
fixedIntervalSchedule.setMultiple(gmsFixedIntervalSchedule.getMultiple());
return fixedIntervalSchedule;
}

private static AssertionSource mapSource(final com.linkedin.assertion.AssertionSource gmsSource) {
AssertionSource result = new AssertionSource();
result.setType(AssertionSourceType.valueOf(gmsSource.getType().toString()));
if (gmsSource.hasCreated()) {
result.setCreated(
new AuditStamp(
gmsSource.getCreated().getTime(), gmsSource.getCreated().getActor().toString()));
}
return result;
}

protected static com.linkedin.datahub.graphql.generated.SchemaFieldSpec mapSchemaFieldSpec(
final com.linkedin.schema.SchemaFieldSpec gmsField) {
final com.linkedin.datahub.graphql.generated.SchemaFieldSpec result =
new com.linkedin.datahub.graphql.generated.SchemaFieldSpec();
result.setPath(gmsField.getPath());
result.setType(gmsField.getType());
result.setNativeType(gmsField.getNativeType());
return result;
}

private static SchemaAssertionInfo mapSchemaAssertionInfo(
@Nullable final QueryContext context,
final com.linkedin.assertion.SchemaAssertionInfo gmsSchemaAssertionInfo) {
SchemaAssertionInfo result = new SchemaAssertionInfo();
result.setCompatibility(
SchemaAssertionCompatibility.valueOf(gmsSchemaAssertionInfo.getCompatibility().name()));
result.setEntityUrn(gmsSchemaAssertionInfo.getEntity().toString());
result.setSchema(
SchemaMetadataMapper.INSTANCE.apply(
context, gmsSchemaAssertionInfo.getSchema(), gmsSchemaAssertionInfo.getEntity(), 0L));
result.setFields(
gmsSchemaAssertionInfo.getSchema().getFields().stream()
.map(AssertionMapper::mapSchemaField)
.collect(Collectors.toList()));
return result;
}

private static SchemaAssertionField mapSchemaField(final SchemaField gmsField) {
SchemaAssertionField result = new SchemaAssertionField();
result.setPath(gmsField.getFieldPath());
result.setType(new SchemaFieldMapper().mapSchemaFieldDataType(gmsField.getType()));
if (gmsField.hasNativeDataType()) {
result.setNativeType(gmsField.getNativeDataType());
}
return result;
}

protected AssertionMapper() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class AssertionType
Constants.ASSERTION_KEY_ASPECT_NAME,
Constants.ASSERTION_INFO_ASPECT_NAME,
Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME,
Constants.GLOBAL_TAGS_ASPECT_NAME);

Constants.GLOBAL_TAGS_ASPECT_NAME,
Constants.ASSERTION_ACTIONS_ASPECT_NAME);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

private final EntityClient _entityClient;

public AssertionType(final EntityClient entityClient) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.linkedin.datahub.graphql.types.assertion;

import com.linkedin.assertion.FieldAssertionInfo;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AssertionStdOperator;
import com.linkedin.datahub.graphql.generated.FieldAssertionType;
import com.linkedin.datahub.graphql.generated.FieldMetricType;
import com.linkedin.datahub.graphql.generated.FieldTransformType;
import com.linkedin.datahub.graphql.generated.FieldValuesFailThresholdType;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetFilterMapper;
import javax.annotation.Nullable;

public class FieldAssertionMapper extends AssertionMapper {

public static com.linkedin.datahub.graphql.generated.FieldAssertionInfo mapFieldAssertionInfo(
@Nullable final QueryContext context, final FieldAssertionInfo gmsFieldAssertionInfo) {
final com.linkedin.datahub.graphql.generated.FieldAssertionInfo result =
new com.linkedin.datahub.graphql.generated.FieldAssertionInfo();
result.setEntityUrn(gmsFieldAssertionInfo.getEntity().toString());
result.setType(FieldAssertionType.valueOf(gmsFieldAssertionInfo.getType().name()));
if (gmsFieldAssertionInfo.hasFilter()) {
result.setFilter(DatasetFilterMapper.map(context, gmsFieldAssertionInfo.getFilter()));
}
if (gmsFieldAssertionInfo.hasFieldValuesAssertion()) {
result.setFieldValuesAssertion(
mapFieldValuesAssertion(gmsFieldAssertionInfo.getFieldValuesAssertion()));
}
if (gmsFieldAssertionInfo.hasFieldMetricAssertion()) {
result.setFieldMetricAssertion(
mapFieldMetricAssertion(gmsFieldAssertionInfo.getFieldMetricAssertion()));
}
return result;
}

private static com.linkedin.datahub.graphql.generated.FieldValuesAssertion
mapFieldValuesAssertion(
final com.linkedin.assertion.FieldValuesAssertion gmsFieldValuesAssertion) {
final com.linkedin.datahub.graphql.generated.FieldValuesAssertion result =
new com.linkedin.datahub.graphql.generated.FieldValuesAssertion();
result.setField(mapSchemaFieldSpec(gmsFieldValuesAssertion.getField()));
result.setOperator(AssertionStdOperator.valueOf(gmsFieldValuesAssertion.getOperator().name()));
result.setFailThreshold(
mapFieldValuesFailThreshold(gmsFieldValuesAssertion.getFailThreshold()));
result.setExcludeNulls(gmsFieldValuesAssertion.isExcludeNulls());

if (gmsFieldValuesAssertion.hasTransform()) {
result.setTransform(mapFieldTransform(gmsFieldValuesAssertion.getTransform()));
}

if (gmsFieldValuesAssertion.hasParameters()) {
result.setParameters(mapParameters(gmsFieldValuesAssertion.getParameters()));
}
return result;
}

private static com.linkedin.datahub.graphql.generated.FieldMetricAssertion
mapFieldMetricAssertion(
final com.linkedin.assertion.FieldMetricAssertion gmsFieldMetricAssertion) {
final com.linkedin.datahub.graphql.generated.FieldMetricAssertion result =
new com.linkedin.datahub.graphql.generated.FieldMetricAssertion();
result.setField(mapSchemaFieldSpec(gmsFieldMetricAssertion.getField()));
result.setMetric(FieldMetricType.valueOf(gmsFieldMetricAssertion.getMetric().name()));
result.setOperator(AssertionStdOperator.valueOf(gmsFieldMetricAssertion.getOperator().name()));

if (gmsFieldMetricAssertion.hasParameters()) {
result.setParameters(mapParameters(gmsFieldMetricAssertion.getParameters()));
}

return result;
}

private static com.linkedin.datahub.graphql.generated.FieldTransform mapFieldTransform(
final com.linkedin.assertion.FieldTransform gmsFieldTransform) {
final com.linkedin.datahub.graphql.generated.FieldTransform result =
new com.linkedin.datahub.graphql.generated.FieldTransform();
result.setType(FieldTransformType.valueOf(gmsFieldTransform.getType().name()));
return result;
}

private static com.linkedin.datahub.graphql.generated.FieldValuesFailThreshold
mapFieldValuesFailThreshold(
final com.linkedin.assertion.FieldValuesFailThreshold gmsFieldValuesFailThreshold) {
final com.linkedin.datahub.graphql.generated.FieldValuesFailThreshold result =
new com.linkedin.datahub.graphql.generated.FieldValuesFailThreshold();
result.setType(
FieldValuesFailThresholdType.valueOf(gmsFieldValuesFailThreshold.getType().name()));
result.setValue(gmsFieldValuesFailThreshold.getValue());
return result;
}

private FieldAssertionMapper() {}
}
Loading
Loading