Skip to content

Commit

Permalink
fix(ingest/patch): Adding proper json patch path quoting (datahub-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Feb 27, 2024
1 parent ca613ba commit d2d9661
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 113 deletions.
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def __init__(
self.audit_header = audit_header
self.patches = defaultdict(list)

# Json Patch quoting based on https://jsonpatch.com/#json-pointer
@staticmethod
def quote(value: str) -> str:
    """Escape *value* so it can be used as one JSON Pointer path segment.

    Every "~" becomes "~0" and every "/" becomes "~1"; all other
    characters are returned unchanged.
    """
    # One pass over the string; characters are mapped independently, so
    # the "~" introduced for "/" is never escaped a second time.
    escapes = str.maketrans({"~": "~0", "/": "~1"})
    return value.translate(escapes)

def _add_patch(self, aspect_name: str, op: str, path: str, value: Any) -> None:
    """Append a ``_Patch(op, path, value)`` to the patches kept for *aspect_name*."""
    # TODO: Validate that aspectName is a valid aspect for this entityType
    aspect_patches = self.patches[aspect_name]
    aspect_patches.append(_Patch(op, path, value))
Expand Down
5 changes: 2 additions & 3 deletions metadata-ingestion/src/datahub/specific/chart.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
from typing import Dict, List, Optional, TypeVar, Union
from urllib.parse import quote

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -159,7 +158,7 @@ def add_input_edge(self, input: Union[Edge, Urn, str]) -> "ChartPatchBuilder":
self._add_patch(
ChartInfo.ASPECT_NAME,
"add",
path=f"/inputEdges/{quote(input_urn, safe='')}",
path=f"/inputEdges/{self.quote(input_urn)}",
value=input_urn,
)
return self
Expand All @@ -177,7 +176,7 @@ def remove_input_edge(self, input: Union[str, Urn]) -> "ChartPatchBuilder":
self._add_patch(
ChartInfo.ASPECT_NAME,
"remove",
path=f"/inputEdges/{input}",
path=f"/inputEdges/{self.quote(str(input))}",
value={},
)
return self
Expand Down
5 changes: 2 additions & 3 deletions metadata-ingestion/src/datahub/specific/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
from typing import Dict, List, Optional, TypeVar, Union
from urllib.parse import quote

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -166,7 +165,7 @@ def add_dataset_edge(
self._add_patch(
DashboardInfo.ASPECT_NAME,
"add",
path=f"/datasetEdges/{quote(dataset_urn, safe='')}",
path=f"/datasetEdges/{MetadataPatchProposal.quote(dataset_urn)}",
value=dataset_edge,
)
return self
Expand Down Expand Up @@ -249,7 +248,7 @@ def add_chart_edge(self, chart: Union[Edge, Urn, str]) -> "DashboardPatchBuilder
self._add_patch(
DashboardInfo.ASPECT_NAME,
"add",
path=f"/chartEdges/{quote(chart_urn, safe='')}",
path=f"/chartEdges/{MetadataPatchProposal.quote(chart_urn)}",
value=chart_edge,
)
return self
Expand Down
17 changes: 8 additions & 9 deletions metadata-ingestion/src/datahub/specific/datajob.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
from typing import Dict, List, Optional, TypeVar, Union
from urllib.parse import quote

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -165,7 +164,7 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/inputDatajobEdges/{quote(input_urn, safe='')}",
path=f"/inputDatajobEdges/{MetadataPatchProposal.quote(input_urn)}",
value=input_edge,
)
return self
Expand Down Expand Up @@ -248,7 +247,7 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/inputDatasetEdges/{quote(input_urn, safe='')}",
path=f"/inputDatasetEdges/{MetadataPatchProposal.quote(input_urn)}",
value=input_edge,
)
return self
Expand All @@ -266,7 +265,7 @@ def remove_input_dataset(self, input: Union[str, Urn]) -> "DataJobPatchBuilder":
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"remove",
path=f"/inputDatasetEdges/{input}",
path=f"/inputDatasetEdges/{MetadataPatchProposal.quote(str(input))}",
value={},
)
return self
Expand Down Expand Up @@ -333,7 +332,7 @@ def add_output_dataset(
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/outputDatasetEdges/{quote(output_urn, safe='')}",
# Key the path by the output URN (the same edge passed as `value`), not by `input`.
path=f"/outputDatasetEdges/{MetadataPatchProposal.quote(output_urn)}",
value=output_edge,
)
return self
Expand Down Expand Up @@ -418,7 +417,7 @@ def add_input_dataset_field(
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/inputDatasetFields/{quote(input_urn, safe='')}",
path=f"/inputDatasetFields/{MetadataPatchProposal.quote(input_urn)}",
value=input_edge,
)
return self
Expand All @@ -439,7 +438,7 @@ def remove_input_dataset_field(
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"remove",
path=f"/inputDatasetFields/{quote(input_urn, safe='')}",
path=f"/inputDatasetFields/{MetadataPatchProposal.quote(input_urn)}",
value={},
)
return self
Expand Down Expand Up @@ -506,7 +505,7 @@ def add_output_dataset_field(
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/outputDatasetFields/{quote(output_urn, safe='')}",
path=f"/outputDatasetFields/{MetadataPatchProposal.quote(output_urn)}",
value=output_edge,
)
return self
Expand All @@ -527,7 +526,7 @@ def remove_output_dataset_field(
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"remove",
path=f"/outputDatasetFields/{quote(output_urn, safe='')}",
path=f"/outputDatasetFields/{MetadataPatchProposal.quote(output_urn)}",
value={},
)
return self
Expand Down
11 changes: 5 additions & 6 deletions metadata-ingestion/src/datahub/specific/dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union
from urllib.parse import quote

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -127,7 +126,7 @@ def add_upstream_lineage(self, upstream: Upstream) -> "DatasetPatchBuilder":
self._add_patch(
UpstreamLineage.ASPECT_NAME,
"add",
path=f"/upstreams/{quote(upstream.dataset, safe='')}",
path=f"/upstreams/{MetadataPatchProposal.quote(upstream.dataset)}",
value=upstream,
)
return self
Expand Down Expand Up @@ -199,17 +198,17 @@ def quote_fine_grained_downstream_path(
transform_op: str, downstream_type: str, downstream_urn: str
) -> str:
return (
f"/fineGrainedLineages/{quote(transform_op, safe='')}/downstreamType/"
f"{quote(downstream_type, safe='')}/{quote(downstream_urn, safe='')}"
f"/fineGrainedLineages/{MetadataPatchProposal.quote(transform_op)}/downstreamType/"
f"{MetadataPatchProposal.quote(downstream_type)}/{MetadataPatchProposal.quote(downstream_urn)}"
)

@staticmethod
def quote_fine_grained_upstream_path(
transform_op: str, upstream_type: str, upstream_urn: str
) -> str:
return (
f"/fineGrainedLineages/{quote(transform_op, safe='')}/upstreamType/"
f"{quote(upstream_type, safe='')}/{quote(upstream_urn, safe='')}"
f"/fineGrainedLineages/{MetadataPatchProposal.quote(transform_op)}/upstreamType/"
f"{MetadataPatchProposal.quote(upstream_type)}/{MetadataPatchProposal.quote(upstream_urn)}"
)

def remove_fine_grained_upstream_lineage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3080,7 +3080,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.an-aliased-view-for-monthly-billing%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.an-aliased-view-for-monthly-billing,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3107,7 +3107,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.an-aliased-view-for-payments%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3134,7 +3134,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.payments_by_customer_by_month%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3431,7 +3431,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.public.an-aliased-view-for-monthly-billing%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.an-aliased-view-for-monthly-billing,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3458,7 +3458,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.public.an_aliased_view_for_payments%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.an_aliased_view_for_payments,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3485,7 +3485,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.public.payments_by_customer_by_month%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payments_by_customer_by_month,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3512,7 +3512,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.public.customer_snapshot%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer_snapshot,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.customers%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.customers,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -1282,7 +1282,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.orders%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.orders,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -1309,7 +1309,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.stg_customers%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.stg_customers,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -1336,7 +1336,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.stg_orders%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.stg_orders,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -1363,7 +1363,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.stg_payments%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.stg_payments,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -1390,7 +1390,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.raw_customers%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.raw_customers,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -1417,7 +1417,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.raw_orders%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.raw_orders,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -1444,7 +1444,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Ccalm-pagoda-323403.jaffle_shop.raw_payments%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,calm-pagoda-323403.jaffle_shop.raw_payments,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2852,7 +2852,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.an-aliased-view-for-monthly-billing%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.an-aliased-view-for-monthly-billing,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -2879,7 +2879,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.an-aliased-view-for-payments%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -2906,7 +2906,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.payments_by_customer_by_month%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3019,7 +3019,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cdbt-instance-1.pagila.dbt_postgres.an-aliased-view-for-monthly-billing%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,dbt-instance-1.pagila.dbt_postgres.an-aliased-view-for-monthly-billing,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3046,7 +3046,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cdbt-instance-1.pagila.dbt_postgres.an-aliased-view-for-payments%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,dbt-instance-1.pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3073,7 +3073,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cdbt-instance-1.pagila.dbt_postgres.payments_by_customer_by_month%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,dbt-instance-1.pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3019,7 +3019,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.an-aliased-view-for-monthly-billing%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.an-aliased-view-for-monthly-billing,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3046,7 +3046,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.an-aliased-view-for-payments%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand All @@ -3073,7 +3073,7 @@
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Adbt%2Cpagila.dbt_postgres.payments_by_customer_by_month%2CPROD%29",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"value": {
"auditStamp": {
"time": 1643871600000,
Expand Down
Loading

0 comments on commit d2d9661

Please sign in to comment.