diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e68a1f22..1f5ce4e35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,7 +120,7 @@ confluent-kafka-python v2.8.2 is based on librdkafka v2.8.0, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.8.0) for a complete list of changes, enhancements, fixes and upgrade considerations. -Note: Versioning is skipped due to breaking change in v2.8.1. +Note: Versioning is skipped due to a breaking change in v2.8.1. Do not run software with v2.8.1 installed. @@ -152,7 +152,7 @@ We apologize for the inconvenience and appreciate the feedback that we have gott v2.6.2 is a feature release with the following features, fixes and enhancements: -Note: This release modifies the dependencies of the Schema Registry client. +Note: This release modifies the dependencies of the Schema Registry client. If you are using the Schema Registry client, please ensure that you install the extra dependencies using the following syntax: @@ -232,7 +232,7 @@ for a complete list of changes, enhancements, fixes and upgrade considerations. ## v2.5.0 - 2024-07-10 > [!WARNING] -This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription. +This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription. > > You won't face any problem if: > * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability). @@ -240,7 +240,7 @@ This version has introduced a regression in which an assert is triggered during > * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`. > * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there. > * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client. -> +> > Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all. v2.5.0 is a feature release with the following features, fixes and enhancements: @@ -614,4 +614,3 @@ v1.5.0 is a maintenance release with the following fixes and enhancements: confluent-kafka-python is based on librdkafka v1.5.0, see the [librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.5.0) for a complete list of changes, enhancements, fixes and upgrade considerations.
- diff --git a/README.md b/README.md index 0d2742b55..2ec865616 100644 --- a/README.md +++ b/README.md @@ -276,7 +276,7 @@ pip install confluent-kafka # With Schema Registry support pip install "confluent-kafka[avro,schemaregistry]" # Avro -pip install "confluent-kafka[json,schemaregistry]" # JSON Schema +pip install "confluent-kafka[json,schemaregistry]" # JSON Schema pip install "confluent-kafka[protobuf,schemaregistry]" # Protobuf # With Data Contract rules (includes CSFLE support) diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index 48296d346..8bea7f34d 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -95,19 +95,20 @@ class ThrottleEvent(object): :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request """ - def __init__(self, broker_name, broker_id, throttle_time): + def __init__(self, broker_name: str, + broker_id: int, + throttle_time: float) -> None: self.broker_name = broker_name self.broker_id = broker_id self.throttle_time = throttle_time - def __str__(self): - return "{}/{} throttled for {} ms".format( - self.broker_name, self.broker_id, int(self.throttle_time * 1000) - ) + def __str__(self) -> str: + return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, + int(self.throttle_time * 1000)) -def _resolve_plugins(plugins): - """Resolve embedded plugins from the wheel's library directory. +def _resolve_plugins(plugins: str) -> str: + """ Resolve embedded plugins from the wheel's library directory. For internal module use only. diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 55e624c22..8c775dbd9 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Optional, Any from enum import Enum from .. import cimpl +from ..cimpl import TopicPartition class Node: @@ -35,14 +37,14 @@ class Node: The rack for this node. """ - def __init__(self, id, host, port, rack=None): + def __init__(self, id: int, host: str, port: int, rack: Optional[str] = None) -> None: self.id = id self.id_string = str(id) self.host = host self.port = port self.rack = rack - def __str__(self): + def __str__(self) -> str: return f"({self.id}) {self.host}:{self.port} {f'(Rack - {self.rack})' if self.rack else ''}" @@ -60,7 +62,7 @@ class ConsumerGroupTopicPartitions: List of topic partitions information. """ - def __init__(self, group_id, topic_partitions=None): + def __init__(self, group_id: str, topic_partitions: Optional[List[TopicPartition]] = None) -> None: self.group_id = group_id self.topic_partitions = topic_partitions @@ -89,7 +91,7 @@ class ConsumerGroupState(Enum): #: Consumer Group is empty. EMPTY = cimpl.CONSUMER_GROUP_STATE_EMPTY - def __lt__(self, other): + def __lt__(self, other) -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value @@ -109,7 +111,7 @@ class ConsumerGroupType(Enum): #: Classic Type CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC - def __lt__(self, other): + def __lt__(self, other) -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value @@ -126,7 +128,7 @@ class TopicCollection: List of topic names. 
""" - def __init__(self, topic_names): + def __init__(self, topic_names: List[str]) -> None: self.topic_names = topic_names @@ -147,7 +149,7 @@ class TopicPartitionInfo: In-Sync-Replica brokers for the partition. """ - def __init__(self, id, leader, replicas, isr): + def __init__(self, id: int, leader: Node, replicas: List[Node], isr: List[Node]) -> None: self.id = id self.leader = leader self.replicas = replicas @@ -165,7 +167,7 @@ class IsolationLevel(Enum): READ_UNCOMMITTED = cimpl.ISOLATION_LEVEL_READ_UNCOMMITTED #: Receive all the offsets. READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED #: Skip offsets belonging to an aborted transaction. - def __lt__(self, other): + def __lt__(self, other) -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value @@ -184,7 +186,7 @@ class ElectionType(Enum): #: Unclean election UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN - def __lt__(self, other): + def __lt__(self, other) -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value diff --git a/src/confluent_kafka/_types.py b/src/confluent_kafka/_types.py new file mode 100644 index 000000000..2cf8ed8ea --- /dev/null +++ b/src/confluent_kafka/_types.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Common type definitions for confluent_kafka package. + +This module provides centralized type aliases to maintain DRY principle +and ensure consistency across the package. +""" + +from typing import Any, Optional, Dict, Union, Callable, List, Tuple + +# Headers can be either dict format or list of tuples format +HeadersType = Union[ + Dict[str, Union[str, bytes, None]], + List[Tuple[str, Union[str, bytes, None]]] +] + +# Serializer/Deserializer callback types (will need SerializationContext import where used) +Serializer = Callable[[Any, Any], bytes] # (obj, SerializationContext) -> bytes +Deserializer = Callable[[Optional[bytes], Any], Any] # (Optional[bytes], SerializationContext) -> obj + +# Forward declarations for callback types that reference classes from cimpl +# These are defined here to avoid circular imports +DeliveryCallback = Callable[[Optional[Any], Any], None] # (KafkaError, Message) -> None +RebalanceCallback = Callable[[Any, List[Any]], None] # (Consumer, List[TopicPartition]) -> None diff --git a/src/confluent_kafka/_util/conversion_util.py b/src/confluent_kafka/_util/conversion_util.py index 9f28e0c43..4bbbc6c38 100644 --- a/src/confluent_kafka/_util/conversion_util.py +++ b/src/confluent_kafka/_util/conversion_util.py @@ -12,12 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import Union, Type from enum import Enum class ConversionUtil: @staticmethod - def convert_to_enum(val, enum_clazz): + def convert_to_enum(val: Union[str, int, Enum], enum_clazz: Type[Enum]) -> Enum: if type(enum_clazz) is not type(Enum): raise TypeError("'enum_clazz' must be of type Enum") diff --git a/src/confluent_kafka/_util/validation_util.py b/src/confluent_kafka/_util/validation_util.py index ffe5785f2..2d094660a 100644 --- a/src/confluent_kafka/_util/validation_util.py +++ b/src/confluent_kafka/_util/validation_util.py @@ -12,38 +12,39 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any, List from ..cimpl import KafkaError try: - string_type = basestring + string_type = basestring # type: ignore[name-defined] except NameError: string_type = str class ValidationUtil: @staticmethod - def check_multiple_not_none(obj, vars_to_check): + def check_multiple_not_none(obj: Any, vars_to_check: List[str]) -> None: for param in vars_to_check: ValidationUtil.check_not_none(obj, param) @staticmethod - def check_not_none(obj, param): + def check_not_none(obj: Any, param: str) -> None: if getattr(obj, param) is None: raise ValueError("Expected %s to be not None" % (param,)) @staticmethod - def check_multiple_is_string(obj, vars_to_check): + def check_multiple_is_string(obj: Any, vars_to_check: List[str]) -> None: for param in vars_to_check: ValidationUtil.check_is_string(obj, param) @staticmethod - def check_is_string(obj, param): + def check_is_string(obj: Any, param: str) -> None: param_value = getattr(obj, param) if param_value is not None and not isinstance(param_value, string_type): raise TypeError("Expected %s to be a string" % (param,)) @staticmethod - def check_kafka_errors(errors): + def check_kafka_errors(errors: List[KafkaError]) -> None: if not isinstance(errors, list): raise TypeError("errors should be None or a list") for error in errors: @@ -51,6 +52,6 @@ def check_kafka_errors(errors): raise TypeError("Expected list of KafkaError") @staticmethod - def check_kafka_error(error): + def check_kafka_error(error: KafkaError) -> None: if not isinstance(error, KafkaError): raise TypeError("Expected error to be a KafkaError") diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 98f7c1195..360d7b1cc 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -17,6 +17,7 @@ """ import warnings import concurrent.futures +from typing import Any, Dict, List, Optional, Union, Tuple, Set # Unused imports are kept to be accessible using this public module from ._config import (ConfigSource, # noqa: F401 @@ -88,7 +89,7 @@ try: - string_type = basestring + string_type = basestring # type: ignore[name-defined] except NameError: string_type = str @@ -115,7 +116,7 @@ class AdminClient (_AdminClientImpl): Requires broker version v0.11.0.0 or later. """ - def __init__(self, conf, **kwargs): + def __init__(self, conf: Dict[str, Union[str, int, float, bool]], **kwargs: Any) -> None: """ Create a new AdminClient using the provided configuration dictionary. @@ -129,7 +130,7 @@ def __init__(self, conf, **kwargs): super(AdminClient, self).__init__(conf, **kwargs) @staticmethod - def _make_topics_result(f, futmap): + def _make_topics_result(f: concurrent.futures.Future, futmap: Dict[str, concurrent.futures.Future]) -> None: """ Map per-topic results to per-topic futures in futmap. The result value of each (successful) future is None.
@@ -153,7 +154,8 @@ def _make_topics_result(f, futmap): fut.set_exception(e) @staticmethod - def _make_resource_result(f, futmap): + def _make_resource_result(f: concurrent.futures.Future, + futmap: Dict[ConfigResource, concurrent.futures.Future]) -> None: """ Map per-resource results to per-resource futures in futmap. The result value of each (successful) future is a ConfigResource. @@ -178,11 +180,12 @@ def _make_resource_result(f, futmap): fut.set_exception(e) @staticmethod - def _make_list_consumer_groups_result(f, futmap): + def _make_list_consumer_groups_result(f: concurrent.futures.Future, futmap: Any) -> None: pass @staticmethod - def _make_consumer_groups_result(f, futmap): + def _make_consumer_groups_result(f: concurrent.futures.Future, + futmap: Dict[str, concurrent.futures.Future]) -> None: """ Map per-group results to per-group futures in futmap. """ @@ -207,7 +210,8 @@ def _make_consumer_groups_result(f, futmap): fut.set_exception(e) @staticmethod - def _make_consumer_group_offsets_result(f, futmap): + def _make_consumer_group_offsets_result(f: concurrent.futures.Future, + futmap: Dict[str, concurrent.futures.Future]) -> None: """ Map per-group results to per-group futures in futmap. The result value of each (successful) future is ConsumerGroupTopicPartitions. @@ -233,7 +237,7 @@ def _make_consumer_group_offsets_result(f, futmap): fut.set_exception(e) @staticmethod - def _make_acls_result(f, futmap): + def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent.futures.Future]) -> None: """ Map create ACL binding results to corresponding futures in futmap. For create_acls the result value of each (successful) future is None. @@ -259,7 +263,8 @@ def _make_acls_result(f, futmap): fut.set_exception(e) @staticmethod - def _make_futmap_result_from_list(f, futmap): + def _make_futmap_result_from_list(f: concurrent.futures.Future, + futmap: Dict[Any, concurrent.futures.Future]) -> None: try: results = f.result() @@ -281,7 +286,7 @@ def _make_futmap_result_from_list(f, futmap): fut.set_exception(e) @staticmethod - def _make_futmap_result(f, futmap): + def _make_futmap_result(f: concurrent.futures.Future, futmap: Dict[str, concurrent.futures.Future]) -> None: try: results = f.result() len_results = len(results) @@ -303,14 +308,16 @@ def _make_futmap_result(f, futmap): fut.set_exception(e) @staticmethod - def _create_future(): - f = concurrent.futures.Future() + def _create_future() -> concurrent.futures.Future: + f: concurrent.futures.Future = concurrent.futures.Future() if not f.set_running_or_notify_cancel(): raise RuntimeError("Future was cancelled prematurely") return f @staticmethod - def _make_futures(futmap_keys, class_check, make_result_fn): + def _make_futures(futmap_keys: List[Any], class_check: Optional[type], + make_result_fn: Any) -> Tuple[concurrent.futures.Future, + Dict[Any, concurrent.futures.Future]]: """ Create futures and a futuremap for the keys in futmap_keys, and create a request-level future to be passed to the C API. @@ -332,7 +339,9 @@ def _make_futures(futmap_keys, class_check, make_result_fn): return f, futmap @staticmethod - def _make_futures_v2(futmap_keys, class_check, make_result_fn): + def _make_futures_v2(futmap_keys: Union[List[Any], Set[Any]], class_check: Optional[type], + make_result_fn: Any) -> Tuple[concurrent.futures.Future, + Dict[Any, concurrent.futures.Future]]: """ Create futures and a futuremap for the keys in futmap_keys, and create a request-level future to be passed to the C API.
@@ -352,14 +361,14 @@ def _make_futures_v2(futmap_keys, class_check, make_result_fn): return f, futmap @staticmethod - def _make_single_future_pair(): + def _make_single_future_pair() -> Tuple[concurrent.futures.Future, concurrent.futures.Future]: """ Create a pair of futures, one for internal usage and one to use externally; the external one throws a KafkaException if any of the values in the map returned by the first future is a KafkaError. """ - def single_future_result(internal_f, f): + def single_future_result(internal_f: concurrent.futures.Future, f: concurrent.futures.Future) -> None: try: results = internal_f.result() for _, value in results.items(): @@ -376,11 +385,11 @@ def single_future_result(internal_f, f): return internal_f, f @staticmethod - def _has_duplicates(items): + def _has_duplicates(items: List[Any]) -> bool: return len(set(items)) != len(items) @staticmethod - def _check_list_consumer_group_offsets_request(request): + def _check_list_consumer_group_offsets_request(request: Optional[List[_ConsumerGroupTopicPartitions]]) -> None: if request is None: raise TypeError("request cannot be None") if not isinstance(request, list): @@ -418,7 +427,7 @@ def _check_list_consumer_group_offsets_request(request): raise ValueError("Element of 'topic_partitions' must not have 'offset' value") @staticmethod - def _check_alter_consumer_group_offsets_request(request): + def _check_alter_consumer_group_offsets_request(request: Optional[List[_ConsumerGroupTopicPartitions]]) -> None: if request is None: raise TypeError("request cannot be None") if not isinstance(request, list): @@ -457,7 +466,7 @@ def _check_alter_consumer_group_offsets_request(request): "Element of 'topic_partitions' must not have negative value for 'offset' field") @staticmethod - def _check_describe_user_scram_credentials_request(users): + def _check_describe_user_scram_credentials_request(users: Optional[List[str]]) -> None: if users is None: return if not isinstance(users, list): @@ -471,7 +480,7 @@ def _check_describe_user_scram_credentials_request(users): raise ValueError("'user' cannot be empty") @staticmethod - def _check_alter_user_scram_credentials_request(alterations): + def _check_alter_user_scram_credentials_request(alterations: List[UserScramCredentialAlteration]) -> None: if not isinstance(alterations, list): raise TypeError("Expected input to be list") if len(alterations) == 0: @@ -514,7 +523,8 @@ def _check_alter_user_scram_credentials_request(alterations): "UserScramCredentialDeletion") @staticmethod - def _check_list_offsets_request(topic_partition_offsets, kwargs): + def _check_list_offsets_request(topic_partition_offsets: Dict[_TopicPartition, OffsetSpec], + kwargs: Dict[str, Any]) -> None: if not isinstance(topic_partition_offsets, dict): raise TypeError("Expected topic_partition_offsets to be " + "dict of [TopicPartitions,OffsetSpec] for list offsets request") @@ -542,7 +552,7 @@ def _check_list_offsets_request(topic_partition_offsets, kwargs): raise TypeError("isolation_level argument should be an IsolationLevel") @staticmethod - def _check_delete_records(request): + def _check_delete_records(request: List[_TopicPartition]) -> None: if not isinstance(request, list): raise TypeError(f"Expected Request to be a list, got '{type(request).__name__}' ") for req in request: @@ -553,7 +563,7 @@ def _check_delete_records(request): raise ValueError("'partition' cannot be negative") @staticmethod - def _check_elect_leaders(election_type, partitions): + def _check_elect_leaders(election_type: _ElectionType,
partitions: Optional[List[_TopicPartition]]) -> None: if not isinstance(election_type, _ElectionType): raise TypeError("Expected 'election_type' to be of type 'ElectionType'") if partitions is not None: @@ -568,7 +578,9 @@ def _check_elect_leaders(election_type, partitions): raise ValueError("Elements of the 'partitions' list must not have negative value" + " for 'partition' field") - def create_topics(self, new_topics, **kwargs): + def create_topics( # type: ignore[override] + self, new_topics: List[NewTopic], **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Create one or more new topics. @@ -603,7 +615,9 @@ def create_topics(self, new_topics, **kwargs): return futmap - def delete_topics(self, topics, **kwargs): + def delete_topics( # type: ignore[override] + self, topics: List[str], **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Delete one or more topics. @@ -634,15 +648,17 @@ def delete_topics(self, topics, **kwargs): return futmap - def list_topics(self, *args, **kwargs): + def list_topics(self, *args: Any, **kwargs: Any) -> ClusterMetadata: return super(AdminClient, self).list_topics(*args, **kwargs) - def list_groups(self, *args, **kwargs): + def list_groups(self, *args: Any, **kwargs: Any) -> GroupMetadata: return super(AdminClient, self).list_groups(*args, **kwargs) - def create_partitions(self, new_partitions, **kwargs): + def create_partitions( # type: ignore[override] + self, new_partitions: List[NewPartitions], **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Create additional partitions for the given topics. @@ -676,7 +692,9 @@ def create_partitions(self, new_partitions, **kwargs): return futmap - def describe_configs(self, resources, **kwargs): + def describe_configs( # type: ignore[override] + self, resources: List[ConfigResource], **kwargs: Any + ) -> Dict[ConfigResource, concurrent.futures.Future]: """ Get the configuration of the specified resources. @@ -708,7 +726,9 @@ def describe_configs(self, resources, **kwargs): return futmap - def alter_configs(self, resources, **kwargs): + def alter_configs( # type: ignore[override] + self, resources: List[ConfigResource], **kwargs: Any + ) -> Dict[ConfigResource, concurrent.futures.Future]: """ .. deprecated:: 2.2.0 @@ -756,7 +776,9 @@ def alter_configs(self, resources, **kwargs): return futmap - def incremental_alter_configs(self, resources, **kwargs): + def incremental_alter_configs( # type: ignore[override] + self, resources: List[ConfigResource], **kwargs: Any + ) -> Dict[ConfigResource, concurrent.futures.Future]: """ Update configuration properties for the specified resources. Updates are incremental, i.e. only the values mentioned are changed @@ -789,7 +811,9 @@ def incremental_alter_configs(self, resources, **kwargs): return futmap - def create_acls(self, acls, **kwargs): + def create_acls( # type: ignore[override] + self, acls: List[AclBinding], **kwargs: Any + ) -> Dict[AclBinding, concurrent.futures.Future]: """ Create one or more ACL bindings. @@ -818,7 +842,9 @@ def create_acls(self, acls, **kwargs): return futmap - def describe_acls(self, acl_binding_filter, **kwargs): + def describe_acls( # type: ignore[override] + self, acl_binding_filter: AclBindingFilter, **kwargs: Any + ) -> concurrent.futures.Future: """ Match ACL bindings by filter.
@@ -853,7 +879,9 @@ def describe_acls(self, acl_binding_filter, **kwargs): return f - def delete_acls(self, acl_binding_filters, **kwargs): + def delete_acls( # type: ignore[override] + self, acl_binding_filters: List[AclBindingFilter], **kwargs: Any + ) -> Dict[AclBindingFilter, concurrent.futures.Future]: """ Delete ACL bindings matching one or more ACL binding filters. @@ -892,7 +920,9 @@ def delete_acls(self, acl_binding_filters, **kwargs): return futmap - def list_consumer_groups(self, **kwargs): + def list_consumer_groups( # type: ignore[override] + self, **kwargs: Any + ) -> concurrent.futures.Future: """ List consumer groups. @@ -939,7 +969,9 @@ def list_consumer_groups(self, **kwargs): return f - def describe_consumer_groups(self, group_ids, **kwargs): + def describe_consumer_groups( # type: ignore[override] + self, group_ids: List[str], **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Describe consumer groups. @@ -968,11 +1000,13 @@ def describe_consumer_groups(self, group_ids, **kwargs): f, futmap = AdminClient._make_futures(group_ids, None, AdminClient._make_consumer_groups_result) - super(AdminClient, self).describe_consumer_groups(group_ids, f, **kwargs) + super(AdminClient, self).describe_consumer_groups(group_ids, f, **kwargs) # type: ignore[arg-type] return futmap - def describe_topics(self, topics, **kwargs): + def describe_topics( # type: ignore[override] + self, topics: _TopicCollection, **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Describe topics. @@ -1003,11 +1037,13 @@ def describe_topics(self, topics, **kwargs): f, futmap = AdminClient._make_futures_v2(topic_names, None, AdminClient._make_futmap_result_from_list) - super(AdminClient, self).describe_topics(topic_names, f, **kwargs) + super(AdminClient, self).describe_topics(topic_names, f, **kwargs) # type: ignore[arg-type] return futmap - def describe_cluster(self, **kwargs): + def describe_cluster( # type: ignore[override] + self, **kwargs: Any + ) -> concurrent.futures.Future: """ Describe cluster. @@ -1031,7 +1067,9 @@ def describe_cluster(self, **kwargs): return f - def delete_consumer_groups(self, group_ids, **kwargs): + def delete_consumer_groups( # type: ignore[override] + self, group_ids: List[str], **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Delete the given consumer groups. @@ -1055,13 +1093,17 @@ def delete_consumer_groups(self, group_ids, **kwargs): if len(group_ids) == 0: raise ValueError("Expected at least one group to be deleted") - f, futmap = AdminClient._make_futures(group_ids, string_type, AdminClient._make_consumer_groups_result) + f, futmap = AdminClient._make_futures(group_ids, string_type, + AdminClient._make_consumer_groups_result) super(AdminClient, self).delete_consumer_groups(group_ids, f, **kwargs) return futmap - def list_consumer_group_offsets(self, list_consumer_group_offsets_request, **kwargs): + def list_consumer_group_offsets( # type: ignore[override] + self, list_consumer_group_offsets_request: List[_ConsumerGroupTopicPartitions], + **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ List offset information for the consumer group and (optional) topic partition provided in the request. 
@@ -1089,15 +1131,19 @@ def list_consumer_group_offsets(self, list_consumer_group_offsets_request, **kwa AdminClient._check_list_consumer_group_offsets_request(list_consumer_group_offsets_request) - f, futmap = AdminClient._make_futures([request.group_id for request in list_consumer_group_offsets_request], - string_type, - AdminClient._make_consumer_group_offsets_result) + f, futmap = AdminClient._make_futures( + [request.group_id for request in list_consumer_group_offsets_request], + string_type, + AdminClient._make_consumer_group_offsets_result) super(AdminClient, self).list_consumer_group_offsets(list_consumer_group_offsets_request, f, **kwargs) return futmap - def alter_consumer_group_offsets(self, alter_consumer_group_offsets_request, **kwargs): + def alter_consumer_group_offsets( # type: ignore[override] + self, alter_consumer_group_offsets_request: List[_ConsumerGroupTopicPartitions], + **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Alter offset for the consumer group and topic partition provided in the request. @@ -1130,7 +1176,7 @@ def alter_consumer_group_offsets(self, alter_consumer_group_offsets_request, **k return futmap - def set_sasl_credentials(self, username, password): + def set_sasl_credentials(self, username: str, password: str) -> None: """ Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the @@ -1149,7 +1195,9 @@ def set_sasl_credentials(self, username, password): """ super(AdminClient, self).set_sasl_credentials(username, password) - def describe_user_scram_credentials(self, users=None, **kwargs): + def describe_user_scram_credentials( # type: ignore[override] + self, users: Optional[List[str]] = None, **kwargs: Any + ) -> Union[concurrent.futures.Future, Dict[str, concurrent.futures.Future]]: """ Describe user SASL/SCRAM credentials. @@ -1180,12 +1228,14 @@ def describe_user_scram_credentials(self, users=None, **kwargs): if users is None: internal_f, ret_fut = AdminClient._make_single_future_pair() else: - internal_f, ret_fut = AdminClient._make_futures_v2(users, None, - AdminClient._make_futmap_result) + internal_f, ret_fut = AdminClient._make_futures_v2( # type: ignore[assignment] + users, None, AdminClient._make_futmap_result) super(AdminClient, self).describe_user_scram_credentials(users, internal_f, **kwargs) return ret_fut - def alter_user_scram_credentials(self, alterations, **kwargs): + def alter_user_scram_credentials( # type: ignore[override] + self, alterations: List[UserScramCredentialAlteration], **kwargs: Any + ) -> Dict[str, concurrent.futures.Future]: """ Alter user SASL/SCRAM credentials. 
@@ -1207,13 +1257,16 @@ def alter_user_scram_credentials(self, alterations, **kwargs): """ AdminClient._check_alter_user_scram_credentials_request(alterations) - f, futmap = AdminClient._make_futures_v2(set([alteration.user for alteration in alterations]), None, - AdminClient._make_futmap_result) + f, futmap = AdminClient._make_futures_v2( + set([alteration.user for alteration in alterations]), None, + AdminClient._make_futmap_result) super(AdminClient, self).alter_user_scram_credentials(alterations, f, **kwargs) return futmap - def list_offsets(self, topic_partition_offsets, **kwargs): + def list_offsets( # type: ignore[override] + self, topic_partition_offsets: Dict[_TopicPartition, OffsetSpec], **kwargs: Any + ) -> Dict[_TopicPartition, concurrent.futures.Future]: """ Enables finding the beginning offset, end offset, and the offset matching a timestamp @@ -1247,14 +1300,16 @@ def list_offsets(self, topic_partition_offsets, **kwargs): int(offset_spec._value)) for topic_partition, offset_spec in topic_partition_offsets.items()] - f, futmap = AdminClient._make_futures_v2(topic_partition_offsets_list, - _TopicPartition, - AdminClient._make_futmap_result) + f, futmap = AdminClient._make_futures_v2( + topic_partition_offsets_list, _TopicPartition, + AdminClient._make_futmap_result) super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs) return futmap - def delete_records(self, topic_partition_offsets, **kwargs): + def delete_records( # type: ignore[override] + self, topic_partition_offsets: List[_TopicPartition], **kwargs: Any + ) -> Dict[_TopicPartition, concurrent.futures.Future]: """ Deletes all the records before the specified offsets (exclusive), in the specified topics and partitions. @@ -1291,7 +1346,10 @@ def delete_records(self, topic_partition_offsets, **kwargs): super(AdminClient, self).delete_records(topic_partition_offsets, f, **kwargs) return futmap - def elect_leaders(self, election_type, partitions=None, **kwargs): + def elect_leaders( # type: ignore[override] + self, election_type: _ElectionType, partitions: Optional[List[_TopicPartition]] = None, + **kwargs: Any + ) -> concurrent.futures.Future: """ Perform Preferred or Unclean leader election for all the specified partitions or all partitions in the cluster. diff --git a/src/confluent_kafka/admin/_acl.py b/src/confluent_kafka/admin/_acl.py index 3512a74ca..d318c97ec 100644 --- a/src/confluent_kafka/admin/_acl.py +++ b/src/confluent_kafka/admin/_acl.py @@ -12,14 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any, List, Dict, Union, Tuple from enum import Enum import functools + from ..
import cimpl as _cimpl from ._resource import ResourceType, ResourcePatternType from .._util import ValidationUtil, ConversionUtil try: - string_type = basestring + string_type = basestring # type: ignore[name-defined] except NameError: string_type = str @@ -42,7 +44,7 @@ class AclOperation(Enum): ALTER_CONFIGS = _cimpl.ACL_OPERATION_ALTER_CONFIGS #: ALTER_CONFIGS operation IDEMPOTENT_WRITE = _cimpl.ACL_OPERATION_IDEMPOTENT_WRITE #: IDEMPOTENT_WRITE operation - def __lt__(self, other): + def __lt__(self, other: 'AclOperation') -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value @@ -57,7 +59,7 @@ class AclPermissionType(Enum): DENY = _cimpl.ACL_PERMISSION_TYPE_DENY #: Disallows access ALLOW = _cimpl.ACL_PERMISSION_TYPE_ALLOW #: Grants access - def __lt__(self, other): + def __lt__(self, other: 'AclPermissionType') -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value @@ -89,9 +91,10 @@ class AclBinding(object): The permission type for the specified operation. """ - def __init__(self, restype, name, - resource_pattern_type, principal, host, - operation, permission_type): + def __init__(self, restype: Union[ResourceType, str, int], name: str, + resource_pattern_type: Union[ResourcePatternType, str, int], principal: str, host: str, + operation: Union[AclOperation, str, int], + permission_type: Union[AclPermissionType, str, int]) -> None: self.restype = restype self.name = name self.resource_pattern_type = resource_pattern_type @@ -101,34 +104,34 @@ def __init__(self, restype, name, self.permission_type = permission_type self._convert_args() # for the C code - self.restype_int = int(self.restype.value) - self.resource_pattern_type_int = int(self.resource_pattern_type.value) - self.operation_int = int(self.operation.value) - self.permission_type_int = int(self.permission_type.value) + self.restype_int = int(self.restype.value) # type: ignore[union-attr] + self.resource_pattern_type_int = int(self.resource_pattern_type.value) # type: ignore[union-attr] + self.operation_int = int(self.operation.value) # type: ignore[union-attr] + self.permission_type_int = int(self.permission_type.value) # type: ignore[union-attr] - def _convert_enums(self): - self.restype = ConversionUtil.convert_to_enum(self.restype, ResourceType) + def _convert_enums(self) -> None: + self.restype = ConversionUtil.convert_to_enum(self.restype, ResourceType) # type: ignore[assignment] self.resource_pattern_type = ConversionUtil.convert_to_enum( - self.resource_pattern_type, ResourcePatternType) + self.resource_pattern_type, ResourcePatternType) # type: ignore[assignment] self.operation = ConversionUtil.convert_to_enum( - self.operation, AclOperation) + self.operation, AclOperation) # type: ignore[assignment] self.permission_type = ConversionUtil.convert_to_enum( - self.permission_type, AclPermissionType) + self.permission_type, AclPermissionType) # type: ignore[assignment] - def _check_forbidden_enums(self, forbidden_enums): + def _check_forbidden_enums(self, forbidden_enums: Dict[str, List[Enum]]) -> None: for k, v in forbidden_enums.items(): enum_value = getattr(self, k) if enum_value in v: raise ValueError("Cannot use enum %s, value %s in this class" % (k, enum_value.name)) - def _not_none_args(self): + def _not_none_args(self) -> List[str]: return ["restype", "name", "resource_pattern_type", "principal", "host", "operation", "permission_type"] - def _string_args(self): + def _string_args(self) -> List[str]: return ["name", 
"principal", "host"] - def _forbidden_enums(self): + def _forbidden_enums(self) -> Dict[str, List[Enum]]: return { "restype": [ResourceType.ANY], "resource_pattern_type": [ResourcePatternType.ANY, @@ -137,7 +140,7 @@ def _forbidden_enums(self): "permission_type": [AclPermissionType.ANY] } - def _convert_args(self): + def _convert_args(self) -> None: not_none_args = self._not_none_args() string_args = self._string_args() forbidden_enums = self._forbidden_enums() @@ -146,25 +149,25 @@ def _convert_args(self): self._convert_enums() self._check_forbidden_enums(forbidden_enums) - def __repr__(self): + def __repr__(self) -> str: type_name = type(self).__name__ return "%s(%s,%s,%s,%s,%s,%s,%s)" % ((type_name,) + self._to_tuple()) - def _to_tuple(self): - return (self.restype, self.name, self.resource_pattern_type, + def _to_tuple(self) -> Tuple[ResourceType, str, ResourcePatternType, str, str, AclOperation, AclPermissionType]: + return (self.restype, self.name, self.resource_pattern_type, # type: ignore[return-value] self.principal, self.host, self.operation, self.permission_type) - def __hash__(self): + def __hash__(self) -> int: return hash(self._to_tuple()) - def __lt__(self, other): + def __lt__(self, other: 'AclBinding') -> Any: if self.__class__ != other.__class__: return NotImplemented return self._to_tuple() < other._to_tuple() - def __eq__(self, other): - if self.__class__ != other.__class__: + def __eq__(self, other: object) -> Any: + if not isinstance(other, AclBinding): return NotImplemented return self._to_tuple() == other._to_tuple() @@ -194,11 +197,11 @@ class AclBindingFilter(AclBinding): The permission type to match or :attr:`AclPermissionType.ANY` to match any value. """ - def _not_none_args(self): + def _not_none_args(self) -> List[str]: return ["restype", "resource_pattern_type", "operation", "permission_type"] - def _forbidden_enums(self): + def _forbidden_enums(self) -> Dict[str, List[Enum]]: return { "restype": [ResourceType.UNKNOWN], "resource_pattern_type": [ResourcePatternType.UNKNOWN], diff --git a/src/confluent_kafka/admin/_cluster.py b/src/confluent_kafka/admin/_cluster.py index b19dad362..0e83c4cc3 100644 --- a/src/confluent_kafka/admin/_cluster.py +++ b/src/confluent_kafka/admin/_cluster.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Optional, Union from .._util import ConversionUtil +from .._model import Node from ._acl import AclOperation @@ -34,7 +36,8 @@ class DescribeClusterResult: AclOperations allowed for the cluster. """ - def __init__(self, controller, nodes, cluster_id=None, authorized_operations=None): + def __init__(self, controller: Node, nodes: List[Node], cluster_id: Optional[str] = None, + authorized_operations: Optional[List[Union[str, int, AclOperation]]] = None) -> None: self.cluster_id = cluster_id self.controller = controller self.nodes = nodes diff --git a/src/confluent_kafka/admin/_config.py b/src/confluent_kafka/admin/_config.py index 5f4f4680e..c303f7bbf 100644 --- a/src/confluent_kafka/admin/_config.py +++ b/src/confluent_kafka/admin/_config.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict, List, Optional, Union, Any from enum import Enum import functools + from .. import cimpl as _cimpl from ._resource import ResourceType @@ -67,14 +69,14 @@ class ConfigEntry(object): This class is typically not user instantiated. 
""" - def __init__(self, name, value, - source=ConfigSource.UNKNOWN_CONFIG, - is_read_only=False, - is_default=False, - is_sensitive=False, - is_synonym=False, - synonyms=[], - incremental_operation=None): + def __init__(self, name: str, value: Optional[str], + source: ConfigSource = ConfigSource.UNKNOWN_CONFIG, + is_read_only: bool = False, + is_default: bool = False, + is_sensitive: bool = False, + is_synonym: bool = False, + synonyms: Dict[str, 'ConfigEntry'] = {}, + incremental_operation: Optional[AlterConfigOpType] = None) -> None: """ This class is typically not user instantiated. """ @@ -104,10 +106,10 @@ def __init__(self, name, value, self.incremental_operation = incremental_operation """The incremental operation (AlterConfigOpType) to use in incremental_alter_configs.""" - def __repr__(self): + def __repr__(self) -> str: return "ConfigEntry(%s=\"%s\")" % (self.name, self.value) - def __str__(self): + def __str__(self) -> str: return "%s=\"%s\"" % (self.name, self.value) @@ -130,9 +132,11 @@ class ConfigResource(object): Type = ResourceType - def __init__(self, restype, name, - set_config=None, described_configs=None, error=None, - incremental_configs=None): + def __init__(self, restype: Union[ResourceType, str, int], name: str, + set_config: Optional[Dict[str, str]] = None, + described_configs: Optional[Dict[str, ConfigEntry]] = None, + error: Optional[Any] = None, + incremental_configs: Optional[List[ConfigEntry]] = None) -> None: """ :param ConfigResource.Type restype: Resource type. :param str name: The resource name, which depends on restype. @@ -172,31 +176,33 @@ def __init__(self, restype, name, self.configs = described_configs self.error = error - def __repr__(self): + def __repr__(self) -> str: if self.error is not None: return "ConfigResource(%s,%s,%r)" % (self.restype, self.name, self.error) else: return "ConfigResource(%s,%s)" % (self.restype, self.name) - def __hash__(self): + def __hash__(self) -> int: return hash((self.restype, self.name)) - def __lt__(self, other): + def __lt__(self, other: 'ConfigResource') -> bool: if self.restype < other.restype: return True return self.name.__lt__(other.name) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: + if not isinstance(other, ConfigResource): + return NotImplemented return self.restype == other.restype and self.name == other.name - def __len__(self): + def __len__(self) -> int: """ :rtype: int :returns: number of configuration entries/operations """ return len(self.set_config_dict) - def set_config(self, name, value, overwrite=True): + def set_config(self, name: str, value: str, overwrite: bool = True) -> None: """ Set/overwrite a configuration value. @@ -214,7 +220,7 @@ def set_config(self, name, value, overwrite=True): return self.set_config_dict[name] = value - def add_incremental_config(self, config_entry): + def add_incremental_config(self, config_entry: ConfigEntry) -> None: """ Add a ConfigEntry for incremental alter configs, using the configured incremental_operation. diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py index 7823e976a..3ff3fa8be 100644 --- a/src/confluent_kafka/admin/_group.py +++ b/src/confluent_kafka/admin/_group.py @@ -13,8 +13,12 @@ # limitations under the License. 
+from typing import List, Optional, Union + +from confluent_kafka.cimpl import TopicPartition + from .._util import ConversionUtil -from .._model import ConsumerGroupState, ConsumerGroupType +from .._model import ConsumerGroupState, ConsumerGroupType, Node from ._acl import AclOperation @@ -35,7 +39,9 @@ class ConsumerGroupListing: Type of the consumer group. """ - def __init__(self, group_id, is_simple_consumer_group, state=None, type=None): + def __init__(self, group_id: str, is_simple_consumer_group: bool, + state: Optional[Union[ConsumerGroupState, str, int]] = None, + type: Optional[Union[ConsumerGroupType, str, int]] = None) -> None: self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group if state is not None: @@ -57,7 +63,8 @@ class ListConsumerGroupsResult: List of errors encountered during the operation, if any. """ - def __init__(self, valid=None, errors=None): + def __init__(self, valid: Optional[List[ConsumerGroupListing]] = None, + errors: Optional[List[Exception]] = None) -> None: self.valid = valid self.errors = errors @@ -73,10 +80,8 @@ class MemberAssignment: The topic partitions assigned to a group member. """ - def __init__(self, topic_partitions=[]): - self.topic_partitions = topic_partitions - if self.topic_partitions is None: - self.topic_partitions = [] + def __init__(self, topic_partitions: List[TopicPartition] = []) -> None: + self.topic_partitions = topic_partitions or [] class MemberDescription: @@ -100,7 +105,9 @@ class MemberDescription: The instance id of the group member. """ - def __init__(self, member_id, client_id, host, assignment, group_instance_id=None, target_assignment=None): + def __init__(self, member_id: str, client_id: str, host: str, assignment: MemberAssignment, + group_instance_id: Optional[str] = None, + target_assignment: Optional[MemberAssignment] = None) -> None: self.member_id = member_id self.client_id = client_id self.host = host @@ -134,8 +141,11 @@ class ConsumerGroupDescription: AclOperations allowed for the consumer group. """ - def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state, - coordinator, authorized_operations=None, type=ConsumerGroupType.UNKNOWN): + def __init__(self, group_id: str, is_simple_consumer_group: bool, members: List[MemberDescription], + partition_assignor: str, state: Optional[Union[ConsumerGroupState, str, int]], + coordinator: Node, + authorized_operations: Optional[List[Union[AclOperation, str, int]]] = None, + type: Union[ConsumerGroupType, str, int] = ConsumerGroupType.UNKNOWN) -> None: self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group self.members = members diff --git a/src/confluent_kafka/admin/_listoffsets.py b/src/confluent_kafka/admin/_listoffsets.py index 7b764a462..d8def288c 100644 --- a/src/confluent_kafka/admin/_listoffsets.py +++ b/src/confluent_kafka/admin/_listoffsets.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict, Any, Optional from abc import ABC, abstractmethod + from .. import cimpl @@ -21,15 +23,18 @@ class OffsetSpec(ABC): Used in `AdminClient.list_offsets` to specify the desired offsets of the partition being queried. 
""" - _values = {} + _values: Dict[int, 'OffsetSpec'] = {} + _max_timestamp: Optional['MaxTimestampSpec'] = None + _earliest: Optional['EarliestSpec'] = None + _latest: Optional['LatestSpec'] = None @property @abstractmethod - def _value(self): + def _value(self) -> int: pass @classmethod - def _fill_values(cls): + def _fill_values(cls) -> None: cls._max_timestamp = MaxTimestampSpec() cls._earliest = EarliestSpec() cls._latest = LatestSpec() @@ -52,10 +57,10 @@ def max_timestamp(cls): return cls._max_timestamp @classmethod - def for_timestamp(cls, timestamp): + def for_timestamp(cls, timestamp: int): return TimestampSpec(timestamp) - def __new__(cls, index): + def __new__(cls, index: int): # Trying to instantiate returns one of the subclasses. # Subclasses can be instantiated but aren't accessible externally. if index < 0: @@ -63,7 +68,7 @@ def __new__(cls, index): else: return cls.for_timestamp(index) - def __lt__(self, other): + def __lt__(self, other) -> Any: if not isinstance(other, OffsetSpec): return NotImplemented return self._value < other._value @@ -82,13 +87,13 @@ class TimestampSpec(OffsetSpec): """ @property - def _value(self): + def _value(self) -> int: return self.timestamp - def __new__(cls, _): + def __new__(cls, _: int): return object.__new__(cls) - def __init__(self, timestamp): + def __init__(self, timestamp: int) -> None: self.timestamp = timestamp @@ -103,7 +108,7 @@ def __new__(cls): return object.__new__(cls) @property - def _value(self): + def _value(self) -> int: return cimpl.OFFSET_SPEC_MAX_TIMESTAMP @@ -116,7 +121,7 @@ def __new__(cls): return object.__new__(cls) @property - def _value(self): + def _value(self) -> int: return cimpl.OFFSET_SPEC_LATEST @@ -129,7 +134,7 @@ def __new__(cls): return object.__new__(cls) @property - def _value(self): + def _value(self) -> int: return cimpl.OFFSET_SPEC_EARLIEST @@ -151,9 +156,7 @@ class ListOffsetsResultInfo: leader_epoch: int The leader epoch corresponding to the offset (optional). """ - def __init__(self, offset, timestamp, leader_epoch): + def __init__(self, offset: int, timestamp: int, leader_epoch: int) -> None: self.offset = offset self.timestamp = timestamp - self.leader_epoch = leader_epoch - if self.leader_epoch < 0: - self.leader_epoch = None + self.leader_epoch: Optional[int] = leader_epoch if leader_epoch >= 0 else None diff --git a/src/confluent_kafka/admin/_metadata.py b/src/confluent_kafka/admin/_metadata.py index 201e4534b..e56b0637b 100644 --- a/src/confluent_kafka/admin/_metadata.py +++ b/src/confluent_kafka/admin/_metadata.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -class ClusterMetadata (object): +from typing import Dict, List + + +class ClusterMetadata(object): """ Provides information about the Kafka cluster, brokers, and topics. Returned by list_topics(). @@ -20,35 +23,35 @@ class ClusterMetadata (object): This class is typically not user instantiated. """ - def __init__(self): + def __init__(self) -> None: self.cluster_id = None """Cluster id string, if supported by the broker, else None.""" self.controller_id = -1 """Current controller broker id, or -1.""" - self.brokers = {} + self.brokers: Dict[int, 'BrokerMetadata'] = {} """Map of brokers indexed by the broker id (int). Value is a BrokerMetadata object.""" - self.topics = {} + self.topics: Dict[str, 'TopicMetadata'] = {} """Map of topics indexed by the topic name. 
Value is a TopicMetadata object.""" self.orig_broker_id = -1 """The broker this metadata originated from.""" self.orig_broker_name = None """The broker name/address this metadata originated from.""" - def __repr__(self): + def __repr__(self) -> str: return "ClusterMetadata({})".format(self.cluster_id) - def __str__(self): + def __str__(self) -> str: return str(self.cluster_id) -class BrokerMetadata (object): +class BrokerMetadata(object): """ Provides information about a Kafka broker. This class is typically not user instantiated. """ - def __init__(self): + def __init__(self) -> None: self.id = -1 """Broker id""" self.host = None @@ -56,14 +59,14 @@ def __init__(self): self.port = -1 """Broker port""" - def __repr__(self): + def __repr__(self) -> str: return "BrokerMetadata({}, {}:{})".format(self.id, self.host, self.port) - def __str__(self): + def __str__(self) -> str: return "{}:{}/{}".format(self.host, self.port, self.id) -class TopicMetadata (object): +class TopicMetadata(object): """ Provides information about a Kafka topic. @@ -73,25 +76,25 @@ class TopicMetadata (object): # Sphinx issue where it tries to reference the same instance variable # on other classes which raises a warning/error. - def __init__(self): + def __init__(self) -> None: self.topic = None """Topic name""" - self.partitions = {} + self.partitions: Dict[int, 'PartitionMetadata'] = {} """Map of partitions indexed by partition id. Value is a PartitionMetadata object.""" self.error = None """Topic error, or None. Value is a KafkaError object.""" - def __repr__(self): + def __repr__(self) -> str: if self.error is not None: return "TopicMetadata({}, {} partitions, {})".format(self.topic, len(self.partitions), self.error) else: return "TopicMetadata({}, {} partitions)".format(self.topic, len(self.partitions)) - def __str__(self): - return self.topic + def __str__(self) -> str: + return str(self.topic) -class PartitionMetadata (object): +class PartitionMetadata(object): """ Provides information about a Kafka partition. @@ -103,25 +106,25 @@ class PartitionMetadata (object): of a broker id in the brokers dict. """ - def __init__(self): + def __init__(self) -> None: self.id = -1 """Partition id.""" self.leader = -1 """Current leader broker for this partition, or -1.""" - self.replicas = [] + self.replicas: List[int] = [] """List of replica broker ids for this partition.""" - self.isrs = [] + self.isrs: List[int] = [] """List of in-sync-replica broker ids for this partition.""" self.error = None """Partition error, or None. Value is a KafkaError object.""" - def __repr__(self): + def __repr__(self) -> str: if self.error is not None: return "PartitionMetadata({}, {})".format(self.id, self.error) else: return "PartitionMetadata({})".format(self.id) - def __str__(self): + def __str__(self) -> str: return "{}".format(self.id) @@ -134,7 +137,7 @@ class GroupMember(object): This class is typically not user instantiated. """ # noqa: E501 - def __init__(self,): + def __init__(self) -> None: self.id = None """Member id (generated by broker).""" self.client_id = None @@ -153,7 +156,7 @@ class GroupMetadata(object): This class is typically not user instantiated. 
""" - def __init__(self): + def __init__(self) -> None: self.broker = None """Originating broker metadata.""" self.id = None @@ -166,14 +169,14 @@ def __init__(self): """Group protocol type.""" self.protocol = None """Group protocol.""" - self.members = [] + self.members: List[GroupMember] = [] """Group members.""" - def __repr__(self): + def __repr__(self) -> str: if self.error is not None: return "GroupMetadata({}, {})".format(self.id, self.error) else: return "GroupMetadata({})".format(self.id) - def __str__(self): - return self.id + def __str__(self) -> str: + return str(self.id) diff --git a/src/confluent_kafka/admin/_records.py b/src/confluent_kafka/admin/_records.py index 4638924e4..85aca0cd8 100644 --- a/src/confluent_kafka/admin/_records.py +++ b/src/confluent_kafka/admin/_records.py @@ -23,5 +23,5 @@ class DeletedRecords: low_watermark: int The "low watermark" for the topic partition on which the deletion was executed. """ - def __init__(self, low_watermark): + def __init__(self, low_watermark: int) -> None: self.low_watermark = low_watermark diff --git a/src/confluent_kafka/admin/_resource.py b/src/confluent_kafka/admin/_resource.py index 896ccddb4..8fa6dd19b 100644 --- a/src/confluent_kafka/admin/_resource.py +++ b/src/confluent_kafka/admin/_resource.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any from enum import Enum from .. import cimpl as _cimpl @@ -27,7 +28,7 @@ class ResourceType(Enum): BROKER = _cimpl.RESOURCE_BROKER #: Broker resource. Resource name is broker id. TRANSACTIONAL_ID = _cimpl.RESOURCE_TRANSACTIONAL_ID #: Transactional ID resource. - def __lt__(self, other): + def __lt__(self, other: 'ResourceType') -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value @@ -43,7 +44,7 @@ class ResourcePatternType(Enum): LITERAL = _cimpl.RESOURCE_PATTERN_LITERAL #: Literal: A literal resource name PREFIXED = _cimpl.RESOURCE_PATTERN_PREFIXED #: Prefixed: A prefixed resource name - def __lt__(self, other): + def __lt__(self, other: 'ResourcePatternType') -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value diff --git a/src/confluent_kafka/admin/_scram.py b/src/confluent_kafka/admin/_scram.py index c20f55bbc..76c999dbc 100644 --- a/src/confluent_kafka/admin/_scram.py +++ b/src/confluent_kafka/admin/_scram.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .. import cimpl - +from typing import List, Optional, Any from enum import Enum +from .. import cimpl + class ScramMechanism(Enum): """ @@ -25,7 +26,7 @@ class ScramMechanism(Enum): SCRAM_SHA_256 = cimpl.SCRAM_MECHANISM_SHA_256 #: SCRAM-SHA-256 mechanism SCRAM_SHA_512 = cimpl.SCRAM_MECHANISM_SHA_512 #: SCRAM-SHA-512 mechanism - def __lt__(self, other): + def __lt__(self, other) -> Any: if self.__class__ != other.__class__: return NotImplemented return self.value < other.value @@ -43,7 +44,7 @@ class ScramCredentialInfo: iterations: int Positive number of iterations used when creating the credential. """ - def __init__(self, mechanism, iterations): + def __init__(self, mechanism: ScramMechanism, iterations: int) -> None: self.mechanism = mechanism self.iterations = iterations @@ -60,7 +61,9 @@ class UserScramCredentialsDescription: scram_credential_infos: list(ScramCredentialInfo) SASL/SCRAM credential representations for the user. 
""" - def __init__(self, user, scram_credential_infos): + def __init__(self, + user: str, + scram_credential_infos: List[ScramCredentialInfo]) -> None: self.user = user self.scram_credential_infos = scram_credential_infos @@ -74,7 +77,7 @@ class UserScramCredentialAlteration: user: str The user name. """ - def __init__(self, user: str): + def __init__(self, user: str) -> None: self.user = user @@ -93,7 +96,11 @@ class UserScramCredentialUpsertion(UserScramCredentialAlteration): salt: bytes Salt to use. Will be generated randomly if None. (optional) """ - def __init__(self, user, scram_credential_info, password, salt=None): + def __init__(self, + user: str, + scram_credential_info: ScramCredentialInfo, + password: bytes, + salt: Optional[bytes] = None) -> None: super(UserScramCredentialUpsertion, self).__init__(user) self.scram_credential_info = scram_credential_info self.password = password @@ -111,6 +118,6 @@ class UserScramCredentialDeletion(UserScramCredentialAlteration): mechanism: ScramMechanism SASL/SCRAM mechanism. """ - def __init__(self, user, mechanism): + def __init__(self, user: str, mechanism: ScramMechanism) -> None: super(UserScramCredentialDeletion, self).__init__(user) self.mechanism = mechanism diff --git a/src/confluent_kafka/admin/_topic.py b/src/confluent_kafka/admin/_topic.py index f686b4a06..a804631be 100644 --- a/src/confluent_kafka/admin/_topic.py +++ b/src/confluent_kafka/admin/_topic.py @@ -12,8 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Optional, Union from .._util import ConversionUtil +from .._model import TopicPartitionInfo +from ..cimpl import Uuid from ._acl import AclOperation @@ -36,7 +39,9 @@ class TopicDescription: AclOperations allowed for the topic. """ - def __init__(self, name, topic_id, is_internal, partitions, authorized_operations=None): + def __init__(self, name: str, topic_id: Uuid, is_internal: bool, + partitions: List[TopicPartitionInfo], + authorized_operations: Optional[List[Union[str, int, AclOperation]]] = None) -> None: self.name = name self.topic_id = topic_id self.is_internal = is_internal diff --git a/src/confluent_kafka/cimpl.pyi b/src/confluent_kafka/cimpl.pyi new file mode 100644 index 000000000..9faa97f01 --- /dev/null +++ b/src/confluent_kafka/cimpl.pyi @@ -0,0 +1,488 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Type stubs for confluent_kafka.cimpl + +This combines automatic stubgen output (constants, functions) with +manual class definitions based on runtime introspection and domain knowledge. 
+ +⚠️ WARNING: MAINTENANCE REQUIRED ⚠️ +This stub file must be kept in sync with the C extension source code in src/: +- src/Admin.c (AdminClientImpl methods) +- src/Producer.c (Producer class) +- src/Consumer.c (Consumer class) +- src/AdminTypes.c (NewTopic, NewPartitions classes) +- src/confluent_kafka.c (KafkaError, Message, TopicPartition, Uuid classes) + +When modifying C extension interfaces (method signatures, parameters, defaults), +you MUST update the corresponding type definitions in this file. +Failure to do so will result in incorrect type hints and mypy errors. + +TODO: Consider migrating to Cython in the future to eliminate this dual +maintenance burden and get type hints directly from the implementation. +""" + +from typing import Any, Optional, Callable, List, Tuple, Dict, Union, overload +from typing_extensions import Self, Literal +import builtins + +from ._types import HeadersType + +# Callback types with proper class references (defined locally to avoid circular imports) +DeliveryCallback = Callable[[Optional['KafkaError'], 'Message'], None] +RebalanceCallback = Callable[['Consumer', List['TopicPartition']], None] + +# ===== CLASSES (Manual - stubgen missed these) ===== + +class KafkaError: + _KEY_DESERIALIZATION: int + _KEY_SERIALIZATION: int + _VALUE_DESERIALIZATION: int + _VALUE_SERIALIZATION: int + + def __init__(self, code: int, str: Optional[str] = None, fatal: bool = False) -> None: ... + def code(self) -> int: ... + def name(self) -> builtins.str: ... + def str(self) -> builtins.str: ... + def fatal(self) -> bool: ... + def retriable(self) -> bool: ... + def txn_requires_abort(self) -> bool: ... + def __str__(self) -> builtins.str: ... + def __bool__(self) -> bool: ... + def __hash__(self) -> int: ... + def __eq__(self, other: object) -> bool: ... + def __ne__(self, other: object) -> bool: ... + def __lt__(self, other: Union['KafkaError', int]) -> bool: ... + def __le__(self, other: Union['KafkaError', int]) -> bool: ... + def __gt__(self, other: Union['KafkaError', int]) -> bool: ... + def __ge__(self, other: Union['KafkaError', int]) -> bool: ... + +class KafkaException(Exception): + def __init__(self, *args: Any, **kwargs: Any) -> None: ... + args: Tuple[Any, ...] + +class Message: + def topic(self) -> str: ... + def partition(self) -> int: ... + def offset(self) -> int: ... + def key(self) -> Optional[bytes]: ... + def value(self) -> Optional[bytes]: ... + def headers(self) -> Optional[HeadersType]: ... + def error(self) -> Optional[KafkaError]: ... + def timestamp(self) -> Tuple[int, int]: ... # (timestamp_type, timestamp) + def latency(self) -> Optional[float]: ... + def leader_epoch(self) -> Optional[int]: ... + def set_headers(self, headers: HeadersType) -> None: ... + def set_key(self, key: bytes) -> None: ... + def set_value(self, value: bytes) -> None: ... + def __len__(self) -> int: ... + +class TopicPartition: + def __init__(self, topic: str, partition: int = -1, offset: int = -1001) -> None: ... + topic: str + partition: int + offset: int + leader_epoch: int + metadata: Optional[str] + error: Optional[KafkaError] + def __str__(self) -> str: ... + def __repr__(self) -> str: ... + def __hash__(self) -> int: ... + def __eq__(self, other: object) -> bool: ... + def __lt__(self, other: 'TopicPartition') -> bool: ... + +class Uuid: + def __init__(self, uuid_str: Optional[str] = None) -> None: ... + def __str__(self) -> str: ... + def __repr__(self) -> str: ... + def __int__(self) -> int: ... + def __hash__(self) -> int: ... 
+ def __eq__(self, other: object) -> bool: ... + +class Producer: + def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ... + def produce( + self, + topic: str, + value: Optional[bytes] = None, + key: Optional[bytes] = None, + partition: int = -1, + callback: Optional[DeliveryCallback] = None, + on_delivery: Optional[DeliveryCallback] = None, + timestamp: int = 0, + headers: Optional[HeadersType] = None + ) -> None: ... + def produce_batch( + self, + topic: str, + messages: List[Dict[str, Any]], + partition: int = -1, + callback: Optional[DeliveryCallback] = None, + on_delivery: Optional[DeliveryCallback] = None + ) -> int: ... + def poll(self, timeout: float = -1) -> int: ... + def flush(self, timeout: float = -1) -> int: ... + def purge( + self, + in_queue: bool = True, + in_flight: bool = True, + blocking: bool = True + ) -> None: ... + def abort_transaction(self, timeout: float = -1) -> None: ... + def begin_transaction(self) -> None: ... + def commit_transaction(self, timeout: float = -1) -> None: ... + def init_transactions(self, timeout: float = -1) -> None: ... + def send_offsets_to_transaction( + self, + positions: List[TopicPartition], + group_metadata: Any, # ConsumerGroupMetadata + timeout: float = -1 + ) -> None: ... + def list_topics(self, topic: Optional[str] = None, timeout: float = -1) -> Any: ... + def set_sasl_credentials(self, username: str, password: str) -> None: ... + def __len__(self) -> int: ... + def __bool__(self) -> bool: ... + +class Consumer: + def __init__(self, config: Dict[str, Union[str, int, float, bool, None]]) -> None: ... + def subscribe( + self, + topics: List[str], + on_assign: Optional[RebalanceCallback] = None, + on_revoke: Optional[RebalanceCallback] = None, + on_lost: Optional[RebalanceCallback] = None + ) -> None: ... + def unsubscribe(self) -> None: ... + def poll(self, timeout: float = -1) -> Optional[Message]: ... + def consume(self, num_messages: int = 1, timeout: float = -1) -> List[Message]: ... + def assign(self, partitions: List[TopicPartition]) -> None: ... + def unassign(self) -> None: ... + def assignment(self) -> List[TopicPartition]: ... + @overload + def commit( + self, + message: Optional['Message'] = None, + offsets: Optional[List[TopicPartition]] = None, + asynchronous: Literal[True] = True + ) -> None: ... + @overload + def commit( + self, + message: Optional['Message'] = None, + offsets: Optional[List[TopicPartition]] = None, + asynchronous: Literal[False] = False + ) -> List[TopicPartition]: ... + def get_watermark_offsets( + self, + partition: TopicPartition, + timeout: float = -1, + cached: bool = False + ) -> Tuple[int, int]: ... + def pause(self, partitions: List[TopicPartition]) -> None: ... + def resume(self, partitions: List[TopicPartition]) -> None: ... + def seek(self, partition: TopicPartition) -> None: ... + def position(self, partitions: List[TopicPartition]) -> List[TopicPartition]: ... + def store_offsets( + self, + message: Optional['Message'] = None, + offsets: Optional[List[TopicPartition]] = None + ) -> None: ... + def close(self) -> None: ... + def list_topics(self, topic: Optional[str] = None, timeout: float = -1) -> Any: ... + def offsets_for_times( + self, + partitions: List[TopicPartition], + timeout: float = -1 + ) -> List[TopicPartition]: ... + def incremental_assign(self, partitions: List[TopicPartition]) -> None: ... + def incremental_unassign(self, partitions: List[TopicPartition]) -> None: ... + def consumer_group_metadata(self) -> Any: ... 
# ConsumerGroupMetadata + def memberid(self) -> str: ... + def set_sasl_credentials(self, username: str, password: str) -> None: ... + def __bool__(self) -> bool: ... + +class _AdminClientImpl: + def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ... + def create_topics( + self, + topics: List['NewTopic'], + future: Any, + validate_only: bool = False, + request_timeout: float = -1, + operation_timeout: float = -1 + ) -> None: ... + def delete_topics( + self, + topics: List[str], + future: Any, + request_timeout: float = -1, + operation_timeout: float = -1 + ) -> None: ... + def create_partitions( + self, + topics: List['NewPartitions'], + future: Any, + validate_only: bool = False, + request_timeout: float = -1, + operation_timeout: float = -1 + ) -> None: ... + def describe_topics( + self, + future: Any, + topic_names: List[str], + request_timeout: float = -1, + include_authorized_operations: bool = False + ) -> None: ... + def describe_cluster( + self, + future: Any, + request_timeout: float = -1, + include_authorized_operations: bool = False + ) -> None: ... + def list_topics( + self, + topic: Optional[str] = None, + timeout: float = -1 + ) -> Any: ... + def list_groups( + self, + group: Optional[str] = None, + timeout: float = -1 + ) -> Any: ... + def describe_consumer_groups( + self, + future: Any, + group_ids: List[str], + request_timeout: float = -1, + include_authorized_operations: bool = False + ) -> None: ... + def list_consumer_groups( + self, + future: Any, + states_int: Optional[List[int]] = None, + types_int: Optional[List[int]] = None, + request_timeout: float = -1 + ) -> None: ... + def list_consumer_group_offsets( + self, + request: Any, # ConsumerGroupTopicPartitions + future: Any, + require_stable: bool = False, + request_timeout: float = -1 + ) -> None: ... + def alter_consumer_group_offsets( + self, + requests: Any, # List[ConsumerGroupTopicPartitions] + future: Any, + request_timeout: float = -1 + ) -> None: ... + def delete_consumer_groups( + self, + group_ids: List[str], + future: Any, + request_timeout: float = -1 + ) -> None: ... + def create_acls( + self, + acls: List[Any], # List[AclBinding] + future: Any, + request_timeout: float = -1 + ) -> None: ... + def describe_acls( + self, + acl_binding_filter: Any, # AclBindingFilter + future: Any, + request_timeout: float = -1 + ) -> None: ... + def delete_acls( + self, + acls: List[Any], # List[AclBindingFilter] + future: Any, + request_timeout: float = -1 + ) -> None: ... + def describe_configs( + self, + resources: List[Any], # List[ConfigResource] + future: Any, + request_timeout: float = -1, + broker: int = -1 + ) -> None: ... + def alter_configs( + self, + resources: List[Any], # List[ConfigResource] + future: Any, + validate_only: bool = False, + request_timeout: float = -1, + broker: int = -1 + ) -> None: ... + def incremental_alter_configs( + self, + resources: List[Any], # List[ConfigResource] + future: Any, + validate_only: bool = False, + request_timeout: float = -1, + broker: int = -1 + ) -> None: ... + def describe_user_scram_credentials( + self, + users: Optional[List[str]], + future: Any, + request_timeout: float = -1 + ) -> None: ... + def alter_user_scram_credentials( + self, + alterations: List[Any], # List[UserScramCredentialAlteration] + future: Any, + request_timeout: float = -1 + ) -> None: ... + def list_offsets( + self, + topic_partitions: List[TopicPartition], + future: Any, + isolation_level_value: Optional[int] = None, + request_timeout: float = -1 + ) -> None: ... 
+ def delete_records( + self, + topic_partition_offsets: List[TopicPartition], + future: Any, + request_timeout: float = -1, + operation_timeout: float = -1 + ) -> None: ... + def elect_leaders( + self, + election_type: int, + partitions: Optional[List[TopicPartition]], + future: Any, + request_timeout: float = -1, + operation_timeout: float = -1 + ) -> None: ... + def poll(self, timeout: float = -1) -> int: ... + def set_sasl_credentials(self, username: str, password: str) -> None: ... + +class NewTopic: + def __init__( + self, + topic: str, + num_partitions: int = -1, + replication_factor: int = -1, + replica_assignment: Optional[List[List[int]]] = None, + config: Optional[Dict[str, str]] = None + ) -> None: ... + topic: str + num_partitions: int + replication_factor: int + replica_assignment: Optional[List[List[int]]] + config: Optional[Dict[str, str]] + def __str__(self) -> str: ... + def __hash__(self) -> int: ... + def __eq__(self, other: object) -> bool: ... + def __ne__(self, other: object) -> bool: ... + def __lt__(self, other: 'NewTopic') -> bool: ... + def __le__(self, other: 'NewTopic') -> bool: ... + def __gt__(self, other: 'NewTopic') -> bool: ... + def __ge__(self, other: 'NewTopic') -> bool: ... + +class NewPartitions: + def __init__( + self, + topic: str, + new_total_count: int, + replica_assignment: Optional[List[List[int]]] = None + ) -> None: ... + topic: str + new_total_count: int + replica_assignment: Optional[List[List[int]]] + def __str__(self) -> str: ... + def __hash__(self) -> int: ... + def __eq__(self, other: object) -> bool: ... + def __ne__(self, other: object) -> bool: ... + def __lt__(self, other: 'NewPartitions') -> bool: ... + def __le__(self, other: 'NewPartitions') -> bool: ... + def __gt__(self, other: 'NewPartitions') -> bool: ... + def __ge__(self, other: 'NewPartitions') -> bool: ... + +# ===== MODULE FUNCTIONS (From stubgen) ===== + +def libversion() -> Tuple[str, int]: ... +def version() -> Tuple[str, int]: ... 
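Taken together, the class stubs above let a type checker validate application code against the C extension's real signatures. A minimal sketch of code that should now type-check cleanly; the broker address and topic name are illustrative, and the callback signature matches the `DeliveryCallback` alias declared at the top of the stub:

```python
from typing import Optional

from confluent_kafka import Producer, KafkaError, Message

# Hypothetical config and topic; any str/int/float/bool value is valid per the stub.
producer = Producer({"bootstrap.servers": "localhost:9092"})

def on_delivery(err: Optional[KafkaError], msg: Message) -> None:
    # Signature matches DeliveryCallback, so mypy can verify this wiring.
    if err is not None:
        print(f"delivery failed: {err.str()}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

producer.produce("example-topic", value=b"payload", key=b"key", on_delivery=on_delivery)
producer.flush()
```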
+ +# ===== CONSTANTS (From stubgen) ===== + +ACL_OPERATION_ALL: int +ACL_OPERATION_ALTER: int +ACL_OPERATION_ALTER_CONFIGS: int +ACL_OPERATION_ANY: int +ACL_OPERATION_CLUSTER_ACTION: int +ACL_OPERATION_CREATE: int +ACL_OPERATION_DELETE: int +ACL_OPERATION_DESCRIBE: int +ACL_OPERATION_DESCRIBE_CONFIGS: int +ACL_OPERATION_IDEMPOTENT_WRITE: int +ACL_OPERATION_READ: int +ACL_OPERATION_UNKNOWN: int +ACL_OPERATION_WRITE: int +ACL_PERMISSION_TYPE_ALLOW: int +ACL_PERMISSION_TYPE_ANY: int +ACL_PERMISSION_TYPE_DENY: int +ACL_PERMISSION_TYPE_UNKNOWN: int +ALTER_CONFIG_OP_TYPE_APPEND: int +ALTER_CONFIG_OP_TYPE_DELETE: int +ALTER_CONFIG_OP_TYPE_SET: int +ALTER_CONFIG_OP_TYPE_SUBTRACT: int +CONFIG_SOURCE_DEFAULT_CONFIG: int +CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG: int +CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG: int +CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG: int +CONFIG_SOURCE_GROUP_CONFIG: int +CONFIG_SOURCE_STATIC_BROKER_CONFIG: int +CONFIG_SOURCE_UNKNOWN_CONFIG: int +CONSUMER_GROUP_STATE_COMPLETING_REBALANCE: int +CONSUMER_GROUP_STATE_DEAD: int +CONSUMER_GROUP_STATE_EMPTY: int +CONSUMER_GROUP_STATE_PREPARING_REBALANCE: int +CONSUMER_GROUP_STATE_STABLE: int +CONSUMER_GROUP_STATE_UNKNOWN: int +CONSUMER_GROUP_TYPE_CLASSIC: int +CONSUMER_GROUP_TYPE_CONSUMER: int +CONSUMER_GROUP_TYPE_UNKNOWN: int +ELECTION_TYPE_PREFERRED: int +ELECTION_TYPE_UNCLEAN: int +ISOLATION_LEVEL_READ_COMMITTED: int +ISOLATION_LEVEL_READ_UNCOMMITTED: int +OFFSET_BEGINNING: int +OFFSET_END: int +OFFSET_INVALID: int +OFFSET_SPEC_EARLIEST: int +OFFSET_SPEC_LATEST: int +OFFSET_SPEC_MAX_TIMESTAMP: int +OFFSET_STORED: int +RESOURCE_ANY: int +RESOURCE_BROKER: int +RESOURCE_GROUP: int +RESOURCE_PATTERN_ANY: int +RESOURCE_PATTERN_LITERAL: int +RESOURCE_PATTERN_MATCH: int +RESOURCE_PATTERN_PREFIXED: int +RESOURCE_PATTERN_UNKNOWN: int +RESOURCE_TOPIC: int +RESOURCE_TRANSACTIONAL_ID: int +RESOURCE_UNKNOWN: int +SCRAM_MECHANISM_SHA_256: int +SCRAM_MECHANISM_SHA_512: int +SCRAM_MECHANISM_UNKNOWN: int +TIMESTAMP_CREATE_TIME: int +TIMESTAMP_LOG_APPEND_TIME: int +TIMESTAMP_NOT_AVAILABLE: int diff --git a/src/confluent_kafka/deserializing_consumer.py b/src/confluent_kafka/deserializing_consumer.py index 294dae99d..793271b86 100644 --- a/src/confluent_kafka/deserializing_consumer.py +++ b/src/confluent_kafka/deserializing_consumer.py @@ -16,12 +16,15 @@ # limitations under the License. # -from confluent_kafka.cimpl import Consumer as _ConsumerImpl +from typing import Any, Dict, List, Optional + +from confluent_kafka.cimpl import Consumer as _ConsumerImpl, Message from .error import (ConsumeError, KeyDeserializationError, ValueDeserializationError) from .serialization import (SerializationContext, MessageField) +from ._types import Deserializer class DeserializingConsumer(_ConsumerImpl): @@ -70,14 +73,14 @@ class DeserializingConsumer(_ConsumerImpl): ValueError: if configuration validation fails """ # noqa: E501 - def __init__(self, conf): + def __init__(self, conf: Dict[str, Any]) -> None: conf_copy = conf.copy() self._key_deserializer = conf_copy.pop('key.deserializer', None) self._value_deserializer = conf_copy.pop('value.deserializer', None) super(DeserializingConsumer, self).__init__(conf_copy) - def poll(self, timeout=-1): + def poll(self, timeout: float = -1) -> Optional[Message]: """ Consume messages and calls callbacks. 
@@ -100,8 +103,9 @@ def poll(self, timeout=-1): if msg is None: return None - if msg.error() is not None: - raise ConsumeError(msg.error(), kafka_message=msg) + error = msg.error() + if error is not None: + raise ConsumeError(error, kafka_message=msg) ctx = SerializationContext(msg.topic(), MessageField.VALUE, msg.headers()) value = msg.value() @@ -119,11 +123,11 @@ def poll(self, timeout=-1): except Exception as se: raise KeyDeserializationError(exception=se, kafka_message=msg) - msg.set_key(key) - msg.set_value(value) + msg.set_key(key) # type: ignore[arg-type] + msg.set_value(value) # type: ignore[arg-type] return msg - def consume(self, num_messages=1, timeout=-1): + def consume(self, num_messages: int = 1, timeout: float = -1) -> List[Message]: """ :py:func:`Consumer.consume` not implemented, use :py:func:`DeserializingConsumer.poll` instead diff --git a/src/confluent_kafka/error.py b/src/confluent_kafka/error.py index 07c733c23..2df3557cb 100644 --- a/src/confluent_kafka/error.py +++ b/src/confluent_kafka/error.py @@ -15,7 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from confluent_kafka.cimpl import KafkaException, KafkaError + +from typing import Optional + +from confluent_kafka.cimpl import KafkaException, KafkaError, Message from confluent_kafka.serialization import SerializationError @@ -32,17 +35,18 @@ class _KafkaClientError(KafkaException): by the broker. """ - def __init__(self, kafka_error, exception=None, kafka_message=None): + def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None, + kafka_message: Optional[Message] = None) -> None: super(_KafkaClientError, self).__init__(kafka_error) self.exception = exception self.kafka_message = kafka_message @property - def code(self): + def code(self) -> int: return self.args[0].code() @property - def name(self): + def name(self) -> str: return self.args[0].name() @@ -64,7 +68,8 @@ class ConsumeError(_KafkaClientError): """ - def __init__(self, kafka_error, exception=None, kafka_message=None): + def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None, + kafka_message: Optional[Message] = None) -> None: super(ConsumeError, self).__init__(kafka_error, exception, kafka_message) @@ -81,7 +86,7 @@ class KeyDeserializationError(ConsumeError, SerializationError): """ - def __init__(self, exception=None, kafka_message=None): + def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None: super(KeyDeserializationError, self).__init__( KafkaError(KafkaError._KEY_DESERIALIZATION, str(exception)), exception=exception, kafka_message=kafka_message) @@ -100,7 +105,7 @@ class ValueDeserializationError(ConsumeError, SerializationError): """ - def __init__(self, exception=None, kafka_message=None): + def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None: super(ValueDeserializationError, self).__init__( KafkaError(KafkaError._VALUE_DESERIALIZATION, str(exception)), exception=exception, kafka_message=kafka_message) @@ -116,7 +121,7 @@ class ProduceError(_KafkaClientError): exception(Exception, optional): The original exception. 
""" - def __init__(self, kafka_error, exception=None): + def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None) -> None: super(ProduceError, self).__init__(kafka_error, exception, None) @@ -128,7 +133,7 @@ class KeySerializationError(ProduceError, SerializationError): exception (Exception): The exception that occurred during serialization. """ - def __init__(self, exception=None): + def __init__(self, exception: Optional[Exception] = None) -> None: super(KeySerializationError, self).__init__( KafkaError(KafkaError._KEY_SERIALIZATION, str(exception)), exception=exception) @@ -142,7 +147,7 @@ class ValueSerializationError(ProduceError, SerializationError): exception (Exception): The exception that occurred during serialization. """ - def __init__(self, exception=None): + def __init__(self, exception: Optional[Exception] = None) -> None: super(ValueSerializationError, self).__init__( KafkaError(KafkaError._VALUE_SERIALIZATION, str(exception)), exception=exception) diff --git a/src/confluent_kafka/experimental/aio/_AIOConsumer.py b/src/confluent_kafka/experimental/aio/_AIOConsumer.py index 40340f4c2..b169da80c 100644 --- a/src/confluent_kafka/experimental/aio/_AIOConsumer.py +++ b/src/confluent_kafka/experimental/aio/_AIOConsumer.py @@ -14,12 +14,20 @@ import asyncio import concurrent.futures +from typing import Any, Callable, Dict, Optional, Tuple + import confluent_kafka + from . import _common as _common class AIOConsumer: - def __init__(self, consumer_conf, max_workers=2, executor=None): + def __init__( + self, + consumer_conf: Dict[str, Any], + max_workers: int = 2, + executor: Optional[concurrent.futures.Executor] = None + ) -> None: if executor is not None: # Executor must have at least one worker. # At least two workers are needed when calling re-entrant @@ -37,40 +45,57 @@ def __init__(self, consumer_conf, max_workers=2, executor=None): wrap_common_callbacks(loop, consumer_conf) wrap_conf_callback(loop, consumer_conf, 'on_commit') - self._consumer = confluent_kafka.Consumer(consumer_conf) - - async def _call(self, blocking_task, *args, **kwargs): - return await _common.async_call(self.executor, blocking_task, *args, **kwargs) - - def _wrap_callback(self, loop, callback, edit_args=None, edit_kwargs=None): - def ret(*args, **kwargs): + self._consumer: confluent_kafka.Consumer = confluent_kafka.Consumer( + consumer_conf + ) + + async def _call( + self, + blocking_task: Callable[..., Any], + *args: Any, + **kwargs: Any + ) -> Any: + return await _common.async_call( + self.executor, blocking_task, *args, **kwargs + ) + + def _wrap_callback( + self, + loop: asyncio.AbstractEventLoop, + callback: Callable[..., Any], + edit_args: Optional[Callable[[Tuple[Any, ...]], Tuple[Any, ...]]] = None, + edit_kwargs: Optional[Callable[[Any], Any]] = None + ) -> Callable[..., Any]: + def ret(*args: Any, **kwargs: Any) -> Any: if edit_args: args = edit_args(args) if edit_kwargs: kwargs = edit_kwargs(kwargs) - f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), - loop) + f = asyncio.run_coroutine_threadsafe( + callback(*args, **kwargs), loop + ) return f.result() return ret - async def poll(self, *args, **kwargs): + async def poll(self, *args: Any, **kwargs: Any) -> Any: """ Polls for a single message from the subscribed topics. Performance Note: For high-throughput applications, prefer consume() over poll(): - consume() can retrieve multiple messages per call and amortize the async - overhead across the entire batch. 
+ consume() can retrieve multiple messages per call and amortize the + async overhead across the entire batch. On the other hand, poll() retrieves one message per call, which means the ThreadPoolExecutor overhead is applied to each individual message. - This can result inlower throughput compared to the synchronous consumer.poll() - due tothe async coordination overhead not being amortized. + This can result in lower throughput compared to the synchronous + consumer.poll() due to the async coordination overhead not being + amortized. """ return await self._call(self._consumer.poll, *args, **kwargs) - async def consume(self, *args, **kwargs): + async def consume(self, *args: Any, **kwargs: Any) -> Any: """ Consumes a batch of messages from the subscribed topics. @@ -83,12 +108,15 @@ async def consume(self, *args, **kwargs): """ return await self._call(self._consumer.consume, *args, **kwargs) - def _edit_rebalance_callbacks_args(self, args): - args = list(args) - args[0] = self - return args + def _edit_rebalance_callbacks_args( + self, + args: Tuple[Any, ...] + ) -> Tuple[Any, ...]: + args_list = list(args) + args_list[0] = self + return tuple(args_list) - async def subscribe(self, *args, **kwargs): + async def subscribe(self, *args: Any, **kwargs: Any) -> Any: loop = asyncio.get_event_loop() for callback in ['on_assign', 'on_revoke', 'on_lost']: if callback in kwargs: @@ -96,60 +124,71 @@ async def subscribe(self, *args, **kwargs): self._edit_rebalance_callbacks_args) # noqa: E501 return await self._call(self._consumer.subscribe, *args, **kwargs) - async def unsubscribe(self, *args, **kwargs): + async def unsubscribe(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.unsubscribe, *args, **kwargs) - async def commit(self, *args, **kwargs): + async def commit(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.commit, *args, **kwargs) - async def close(self, *args, **kwargs): + async def close(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.close, *args, **kwargs) - async def seek(self, *args, **kwargs): + async def seek(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.seek, *args, **kwargs) - async def pause(self, *args, **kwargs): + async def pause(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.pause, *args, **kwargs) - async def resume(self, *args, **kwargs): + async def resume(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.resume, *args, **kwargs) - async def store_offsets(self, *args, **kwargs): + async def store_offsets(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.store_offsets, *args, **kwargs) - async def committed(self, *args, **kwargs): + async def committed(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.committed, *args, **kwargs) - async def assign(self, *args, **kwargs): + async def assign(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.assign, *args, **kwargs) - async def unassign(self, *args, **kwargs): + async def unassign(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.unassign, *args, **kwargs) - async def incremental_assign(self, *args, **kwargs): - return await self._call(self._consumer.incremental_assign, *args, **kwargs) + async def incremental_assign(self, *args: Any, **kwargs: Any) -> Any: + return await self._call( + self._consumer.incremental_assign, 
*args, **kwargs + ) - async def incremental_unassign(self, *args, **kwargs): - return await self._call(self._consumer.incremental_unassign, *args, **kwargs) + async def incremental_unassign(self, *args: Any, **kwargs: Any) -> Any: + return await self._call( + self._consumer.incremental_unassign, *args, **kwargs + ) - async def assignment(self, *args, **kwargs): + async def assignment(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.assignment, *args, **kwargs) - async def position(self, *args, **kwargs): + async def position(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.position, *args, **kwargs) - async def consumer_group_metadata(self, *args, **kwargs): - return await self._call(self._consumer.consumer_group_metadata, *args, **kwargs) + async def consumer_group_metadata(self, *args: Any, **kwargs: Any) -> Any: + return await self._call( + self._consumer.consumer_group_metadata, *args, **kwargs + ) - async def set_sasl_credentials(self, *args, **kwargs): - return await self._call(self._consumer.set_sasl_credentials, - *args, **kwargs) + async def set_sasl_credentials(self, *args: Any, **kwargs: Any) -> Any: + return await self._call( + self._consumer.set_sasl_credentials, *args, **kwargs + ) - async def list_topics(self, *args, **kwargs): + async def list_topics(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._consumer.list_topics, *args, **kwargs) - async def get_watermark_offsets(self, *args, **kwargs): - return await self._call(self._consumer.get_watermark_offsets, *args, **kwargs) + async def get_watermark_offsets(self, *args: Any, **kwargs: Any) -> Any: + return await self._call( + self._consumer.get_watermark_offsets, *args, **kwargs + ) - async def offsets_for_times(self, *args, **kwargs): - return await self._call(self._consumer.offsets_for_times, *args, **kwargs) + async def offsets_for_times(self, *args: Any, **kwargs: Any) -> Any: + return await self._call( + self._consumer.offsets_for_times, *args, **kwargs + ) diff --git a/src/confluent_kafka/experimental/aio/_common.py b/src/confluent_kafka/experimental/aio/_common.py index 0e3d733c9..3bf274064 100644 --- a/src/confluent_kafka/experimental/aio/_common.py +++ b/src/confluent_kafka/experimental/aio/_common.py @@ -14,42 +14,69 @@ import asyncio import functools +import logging +import concurrent.futures +from typing import Any, Callable, Dict, Optional, Tuple, TypeVar + +T = TypeVar('T') class AsyncLogger: - def __init__(self, loop, logger): + def __init__( + self, + loop: asyncio.AbstractEventLoop, + logger: logging.Logger + ) -> None: self.loop = loop self.logger = logger - def log(self, *args, **kwargs): - self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs) + def log(self, *args: Any, **kwargs: Any) -> None: + self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs)) -def wrap_callback(loop, callback, edit_args=None, edit_kwargs=None): - def ret(*args, **kwargs): +def wrap_callback( + loop: asyncio.AbstractEventLoop, + callback: Callable[..., Any], + edit_args: Optional[Callable[[Tuple[Any, ...]], Tuple[Any, ...]]] = None, + edit_kwargs: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None +) -> Callable[..., Any]: + def ret(*args: Any, **kwargs: Any) -> Any: if edit_args: args = edit_args(args) if edit_kwargs: kwargs = edit_kwargs(kwargs) - f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), - loop) + f = asyncio.run_coroutine_threadsafe( + callback(*args, **kwargs), loop + ) return 
f.result() return ret -def wrap_conf_callback(loop, conf, name): +def wrap_conf_callback( + loop: asyncio.AbstractEventLoop, + conf: Dict[str, Any], + name: str +) -> None: if name in conf: cb = conf[name] conf[name] = wrap_callback(loop, cb) -def wrap_conf_logger(loop, conf): +def wrap_conf_logger( + loop: asyncio.AbstractEventLoop, + conf: Dict[str, Any] +) -> None: if 'logger' in conf: conf['logger'] = AsyncLogger(loop, conf['logger']) -async def async_call(executor, blocking_task, *args, **kwargs): +async def async_call( + executor: concurrent.futures.Executor, + blocking_task: Callable[..., T], + *args: Any, + **kwargs: Any +) -> T: """Helper function for blocking operations that need ThreadPool execution Args: @@ -61,15 +88,17 @@ async def async_call(executor, blocking_task, *args, **kwargs): Result of the blocking function execution """ return (await asyncio.gather( - asyncio.get_running_loop().run_in_executor(executor, - functools.partial( - blocking_task, - *args, - **kwargs)) + asyncio.get_running_loop().run_in_executor( + executor, + functools.partial(blocking_task, *args, **kwargs) + ) ))[0] -def wrap_common_callbacks(loop, conf): +def wrap_common_callbacks( + loop: asyncio.AbstractEventLoop, + conf: Dict[str, Any] +) -> None: wrap_conf_callback(loop, conf, 'error_cb') wrap_conf_callback(loop, conf, 'throttle_cb') wrap_conf_callback(loop, conf, 'stats_cb') diff --git a/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py b/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py index dc4466714..b2c7b86df 100644 --- a/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py +++ b/src/confluent_kafka/experimental/aio/producer/_AIOProducer.py @@ -15,6 +15,7 @@ import asyncio import concurrent.futures import logging +from typing import Any, Callable, Dict, Optional import confluent_kafka @@ -33,7 +34,14 @@ class AIOProducer: # INITIALIZATION AND LIFECYCLE MANAGEMENT # ======================================================================== - def __init__(self, producer_conf, max_workers=4, executor=None, batch_size=1000, buffer_timeout=1.0): + def __init__( + self, + producer_conf: Dict[str, Any], + max_workers: int = 4, + executor: Optional[concurrent.futures.Executor] = None, + batch_size: int = 1000, + buffer_timeout: float = 1.0 + ) -> None: if executor is not None: self.executor = executor else: @@ -45,27 +53,32 @@ def __init__(self, producer_conf, max_workers=4, executor=None, batch_size=1000, wrap_common_callbacks = _common.wrap_common_callbacks wrap_common_callbacks(self._loop, producer_conf) - self._producer = confluent_kafka.Producer(producer_conf) + self._producer: confluent_kafka.Producer = confluent_kafka.Producer( + producer_conf + ) # Batching configuration - self._batch_size = batch_size + self._batch_size: int = batch_size # Producer state management - self._is_closed = False # Track if producer is closed + self._is_closed: bool = False # Track if producer is closed # Initialize Kafka batch executor for handling Kafka operations - self._kafka_executor = ProducerBatchExecutor(self._producer, self.executor) + self._kafka_executor = ProducerBatchExecutor( + self._producer, self.executor + ) # Initialize batch processor for message batching and processing self._batch_processor = ProducerBatchManager(self._kafka_executor) # Initialize buffer timeout manager for timeout handling self._buffer_timeout_manager = BufferTimeoutManager( - self._batch_processor, self._kafka_executor, buffer_timeout) + self._batch_processor, self._kafka_executor, 
buffer_timeout + ) if buffer_timeout > 0: self._buffer_timeout_manager.start_timeout_monitoring() - async def close(self): + async def close(self) -> None: """Close the producer and cleanup resources This method performs a graceful shutdown sequence to ensure all resources @@ -111,7 +124,7 @@ async def close(self): None, self.executor.shutdown, True ) - def __del__(self): + def __del__(self) -> None: """Cleanup method called during garbage collection This ensures that the timeout task is properly cancelled even if @@ -126,16 +139,21 @@ def __del__(self): # CORE PRODUCER OPERATIONS - Main public API # ======================================================================== - async def poll(self, timeout=0, *args, **kwargs): - """Processes delivery callbacks from librdkafka - blocking behavior depends on timeout + async def poll( + self, + timeout: float = 0, + *args: Any, + **kwargs: Any + ) -> int: + """Processes delivery callbacks from librdkafka - blocking depends on timeout This method triggers any pending delivery reports that have been queued by librdkafka when messages are delivered or fail to deliver. Args: timeout: Timeout in seconds for waiting for callbacks: - - 0 = non-blocking, return immediately after processing available callbacks - - >0 = block up to timeout seconds waiting for new callbacks to arrive + - 0 = non-blocking, return after processing available callbacks + - >0 = block up to timeout seconds waiting for new callbacks - -1 = block indefinitely until callbacks are available Returns: @@ -143,7 +161,14 @@ async def poll(self, timeout=0, *args, **kwargs): """ return await self._call(self._producer.poll, timeout, *args, **kwargs) - async def produce(self, topic, value=None, key=None, *args, **kwargs): + async def produce( + self, + topic: str, + value: Optional[Any] = None, + key: Optional[Any] = None, + *args: Any, + **kwargs: Any + ) -> asyncio.Future[Any]: """Batched produce: Accumulates messages in buffer and flushes when threshold reached Args: @@ -186,7 +211,7 @@ async def produce(self, topic, value=None, key=None, *args, **kwargs): return result - async def flush(self, *args, **kwargs): + async def flush(self, *args: Any, **kwargs: Any) -> Any: """Waits until all messages are delivered or timeout This method performs a complete flush: @@ -199,10 +224,10 @@ async def flush(self, *args, **kwargs): # Update buffer activity since we just flushed self._buffer_timeout_manager.mark_activity() - # Then flush the underlying producer and wait for delivery confirmation + # Then flush underlying producer and wait for delivery confirmation return await self._call(self._producer.flush, *args, **kwargs) - async def purge(self, *args, **kwargs): + async def purge(self, *args: Any, **kwargs: Any) -> Any: """Purges messages from internal queues - may block during cleanup""" # Cancel all pending futures self._batch_processor.cancel_pending_futures() @@ -215,73 +240,86 @@ async def purge(self, *args, **kwargs): return await self._call(self._producer.purge, *args, **kwargs) - async def list_topics(self, *args, **kwargs): + async def list_topics(self, *args: Any, **kwargs: Any) -> Any: return await self._call(self._producer.list_topics, *args, **kwargs) # ======================================================================== # TRANSACTION OPERATIONS - Kafka transaction support # ======================================================================== - async def init_transactions(self, *args, **kwargs): + async def init_transactions(self, *args: Any, **kwargs: Any) -> Any: 
"""Network call to initialize transactions""" - return await self._call(self._producer.init_transactions, - *args, **kwargs) + return await self._call( + self._producer.init_transactions, *args, **kwargs + ) - async def begin_transaction(self, *args, **kwargs): + async def begin_transaction(self, *args: Any, **kwargs: Any) -> Any: """Network call to begin transaction""" # Flush messages to set a clean state before entering a transaction await self.flush() - return await self._call(self._producer.begin_transaction, - *args, **kwargs) + return await self._call( + self._producer.begin_transaction, *args, **kwargs + ) - async def send_offsets_to_transaction(self, *args, **kwargs): + async def send_offsets_to_transaction( + self, + *args: Any, + **kwargs: Any + ) -> Any: """Network call to send offsets to transaction""" - return await self._call(self._producer.send_offsets_to_transaction, - *args, **kwargs) + return await self._call( + self._producer.send_offsets_to_transaction, *args, **kwargs + ) - async def commit_transaction(self, *args, **kwargs): + async def commit_transaction(self, *args: Any, **kwargs: Any) -> Any: """Commit transaction after flushing all buffered messages""" - # Flush to ensure messages in the local batch_processor buffer are delivered to librdkafka + # Flush to ensure messages in the local batch_processor buffer are + # delivered to librdkafka await self.flush() # Then commit transaction - return await self._call(self._producer.commit_transaction, - *args, **kwargs) + return await self._call( + self._producer.commit_transaction, *args, **kwargs + ) - async def abort_transaction(self, *args, **kwargs): + async def abort_transaction(self, *args: Any, **kwargs: Any) -> Any: """Network call to abort transaction Messages produced before the call (i.e. inside the transaction boundary) will be aborted. Messages that are still in flight may be failed by librdkafka as they are considered outside the transaction boundary. 
- Refer to librdkafka documentation section "Transactional producer API" for more details: + Refer to librdkafka documentation section "Transactional producer API" + for more details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#transactional-producer """ - # Flush to ensure messages in the local batch_processor buffer are delivered to librdkafka + # Flush to ensure messages in the local batch_processor buffer are + # delivered to librdkafka await self.flush() - return await self._call(self._producer.abort_transaction, - *args, **kwargs) + return await self._call( + self._producer.abort_transaction, *args, **kwargs + ) # ======================================================================== # AUTHENTICATION AND SECURITY # ======================================================================== - async def set_sasl_credentials(self, *args, **kwargs): + async def set_sasl_credentials(self, *args: Any, **kwargs: Any) -> Any: """Authentication operation that may involve network calls""" - return await self._call(self._producer.set_sasl_credentials, - *args, **kwargs) + return await self._call( + self._producer.set_sasl_credentials, *args, **kwargs + ) # ======================================================================== # BATCH PROCESSING OPERATIONS - Delegated to BatchProcessor # ======================================================================== - async def _flush_buffer(self, target_topic=None): - """Flush the current message buffer using clean batch processing workflow + async def _flush_buffer(self, target_topic: Optional[str] = None) -> None: + """Flush the current message buffer using clean batch processing flow This method demonstrates the new architecture where AIOProducer simply orchestrates the workflow between components: @@ -295,6 +333,13 @@ async def _flush_buffer(self, target_topic=None): # UTILITY METHODS - Helper functions and internal utilities # ======================================================================== - async def _call(self, blocking_task, *args, **kwargs): + async def _call( + self, + blocking_task: Callable[..., Any], + *args: Any, + **kwargs: Any + ) -> Any: """Helper method for blocking operations that need ThreadPool execution""" - return await _common.async_call(self.executor, blocking_task, *args, **kwargs) + return await _common.async_call( + self.executor, blocking_task, *args, **kwargs + ) diff --git a/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py b/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py index 6b9497609..86ee28004 100644 --- a/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py +++ b/src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py @@ -16,6 +16,12 @@ import logging import time import weakref +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + # Import only for type checking to avoid circular dependency + from ._producer_batch_processor import ProducerBatchManager + from ._kafka_batch_executor import ProducerBatchExecutor logger = logging.getLogger(__name__) @@ -30,7 +36,12 @@ class BufferTimeoutManager: - Coordinating between batch processor and executor for timeout flushes """ - def __init__(self, batch_processor, kafka_executor, timeout): + def __init__( + self, + batch_processor: "ProducerBatchManager", + kafka_executor: "ProducerBatchExecutor", + timeout: float + ) -> None: """Initialize the buffer timeout manager Args: @@ -41,19 +52,20 @@ def __init__(self, batch_processor, 
kafka_executor, timeout): self._batch_processor = batch_processor self._kafka_executor = kafka_executor self._timeout = timeout - self._last_activity = time.time() - self._timeout_task = None - self._running = False + self._last_activity: float = time.time() + self._timeout_task: Optional[asyncio.Task[None]] = None + self._running: bool = False - def start_timeout_monitoring(self): + def start_timeout_monitoring(self) -> None: """Start the background task that monitors buffer inactivity Creates an async task that runs in the background and periodically checks - if messages have been sitting in the buffer for too long without being flushed. + if messages have been sitting in the buffer for too long without being + flushed. Key design decisions: - 1. **Weak Reference**: Uses weakref.ref(self) to prevent circular references - 2. **Self-Canceling**: The task stops itself if the manager is garbage collected + 1. **Weak Reference**: Uses weakref.ref(self) to prevent circular refs + 2. **Self-Canceling**: The task stops itself if manager is GC'd 3. **Adaptive Check Interval**: Uses timeout to determine check frequency """ if not self._timeout or self._timeout <= 0: @@ -62,14 +74,14 @@ def start_timeout_monitoring(self): self._running = True self._timeout_task = asyncio.create_task(self._monitor_timeout()) - def stop_timeout_monitoring(self): + def stop_timeout_monitoring(self) -> None: """Stop and cleanup the buffer timeout monitoring task""" self._running = False if self._timeout_task and not self._timeout_task.done(): self._timeout_task.cancel() self._timeout_task = None - def mark_activity(self): + def mark_activity(self) -> None: """Update the timestamp of the last buffer activity This method should be called whenever: @@ -79,11 +91,11 @@ def mark_activity(self): """ self._last_activity = time.time() - async def _monitor_timeout(self): + async def _monitor_timeout(self) -> None: """Monitor buffer timeout in background task - This method runs continuously in the background, checking for buffer inactivity - and triggering flushes when the timeout threshold is exceeded. + This method runs continuously in the background, checking for buffer + inactivity and triggering flushes when the timeout threshold is exceeded. """ # Use weak reference to avoid circular reference and allow garbage collection manager_ref = weakref.ref(self) @@ -119,8 +131,8 @@ async def _monitor_timeout(self): # Re-raise all exceptions - don't swallow any errors raise - async def _flush_buffer_due_to_timeout(self): - """Flush buffer due to timeout by coordinating batch processor and executor + async def _flush_buffer_due_to_timeout(self) -> None: + """Flush buffer due to timeout by coordinating batch processor/executor This method handles the complete timeout flush workflow: 1. Create batches from the batch processor diff --git a/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py b/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py index ca555cb6c..3253bb083 100644 --- a/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py +++ b/src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py @@ -13,7 +13,11 @@ # limitations under the License. 
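The weak-reference design that `_buffer_timeout_manager.py` documents above (`weakref.ref(self)` plus a self-canceling task) can be shown in isolation. A standalone sketch of the pattern, not the actual `BufferTimeoutManager` implementation:

```python
import asyncio
import weakref
from typing import Optional


class Monitor:
    """Toy owner of a background task that must not keep its owner alive."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._running = False
        self._task: Optional["asyncio.Task[None]"] = None

    def start(self) -> None:
        # Must be called from within a running event loop.
        self._running = True
        # Hand the task only a weak reference; no cycle keeps us alive.
        self._task = asyncio.create_task(_watch(weakref.ref(self)))

    def stop(self) -> None:
        self._running = False
        if self._task is not None and not self._task.done():
            self._task.cancel()


async def _watch(ref: "weakref.ref[Monitor]") -> None:
    while True:
        owner = ref()
        if owner is None or not owner._running:
            return  # owner was garbage collected or stopped: exit quietly
        interval = owner._interval
        del owner  # drop the strong reference before sleeping
        await asyncio.sleep(interval)
        # ...check a last-activity timestamp here and flush if it is stale...
```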
import asyncio +import concurrent.futures import logging +from typing import Any, Dict, List, Sequence + +import confluent_kafka logger = logging.getLogger(__name__) @@ -29,7 +33,11 @@ class ProducerBatchExecutor: - Supporting partition-specific batch operations """ - def __init__(self, producer, executor): + def __init__( + self, + producer: confluent_kafka.Producer, + executor: concurrent.futures.Executor + ) -> None: """Initialize the Kafka batch executor Args: @@ -39,7 +47,12 @@ def __init__(self, producer, executor): self._producer = producer self._executor = executor - async def execute_batch(self, topic, batch_messages, partition=-1): + async def execute_batch( + self, + topic: str, + batch_messages: Sequence[Dict[str, Any]], + partition: int = -1 + ) -> int: """Execute a batch operation via thread pool This method handles the complete batch execution workflow: @@ -58,7 +71,7 @@ async def execute_batch(self, topic, batch_messages, partition=-1): Raises: Exception: Any exception from the batch operation is propagated """ - def _produce_batch_and_poll(): + def _produce_batch_and_poll() -> int: """Helper function to run in thread pool This function encapsulates all the blocking Kafka operations: @@ -68,15 +81,20 @@ def _produce_batch_and_poll(): """ # Call produce_batch with specific partition and individual callbacks # Convert tuple to list since produce_batch expects a list - messages_list = list(batch_messages) if isinstance(batch_messages, tuple) else batch_messages + messages_list: List[Dict[str, Any]] = ( + list(batch_messages) + if isinstance(batch_messages, tuple) + else batch_messages # type: ignore + ) # Use the provided partition for the entire batch # This enables proper partition control while working around librdkafka limitations self._producer.produce_batch(topic, messages_list, partition=partition) - # Handle partial batch failures: Check for messages that failed during produce_batch - # These messages have their msgstates destroyed in Producer.c and won't get callbacks - # from librdkafka, so we need to manually invoke their callbacks + # Handle partial batch failures: Check for messages that failed + # during produce_batch. These messages have their msgstates + # destroyed in Producer.c and won't get callbacks from librdkafka, + # so we need to manually invoke their callbacks self._handle_partial_failures(messages_list) # Immediately poll to process delivery callbacks for successful messages @@ -88,20 +106,25 @@ def _produce_batch_and_poll(): loop = asyncio.get_running_loop() return await loop.run_in_executor(self._executor, _produce_batch_and_poll) - def _handle_partial_failures(self, batch_messages): + def _handle_partial_failures( + self, + batch_messages: List[Dict[str, Any]] + ) -> None: """Handle messages that failed during produce_batch - When produce_batch encounters messages that fail immediately (e.g., message too large, - invalid topic, etc.), librdkafka destroys their msgstates and won't call their callbacks. - We detect these failures by checking for '_error' in the message dict (set by Producer.c) - and manually invoke the simple future-resolving callbacks. + When produce_batch encounters messages that fail immediately (e.g., + message too large, invalid topic, etc.), librdkafka destroys their + msgstates and won't call their callbacks. We detect these failures by + checking for '_error' in the message dict (set by Producer.c) and + manually invoke the simple future-resolving callbacks. 
Args: batch_messages: List of message dictionaries that were passed to produce_batch """ for msg_dict in batch_messages: if '_error' in msg_dict: - # This message failed during produce_batch - its callback won't be called by librdkafka + # This message failed during produce_batch - its callback + # won't be called by librdkafka callback = msg_dict.get('callback') if callback: # Extract the error from the message dict (set by Producer.c) @@ -111,5 +134,8 @@ def _handle_partial_failures(self, batch_messages): try: callback(error, None) except Exception: - logger.warning("Exception in callback during partial failure handling", exc_info=True) + logger.warning( + "Exception in callback during partial failure handling", + exc_info=True + ) raise diff --git a/src/confluent_kafka/experimental/aio/producer/_message_batch.py b/src/confluent_kafka/experimental/aio/producer/_message_batch.py index 48ac7f041..f5be8ec45 100644 --- a/src/confluent_kafka/experimental/aio/producer/_message_batch.py +++ b/src/confluent_kafka/experimental/aio/producer/_message_batch.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import NamedTuple, Sequence, Any, Optional +from typing import Dict, NamedTuple, Sequence, Any, Optional import asyncio @@ -24,8 +24,8 @@ class MessageBatch(NamedTuple): along with their associated futures for delivery confirmation. """ topic: str # Target topic for this batch - messages: Sequence[dict] # Prepared message dictionaries - futures: Sequence[asyncio.Future] # Futures to resolve on delivery + messages: Sequence[Dict[str, Any]] # Prepared message dictionaries + futures: Sequence[asyncio.Future[Any]] # Futures to resolve on delivery partition: int = -1 # Target partition for this batch (-1 = RD_KAFKA_PARTITION_UA) @property @@ -39,11 +39,13 @@ def info(self) -> str: return f"MessageBatch(topic='{self.topic}', partition={self.partition}, size={len(self.messages)})" -def create_message_batch(topic: str, - messages: Sequence[dict], - futures: Sequence[asyncio.Future], - callbacks: Optional[Any] = None, - partition: int = -1) -> MessageBatch: +def create_message_batch( + topic: str, + messages: Sequence[Dict[str, Any]], + futures: Sequence[asyncio.Future[Any]], + callbacks: Optional[Any] = None, + partition: int = -1 +) -> MessageBatch: """Create an immutable MessageBatch from sequences This factory function converts mutable sequences into an immutable MessageBatch object. diff --git a/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py b/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py index b25b4e41f..04e4b7752 100644 --- a/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py +++ b/src/confluent_kafka/experimental/aio/producer/_producer_batch_processor.py @@ -12,11 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
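For orientation, the immutable batch type defined in `_message_batch.py` above pairs each prepared message dict with the `asyncio.Future` that its delivery callback resolves. A small illustrative sketch; the topic and payloads are made up, and the import path is the internal experimental module:

```python
import asyncio

from confluent_kafka.experimental.aio.producer._message_batch import (
    create_message_batch,
)


async def demo() -> None:
    loop = asyncio.get_running_loop()
    messages = [{"value": b"m1", "key": b"k1"},
                {"value": b"m2", "key": b"k2"}]
    # One future per message, kept index-aligned with the message list.
    futures = [loop.create_future() for _ in messages]

    batch = create_message_batch("example-topic", messages, futures, partition=0)
    # -> MessageBatch(topic='example-topic', partition=0, size=2)
    print(batch.info)


asyncio.run(demo())
```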
+import asyncio import copy import logging +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING from confluent_kafka import KafkaException as _KafkaException -from ._message_batch import create_message_batch +from ._message_batch import create_message_batch, MessageBatch + +if TYPE_CHECKING: + # Import only for type checking to avoid circular dependency + from ._kafka_batch_executor import ProducerBatchExecutor logger = logging.getLogger(__name__) @@ -31,17 +37,21 @@ class ProducerBatchManager: - Executing batch operations via librdkafka """ - def __init__(self, kafka_executor): + def __init__(self, kafka_executor: "ProducerBatchExecutor") -> None: """Initialize the batch processor Args: kafka_executor: KafkaBatchExecutor instance for Kafka operations """ self._kafka_executor = kafka_executor - self._message_buffer = [] - self._buffer_futures = [] - - def add_message(self, msg_data, future): + self._message_buffer: List[Dict[str, Any]] = [] + self._buffer_futures: List[asyncio.Future[Any]] = [] + + def add_message( + self, + msg_data: Dict[str, Any], + future: asyncio.Future[Any] + ) -> None: """Add a message to the batch buffer Args: @@ -51,30 +61,33 @@ def add_message(self, msg_data, future): self._message_buffer.append(msg_data) self._buffer_futures.append(future) - def get_buffer_size(self): + def get_buffer_size(self) -> int: """Get the current number of messages in the buffer""" return len(self._message_buffer) - def is_buffer_empty(self): + def is_buffer_empty(self) -> bool: """Check if the buffer is empty""" return len(self._message_buffer) == 0 - def clear_buffer(self): + def clear_buffer(self) -> None: """Clear the entire buffer""" self._message_buffer.clear() self._buffer_futures.clear() - def cancel_pending_futures(self): + def cancel_pending_futures(self) -> None: """Cancel all pending futures in the buffer""" for future in self._buffer_futures: if not future.done(): future.cancel() - def create_batches(self, target_topic=None): + def create_batches( + self, + target_topic: Optional[str] = None + ) -> List[MessageBatch]: """Create MessageBatch objects from the current buffer Args: - target_topic: Optional topic to create batches for (None for all topics) + target_topic: Optional topic to create batches for (None for all) Returns: List[MessageBatch]: List of immutable MessageBatch objects @@ -106,7 +119,7 @@ def create_batches(self, target_topic=None): return batches - def _clear_topic_from_buffer(self, target_topic): + def _clear_topic_from_buffer(self, target_topic: str) -> None: """Remove messages for a specific topic from the buffer Args: @@ -123,7 +136,7 @@ def _clear_topic_from_buffer(self, target_topic): self._message_buffer = messages_to_keep self._buffer_futures = futures_to_keep - async def flush_buffer(self, target_topic=None): + async def flush_buffer(self, target_topic: Optional[str] = None) -> None: """Flush the current message buffer using produce_batch Args: @@ -158,7 +171,11 @@ async def flush_buffer(self, target_topic=None): raise raise - async def _execute_batches(self, batches, target_topic=None): + async def _execute_batches( + self, + batches: List[MessageBatch], + target_topic: Optional[str] = None + ) -> None: """Execute batches and handle cleanup after successful execution Args: @@ -185,7 +202,7 @@ async def _execute_batches(self, batches, target_topic=None): # Re-raise the exception so caller knows the batch operation failed raise - def _add_batches_back_to_buffer(self, batches): + def 
_add_batches_back_to_buffer(self, batches: List[MessageBatch]) -> None: """Add batches back to the buffer when execution fails Args: @@ -213,8 +230,10 @@ def _add_batches_back_to_buffer(self, batches): self._message_buffer.append(msg_data) self._buffer_futures.append(batch.futures[i]) - def _group_messages_by_topic_and_partition(self): - """Group buffered messages by topic and partition for optimal batch processing + def _group_messages_by_topic_and_partition( + self + ) -> Dict[Tuple[str, int], Dict[str, List[Any]]]: + """Group buffered messages by topic and partition for optimal batching This function efficiently organizes the mixed-topic message buffer into topic+partition-specific groups, enabling proper partition control while @@ -224,18 +243,19 @@ def _group_messages_by_topic_and_partition(self): - Single O(n) pass through message buffer - Groups related data (messages, futures) by (topic, partition) tuple - Maintains index relationships between buffer arrays - - Uses partition from message data, defaults to RD_KAFKA_PARTITION_UA (-1) if not specified + - Uses partition from message data, defaults to RD_KAFKA_PARTITION_UA + (-1) if not specified Returns: dict: Topic+partition groups with structure: { ('topic_name', partition): { - 'messages': [msg_data1, msg_data2, ...], # Message dictionaries - 'futures': [future1, future2, ...], # Corresponding asyncio.Future objects + 'messages': [msg_data1, ...], # Message dicts + 'futures': [future1, ...], # asyncio.Future objects } } """ - topic_partition_groups = {} + topic_partition_groups: Dict[Tuple[str, int], Dict[str, Any]] = {} # Iterate through buffer once - O(n) complexity for i, msg_data in enumerate(self._message_buffer): @@ -260,7 +280,10 @@ def _group_messages_by_topic_and_partition(self): return topic_partition_groups - def _prepare_batch_messages(self, messages): + def _prepare_batch_messages( + self, + messages: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: """Prepare messages for produce_batch by removing internal fields Args: @@ -280,8 +303,12 @@ def _prepare_batch_messages(self, messages): return batch_messages - def _assign_future_callbacks(self, batch_messages, futures): - """Assign simple future-resolving callbacks to each message in the batch + def _assign_future_callbacks( + self, + batch_messages: List[Dict[str, Any]], + futures: Sequence[asyncio.Future[Any]] + ) -> None: + """Assign simple future-resolving callbacks to each message in batch Args: batch_messages: List of message dictionaries for produce_batch @@ -290,10 +317,12 @@ def _assign_future_callbacks(self, batch_messages, futures): for i, batch_msg in enumerate(batch_messages): future = futures[i] - def create_simple_callback(fut): + def create_simple_callback( + fut: asyncio.Future[Any] + ) -> Callable[[Any, Any], None]: """Create a simple callback that only resolves the future""" - def simple_callback(err, msg): + def simple_callback(err: Any, msg: Any) -> None: if err: if not fut.done(): fut.set_exception(_KafkaException(err)) @@ -306,7 +335,11 @@ def simple_callback(err, msg): # Assign the simple callback to this message batch_msg["callback"] = create_simple_callback(future) - def _handle_batch_failure(self, exception, batch_futures): + def _handle_batch_failure( + self, + exception: Exception, + batch_futures: Sequence[asyncio.Future[Any]] + ) -> None: """Handle batch operation failure by failing all unresolved futures When a batch operation fails before any individual callbacks are invoked, diff --git a/src/confluent_kafka/py.typed 
diff --git a/src/confluent_kafka/py.typed b/src/confluent_kafka/py.typed
new file mode 100644
index 000000000..0519ecba6
--- /dev/null
+++ b/src/confluent_kafka/py.typed
@@ -0,0 +1 @@
+ 
\ No newline at end of file
diff --git a/src/confluent_kafka/schema_registry/error.py b/src/confluent_kafka/schema_registry/error.py
index 2aa4d6dcd..d0f0bbf88 100644
--- a/src/confluent_kafka/schema_registry/error.py
+++ b/src/confluent_kafka/schema_registry/error.py
@@ -15,6 +15,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from typing import Optional
+
 try:
     from fastavro.schema import SchemaParseException, UnknownType
 except ImportError:
@@ -41,15 +43,15 @@ class SchemaRegistryError(Exception):
     """  # noqa: E501
     UNKNOWN = -1
 
-    def __init__(self, http_status_code: int, error_code: int, error_message: str):
+    def __init__(self, http_status_code: int, error_code: int, error_message: str) -> None:
         self.http_status_code = http_status_code
         self.error_code = error_code
         self.error_message = error_message
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return str(self)
 
-    def __str__(self):
+    def __str__(self) -> str:
         return "{} (HTTP status code {}, SR code {})".format(self.error_message,
                                                              self.http_status_code,
                                                              self.error_code)
@@ -58,7 +60,8 @@ def __str__(self):
 class OAuthTokenError(Exception):
     """Raised when an OAuth token cannot be retrieved."""
 
-    def __init__(self, message, status_code=None, response_text=None):
+    def __init__(self, message: str, status_code: Optional[int] = None,
+                 response_text: Optional[str] = None) -> None:
         self.message = message
         self.status_code = status_code
         self.response_text = response_text
diff --git a/src/confluent_kafka/schema_registry/rules/cel/cel_field_presence.py b/src/confluent_kafka/schema_registry/rules/cel/cel_field_presence.py
index f78e4887e..329077a3e 100644
--- a/src/confluent_kafka/schema_registry/rules/cel/cel_field_presence.py
+++ b/src/confluent_kafka/schema_registry/rules/cel/cel_field_presence.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import threading
+from typing import Any
 
 import celpy  # type: ignore
 
@@ -33,9 +34,9 @@ def in_has() -> bool:
 
 
 class InterpretedRunner(celpy.InterpretedRunner):
-    def evaluate(self, context):
+    def evaluate(self, context: Any) -> Any:
         class Evaluator(celpy.Evaluator):
-            def macro_has_eval(self, exprlist) -> celpy.celtypes.BoolType:
+            def macro_has_eval(self, exprlist: Any) -> celpy.celtypes.BoolType:
                 _has_state.in_has = True
                 result = super().macro_has_eval(exprlist)
                 _has_state.in_has = False
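The `macro_has_eval` override above toggles a `threading.local` flag so surrounding rule code can tell whether evaluation is currently inside a CEL `has()` macro. A minimal sketch of that thread-local pattern (names are illustrative, not the module's exact API):

import threading
from typing import Any, Callable

_has_state = threading.local()

def in_has() -> bool:
    # False unless the current thread is inside a has() macro evaluation.
    return getattr(_has_state, "in_has", False)

def with_has_flag(evaluate: Callable[[], Any]) -> Any:
    # Stand-in for the macro_has_eval override: raise the flag, evaluate,
    # then lower it so presence checks outside has() behave normally.
    _has_state.in_has = True
    try:
        return evaluate()
    finally:
        _has_state.in_has = False

The override in the diff sets and clears the flag inline; the try/finally here is only a defensive variant of the same idea. Because the flag lives in `threading.local`, concurrent evaluations on other threads never observe each other's state.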
diff --git a/src/confluent_kafka/schema_registry/rules/encryption/awskms/aws_driver.py b/src/confluent_kafka/schema_registry/rules/encryption/awskms/aws_driver.py
index 9530b6f7c..0dec96b60 100644
--- a/src/confluent_kafka/schema_registry/rules/encryption/awskms/aws_driver.py
+++ b/src/confluent_kafka/schema_registry/rules/encryption/awskms/aws_driver.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
+from typing import Dict, Any, Optional
 
 import boto3
 import tink
@@ -34,13 +35,13 @@ class AwsKmsDriver(KmsDriver):
-    def __init__(self):
+    def __init__(self) -> None:
         pass
 
     def get_key_url_prefix(self) -> str:
         return _PREFIX
 
-    def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
+    def new_kms_client(self, conf: Dict[str, Any], key_url: Optional[str]) -> KmsClient:
         uri_prefix = _PREFIX
         if key_url is not None:
             uri_prefix = key_url
@@ -92,7 +93,7 @@ def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
         return new_client(boto3_client=client, key_uri=uri_prefix)
 
     @classmethod
-    def register(cls):
+    def register(cls) -> None:
         register_kms_driver(AwsKmsDriver())
diff --git a/src/confluent_kafka/schema_registry/rules/encryption/azurekms/azure_driver.py b/src/confluent_kafka/schema_registry/rules/encryption/azurekms/azure_driver.py
index daedc62c3..9fe235120 100644
--- a/src/confluent_kafka/schema_registry/rules/encryption/azurekms/azure_driver.py
+++ b/src/confluent_kafka/schema_registry/rules/encryption/azurekms/azure_driver.py
@@ -11,7 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Dict, Any, Optional
 
+from azure.core.credentials import TokenCredential
 from azure.identity import DefaultAzureCredential, ClientSecretCredential
 from tink import KmsClient
 
@@ -28,13 +30,13 @@ class AzureKmsDriver(KmsDriver):
-    def __init__(self):
+    def __init__(self) -> None:
         pass
 
     def get_key_url_prefix(self) -> str:
         return _PREFIX
 
-    def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
+    def new_kms_client(self, conf: Dict[str, Any], key_url: Optional[str]) -> KmsClient:
         uri_prefix = _PREFIX
         if key_url is not None:
             uri_prefix = key_url
@@ -42,6 +44,7 @@ def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
         client_id = conf.get(_CLIENT_ID)
         client_secret = conf.get(_CLIENT_SECRET)
 
+        creds: TokenCredential
         if tenant_id is None or client_id is None or client_secret is None:
             creds = DefaultAzureCredential()
         else:
@@ -50,5 +53,5 @@ def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
         return AzureKmsClient(uri_prefix, creds)
 
     @classmethod
-    def register(cls):
+    def register(cls) -> None:
         register_kms_driver(AzureKmsDriver())
diff --git a/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_client.py b/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_client.py
index 7e21caedb..bc6237c2b 100644
--- a/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_client.py
+++ b/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_client.py
@@ -31,7 +31,7 @@ class _GcpKmsClient(GcpKmsClient):
     """Basic GCP client for AEAD."""
 
     def __init__(
-        self, key_uri: Optional[str], credentials: service_account.Credentials
+        self, key_uri: Optional[str], credentials: Optional[service_account.Credentials]
     ) -> None:
         """Creates a new GcpKmsClient that is bound to the key specified in 'key_uri'.
diff --git a/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_driver.py b/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_driver.py
index 50a3880bc..ab74f7f6f 100644
--- a/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_driver.py
+++ b/src/confluent_kafka/schema_registry/rules/encryption/gcpkms/gcp_driver.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Dict, Any, Optional
 
 import tink
 from google.oauth2 import service_account
 
@@ -32,13 +33,13 @@ class GcpKmsDriver(KmsDriver):
-    def __init__(self):
+    def __init__(self) -> None:
         pass
 
     def get_key_url_prefix(self) -> str:
         return _PREFIX
 
-    def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
+    def new_kms_client(self, conf: Dict[str, Any], key_url: Optional[str]) -> KmsClient:
         uri_prefix = _PREFIX
         if key_url is not None:
             uri_prefix = key_url
@@ -70,7 +71,7 @@ def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
         return _GcpKmsClient(uri_prefix, creds)
 
     @classmethod
-    def register(cls):
+    def register(cls) -> None:
         register_kms_driver(GcpKmsDriver())
diff --git a/src/confluent_kafka/schema_registry/rules/encryption/hcvault/hcvault_driver.py b/src/confluent_kafka/schema_registry/rules/encryption/hcvault/hcvault_driver.py
index 9581d0ccf..ad796f7a4 100644
--- a/src/confluent_kafka/schema_registry/rules/encryption/hcvault/hcvault_driver.py
+++ b/src/confluent_kafka/schema_registry/rules/encryption/hcvault/hcvault_driver.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
+from typing import Dict, Any, Optional
 
 from tink import KmsClient
 
@@ -28,13 +29,13 @@ class HcVaultKmsDriver(KmsDriver):
-    def __init__(self):
+    def __init__(self) -> None:
         pass
 
     def get_key_url_prefix(self) -> str:
         return _PREFIX
 
-    def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
+    def new_kms_client(self, conf: Dict[str, Any], key_url: Optional[str]) -> KmsClient:
         uri_prefix = _PREFIX
         if key_url is not None:
             uri_prefix = key_url
@@ -53,5 +54,5 @@ def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
         return HcVaultKmsClient(uri_prefix, token, namespace, role_id, secret_id)
 
     @classmethod
-    def register(cls):
+    def register(cls) -> None:
         register_kms_driver(HcVaultKmsDriver())
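The AWS, Azure, GCP and HashiCorp Vault drivers above all share one shape: a URL scheme prefix, a `new_kms_client(conf, key_url)` factory that falls back to the prefix when no concrete key URL is given, and a `register()` classmethod that hands an instance to the driver registry. A self-contained sketch of that shape (all names here are hypothetical; the real drivers return tink `KmsClient` objects and call `register_kms_driver`):

import os
from typing import Any, Dict, Optional

_PREFIX = "fake-kms://"  # hypothetical scheme prefix

class FakeKmsClient:
    def __init__(self, key_uri: str) -> None:
        self.key_uri = key_uri

class FakeKmsDriver:
    def get_key_url_prefix(self) -> str:
        return _PREFIX

    def new_kms_client(self, conf: Dict[str, Any], key_url: Optional[str]) -> FakeKmsClient:
        # Mirror the drivers above: prefer the concrete key URL, fall back to
        # the scheme prefix, and read credentials from conf or the environment.
        uri_prefix = key_url if key_url is not None else _PREFIX
        secret = conf.get("secret") or os.getenv("FAKE_KMS_SECRET")
        if secret is None:
            raise ValueError("cannot load secret")
        return FakeKmsClient(uri_prefix)

Typing `key_url` as `Optional[str]` (rather than `str`) in each driver matches this fallback behavior, which is exactly the change the hunks above make.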
diff --git a/src/confluent_kafka/schema_registry/rules/encryption/localkms/local_driver.py b/src/confluent_kafka/schema_registry/rules/encryption/localkms/local_driver.py
index 1ea19fafc..903551f7e 100644
--- a/src/confluent_kafka/schema_registry/rules/encryption/localkms/local_driver.py
+++ b/src/confluent_kafka/schema_registry/rules/encryption/localkms/local_driver.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
+from typing import Dict, Any, Optional
 
 import tink
 from tink import KmsClient
 
@@ -26,13 +27,13 @@ class LocalKmsDriver(KmsDriver):
-    def __init__(self):
+    def __init__(self) -> None:
         pass
 
     def get_key_url_prefix(self) -> str:
         return _PREFIX
 
-    def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
+    def new_kms_client(self, conf: Dict[str, Any], key_url: Optional[str]) -> KmsClient:
         secret = conf.get(_SECRET)
         if secret is None:
             secret = os.getenv("LOCAL_SECRET")
@@ -41,5 +42,5 @@ def new_kms_client(self, conf: dict, key_url: str) -> KmsClient:
         return LocalKmsClient(secret)
 
     @classmethod
-    def register(cls):
+    def register(cls) -> None:
         register_kms_driver(LocalKmsDriver())
diff --git a/src/confluent_kafka/serialization/__init__.py b/src/confluent_kafka/serialization/__init__.py
index 70f3044b6..ed59f3c1e 100644
--- a/src/confluent_kafka/serialization/__init__.py
+++ b/src/confluent_kafka/serialization/__init__.py
@@ -17,8 +17,10 @@
 #
 import struct as _struct
 from enum import Enum
+from typing import Any, Optional
 
 from confluent_kafka.error import KafkaException
+from confluent_kafka._types import HeadersType
 
 __all__ = ['Deserializer',
            'IntegerDeserializer',
@@ -65,7 +67,7 @@ class SerializationContext(object):
         headers (list): List of message header tuples. Defaults to None.
     """
 
-    def __init__(self, topic, field, headers=None):
+    def __init__(self, topic: str, field: MessageField, headers: Optional[HeadersType] = None) -> None:
         self.topic = topic
         self.field = field
         self.headers = headers
@@ -114,7 +116,7 @@ class Serializer(object):
 
     __slots__ = []
 
-    def __call__(self, obj, ctx=None):
+    def __call__(self, obj: Any, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
         """
         Converts obj to bytes.
 
@@ -171,7 +173,7 @@ class Deserializer(object):
 
     __slots__ = []
 
-    def __call__(self, value, ctx=None):
+    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Any:
         """
         Convert bytes to object
 
@@ -200,7 +202,7 @@ class DoubleSerializer(Serializer):
 
     """  # noqa: E501
 
-    def __call__(self, obj, ctx=None):
+    def __call__(self, obj: Optional[float], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
         """
         Args:
             obj (object): object to be serialized
@@ -235,7 +237,7 @@ class DoubleDeserializer(Deserializer):
         `DoubleDeserializer Javadoc `_
     """  # noqa: E501
 
-    def __call__(self, value, ctx=None):
+    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[float]:
         """
         Deserializes float from IEEE 754 binary64 bytes.
 
@@ -269,7 +271,7 @@ class IntegerSerializer(Serializer):
         `IntegerSerializer Javadoc `_
     """  # noqa: E501
 
-    def __call__(self, obj, ctx=None):
+    def __call__(self, obj: Optional[int], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
         """
         Serializes int as int32 bytes.
 
@@ -306,7 +308,7 @@ class IntegerDeserializer(Deserializer):
         `IntegerDeserializer Javadoc `_
     """  # noqa: E501
 
-    def __call__(self, value, ctx=None):
+    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[int]:
         """
         Deserializes int from int32 bytes.
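The newly typed serializer signatures pair bytes-producing and bytes-consuming halves; a quick round trip through the public API (the topic name is a placeholder):

from confluent_kafka.serialization import (
    DoubleSerializer, DoubleDeserializer,
    IntegerSerializer, IntegerDeserializer,
    MessageField, SerializationContext,
)

ctx = SerializationContext("orders", MessageField.VALUE)

# int32 round trip
int_bytes = IntegerSerializer()(42, ctx)
assert IntegerDeserializer()(int_bytes, ctx) == 42

# IEEE 754 binary64 round trip
dbl_bytes = DoubleSerializer()(3.5, ctx)
assert DoubleDeserializer()(dbl_bytes, ctx) == 3.5

# None passes through unchanged in both directions, hence the
# Optional[...] annotations on obj, value and the return types.
assert DoubleSerializer()(None, ctx) is None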
@@ -348,10 +350,10 @@ class StringSerializer(Serializer):
         `StringSerializer Javadoc `_
     """  # noqa: E501
 
-    def __init__(self, codec='utf_8'):
+    def __init__(self, codec: str = 'utf_8') -> None:
         self.codec = codec
 
-    def __call__(self, obj, ctx=None):
+    def __call__(self, obj: Optional[str], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
         """
         Serializes a str(py2:unicode) to bytes.
 
@@ -394,10 +396,10 @@ class StringDeserializer(Deserializer):
         `StringDeserializer Javadoc `_
     """  # noqa: E501
 
-    def __init__(self, codec='utf_8'):
+    def __init__(self, codec: str = 'utf_8') -> None:
         self.codec = codec
 
-    def __call__(self, value, ctx=None):
+    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[str]:
         """
         Deserializes bytes to a str per the configured codec. Defaults to ``utf_8``.
diff --git a/src/confluent_kafka/serializing_producer.py b/src/confluent_kafka/serializing_producer.py
index 3b3ff82b0..9234ce5e1 100644
--- a/src/confluent_kafka/serializing_producer.py
+++ b/src/confluent_kafka/serializing_producer.py
@@ -16,11 +16,14 @@
 # limitations under the License.
 #
 
+from typing import Any, Dict, Optional
+
 from confluent_kafka.cimpl import Producer as _ProducerImpl
 from .serialization import (MessageField,
                             SerializationContext)
 from .error import (KeySerializationError,
                     ValueSerializationError)
+from ._types import HeadersType, DeliveryCallback, Serializer
 
 
 class SerializingProducer(_ProducerImpl):
@@ -66,7 +69,7 @@ class SerializingProducer(_ProducerImpl):
         conf (producer): SerializingProducer configuration.
     """  # noqa E501
 
-    def __init__(self, conf):
+    def __init__(self, conf: Dict[str, Any]) -> None:
         conf_copy = conf.copy()
 
         self._key_serializer = conf_copy.pop('key.serializer', None)
@@ -74,8 +77,11 @@ def __init__(self, conf):
 
         super(SerializingProducer, self).__init__(conf_copy)
 
-    def produce(self, topic, key=None, value=None, partition=-1,
-                on_delivery=None, timestamp=0, headers=None):
+    def produce(  # type: ignore[override]
+        self, topic: str, key: Any = None, value: Any = None, partition: int = -1,
+        on_delivery: Optional[DeliveryCallback] = None, timestamp: int = 0,
+        headers: Optional[HeadersType] = None
+    ) -> None:
         """
         Produce a message.
 
@@ -105,7 +111,7 @@ def produce(self, topic, key=None, value=None, partition=-1,
                 :py:func:`SerializingProducer.flush` on successful or failed delivery.
 
-            timestamp (float, optional): Message timestamp (CreateTime) in
+            timestamp (int, optional): Message timestamp (CreateTime) in
                 milliseconds since Unix epoch UTC (requires broker >= 0.10.0.0).
                 Default value is current time.
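With `produce` now typed, a `SerializingProducer` call site looks like this (broker address and topic are placeholders):

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": StringSerializer("utf_8"),
})

def on_delivery(err, msg):
    # err is a KafkaError or None; msg is the delivered Message.
    if err is not None:
        print(f"Delivery failed: {err}")

producer.produce("orders", key="order-1", value="shipped", on_delivery=on_delivery)
producer.flush()

The `# type: ignore[override]` on `produce` is needed because the typed signature intentionally narrows the base `Producer.produce` signature from the C extension stubs.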
diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c
index 451017ad6..e9b636453 100644
--- a/src/confluent_kafka/src/Admin.c
+++ b/src/confluent_kafka/src/Admin.c
@@ -14,6 +14,17 @@
  * limitations under the License.
  */
 
+/**
+ * ⚠️ WARNING: UPDATE TYPE STUBS WHEN MODIFYING INTERFACES ⚠️
+ *
+ * This file defines the _AdminClientImpl class and its methods.
+ * When changing method signatures, parameters, or defaults, you MUST
+ * also update the corresponding type definitions in:
+ *   src/confluent_kafka/cimpl.pyi
+ *
+ * Failure to keep both in sync will result in incorrect type hints.
+ */
+
 #include "confluent_kafka.h"
 
 #include
 
@@ -3011,7 +3022,7 @@ const char Admin_list_offsets_doc[] = PyDoc_STR(
 
 /**
- * @brief Delete records 
+ * @brief Delete records
  */
 PyObject* Admin_delete_records (Handle *self, PyObject *args, PyObject *kwargs) {
         PyObject *topic_partition_offsets = NULL, *future;
@@ -3054,7 +3065,7 @@ PyObject* Admin_delete_records (Handle *self, PyObject *args, PyObject *kwargs) {
         if(!c_topic_partition_offsets) {
                 goto err; /* Exception raised by py_to_c_parts() */
         }
-        
+
         c_obj = malloc(sizeof(rd_kafka_DeleteRecords_t *) * del_record_cnt);
         c_obj[0] = rd_kafka_DeleteRecords_new(c_topic_partition_offsets);
 
@@ -3078,11 +3089,11 @@ PyObject* Admin_delete_records (Handle *self, PyObject *args, PyObject *kwargs) {
         rd_kafka_DeleteRecords_destroy_array(c_obj, del_record_cnt);
         free(c_obj);
-        rd_kafka_topic_partition_list_destroy(c_topic_partition_offsets);        
+        rd_kafka_topic_partition_list_destroy(c_topic_partition_offsets);
         Py_XDECREF(topic_partition_offsets);
-        
+
         Py_RETURN_NONE;
-err:    
+err:
         if (c_obj) {
                 rd_kafka_DeleteRecords_destroy_array(c_obj, del_record_cnt);
                 free(c_obj);
@@ -3155,7 +3166,7 @@ PyObject *Admin_elect_leaders(Handle *self, PyObject *args, PyObject *kwargs) {
         }
 
         c_elect_leaders = rd_kafka_ElectLeaders_new(c_election_type, c_partitions);
-        
+
         if(c_partitions) {
                 rd_kafka_topic_partition_list_destroy(c_partitions);
         }
@@ -3365,12 +3376,12 @@ static PyMethodDef Admin_methods[] = {
         { "list_offsets", (PyCFunction)Admin_list_offsets, METH_VARARGS|METH_KEYWORDS,
           Admin_list_offsets_doc
         },
-        
+
         { "delete_records", (PyCFunction)Admin_delete_records, METH_VARARGS|METH_KEYWORDS,
           Admin_delete_records_doc
         },
 
-        { "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS, 
+        { "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS,
           Admin_elect_leaders_doc
         },
 
@@ -4644,7 +4655,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) {
         int i;
 
-        DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka.admin", 
+        DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka.admin",
                                                   "DeletedRecords");
         if(!DeletedRecords_type)
                 goto raise; /* Exception raised by lookup() */
@@ -4653,7 +4664,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) {
         for(i=0; i<c_topic_partitions->cnt; i++){
                 PyObject *key = NULL;
                 PyObject *value = NULL;
-                
+
                 rd_kafka_topic_partition_t *c_topic_partition = &c_topic_partitions->elems[i];
                 key = c_part_to_py(c_topic_partition);
@@ -4674,7 +4685,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) {
                         goto raise;
                 }
             }
-            
+
             PyDict_SetItem(result, key, value);
             Py_DECREF(key);
             Py_DECREF(value);
@@ -5036,12 +5047,12 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
         {
                 const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev);
                 const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res);
-                
+
                 result = Admin_c_DeletedRecords_to_py(c_delete_records_res_list);
                 break;
         }
 
-        case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: 
+        case RD_KAFKA_EVENT_ELECTLEADERS_RESULT:
         {
                 size_t c_result_cnt;
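`Admin_delete_records` backs `AdminClient.delete_records`, which returns one future per requested `TopicPartition`, and `Admin_c_DeletedRecords_to_py` builds the per-partition result dict. A hedged usage sketch (broker address, topic and offsets are placeholders):

from confluent_kafka import TopicPartition
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Truncate partition 0 of "orders" up to (but not including) offset 100.
futures = admin.delete_records([TopicPartition("orders", 0, 100)])

for tp, fut in futures.items():
    deleted = fut.result()  # DeletedRecords; raises KafkaException on error
    print(f"{tp.topic}[{tp.partition}] new low watermark: {deleted.low_watermark}")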
diff --git a/src/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c
index 61adeb279..7289c677d 100644
--- a/src/confluent_kafka/src/AdminTypes.c
+++ b/src/confluent_kafka/src/AdminTypes.c
@@ -14,6 +14,17 @@
  * limitations under the License.
  */
 
+/**
+ * ⚠️ WARNING: UPDATE TYPE STUBS WHEN MODIFYING INTERFACES ⚠️
+ *
+ * This file defines the NewTopic and NewPartitions classes.
+ * When changing method signatures, parameters, or defaults, you MUST
+ * also update the corresponding type definitions in:
+ *   src/confluent_kafka/cimpl.pyi
+ *
+ * Failure to keep both in sync will result in incorrect type hints.
+ */
+
 #include "confluent_kafka.h"
 
 #include
diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c
index 7376c6d84..80db86d78 100644
--- a/src/confluent_kafka/src/Consumer.c
+++ b/src/confluent_kafka/src/Consumer.c
@@ -14,6 +14,17 @@
  * limitations under the License.
  */
 
+/**
+ * ⚠️ WARNING: UPDATE TYPE STUBS WHEN MODIFYING INTERFACES ⚠️
+ *
+ * This file defines the Consumer class and its methods.
+ * When changing method signatures, parameters, or defaults, you MUST
+ * also update the corresponding type definitions in:
+ *   src/confluent_kafka/cimpl.pyi
+ *
+ * Failure to keep both in sync will result in incorrect type hints.
+ */
+
 #include "confluent_kafka.h"
 
 
@@ -496,7 +507,7 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
                 }
 
                 m = (Message *)msg;
-                
+
                 if (m->error != Py_None) {
                         PyObject *error = Message_error(m, NULL);
                         PyObject *errstr = PyObject_CallMethod(error, "str", NULL);
diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c
index e76210151..524d24615 100644
--- a/src/confluent_kafka/src/Producer.c
+++ b/src/confluent_kafka/src/Producer.c
@@ -14,6 +14,17 @@
  * limitations under the License.
  */
 
+/**
+ * ⚠️ WARNING: UPDATE TYPE STUBS WHEN MODIFYING INTERFACES ⚠️
+ *
+ * This file defines the Producer class and its methods.
+ * When changing method signatures, parameters, or defaults, you MUST
+ * also update the corresponding type definitions in:
+ *   src/confluent_kafka/cimpl.pyi
+ *
+ * Failure to keep both in sync will result in incorrect type hints.
+ */
+
 #include "confluent_kafka.h"
diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c
index f34a96f31..b298f8885 100644
--- a/src/confluent_kafka/src/confluent_kafka.c
+++ b/src/confluent_kafka/src/confluent_kafka.c
@@ -14,6 +14,17 @@
  * limitations under the License.
  */
 
+/**
+ * ⚠️ WARNING: UPDATE TYPE STUBS WHEN MODIFYING INTERFACES ⚠️
+ *
+ * This file defines core classes: KafkaError, Message, TopicPartition, Uuid.
+ * When changing method signatures, parameters, or defaults, you MUST
+ * also update the corresponding type definitions in:
+ *   src/confluent_kafka/cimpl.pyi
+ *
+ * Failure to keep both in sync will result in incorrect type hints.
+ */
+
 #include "confluent_kafka.h"
 
 #include
 
@@ -854,7 +865,7 @@ static PyObject *Uuid_str0 (Uuid *self) {
 }
 
 static long Uuid_hash (Uuid *self) {
-        
+
         return rd_kafka_Uuid_most_significant_bits(self->cUuid) ^
                rd_kafka_Uuid_least_significant_bits(self->cUuid);
 }
 
@@ -876,7 +887,7 @@ static int Uuid_init (PyObject *self0, PyObject *args,
         int64_t least_significant_bits;
 
         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "LL", kws,
-                                         &most_significant_bits, 
+                                         &most_significant_bits,
                                          &least_significant_bits))
                 return -1;
 
@@ -925,7 +936,7 @@ PyTypeObject UuidType = {
         PyObject_GenericGetAttr,           /* tp_getattro */
         0,                                 /* tp_setattro */
         0,                                 /* tp_as_buffer */
-        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | 
+        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
         Py_TPFLAGS_HAVE_GC,                /* tp_flags */
        "Generic Uuid. Being used in various identifiers including topic_id.\n"
        "\n"
@@ -1355,7 +1366,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
 
 /**
  * @brief Convert C rd_kafka_topic_partition_result_t to Python dict(TopicPartition, KafkaException).
- * 
+ *
  * @returns The new Python dict object.
  */
 PyObject *c_topic_partition_result_to_py_dict(
@@ -1379,7 +1390,7 @@ PyObject *c_topic_partition_result_to_py_dict(
                 value = KafkaError_new_or_None(rd_kafka_error_code(c_error),
                                                rd_kafka_error_string(c_error));
                 key = c_part_to_py(c_topic_partition);
-                
+
                 PyDict_SetItem(result, key, value);
 
                 Py_DECREF(key);
@@ -1681,7 +1692,7 @@ PyObject *c_Uuid_to_py(const rd_kafka_Uuid_t *c_uuid) {
         PyObject *Uuid_type = NULL;
         PyObject *args = NULL;
         PyObject *kwargs = NULL;
-        
+
         if(!c_uuid)
                 Py_RETURN_NONE;
 
@@ -1691,7 +1702,7 @@ PyObject *c_Uuid_to_py(const rd_kafka_Uuid_t *c_uuid) {
                 goto err;
         }
 
-        
+
         kwargs = PyDict_New();
         cfl_PyDict_SetLong(kwargs, "most_significant_bits",
                            rd_kafka_Uuid_most_significant_bits(c_uuid));
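Together with the new `py.typed` marker and the `cimpl.pyi` stubs these C sources now warn about, static checkers can validate calls into the extension module. An illustrative snippet (placeholder config; the line a checker would reject is shown commented out):

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("orders", value=b"payload", partition=0)
# producer.produce("orders", partition="0")  # rejected by mypy: int expected
producer.flush()

Because the stubs are hand-written rather than generated, the warning banners added to each C file are the only guard against the signatures drifting apart.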