Skip to content

Commit

Permalink
fix(ingest/kafka-connect): Add lineage extraction for BigQuery Sink C…
Browse files Browse the repository at this point in the history
…onnector in Kafka Connect source (#10647)
  • Loading branch information
sagar-salvi-apptware authored Jun 14, 2024
1 parent d699660 commit cc8389e
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 6 deletions.
39 changes: 34 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ class BQParser:
project: str
target_platform: str
sanitizeTopics: str
transforms: list
topicsToTables: Optional[str] = None
datasets: Optional[str] = None
defaultDataset: Optional[str] = None
Expand All @@ -788,6 +789,20 @@ def get_parser(
) -> BQParser:
project = connector_manifest.config["project"]
sanitizeTopics = connector_manifest.config.get("sanitizeTopics", "false")
transform_names = (
self.connector_manifest.config.get("transforms", "").split(",")
if self.connector_manifest.config.get("transforms")
else []
)
transforms = []
for name in transform_names:
transform = {"name": name}
transforms.append(transform)
for key in self.connector_manifest.config.keys():
if key.startswith(f"transforms.{name}."):
transform[
key.replace(f"transforms.{name}.", "")
] = self.connector_manifest.config[key]

if "defaultDataset" in connector_manifest.config:
defaultDataset = connector_manifest.config["defaultDataset"]
Expand All @@ -797,6 +812,7 @@ def get_parser(
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
version="v2",
transforms=transforms,
)
else:
# version 1.6.x and similar configs supported
Expand All @@ -809,6 +825,7 @@ def get_parser(
datasets=datasets,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
transforms=transforms,
)

def get_list(self, property: str) -> Iterable[Tuple[str, str]]:
Expand Down Expand Up @@ -867,33 +884,45 @@ def get_dataset_table_for_topic(
table = self.sanitize_table_name(table)
return f"{dataset}.{table}"

def apply_transformations(
self, topic: str, transforms: List[Dict[str, str]]
) -> str:
for transform in transforms:
if transform["type"] == "org.apache.kafka.connect.transforms.RegexRouter":
regex = transform["regex"]
replacement = transform["replacement"]
pattern = re.compile(regex)
if pattern.match(topic):
topic = pattern.sub(replacement, topic, count=1)
return topic

def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
if not parser:
return lineages
target_platform = parser.target_platform
project = parser.project

transforms = parser.transforms
self.connector_manifest.flow_property_bag = self.connector_manifest.config

# Mask/Remove properties that may reveal credentials
if "keyfile" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["keyfile"]

for topic in self.connector_manifest.topic_names:
dataset_table = self.get_dataset_table_for_topic(topic, parser)
transformed_topic = self.apply_transformations(topic, transforms)
dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser)
if dataset_table is None:
self.report_warning(
self.connector_manifest.name,
f"could not find target dataset for topic {topic}, please check your connector configuration",
f"could not find target dataset for topic {transformed_topic}, please check your connector configuration",
)
continue
target_dataset = f"{project}.{dataset_table}"

lineages.append(
KafkaConnectLineage(
source_dataset=topic,
source_dataset=transformed_topic,
source_platform=KAFKA,
target_dataset=target_dataset,
target_platform=target_platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
#
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
#
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.bigquery-sink-connector,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"autoCreateTables": "true",
"transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableNameTransformation.replacement": "my_dest_table_name",
"topics": "kafka-topic-name",
"transforms.TableNameTransformation.regex": ".*",
"transforms": "TableNameTransformation",
"name": "bigquery-sink-connector",
"project": "my-gcp-project",
"datasets": "kafka-topic-name=mybqdataset",
"defaultDataset": "mybqdataset"
},
"name": "bigquery-sink-connector",
"description": "Sink connector using `com.wepay.kafka.connect.bigquery.BigQuerySinkConnector` plugin."
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.bigquery-sink-connector,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
run_id: kafka-connect-run

source:
type: kafka-connect
config:
platform_instance: connect-instance-1
connect_uri: http://localhost:28083
connector_patterns:
allow:
- bigquery-sink-connector
sink:
type: file
config:
filename: "./kafka_connect_mces.json"
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ source:
deny:
- source_mongodb_connector
- confluent_s3_sink_connector
- bigquery-sink-connector
provided_configs:
- provider: env
path_key: MYSQL_PORT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,30 @@ def loaded_kafka_connect(kafka_connect_runner):
r.raise_for_status()
assert r.status_code == 201

# Creating BigQuery sink connector
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "bigquery-sink-connector",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"autoCreateTables": "true",
"transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableNameTransformation.replacement": "my_dest_table_name",
"topics": "kafka-topic-name",
"transforms.TableNameTransformation.regex": ".*",
"transforms": "TableNameTransformation",
"name": "bigquery-sink-connector",
"project": "my-gcp-project",
"defaultDataset": "mybqdataset",
"datasets": "kafka-topic-name=mybqdataset"
}
}
""",
)
assert r.status_code == 201 # Created

# Give time for connectors to process the table data
kafka_connect_runner.wait_until_responsive(
timeout=30,
Expand Down Expand Up @@ -637,3 +661,22 @@ def test_kafka_connect_snowflake_sink_ingest(
output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json",
golden_path=f"{test_resources_dir}/{golden_file}",
)


@freeze_time(FROZEN_TIME)
def test_kafka_connect_bigquery_sink_ingest(
loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
# Run the metadata ingestion pipeline.
config_file = (
test_resources_dir / "kafka_connect_bigquery_sink_to_file.yml"
).resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_mces.json",
golden_path=test_resources_dir / "kafka_connect_bigquery_sink_mces_golden.json",
ignore_paths=[],
)

0 comments on commit cc8389e

Please sign in to comment.