diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java index b751849859be4..cba214365fdac 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java @@ -1869,7 +1869,9 @@ private void configureCorpGroupResolvers(final RuntimeWiring.Builder builder) { "CorpGroup", typeWiring -> typeWiring - .dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient)) + .dataFetcher( + "relationships", + new EntityRelationshipsResultResolver(graphClient, entityService)) .dataFetcher("privileges", new EntityPrivilegesResolver(entityClient)) .dataFetcher( "aspects", new WeaklyTypedAspectsResolver(entityClient, entityRegistry)) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/load/EntityRelationshipsResultResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/load/EntityRelationshipsResultResolver.java index f775853dd5956..fd72edb2972e3 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/load/EntityRelationshipsResultResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/load/EntityRelationshipsResultResolver.java @@ -5,6 +5,7 @@ import com.linkedin.common.EntityRelationship; import com.linkedin.common.EntityRelationships; +import com.linkedin.common.urn.Urn; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils; import com.linkedin.datahub.graphql.generated.Entity; @@ -12,11 +13,13 @@ import com.linkedin.datahub.graphql.generated.RelationshipsInput; import com.linkedin.datahub.graphql.types.common.mappers.AuditStampMapper; import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper; +import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.graph.GraphClient; import com.linkedin.metadata.query.filter.RelationshipDirection; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -29,8 +32,16 @@ public class EntityRelationshipsResultResolver private final GraphClient _graphClient; + private final EntityService _entityService; + public EntityRelationshipsResultResolver(final GraphClient graphClient) { + this(graphClient, null); + } + + public EntityRelationshipsResultResolver( + final GraphClient graphClient, final EntityService entityService) { _graphClient = graphClient; + _entityService = entityService; } @Override @@ -47,13 +58,16 @@ public CompletableFuture get(DataFetchingEnvironment final Integer count = input.getCount(); // Optional! 
final RelationshipDirection resolvedDirection = RelationshipDirection.valueOf(relationshipDirection.toString()); + final boolean includeSoftDelete = input.getIncludeSoftDelete(); + return GraphQLConcurrencyUtils.supplyAsync( () -> mapEntityRelationships( context, fetchEntityRelationships( urn, relationshipTypes, resolvedDirection, start, count, context.getActorUrn()), - resolvedDirection), + resolvedDirection, + includeSoftDelete), this.getClass().getSimpleName(), "get"); } @@ -72,13 +86,28 @@ private EntityRelationships fetchEntityRelationships( private EntityRelationshipsResult mapEntityRelationships( @Nullable final QueryContext context, final EntityRelationships entityRelationships, - final RelationshipDirection relationshipDirection) { + final RelationshipDirection relationshipDirection, + final boolean includeSoftDelete) { final EntityRelationshipsResult result = new EntityRelationshipsResult(); + final Set existentUrns; + if (context != null && _entityService != null && !includeSoftDelete) { + Set allRelatedUrns = + entityRelationships.getRelationships().stream() + .map(EntityRelationship::getEntity) + .collect(Collectors.toSet()); + existentUrns = _entityService.exists(context.getOperationContext(), allRelatedUrns, false); + } else { + existentUrns = null; + } + List viewable = entityRelationships.getRelationships().stream() .filter( - rel -> context == null || canView(context.getOperationContext(), rel.getEntity())) + rel -> + (existentUrns == null || existentUrns.contains(rel.getEntity())) + && (context == null + || canView(context.getOperationContext(), rel.getEntity()))) .collect(Collectors.toList()); result.setStart(entityRelationships.getStart()); diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index 246ace2fc0f5f..c18550d2d407f 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -1267,6 +1267,11 @@ input RelationshipsInput { The number of results to be returned """ count: Int + + """ + Whether to include soft-deleted, related, entities + """ + includeSoftDelete: Boolean = true } """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/load/EntityRelationshipsResultResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/load/EntityRelationshipsResultResolverTest.java new file mode 100644 index 0000000000000..d2799278c1238 --- /dev/null +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/load/EntityRelationshipsResultResolverTest.java @@ -0,0 +1,124 @@ +package com.linkedin.datahub.graphql.resolvers.load; + +import static com.linkedin.datahub.graphql.TestUtils.getMockAllowContext; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.linkedin.common.EntityRelationship; +import com.linkedin.common.EntityRelationshipArray; +import com.linkedin.common.EntityRelationships; +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.QueryContext; +import com.linkedin.datahub.graphql.generated.*; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.GraphClient; +import graphql.schema.DataFetchingEnvironment; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import 
org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class EntityRelationshipsResultResolverTest { + private final Urn existentUser = Urn.createFromString("urn:li:corpuser:johndoe"); + private final Urn softDeletedUser = Urn.createFromString("urn:li:corpuser:deletedUser"); + + private CorpUser existentEntity; + private CorpUser softDeletedEntity; + + private EntityService _entityService; + private GraphClient _graphClient; + + private EntityRelationshipsResultResolver resolver; + private RelationshipsInput input; + private DataFetchingEnvironment mockEnv; + + public EntityRelationshipsResultResolverTest() throws URISyntaxException {} + + @BeforeMethod + public void setupTest() { + _entityService = mock(EntityService.class); + _graphClient = mock(GraphClient.class); + resolver = new EntityRelationshipsResultResolver(_graphClient, _entityService); + + mockEnv = mock(DataFetchingEnvironment.class); + QueryContext context = getMockAllowContext(); + when(mockEnv.getContext()).thenReturn(context); + + CorpGroup source = new CorpGroup(); + source.setUrn("urn:li:corpGroup:group1"); + when(mockEnv.getSource()).thenReturn(source); + + when(_entityService.exists(any(), eq(Set.of(existentUser, softDeletedUser)), eq(true))) + .thenReturn(Set.of(existentUser, softDeletedUser)); + when(_entityService.exists(any(), eq(Set.of(existentUser, softDeletedUser)), eq(false))) + .thenReturn(Set.of(existentUser)); + + input = new RelationshipsInput(); + input.setStart(0); + input.setCount(10); + input.setDirection(RelationshipDirection.INCOMING); + input.setTypes(List.of("SomeType")); + + EntityRelationships entityRelationships = + new EntityRelationships() + .setStart(0) + .setCount(2) + .setTotal(2) + .setRelationships( + new EntityRelationshipArray( + new EntityRelationship().setEntity(existentUser).setType("SomeType"), + new EntityRelationship().setEntity(softDeletedUser).setType("SomeType"))); + + // always expected INCOMING, and "SomeType" in all tests + when(_graphClient.getRelatedEntities( + eq(source.getUrn()), + eq(input.getTypes()), + same(com.linkedin.metadata.query.filter.RelationshipDirection.INCOMING), + eq(input.getStart()), + eq(input.getCount()), + any())) + .thenReturn(entityRelationships); + + when(mockEnv.getArgument(eq("input"))).thenReturn(input); + + existentEntity = new CorpUser(); + existentEntity.setUrn(existentUser.toString()); + existentEntity.setType(EntityType.CORP_USER); + + softDeletedEntity = new CorpUser(); + softDeletedEntity.setUrn(softDeletedUser.toString()); + softDeletedEntity.setType(EntityType.CORP_USER); + } + + @Test + public void testIncludeSoftDeleted() throws ExecutionException, InterruptedException { + EntityRelationshipsResult expected = new EntityRelationshipsResult(); + expected.setRelationships( + List.of(resultRelationship(existentEntity), resultRelationship(softDeletedEntity))); + expected.setStart(0); + expected.setCount(2); + expected.setTotal(2); + assertEquals(resolver.get(mockEnv).get().toString(), expected.toString()); + } + + @Test + public void testExcludeSoftDeleted() throws ExecutionException, InterruptedException { + input.setIncludeSoftDelete(false); + EntityRelationshipsResult expected = new EntityRelationshipsResult(); + expected.setRelationships(List.of(resultRelationship(existentEntity))); + expected.setStart(0); + expected.setCount(1); + expected.setTotal(1); + assertEquals(resolver.get(mockEnv).get().toString(), expected.toString()); + } + + private com.linkedin.datahub.graphql.generated.EntityRelationship 
resultRelationship( + Entity entity) { + return new com.linkedin.datahub.graphql.generated.EntityRelationship( + "SomeType", RelationshipDirection.INCOMING, entity, null); + } +} diff --git a/datahub-web-react/src/graphql/group.graphql b/datahub-web-react/src/graphql/group.graphql index ee04489540f9c..60da627fd254d 100644 --- a/datahub-web-react/src/graphql/group.graphql +++ b/datahub-web-react/src/graphql/group.graphql @@ -48,6 +48,7 @@ query getGroup($urn: String!, $membersCount: Int!) { direction: INCOMING start: 0 count: $membersCount + includeSoftDelete: false } ) { start @@ -86,6 +87,7 @@ query getAllGroupMembers($urn: String!, $start: Int!, $count: Int!) { direction: INCOMING start: $start count: $count + includeSoftDelete: false } ) { start @@ -121,7 +123,15 @@ query getAllGroupMembers($urn: String!, $start: Int!, $count: Int!) { query getGroupMembers($urn: String!, $start: Int!, $count: Int!) { corpGroup(urn: $urn) { - relationships(input: { types: ["IsMemberOfGroup"], direction: INCOMING, start: $start, count: $count }) { + relationships( + input: { + types: ["IsMemberOfGroup"] + direction: INCOMING + start: $start + count: $count + includeSoftDelete: false + } + ) { start count total @@ -155,7 +165,15 @@ query getGroupMembers($urn: String!, $start: Int!, $count: Int!) { query getNativeGroupMembers($urn: String!, $start: Int!, $count: Int!) { corpGroup(urn: $urn) { - relationships(input: { types: ["IsMemberOfNativeGroup"], direction: INCOMING, start: $start, count: $count }) { + relationships( + input: { + types: ["IsMemberOfNativeGroup"] + direction: INCOMING + start: $start + count: $count + includeSoftDelete: false + } + ) { start count total @@ -209,7 +227,13 @@ query listGroups($input: ListGroupsInput!) { pictureLink } memberCount: relationships( - input: { types: ["IsMemberOfGroup", "IsMemberOfNativeGroup"], direction: INCOMING, start: 0, count: 1 } + input: { + types: ["IsMemberOfGroup", "IsMemberOfNativeGroup"] + direction: INCOMING + start: 0 + count: 1 + includeSoftDelete: false + } ) { total } diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index a7aece48be3bc..6bc7c11907acc 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -281,6 +281,7 @@ module.exports = { }, { "Managed DataHub Release History": [ + "docs/managed-datahub/release-notes/v_0_3_4", "docs/managed-datahub/release-notes/v_0_3_3", "docs/managed-datahub/release-notes/v_0_3_2", "docs/managed-datahub/release-notes/v_0_3_1", diff --git a/docs/cli.md b/docs/cli.md index fdcfa6616c9bf..1f1e6dfa26be7 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -102,6 +102,7 @@ Command Options: --test-source-connection When set, ingestion will only test the source connection details from the recipe --no-progress If enabled, mute intermediate progress ingestion reports ``` + #### ingest --dry-run The `--dry-run` option of the `ingest` command performs all of the ingestion steps, except writing to the sink. This is useful to validate that the @@ -133,23 +134,8 @@ By default `--preview` creates 10 workunits. But if you wish to try producing mo datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n --preview --preview-workunits=20 ``` -#### ingest deploy - -The `ingest deploy` command instructs the cli to upload an ingestion recipe to DataHub to be run by DataHub's [UI Ingestion](./ui-ingestion.md). -This command can also be used to schedule the ingestion while uploading or even to update existing sources. 
It will upload to the remote instance the -CLI is connected to, not the sink of the recipe. Use `datahub init` to set the remote if not already set. - -To schedule a recipe called "test", to run at 5am everyday, London time with the recipe configured in a local `recipe.yaml` file: -````shell -datahub ingest deploy --name "test" --schedule "5 * * * *" --time-zone "Europe/London" -c recipe.yaml -```` - -To update an existing recipe please use the `--urn` parameter to specify the id of the recipe to update. - -**Note:** Updating a recipe will result in a replacement of the existing options with what was specified in the cli command. -I.e: Not specifying a schedule in the cli update command will remove the schedule from the recipe to be updated. - #### ingest --no-default-report + By default, the cli sends an ingestion report to DataHub, which allows you to see the result of all cli-based ingestion in the UI. This can be turned off with the `--no-default-report` flag. ```shell @@ -180,6 +166,52 @@ failure_log: filename: ./path/to/failure.json ``` +### ingest deploy + +The `ingest deploy` command instructs the cli to upload an ingestion recipe to DataHub to be run by DataHub's [UI Ingestion](./ui-ingestion.md). +This command can also be used to schedule the ingestion while uploading or even to update existing sources. It will upload to the remote instance the +CLI is connected to, not the sink of the recipe. Use `datahub init` to set the remote if not already set. + +This command will automatically create a new recipe if it doesn't exist, or update it if it does. +Note that this is a complete update, and will remove any options that were previously set. +I.e: Not specifying a schedule in the cli update command will remove the schedule from the recipe to be updated. + +**Basic example** + +To schedule a recipe called "Snowflake Integration", to run at 5am every day, London time with the recipe configured in a local `recipe.yaml` file: + +```shell +datahub ingest deploy --name "Snowflake Integration" --schedule "5 * * * *" --time-zone "Europe/London" -c recipe.yaml +``` + +By default, the ingestion recipe's identifier is generated by hashing the name. +You can override the urn generation by passing the `--urn` flag to the CLI. + +**Using `deployment` to avoid CLI args** + +As an alternative to configuring settings from the CLI, all of these settings can also be set in the `deployment` field of the recipe. + +```yml +# deployment_recipe.yml +deployment: + name: "Snowflake Integration" + schedule: "5 * * * *" + time_zone: "Europe/London" + +source: ... +``` + +```shell +datahub ingest deploy -c deployment_recipe.yml +``` + +This is particularly useful when you want all recipes to be stored in version control. + +```shell +# Deploy every yml recipe in a directory +ls recipe_directory/*.yml | xargs -n 1 -I {} datahub ingest deploy -c {} +``` + ### init The init command is used to tell `datahub` about where your DataHub instance is located. The CLI will point to localhost DataHub by default. @@ -242,8 +274,6 @@ The [metadata deletion guide](./how/delete-metadata.md) covers the various optio ### exists -**🤝 Version compatibility** : `acryl-datahub>=0.10.2.4` - The exists command can be used to check if an entity exists in DataHub. ```shell @@ -253,7 +283,6 @@ true false ``` - ### get The `get` command allows you to easily retrieve metadata from DataHub, by using the REST API. This works for both versioned aspects and timeseries aspects. For timeseries aspects, it fetches the latest value. 
@@ -314,6 +343,7 @@ Update succeeded with status 200 ``` #### put platform + **🤝 Version Compatibility:** `acryl-datahub>0.8.44.4` The **put platform** command instructs `datahub` to create or update metadata about a data platform. This is very useful if you are using a custom data platform, to set up its logo and display name for a native UI experience. @@ -346,6 +376,7 @@ datahub timeline --urn "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccou The `dataset` command allows you to interact with the dataset entity. The `get` operation can be used to read in a dataset into a yaml file. + ```shell datahub dataset get --urn "$URN" --to-file "$FILE_NAME" ``` @@ -358,7 +389,6 @@ datahub dataset upsert -f dataset.yaml An example of `dataset.yaml` would look like as in [dataset.yaml](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/cli_usage/dataset/dataset.yaml). - ### user (User Entity) The `user` command allows you to interact with the User entity. @@ -411,7 +441,6 @@ members: display_name: "Joe's Hub" ``` - ### dataproduct (Data Product Entity) **🤝 Version Compatibility:** `acryl-datahub>=0.10.2.4` @@ -566,14 +595,12 @@ Use this to delete a Data Product from DataHub. Default to `--soft` which preser # > datahub dataproduct delete --urn "urn:li:dataProduct:pet_of_the_week" --hard ``` - ## Miscellaneous Admin Commands ### lite (experimental) The lite group of commands allow you to run an embedded, lightweight DataHub instance for command line exploration of your metadata. This is intended more for developer tool oriented usage rather than as a production server instance for DataHub. See [DataHub Lite](./datahub_lite.md) for more information about how you can ingest metadata into DataHub Lite and explore your metadata easily. - ### telemetry To help us understand how people are using DataHub, we collect anonymous usage statistics on actions such as command invocations via Mixpanel. @@ -640,7 +667,6 @@ External Entities Affected: None Old Entities Migrated = {'urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)', 'urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)', 'urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)', 'urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)'} ``` - ## Alternate Installation Options ### Using docker @@ -673,7 +699,7 @@ We use a plugin architecture so that you can install only the dependencies you a Please see our [Integrations page](https://datahubproject.io/integrations) if you want to filter on the features offered by each source. 
| Plugin Name | Install Command | Provides | -|------------------------------------------------------------------------------------------------| ---------------------------------------------------------- | --------------------------------------- | +| ---------------------------------------------------------------------------------------------- | ---------------------------------------------------------- | --------------------------------------- | | [metadata-file](./generated/ingestion/sources/metadata-file.md) | _included by default_ | File source and sink | | [athena](./generated/ingestion/sources/athena.md) | `pip install 'acryl-datahub[athena]'` | AWS Athena source | | [bigquery](./generated/ingestion/sources/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source | @@ -715,7 +741,7 @@ Please see our [Integrations page](https://datahubproject.io/integrations) if yo ### Sinks | Plugin Name | Install Command | Provides | -|-------------------------------------------------------------------| -------------------------------------------- | -------------------------- | +| ----------------------------------------------------------------- | -------------------------------------------- | -------------------------- | | [metadata-file](../metadata-ingestion/sink_docs/metadata-file.md) | _included by default_ | File source and sink | | [console](../metadata-ingestion/sink_docs/console.md) | _included by default_ | Console sink | | [datahub-rest](../metadata-ingestion/sink_docs/datahub.md) | `pip install 'acryl-datahub[datahub-rest]'` | DataHub sink over REST API | diff --git a/docs/managed-datahub/release-notes/v_0_3_4.md b/docs/managed-datahub/release-notes/v_0_3_4.md new file mode 100644 index 0000000000000..381fcea816a0b --- /dev/null +++ b/docs/managed-datahub/release-notes/v_0_3_4.md @@ -0,0 +1,39 @@ +# v0.3.4 +--- + +Release Availability Date +--- +29-Jul-2024 + +Recommended CLI/SDK +--- +- `v0.13.3.4` with release notes at https://github.com/acryldata/datahub/releases/tag/v0.13.3.4 + +If you are using an older CLI/SDK version, then please upgrade it. This applies for all CLI/SDK usages, if you are using it through your terminal, GitHub Actions, Airflow, in Python SDK somewhere, Java SDK, etc. This is a strong recommendation to upgrade, as we keep on pushing fixes in the CLI, and it helps us support you better. + +## Release Changelog +--- + +- Product changes + - [notifications] You are now able to send test notifications to Slack user or group from settings. + - [cross-instance share] Share failures should now correctly display handle URLs. + - [siblings] Fixed siblings split feature when used for autocomplete; DataHub now disallows creation of assertions on merged sibling page; Fixed a bug in editing an assertion on a merged lineage node. + - [lineage - v2] Minor fixes to lineage counts and column-level-lineage resolution by respecting soft deleted assets in the lineage graph. + + +- Ingestion changes + - Added support for SQL parsing in Looker source to more correctly represent column-level-lineage. Please report any edge cases issues to the Acryl team as we will be closely monitoring this integration. + - Added cli version in system metadata at the aspect level for tracking purposes. + - Added async REST sink capabilities in CLI. + - Added documentation on how to cleanup obsolete airflow pipelines and tasks from Datahub + - Soft deleted entities using the airflow plugin should now come back once they re-appear. 
+ - Added 2 new connectors: Grafana & Azure Blob Storage (ABS) + - Glue to s3 lineage should now be generated automatically in the glue source. + - Added a snowflake queries source which does lineage, usage, queries, and operations all in one go. + - Added dataset profiling to PowerBI. + - Added a file source integration (S3 / Local Files), this is to add files as datasets. Not to be confused with the metadata file source that loads MCPs into DataHub. + +- Platform changes + - Added initial support for custom MCP mutator hooks. + +- Since `v0.3.3`, these changes from OSS DataHub have been pulled in: [OSS DataHub Changes](https://github.com/datahub-project/datahub/compare/92e9a5823bc14e81f0f21c55a68c493c3bbe87b9...8d874ad1e4bef9d7afbe20fb3cb457566a15c61c). diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 75760f3dbd95d..6c8cc6beafbbf 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -16,7 +16,9 @@ import datahub as datahub_package from datahub.cli import cli_utils from datahub.cli.config_utils import CONDENSED_DATAHUB_CONFIG_PATH +from datahub.configuration.common import ConfigModel from datahub.configuration.config_loader import load_config_file +from datahub.emitter.mce_builder import datahub_guid from datahub.ingestion.graph.client import get_default_graph from datahub.ingestion.run.connection import ConnectionManager from datahub.ingestion.run.pipeline import Pipeline @@ -204,6 +206,23 @@ async def run_ingestion_and_check_upgrade() -> int: # don't raise SystemExit if there's no error +def _make_ingestion_urn(name: str) -> str: + guid = datahub_guid( + { + "name": name, + } + ) + return f"urn:li:dataHubIngestionSource:deploy-{guid}" + + +class DeployOptions(ConfigModel): + name: str + schedule: Optional[str] = None + time_zone: str = "UTC" + cli_version: Optional[str] = None + executor_id: str = "default" + + @ingest.command() @upgrade.check_upgrade @telemetry.with_telemetry() @@ -212,7 +231,6 @@ async def run_ingestion_and_check_upgrade() -> int: "--name", type=str, help="Recipe Name", - required=True, ) @click.option( "-c", @@ -224,7 +242,7 @@ async def run_ingestion_and_check_upgrade() -> int: @click.option( "--urn", type=str, - help="Urn of recipe to update. Creates recipe if provided urn does not exist", + help="Urn of recipe to update. 
If not specified here or in the recipe's pipeline_name, this will create a new ingestion source.", required=False, ) @click.option( @@ -256,7 +274,7 @@ async def run_ingestion_and_check_upgrade() -> int: default="UTC", ) def deploy( - name: str, + name: Optional[str], config: str, urn: Optional[str], executor_id: str, @@ -280,74 +298,84 @@ def deploy( resolve_env_vars=False, ) + deploy_options_raw = pipeline_config.pop("deployment", None) + if deploy_options_raw is not None: + deploy_options = DeployOptions.parse_obj(deploy_options_raw) + + if name: + logger.info(f"Overriding deployment name {deploy_options.name} with {name}") + deploy_options.name = name + else: + if not name: + raise click.UsageError( + "Either --name must be set or deployment_name specified in the config" + ) + deploy_options = DeployOptions(name=name) + + # Use remaining CLI args to override deploy_options + if schedule: + deploy_options.schedule = schedule + if time_zone: + deploy_options.time_zone = time_zone + if cli_version: + deploy_options.cli_version = cli_version + if executor_id: + deploy_options.executor_id = executor_id + + logger.info(f"Using {repr(deploy_options)}") + + if not urn: + # When urn/name is not specified, we will generate a unique urn based on the deployment name. + urn = _make_ingestion_urn(deploy_options.name) + logger.info(f"Using recipe urn: {urn}") + + # Invariant - at this point, both urn and deploy_options are set. + variables: dict = { "urn": urn, - "name": name, + "name": deploy_options.name, "type": pipeline_config["source"]["type"], "recipe": json.dumps(pipeline_config), - "executorId": executor_id, - "version": cli_version, + "executorId": deploy_options.executor_id, + "version": deploy_options.cli_version, } - if schedule is not None: - variables["schedule"] = {"interval": schedule, "timezone": time_zone} - - if urn: - - graphql_query: str = textwrap.dedent( - """ - mutation updateIngestionSource( - $urn: String!, - $name: String!, - $type: String!, - $schedule: UpdateIngestionSourceScheduleInput, - $recipe: String!, - $executorId: String! - $version: String) { - - updateIngestionSource(urn: $urn, input: { - name: $name, - type: $type, - schedule: $schedule, - config: { - recipe: $recipe, - executorId: $executorId, - version: $version, - } - }) - } - """ - ) - else: - logger.info("No URN specified recipe urn, will create a new recipe.") - graphql_query = textwrap.dedent( - """ - mutation createIngestionSource( - $name: String!, - $type: String!, - $schedule: UpdateIngestionSourceScheduleInput, - $recipe: String!, - $executorId: String!, - $version: String) { - - createIngestionSource(input: { - name: $name, - type: $type, - schedule: $schedule, - config: { - recipe: $recipe, - executorId: $executorId, - version: $version, - } - }) - } - """ - ) + if deploy_options.schedule is not None: + variables["schedule"] = { + "interval": deploy_options.schedule, + "timezone": deploy_options.time_zone, + } + + # The updateIngestionSource endpoint can actually do upserts as well. + graphql_query: str = textwrap.dedent( + """ + mutation updateIngestionSource( + $urn: String!, + $name: String!, + $type: String!, + $schedule: UpdateIngestionSourceScheduleInput, + $recipe: String!, + $executorId: String! 
+ $version: String) { + + updateIngestionSource(urn: $urn, input: { + name: $name, + type: $type, + schedule: $schedule, + config: { + recipe: $recipe, + executorId: $executorId, + version: $version, + } + }) + } + """ + ) response = datahub_graph.execute_graphql(graphql_query, variables=variables) click.echo( - f"✅ Successfully wrote data ingestion source metadata for recipe {name}:" + f"✅ Successfully wrote data ingestion source metadata for recipe {deploy_options.name}:" ) click.echo(response) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py index f8fcea7c57545..bc00517567bbd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py @@ -80,8 +80,8 @@ def get_table_and_shard(table_name: str) -> Tuple[Optional[str], Optional[str]]: @classmethod def from_string_name(cls, table: str) -> "BigqueryTableIdentifier": - parts = table.split(".") - # If the table name contains dollar sign, it is a referrence to a partitioned table and we have to strip it + parts = table.split(".", maxsplit=2) + # If the table name contains dollar sign, it is a reference to a partitioned table and we have to strip it table = parts[2].split("$", 1)[0] return cls(parts[0], parts[1], table) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py index 4ad9635069b15..bcf6d380a60fd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py @@ -286,12 +286,17 @@ class LookerDashboardSourceConfig( ) extract_independent_looks: bool = Field( False, - description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion should also be enabled.", + description="Extract looks which are not part of any Dashboard. 
To enable this flag the stateful_ingestion " + "should also be enabled.", ) emit_used_explores_only: bool = Field( True, description="When enabled, only explores that are used by a Dashboard/Look will be ingested.", ) + include_platform_instance_in_urns: bool = Field( + False, + description="When enabled, platform instance will be added in dashboard and chart urn.", + ) @validator("external_base_url", pre=True, always=True) def external_url_defaults_to_api_config_base_url( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index cd050fec35c2c..d61458d8e924a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -80,6 +80,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, ChangeAuditStamps, + DataPlatformInstance, Status, ) from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( @@ -95,11 +96,13 @@ ChartTypeClass, ContainerClass, DashboardInfoClass, + DataPlatformInfoClass, InputFieldClass, InputFieldsClass, OwnerClass, OwnershipClass, OwnershipTypeClass, + PlatformTypeClass, SubTypesClass, ) from datahub.utilities.backpressure_aware_executor import BackpressureAwareExecutor @@ -594,19 +597,21 @@ def _get_chart_type( } type_str = dashboard_element.type if not type_str: - self.reporter.report_warning( + self.reporter.info( title="Unrecognized Chart Type", - message=f"Chart type {type_str} is not recognized. Setting to None", - context=f"Dashboard Id: {dashboard_element.id}", + message="Chart is missing a chart type.", + context=f"Chart Id: {dashboard_element.id}", + log=False, ) return None try: chart_type = type_mapping[type_str] except KeyError: - self.reporter.report_warning( + self.reporter.info( title="Unrecognized Chart Type", message=f"Chart type {type_str} is not recognized. Setting to None", - context=f"Dashboard Id: {dashboard_element.id}", + context=f"Chart Id: {dashboard_element.id}", + log=False, ) chart_type = None @@ -624,6 +629,38 @@ def _get_folder_browse_path_v2_entries( if include_current_folder: yield BrowsePathEntryClass(id=urn, urn=urn) + def _create_platform_instance_aspect( + self, + ) -> DataPlatformInstance: + + assert ( + self.source_config.platform_name + ), "Platform name is not set in the configuration." + assert ( + self.source_config.platform_instance + ), "Platform instance is not set in the configuration." 
+ + return DataPlatformInstance( + platform=builder.make_data_platform_urn(self.source_config.platform_name), + instance=builder.make_dataplatform_instance_urn( + platform=self.source_config.platform_name, + instance=self.source_config.platform_instance, + ), + ) + + def _make_chart_urn(self, element_id: str) -> str: + + platform_instance: Optional[str] = None + + if self.source_config.include_platform_instance_in_urns: + platform_instance = self.source_config.platform_instance + + return builder.make_chart_urn( + name=element_id, + platform=self.source_config.platform_name, + platform_instance=platform_instance, + ) + def _make_chart_metadata_events( self, dashboard_element: LookerDashboardElement, @@ -631,8 +668,8 @@ def _make_chart_metadata_events( LookerDashboard ], # dashboard will be None if this is a standalone look ) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]: - chart_urn = builder.make_chart_urn( - self.source_config.platform_name, dashboard_element.get_urn_element_id() + chart_urn = self._make_chart_urn( + element_id=dashboard_element.get_urn_element_id() ) chart_snapshot = ChartSnapshot( urn=chart_urn, @@ -713,6 +750,14 @@ def _make_chart_metadata_events( ), ] + if self.source_config.include_platform_instance_in_urns: + proposals.append( + MetadataChangeProposalWrapper( + entityUrn=chart_urn, + aspect=self._create_platform_instance_aspect(), + ), + ) + # If extracting embeds is enabled, produce an MCP for embed URL. if ( self.source_config.extract_embed_urls @@ -818,11 +863,26 @@ def _make_dashboard_metadata_events( ) ) + if self.source_config.include_platform_instance_in_urns: + proposals.append( + MetadataChangeProposalWrapper( + entityUrn=dashboard_urn, + aspect=self._create_platform_instance_aspect(), + ) + ) + return proposals def make_dashboard_urn(self, looker_dashboard: LookerDashboard) -> str: + platform_instance: Optional[str] = None + + if self.source_config.include_platform_instance_in_urns: + platform_instance = self.source_config.platform_instance + return builder.make_dashboard_urn( - self.source_config.platform_name, looker_dashboard.get_urn_dashboard_id() + name=looker_dashboard.get_urn_dashboard_id(), + platform=self.source_config.platform_name, + platform_instance=platform_instance, ) def _make_explore_metadata_events( @@ -1154,8 +1214,8 @@ def _input_fields_from_dashboard_element( # enrich the input_fields with the fully hydrated ViewField from the now fetched explores for input_field in input_fields: - entity_urn = builder.make_chart_urn( - self.source_config.platform_name, dashboard_element.get_urn_element_id() + entity_urn = self._make_chart_urn( + element_id=dashboard_element.get_urn_element_id() ) view_field_for_reference = input_field.view_field @@ -1220,8 +1280,8 @@ def _make_metrics_dimensions_dashboard_mcp( def _make_metrics_dimensions_chart_mcp( self, dashboard_element: LookerDashboardElement ) -> MetadataChangeProposalWrapper: - chart_urn = builder.make_chart_urn( - self.source_config.platform_name, dashboard_element.get_urn_element_id() + chart_urn = self._make_chart_urn( + element_id=dashboard_element.get_urn_element_id() ) input_fields_aspect = InputFieldsClass( fields=self._input_fields_from_dashboard_element(dashboard_element) @@ -1513,6 +1573,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: looker_dashboards_for_usage: List[looker_usage.LookerDashboardForUsage] = [] + # Emit platform instance entity + if self.source_config.platform_instance: + platform_instance_urn = 
builder.make_dataplatform_instance_urn( + platform=self.source_config.platform_name, + instance=self.source_config.platform_instance, + ) + + yield MetadataWorkUnit( + id=f"{platform_instance_urn}-aspect-dataplatformInfo", + mcp=MetadataChangeProposalWrapper( + entityUrn=platform_instance_urn, + aspect=DataPlatformInfoClass( + name=self.source_config.platform_instance, + type=PlatformTypeClass.OTHERS, + datasetNameDelimiter=".", + ), + ), + ) + with self.reporter.report_stage("dashboard_chart_metadata"): for job in BackpressureAwareExecutor.map( self.process_dashboard, diff --git a/metadata-ingestion/src/datahub/sql_parsing/_models.py b/metadata-ingestion/src/datahub/sql_parsing/_models.py index 23594534b5fbb..d92d178b81cf4 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/_models.py +++ b/metadata-ingestion/src/datahub/sql_parsing/_models.py @@ -77,8 +77,20 @@ def from_sqlglot_table( default_db: Optional[str] = None, default_schema: Optional[str] = None, ) -> "_TableName": + if isinstance(table.this, sqlglot.exp.Dot): + # For tables that are more than 3 parts, the extra parts will be in a Dot. + # For now, we just merge them into the table name. + parts = [] + exp = table.this + while isinstance(exp, sqlglot.exp.Dot): + parts.append(exp.this.name) + exp = exp.expression + parts.append(exp.name) + table_name = ".".join(parts) + else: + table_name = table.this.name return cls( database=table.catalog or default_db, db_schema=table.db or default_schema, - table=table.this.name, + table=table_name, ) diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json index 639e69a6f8205..76c8f04e8447a 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json @@ -1,13 +1,32 @@ [ +{ + "entityType": "dataPlatformInstance", + "entityUrn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInfo", + "aspect": { + "json": { + "name": "ap-south-1", + "type": "OTHERS", + "datasetNameDelimiter": "." 
+ } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", - "entityUrn": "urn:li:container:691314a7b63628684d62a14861d057a8", + "entityUrn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7", "changeType": "UPSERT", "aspectName": "containerProperties", "aspect": { "json": { "customProperties": { "platform": "looker", + "instance": "ap-south-1", "env": "PROD", "folder_id": "shared-folder-id" }, @@ -22,7 +41,7 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:691314a7b63628684d62a14861d057a8", + "entityUrn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -38,12 +57,13 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:691314a7b63628684d62a14861d057a8", + "entityUrn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { "json": { - "platform": "urn:li:dataPlatform:looker" + "platform": "urn:li:dataPlatform:looker", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" } }, "systemMetadata": { @@ -54,7 +74,7 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:691314a7b63628684d62a14861d057a8", + "entityUrn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7", "changeType": "UPSERT", "aspectName": "subTypes", "aspect": { @@ -72,12 +92,16 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:691314a7b63628684d62a14861d057a8", + "entityUrn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { "json": { "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" + }, { "id": "Folders" } @@ -93,7 +117,7 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { - "urn": "urn:li:chart:(looker,dashboard_elements.2)", + "urn": "urn:li:chart:(looker,ap-south-1.dashboard_elements.2)", "aspects": [ { "com.linkedin.pegasus2avro.common.Status": { @@ -120,7 +144,7 @@ "chartUrl": "https://looker.company.com/x/", "inputs": [ { - "string": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)" + "string": "urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD)" } ] } @@ -143,7 +167,7 @@ }, { "entityType": "chart", - "entityUrn": "urn:li:chart:(looker,dashboard_elements.2)", + "entityUrn": "urn:li:chart:(looker,ap-south-1.dashboard_elements.2)", "changeType": "UPSERT", "aspectName": "subTypes", "aspect": { @@ -161,22 +185,43 @@ }, { "entityType": "chart", - "entityUrn": "urn:li:chart:(looker,dashboard_elements.2)", + "entityUrn": "urn:li:chart:(looker,ap-south-1.dashboard_elements.2)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(looker,ap-south-1.dashboard_elements.2)", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { "json": { "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)", + "urn": 
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" + }, { "id": "Folders" }, { - "id": "urn:li:container:691314a7b63628684d62a14861d057a8", - "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" + "id": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7", + "urn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7" }, { - "id": "urn:li:dashboard:(looker,dashboards.1)", - "urn": "urn:li:dashboard:(looker,dashboards.1)" + "id": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)", + "urn": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)" } ] } @@ -190,7 +235,7 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { - "urn": "urn:li:dashboard:(looker,dashboards.1)", + "urn": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)", "aspects": [ { "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { @@ -198,7 +243,7 @@ "title": "foo", "description": "lorem ipsum", "charts": [ - "urn:li:chart:(looker,dashboard_elements.2)" + "urn:li:chart:(looker,ap-south-1.dashboard_elements.2)" ], "datasets": [], "lastModified": { @@ -237,12 +282,12 @@ }, { "entityType": "dashboard", - "entityUrn": "urn:li:dashboard:(looker,dashboards.1)", + "entityUrn": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)", "changeType": "UPSERT", "aspectName": "container", "aspect": { "json": { - "container": "urn:li:container:691314a7b63628684d62a14861d057a8" + "container": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7" } }, "systemMetadata": { @@ -253,7 +298,7 @@ }, { "entityType": "dashboard", - "entityUrn": "urn:li:dashboard:(looker,dashboards.1)", + "entityUrn": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)", "changeType": "UPSERT", "aspectName": "embed", "aspect": { @@ -269,18 +314,39 @@ }, { "entityType": "dashboard", - "entityUrn": "urn:li:dashboard:(looker,dashboards.1)", + "entityUrn": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { "json": { "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" + }, { "id": "Folders" }, { - "id": "urn:li:container:691314a7b63628684d62a14861d057a8", - "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" + "id": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7", + "urn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7" } ] } @@ -293,14 +359,14 @@ }, { "entityType": "chart", - "entityUrn": "urn:li:chart:(looker,dashboard_elements.2)", + "entityUrn": "urn:li:chart:(looker,ap-south-1.dashboard_elements.2)", "changeType": "UPSERT", "aspectName": "inputFields", "aspect": { "json": { "fields": [ { - "schemaFieldUrn": "urn:li:schemaField:(urn:li:chart:(looker,dashboard_elements.2),calc)", + "schemaFieldUrn": "urn:li:schemaField:(urn:li:chart:(looker,ap-south-1.dashboard_elements.2),calc)", "schemaField": { "fieldPath": "calc", "nullable": false, @@ -317,7 +383,7 @@ } }, { - "schemaFieldUrn": 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD),dim1)", + "schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD),dim1)", "schemaField": { "fieldPath": "dim1", "nullable": false, @@ -351,14 +417,14 @@ }, { "entityType": "dashboard", - "entityUrn": "urn:li:dashboard:(looker,dashboards.1)", + "entityUrn": "urn:li:dashboard:(looker,ap-south-1.dashboards.1)", "changeType": "UPSERT", "aspectName": "inputFields", "aspect": { "json": { "fields": [ { - "schemaFieldUrn": "urn:li:schemaField:(urn:li:chart:(looker,dashboard_elements.2),calc)", + "schemaFieldUrn": "urn:li:schemaField:(urn:li:chart:(looker,ap-south-1.dashboard_elements.2),calc)", "schemaField": { "fieldPath": "calc", "nullable": false, @@ -375,7 +441,7 @@ } }, { - "schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD),dim1)", + "schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD),dim1)", "schemaField": { "fieldPath": "dim1", "nullable": false, @@ -409,13 +475,14 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", + "entityUrn": "urn:li:container:63e49aaeb15b289d177acbb32625d577", "changeType": "UPSERT", "aspectName": "containerProperties", "aspect": { "json": { "customProperties": { "platform": "looker", + "instance": "ap-south-1", "env": "PROD", "model_name": "data" }, @@ -430,7 +497,7 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", + "entityUrn": "urn:li:container:63e49aaeb15b289d177acbb32625d577", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -446,12 +513,13 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", + "entityUrn": "urn:li:container:63e49aaeb15b289d177acbb32625d577", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { "json": { - "platform": "urn:li:dataPlatform:looker" + "platform": "urn:li:dataPlatform:looker", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" } }, "systemMetadata": { @@ -462,7 +530,7 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", + "entityUrn": "urn:li:container:63e49aaeb15b289d177acbb32625d577", "changeType": "UPSERT", "aspectName": "subTypes", "aspect": { @@ -480,12 +548,16 @@ }, { "entityType": "container", - "entityUrn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", + "entityUrn": "urn:li:container:63e49aaeb15b289d177acbb32625d577", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { "json": { "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" + }, { "id": "Explore" } @@ -501,7 +573,7 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.common.BrowsePaths": { @@ -535,7 +607,7 @@ "time": 1586847600000, "actor": "urn:li:corpuser:datahub" }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.underlying_view,PROD)", + "dataset": 
"urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.lkml_samples.view.underlying_view,PROD)", "type": "VIEW" } ] @@ -597,7 +669,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "subTypes", "aspect": { @@ -615,7 +687,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "embed", "aspect": { @@ -631,12 +703,12 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "container", "aspect": { "json": { - "container": "urn:li:container:59a5aa45397364e6882e793f1bc77b42" + "container": "urn:li:container:63e49aaeb15b289d177acbb32625d577" } }, "systemMetadata": { @@ -647,18 +719,22 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,ap-south-1.data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "browsePathsV2", "aspect": { "json": { "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)" + }, { "id": "Explore" }, { - "id": "urn:li:container:59a5aa45397364e6882e793f1bc77b42", - "urn": "urn:li:container:59a5aa45397364e6882e793f1bc77b42" + "id": "urn:li:container:63e49aaeb15b289d177acbb32625d577", + "urn": "urn:li:container:63e49aaeb15b289d177acbb32625d577" } ] } @@ -729,6 +805,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataPlatformInstance", + "entityUrn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "looker-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "tag", "entityUrn": "urn:li:tag:Dimension", diff --git a/metadata-ingestion/tests/integration/looker/test_looker.py b/metadata-ingestion/tests/integration/looker/test_looker.py index e1cedee33dcb6..fdc9c45fcf539 100644 --- a/metadata-ingestion/tests/integration/looker/test_looker.py +++ b/metadata-ingestion/tests/integration/looker/test_looker.py @@ -94,6 +94,8 @@ def test_looker_ingest(pytestconfig, tmp_path, mock_time): "client_id": "foo", "client_secret": "bar", "extract_usage_history": False, + "platform_instance": "ap-south-1", + "include_platform_instance_in_urns": True, }, }, "sink": { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_information_schema_query.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_information_schema_query.json new file mode 100644 index 0000000000000..4b9bbd06ecba6 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_information_schema_query.json @@ -0,0 +1,183 @@ +{ + "query_type": "SELECT", + "query_type_props": {}, + "query_fingerprint": "772187d1c6ce8dbed2dd1ba79975b108d4e733015ffb7bcbf9b7146e64cf9914", + "in_tables": [ + 
"urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS,PROD)" + ], + "out_tables": [], + "column_lineage": [ + { + "downstream": { + "table": null, + "column": "table_catalog", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "table_catalog" + } + ] + }, + { + "downstream": { + "table": null, + "column": "table_schema", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "table_schema" + } + ] + }, + { + "downstream": { + "table": null, + "column": "table_name", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "table_name" + } + ] + }, + { + "downstream": { + "table": null, + "column": "column_name", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "column_name" + } + ] + }, + { + "downstream": { + "table": null, + "column": "ordinal_position", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "ordinal_position" + } + ] + }, + { + "downstream": { + "table": null, + "column": "field_path", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS,PROD)", + "column": "field_path" + } + ] + }, + { + "downstream": { + "table": null, + "column": "is_nullable", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "is_nullable" + } + ] + }, + { + "downstream": { + "table": null, + "column": "data_type", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "data_type" + }, + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS,PROD)", + "column": "field_path" + } + ] + }, + { + "downstream": { + "table": null, + "column": "comment", + "column_type": null, + "native_column_type": null + }, + "upstreams": [] + }, + { + "downstream": { + "table": null, + "column": "is_hidden", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "is_hidden" + } + ] + }, + { + "downstream": { + "table": null, + "column": "is_partitioning_column", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": 
"urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "is_partitioning_column" + } + ] + }, + { + "downstream": { + "table": null, + "column": "clustering_ordinal_position", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)", + "column": "clustering_ordinal_position" + } + ] + } + ], + "debug_info": { + "confidence": 0.2, + "generalized_statement": "SELECT c.table_catalog AS table_catalog, c.table_schema AS table_schema, c.table_name AS table_name, c.column_name AS column_name, c.ordinal_position AS ordinal_position, cfp.field_path AS field_path, c.is_nullable AS is_nullable, CASE WHEN CONTAINS_SUBSTR(field_path, ?) THEN NULL ELSE c.data_type END AS data_type, description AS comment, c.is_hidden AS is_hidden, c.is_partitioning_column AS is_partitioning_column, c.clustering_ordinal_position AS clustering_ordinal_position FROM `acryl-staging-2`.`smoke_test_db_4`.INFORMATION_SCHEMA.COLUMNS AS c JOIN `acryl-staging-2`.`smoke_test_db_4`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS AS cfp ON cfp.table_name = c.table_name AND cfp.column_name = c.column_name ORDER BY table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index ed767f48cf685..e5b669329f16c 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -1171,3 +1171,34 @@ def test_sqlite_insert_into_values() -> None: dialect="sqlite", expected_file=RESOURCE_DIR / "test_sqlite_insert_into_values.json", ) + + +def test_bigquery_information_schema_query() -> None: + # Special case - the BigQuery INFORMATION_SCHEMA views are prefixed with a + # project + possibly a dataset/region, so sometimes are 4 parts instead of 3. 
+    # https://cloud.google.com/bigquery/docs/information-schema-intro#syntax
+
+    assert_sql_result(
+        """\
+select
+    c.table_catalog as table_catalog,
+    c.table_schema as table_schema,
+    c.table_name as table_name,
+    c.column_name as column_name,
+    c.ordinal_position as ordinal_position,
+    cfp.field_path as field_path,
+    c.is_nullable as is_nullable,
+    CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
+    description as comment,
+    c.is_hidden as is_hidden,
+    c.is_partitioning_column as is_partitioning_column,
+    c.clustering_ordinal_position as clustering_ordinal_position,
+from
+    `acryl-staging-2`.`smoke_test_db_4`.INFORMATION_SCHEMA.COLUMNS c
+    join `acryl-staging-2`.`smoke_test_db_4`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name
+    and cfp.column_name = c.column_name
+ORDER BY
+    table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC""",
+        dialect="bigquery",
+        expected_file=RESOURCE_DIR / "test_bigquery_information_schema_query.json",
+    )
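The detail the new test's comment calls out is that `project.dataset.INFORMATION_SCHEMA.COLUMNS` is a four-part table reference rather than the usual three. As a rough illustration only (not part of this change; it assumes a reasonably recent `sqlglot` release is installed, and the exact catalog/db/name split can vary by version), this is how one might poke at such a reference with sqlglot directly:

```python
# Sketch: inspect how sqlglot's BigQuery dialect parses a 4-part
# INFORMATION_SCHEMA reference like the one exercised in the test above.
import sqlglot
from sqlglot import exp

sql = """
select c.column_name
from `acryl-staging-2`.`smoke_test_db_4`.INFORMATION_SCHEMA.COLUMNS as c
"""

statement = sqlglot.parse_one(sql, dialect="bigquery")
for table in statement.find_all(exp.Table):
    # Prints the fully qualified table, project/dataset prefix included.
    print(table.sql(dialect="bigquery"))
```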
diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md
index 81108aa7b914d..9caa5a6dec65d 100644
--- a/metadata-integration/java/acryl-spark-lineage/README.md
+++ b/metadata-integration/java/acryl-spark-lineage/README.md
@@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co
 ```text
 #Configuring DataHub spark agent jar
-spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15
+spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16
 spark.extraListeners datahub.spark.DatahubSparkListener
 spark.datahub.rest.server http://localhost:8080
 ```
@@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080
 ## spark-submit command line
 ```sh
-spark-submit --packages io.acryl:acryl-spark-lineage:0.2.15 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
+spark-submit --packages io.acryl:acryl-spark-lineage:0.2.16 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
 ```
 ### Configuration Instructions: Amazon EMR
@@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated
 [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)
 ```text
-spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15
+spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16
 spark.extraListeners datahub.spark.DatahubSparkListener
 spark.datahub.rest.server https://your_datahub_host/gms
 #If you have authentication set up then you also need to specify the Datahub access token
@@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
 spark = SparkSession.builder
 .master("spark://spark-master:7077")
 .appName("test-application")
-.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.15")
+.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.16")
 .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
 .config("spark.datahub.rest.server", "http://localhost:8080")
 .enableHiveSupport()
@@ -79,7 +79,7 @@ appName("test-application")
 config("spark.master","spark://spark-master:7077")
 .
-config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.13")
+config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.16")
 .
 config("spark.extraListeners","datahub.spark.DatahubSparkListener")
@@ -356,6 +356,9 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b
 ## Changelog
+### Version 0.2.16
+- Remove logging DataHub config into logs
+
 ### Version 0.2.15
 - Add Kafka emitter to emit lineage to kafka
 - Add File emitter to emit lineage to file
diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
index d64e159482c1b..52507a682a1f8 100644
--- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
+++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
@@ -257,7 +257,6 @@ private synchronized SparkLineageConf loadDatahubConfig(
       this.appContext.setDatabricksTags(databricksTags.orElse(null));
     }
-    log.info("Datahub configuration: {}", datahubConf.root().render());
     Optional emitterConfig = initializeEmitter(datahubConf);
     SparkLineageConf sparkLineageConf =
         SparkLineageConf.toSparkLineageConf(datahubConf, appContext, emitterConfig.orElse(null));
diff --git a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh
index e3aa181c58801..fe3dd8d18f699 100755
--- a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh
+++ b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh
@@ -39,7 +39,9 @@ jar -tvf $jarFile |\
   grep -v "darwin" |\
   grep -v "MetadataChangeProposal.avsc" |\
   grep -v "aix" |\
-  grep -v "com/sun/"
+  grep -v "com/sun/" |\
+  grep -v "VersionInfo.java" |\
+  grep -v "mime.types"
 if [ $? -ne 0 ]; then
   echo "✅ No unexpected class paths found in ${jarFile}"
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
index 33598be8fc72b..acdbd7855f7b0 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
@@ -2147,6 +2147,43 @@ public void testCreateChangeTypeProposal() {
             opContext, secondCreateProposal, TEST_AUDIT_STAMP, false));
   }
+  @Test
+  public void testExists() throws Exception {
+    Urn existentUrn = UrnUtils.getUrn("urn:li:corpuser:exists");
+    Urn softDeletedUrn = UrnUtils.getUrn("urn:li:corpuser:softDeleted");
+    Urn nonExistentUrn = UrnUtils.getUrn("urn:li:corpuser:nonExistent");
+    Urn noStatusUrn = UrnUtils.getUrn("urn:li:corpuser:noStatus");
+
+    List<Pair<String, RecordTemplate>> pairToIngest = new ArrayList<>();
+    SystemMetadata metadata = AspectGenerationUtils.createSystemMetadata();
+
+    // to ensure existence
+    CorpUserInfo userInfoAspect = AspectGenerationUtils.createCorpUserInfo("email@test.com");
+    pairToIngest.add(getAspectRecordPair(userInfoAspect, CorpUserInfo.class));
+
+    _entityServiceImpl.ingestAspects(
+        opContext, noStatusUrn, pairToIngest, TEST_AUDIT_STAMP, metadata);
+
+    Status statusExistsAspect = new Status().setRemoved(false);
+    pairToIngest.add(getAspectRecordPair(statusExistsAspect, Status.class));
+
+    _entityServiceImpl.ingestAspects(
+        opContext, existentUrn, pairToIngest, TEST_AUDIT_STAMP, metadata);
+
+    Status statusRemovedAspect = new Status().setRemoved(true);
+    pairToIngest.set(1, getAspectRecordPair(statusRemovedAspect, Status.class));
+
+    _entityServiceImpl.ingestAspects(
+        opContext, softDeletedUrn, pairToIngest, TEST_AUDIT_STAMP, metadata);
+
+    Set<Urn> inputUrns = Set.of(existentUrn, softDeletedUrn, nonExistentUrn, noStatusUrn);
+    assertEquals(
+        _entityServiceImpl.exists(opContext, inputUrns, false), Set.of(existentUrn, noStatusUrn));
+    assertEquals(
+        _entityServiceImpl.exists(opContext, inputUrns, true),
+        Set.of(existentUrn, noStatusUrn, softDeletedUrn));
+  }
+
   @Nonnull
   protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email)
       throws Exception {