Skip to content

Commit

Permalink
feat(ingestion/kafka)-Add support for ingesting schemas from schema r…
Browse files Browse the repository at this point in the history
  • Loading branch information
aabharti-visa authored Jun 11, 2024
1 parent 80e687e commit 8a90577
Show file tree
Hide file tree
Showing 7 changed files with 1,059 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,25 @@ def get_schemas_from_confluent_ref_json(
return all_schemas

def _get_schema_and_fields(
self, topic: str, is_key_schema: bool
self, topic: str, is_key_schema: bool, is_subject: bool
) -> Tuple[Optional[Schema], List[SchemaField]]:
schema: Optional[Schema] = None
schema_type_str: str = "key" if is_key_schema else "value"
topic_subject: Optional[str] = self._get_subject_for_topic(
topic=topic, is_key_schema=is_key_schema
)
kafka_entity = "subject" if is_subject else "topic"

# if provided schema as topic, assuming it as value subject
schema_type_str: Optional[str] = "value"
topic_subject: Optional[str] = None
if not is_subject:
schema_type_str = "key" if is_key_schema else "value"
topic_subject = self._get_subject_for_topic(
topic=topic, is_key_schema=is_key_schema
)
else:
topic_subject = topic

if topic_subject is not None:
logger.debug(
f"The {schema_type_str} schema subject:'{topic_subject}' is found for topic:'{topic}'."
f"The {schema_type_str} schema subject:'{topic_subject}' is found for {kafka_entity}:'{topic}'."
)
try:
registered_schema = self.schema_registry_client.get_latest_version(
Expand All @@ -249,29 +258,31 @@ def _get_schema_and_fields(
schema = registered_schema.schema
except Exception as e:
logger.warning(
f"For topic: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
f"For {kafka_entity}: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
)
self.report.report_warning(
topic,
f"failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}.",
)
else:
logger.debug(
f"For topic: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
f"For {kafka_entity}: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
)
if not is_key_schema:
# Value schema is always expected. Report a warning.
self.report.report_warning(
topic,
f"The schema registry subject for the {schema_type_str} schema is not found."
f" The topic is either schema-less, or no messages have been written to the topic yet.",
f" The {kafka_entity} is either schema-less, or no messages have been written to the {kafka_entity} yet.",
)

# Obtain the schema fields from schema for the topic.
fields: List[SchemaField] = []
if schema is not None:
fields = self._get_schema_fields(
topic=topic, schema=schema, is_key_schema=is_key_schema
topic=topic,
schema=schema,
is_key_schema=is_key_schema,
)
return (schema, fields)

Expand Down Expand Up @@ -352,16 +363,21 @@ def _get_schema_fields(
return fields

def _get_schema_metadata(
self, topic: str, platform_urn: str
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:

# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic, is_key_schema=False
topic=topic,
is_key_schema=False,
is_subject=is_subject,
) # type: Tuple[Optional[Schema], List[SchemaField]]

# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
topic=topic, is_key_schema=True
topic=topic,
is_key_schema=True,
is_subject=is_subject,
) # type:Tuple[Optional[Schema], List[SchemaField]]

# Create the schemaMetadata aspect.
Expand All @@ -388,17 +404,22 @@ def _get_schema_metadata(
return None

def get_schema_metadata(
self, topic: str, platform_urn: str
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:
logger.debug(f"Inside _get_schema_metadata {topic} {platform_urn}")
logger.debug(f"Inside get_schema_metadata {topic} {platform_urn}")

# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic, is_key_schema=False
topic=topic,
is_key_schema=False,
is_subject=is_subject,
) # type: Tuple[Optional[Schema], List[SchemaField]]

# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
topic=topic, is_key_schema=True
topic=topic,
is_key_schema=True,
is_subject=is_subject,
) # type:Tuple[Optional[Schema], List[SchemaField]]

# Create the schemaMetadata aspect.
Expand All @@ -423,3 +444,6 @@ def get_schema_metadata(
fields=key_fields + fields,
)
return None

def get_subjects(self) -> List[str]:
return self.known_schema_registry_subjects
80 changes: 58 additions & 22 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,34 +303,63 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
).topics
extra_topic_details = self.fetch_extra_topic_details(topics.keys())

for t, t_detail in topics.items():
self.report.report_topic_scanned(t)
if self.source_config.topic_patterns.allowed(t):
for topic, topic_detail in topics.items():
self.report.report_topic_scanned(topic)
if self.source_config.topic_patterns.allowed(topic):
try:
yield from self._extract_record(
t, t_detail, extra_topic_details.get(t)
topic, False, topic_detail, extra_topic_details.get(topic)
)
except Exception as e:
logger.warning(f"Failed to extract topic {t}", exc_info=True)
logger.warning(f"Failed to extract topic {topic}", exc_info=True)
self.report.report_warning(
"topic", f"Exception while extracting topic {t}: {e}"
"topic", f"Exception while extracting topic {topic}: {e}"
)
else:
self.report.report_dropped(t)
self.report.report_dropped(topic)

# Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
for subject in self.schema_registry_client.get_subjects():
try:
yield from self._extract_record(
subject, True, topic_detail=None, extra_topic_config=None
)
except Exception as e:
logger.warning(f"Failed to extract subject {subject}", exc_info=True)
self.report.report_warning(
"subject", f"Exception while extracting topic {subject}: {e}"
)

def _extract_record(
self,
topic: str,
is_subject: bool,
topic_detail: Optional[TopicMetadata],
extra_topic_config: Optional[Dict[str, ConfigEntry]],
) -> Iterable[MetadataWorkUnit]:
logger.debug(f"topic = {topic}")

AVRO = "AVRO"

# 1. Create the default dataset snapshot for the topic.
dataset_name = topic
kafka_entity = "subject" if is_subject else "topic"

logger.debug(f"extracting schema metadata from kafka entity = {kafka_entity}")

platform_urn = make_data_platform_urn(self.platform)

# 1. Create schemaMetadata aspect (pass control to SchemaRegistry)
schema_metadata = self.schema_registry_client.get_schema_metadata(
topic, platform_urn, is_subject
)

# topic can have no associated subject, but still it can be ingested without schema
# for schema ingestion, ingest only if it has valid schema
if is_subject:
if schema_metadata is None:
return
dataset_name = schema_metadata.schemaName
else:
dataset_name = topic

# 2. Create the default dataset snapshot for the topic.
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=dataset_name,
Expand All @@ -342,10 +371,6 @@ def _extract_record(
aspects=[Status(removed=False)], # we append to this list later on
)

# 2. Attach schemaMetadata aspect (pass control to SchemaRegistry)
schema_metadata = self.schema_registry_client.get_schema_metadata(
topic, platform_urn
)
if schema_metadata is not None:
dataset_snapshot.aspects.append(schema_metadata)

Expand All @@ -356,9 +381,19 @@ def _extract_record(
browse_path = BrowsePathsClass([browse_path_str])
dataset_snapshot.aspects.append(browse_path)

custom_props = self.build_custom_properties(
topic, topic_detail, extra_topic_config
)
# build custom properties for topic, schema properties may be added as needed
custom_props: Dict[str, str] = {}
if not is_subject:
custom_props = self.build_custom_properties(
topic, topic_detail, extra_topic_config
)
schema_name: Optional[
str
] = self.schema_registry_client._get_subject_for_topic(
topic, is_key_schema=False
)
if schema_name is not None:
custom_props["Schema Name"] = schema_name

# 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
description: Optional[str] = None
Expand Down Expand Up @@ -414,7 +449,7 @@ def _extract_record(
)

dataset_properties = DatasetPropertiesClass(
name=topic, customProperties=custom_props, description=description
name=dataset_name, customProperties=custom_props, description=description
)
dataset_snapshot.aspects.append(dataset_properties)

Expand All @@ -431,12 +466,13 @@ def _extract_record(

# 6. Emit the datasetSnapshot MCE
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
yield MetadataWorkUnit(id=f"kafka-{topic}", mce=mce)
yield MetadataWorkUnit(id=f"kafka-{kafka_entity}", mce=mce)

# 7. Add the subtype aspect marking this as a "topic"
# 7. Add the subtype aspect marking this as a "topic" or "schema"
typeName = DatasetSubTypes.SCHEMA if is_subject else DatasetSubTypes.TOPIC
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypesClass(typeNames=[DatasetSubTypes.TOPIC]),
aspect=SubTypesClass(typeNames=[typeName]),
).as_workunit()

domain_urn: Optional[str] = None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import List, Optional

from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata


class KafkaSchemaRegistryBase(ABC):
@abstractmethod
def get_schema_metadata(
self, topic: str, platform_urn: str
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:
pass

@abstractmethod
def get_subjects(self) -> List[str]:
pass

@abstractmethod
def _get_subject_for_topic(
self, dataset_subtype: str, is_key_schema: bool
) -> Optional[str]:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def __init__(
def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:

self.connection = self.create_connection()
if self.connection is None:
return
Expand All @@ -80,7 +79,6 @@ def get_assertion_workunits(
yield self._gen_platform_instance_wu(mcp.entityUrn)

def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:

# Construct a MetadataChangeProposalWrapper object for assertion platform
return MetadataChangeProposalWrapper(
entityUrn=urn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,6 @@ def table_upstreams_only(

@staticmethod
def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str:

pattern = r"datahub\\_\\_%"
escape_pattern = r"\\"
return f"""
Expand Down
Loading

0 comments on commit 8a90577

Please sign in to comment.