Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): refactor mlModel grouping and add browsepaths #2929

Merged
merged 52 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
4753f74
Init endpoint models
kevinhu Jul 15, 2021
3c8e7e4
Init lineage extractor
kevinhu Jul 15, 2021
ef2f664
Refactor model endpoint extraction
kevinhu Jul 15, 2021
f2e1d50
Add model groups
kevinhu Jul 15, 2021
057a455
Add model group lineage extraction
kevinhu Jul 15, 2021
393f8ec
Init endpoint stubs
kevinhu Jul 15, 2021
a6e2173
Successful endpoint ingestion
kevinhu Jul 15, 2021
821994a
Endpoint lineage stubs
kevinhu Jul 15, 2021
4612045
Add endpoint field to model schema
kevinhu Jul 16, 2021
18a9001
Stubs for model group lineage
kevinhu Jul 16, 2021
4eda319
Ingest model-group lineage
kevinhu Jul 16, 2021
77640af
Formatting
kevinhu Jul 16, 2021
4ad6638
More comments
kevinhu Jul 16, 2021
0220e7f
Refactor status types
kevinhu Jul 16, 2021
11546c2
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 16, 2021
61db239
Merge
kevinhu Jul 16, 2021
160edab
Fix context typo
kevinhu Jul 16, 2021
0ae0ed8
Init model group entity
kevinhu Jul 16, 2021
698940c
Ingest model groups
kevinhu Jul 16, 2021
d8931db
Reorder models and groups
kevinhu Jul 16, 2021
1fc4461
Successful model group lineage ingestion
kevinhu Jul 16, 2021
7c3628f
Ingest dataowners
kevinhu Jul 16, 2021
7f1bf48
Sort
kevinhu Jul 16, 2021
4875083
Ingest model group description
kevinhu Jul 16, 2021
581dfd9
Init hyperparams and metrics aspects
kevinhu Jul 16, 2021
471f8c5
External links for Glue jobs
kevinhu Jul 17, 2021
4469e37
SageMaker job URLs
kevinhu Jul 17, 2021
4cf997b
Add external URLs to models
kevinhu Jul 17, 2021
2a36fb6
Ingest model URLs
kevinhu Jul 17, 2021
444525e
Hyperparam ingestion
kevinhu Jul 17, 2021
78daddc
PR updates
kevinhu Jul 19, 2021
a6aaef5
Rename endpoint -> deployment
kevinhu Jul 19, 2021
c44bff2
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 19, 2021
9a72890
Metrics ingestion
kevinhu Jul 19, 2021
5713172
Merge
kevinhu Jul 19, 2021
02b1289
Merge
kevinhu Jul 19, 2021
063e247
Add external URLs for endpoints
kevinhu Jul 19, 2021
cd43b58
Ingest endpoint URLs
kevinhu Jul 19, 2021
54b1496
Remove unnecessary aspects
kevinhu Jul 19, 2021
dea8883
Set hyperparameter type to string
kevinhu Jul 19, 2021
c7e85fb
Strip quotes
kevinhu Jul 19, 2021
734a9f4
Update job hashing
kevinhu Jul 21, 2021
7e31015
Update types
kevinhu Jul 21, 2021
f0e03c0
Add return types
kevinhu Jul 21, 2021
35f2752
Merge branch 'linkedin:master' into sagemaker-model-metrics
kevinhu Jul 21, 2021
54cfa2f
Init browsepaths refactor
kevinhu Jul 22, 2021
bb31009
Refactor groups by model
kevinhu Jul 22, 2021
df51562
Add browse paths for models and groups
kevinhu Jul 22, 2021
4b1c29f
Update goldens
kevinhu Jul 22, 2021
5735aa5
isort
kevinhu Jul 22, 2021
85ae2bb
Merge
kevinhu Jul 22, 2021
fa8997c
Handle edge case for group-less model
kevinhu Jul 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2598,6 +2598,20 @@
"name" : "UsedBy"
}
}
}, {
"name" : "groups",
"type" : {
"type" : "array",
"items" : "com.linkedin.common.Urn"
},
"doc" : "Groups the model belongs to",
"optional" : true,
"Relationship" : {
"/*" : {
"entityTypes" : [ "mlModelGroup" ],
"name" : "MemberOf"
}
}
} ],
"Aspect" : {
"name" : "mlModelProperties"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2998,6 +2998,20 @@
"name" : "UsedBy"
}
}
}, {
"name" : "groups",
"type" : {
"type" : "array",
"items" : "com.linkedin.common.Urn"
},
"doc" : "Groups the model belongs to",
"optional" : true,
"Relationship" : {
"/*" : {
"entityTypes" : [ "mlModelGroup" ],
"name" : "MemberOf"
}
}
} ],
"Aspect" : {
"name" : "mlModelProperties"
Expand Down Expand Up @@ -3794,20 +3808,6 @@
"type" : "com.linkedin.common.VersionTag",
"doc" : "Version of the MLModelGroup",
"optional" : true
}, {
"name" : "models",
"type" : {
"type" : "array",
"items" : "com.linkedin.common.Urn"
},
"doc" : "Models in the group",
"default" : [ ],
"Relationship" : {
"/*" : {
"entityTypes" : [ "mlModel" ],
"name" : "Includes"
}
}
} ],
"Aspect" : {
"name" : "mlModelGroupProperties"
Expand Down
14 changes: 14 additions & 0 deletions gms/api/src/main/snapshot/com.linkedin.ml.mlModels.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,20 @@
"name" : "UsedBy"
}
}
}, {
"name" : "groups",
"type" : {
"type" : "array",
"items" : "com.linkedin.common.Urn"
},
"doc" : "Groups the model belongs to",
"optional" : true,
"Relationship" : {
"/*" : {
"entityTypes" : [ "mlModelGroup" ],
"name" : "MemberOf"
}
}
} ],
"Aspect" : {
"name" : "mlModelProperties"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ class LineageInfo:
)

# map from group ARNs to model URIs
group_model_uris: DefaultDict[str, Set[str]] = field(
model_uri_to_groups: DefaultDict[str, Set[str]] = field(
default_factory=lambda: defaultdict(set)
)
# map from group ARNs to model images
group_model_images: DefaultDict[str, Set[str]] = field(
model_image_to_groups: DefaultDict[str, Set[str]] = field(
default_factory=lambda: defaultdict(set)
)

Expand Down Expand Up @@ -203,8 +203,8 @@ def get_model_group_lineage(
and source_uri is not None
):

self.lineage_info.group_model_uris[model_group_arn].add(
source_uri
self.lineage_info.model_uri_to_groups[source_uri].add(
model_group_arn
)

# add model_group_arn -> model_image mapping
Expand All @@ -213,8 +213,8 @@ def get_model_group_lineage(
and source_uri is not None
):

self.lineage_info.group_model_images[model_group_arn].add(
source_uri
self.lineage_info.model_image_to_groups[source_uri].add(
model_group_arn
)

def get_lineage(self) -> LineageInfo:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DeploymentStatusClass,
MLHyperParamClass,
MLMetricClass,
Expand Down Expand Up @@ -66,6 +67,8 @@ class ModelProcessor:
# map from model image path to model name
model_image_to_name: Dict[str, str] = field(default_factory=dict)

group_arn_to_name: Dict[str, str] = field(default_factory=dict)

def get_all_models(self) -> List[Dict[str, Any]]:
"""
List all models in SageMaker.
Expand Down Expand Up @@ -215,6 +218,54 @@ def get_model_endpoints(

return model_endpoints_sorted

def get_group_wu(self, group_details: Dict[str, Any]) -> MetadataWorkUnit:
"""
Get a workunit for a model group.
"""

# params to remove since we extract them
redundant_fields = {"ModelPackageGroupName", "CreationTime"}

group_arn = group_details["ModelPackageGroupArn"]
group_name = group_details["ModelPackageGroupName"]

self.group_arn_to_name[group_arn] = group_name

owners = []

if group_details.get("CreatedBy", {}).get("UserProfileName") is not None:
owners.append(
OwnerClass(
owner=group_details["CreatedBy"]["UserProfileName"],
type=OwnershipTypeClass.DATAOWNER,
)
)

group_snapshot = MLModelGroupSnapshot(
urn=builder.make_ml_model_group_urn("sagemaker", group_name, self.env),
aspects=[
MLModelGroupPropertiesClass(
createdAt=int(
group_details.get("CreationTime", datetime.now()).timestamp()
* 1000
),
description=group_details.get("ModelPackageGroupDescription"),
customProperties={
key: str(value)
for key, value in group_details.items()
if key not in redundant_fields
},
),
OwnershipClass(owners),
BrowsePathsClass(paths=[f"sagemaker/{group_name}"]),
],
)

# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=group_snapshot)

return MetadataWorkUnit(id=group_name, mce=mce)

def match_model_jobs(
self, model_details: Dict[str, Any]
) -> Tuple[Set[str], Set[str], List[MLHyperParamClass], List[MLMetricClass]]:
Expand Down Expand Up @@ -333,6 +384,28 @@ def get_model_wu(
model_metrics,
) = self.match_model_jobs(model_details)

# resolve groups that the model is a part of
model_uri_groups = self.lineage.model_uri_to_groups.get(model_uri, set())
model_image_groups = self.lineage.model_image_to_groups.get(model_image, set())

model_group_arns = model_uri_groups | model_image_groups

model_group_names = sorted(
[self.group_arn_to_name[x] for x in model_group_arns]
)
model_group_urns = [
builder.make_ml_model_group_urn("sagemaker", x, self.env)
for x in model_group_names
]

model_browsepaths = [
kevinhu marked this conversation as resolved.
Show resolved Hide resolved
f"sagemaker/{x}/{model_details['ModelName']}" for x in model_group_names
]

# if model is not in any groups, set a single browsepath with the model as the first entity
if not model_browsepaths:
model_browsepaths.append(f"sagemaker/{model_details['ModelName']}")

model_snapshot = MLModelSnapshot(
urn=builder.make_ml_model_urn(
"sagemaker", model_details["ModelName"], self.env
Expand All @@ -359,7 +432,9 @@ def get_model_wu(
externalUrl=f"https://{self.aws_region}.console.aws.amazon.com/sagemaker/home?region={self.aws_region}#/models/{model_details['ModelName']}",
hyperParams=model_hyperparams,
trainingMetrics=model_metrics,
)
groups=model_group_urns,
),
BrowsePathsClass(paths=model_browsepaths),
],
)

Expand All @@ -371,76 +446,6 @@ def get_model_wu(
mce=mce,
)

def get_group_wu(self, group_details: Dict[str, Any]) -> MetadataWorkUnit:
"""
Get a workunit for a model group.
"""

# params to remove since we extract them
redundant_fields = {"ModelPackageGroupName", "CreationTime"}

group_arn = group_details["ModelPackageGroupArn"]

group_model_names = set()

if group_arn in self.lineage.group_model_uris:
model_uris = self.lineage.group_model_uris[group_arn]
group_model_names |= {
self.model_uri_to_name[x]
for x in model_uris
if x in self.model_uri_to_name
}

if group_arn in self.lineage.group_model_images:
model_images = self.lineage.group_model_images[group_arn]
group_model_names |= {
self.model_image_to_name[x]
for x in model_images
if x in self.model_uri_to_name
}

owners = []

if group_details.get("CreatedBy", {}).get("UserProfileName") is not None:
owners.append(
OwnerClass(
owner=group_details["CreatedBy"]["UserProfileName"],
type=OwnershipTypeClass.DATAOWNER,
)
)

group_snapshot = MLModelGroupSnapshot(
urn=builder.make_ml_model_group_urn(
"sagemaker", group_details["ModelPackageGroupName"], self.env
),
aspects=[
MLModelGroupPropertiesClass(
createdAt=int(
group_details.get("CreationTime", datetime.now()).timestamp()
* 1000
),
description=group_details.get("ModelPackageGroupDescription"),
customProperties={
key: str(value)
for key, value in group_details.items()
if key not in redundant_fields
},
models=sorted(
[
builder.make_ml_model_urn("sagemaker", model_name, self.env)
for model_name in group_model_names
]
),
),
OwnershipClass(owners),
],
)

# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=group_snapshot)

return MetadataWorkUnit(id=f'{group_details["ModelPackageGroupName"]}', mce=mce)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:

endpoints = self.get_all_endpoints()
Expand All @@ -463,19 +468,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.report_workunit(wu)
yield wu

models = self.get_all_models()
# sort models for consistency
models = sorted(models, key=lambda x: x["ModelArn"])

for model in models:

model_details = self.get_model_details(model["ModelName"])

self.report.report_model_scanned()
wu = self.get_model_wu(model_details, endpoint_arn_to_name)
self.report.report_workunit(wu)
yield wu

groups = self.get_all_groups()
# sort groups for consistency
groups = sorted(groups, key=lambda x: x["ModelPackageGroupName"])
Expand All @@ -489,3 +481,16 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
wu = self.get_group_wu(group_details)
self.report.report_workunit(wu)
yield wu

models = self.get_all_models()
# sort models for consistency
models = sorted(models, key=lambda x: x["ModelArn"])

for model in models:

model_details = self.get_model_details(model["ModelName"])

self.report.report_model_scanned()
wu = self.get_model_wu(model_details, endpoint_arn_to_name)
self.report.report_workunit(wu)
yield wu
37 changes: 20 additions & 17 deletions metadata-ingestion/src/datahub/metadata/schema.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3608,6 +3608,26 @@
"name": "downstreamJobs",
"default": null,
"doc": "List of jobs (if any) that use the model"
},
{
"Relationship": {
"/*": {
"entityTypes": [
"mlModelGroup"
],
"name": "MemberOf"
}
},
"type": [
"null",
{
"type": "array",
"items": "string"
}
],
"name": "groups",
"default": null,
"doc": "Groups the model belongs to"
}
],
"doc": "Properties associated with a ML Model"
Expand Down Expand Up @@ -4902,23 +4922,6 @@
"name": "version",
"default": null,
"doc": "Version of the MLModelGroup"
},
{
"Relationship": {
"/*": {
"entityTypes": [
"mlModel"
],
"name": "Includes"
}
},
"type": {
"type": "array",
"items": "string"
},
"name": "models",
"default": [],
"doc": "Models in the group"
}
],
"doc": "Properties associated with an ML Model Group"
Expand Down
Loading