diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py index 4a339da750eb..40855bc4b658 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py @@ -15,7 +15,7 @@ from dataclasses import dataclass from typing import Optional, Union -from confluent_kafka.admin import AdminClient +from confluent_kafka.admin import AdminClient, KafkaException from confluent_kafka.avro import AvroConsumer from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient @@ -35,6 +35,15 @@ logger = ingestion_logger() +class InvalidKafkaCreds(Exception): + """ + Class to indicate invalid kafka credentials exception + """ + + +TIMEOUT_SECONDS = 10 + + @dataclass class KafkaClient: def __init__(self, admin_client, schema_registry_client, consumer_client) -> None: @@ -60,6 +69,14 @@ def get_connection( if connection.saslMechanism: connection.consumerConfig["sasl.mechanism"] = connection.saslMechanism.value + if ( + connection.consumerConfig.get("security.protocol") is None + and connection.securityProtocol + ): + connection.consumerConfig[ + "security.protocol" + ] = connection.securityProtocol.value + if connection.basicAuthUserInfo: connection.schemaRegistryConfig = connection.schemaRegistryConfig or {} connection.schemaRegistryConfig[ @@ -109,9 +126,18 @@ def test_connection( """ def custom_executor(): - _ = client.admin_client.list_topics().topics - - test_fn = {"GetTopics": custom_executor} + try: + client.admin_client.list_topics(timeout=TIMEOUT_SECONDS).topics + except KafkaException as err: + raise InvalidKafkaCreds( + f"Failed to fetch topics due to: {err}. " + "Please validate credentials and check if you are using correct security protocol" + ) + + test_fn = { + "GetTopics": custom_executor, + "CheckSchemaRegistry": client.schema_registry_client.get_subjects, + } test_connection_steps( metadata=metadata, diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/messaging/kafka.json b/openmetadata-service/src/main/resources/json/data/testConnections/messaging/kafka.json index 9dea13670ecf..7ff469720ea7 100644 --- a/openmetadata-service/src/main/resources/json/data/testConnections/messaging/kafka.json +++ b/openmetadata-service/src/main/resources/json/data/testConnections/messaging/kafka.json @@ -9,6 +9,13 @@ "errorMessage": "Failed to fetch topics, please validate the credentials", "shortCircuit": true, "mandatory": true + }, + { + "name": "CheckSchemaRegistry", + "description": "Validate schema registry credentials", + "errorMessage": "Failed to interact with schema registry API, please validate the schema registry credentials", + "shortCircuit": false, + "mandatory": false } ] } \ No newline at end of file diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/messaging/redpanda.json b/openmetadata-service/src/main/resources/json/data/testConnections/messaging/redpanda.json index 09ccede5cdfa..e95a150e6622 100644 --- a/openmetadata-service/src/main/resources/json/data/testConnections/messaging/redpanda.json +++ b/openmetadata-service/src/main/resources/json/data/testConnections/messaging/redpanda.json @@ -9,6 +9,13 @@ "errorMessage": "Failed to fetch topics, please validate the credentials", "shortCircuit": true, "mandatory": true + }, + { + "name": "CheckSchemaRegistry", + "description": "Validate schema registry credentials", + "errorMessage": "Failed to interact with schema registry API, please validate the schema registry credentials", + "shortCircuit": false, + "mandatory": false } ] } \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json index 2ea66d1b8f5c..4f29c91b79d3 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json @@ -42,6 +42,13 @@ "type": "string", "format": "password" }, + "securityProtocol": { + "title": "Security Protocol", + "description": "security.protocol consumer config property", + "type": "string", + "enum": ["PLAINTEXT","SASL_PLAINTEXT","SASL_SSL","SSL"], + "default": "PLAINTEXT" + }, "saslMechanism": { "title": "SASL Mechanism", "description": "sasl.mechanism Consumer Config property", @@ -49,7 +56,7 @@ "default": "PLAIN" }, "basicAuthUserInfo": { - "title": "Basic Auth User Info", + "title": "Schema Registry Basic Auth User Info", "description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.", "type": "string", "format": "password" diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/redpandaConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/redpandaConnection.json index 1f6cf10c2c8d..aff0cce84e35 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/redpandaConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/redpandaConnection.json @@ -42,6 +42,13 @@ "type": "string", "format": "password" }, + "securityProtocol": { + "title": "Security Protocol", + "description": "security.protocol consumer config property", + "type": "string", + "enum": ["PLAINTEXT","SASL_PLAINTEXT","SASL_SSL","SSL"], + "default": "PLAINTEXT" + }, "saslMechanism": { "title": "SASL Mechanism", "description": "sasl.mechanism Consumer Config property", @@ -49,7 +56,7 @@ "default": "PLAIN" }, "basicAuthUserInfo": { - "title": "Basic Auth User Info", + "title": "Schema Registry Basic Auth User Info", "description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.", "type": "string", "format": "password" diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Kafka.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Kafka.md index 225eacb3d1d9..65a6fc2f486b 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Kafka.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Kafka.md @@ -40,6 +40,18 @@ $$section SASL password for use with the PLAIN and SASL-SCRAM mechanisms. $$ +$$section +### Security Protocol $(id="securityProtocol") + +Security Protocol used in bootstrap server. + +Supported: +`PLAINTEXT`: Un-authenticated, non-encrypted channel +`SASL_PLAINTEXT`: SASL authenticated, non-encrypted channel +`SASL_SSL`: SASL authenticated, SSL channel +`SSL`: SSL channel +$$ + $$section ### SASL Mechanism $(id="saslMechanism") @@ -51,7 +63,7 @@ Supported: `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `OAUTHBEARER`. $$ $$section -### Basic Auth User Info $(id="basicAuthUserInfo") +### Schema Registry Basic Auth User Info $(id="basicAuthUserInfo") Schema Registry Client HTTP credentials in the form of `username:password`. diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Redpanda.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Redpanda.md index 8e176a03c5ed..47169e04a878 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Redpanda.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Messaging/Redpanda.md @@ -26,6 +26,18 @@ URL of the Schema Registry used to ingest the schemas of the topics. **NOTE**: For now, the schema will be the last version found for the schema name `{topic-name}-value`. An [issue](https://github.com/open-metadata/OpenMetadata/issues/10399) to improve how it currently works has been opened. $$ +$$section +### Security Protocol $(id="securityProtocol") + +Security Protocol used in bootstrap server. + +Supported: +`PLAINTEXT`: Un-authenticated, non-encrypted channel +`SASL_PLAINTEXT`: SASL authenticated, non-encrypted channel +`SASL_SSL`: SASL authenticated, SSL channel +`SSL`: SSL channel +$$ + $$section ### SASL Username $(id="saslUsername") @@ -49,7 +61,7 @@ Supported: `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `OAUTHBEARER`. $$ $$section -### Basic Auth User Info $(id="basicAuthUserInfo") +### Schema Registry Basic Auth User Info $(id="basicAuthUserInfo") Schema Registry Client HTTP credentials in the form of `username:password`.