feat(ingest): add underlying platform for glue #3035

Merged
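This PR lets the Glue ingestion source emit its metadata under a different platform: `GlueSourceConfig` gains an optional `underlying_platform` field, and `get_underlying_platform()` returns it when it is set to `athena`, falling back to `glue` otherwise. Along the way, the Postgres `get_identifier` logic is hoisted into a shared `get_identifier_three_layer_hierarchy` helper in `sql_common.py` and reused for MySQL, and the generated `SystemMetadata` Avro schema and classes get non-null defaults for `lastObserved` and `runId`.

A minimal sketch of how the new option is exercised, mirroring the unit tests added below (the `PipelineContext` import path is an assumption; the region value is just an example):

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.aws.glue import GlueSource, GlueSourceConfig

# With underlying_platform set to "athena", Glue tables and jobs are emitted
# against the athena data platform instead of glue.
source = GlueSource(
    ctx=PipelineContext(run_id="glue-source-demo"),
    config=GlueSourceConfig(aws_region="us-west-2", underlying_platform="athena"),
)
assert source.get_underlying_platform() == "athena"
```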
17 changes: 13 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -47,6 +47,7 @@
class GlueSourceConfig(AwsSourceConfig):

extract_transforms: Optional[bool] = True
underlying_platform: Optional[str] = None

@property
def glue_client(self):
@@ -80,13 +81,19 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
self.glue_client = config.glue_client
self.s3_client = config.s3_client
self.extract_transforms = config.extract_transforms
self.underlying_platform = config.underlying_platform
self.env = config.env

@classmethod
def create(cls, config_dict, ctx):
config = GlueSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_underlying_platform(self):
if self.underlying_platform in ["athena"]:
return self.underlying_platform
return "glue"

def get_all_jobs(self):
"""
List all jobs in Glue.
@@ -181,7 +188,7 @@ def process_dataflow_node(
full_table_name = f"{node_args['database']}.{node_args['table_name']}"

# we know that the table will already be covered when ingesting Glue tables
node_urn = f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})"
node_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{full_table_name},{self.env})"

# if data object is S3 bucket
elif node_args.get("connection_type") == "s3":
@@ -433,7 +440,9 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

for job in self.get_all_jobs():

flow_urn = mce_builder.make_data_flow_urn("glue", job["Name"], self.env)
flow_urn = mce_builder.make_data_flow_urn(
self.get_underlying_platform(), job["Name"], self.env
)

flow_wu = self.get_dataflow_wu(flow_urn, job)
self.report.report_workunit(flow_wu)
@@ -536,13 +545,13 @@ def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:
schemaName=table_name,
version=0,
fields=fields,
platform="urn:li:dataPlatform:glue",
platform=f"urn:li:dataPlatform:{self.get_underlying_platform()}",
hash="",
platformSchema=MySqlDDL(tableSchema=""),
)

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:glue,{table_name},{self.env})",
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{table_name},{self.env})",
aspects=[],
)

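For illustration, with `underlying_platform: athena` a Glue table `my_db.my_table` ingested in env `PROD` would now be emitted as `urn:li:dataset:(urn:li:dataPlatform:athena,my_db.my_table,PROD)` rather than the previously hard-coded `urn:li:dataPlatform:glue` form (the table and env names here are made up).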
8 changes: 7 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py
@@ -1,14 +1,20 @@
# This import verifies that the dependencies are available.
import pymysql # noqa: F401

from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource
from .sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
get_identifier_three_layer_hierarchy,
)


class MySQLConfig(BasicSQLAlchemyConfig):
# defaults
host_port = "localhost:3306"
scheme = "mysql+pymysql"

get_identifier = get_identifier_three_layer_hierarchy


class MySQLSource(SQLAlchemySource):
def __init__(self, config, ctx):
15 changes: 7 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py
@@ -15,7 +15,12 @@
MapTypeClass,
)

from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource, register_custom_type
from .sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
get_identifier_three_layer_hierarchy,
register_custom_type,
)

register_custom_type(custom_types.ARRAY, ArrayTypeClass)
register_custom_type(custom_types.JSON, BytesTypeClass)
@@ -27,13 +32,7 @@ class PostgresConfig(BasicSQLAlchemyConfig):
# defaults
scheme = "postgresql+psycopg2"

def get_identifier(self, schema: str, table: str) -> str:
regular = f"{schema}.{table}"
if self.database_alias:
return f"{self.database_alias}.{regular}"
if self.database:
return f"{self.database}.{regular}"
return regular
get_identifier = get_identifier_three_layer_hierarchy


class PostgresSource(SQLAlchemySource):
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -153,6 +153,17 @@ def get_sql_alchemy_url(self, uri_opts=None):
)


def get_identifier_three_layer_hierarchy(
self: BasicSQLAlchemyConfig, schema: str, table: str
) -> str:
regular = f"{schema}.{table}"
if self.database_alias:
return f"{self.database_alias}.{regular}"
if self.database:
return f"{self.database}.{regular}"
return regular


class SqlWorkUnit(MetadataWorkUnit):
pass

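A hypothetical usage sketch of the shared helper (the connection values are made up; the `database`/`database_alias` field names come from `BasicSQLAlchemyConfig`):

```python
from datahub.ingestion.source.sql.postgres import PostgresConfig

config = PostgresConfig.parse_obj(
    {"host_port": "localhost:5432", "database": "analytics"}
)
# database (or database_alias, when set) is prepended to schema.table,
# giving the three-layer name used when building dataset URNs.
assert config.get_identifier(schema="public", table="orders") == "analytics.public.orders"
```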
12 changes: 6 additions & 6 deletions metadata-ingestion/src/datahub/metadata/schema.avsc
@@ -5219,20 +5219,20 @@
"fields": [
{
"type": [
"null",
"long"
"long",
"null"
],
"name": "lastObserved",
"default": null,
"default": 0,
"doc": "The timestamp the metadata was observed at"
},
{
"type": [
"null",
"string"
"string",
"null"
],
"name": "runId",
"default": null,
"default": "no-run-id-provided",
"doc": "The run id that produced the metadata"
},
{
24 changes: 16 additions & 8 deletions metadata-ingestion/src/datahub/metadata/schema_classes.py
@@ -7381,14 +7381,22 @@ class SystemMetadataClass(DictWrapper):

RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.mxe.SystemMetadata")
def __init__(self,
lastObserved: Union[None, int]=None,
runId: Union[None, str]=None,
lastObserved: Optional[Union[int, None]]=None,
runId: Optional[Union[str, None]]=None,
properties: Union[None, Dict[str, str]]=None,
):
super().__init__()

self.lastObserved = lastObserved
self.runId = runId
if lastObserved is None:
# default: 0
self.lastObserved = self.RECORD_SCHEMA.field_map["lastObserved"].default
else:
self.lastObserved = lastObserved
if runId is None:
# default: 'no-run-id-provided'
self.runId = self.RECORD_SCHEMA.field_map["runId"].default
else:
self.runId = runId
self.properties = properties

@classmethod
Expand All @@ -7405,23 +7413,23 @@ def _restore_defaults(self) -> None:


@property
def lastObserved(self) -> Union[None, int]:
def lastObserved(self) -> Union[int, None]:
"""Getter: The timestamp the metadata was observed at"""
return self._inner_dict.get('lastObserved') # type: ignore

@lastObserved.setter
def lastObserved(self, value: Union[None, int]) -> None:
def lastObserved(self, value: Union[int, None]) -> None:
"""Setter: The timestamp the metadata was observed at"""
self._inner_dict['lastObserved'] = value


@property
def runId(self) -> Union[None, str]:
def runId(self) -> Union[str, None]:
"""Getter: The run id that produced the metadata"""
return self._inner_dict.get('runId') # type: ignore

@runId.setter
def runId(self, value: Union[None, str]) -> None:
def runId(self, value: Union[str, None]) -> None:
"""Setter: The run id that produced the metadata"""
self._inner_dict['runId'] = value

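The net effect of the regenerated classes is that an empty `SystemMetadataClass` now carries the new Avro defaults instead of `None` (a quick illustrative check, not part of the PR):

```python
from datahub.metadata.schema_classes import SystemMetadataClass

meta = SystemMetadataClass()
assert meta.lastObserved == 0
assert meta.runId == "no-run-id-provided"
```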
@@ -5163,20 +5163,20 @@
{
"name": "lastObserved",
"type": [
"null",
"long"
"long",
"null"
],
"doc": "The timestamp the metadata was observed at",
"default": null
"default": 0
},
{
"name": "runId",
"type": [
"null",
"string"
"string",
"null"
],
"doc": "The run id that produced the metadata",
"default": null
"default": "no-run-id-provided"
},
{
"name": "properties",
@@ -185,20 +185,20 @@
{
"name": "lastObserved",
"type": [
"null",
"long"
"long",
"null"
],
"doc": "The timestamp the metadata was observed at",
"default": null
"default": 0
},
{
"name": "runId",
"type": [
"null",
"string"
"string",
"null"
],
"doc": "The run id that produced the metadata",
"default": null
"default": "no-run-id-provided"
},
{
"name": "properties",
26 changes: 26 additions & 0 deletions metadata-ingestion/tests/unit/test_glue_source.py
@@ -143,3 +143,29 @@ def test_glue_ingest(tmp_path, pytestconfig):
output_path=tmp_path / "glue_mces.json",
golden_path=test_resources_dir / "glue_mces_golden.json",
)


def test_underlying_platform_takes_precedence():
source = GlueSource(
ctx=PipelineContext(run_id="glue-source-test"),
config=GlueSourceConfig(aws_region="us-west-2", underlying_platform="athena"),
)
assert source.get_underlying_platform() == "athena"


def test_underlying_platform_cannot_be_other_than_athena():
source = GlueSource(
ctx=PipelineContext(run_id="glue-source-test"),
config=GlueSourceConfig(
aws_region="us-west-2", underlying_platform="data-warehouse"
),
)
assert source.get_underlying_platform() == "glue"


def test_without_underlying_platform():
source = GlueSource(
ctx=PipelineContext(run_id="glue-source-test"),
config=GlueSourceConfig(aws_region="us-west-2"),
)
assert source.get_underlying_platform() == "glue"