Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Improve Kafka Test Connection #15207

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dataclasses import dataclass
from typing import Optional, Union

from confluent_kafka.admin import AdminClient
from confluent_kafka.admin import AdminClient, KafkaException
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient

Expand All @@ -35,6 +35,15 @@
logger = ingestion_logger()


class InvalidKafkaCreds(Exception):
"""
Class to indicate invalid kafka credentials exception
"""


TIMEOUT_SECONDS = 10


@dataclass
class KafkaClient:
def __init__(self, admin_client, schema_registry_client, consumer_client) -> None:
Expand All @@ -60,6 +69,14 @@ def get_connection(
if connection.saslMechanism:
connection.consumerConfig["sasl.mechanism"] = connection.saslMechanism.value

if (
connection.consumerConfig.get("security.protocol") is None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is not None?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is None is intended, let's say user has already provided the security.protocol in consumer config then that would be given precedence over the new field that has been added.

and connection.securityProtocol
):
connection.consumerConfig[
"security.protocol"
] = connection.securityProtocol.value

if connection.basicAuthUserInfo:
connection.schemaRegistryConfig = connection.schemaRegistryConfig or {}
connection.schemaRegistryConfig[
Expand Down Expand Up @@ -109,9 +126,18 @@ def test_connection(
"""

def custom_executor():
_ = client.admin_client.list_topics().topics

test_fn = {"GetTopics": custom_executor}
try:
client.admin_client.list_topics(timeout=TIMEOUT_SECONDS).topics
except KafkaException as err:
raise InvalidKafkaCreds(
f"Failed to fetch topics due to: {err}. "
"Please validate credentials and check if you are using correct security protocol"
)

test_fn = {
"GetTopics": custom_executor,
"CheckSchemaRegistry": client.schema_registry_client.get_subjects,
}

test_connection_steps(
metadata=metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
"errorMessage": "Failed to fetch topics, please validate the credentials",
"shortCircuit": true,
"mandatory": true
},
{
"name": "CheckSchemaRegistry",
"description": "Validate schema registry credentials",
"errorMessage": "Failed to interact with schema registry API, please validate the schema registry credentials",
"shortCircuit": false,
"mandatory": false
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
"errorMessage": "Failed to fetch topics, please validate the credentials",
"shortCircuit": true,
"mandatory": true
},
{
"name": "CheckSchemaRegistry",
"description": "Validate schema registry credentials",
"errorMessage": "Failed to interact with schema registry API, please validate the schema registry credentials",
"shortCircuit": false,
"mandatory": false
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,21 @@
"type": "string",
"format": "password"
},
"securityProtocol": {
"title": "Security Protocol",
"description": "security.protocol consumer config property",
"type": "string",
"enum": ["PLAINTEXT","SASL_PLAINTEXT","SASL_SSL","SSL"],
"default": "PLAINTEXT"
},
"saslMechanism": {
"title": "SASL Mechanism",
"description": "sasl.mechanism Consumer Config property",
"$ref": "saslMechanismType.json",
"default": "PLAIN"
},
"basicAuthUserInfo": {
"title": "Basic Auth User Info",
"title": "Schema Registry Basic Auth User Info",
"description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.",
"type": "string",
"format": "password"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,21 @@
"type": "string",
"format": "password"
},
"securityProtocol": {
"title": "Security Protocol",
"description": "security.protocol consumer config property",
"type": "string",
"enum": ["PLAINTEXT","SASL_PLAINTEXT","SASL_SSL","SSL"],
"default": "PLAINTEXT"
},
"saslMechanism": {
"title": "SASL Mechanism",
"description": "sasl.mechanism Consumer Config property",
"$ref": "saslMechanismType.json",
"default": "PLAIN"
},
"basicAuthUserInfo": {
"title": "Basic Auth User Info",
"title": "Schema Registry Basic Auth User Info",
"description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.",
"type": "string",
"format": "password"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ $$section
SASL password for use with the PLAIN and SASL-SCRAM mechanisms.
$$

$$section
### Security Protocol $(id="securityProtocol")

Security Protocol used in bootstrap server.

Supported:
`PLAINTEXT`: Un-authenticated, non-encrypted channel
`SASL_PLAINTEXT`: SASL authenticated, non-encrypted channel
`SASL_SSL`: SASL authenticated, SSL channel
`SSL`: SSL channel
$$

$$section
### SASL Mechanism $(id="saslMechanism")

Expand All @@ -51,7 +63,7 @@ Supported: `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `OAUTHBEARER`.
$$

$$section
### Basic Auth User Info $(id="basicAuthUserInfo")
### Schema Registry Basic Auth User Info $(id="basicAuthUserInfo")

Schema Registry Client HTTP credentials in the form of `username:password`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ URL of the Schema Registry used to ingest the schemas of the topics.
**NOTE**: For now, the schema will be the last version found for the schema name `{topic-name}-value`. An [issue](https://github.com/open-metadata/OpenMetadata/issues/10399) to improve how it currently works has been opened.
$$

$$section
### Security Protocol $(id="securityProtocol")

Security Protocol used in bootstrap server.

Supported:
`PLAINTEXT`: Un-authenticated, non-encrypted channel
`SASL_PLAINTEXT`: SASL authenticated, non-encrypted channel
`SASL_SSL`: SASL authenticated, SSL channel
`SSL`: SSL channel
$$

$$section
### SASL Username $(id="saslUsername")

Expand All @@ -49,7 +61,7 @@ Supported: `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `OAUTHBEARER`.
$$

$$section
### Basic Auth User Info $(id="basicAuthUserInfo")
### Schema Registry Basic Auth User Info $(id="basicAuthUserInfo")

Schema Registry Client HTTP credentials in the form of `username:password`.

Expand Down
Loading