Commit

fix comments for code redability
aabharti-visa committed Jun 6, 2024
1 parent b85260d commit c9ae68f
Showing 2 changed files with 54 additions and 57 deletions.
@@ -76,10 +76,7 @@ def create(
) -> "ConfluentSchemaRegistry":
return cls(source_config, report)

def _get_subject_for_topic(
self, dataset_subtype: str, is_key_schema: bool
) -> Optional[str]:
topic: str = dataset_subtype
def _get_subject_for_topic(self, topic: str, is_key_schema: bool) -> Optional[str]:
subject_key_suffix: str = "-key" if is_key_schema else "-value"
# For details on schema registry subject name strategy,
# see: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work
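For context on the naming-strategy comment above: under Confluent's default TopicNameStrategy, the subject is simply the topic name with a "-key" or "-value" suffix, which is what subject_key_suffix encodes. A minimal illustrative sketch, assuming a hypothetical helper and a pre-fetched set of registered subjects (not the actual DataHub method, which is more involved):

from typing import Optional, Set

# Hypothetical helper, for illustration only: resolve the schema-registry
# subject for a topic under the default TopicNameStrategy by appending
# "-key"/"-value" and confirming the candidate is actually registered.
def subject_for_topic(topic: str, is_key_schema: bool,
                      registered_subjects: Set[str]) -> Optional[str]:
    candidate = topic + ("-key" if is_key_schema else "-value")
    return candidate if candidate in registered_subjects else None

For example, subject_for_topic("orders", False, {"orders-value"}) returns "orders-value", while a key-schema lookup against the same set returns None.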
@@ -234,24 +231,25 @@ def get_schemas_from_confluent_ref_json(
return all_schemas

def _get_schema_and_fields(
self, dataset_subtype: str, is_key_schema: bool, is_subject: bool
self, topic: str, is_key_schema: bool, is_subject: bool
) -> Tuple[Optional[Schema], List[SchemaField]]:
schema: Optional[Schema] = None
kafka_entity = "subject" if is_subject else "topic"

# if provided schema as dataset_subtype, assuming it as value subject
# if provided schema as topic, assuming it as value subject
schema_type_str: Optional[str] = "value"
topic_subject: Optional[str] = None
if not is_subject:
schema_type_str = "key" if is_key_schema else "value"
topic_subject = self._get_subject_for_topic(
dataset_subtype=dataset_subtype, is_key_schema=is_key_schema
topic=topic, is_key_schema=is_key_schema
)
else:
topic_subject = dataset_subtype
topic_subject = topic

if topic_subject is not None:
logger.debug(
f"The {schema_type_str} schema subject:'{topic_subject}' is found for dataset_subtype:'{dataset_subtype}'."
f"The {schema_type_str} schema subject:'{topic_subject}' is found for {kafka_entity}:'{topic}'."
)
try:
registered_schema = self.schema_registry_client.get_latest_version(
@@ -260,29 +258,29 @@ def _get_schema_and_fields(
schema = registered_schema.schema
except Exception as e:
logger.warning(
f"For dataset_subtype: {dataset_subtype}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
f"For {kafka_entity}: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
)
self.report.report_warning(
dataset_subtype,
topic,
f"failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}.",
)
else:
logger.debug(
f"For dataset_subtype: {dataset_subtype}, the schema registry subject for the {schema_type_str} schema is not found."
f"For {kafka_entity}: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
)
if not is_key_schema:
# Value schema is always expected. Report a warning.
self.report.report_warning(
dataset_subtype,
topic,
f"The schema registry subject for the {schema_type_str} schema is not found."
f" The dataset_subtype is either schema-less, or no messages have been written to the dataset_subtype yet.",
f" The {kafka_entity} is either schema-less, or no messages have been written to the {kafka_entity} yet.",
)

# Obtain the schema fields from schema for the dataset_subtype.
# Obtain the schema fields from schema for the topic.
fields: List[SchemaField] = []
if schema is not None:
fields = self._get_schema_fields(
dataset_subtype=dataset_subtype,
topic=topic,
schema=schema,
is_key_schema=is_key_schema,
)
Expand All @@ -308,7 +306,7 @@ def _load_json_schema_with_resolved_references(
return jsonref_schema

def _get_schema_fields(
self, dataset_subtype: str, schema: Schema, is_key_schema: bool
self, topic: str, schema: Schema, is_key_schema: bool
) -> List[SchemaField]:
# Parse the schema and convert it to SchemaFields.
fields: List[SchemaField] = []
@@ -320,9 +318,11 @@ def _get_schema_fields(
fields = schema_util.avro_schema_to_mce_fields(
avro_schema,
is_key_schema=is_key_schema,
meta_mapping_processor=self.field_meta_processor
if self.source_config.enable_meta_mapping
else None,
meta_mapping_processor=(
self.field_meta_processor
if self.source_config.enable_meta_mapping
else None
),
schema_tags_field=self.source_config.schema_tags_field,
tag_prefix=self.source_config.tag_prefix,
)
@@ -331,28 +331,28 @@ def _get_schema_fields(
imported_schemas: List[
ProtobufSchema
] = self.get_schemas_from_confluent_ref_protobuf(schema)
base_name: str = dataset_subtype.replace(".", "_")
base_name: str = topic.replace(".", "_")
fields = protobuf_util.protobuf_schema_to_mce_fields(
ProtobufSchema(
f"{base_name}-key.proto"
if is_key_schema
else f"{base_name}-value.proto",
(
f"{base_name}-key.proto"
if is_key_schema
else f"{base_name}-value.proto"
),
schema.schema_str,
),
imported_schemas,
is_key_schema=is_key_schema,
)
elif schema.schema_type == "JSON":
base_name = dataset_subtype.replace(".", "_")
base_name = topic.replace(".", "_")
canonical_name = (
f"{base_name}-key" if is_key_schema else f"{base_name}-value"
)
jsonref_schema = self._load_json_schema_with_resolved_references(
schema=schema,
name=canonical_name,
subject=f"{dataset_subtype}-key"
if is_key_schema
else f"{dataset_subtype}-value",
subject=f"{topic}-key" if is_key_schema else f"{topic}-value",
)
fields = list(
JsonSchemaTranslator.get_fields_from_schema(
@@ -361,25 +361,25 @@ def _get_schema_fields(
)
elif not self.source_config.ignore_warnings_on_schema_type:
self.report.report_warning(
dataset_subtype,
topic,
f"Parsing kafka schema type {schema.schema_type} is currently not implemented",
)
return fields

def _get_schema_metadata(
self, dataset_subtype: str, platform_urn: str, is_subject: bool
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:

# Process the value schema
schema, fields = self._get_schema_and_fields(
dataset_subtype=dataset_subtype,
topic=topic,
is_key_schema=False,
is_subject=is_subject,
) # type: Tuple[Optional[Schema], List[SchemaField]]

# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
dataset_subtype=dataset_subtype,
topic=topic,
is_key_schema=True,
is_subject=is_subject,
) # type:Tuple[Optional[Schema], List[SchemaField]]
@@ -393,7 +393,7 @@ def _get_schema_metadata(
md5_hash: str = md5(schema_as_string.encode()).hexdigest()

return SchemaMetadata(
schemaName=dataset_subtype,
schemaName=topic,
version=0,
hash=md5_hash,
platform=platform_urn,
Expand All @@ -408,20 +408,20 @@ def _get_schema_metadata(
return None

def get_schema_metadata(
self, dataset_subtype: str, platform_urn: str, is_subject: bool
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:
logger.debug(f"Inside get_schema_metadata {dataset_subtype} {platform_urn}")
logger.debug(f"Inside get_schema_metadata {topic} {platform_urn}")

# Process the value schema
schema, fields = self._get_schema_and_fields(
dataset_subtype=dataset_subtype,
topic=topic,
is_key_schema=False,
is_subject=is_subject,
) # type: Tuple[Optional[Schema], List[SchemaField]]

# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
dataset_subtype=dataset_subtype,
topic=topic,
is_key_schema=True,
is_subject=is_subject,
) # type:Tuple[Optional[Schema], List[SchemaField]]
@@ -435,7 +435,7 @@ def get_schema_metadata(
md5_hash = md5(schema_as_string.encode()).hexdigest()

return SchemaMetadata(
schemaName=dataset_subtype,
schemaName=topic,
version=0,
hash=md5_hash,
platform=platform_urn,
35 changes: 16 additions & 19 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -308,7 +308,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.source_config.topic_patterns.allowed(topic):
try:
yield from self._extract_record(
topic, "", topic_detail, extra_topic_details.get(topic)
topic, False, topic_detail, extra_topic_details.get(topic)
)
except Exception as e:
logger.warning(f"Failed to extract topic {topic}", exc_info=True)
@@ -322,7 +322,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for subject in self.schema_registry_client.get_subjects():
try:
yield from self._extract_record(
"", subject, topic_detail=None, extra_topic_config=None
subject, True, topic_detail=None, extra_topic_config=None
)
except Exception as e:
logger.warning(f"Failed to extract subject {subject}", exc_info=True)
@@ -333,36 +333,33 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
def _extract_record(
self,
topic: str,
subject: str,
is_subject: bool,
topic_detail: Optional[TopicMetadata],
extra_topic_config: Optional[Dict[str, ConfigEntry]],
) -> Iterable[MetadataWorkUnit]:
AVRO = "AVRO"

kafka_entity = topic if len(topic) != 0 else subject
is_subject = False if len(topic) != 0 else True
kafka_entity = "subject" if is_subject else "topic"

logger.debug(f"kafka entity name = {kafka_entity}")
logger.debug(f"extracting schema metadata from kafka entity = {kafka_entity}")

platform_urn = make_data_platform_urn(self.platform)

# 1. Create schemaMetadata aspect (pass control to SchemaRegistry)
schema_metadata = self.schema_registry_client.get_schema_metadata(
kafka_entity, platform_urn, is_subject
topic, platform_urn, is_subject
)

# topic can have no associated subject, but still it can be ingested without schema
# for schema ingestion, ingest only if it has valid schema
if is_subject:
if schema_metadata is None:
return
dataset_name = schema_metadata.schemaName
else:
dataset_name = topic

# dataset_name = schema_metadata.schemaName if len(topic) == 0 else topic
# 2. Create the default dataset snapshot for the topic.
# if schema_metadata is not None:
# dataset_name = schema_metadata.schemaName if len(topic) == 0 else topic
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=dataset_name,
@@ -386,17 +383,17 @@ def _extract_record(

# build custom properties for topic, schema properties may be added as needed
custom_props: Dict[str, str] = {}
if len(topic) != 0:
if not is_subject:
custom_props = self.build_custom_properties(
topic, topic_detail, extra_topic_config
)
schemaName: Optional[
schema_name: Optional[
str
] = self.schema_registry_client._get_subject_for_topic(
dataset_subtype=topic, is_key_schema=False
topic, is_key_schema=False
)
if schemaName is not None:
custom_props["Schema Name"] = schemaName
if schema_name is not None:
custom_props["Schema Name"] = schema_name

# 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
description: Optional[str] = None
@@ -472,7 +469,7 @@ def _extract_record(
yield MetadataWorkUnit(id=f"kafka-{kafka_entity}", mce=mce)

# 7. Add the subtype aspect marking this as a "topic" or "schema"
typeName = DatasetSubTypes.TOPIC if len(topic) != 0 else DatasetSubTypes.SCHEMA
typeName = DatasetSubTypes.SCHEMA if is_subject else DatasetSubTypes.TOPIC
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypesClass(typeNames=[typeName]),
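A note on the refactor running through this file: the earlier code signalled "this is a schema-registry subject rather than a topic" by passing an empty topic string alongside a separate subject argument, then re-deriving the flag from len(topic); the new code passes a single name plus an explicit is_subject boolean, and the SCHEMA/TOPIC subtype above follows directly from that flag. A condensed, hedged sketch of the resulting call pattern (dispatch_records, topics, and subjects are illustrative names; the real logic lives in get_workunits_internal):

from typing import Dict, Iterable, List, Optional

# Condensed sketch of the new dispatch: one name argument plus an explicit
# is_subject flag, instead of an empty-string sentinel.
def dispatch_records(source, topics: Dict[str, Optional[object]],
                     subjects: List[str]) -> Iterable:
    for topic, detail in topics.items():
        # Real Kafka topics: broker-side detail is available, is_subject=False.
        yield from source._extract_record(topic, False, detail, None)
    for subject in subjects:
        # Registry-only subjects: no topic metadata, is_subject=True.
        yield from source._extract_record(subject, True, None, None)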
@@ -584,9 +581,9 @@ def fetch_topic_configurations(self, topics: List[str]) -> Dict[str, dict]:
configs: Dict[
ConfigResource, concurrent.futures.Future
] = self.admin_client.describe_configs(
resources=[ConfigResource(ConfigResource.Type.TOPIC, t) for t in topics],
request_timeout=self.source_config.connection.client_timeout_seconds,
)
resources=[ConfigResource(ConfigResource.Type.TOPIC, t) for t in topics],
request_timeout=self.source_config.connection.client_timeout_seconds,
)
logger.debug("Waiting for config details futures to complete")
concurrent.futures.wait(configs.values())
logger.debug("Config details futures completed")
