docs(ingest): clarify Kafka connection config #2171

Merged: 1 commit, Mar 4, 2021
10 changes: 7 additions & 3 deletions metadata-ingestion/README.md
@@ -138,8 +138,10 @@ Extracts:
 source:
   type: "kafka"
   config:
-    connection.bootstrap: "broker:9092"
-    connection.schema_registry_url: http://localhost:8081
+    connection:
+      bootstrap: "broker:9092"
+      schema_registry_url: http://localhost:8081
+      consumer_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/index.html#deserializingconsumer
```

## MySQL Metadata `mysql`
@@ -318,7 +320,9 @@ Datahub mce-consumer container to be running.
 sink:
   type: "datahub-kafka"
   config:
-    connection.bootstrap: "localhost:9092"
+    connection:
+      bootstrap: "localhost:9092"
+      producer_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/index.html#serializingproducer
```

## Console `console`
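
The README change above replaces flat dotted keys such as `connection.bootstrap` with a nested `connection:` mapping. For anyone with recipes written in the old style, the conversion can be sketched roughly as follows. This is only an illustration: `nest_dotted_keys` is a hypothetical helper and not part of DataHub, and it assumes the recipe has already been loaded into a plain dict.

```python
from typing import Any, Dict


def nest_dotted_keys(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Expand top-level keys like "connection.bootstrap" into nested dicts.

    Hypothetical helper for illustration only. Values are copied as-is, so
    dotted keys inside a value (e.g. librdkafka options such as
    "bootstrap.servers" within consumer_config) are left untouched.
    """
    nested: Dict[str, Any] = {}
    for key, value in flat.items():
        parts = key.split(".")
        target = nested
        # Walk (and create) the intermediate mappings for every part but the last.
        for part in parts[:-1]:
            target = target.setdefault(part, {})
        target[parts[-1]] = value
    return nested


old_style = {
    "connection.bootstrap": "broker:9092",
    "connection.schema_registry_url": "http://localhost:8081",
}
new_style = nest_dotted_keys(old_style)
print(new_style)
```

The helper only splits the keys of the dict it is given, which keeps option names passed through to the Kafka client intact.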
14 changes: 11 additions & 3 deletions metadata-ingestion/src/datahub/configuration/kafka.py
@@ -10,7 +10,9 @@ class _KafkaConnectionConfig(ConfigModel):
     # schema registry location
     schema_registry_url: str = "http://localhost:8081"

-    # extra schema registry config
+    # Extra schema registry config.
+    # These options will be passed into Kafka's SchemaRegistryClient.
+    # See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/index.html?highlight=schema%20registry#schemaregistryclient.
     schema_registry_config: dict = {}

     @validator("bootstrap")
@@ -25,12 +27,18 @@ def bootstrap_host_colon_port_comma(cls, val):
 class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
     """Configuration class for holding connectivity information for Kafka consumers"""

-    # extra consumer config
+    # Extra consumer config.
+    # These options will be passed into Kafka's DeserializingConsumer.
+    # See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/index.html#deserializingconsumer
+    # and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
     consumer_config: dict = {}


 class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
     """Configuration class for holding connectivity information for Kafka producers"""

-    # extra producer config
+    # Extra producer config.
+    # These options will be passed into Kafka's SerializingProducer.
+    # See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/index.html#serializingproducer
+    # and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
     producer_config: dict = {}
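
The new comments say the extra config dicts are passed through to the Confluent client classes. A minimal sketch of what that pass-through might look like is below. It uses a plain dataclass as a hypothetical stand-in for `KafkaConsumerConnectionConfig`, and the merge order (user-supplied options applied last, so they can override derived values) is an assumption rather than something this diff confirms. `bootstrap.servers` and `url` are the standard option names for the librdkafka client config and the `SchemaRegistryClient` config respectively.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ConsumerConnection:
    """Hypothetical stand-in for KafkaConsumerConnectionConfig (not the real class)."""

    bootstrap: str
    schema_registry_url: str = "http://localhost:8081"
    schema_registry_config: Dict[str, Any] = field(default_factory=dict)
    consumer_config: Dict[str, Any] = field(default_factory=dict)


def build_consumer_conf(conn: ConsumerConnection) -> Dict[str, Any]:
    """Build a librdkafka-style dict for a consumer.

    Assumption: entries in consumer_config are applied last and may override
    the value derived from `bootstrap`.
    """
    return {"bootstrap.servers": conn.bootstrap, **conn.consumer_config}


def build_schema_registry_conf(conn: ConsumerConnection) -> Dict[str, Any]:
    """Build the dict for a schema registry client, under the same merge assumption."""
    return {"url": conn.schema_registry_url, **conn.schema_registry_config}


conn = ConsumerConnection(
    bootstrap="broker:9092",
    consumer_config={"security.protocol": "SSL", "session.timeout.ms": 10000},
)
consumer_conf = build_consumer_conf(conn)
registry_conf = build_schema_registry_conf(conn)

# With confluent-kafka installed, consumer_conf could be handed to
# confluent_kafka.DeserializingConsumer(consumer_conf) and registry_conf to
# confluent_kafka.schema_registry.SchemaRegistryClient(registry_conf).
print(consumer_conf)
print(registry_conf)
```

Keeping the extra options as an opaque dict means any setting listed in the librdkafka configuration reference can be supplied without adding a dedicated field for it.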