fix(ingest/kafka): clarify meta-mapping docs #10320

Merged
merged 1 commit into from
Apr 18, 2024
Merged
39 changes: 22 additions & 17 deletions metadata-ingestion/docs/sources/kafka/kafka.md
@@ -4,9 +4,10 @@ Stateful Ingestion is available only when a Platform Instance is assigned to this source.

### Connecting to Confluent Cloud

If you are using Confluent Cloud, you can use a recipe like this. In this recipe, `consumer_config.sasl.username` and `consumer_config.sasl.password` are the API credentials that you get (in the Confluent UI) from your cluster -> Data Integration -> API Keys. `schema_registry_config.basic.auth.user.info` holds the API credentials for the Confluent schema registry, which you get (in the Confluent UI) from Schema Registry -> API credentials.
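A sketch of such a recipe is below; the endpoint hostnames and the key/secret values are placeholders to replace with your own, and the sink section is elided.

```YAML
source:
  type: "kafka"
  config:
    connection:
      # Placeholder Confluent Cloud bootstrap endpoint - replace with your cluster's endpoint
      bootstrap: "abc-defg.eu-west-1.aws.confluent.cloud:9092"
      consumer_config:
        security.protocol: "SASL_SSL"
        sasl.mechanism: "PLAIN"
        sasl.username: "${CLUSTER_API_KEY_ID}"
        sasl.password: "${CLUSTER_API_KEY_SECRET}"
      # Placeholder schema registry endpoint and credentials
      schema_registry_url: "https://abc-defg.us-east-2.aws.confluent.cloud"
      schema_registry_config:
        basic.auth.user.info: "${REGISTRY_API_KEY_ID}:${REGISTRY_API_KEY_SECRET}"
sink:
  # your sink configuration goes here
```

The `consumer_config` settings are passed through to the underlying Kafka consumer, which is why standard Confluent Cloud client options such as `security.protocol` and `sasl.mechanism` appear there.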

When creating an API key for the cluster, ensure that the ACLs associated with the key are set as below. This is required for DataHub to read topic metadata from topics in Confluent Cloud.

```
Topic Name = *
Permission = ALLOW
Operation = DESCRIBE
Pattern Type = LITERAL
```

@@ -75,11 +76,11 @@

The Kafka Source uses the schema registry to figure out the schema associated with a topic.
By default, it uses [Confluent's Kafka Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
and supports the `AVRO` and `PROTOBUF` schema types.


If you're using a custom schema registry, or a schema type other than `AVRO` or `PROTOBUF`, you can provide your own
implementation of the `KafkaSchemaRegistryBase` class and implement the `get_schema_metadata(topic, platform_urn)` method, which,
given a topic name, returns a `SchemaMetadata` object containing the schema for that topic. Please refer to
`datahub.ingestion.source.confluent_schema_registry::ConfluentSchemaRegistry` for a sample implementation of this class.

```python
from abc import ABC, abstractmethod


class KafkaSchemaRegistryBase(ABC):
    @abstractmethod
    def get_schema_metadata(self, topic: str, platform_urn: str) -> "SchemaMetadata":
        """Given a topic name, return a SchemaMetadata object containing the schema for that topic."""
```

The custom schema registry class can be configured using the `schema_registry_class` config param of the `kafka` source as shown below.

```YAML
source:
  type: "kafka"
  config:
    # Fully qualified class name of the schema registry implementation to use
    schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
```

The current implementation of `PROTOBUF` schema type support has the following limitations:

- Recursive types are not supported.
- If the schemas of different topics define a type in the same package, the source will raise an exception.

In addition to this, maps are represented as arrays of messages. The following message,

@@ -134,7 +136,7 @@ message MessageWithMap {
### Enriching DataHub metadata with automated meta mapping

:::note
Meta mapping is currently only available for Avro schemas, and requires that those Avro schemas are pushed to the schema registry.
:::

Avro schemas are permitted to carry additional attributes, not defined by the specification, as arbitrary metadata. A common pattern is to use this for business metadata. The Kafka source can transform these attributes directly into DataHub Owners, Tags, and Terms.
@@ -150,11 +152,13 @@ Example Avro schema:
"name": "sampleRecord",
"type": "record",
"tags": ["tag1", "tag2"],
"fields": [{
"name": "field_1",
"type": "string",
"tags": ["tag3", "tag4"]
}]
"fields": [
{
"name": "field_1",
"type": "string",
"tags": ["tag3", "tag4"]
}
]
}
```

@@ -178,13 +182,15 @@ Example Avro schema:
"type": "record",
"owning_team": "@Data-Science",
"data_tier": "Bronze",
"fields": [{
"name": "field_1",
"type": "string",
"gdpr": {
"pii": true
"fields": [
{
"name": "field_1",
"type": "string",
"gdpr": {
"pii": true
}
}
}]
]
}
```

@@ -212,4 +218,3 @@ config:
```
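For orientation, a sketch of a `meta_mapping` / `field_meta_mapping` configuration keyed to the attributes used in the Avro examples above might look like the following; the match expressions and operations here are illustrative rather than the canonical example from the docs:

```YAML
config:
  meta_mapping:
    # Illustrative rules - adapt the matches and operations to your own attributes
    owning_team:
      match: "^@(.*)"
      operation: "add_owner"
      config:
        owner_type: group
    data_tier:
      match: "Bronze|Silver|Gold"
      operation: "add_term"
      config:
        term: "{{ $match }}"
  field_meta_mapping:
    gdpr.pii:
      match: true
      operation: "add_tag"
      config:
        tag: "pii"
```

Each rule matches an attribute value against a pattern and applies an operation such as `add_owner`, `add_term`, or `add_tag`, mirroring the dbt meta mapping behaviour linked below.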

The underlying implementation is similar to [dbt meta mapping](https://datahubproject.io/docs/generated/ingestion/sources/dbt#dbt-meta-automated-mappings), which has more detailed examples that can be used for reference.
