Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): sql parsing aggregator #9786

Merged
merged 53 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
e46dce1
init
hsheth2 Jan 30, 2024
96b74dd
start impl
hsheth2 Jan 30, 2024
d98cc26
more of the shell
hsheth2 Jan 30, 2024
2f796cf
add query subtypes
hsheth2 Jan 30, 2024
6e4c0a3
remove query subtypes + add query type props
hsheth2 Jan 30, 2024
d8a3a91
fixup temp table detection
hsheth2 Jan 31, 2024
3426032
thinking about data structures
hsheth2 Jan 31, 2024
2d666b0
tweaks
hsheth2 Jan 31, 2024
47f5248
add query generalization + fingerprinting method
hsheth2 Feb 1, 2024
30cd1a0
add query_fingerprint to result object
hsheth2 Feb 1, 2024
5bb36be
start queries
hsheth2 Feb 1, 2024
dbbac2e
refactor: move sqlglot into sql_parsing package
hsheth2 Feb 1, 2024
7119b73
more refactoring
hsheth2 Feb 2, 2024
2b16a18
more refactoring
hsheth2 Feb 2, 2024
6042e42
refactor utils
hsheth2 Feb 2, 2024
06b1217
forgot file
hsheth2 Feb 2, 2024
812d9a4
make naming consistent
hsheth2 Feb 2, 2024
3f78a8b
strictness
hsheth2 Feb 2, 2024
ac6951f
fix subtle bug in `mark_dirty` implementation of FileBackedDict
hsheth2 Feb 2, 2024
7682997
initial lineage gen
hsheth2 Feb 2, 2024
f3dd139
setup initial test
hsheth2 Feb 2, 2024
3e83faf
fix edge case with INSERT INTO statements
hsheth2 Feb 2, 2024
f25206a
confidence score + details
hsheth2 Feb 2, 2024
5d238e4
emit queries
hsheth2 Feb 2, 2024
5f4b70a
fix bug in urn tracking of schema resolver
hsheth2 Feb 3, 2024
8b98f9c
add infer_output_schema helper
hsheth2 Feb 5, 2024
6f729a8
start schema resolver interface
hsheth2 Feb 5, 2024
71c5bdb
use SchemaResolverInterface + temp table schemas
hsheth2 Feb 5, 2024
58a21d7
lint fixes + start recursion
hsheth2 Feb 5, 2024
8c5211a
correct imports
hsheth2 Feb 5, 2024
879b54a
use query models
hsheth2 Feb 6, 2024
2c6e26a
add ordered set
hsheth2 Feb 6, 2024
00e35ee
working on stuff
hsheth2 Feb 6, 2024
7926eaa
fix bug in temp schema resolver
hsheth2 Feb 6, 2024
5af9e6b
make temp tables work
hsheth2 Feb 6, 2024
efa2f3b
fix mypy issues
hsheth2 Feb 6, 2024
b0e5dc4
fix lint
hsheth2 Feb 6, 2024
1f9a03d
fix airflow
hsheth2 Feb 6, 2024
48bd62c
fix python 3.8 compat
hsheth2 Feb 6, 2024
e9766f0
fix lint
hsheth2 Feb 6, 2024
f741044
fix composite queries + freeze time
hsheth2 Feb 6, 2024
196d687
fix mypy
hsheth2 Feb 6, 2024
ffad682
fix airflow bug
hsheth2 Feb 6, 2024
7c16911
normalize kind
hsheth2 Feb 6, 2024
fcde9ca
simplify interface
hsheth2 Feb 6, 2024
b148fe4
add operations test
hsheth2 Feb 6, 2024
1a69960
add reporting
hsheth2 Feb 7, 2024
6eeb730
start adding query log
hsheth2 Feb 7, 2024
f7a560a
add support for view defs
hsheth2 Feb 7, 2024
221cb03
fix close ordering for shared conns
hsheth2 Feb 7, 2024
fa02f1c
add special case for copy / alter table operations + refactor temp sc…
hsheth2 Feb 8, 2024
0ec2251
conditionally include query urn
hsheth2 Feb 9, 2024
68829b0
review
hsheth2 Feb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import (
get_platform_from_sqlalchemy_uri,
)
from datahub.utilities.sqlglot_lineage import (
from datahub.sql_parsing.sqlglot_lineage import (
SqlParsingResult,
create_lineage_sql_parsed_result,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ def _get_dependencies(
job_id=task.external_task_id,
data_flow_urn=str(
DataFlowUrn.create_from_ids(
orchestrator=flow_urn.get_orchestrator_name(),
orchestrator=flow_urn.orchestrator,
flow_id=task.external_dag_id,
env=flow_urn.get_env(),
env=flow_urn.cluster,
)
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
)
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
from datahub.telemetry import telemetry
from datahub.utilities.sqlglot_lineage import SqlParsingResult
from openlineage.airflow.listener import TaskHolder
from openlineage.airflow.utils import redact_with_exclusions
from openlineage.client.serde import Serde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ def entities_to_dataset_urn_list(iolets: List[str]) -> List[DatasetUrn]:
dataset_urn_list: List[DatasetUrn] = []
for let in iolets:
if guess_entity_type(let) == "dataset":
dataset_urn_list.append(DatasetUrn.create_from_string(let))
dataset_urn_list.append(DatasetUrn.from_string(let))
return dataset_urn_list


def entities_to_datajob_urn_list(inlets: List[str]) -> List[DataJobUrn]:
datajob_urn_list: List[DataJobUrn] = []
for let in inlets:
if guess_entity_type(let) == "dataJob":
datajob_urn_list.append(DataJobUrn.create_from_string(let))
datajob_urn_list.append(DataJobUrn.from_string(let))
return datajob_urn_list
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
"name": "sqlite_operator_create_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223533895,
"time": 1707253651059,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223533895,
"timestampMillis": 1707253651059,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -442,7 +442,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223534302,
"timestampMillis": 1707253651425,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -492,10 +492,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)"
],
Expand Down Expand Up @@ -576,7 +576,7 @@
"name": "sqlite_operator_populate_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223539348,
"time": 1707253655698,
"actor": "urn:li:corpuser:datahub"
}
}
Expand All @@ -598,10 +598,10 @@
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"inputs": [
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
Expand All @@ -625,7 +625,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223539348,
"timestampMillis": 1707253655698,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -672,10 +672,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)"
],
Expand Down Expand Up @@ -735,7 +735,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223540058,
"timestampMillis": 1707253656320,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -939,7 +939,7 @@
"name": "sqlite_operator_transform_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223548187,
"time": 1707253660584,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -1012,7 +1012,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223548187,
"timestampMillis": 1707253660584,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1248,7 +1248,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223549416,
"timestampMillis": 1707253661682,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1384,7 +1384,7 @@
"name": "sqlite_operator_cleanup_costs_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223557795,
"time": 1707253669241,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -1433,7 +1433,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223557795,
"timestampMillis": 1707253669241,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1545,7 +1545,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223559079,
"timestampMillis": 1707253670409,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1681,7 +1681,7 @@
"name": "sqlite_operator_cleanup_processed_costs_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223564459,
"time": 1707253675107,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -1730,7 +1730,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223564459,
"timestampMillis": 1707253675107,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1842,7 +1842,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223566107,
"timestampMillis": 1707253676482,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
"name": "sqlite_operator_create_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223251992,
"time": 1707253281415,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223251992,
"timestampMillis": 1707253281415,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -442,7 +442,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223253042,
"timestampMillis": 1707253282244,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -549,10 +549,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)"
],
Expand Down Expand Up @@ -633,7 +633,7 @@
"name": "sqlite_operator_populate_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223258947,
"time": 1707253286225,
"actor": "urn:li:corpuser:datahub"
}
}
Expand All @@ -655,10 +655,10 @@
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"inputs": [
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
Expand All @@ -682,7 +682,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223258947,
"timestampMillis": 1707253286225,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -729,10 +729,10 @@
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)"
],
Expand Down Expand Up @@ -792,7 +792,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223260414,
"timestampMillis": 1707253287414,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1053,7 +1053,7 @@
"name": "sqlite_operator_transform_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223266595,
"time": 1707253293513,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -1126,7 +1126,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223266595,
"timestampMillis": 1707253293513,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1362,7 +1362,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223268728,
"timestampMillis": 1707253295443,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1555,7 +1555,7 @@
"name": "sqlite_operator_cleanup_costs_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223275045,
"time": 1707253301697,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -1604,7 +1604,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223275045,
"timestampMillis": 1707253301697,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1716,7 +1716,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223277378,
"timestampMillis": 1707253303779,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -1909,7 +1909,7 @@
"name": "sqlite_operator_cleanup_processed_costs_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1701223282010,
"time": 1707253308368,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -1958,7 +1958,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223282010,
"timestampMillis": 1707253308368,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down Expand Up @@ -2070,7 +2070,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1701223284766,
"timestampMillis": 1707253310722,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
Expand Down
Loading
Loading