fix(ingest/kafka-connect): Add lineage extraction for BigQuery Sink Connector in Kafka Connect source #10647

Merged
39 changes: 34 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -773,6 +773,7 @@ class BQParser:
project: str
target_platform: str
sanitizeTopics: str
transforms: list
topicsToTables: Optional[str] = None
datasets: Optional[str] = None
defaultDataset: Optional[str] = None
@@ -788,6 +789,20 @@ def get_parser(
) -> BQParser:
project = connector_manifest.config["project"]
sanitizeTopics = connector_manifest.config.get("sanitizeTopics", "false")
transform_names = (
self.connector_manifest.config.get("transforms", "").split(",")
if self.connector_manifest.config.get("transforms")
else []
)
transforms = []
for name in transform_names:
transform = {"name": name}
transforms.append(transform)
for key in self.connector_manifest.config.keys():
if key.startswith(f"transforms.{name}."):
transform[
key.replace(f"transforms.{name}.", "")
] = self.connector_manifest.config[key]

if "defaultDataset" in connector_manifest.config:
defaultDataset = connector_manifest.config["defaultDataset"]
@@ -797,6 +812,7 @@ def get_parser(
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
version="v2",
transforms=transforms,
)
else:
# version 1.6.x and similar configs supported
Expand All @@ -809,6 +825,7 @@ def get_parser(
datasets=datasets,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
transforms=transforms,
)

def get_list(self, property: str) -> Iterable[Tuple[str, str]]:
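Note on the parsing above: the connector exposes each transform's settings as flat `transforms.<alias>.<property>` config keys, and the new code regroups them into one dict per transform alias. A standalone sketch of that regrouping, using the config values from this PR's integration test (the `parse_transforms` helper name is mine, not the PR's):

from typing import Dict, List

def parse_transforms(config: Dict[str, str]) -> List[Dict[str, str]]:
    # "transforms" is a comma-separated list of transform aliases.
    names = config.get("transforms", "").split(",") if config.get("transforms") else []
    transforms = []
    for name in names:
        transform = {"name": name}
        transforms.append(transform)
        prefix = f"transforms.{name}."
        # Fold every "transforms.<alias>.<prop>" key into that alias's dict.
        for key, value in config.items():
            if key.startswith(prefix):
                transform[key.replace(prefix, "")] = value
    return transforms

config = {
    "transforms": "TableNameTransformation",
    "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TableNameTransformation.regex": ".*",
    "transforms.TableNameTransformation.replacement": "my_dest_table_name",
}
print(parse_transforms(config))
# [{'name': 'TableNameTransformation', 'type': 'org.apache.kafka.connect.transforms.RegexRouter',
#   'regex': '.*', 'replacement': 'my_dest_table_name'}]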
@@ -867,33 +884,45 @@ def get_dataset_table_for_topic(
table = self.sanitize_table_name(table)
return f"{dataset}.{table}"

def apply_transformations(
self, topic: str, transforms: List[Dict[str, str]]
) -> str:
for transform in transforms:
if transform["type"] == "org.apache.kafka.connect.transforms.RegexRouter":
regex = transform["regex"]
replacement = transform["replacement"]
pattern = re.compile(regex)
if pattern.match(topic):
topic = pattern.sub(replacement, topic, count=1)
return topic

def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
if not parser:
return lineages
target_platform = parser.target_platform
project = parser.project

transforms = parser.transforms
self.connector_manifest.flow_property_bag = self.connector_manifest.config

# Mask/Remove properties that may reveal credentials
if "keyfile" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["keyfile"]

for topic in self.connector_manifest.topic_names:
dataset_table = self.get_dataset_table_for_topic(topic, parser)
transformed_topic = self.apply_transformations(topic, transforms)
dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser)
if dataset_table is None:
self.report_warning(
self.connector_manifest.name,
f"could not find target dataset for topic {topic}, please check your connector configuration",
f"could not find target dataset for topic {transformed_topic}, please check your connector configuration",
)
continue
target_dataset = f"{project}.{dataset_table}"

lineages.append(
KafkaConnectLineage(
source_dataset=topic,
source_dataset=transformed_topic,
source_platform=KAFKA,
target_dataset=target_dataset,
target_platform=target_platform,
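`apply_transformations` above mirrors the semantics of Kafka Connect's RegexRouter: when the topic matches the transform's regex, the first match is substituted with the replacement before the BigQuery table name is resolved; any transform type other than `org.apache.kafka.connect.transforms.RegexRouter` passes the topic through unchanged. A worked example with the values from this PR's test connector:

import re

# Values taken from the bigquery-sink-connector config registered in the test below.
regex, replacement = ".*", "my_dest_table_name"
topic = "kafka-topic-name"

pattern = re.compile(regex)
if pattern.match(topic):
    topic = pattern.sub(replacement, topic, count=1)

print(topic)  # -> my_dest_table_name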
@@ -32,7 +32,7 @@ services:
#
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
#
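The only change in this hunk is uncommenting the confluent-hub install of wepay/kafka-connect-bigquery:1.6.8, so the Connect container used by the integration tests actually ships the BigQuery sink plugin that the new test registers.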
48 changes: 48 additions & 0 deletions metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_mces_golden.json
@@ -0,0 +1,48 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.bigquery-sink-connector,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"autoCreateTables": "true",
"transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableNameTransformation.replacement": "my_dest_table_name",
"topics": "kafka-topic-name",
"transforms.TableNameTransformation.regex": ".*",
"transforms": "TableNameTransformation",
"name": "bigquery-sink-connector",
"project": "my-gcp-project",
"datasets": "kafka-topic-name=mybqdataset",
"defaultDataset": "mybqdataset"
},
"name": "bigquery-sink-connector",
"description": "Sink connector using `com.wepay.kafka.connect.bigquery.BigQuerySinkConnector` plugin."
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.bigquery-sink-connector,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
}
]
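Note that this golden file asserts only the connector-level dataFlow info and status aspects; per `_extract_lineages` above, topic-to-table lineage entries would additionally appear for each topic the connector actually consumes in the test cluster.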
15 changes: 15 additions & 0 deletions metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_to_file.yml
@@ -0,0 +1,15 @@
---
run_id: kafka-connect-run

source:
type: kafka-connect
config:
platform_instance: connect-instance-1
connect_uri: http://localhost:28083
connector_patterns:
allow:
- bigquery-sink-connector
sink:
type: file
config:
filename: "./kafka_connect_mces.json"
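Outside the test harness, the same recipe can be run directly with `datahub ingest -c kafka_connect_bigquery_sink_to_file.yml`, which is the invocation the new test below wraps via `run_datahub_cmd`.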
@@ -10,6 +10,7 @@ source:
deny:
- source_mongodb_connector
- confluent_s3_sink_connector
- bigquery-sink-connector
provided_configs:
- provider: env
path_key: MYSQL_PORT
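Adding `bigquery-sink-connector` to the deny list keeps the new connector out of this pre-existing recipe's output, so it is asserted only by the dedicated `kafka_connect_bigquery_sink_to_file.yml` recipe and its golden file above.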
@@ -333,6 +333,30 @@ def loaded_kafka_connect(kafka_connect_runner):
r.raise_for_status()
assert r.status_code == 201

# Creating BigQuery sink connector
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "bigquery-sink-connector",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"autoCreateTables": "true",
"transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TableNameTransformation.replacement": "my_dest_table_name",
"topics": "kafka-topic-name",
"transforms.TableNameTransformation.regex": ".*",
"transforms": "TableNameTransformation",
"name": "bigquery-sink-connector",
"project": "my-gcp-project",
"defaultDataset": "mybqdataset",
"datasets": "kafka-topic-name=mybqdataset"
}
}
""",
)
assert r.status_code == 201 # Created

# Give time for connectors to process the table data
kafka_connect_runner.wait_until_responsive(
timeout=30,
@@ -637,3 +661,22 @@ def test_kafka_connect_snowflake_sink_ingest(
output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json",
golden_path=f"{test_resources_dir}/{golden_file}",
)


@freeze_time(FROZEN_TIME)
def test_kafka_connect_bigquery_sink_ingest(
loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
# Run the metadata ingestion pipeline.
config_file = (
test_resources_dir / "kafka_connect_bigquery_sink_to_file.yml"
).resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_mces.json",
golden_path=test_resources_dir / "kafka_connect_bigquery_sink_mces_golden.json",
ignore_paths=[],
)
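As a hedged aside (not part of the PR): before the ingest step, the sink connector's registration can be sanity-checked through Kafka Connect's standard status endpoint, using the connect_uri from the recipe above:

import requests

# Kafka Connect REST API: GET /connectors/<name>/status reports connector and task state.
resp = requests.get("http://localhost:28083/connectors/bigquery-sink-connector/status")
resp.raise_for_status()
print(resp.json()["connector"]["state"])  # e.g. "RUNNING"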