Commit

fix(ingest): patch lookml types and refactor ingestion sources layout (

kevinhu authored Jul 26, 2021
1 parent 5d396b1 commit 662017e
Showing 39 changed files with 118 additions and 82 deletions.
30 changes: 15 additions & 15 deletions metadata-ingestion/setup.py
@@ -198,29 +198,29 @@ def get_long_description():
     "console_scripts": ["datahub = datahub.entrypoints:main"],
     "datahub.ingestion.source.plugins": [
         "file = datahub.ingestion.source.file:GenericFileSource",
-        "sqlalchemy = datahub.ingestion.source.sql_generic:SQLAlchemyGenericSource",
-        "athena = datahub.ingestion.source.athena:AthenaSource",
-        "bigquery = datahub.ingestion.source.bigquery:BigQuerySource",
-        "bigquery-usage = datahub.ingestion.source.bigquery_usage:BigQueryUsageSource",
+        "sqlalchemy = datahub.ingestion.source.sql.sql_generic:SQLAlchemyGenericSource",
+        "athena = datahub.ingestion.source.sql.athena:AthenaSource",
+        "bigquery = datahub.ingestion.source.sql.bigquery:BigQuerySource",
+        "bigquery-usage = datahub.ingestion.source.usage.bigquery_usage:BigQueryUsageSource",
         "dbt = datahub.ingestion.source.dbt:DBTSource",
-        "druid = datahub.ingestion.source.druid:DruidSource",
+        "druid = datahub.ingestion.source.sql.druid:DruidSource",
         "feast = datahub.ingestion.source.feast:FeastSource",
-        "glue = datahub.ingestion.source.glue:GlueSource",
-        "sagemaker = datahub.ingestion.source.sagemaker:SagemakerSource",
-        "hive = datahub.ingestion.source.hive:HiveSource",
+        "glue = datahub.ingestion.source.aws.glue:GlueSource",
+        "sagemaker = datahub.ingestion.source.aws.sagemaker:SagemakerSource",
+        "hive = datahub.ingestion.source.sql.hive:HiveSource",
         "kafka = datahub.ingestion.source.kafka:KafkaSource",
         "kafka-connect = datahub.ingestion.source.kafka_connect:KafkaConnectSource",
         "ldap = datahub.ingestion.source.ldap:LDAPSource",
         "looker = datahub.ingestion.source.looker:LookerDashboardSource",
         "lookml = datahub.ingestion.source.lookml:LookMLSource",
         "mongodb = datahub.ingestion.source.mongodb:MongoDBSource",
-        "mssql = datahub.ingestion.source.mssql:SQLServerSource",
-        "mysql = datahub.ingestion.source.mysql:MySQLSource",
-        "oracle = datahub.ingestion.source.oracle:OracleSource",
-        "postgres = datahub.ingestion.source.postgres:PostgresSource",
-        "redshift = datahub.ingestion.source.redshift:RedshiftSource",
-        "snowflake = datahub.ingestion.source.snowflake:SnowflakeSource",
-        "snowflake-usage = datahub.ingestion.source.snowflake_usage:SnowflakeUsageSource",
+        "mssql = datahub.ingestion.source.sql.mssql:SQLServerSource",
+        "mysql = datahub.ingestion.source.sql.mysql:MySQLSource",
+        "oracle = datahub.ingestion.source.sql.oracle:OracleSource",
+        "postgres = datahub.ingestion.source.sql.postgres:PostgresSource",
+        "redshift = datahub.ingestion.source.sql.redshift:RedshiftSource",
+        "snowflake = datahub.ingestion.source.sql.snowflake:SnowflakeSource",
+        "snowflake-usage = datahub.ingestion.source.usage.snowflake_usage:SnowflakeUsageSource",
        "superset = datahub.ingestion.source.superset:SupersetSource",
     ],
     "datahub.ingestion.sink.plugins": [
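Note: these registrations are setuptools entry points, so every dotted path in setup.py must track the new sql/, aws/, and usage/ subpackages exactly. A minimal sketch of why a stale path only fails at plugin-load time, assuming a loader that resolves plugins through importlib.metadata (the group name is taken from this diff; the lookup code is illustrative, not DataHub's actual loader):

    from importlib.metadata import entry_points

    # Find the "lookml" plugin in the group registered by setup.py.
    # (entry_points(group=...) is the Python 3.10+ signature; on 3.8/3.9,
    # index the dict returned by entry_points() with the group name.)
    eps = entry_points(group="datahub.ingestion.source.plugins")
    lookml_ep = next(ep for ep in eps if ep.name == "lookml")

    # load() imports the module and resolves the attribute, so a dotted path
    # missed by this refactor would surface here as an ImportError.
    source_cls = lookml_ep.load()  # -> datahub.ingestion.source.lookml:LookMLSource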
@@ -10,7 +10,7 @@
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.aws_common import AwsSourceConfig, make_s3_urn
+from datahub.ingestion.source.aws.aws_common import AwsSourceConfig, make_s3_urn
 from datahub.metadata.com.linkedin.pegasus2avro.common import Status
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@@ -4,20 +4,20 @@
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.sagemaker_processors.common import (
+from datahub.ingestion.source.aws.sagemaker_processors.common import (
     SagemakerSourceConfig,
     SagemakerSourceReport,
 )
-from datahub.ingestion.source.sagemaker_processors.feature_groups import (
+from datahub.ingestion.source.aws.sagemaker_processors.feature_groups import (
     FeatureGroupProcessor,
 )
-from datahub.ingestion.source.sagemaker_processors.jobs import (
+from datahub.ingestion.source.aws.sagemaker_processors.jobs import (
     JobKey,
     JobProcessor,
     ModelJob,
 )
-from datahub.ingestion.source.sagemaker_processors.lineage import LineageProcessor
-from datahub.ingestion.source.sagemaker_processors.models import ModelProcessor
+from datahub.ingestion.source.aws.sagemaker_processors.lineage import LineageProcessor
+from datahub.ingestion.source.aws.sagemaker_processors.models import ModelProcessor


 class SagemakerSource(Source):
Empty file.
@@ -2,7 +2,7 @@
 from typing import Dict, Optional, Union

 from datahub.ingestion.api.source import SourceReport
-from datahub.ingestion.source.aws_common import AwsSourceConfig
+from datahub.ingestion.source.aws.aws_common import AwsSourceConfig


 class SagemakerSourceConfig(AwsSourceConfig):
@@ -3,7 +3,9 @@

 import datahub.emitter.mce_builder as builder
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.sagemaker_processors.common import SagemakerSourceReport
+from datahub.ingestion.source.aws.sagemaker_processors.common import (
+    SagemakerSourceReport,
+)
 from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
     MLFeatureSnapshot,
@@ -16,8 +16,10 @@

 from datahub.emitter import mce_builder
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.aws_common import make_s3_urn
-from datahub.ingestion.source.sagemaker_processors.common import SagemakerSourceReport
+from datahub.ingestion.source.aws.aws_common import make_s3_urn
+from datahub.ingestion.source.aws.sagemaker_processors.common import (
+    SagemakerSourceReport,
+)
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from datahub.metadata.schema_classes import (
@@ -2,7 +2,9 @@
 from dataclasses import dataclass, field
 from typing import Any, DefaultDict, Dict, List, Set

-from datahub.ingestion.source.sagemaker_processors.common import SagemakerSourceReport
+from datahub.ingestion.source.aws.sagemaker_processors.common import (
+    SagemakerSourceReport,
+)


 @dataclass
@@ -5,13 +5,15 @@

 import datahub.emitter.mce_builder as builder
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.sagemaker_processors.common import SagemakerSourceReport
-from datahub.ingestion.source.sagemaker_processors.jobs import (
+from datahub.ingestion.source.aws.sagemaker_processors.common import (
+    SagemakerSourceReport,
+)
+from datahub.ingestion.source.aws.sagemaker_processors.jobs import (
     JobDirection,
     JobKey,
     ModelJob,
 )
-from datahub.ingestion.source.sagemaker_processors.lineage import LineageInfo
+from datahub.ingestion.source.aws.sagemaker_processors.lineage import LineageInfo
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
     MLModelDeploymentSnapshot,
     MLModelGroupSnapshot,
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -11,7 +11,7 @@
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.dbt_types import (
+from datahub.ingestion.source.sql.sql_types import (
     POSTGRES_TYPES_MAP,
     SNOWFLAKE_TYPES_MAP,
     resolve_postgres_modified_type,
91 changes: 56 additions & 35 deletions metadata-ingestion/src/datahub/ingestion/source/lookml.py
@@ -24,6 +24,11 @@
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.sql.sql_types import (
+    POSTGRES_TYPES_MAP,
+    SNOWFLAKE_TYPES_MAP,
+    resolve_postgres_modified_type,
+)
 from datahub.metadata.com.linkedin.pegasus2avro.common import Status
 from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
     DatasetLineageTypeClass,
@@ -420,6 +425,46 @@ def get_including_extends(
     return None


+field_type_mapping = {
+    **POSTGRES_TYPES_MAP,
+    **SNOWFLAKE_TYPES_MAP,
+    "date": DateTypeClass,
+    "date_time": TimeTypeClass,
+    "date_millisecond": TimeTypeClass,
+    "date_minute": TimeTypeClass,
+    "date_raw": TimeTypeClass,
+    "date_week": TimeTypeClass,
+    "duration_day": TimeTypeClass,
+    "distance": NumberTypeClass,
+    "duration": NumberTypeClass,
+    "location": UnionTypeClass,
+    "number": NumberTypeClass,
+    "string": StringTypeClass,
+    "tier": EnumTypeClass,
+    "time": TimeTypeClass,
+    "unquoted": StringTypeClass,
+    "yesno": BooleanTypeClass,
+    "zipcode": EnumTypeClass,
+    "int": NumberTypeClass,
+    "average": NumberTypeClass,
+    "average_distinct": NumberTypeClass,
+    "count": NumberTypeClass,
+    "count_distinct": NumberTypeClass,
+    "list": ArrayTypeClass,
+    "max": NumberTypeClass,
+    "median": NumberTypeClass,
+    "median_distinct": NumberTypeClass,
+    "min": NumberTypeClass,
+    "percent_of_previous": NumberTypeClass,
+    "percent_of_total": NumberTypeClass,
+    "percentile": NumberTypeClass,
+    "percentile_distinct": NumberTypeClass,
+    "running_total": NumberTypeClass,
+    "sum": NumberTypeClass,
+    "sum_distinct": NumberTypeClass,
+}
+
+
 class LookMLSource(Source):
     source_config: LookMLSourceConfig
     reporter: LookMLSourceReport
@@ -485,46 +530,22 @@ def _get_upstream_lineage(self, looker_view: LookerView) -> UpstreamLineage:
         return upstream_lineage

     def _get_field_type(self, native_type: str) -> SchemaFieldDataType:
-        field_type_mapping = {
-            "date": DateTypeClass,
-            "date_time": TimeTypeClass,
-            "distance": NumberTypeClass,
-            "duration": NumberTypeClass,
-            "location": UnionTypeClass,
-            "number": NumberTypeClass,
-            "string": StringTypeClass,
-            "tier": EnumTypeClass,
-            "time": TimeTypeClass,
-            "unquoted": StringTypeClass,
-            "yesno": BooleanTypeClass,
-            "zipcode": EnumTypeClass,
-            "int": NumberTypeClass,
-            "average": NumberTypeClass,
-            "average_distinct": NumberTypeClass,
-            "count": NumberTypeClass,
-            "count_distinct": NumberTypeClass,
-            "list": ArrayTypeClass,
-            "max": NumberTypeClass,
-            "median": NumberTypeClass,
-            "median_distinct": NumberTypeClass,
-            "min": NumberTypeClass,
-            "percent_of_previous": NumberTypeClass,
-            "percent_of_total": NumberTypeClass,
-            "percentile": NumberTypeClass,
-            "percentile_distinct": NumberTypeClass,
-            "running_total": NumberTypeClass,
-            "sum": NumberTypeClass,
-            "sum_distinct": NumberTypeClass,
-        }
-
-        if native_type in field_type_mapping:
-            type_class = field_type_mapping[native_type]
-        else:
+
+        type_class = field_type_mapping.get(native_type)
+
+        if type_class is None:
+
+            # attempt Postgres modified type
+            type_class = resolve_postgres_modified_type(native_type)
+
+        # if still not found, report a warning
+        if type_class is None:
             self.reporter.report_warning(
                 native_type,
                 f"The type '{native_type}' is not recognized for field type, setting as NullTypeClass.",
             )
             type_class = NullTypeClass

         data_type = SchemaFieldDataType(type=type_class())
         return data_type
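Net effect in lookml.py: the type table moves to module level, merges the shared Postgres and Snowflake maps from sql_types, and gains a second-chance lookup before warning. A condensed sketch of that resolution order, with toy type classes and a hypothetical stand-in for resolve_postgres_modified_type (its real implementation lives in sql_types and is not shown in this diff):

    import re
    from typing import Optional, Type

    class NumberTypeClass: ...
    class StringTypeClass: ...
    class NullTypeClass: ...  # toy stand-ins for DataHub's schema type classes

    FIELD_TYPE_MAPPING = {"number": NumberTypeClass, "varchar": StringTypeClass}

    def resolve_postgres_modified_type(native_type: str) -> Optional[Type]:
        # Hypothetical stand-in: strip a length/precision modifier,
        # e.g. "varchar(255)" -> "varchar", then retry the plain lookup.
        base = re.sub(r"\(.*\)$", "", native_type).strip()
        return FIELD_TYPE_MAPPING.get(base)

    def get_field_type(native_type: str) -> Type:
        # 1) exact hit in the merged LookML + Postgres + Snowflake mapping
        type_class = FIELD_TYPE_MAPPING.get(native_type)
        # 2) second chance for Postgres "modified" types such as varchar(255)
        if type_class is None:
            type_class = resolve_postgres_modified_type(native_type)
        # 3) last resort: warn (via self.reporter in the real source) and null out
        if type_class is None:
            type_class = NullTypeClass
        return type_class

    assert get_field_type("varchar(255)") is StringTypeClass
    assert get_field_type("geography") is NullTypeClass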
Empty file.
@@ -2,7 +2,7 @@
 from pyhive import hive  # noqa: F401
 from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp

-from datahub.ingestion.source.sql_common import (
+from datahub.ingestion.source.sql.sql_common import (
     BasicSQLAlchemyConfig,
     SQLAlchemySource,
     register_custom_type,
@@ -7,8 +7,8 @@
 from sqlalchemy_redshift.dialect import RedshiftDialect, RelationKey

 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.source.postgres import PostgresConfig
-from datahub.ingestion.source.sql_common import SQLAlchemySource
+from datahub.ingestion.source.sql.postgres import PostgresConfig
+from datahub.ingestion.source.sql.sql_common import SQLAlchemySource

 # TRICKY: it's necessary to import the Postgres source because
 # that module has some side effects that we care about here.
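The TRICKY comment survives the move: the Redshift module imports the Postgres module for its import-time side effects, not only for the PostgresConfig name. A minimal model of that pattern, assuming the side effect is custom-type registration (suggested by the register_custom_type import in the hive hunk above; all names here are illustrative):

    # registry (illustrative): module-level state shared by all SQL sources
    _CUSTOM_TYPE_MAP: dict = {}

    def register_custom_type(sa_type: type, datahub_type: type) -> None:
        # Top-level calls to this run exactly once, when the calling
        # module is first imported.
        _CUSTOM_TYPE_MAP[sa_type] = datahub_type

    # postgres module (hypothetical): registers its types on first import
    #   register_custom_type(postgresql.UUID, StringTypeClass)
    #
    # redshift module: must import the postgres module even though no name
    # from it is used directly -- dropping the import would skip the
    # registration, hence patterns like `import ...  # noqa: F401`.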
Empty file.
@@ -15,7 +15,7 @@
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import UsageStatsWorkUnit
-from datahub.ingestion.source.usage_common import (
+from datahub.ingestion.source.usage.usage_common import (
     BaseUsageConfig,
     GenericAggregatedDataset,
     get_time_bucket,
@@ -13,8 +13,8 @@
 import datahub.emitter.mce_builder as builder
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import UsageStatsWorkUnit
-from datahub.ingestion.source.snowflake import BaseSnowflakeConfig
-from datahub.ingestion.source.usage_common import (
+from datahub.ingestion.source.sql.snowflake import BaseSnowflakeConfig
+from datahub.ingestion.source.usage.usage_common import (
     BaseUsageConfig,
     GenericAggregatedDataset,
     get_time_bucket,
@@ -6,7 +6,7 @@

 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.run.pipeline import Pipeline
-from datahub.ingestion.source.bigquery_usage import (
+from datahub.ingestion.source.usage.bigquery_usage import (
     BigQueryUsageConfig,
     BigQueryUsageSource,
 )
@@ -54,7 +54,7 @@ def test_bq_usage_source(pytestconfig, tmp_path):
         logs.write(log_entries)

     with unittest.mock.patch(
-        "datahub.ingestion.source.bigquery_usage.GCPLoggingClient", autospec=True
+        "datahub.ingestion.source.usage.bigquery_usage.GCPLoggingClient", autospec=True
     ) as MockClient:
         # Add mock BigQuery API responses.
         with bigquery_reference_logs_path.open() as logs:
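Note: unittest.mock.patch resolves its string target as a dotted import path at patch time, so a module move like this one breaks every test that still names the old path. The updated target from this diff, in a minimal sketch (the body is elided; it assumes the datahub package is importable):

    import unittest.mock

    # Patching the OLD location ("datahub.ingestion.source.bigquery_usage...")
    # would now raise ModuleNotFoundError at patch time, which is why the
    # refactor has to touch this test:
    with unittest.mock.patch(
        "datahub.ingestion.source.usage.bigquery_usage.GCPLoggingClient",
        autospec=True,
    ) as MockClient:
        ...  # the source under test constructs the mocked client here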
@@ -4,8 +4,11 @@
 from freezegun import freeze_time

 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.source.sagemaker import SagemakerSource, SagemakerSourceConfig
-from datahub.ingestion.source.sagemaker_processors.jobs import SAGEMAKER_JOB_TYPES
+from datahub.ingestion.source.aws.sagemaker import (
+    SagemakerSource,
+    SagemakerSourceConfig,
+)
+from datahub.ingestion.source.aws.sagemaker_processors.jobs import SAGEMAKER_JOB_TYPES
 from tests.test_helpers import mce_helpers
 from tests.unit.test_sagemaker_source_stubs import (
     describe_endpoint_response_1,
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_druid_source.py
@@ -3,7 +3,7 @@

 @pytest.mark.integration
 def test_druid_uri():
-    from datahub.ingestion.source.druid import DruidConfig
+    from datahub.ingestion.source.sql.druid import DruidConfig

     config = DruidConfig.parse_obj({"host_port": "localhost:8082"})

6 changes: 5 additions & 1 deletion metadata-ingestion/tests/unit/test_glue_source.py
@@ -4,7 +4,11 @@
 from freezegun import freeze_time

 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.source.glue import GlueSource, GlueSourceConfig, get_column_type
+from datahub.ingestion.source.aws.glue import (
+    GlueSource,
+    GlueSourceConfig,
+    get_column_type,
+)
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     ArrayTypeClass,
     MapTypeClass,
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_hive_source.py
@@ -3,7 +3,7 @@

 @pytest.mark.integration
 def test_hive_configuration_get_identifier_with_database():
-    from datahub.ingestion.source.hive import HiveConfig
+    from datahub.ingestion.source.sql.hive import HiveConfig

     test_db_name = "test_database"
     test_schema_name = "test_schema"
4 changes: 2 additions & 2 deletions metadata-ingestion/tests/unit/test_oracle_source.py
@@ -3,7 +3,7 @@
 import pytest

 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.source.oracle import OracleConfig, OracleSource
+from datahub.ingestion.source.sql.oracle import OracleConfig, OracleSource


 def test_oracle_config():
@@ -34,7 +34,7 @@ def test_oracle_config():
     )

     with unittest.mock.patch(
-        "datahub.ingestion.source.sql_common.SQLAlchemySource.get_workunits"
+        "datahub.ingestion.source.sql.sql_common.SQLAlchemySource.get_workunits"
     ):
         OracleSource.create(
             {
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_postgres_source.py
@@ -1,4 +1,4 @@
-from datahub.ingestion.source.postgres import PostgresConfig
+from datahub.ingestion.source.sql.postgres import PostgresConfig


 def _base_config():