feat(ingestion): Protobuf support for the Kafka source #4819

Merged
32 changes: 30 additions & 2 deletions metadata-ingestion/archived/source_docs/kafka.md
@@ -111,9 +111,9 @@ source:

The Kafka Source uses the schema registry to figure out the schema associated with both `key` and `value` for the topic.
By default it uses [Confluent's Kafka Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
and supports the `AVRO` schema type.
and supports the `AVRO` and `PROTOBUF` schema types.

If you're using a custom schema registry, or you are using schema type other than `AVRO`, then you can provide your own
If you're using a custom schema registry, or a schema type other than `AVRO` or `PROTOBUF`, then you can provide your own
custom implementation of the `KafkaSchemaRegistryBase` class and implement the `get_schema_metadata(topic, platform_urn)` method that,
given a topic name, returns a `SchemaMetadata` object containing the schema for that topic. Please refer to
`datahub.ingestion.source.confluent_schema_registry::ConfluentSchemaRegistry` for a sample implementation of this class.
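
For illustration, here is a minimal sketch of such a custom implementation. The class and method signature follow the description above; the import paths and the way the `SchemaMetadata` object is assembled (via DataHub's generated `schema_classes`) are assumptions made for the sketch, not code from this PR.

```python
from typing import Optional

# The import path for the base class is assumed here; adjust it to where
# KafkaSchemaRegistryBase lives in your DataHub version.
from datahub.ingestion.source.kafka import KafkaSchemaRegistryBase
from datahub.metadata.schema_classes import (
    AuditStampClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)


class StaticSchemaRegistry(KafkaSchemaRegistryBase):
    """Hypothetical registry that reports a single string payload field for every topic."""

    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadataClass]:
        return SchemaMetadataClass(
            schemaName=topic,
            platform=platform_urn,
            version=0,
            hash="",
            platformSchema=OtherSchemaClass(rawSchema="string payload"),
            lastModified=AuditStampClass(time=0, actor="urn:li:corpuser:ingestion"),
            fields=[
                SchemaFieldClass(
                    fieldPath="payload",
                    type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                    nativeDataType="string",
                )
            ],
        )
```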
@@ -142,6 +142,34 @@ sink:
# sink configs
```

### Limitations of the `PROTOBUF` schema type implementation

The current implementation of `PROTOBUF` schema support has the following limitations:

+ Recursive types are not supported.
+ If the schemas of different topics define a type in the same package, the source will raise an exception.

In addition, maps are represented as arrays of messages. For example, the following message

```
message MessageWithMap {
  map<int32, string> map_1 = 1;
}
```

becomes:

```
message Map1Entry {
  int32 key = 1;
  string value = 2;
}

message MessageWithMap {
repeated Map1Entry map_1 = 1;
}
```
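
This mirrors how protobuf itself models map fields: the compiler synthesizes a nested `*Entry` message with `map_entry = true` set in its options. Below is a minimal sketch of inspecting that through the protobuf Python descriptors, assuming a module `message_with_map_pb2` has been generated from the message above (the module name is hypothetical).

```python
# Sketch only: inspect how a map field is exposed by the protobuf runtime,
# assuming `message_with_map_pb2` was generated from the proto above.
from message_with_map_pb2 import MessageWithMap

field = MessageWithMap.DESCRIPTOR.fields_by_name["map_1"]
entry_type = field.message_type                 # the synthesized entry message

print(entry_type.name)                          # "Map1Entry"
print(entry_type.GetOptions().map_entry)        # True
print([f.name for f in entry_type.fields])      # ["key", "value"]
```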

## Config details

Note that a `.` is used to denote nested fields in the YAML recipe.
41 changes: 35 additions & 6 deletions metadata-ingestion/docs/sources/kafka/kafka.md
@@ -71,13 +71,14 @@ source:

### Custom Schema Registry

The Kafka Source uses the schema registry to figure out the schema associated with both `key` and `value` for the topic.
By default it uses [Confluent's Kafka Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
and supports the `AVRO` schema type.
and supports the `AVRO` and `PROTOBUF` schema types.

If you're using a custom schema registry, or you are using schema type other than `AVRO`, then you can provide your own

If you're using a custom schema registry, or a schema type other than `AVRO` or `PROTOBUF`, then you can provide your own
custom implementation of the `KafkaSchemaRegistryBase` class and implement the `get_schema_metadata(topic, platform_urn)` method that,
given a topic name, returns a `SchemaMetadata` object containing the schema for that topic. Please refer to
`datahub.ingestion.source.confluent_schema_registry::ConfluentSchemaRegistry` for a sample implementation of this class.
```python
class KafkaSchemaRegistryBase(ABC):
    @abstractmethod
    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        pass
```
@@ -101,4 +102,32 @@ source:
schema_registry_url: http://localhost:8081

# sink configs
```

### Limitations of the `PROTOBUF` schema type implementation

The current implementation of `PROTOBUF` schema support has the following limitations:

+ Requires Python 3.7 or above.
+ Recursive types are not supported (see the sketch at the end of this section).
+ If the schemas of different topics define a type in the same package, the source will raise an exception.

In addition, maps are represented as arrays of messages. For example, the following message

```
message MessageWithMap {
  map<int32, string> map_1 = 1;
}
```

becomes:

```
message Map1Entry {
  int32 key = 1;
  string value = 2;
}

message MessageWithMap {
repeated Map1Entry map_1 = 1;
}
```
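
As a purely illustrative aside (not the connector's implementation), the recursive-type limitation can be understood as a cycle in the graph of message references. Here is a minimal sketch of detecting such a cycle with `networkx`, which the `setup.py` change below adds as a dependency; the graph construction is an assumption made for the sketch.

```python
# Illustrative sketch: model "message A has a field of message type B" as a
# directed edge A -> B; a recursive type shows up as a cycle in this graph.
import networkx as nx

references = nx.DiGraph()
references.add_edge("TreeNode", "TreeNode")        # e.g. message TreeNode { repeated TreeNode children = 1; }
references.add_edge("MessageWithMap", "Map1Entry")

if not nx.is_directed_acyclic_graph(references):
    cycle = nx.find_cycle(references)
    print(f"Recursive type detected: {cycle}")     # [('TreeNode', 'TreeNode')]
```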
18 changes: 16 additions & 2 deletions metadata-ingestion/setup.py
@@ -67,6 +67,20 @@ def get_long_description():
"fastavro>=1.2.0",
}

kafka_protobuf = (
{
"networkx>=2.6.2",
# Required to generate protobuf python modules from the schema downloaded from the schema registry
"grpcio==1.44.0",
"grpcio-tools==1.44.0",
"types-protobuf",
}
if is_py37_or_newer
else {
"types-protobuf",
}
)
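
The comment in the `kafka_protobuf` extra above notes that `grpcio-tools` is needed to turn schemas downloaded from the schema registry into Python modules. A minimal sketch of that kind of invocation follows; the paths are hypothetical and this is not the connector's actual code.

```python
# Sketch: compile a .proto that was written to disk after being fetched from
# the schema registry into a *_pb2.py module. Paths are placeholders.
from grpc_tools import protoc

exit_code = protoc.main(
    [
        "protoc",                                 # argv[0] placeholder
        "-I/tmp/registry_schemas",                # directory holding the downloaded .proto
        "--python_out=/tmp/generated",            # where the generated module goes
        "/tmp/registry_schemas/message_with_map.proto",
    ]
)
assert exit_code == 0, "protoc failed"
```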

sql_common = {
# Required for all SQL sources.
"sqlalchemy==1.3.24",
@@ -186,7 +200,7 @@ def get_long_description():
# - 0.6.12 adds support for Spark Thrift Server
"acryl-pyhive[hive]>=0.6.13"
},
"kafka": kafka_common,
"kafka": {*kafka_common, *kafka_protobuf},
"kafka-connect": sql_common | {"requests", "JPype1"},
"ldap": {"python-ldap>=2.4"},
"looker": looker_common,
@@ -309,7 +323,7 @@ def get_long_description():
"oracle",
"postgres",
"sagemaker",
"datahub-kafka",
"kafka",
"datahub-rest",
"redash",
"redshift",