Snowflake sink kafka connect cll integration #13
base: master
Conversation
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
include_column_lineage: bool = Field(
    default=False,
    description="Populates topic->table and table->topic column lineage.",
Suggested change:
- description="Populates topic->table and table->topic column lineage.",
+ description="Populates column lineage for source table->topic and topic->destination table edges. Works only for SnowflakeSinkConnector.",
How easy is it to support this for all connectors?
    dataset_field: str,
    platform_instance: Optional[str],
) -> str:
    if config.convert_lineage_urns_to_lowercase:
Let's reuse make_lineage_dataset_urn here?
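For context, a minimal sketch of what a shared helper along the lines of make_lineage_dataset_urn could look like, so the lowercase handling lives in one place. The Config fields and URN format here are illustrative assumptions, not the actual source:

```python
from dataclasses import dataclass

@dataclass
class Config:
    # Hypothetical stand-in for the connector config object.
    convert_lineage_urns_to_lowercase: bool = True
    env: str = "PROD"

def make_lineage_dataset_urn(platform: str, name: str, config: Config) -> str:
    # Single place that applies the lowercase convention for lineage URNs,
    # so call sites don't each repeat the config check.
    if config.convert_lineage_urns_to_lowercase:
        name = name.lower()
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{config.env})"

urn = make_lineage_dataset_urn("snowflake", "DB.SCHEMA.TABLE", Config())
```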
# If sink type is file or
# Source dataset is none
if self.ctx.graph is None or lineage.source_dataset is None:
    continue
Can we move this self.ctx.graph check to before extract_cll is called, report a warning there, and leave it at that?
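A rough sketch of the suggested ordering, with a plain list standing in for the source report object (function and field names here are hypothetical):

```python
# Hypothetical sketch: check the graph connection once, up front, instead of
# inside the per-lineage loop; warn and bail out early when it is missing.
def extract_cll_safe(graph, lineages, warnings):
    if graph is None:
        warnings.append("No DataHub graph connection; skipping column lineage.")
        return []
    # Only lineages with a known source dataset can be resolved.
    return [l for l in lineages if l.get("source_dataset") is not None]

warnings = []
result = extract_cll_safe(None, [{"source_dataset": "topic_a"}], warnings)
```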
    continue

is_snowpipe_streaming: bool = False
if "snowflake.ingestion.method" in self.connector_manifest.config:
Extract this to a string constant?
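The suggestion above as a minimal sketch; the constant names are illustrative, not taken from the source:

```python
# Illustrative module-level constants for the magic strings in the config.
SNOWFLAKE_INGESTION_METHOD = "snowflake.ingestion.method"
SNOWPIPE_STREAMING = "SNOWPIPE_STREAMING"

config = {"snowflake.ingestion.method": "SNOWPIPE_STREAMING"}
is_snowpipe_streaming = (
    config.get(SNOWFLAKE_INGESTION_METHOD, "").upper() == SNOWPIPE_STREAMING
)
```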
target_dataset_fields: List[str] = []
column_lineages: List[ColumnLineageMap] = []

if not is_snowpipe_streaming:
The condition seems reversed?
# key schema doesn't get mapped to any target dataset column
source_dataset_fields: List[str] = [
    schema_field.fieldPath.split(".")[-1]
What is this split-and-take-the-last-entry for? It doesn't seem right.
I wonder how structs, nested structs, or unions are mapped in a Snowflake table.
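To illustrate the concern: split(".")[-1] keeps only the leaf name, which discards any struct nesting encoded in the fieldPath. Both example paths below are made up for demonstration:

```python
# Flat field: taking the last dot-separated segment is harmless.
flat = "customer_id"
# Nested field in a v2-style fieldPath: the parent struct context is dropped.
nested = "[version=2.0].[type=struct].address.[type=string].street"

leaf_flat = flat.split(".")[-1]      # still the full field name
leaf_nested = nested.split(".")[-1]  # only the leaf; "address" parent is lost
```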
target_dataset_fields.append(target_field)
column_lineages.append(
    ColumnLineageMap(
        source_columns=[source_field], target_columns=[target_field]
This is nice. It would be great if this could be generalized to work for other kafka-connect connectors. Out of scope for this PR, of course.
@@ -1199,6 +1476,9 @@ def construct_job_workunits(
    aspect=models.DataJobInputOutputClass(
        inputDatasets=inlets,
        outputDatasets=outlets,
        inputDatasetFields=inlets_fields,
What is the significance of inputDatasetFields and outputDatasetFields? It looks like everything is already covered by fineGrainedLineages.
def _extract_lineages(self):
    self.connector_manifest.flow_property_bag = self.connector_manifest.config
    # remove private key from properties
    del self.connector_manifest.flow_property_bag["snowflake.private.key"]
Let's have a list of secret properties and remove all of them, similar to how it's done in the S3 sink connector. Ideally every connector should have this property, similar to connector_manifest. Candidates:
- snowflake.private.key
- snowflake.private.key.passphrase
- value.converter.basic.auth.user.info (from the docs, it looks sensitive)
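A minimal sketch of the list-based redaction; the property names come from this comment and the dict is a stand-in for connector_manifest.config:

```python
# Known-sensitive connector properties to strip before emitting metadata.
SECRET_PROPERTIES = [
    "snowflake.private.key",
    "snowflake.private.key.passphrase",
    "value.converter.basic.auth.user.info",
]

config = {
    "snowflake.private.key": "-----BEGIN PRIVATE KEY-----...",
    "snowflake.database.name": "ANALYTICS",
}
# Drop every known secret instead of del-ing a single hard-coded key.
flow_property_bag = {k: v for k, v in config.items() if k not in SECRET_PROPERTIES}
```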
@@ -0,0 +1,152 @@
[
Can you update the tests to cover fine-grained lineage? You can mock ctx/DataHubGraph.
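One way to stub the graph in a unit test is unittest.mock; the method name and return value below illustrate the pattern and are not necessarily the exact DataHubGraph API:

```python
from unittest.mock import MagicMock

# Stand-in for the DataHubGraph client hanging off the pipeline context.
graph = MagicMock()
graph.get_schema_metadata.return_value = {"fields": ["id", "name"]}

# The source under test would call this to resolve target table columns.
schema = graph.get_schema_metadata("urn:li:dataset:(...)")
```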
Checklist