diff --git a/aiokafka/__init__.pyi b/aiokafka/__init__.pyi
new file mode 100644
index 00000000..296f5708
--- /dev/null
+++ b/aiokafka/__init__.pyi
@@ -0,0 +1,21 @@
+from .abc import ConsumerRebalanceListener
+from .consumer import AIOKafkaConsumer
+from .errors import ConsumerStoppedError, IllegalOperation
+from .structs import (
+    ConsumerRecord,
+    OffsetAndMetadata,
+    OffsetAndTimestamp,
+    TopicPartition,
+)
+
+__version__ = ...
+__all__ = [
+    "AIOKafkaConsumer",
+    "ConsumerRebalanceListener",
+    "ConsumerStoppedError",
+    "IllegalOperation",
+    "ConsumerRecord",
+    "TopicPartition",
+    "OffsetAndTimestamp",
+    "OffsetAndMetadata",
+]
diff --git a/aiokafka/abc.pyi b/aiokafka/abc.pyi
new file mode 100644
index 00000000..40246041
--- /dev/null
+++ b/aiokafka/abc.pyi
@@ -0,0 +1,139 @@
+import abc
+
+from aiokafka.structs import TopicPartition
+
+class ConsumerRebalanceListener(abc.ABC):
+    """
+    A callback interface that the user can implement to trigger custom actions
+    when the set of partitions assigned to the consumer changes.
+
+    This is applicable when the consumer has Kafka auto-managing group
+    membership. If the consumer directly assigns partitions, those
+    partitions will never be reassigned and this callback is not applicable.
+
+    When Kafka is managing the group membership, a partition re-assignment will
+    be triggered any time the members of the group change or the subscription
+    of the members changes. This can occur when processes die, new process
+    instances are added or old instances come back to life after failure.
+    Rebalances can also be triggered by changes affecting the subscribed
+    topics (e.g. when the number of partitions is administratively adjusted).
+
+    There are many uses for this functionality. One common use is saving
+    offsets in a custom store. By saving offsets in the
+    :meth:`on_partitions_revoked` call, we can ensure that any time partition
+    assignment changes the offset gets saved.
+
+    Another use is flushing out any kind of cache of intermediate results the
+    consumer may be keeping. For example, consider a case where the consumer is
+    subscribed to a topic containing user page views, and the goal is to count
+    the number of page views per user for each five minute window. Let's say
+    the topic is partitioned by the user id so that all events for a particular
+    user will go to a single consumer instance. The consumer can keep in memory
+    a running tally of actions per user and only flush these out to a remote
+    data store when its cache gets too big. However if a partition is
+    reassigned it may want to automatically trigger a flush of this cache,
+    before the new owner takes over consumption.
+
+    This callback will execute during the rebalance process, and the consumer
+    will wait for callbacks to finish before proceeding with the group join.
+
+    It is guaranteed that all consumer processes will invoke
+    :meth:`on_partitions_revoked` prior to any process invoking
+    :meth:`on_partitions_assigned`. So if offsets or other state is saved in the
+    :meth:`on_partitions_revoked` call, it should be saved by the time the process
+    taking over that partition has their :meth:`on_partitions_assigned` callback
+    called to load the state.
+    """
+
+    @abc.abstractmethod
+    def on_partitions_revoked(self, revoked: list[TopicPartition]) -> None:
+        """
+        A coroutine or function the user can implement to provide cleanup or
+        custom state save on the start of a rebalance operation.
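+
+        For example, a minimal sketch of a listener that commits offsets on
+        revocation (one possible pattern; assumes manual commit and a
+        ``consumer`` reference supplied by the caller)::
+
+            class CommitOnRevoke(ConsumerRebalanceListener):
+                def __init__(self, consumer):
+                    self.consumer = consumer
+
+                async def on_partitions_revoked(self, revoked):
+                    # Save positions before another member takes over.
+                    await self.consumer.commit()
+
+                async def on_partitions_assigned(self, assigned):
+                    pass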
+
+        This method will be called *before* a rebalance operation starts and
+        *after* the consumer stops fetching data.
+
+        If you are using manual commit you have to commit all consumed offsets
+        here, to avoid duplicate message delivery after rebalance is finished.
+
+        .. note:: This method is only called before rebalances. It is not
+            called prior to :meth:`.AIOKafkaConsumer.stop`.
+
+        Arguments:
+            revoked (list(TopicPartition)): the partitions that were assigned
+                to the consumer on the last rebalance
+        """
+        ...
+
+    @abc.abstractmethod
+    def on_partitions_assigned(self, assigned: list[TopicPartition]) -> None:
+        """
+        A coroutine or function the user can implement to provide loading of
+        custom consumer state or cache warmup on completion of a successful
+        partition re-assignment.
+
+        This method will be called *after* partition re-assignment completes
+        and *before* the consumer starts fetching data again.
+
+        It is guaranteed that all the processes in a consumer group will
+        execute their :meth:`on_partitions_revoked` callback before any instance
+        executes its :meth:`on_partitions_assigned` callback.
+
+        Arguments:
+            assigned (list(TopicPartition)): the partitions assigned to the
+                consumer (may include partitions that were previously assigned)
+        """
+        ...
+
+class AbstractTokenProvider(abc.ABC):
+    """
+    A Token Provider must be used for the `SASL OAuthBearer`_ protocol.
+
+    The implementation should ensure token reuse so that multiple
+    calls at connect time do not create multiple tokens.
+    The implementation should also periodically refresh the token in order to
+    guarantee that each call returns an unexpired token.
+
+    A timeout error should be returned after a short period of inactivity so
+    that the broker can log debugging info and retry.
+
+    Token Providers MUST implement the :meth:`token` method
+
+    .. _SASL OAuthBearer:
+        https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_oauth.html
+    """
+
+    @abc.abstractmethod
+    async def token(self) -> str:
+        """
+        An async callback returning a :class:`str` ID/Access Token to be sent to
+        the Kafka client. In cases where a synchronous callback is needed,
+        an implementation like the following can be used:
+
+        .. code-block:: python
+
+            import asyncio
+
+            from aiokafka.abc import AbstractTokenProvider
+
+            class CustomTokenProvider(AbstractTokenProvider):
+                async def token(self):
+                    return await asyncio.get_running_loop().run_in_executor(
+                        None, self._token)
+
+                def _token(self):
+                    # The actual synchronous token callback.
+                    ...
+        """
+        ...
+
+    def extensions(self) -> dict[str, str]:
+        """
+        This is an OPTIONAL method that may be implemented.
+
+        Returns a map of key-value pairs that can be sent with the
+        SASL/OAUTHBEARER initial client request. If not implemented, the values
+        are ignored.
+
+        This feature is only available in Kafka >= 2.1.0.
+        """
+
+__all__ = ["ConsumerRebalanceListener", "AbstractTokenProvider"]
diff --git a/aiokafka/client.pyi b/aiokafka/client.pyi
new file mode 100644
index 00000000..3fbd3254
--- /dev/null
+++ b/aiokafka/client.pyi
@@ -0,0 +1,7 @@
+from enum import IntEnum
+
+log = ...
+
+class CoordinationType(IntEnum):
+    GROUP = ...
+    TRANSACTION = ...
diff --git a/aiokafka/cluster.pyi b/aiokafka/cluster.pyi new file mode 100644 index 00000000..d0c09aa8 --- /dev/null +++ b/aiokafka/cluster.pyi @@ -0,0 +1,198 @@ +from collections.abc import Sequence +from concurrent.futures import Future +from typing import Any, Callable, TypedDict + +from aiokafka.client import CoordinationType +from aiokafka.protocol.commit import ( + GroupCoordinatorResponse_v0, + GroupCoordinatorResponse_v1, +) +from aiokafka.protocol.metadata import ( + MetadataResponse_v0, + MetadataResponse_v1, + MetadataResponse_v2, + MetadataResponse_v3, + MetadataResponse_v4, + MetadataResponse_v5, +) +from aiokafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition + +log = ... +MetadataResponse = ( + MetadataResponse_v0 + | MetadataResponse_v1 + | MetadataResponse_v2 + | MetadataResponse_v3 + | MetadataResponse_v4 + | MetadataResponse_v5 +) +GroupCoordinatorResponse = GroupCoordinatorResponse_v0 | GroupCoordinatorResponse_v1 + +class ClusterConfig(TypedDict): + retry_backoff_ms: int + metadata_max_age_ms: int + bootstrap_servers: str | list[str] + +class ClusterMetadata: + """ + A class to manage kafka cluster metadata. + + This class does not perform any IO. It simply updates internal state + given API responses (MetadataResponse, GroupCoordinatorResponse). + + Keyword Arguments: + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. + metadata_max_age_ms (int): The period of time in milliseconds after + which we force a refresh of metadata even if we haven't seen any + partition leadership changes to proactively discover any new + brokers or partitions. Default: 300000 + bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' + strings) that the client should contact to bootstrap initial + cluster metadata. This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. If no servers are + specified, will default to localhost:9092. + """ + + DEFAULT_CONFIG: ClusterConfig = ... + def __init__(self, **configs: int | str | list[str]) -> None: ... + def is_bootstrap(self, node_id: str) -> bool: ... + def brokers(self) -> set[BrokerMetadata]: + """Get all BrokerMetadata + + Returns: + set: {BrokerMetadata, ...} + """ + + def broker_metadata(self, broker_id: str) -> BrokerMetadata | None: + """Get BrokerMetadata + + Arguments: + broker_id (str): node_id for a broker to check + + Returns: + BrokerMetadata or None if not found + """ + + def partitions_for_topic(self, topic: str) -> set[int] | None: + """Return set of all partitions for topic (whether available or not) + + Arguments: + topic (str): topic to check for partitions + + Returns: + set: {partition (int), ...} + """ + + def available_partitions_for_topic(self, topic: str) -> set[int] | None: + """Return set of partitions with known leaders + + Arguments: + topic (str): topic to check for partitions + + Returns: + set: {partition (int), ...} + None if topic not found. + """ + + def leader_for_partition(self, partition: PartitionMetadata) -> int | None: + """Return node_id of leader, -1 unavailable, None if unknown.""" + + def partitions_for_broker(self, broker_id: int | str) -> set[TopicPartition] | None: + """Return TopicPartitions for which the broker is a leader. + + Arguments: + broker_id (int): node id for a broker + + Returns: + set: {TopicPartition, ...} + None if the broker either has no partitions or does not exist. 
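+
+        Example (a sketch; assumes ``cluster`` is a populated
+        :class:`ClusterMetadata` instance and that broker id ``0`` exists)::
+
+            partitions = cluster.partitions_for_broker(0)
+            if partitions is not None:
+                for tp in partitions:
+                    print(tp.topic, tp.partition)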
+ """ + + def coordinator_for_group(self, group: str) -> int | str | None: + """Return node_id of group coordinator. + + Arguments: + group (str): name of consumer group + + Returns: + int: node_id for group coordinator + None if the group does not exist. + """ + + def request_update(self) -> Future[ClusterMetadata]: + """Flags metadata for update, return Future() + + Actual update must be handled separately. This method will only + change the reported ttl() + + Returns: + Future (value will be the cluster object after update) + """ + + def topics(self, exclude_internal_topics: bool = ...) -> set[str]: + """Get set of known topics. + + Arguments: + exclude_internal_topics (bool): Whether records from internal topics + (such as offsets) should be exposed to the consumer. If set to + True the only way to receive records from an internal topic is + subscribing to it. Default True + + Returns: + set: {topic (str), ...} + """ + + def failed_update(self, exception: BaseException) -> None: + """Update cluster state given a failed MetadataRequest.""" + + def update_metadata(self, metadata: MetadataResponse) -> None: + """Update cluster state given a MetadataResponse. + + Arguments: + metadata (MetadataResponse): broker response to a metadata request + + Returns: None + """ + + def add_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None: + """Add a callback function to be called on each metadata update""" + + def remove_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None: + """Remove a previously added listener callback""" + + def add_group_coordinator( + self, group: str, response: GroupCoordinatorResponse + ) -> str | None: + """Update with metadata for a group coordinator + + Arguments: + group (str): name of group from GroupCoordinatorRequest + response (GroupCoordinatorResponse): broker response + + Returns: + string: coordinator node_id if metadata is updated, None on error + """ + + def with_partitions( + self, partitions_to_add: Sequence[PartitionMetadata] + ) -> ClusterMetadata: + """Returns a copy of cluster metadata with partitions added""" + + def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: ... + def add_coordinator( + self, + node_id: int | str, + host: str, + port: int, + rack: str | None = ..., + *, + purpose: tuple[CoordinationType, str], + ) -> None: + """Keep track of all coordinator nodes separately and remove them if + a new one was elected for the same purpose (For example group + coordinator for group X). + """ + + def __str__(self) -> str: ... diff --git a/aiokafka/consumer/__init__.pyi b/aiokafka/consumer/__init__.pyi new file mode 100644 index 00000000..1c61c672 --- /dev/null +++ b/aiokafka/consumer/__init__.pyi @@ -0,0 +1,3 @@ +from .consumer import AIOKafkaConsumer + +__all__ = ["AIOKafkaConsumer"] diff --git a/aiokafka/consumer/consumer.pyi b/aiokafka/consumer/consumer.pyi new file mode 100644 index 00000000..c8b073da --- /dev/null +++ b/aiokafka/consumer/consumer.pyi @@ -0,0 +1,853 @@ +import asyncio +from ssl import SSLContext +from types import ModuleType, TracebackType +from typing import Callable, Generic, Literal, TypeVar + +from aiokafka.abc import AbstractTokenProvider, ConsumerRebalanceListener +from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor +from aiokafka.structs import ( + ConsumerRecord, + OffsetAndMetadata, + OffsetAndTimestamp, + TopicPartition, +) + +log = ... 
+KT_co = TypeVar("KT_co", covariant=True)
+VT_co = TypeVar("VT_co", covariant=True)
+ET = TypeVar("ET", bound=BaseException)
+
+class AIOKafkaConsumer(Generic[KT_co, VT_co]):
+    """
+    A client that consumes records from a Kafka cluster.
+
+    The consumer will transparently handle the failure of servers in the Kafka
+    cluster, and adapt as topic-partitions are created or migrate between
+    brokers.
+
+    It also interacts with the assigned Kafka Group Coordinator node to allow
+    multiple consumers to load balance consumption of topics (feature of Kafka
+    >= 0.9.0.0).
+
+    .. _kip-62:
+        https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
+
+    Arguments:
+        *topics (tuple(str)): optional list of topics to subscribe to. If not set,
+            call :meth:`.subscribe` or :meth:`.assign` before consuming records.
+            Passing topics directly is the same as calling the :meth:`.subscribe` API.
+        bootstrap_servers (str, list(str)): a ``host[:port]`` string (or list of
+            ``host[:port]`` strings) that the consumer should contact to bootstrap
+            initial cluster metadata.
+
+            This does not have to be the full node list.
+            It just needs to have at least one broker that will respond to a
+            Metadata API Request. Default port is 9092. If no servers are
+            specified, will default to ``localhost:9092``.
+        client_id (str): a name for this client. This string is passed in
+            each request to servers and can be used to identify specific
+            server-side log entries that correspond to this client. Also
+            submitted to :class:`~.consumer.group_coordinator.GroupCoordinator`
+            for logging with respect to consumer group administration. Default:
+            ``aiokafka-{version}``
+        group_id (str or None): name of the consumer group to join for dynamic
+            partition assignment (if enabled), and to use for fetching and
+            committing offsets. If None, auto-partition assignment (via
+            group coordinator) and offset commits are disabled.
+            Default: None
+        group_instance_id (str or None): name of the group instance ID used for
+            static membership (KIP-345)
+        key_deserializer (Callable): Any callable that takes a
+            raw message key and returns a deserialized key.
+        value_deserializer (Callable, Optional): Any callable that takes a
+            raw message value and returns a deserialized value.
+        fetch_min_bytes (int): Minimum amount of data the server should
+            return for a fetch request, otherwise wait up to
+            `fetch_max_wait_ms` for more data to accumulate. Default: 1.
+        fetch_max_bytes (int): The maximum amount of data the server should
+            return for a fetch request. This is not an absolute maximum; if
+            the first message in the first non-empty partition of the fetch
+            is larger than this value, the message will still be returned
+            to ensure that the consumer can make progress. NOTE: consumer
+            performs fetches to multiple brokers in parallel so memory
+            usage will depend on the number of brokers containing
+            partitions for the topic.
+            Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
+        fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+            the server will block before answering the fetch request if
+            there isn't sufficient data to immediately satisfy the
+            requirement given by fetch_min_bytes. Default: 500.
+        max_partition_fetch_bytes (int): The maximum amount of data
+            per-partition the server will return. The maximum total memory
+            used for a request ``= #partitions * max_partition_fetch_bytes``.
+            This size must be at least as large as the maximum message size
+            the server allows or else it is possible for the producer to
+            send messages larger than the consumer can fetch. If that
+            happens, the consumer can get stuck trying to fetch a large
+            message on a certain partition. Default: 1048576.
+        max_poll_records (int or None): The maximum number of records
+            returned in a single call to :meth:`.getmany`.
+            Defaults to ``None``, no limit.
+        request_timeout_ms (int): Client request timeout in milliseconds.
+            Default: 40000.
+        retry_backoff_ms (int): Milliseconds to backoff when retrying on
+            errors. Default: 100.
+        auto_offset_reset (str): A policy for resetting offsets on
+            :exc:`.OffsetOutOfRangeError` errors: ``earliest`` will move to the oldest
+            available message, ``latest`` will move to the most recent, and
+            ``none`` will raise an exception so you can handle this case.
+            Default: ``latest``.
+        enable_auto_commit (bool): If true the consumer's offset will be
+            periodically committed in the background. Default: True.
+        auto_commit_interval_ms (int): milliseconds between automatic
+            offset commits, if enable_auto_commit is True. Default: 5000.
+        check_crcs (bool): Automatically check the CRC32 of the records
+            consumed. This ensures no on-the-wire or on-disk corruption to
+            the messages occurred. This check adds some overhead, so it may
+            be disabled in cases seeking extreme performance. Default: True
+        metadata_max_age_ms (int): The period of time in milliseconds after
+            which we force a refresh of metadata even if we haven't seen any
+            partition leadership changes to proactively discover any new
+            brokers or partitions. Default: 300000
+        partition_assignment_strategy (list or tuple): List of objects to use to
+            distribute partition ownership amongst consumer instances when
+            group management is used. This preference is implicit in the order
+            of the strategies in the list. When assignment strategy changes:
+            to support a change to the assignment strategy, new versions must
+            enable support both for the old assignment strategy and the new
+            one. The coordinator will choose the old assignment strategy until
+            all members have been updated. Then it will choose the new
+            strategy. Default: [:class:`.RoundRobinPartitionAssignor`]
+
+        max_poll_interval_ms (int): Maximum allowed time between calls to
+            consume messages (e.g., :meth:`.getmany`). If this interval
+            is exceeded the consumer is considered failed and the group will
+            rebalance in order to reassign the partitions to another consumer
+            group member. If API methods block waiting for messages, that time
+            does not count against this timeout. See `KIP-62`_ for more
+            information. Default 300000
+        rebalance_timeout_ms (int): The maximum time the server will wait for
+            this consumer to rejoin the group in the case of a rebalance. In
+            the Java client this behaviour is bound to the
+            `max.poll.interval.ms` configuration, but as ``aiokafka`` will
+            rejoin the group in the background, we decouple this setting to
+            allow finer tuning by users that use
+            :class:`.ConsumerRebalanceListener` to delay rebalancing. Defaults
+            to ``session_timeout_ms``
+        session_timeout_ms (int): Client group session and failure detection
+            timeout. The consumer sends periodic heartbeats
+            (`heartbeat.interval.ms`) to indicate its liveness to the broker.
+            If no heartbeats are received by the broker for a group member
+            within the session timeout, the broker will remove the consumer
+            from the group and trigger a rebalance.
The allowed range is configured with + the **broker** configuration properties + `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. + Default: 10000 + heartbeat_interval_ms (int): The expected time in milliseconds + between heartbeats to the consumer coordinator when using + Kafka's group management feature. Heartbeats are used to ensure + that the consumer's session stays active and to facilitate + rebalancing when new consumers join or leave the group. The + value must be set lower than `session_timeout_ms`, but typically + should be set no higher than 1/3 of that value. It can be + adjusted even lower to control the expected time for normal + rebalances. Default: 3000 + + consumer_timeout_ms (int): maximum wait timeout for background fetching + routine. Mostly defines how fast the system will see rebalance and + request new data for new partitions. Default: 200 + api_version (str): specify which kafka API version to use. + :class:`AIOKafkaConsumer` supports Kafka API versions >=0.9 only. + If set to ``auto``, will attempt to infer the broker version by + probing various APIs. Default: ``auto`` + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: ``PLAINTEXT``, ``SSL``, ``SASL_PLAINTEXT``, + ``SASL_SSL``. Default: ``PLAINTEXT``. + ssl_context (ssl.SSLContext): pre-configured :class:`~ssl.SSLContext` + for wrapping socket connections. Directly passed into asyncio's + :meth:`~asyncio.loop.create_connection`. For more information see + :ref:`ssl_auth`. Default: None. + exclude_internal_topics (bool): Whether records from internal topics + (such as offsets) should be exposed to the consumer. If set to True + the only way to receive records from an internal topic is + subscribing to it. Requires 0.10+ Default: True + connections_max_idle_ms (int): Close idle connections after the number + of milliseconds specified by this config. Specifying `None` will + disable idle checks. Default: 540000 (9 minutes). + isolation_level (str): Controls how to read messages written + transactionally. + + If set to ``read_committed``, :meth:`.getmany` will only return + transactional messages which have been committed. + If set to ``read_uncommitted`` (the default), :meth:`.getmany` will + return all messages, even transactional messages which have been + aborted. + + Non-transactional messages will be returned unconditionally in + either mode. + + Messages will always be returned in offset order. Hence, in + `read_committed` mode, :meth:`.getmany` will only return + messages up to the last stable offset (LSO), which is the one less + than the offset of the first open transaction. In particular any + messages appearing after messages belonging to ongoing transactions + will be withheld until the relevant transaction has been completed. + As a result, `read_committed` consumers will not be able to read up + to the high watermark when there are in flight transactions. + Further, when in `read_committed` the seek_to_end method will + return the LSO. See method docs below. Default: ``read_uncommitted`` + + sasl_mechanism (str): Authentication mechanism when security_protocol + is configured for ``SASL_PLAINTEXT`` or ``SASL_SSL``. Valid values are: + ``PLAIN``, ``GSSAPI``, ``SCRAM-SHA-256``, ``SCRAM-SHA-512``, + ``OAUTHBEARER``. + Default: ``PLAIN`` + sasl_plain_username (str or None): username for SASL ``PLAIN`` authentication. + Default: None + sasl_plain_password (str or None): password for SASL ``PLAIN`` authentication. 
+            Default: None
+        sasl_oauth_token_provider (~aiokafka.abc.AbstractTokenProvider or None):
+            OAuthBearer token provider instance.
+            Default: None
+
+    Note:
+        Many configuration parameters are taken from Java Client:
+        https://kafka.apache.org/documentation.html#newconsumerconfigs
+
+    """
+
+    _closed = ...
+    _source_traceback = ...
+    def __init__(
+        self,
+        *topics: str,
+        loop: asyncio.AbstractEventLoop | None = ...,
+        bootstrap_servers: str | list[str] = ...,
+        client_id: str = ...,
+        group_id: str | None = ...,
+        group_instance_id: str | None = ...,
+        key_deserializer: Callable[[bytes], KT_co] = ...,
+        value_deserializer: Callable[[bytes], VT_co] = ...,
+        fetch_max_wait_ms: int = ...,
+        fetch_max_bytes: int = ...,
+        fetch_min_bytes: int = ...,
+        max_partition_fetch_bytes: int = ...,
+        request_timeout_ms: int = ...,
+        retry_backoff_ms: int = ...,
+        auto_offset_reset: (
+            Literal["earliest"] | Literal["latest"] | Literal["none"]
+        ) = ...,
+        enable_auto_commit: bool = ...,
+        auto_commit_interval_ms: int = ...,
+        check_crcs: bool = ...,
+        metadata_max_age_ms: int = ...,
+        partition_assignment_strategy: tuple[
+            type[AbstractPartitionAssignor], ...
+        ] = ...,
+        max_poll_interval_ms: int = ...,
+        rebalance_timeout_ms: int | None = ...,
+        session_timeout_ms: int = ...,
+        heartbeat_interval_ms: int = ...,
+        consumer_timeout_ms: int = ...,
+        max_poll_records: int | None = ...,
+        ssl_context: SSLContext | None = ...,
+        security_protocol: (
+            Literal["PLAINTEXT"]
+            | Literal["SSL"]
+            | Literal["SASL_PLAINTEXT"]
+            | Literal["SASL_SSL"]
+        ) = ...,
+        api_version: str = ...,
+        exclude_internal_topics: bool = ...,
+        connections_max_idle_ms: int = ...,
+        isolation_level: Literal["read_committed"] | Literal["read_uncommitted"] = ...,
+        sasl_mechanism: (
+            Literal["PLAIN"]
+            | Literal["GSSAPI"]
+            | Literal["SCRAM-SHA-256"]
+            | Literal["SCRAM-SHA-512"]
+            | Literal["OAUTHBEARER"]
+        ) = ...,
+        sasl_plain_password: str | None = ...,
+        sasl_plain_username: str | None = ...,
+        sasl_kerberos_service_name: str = ...,
+        sasl_kerberos_domain_name: str | None = ...,
+        sasl_oauth_token_provider: AbstractTokenProvider | None = ...,
+    ) -> None: ...
+    def __del__(self, _warnings: ModuleType = ...) -> None: ...
+    async def start(self) -> None:
+        """Connect to Kafka cluster. This will:
+
+        * Load metadata for all cluster nodes and partition allocation
+        * Wait for possible topic autocreation
+        * Join group if ``group_id`` provided
+        """
+
+    def assign(self, partitions: list[TopicPartition]) -> None:
+        """Manually assign a list of :class:`.TopicPartition` to this consumer.
+
+        This interface does not support incremental assignment and will
+        replace the previous assignment (if there was one).
+
+        Arguments:
+            partitions (list(TopicPartition)): assignment for this instance.
+
+        Raises:
+            IllegalStateError: if consumer has already called :meth:`subscribe`
+
+        Warning:
+            It is not possible to use both manual partition assignment with
+            :meth:`assign` and group assignment with :meth:`subscribe`.
+
+        Note:
+            Manual topic assignment through this method does not use the
+            consumer's group management functionality. As such, there will be
+            **no rebalance operation triggered** when group membership or
+            cluster and topic metadata change.
+        """
+
+    def assignment(self) -> set[TopicPartition]:
+        """Get the set of partitions currently assigned to this consumer.
+
+        If partitions were directly assigned using :meth:`assign`, then this will
+        simply return the same partitions that were previously assigned.
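+
+        For example (a sketch; assumes a started consumer and an existing
+        topic)::
+
+            tp = TopicPartition("my_topic", 0)
+            consumer.assign([tp])
+            assert consumer.assignment() == {tp}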
+
+        If topics were subscribed using :meth:`subscribe`, then this will give
+        the set of topic partitions currently assigned to the consumer (which
+        may be empty if the assignment hasn't happened yet or if the partitions
+        are in the process of being reassigned).
+
+        Returns:
+            set(TopicPartition): the set of partitions currently assigned to
+            this consumer
+        """
+
+    async def stop(self) -> None:
+        """Close the consumer, while waiting for finalizers:
+
+        * Commit last consumed message if autocommit enabled
+        * Leave the group, if Consumer Groups are used
+        """
+
+    async def commit(
+        self,
+        offsets: (
+            dict[TopicPartition, int | tuple[int, str] | OffsetAndMetadata] | None
+        ) = ...,
+    ) -> None:
+        """Commit offsets to Kafka.
+
+        This commits offsets only to Kafka. The offsets committed using this
+        API will be used on the first fetch after every rebalance and also on
+        startup. As such, if you need to store offsets in anything other than
+        Kafka, this API should not be used.
+
+        Currently only supports kafka-topic offset storage (not Zookeeper).
+
+        When explicitly passing ``offsets``, use either the offset of the next
+        record, or a tuple of offset and metadata::
+
+            tp = TopicPartition(msg.topic, msg.partition)
+            metadata = "Some utf-8 metadata"
+            # Either
+            await consumer.commit({tp: msg.offset + 1})
+            # Or with metadata
+            await consumer.commit({tp: (msg.offset + 1, metadata)})
+
+        .. note:: If you want *fire and forget* commit, like
+            :meth:`~kafka.KafkaConsumer.commit_async` in `kafka-python`_, just
+            run it in a task. Something like::
+
+                fut = loop.create_task(consumer.commit())
+                fut.add_done_callback(on_commit_done)
+
+        Arguments:
+            offsets (dict, Optional): A mapping from :class:`.TopicPartition` to
+                ``(offset, metadata)`` to commit with the configured ``group_id``.
+                Defaults to current consumed offsets for all subscribed partitions.
+        Raises:
+            ~aiokafka.errors.CommitFailedError: If membership already changed on broker.
+            ~aiokafka.errors.IllegalOperation: If used with ``group_id == None``.
+            ~aiokafka.errors.IllegalStateError: If partitions not assigned.
+            ~aiokafka.errors.KafkaError: If commit failed on broker side. This
+                could be due to invalid offset, too long metadata, authorization
+                failure, etc.
+            ValueError: If offsets is of wrong format.
+
+        .. versionchanged:: 0.4.0
+
+            Changed :exc:`AssertionError` to
+            :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned
+            partition.
+
+        .. versionchanged:: 0.4.0
+
+            Will now raise :exc:`~aiokafka.errors.CommitFailedError` in case
+            membership changed, as (possibly) this partition is handled by
+            another consumer.
+
+        .. _kafka-python: https://github.com/dpkp/kafka-python
+        """
+
+    async def committed(self, partition: TopicPartition) -> int | None:
+        """Get the last committed offset for the given partition (whether the
+        commit happened by this process or another).
+
+        This offset will be used as the position for the consumer in the event
+        of a failure.
+
+        This call will block while making a remote call to get the latest
+        offset, as those are not cached by the consumer (a transactional
+        producer can change them without the consumer's knowledge as of
+        Kafka 0.11.0).
+
+        Arguments:
+            partition (TopicPartition): the partition to check
+
+        Returns:
+            The last committed offset, or None if there was no prior commit.
+
+        Raises:
+            IllegalOperation: If used with ``group_id == None``
+        """
+
+    async def topics(self) -> set[str]:
+        """Get all topics the user is authorized to view.
+
+        Returns:
+            set: topics
+        """
+
+    def partitions_for_topic(self, topic: str) -> set[int] | None:
+        """Get metadata about the partitions for a given topic.
+
+        This method will return `None` if the consumer does not already have
+        metadata for this topic.
+
+        Arguments:
+            topic (str): topic to check
+
+        Returns:
+            set: partition ids
+        """
+
+    async def position(self, partition: TopicPartition) -> int:
+        """Get the offset of the *next record* that will be fetched (if a
+        record with that offset exists on broker).
+
+        Arguments:
+            partition (TopicPartition): partition to check
+
+        Returns:
+            int: offset
+
+        Raises:
+            IllegalStateError: partition is not assigned
+
+        .. versionchanged:: 0.4.0
+
+            Changed :exc:`AssertionError` to
+            :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned
+            partition
+        """
+
+    def highwater(self, partition: TopicPartition) -> int | None:
+        """Last known highwater offset for a partition.
+
+        A highwater offset is the offset that will be assigned to the next
+        message that is produced. It may be useful for calculating lag, by
+        comparing with the reported position. Note that both position and
+        highwater refer to the *next* offset - i.e., highwater offset is one
+        greater than the newest available message.
+
+        Highwater offsets are returned as part of ``FetchResponse``, so will
+        not be available if messages for this partition were not requested yet.
+
+        Arguments:
+            partition (TopicPartition): partition to check
+
+        Returns:
+            int or None: offset if available
+        """
+
+    def last_stable_offset(self, partition: TopicPartition) -> int | None:
+        """Returns the Last Stable Offset of a partition. It will be the last
+        offset up to which point all transactions were completed. Only
+        available with isolation_level `read_committed`; in
+        `read_uncommitted` it will always return -1. Will return None for
+        older brokers.
+
+        As with :meth:`highwater`, it will not be available until some
+        messages are consumed.
+
+        Arguments:
+            partition (TopicPartition): partition to check
+
+        Returns:
+            int or None: offset if available
+        """
+
+    def last_poll_timestamp(self, partition: TopicPartition) -> int | None:
+        """Returns the timestamp of the last poll of this partition (in ms).
+        It is the last time :meth:`highwater` and :meth:`last_stable_offset` were
+        updated. However, it does not mean that new messages were received.
+
+        As with :meth:`highwater`, it will not be available until some
+        messages are consumed.
+
+        Arguments:
+            partition (TopicPartition): partition to check
+
+        Returns:
+            int or None: timestamp if available
+        """
+
+    def seek(self, partition: TopicPartition, offset: int) -> None:
+        """Manually specify the fetch offset for a :class:`.TopicPartition`.
+
+        Overrides the fetch offsets that the consumer will use on the next
+        :meth:`getmany`/:meth:`getone` call. If this API is invoked for the same
+        partition more than once, the latest offset will be used on the next
+        fetch.
+
+        Note:
+            You may lose data if this API is arbitrarily used in the middle
+            of consumption to reset the fetch offsets. Use it either on
+            rebalance listeners or after all pending messages are processed.
+
+        Arguments:
+            partition (TopicPartition): partition for seek operation
+            offset (int): message offset in partition
+
+        Raises:
+            ValueError: if offset is not a positive integer
+            IllegalStateError: partition is not currently assigned
+
+        .. versionchanged:: 0.4.0
+
+            Changed :exc:`AssertionError` to
+            :exc:`~aiokafka.errors.IllegalStateError` and :exc:`ValueError` in
+            respective cases.
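+
+        Example (a sketch; assumes the partition is assigned and offset ``10``
+        exists)::
+
+            tp = TopicPartition("my_topic", 0)
+            consumer.seek(tp, 10)
+            msg = await consumer.getone(tp)
+            assert msg.offset == 10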
+ """ + + async def seek_to_beginning(self, *partitions: TopicPartition) -> bool: + """Seek to the oldest available offset for partitions. + + Arguments: + *partitions: Optionally provide specific :class:`.TopicPartition`, + otherwise default to all assigned partitions. + + Raises: + IllegalStateError: If any partition is not currently assigned + TypeError: If partitions are not instances of :class:`.TopicPartition` + + .. versionadded:: 0.3.0 + + """ + + async def seek_to_end(self, *partitions: TopicPartition) -> bool: + """Seek to the most recent available offset for partitions. + + Arguments: + *partitions: Optionally provide specific :class:`.TopicPartition`, + otherwise default to all assigned partitions. + + Raises: + IllegalStateError: If any partition is not currently assigned + TypeError: If partitions are not instances of :class:`.TopicPartition` + + .. versionadded:: 0.3.0 + + """ + + async def seek_to_committed( + self, *partitions: TopicPartition + ) -> dict[TopicPartition, int | None]: + """Seek to the committed offset for partitions. + + Arguments: + *partitions: Optionally provide specific :class:`.TopicPartition`, + otherwise default to all assigned partitions. + + Returns: + dict(TopicPartition, int): mapping + of the currently committed offsets. + + Raises: + IllegalStateError: If any partition is not currently assigned + IllegalOperation: If used with ``group_id == None`` + + .. versionchanged:: 0.3.0 + + Changed :exc:`AssertionError` to + :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned + partition + """ + + async def offsets_for_times( + self, timestamps: dict[TopicPartition, int] + ) -> dict[TopicPartition, OffsetAndTimestamp | None]: + """ + Look up the offsets for the given partitions by timestamp. The returned + offset for each partition is the earliest offset whose timestamp is + greater than or equal to the given timestamp in the corresponding + partition. + + The consumer does not have to be assigned the partitions. + + If the message format version in a partition is before 0.10.0, i.e. + the messages do not have timestamps, ``None`` will be returned for that + partition. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + timestamps (dict(TopicPartition, int)): mapping from partition + to the timestamp to look up. Unit should be milliseconds since + beginning of the epoch (midnight Jan 1, 1970 (UTC)) + + Returns: + dict(TopicPartition, OffsetAndTimestamp or None): mapping from + partition to the timestamp and offset of the first message with + timestamp greater than or equal to the target timestamp. + + Raises: + ValueError: If the target timestamp is negative + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in `request_timeout_ms` + + .. versionadded:: 0.3.0 + + """ + + async def beginning_offsets( + self, partitions: list[TopicPartition] + ) -> dict[TopicPartition, int]: + """Get the first offset for the given partitions. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list[TopicPartition]): List of :class:`.TopicPartition` + instances to fetch offsets for. + + Returns: + dict [TopicPartition, int]: mapping of partition to earliest + available offset. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. 
+            KafkaTimeoutError: If fetch failed in `request_timeout_ms`.
+
+        .. versionadded:: 0.3.0
+
+        """
+
+    async def end_offsets(
+        self, partitions: list[TopicPartition]
+    ) -> dict[TopicPartition, int]:
+        """Get the last offset for the given partitions. The last offset of a
+        partition is the offset of the upcoming message, i.e. the offset of the
+        last available message + 1.
+
+        This method does not change the current consumer position of the
+        partitions.
+
+        Note:
+            This method may block indefinitely if the partition does not exist.
+
+        Arguments:
+            partitions (list[TopicPartition]): List of :class:`.TopicPartition`
+                instances to fetch offsets for.
+
+        Returns:
+            dict [TopicPartition, int]: mapping of partition to last
+            available offset + 1.
+
+        Raises:
+            UnsupportedVersionError: If the broker does not support looking
+                up the offsets by timestamp.
+            KafkaTimeoutError: If fetch failed in ``request_timeout_ms``
+
+        .. versionadded:: 0.3.0
+
+        """
+
+    def subscribe(
+        self,
+        topics: list[str] | tuple[str, ...] = ...,
+        pattern: str | None = ...,
+        listener: ConsumerRebalanceListener | None = ...,
+    ) -> None:
+        """Subscribe to a list of topics, or a topic regex pattern.
+
+        Partitions will be dynamically assigned via a group coordinator.
+        Topic subscriptions are not incremental: this list will replace the
+        current assignment (if there is one).
+
+        This method is incompatible with :meth:`assign`.
+
+        Arguments:
+            topics (list or tuple): List of topics for subscription.
+            pattern (str): Pattern to match available topics. You must provide
+                either topics or pattern, but not both.
+            listener (ConsumerRebalanceListener): Optionally include listener
+                callback, which will be called before and after each rebalance
+                operation.
+                As part of group management, the consumer will keep track of
+                the list of consumers that belong to a particular group and
+                will trigger a rebalance operation if one of the following
+                events occurs:
+
+                * Number of partitions change for any of the subscribed topics
+                * Topic is created or deleted
+                * An existing member of the consumer group dies
+                * A new member is added to the consumer group
+
+                When any of these events is triggered, the provided listener
+                will be invoked first to indicate that the consumer's
+                assignment has been revoked, and then again when the new
+                assignment has been received. Note that this listener will
+                immediately override any listener set in a previous call
+                to subscribe. It is guaranteed, however, that the partitions
+                revoked/assigned through this interface are from topics
+                subscribed in this call.
+        Raises:
+            IllegalStateError: if called after previously calling :meth:`assign`
+            ValueError: if neither topics nor pattern is provided, or both
+                are provided
+            TypeError: if listener is not a :class:`.ConsumerRebalanceListener`
+        """
+
+    def subscription(self) -> set[str]:
+        """Get the current topics subscription.
+
+        Returns:
+            set(str): a set of topics
+        """
+
+    def unsubscribe(self) -> None:
+        """Unsubscribe from all topics and clear all assigned partitions."""
+
+    async def getone(self, *partitions: TopicPartition) -> ConsumerRecord[KT_co, VT_co]:
+        """
+        Get one message from Kafka.
+        If no new messages are prefetched, this method will wait for one.
+
+        Arguments:
+            partitions (list(TopicPartition)): Optional list of partitions to
+                return from. If no partitions are specified, the returned
+                message may be from any partition to which the consumer is
+                subscribed.
+
+        Returns:
+            ~aiokafka.structs.ConsumerRecord: the message
+
+        Will return an instance of
+
+        .. code:: python
+
+            collections.namedtuple(
+                "ConsumerRecord",
+                ["topic", "partition", "offset", "key", "value"])
+
+        Example usage:
+
+        .. code:: python
+
+            while True:
+                message = await consumer.getone()
+                topic = message.topic
+                partition = message.partition
+                # Process message
+                print(message.offset, message.key, message.value)
+
+        """
+
+    async def getmany(
+        self,
+        *partitions: TopicPartition,
+        timeout_ms: int = ...,
+        max_records: int | None = ...,
+    ) -> dict[TopicPartition, list[ConsumerRecord[KT_co, VT_co]]]:
+        """Get messages from assigned topics / partitions.
+
+        Prefetched messages are returned in batches by topic-partition.
+        If messages are not available in the prefetched buffer, this method
+        waits up to `timeout_ms` milliseconds.
+
+        Arguments:
+            partitions (list[TopicPartition]): The partitions to fetch
+                messages from. If no partitions are specified, all subscribed
+                partitions will be used.
+            timeout_ms (int, Optional): milliseconds spent waiting if
+                data is not available in the buffer. If 0, returns immediately
+                with any records that are currently available in the buffer,
+                else returns empty. Must not be negative. Default: 0
+        Returns:
+            dict(TopicPartition, list[ConsumerRecord]): topic to list of
+            records since the last fetch for the subscribed list of topics and
+            partitions
+
+        Example usage:
+
+        .. code:: python
+
+            data = await consumer.getmany()
+            for tp, messages in data.items():
+                topic = tp.topic
+                partition = tp.partition
+                for message in messages:
+                    # Process message
+                    print(message.offset, message.key, message.value)
+
+        """
+
+    def pause(self, *partitions: TopicPartition) -> None:
+        """Suspend fetching from the requested partitions.
+
+        Future calls to :meth:`.getmany` will not return any records from these
+        partitions until they have been resumed using :meth:`.resume`.
+
+        Note: This method does not affect partition subscription.
+        In particular, it does not cause a group rebalance when automatic
+        assignment is used.
+
+        Arguments:
+            *partitions (list[TopicPartition]): Partitions to pause.
+        """
+
+    def paused(self) -> set[TopicPartition]:
+        """Get the partitions that were previously paused using
+        :meth:`.pause`.
+
+        Returns:
+            set[TopicPartition]: partitions
+        """
+
+    def resume(self, *partitions: TopicPartition) -> None:
+        """Resume fetching from the specified (paused) partitions.
+
+        Arguments:
+            *partitions (tuple[TopicPartition,...]): Partitions to resume.
+        """
+
+    def __aiter__(self) -> AIOKafkaConsumer[KT_co, VT_co]: ...
+    async def __anext__(self) -> ConsumerRecord[KT_co, VT_co]:
+        """Asyncio iterator interface for the consumer
+
+        Note:
+            TopicAuthorizationFailedError and OffsetOutOfRangeError
+            exceptions can be raised in the iterator.
+            All other KafkaError exceptions will be logged and not raised
+        """
+
+    async def __aenter__(self) -> AIOKafkaConsumer[KT_co, VT_co]: ...
+    async def __aexit__(
+        self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None
+    ) -> None: ...
diff --git a/aiokafka/coordinator/assignors/abstract.pyi b/aiokafka/coordinator/assignors/abstract.pyi
new file mode 100644
index 00000000..3ee73f4d
--- /dev/null
+++ b/aiokafka/coordinator/assignors/abstract.pyi
@@ -0,0 +1,66 @@
+import abc
+from collections.abc import Iterable, Mapping
+
+from aiokafka.cluster import ClusterMetadata
+from aiokafka.coordinator.protocol import (
+    ConsumerProtocolMemberAssignment,
+    ConsumerProtocolMemberMetadata,
+)
+
+log = ...
+ +class AbstractPartitionAssignor(abc.ABC): + """Abstract assignor implementation which does some common grunt work (in particular + collecting partition counts which are always needed in assignors). + """ + + @property + @abc.abstractmethod + def name(self) -> str: + """.name should be a string identifying the assignor""" + ... + + @classmethod + @abc.abstractmethod + def assign( + cls, + cluster: ClusterMetadata, + members: Mapping[str, ConsumerProtocolMemberMetadata], + ) -> dict[str, ConsumerProtocolMemberAssignment]: + """Perform group assignment given cluster metadata and member subscriptions + + Arguments: + cluster (ClusterMetadata): metadata for use in assignment + members (dict of {member_id: MemberMetadata}): decoded metadata for + each member in the group. + + Returns: + dict: {member_id: MemberAssignment} + """ + ... + + @classmethod + @abc.abstractmethod + def metadata(cls, topics: Iterable[str]) -> ConsumerProtocolMemberMetadata: + """Generate ProtocolMetadata to be submitted via JoinGroupRequest. + + Arguments: + topics (set): a member's subscribed topics + + Returns: + MemberMetadata struct + """ + ... + + @classmethod + @abc.abstractmethod + def on_assignment(cls, assignment: ConsumerProtocolMemberAssignment) -> None: + """Callback that runs on each assignment. + + This method can be used to update internal state, if any, of the + partition assignor. + + Arguments: + assignment (MemberAssignment): the member's assignment + """ + ... diff --git a/aiokafka/coordinator/protocol.pyi b/aiokafka/coordinator/protocol.pyi new file mode 100644 index 00000000..2f8e4492 --- /dev/null +++ b/aiokafka/coordinator/protocol.pyi @@ -0,0 +1,27 @@ +from typing import NamedTuple + +from aiokafka.protocol.struct import Struct +from aiokafka.structs import TopicPartition + +class ConsumerProtocolMemberMetadata(Struct): + version: int + subscription: list[str] + user_data: bytes + SCHEMA = ... + +class ConsumerProtocolMemberAssignment(Struct): + class Assignment(NamedTuple): + topic: str + partitions: list[int] + + version: int + assignment: list[Assignment] + user_data: bytes + SCHEMA = ... + def partitions(self) -> list[TopicPartition]: ... + +class ConsumerProtocol: + PROTOCOL_TYPE = ... + ASSIGNMENT_STRATEGIES = ... 
+ METADATA = ConsumerProtocolMemberMetadata + ASSIGNMENT = ConsumerProtocolMemberAssignment diff --git a/aiokafka/errors.pyi b/aiokafka/errors.pyi new file mode 100644 index 00000000..5cc36c1e --- /dev/null +++ b/aiokafka/errors.pyi @@ -0,0 +1,560 @@ +from typing import Any, TypeVar + +__all__ = [ + "ConsumerStoppedError", + "NoOffsetForPartitionError", + "RecordTooLargeError", + "ProducerClosed", + "KafkaError", + "IllegalStateError", + "IllegalArgumentError", + "NoBrokersAvailable", + "NodeNotReadyError", + "KafkaProtocolError", + "CorrelationIdError", + "Cancelled", + "TooManyInFlightRequests", + "StaleMetadata", + "UnrecognizedBrokerVersion", + "IncompatibleBrokerVersion", + "CommitFailedError", + "AuthenticationMethodNotSupported", + "AuthenticationFailedError", + "BrokerResponseError", + "NoError", + "UnknownError", + "OffsetOutOfRangeError", + "CorruptRecordException", + "UnknownTopicOrPartitionError", + "InvalidFetchRequestError", + "LeaderNotAvailableError", + "NotLeaderForPartitionError", + "RequestTimedOutError", + "BrokerNotAvailableError", + "ReplicaNotAvailableError", + "MessageSizeTooLargeError", + "StaleControllerEpochError", + "OffsetMetadataTooLargeError", + "StaleLeaderEpochCodeError", + "GroupLoadInProgressError", + "GroupCoordinatorNotAvailableError", + "NotCoordinatorForGroupError", + "InvalidTopicError", + "RecordListTooLargeError", + "NotEnoughReplicasError", + "NotEnoughReplicasAfterAppendError", + "InvalidRequiredAcksError", + "IllegalGenerationError", + "InconsistentGroupProtocolError", + "InvalidGroupIdError", + "UnknownMemberIdError", + "InvalidSessionTimeoutError", + "RebalanceInProgressError", + "InvalidCommitOffsetSizeError", + "TopicAuthorizationFailedError", + "GroupAuthorizationFailedError", + "ClusterAuthorizationFailedError", + "InvalidTimestampError", + "UnsupportedSaslMechanismError", + "IllegalSaslStateError", + "UnsupportedVersionError", + "TopicAlreadyExistsError", + "InvalidPartitionsError", + "InvalidReplicationFactorError", + "InvalidReplicationAssignmentError", + "InvalidConfigurationError", + "NotControllerError", + "InvalidRequestError", + "UnsupportedForMessageFormatError", + "PolicyViolationError", + "KafkaUnavailableError", + "KafkaTimeoutError", + "KafkaConnectionError", + "UnsupportedCodecError", +] + +class KafkaError(RuntimeError): + retriable = ... + invalid_metadata = ... + def __str__(self) -> str: ... + +class IllegalStateError(KafkaError): ... +class IllegalArgumentError(KafkaError): ... + +class NoBrokersAvailable(KafkaError): + retriable = ... + invalid_metadata = ... + +class NodeNotReadyError(KafkaError): + retriable = ... + +class KafkaProtocolError(KafkaError): + retriable = ... + +class CorrelationIdError(KafkaProtocolError): + retriable = ... + +class Cancelled(KafkaError): + retriable = ... + +class TooManyInFlightRequests(KafkaError): + retriable = ... + +class StaleMetadata(KafkaError): + retriable = ... + invalid_metadata = ... + +class MetadataEmptyBrokerList(KafkaError): + retriable = ... + +class UnrecognizedBrokerVersion(KafkaError): ... +class IncompatibleBrokerVersion(KafkaError): ... + +class CommitFailedError(KafkaError): + def __init__(self, *args: Any, **kwargs: Any) -> None: ... + +class AuthenticationMethodNotSupported(KafkaError): ... + +class AuthenticationFailedError(KafkaError): + retriable = ... + +class KafkaUnavailableError(KafkaError): ... +class KafkaTimeoutError(KafkaError): ... + +class KafkaConnectionError(KafkaError): + retriable = ... + invalid_metadata = ... 
+
+class UnsupportedCodecError(KafkaError): ...
+class KafkaConfigurationError(KafkaError): ...
+class QuotaViolationError(KafkaError): ...
+
+class ConsumerStoppedError(Exception):
+    """Raised by the consumer's `get*` methods if the consumer is stopped,
+    including calls that are still pending.
+    """
+
+class IllegalOperation(Exception):
+    """Raised if you try to execute an operation that is not available with
+    the current configuration. For example, trying to commit if no group_id
+    was given.
+    """
+
+class NoOffsetForPartitionError(KafkaError): ...
+class RecordTooLargeError(KafkaError): ...
+class ProducerClosed(KafkaError): ...
+
+class ProducerFenced(KafkaError):
+    """Another producer with the same transactional ID went online.
+    NOTE: it appears the broker will also raise this if the transaction
+    timeout has elapsed.
+    """
+
+    def __init__(self, msg: str = ...) -> None: ...
+
+class BrokerResponseError(KafkaError):
+    errno: int
+    message: str
+    description: str = ...
+    def __str__(self) -> str:
+        """Add errno to standard KafkaError str"""
+
+class NoError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class UnknownError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class OffsetOutOfRangeError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class CorruptRecordException(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+InvalidMessageError = CorruptRecordException
+
+class UnknownTopicOrPartitionError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+    retriable = ...
+    invalid_metadata = ...
+
+class InvalidFetchRequestError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class LeaderNotAvailableError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+    retriable = ...
+    invalid_metadata = ...
+
+class NotLeaderForPartitionError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+    retriable = ...
+    invalid_metadata = ...
+
+class RequestTimedOutError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+    retriable = ...
+
+class BrokerNotAvailableError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class ReplicaNotAvailableError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class MessageSizeTooLargeError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class StaleControllerEpochError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class OffsetMetadataTooLargeError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class StaleLeaderEpochCodeError(BrokerResponseError):
+    errno = ...
+    message = ...
+
+class GroupLoadInProgressError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+    retriable = ...
+
+CoordinatorLoadInProgressError = GroupLoadInProgressError
+
+class GroupCoordinatorNotAvailableError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+    retriable = ...
+
+CoordinatorNotAvailableError = GroupCoordinatorNotAvailableError
+
+class NotCoordinatorForGroupError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+    retriable = ...
+
+NotCoordinatorError = NotCoordinatorForGroupError
+
+class InvalidTopicError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+
+class RecordListTooLargeError(BrokerResponseError):
+    errno = ...
+    message = ...
+    description = ...
+ +class NotEnoughReplicasError(BrokerResponseError): + errno = ... + message = ... + description = ... + retriable = ... + +class NotEnoughReplicasAfterAppendError(BrokerResponseError): + errno = ... + message = ... + description = ... + retriable = ... + +class InvalidRequiredAcksError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class IllegalGenerationError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InconsistentGroupProtocolError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidGroupIdError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class UnknownMemberIdError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidSessionTimeoutError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class RebalanceInProgressError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidCommitOffsetSizeError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class TopicAuthorizationFailedError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class GroupAuthorizationFailedError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class ClusterAuthorizationFailedError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidTimestampError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class UnsupportedSaslMechanismError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class IllegalSaslStateError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class UnsupportedVersionError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class TopicAlreadyExistsError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidPartitionsError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidReplicationFactorError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidReplicationAssignmentError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidConfigurationError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class NotControllerError(BrokerResponseError): + errno = ... + message = ... + description = ... + retriable = ... + +class InvalidRequestError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class UnsupportedForMessageFormatError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class PolicyViolationError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class OutOfOrderSequenceNumber(BrokerResponseError): + errno = ... + message = ... + description = ... + +class DuplicateSequenceNumber(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidProducerEpoch(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidTxnState(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidProducerIdMapping(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidTransactionTimeout(BrokerResponseError): + errno = ... + message = ... + description = ... + +class ConcurrentTransactions(BrokerResponseError): + errno = ... + message = ... 
+ description = ... + +class TransactionCoordinatorFenced(BrokerResponseError): + errno = ... + message = ... + description = ... + +class TransactionalIdAuthorizationFailed(BrokerResponseError): + errno = ... + message = ... + description = ... + +class SecurityDisabled(BrokerResponseError): + errno = ... + message = ... + description = ... + +class OperationNotAttempted(BrokerResponseError): + errno = ... + message = ... + description = ... + +class KafkaStorageError(BrokerResponseError): + errno = ... + message = ... + description = ... + +class LogDirNotFound(BrokerResponseError): + errno = ... + message = ... + description = ... + +class SaslAuthenticationFailed(BrokerResponseError): + errno = ... + message = ... + description = ... + +class UnknownProducerId(BrokerResponseError): + errno = ... + message = ... + description = ... + +class ReassignmentInProgress(BrokerResponseError): + errno = ... + message = ... + description = ... + +class DelegationTokenAuthDisabled(BrokerResponseError): + errno = ... + message = ... + description = ... + +class DelegationTokenNotFound(BrokerResponseError): + errno = ... + message = ... + description = ... + +class DelegationTokenOwnerMismatch(BrokerResponseError): + errno = ... + message = ... + description = ... + +class DelegationTokenRequestNotAllowed(BrokerResponseError): + errno = ... + message = ... + description = ... + +class DelegationTokenAuthorizationFailed(BrokerResponseError): + errno = ... + message = ... + description = ... + +class DelegationTokenExpired(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidPrincipalType(BrokerResponseError): + errno = ... + message = ... + description = ... + +class NonEmptyGroup(BrokerResponseError): + errno = ... + message = ... + description = ... + +class GroupIdNotFound(BrokerResponseError): + errno = ... + message = ... + description = ... + +class FetchSessionIdNotFound(BrokerResponseError): + errno = ... + message = ... + description = ... + +class InvalidFetchSessionEpoch(BrokerResponseError): + errno = ... + message = ... + description = ... + +class ListenerNotFound(BrokerResponseError): + errno = ... + message = ... + description = ... + +class MemberIdRequired(BrokerResponseError): + errno = ... + message = ... + description = ... + +_T = TypeVar("_T", bound=type) +kafka_errors = ... + +def for_code(error_code: int) -> type[BrokerResponseError]: ... diff --git a/aiokafka/producer/__init__.pyi b/aiokafka/producer/__init__.pyi new file mode 100644 index 00000000..e34365d2 --- /dev/null +++ b/aiokafka/producer/__init__.pyi @@ -0,0 +1,3 @@ +from .producer import AIOKafkaProducer + +__all__ = ["AIOKafkaProducer"] diff --git a/aiokafka/producer/message_accumulator.pyi b/aiokafka/producer/message_accumulator.pyi new file mode 100644 index 00000000..75604c74 --- /dev/null +++ b/aiokafka/producer/message_accumulator.pyi @@ -0,0 +1 @@ +class BatchBuilder: ... diff --git a/aiokafka/producer/producer.pyi b/aiokafka/producer/producer.pyi new file mode 100644 index 00000000..e0854f2a --- /dev/null +++ b/aiokafka/producer/producer.pyi @@ -0,0 +1,345 @@ +import asyncio +from collections.abc import Iterable +from ssl import SSLContext +from types import ModuleType, TracebackType +from typing import Callable, Generic, Literal, TypeVar + +from aiokafka.abc import AbstractTokenProvider +from aiokafka.structs import OffsetAndMetadata, RecordMetadata, TopicPartition + +from .message_accumulator import BatchBuilder + +log = ... +_missing = ... +_DEFAULT_PARTITIONER = ... 
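+
+# _identity (below) is the stub's default for `key_serializer` and
+# `value_serializer`: keys and values are passed through unchanged as bytes.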
+
+def _identity(data: bytes) -> bytes: ...
+
+KT_contra = TypeVar("KT_contra", contravariant=True)
+VT_contra = TypeVar("VT_contra", contravariant=True)
+ET = TypeVar("ET", bound=BaseException)
+
+class AIOKafkaProducer(Generic[KT_contra, VT_contra]):
+    """A Kafka client that publishes records to the Kafka cluster.
+
+    The producer consists of a pool of buffer space that holds records that
+    haven't yet been transmitted to the server as well as a background task
+    that is responsible for turning these records into requests and
+    transmitting them to the cluster.
+
+    The :meth:`send` method is asynchronous. When called, it adds the record to
+    a buffer of pending record sends and immediately returns. This allows the
+    producer to batch together individual records for efficiency.
+
+    The `acks` config controls the criteria under which requests are considered
+    complete. The ``all`` setting will result in waiting for all replicas to
+    respond, the slowest but most durable setting.
+
+    The `key_serializer` and `value_serializer` instruct how to turn the key and
+    value objects the user provides into :class:`bytes`.
+
+    Arguments:
+        bootstrap_servers (str, list(str)): a ``host[:port]`` string or list of
+            ``host[:port]`` strings that the producer should contact to
+            bootstrap initial cluster metadata. This does not have to be the
+            full node list. It just needs to have at least one broker that will
+            respond to a Metadata API Request. Default port is 9092. If no
+            servers are specified, will default to ``localhost:9092``.
+        client_id (str or None): a name for this client. This string is passed in
+            each request to servers and can be used to identify specific
+            server-side log entries that correspond to this client.
+            If :data:`None`, ``aiokafka-producer-#`` (appended with a unique
+            number per instance) is used.
+            Default: :data:`None`
+        key_serializer (Callable): used to convert user-supplied keys to
+            :class:`bytes`. If not :data:`None`, called as ``f(key)``; should
+            return :class:`bytes`.
+            Default: :data:`None`.
+        value_serializer (Callable): used to convert user-supplied message
+            values to :class:`bytes`. If not :data:`None`, called as
+            ``f(value)``; should return :class:`bytes`.
+            Default: :data:`None`.
+        acks (Any): one of ``0``, ``1``, ``all``. The number of acknowledgments
+            the producer requires the leader to have received before
+            considering a request complete. This controls the durability of
+            records that are sent. The following settings are common:
+
+            * ``0``: Producer will not wait for any acknowledgment from the server
+              at all. The message will immediately be added to the socket
+              buffer and considered sent. No guarantee can be made that the
+              server has received the record in this case, and the retries
+              configuration will not take effect (as the client won't
+              generally know of any failures). The offset given back for each
+              record will always be set to -1.
+            * ``1``: The broker leader will write the record to its local log but
+              will respond without awaiting full acknowledgement from all
+              followers. In this case, should the leader fail immediately
+              after acknowledging the record but before the followers have
+              replicated it, the record will be lost.
+            * ``all``: The broker leader will wait for the full set of in-sync
+              replicas to acknowledge the record. This guarantees that the
+              record will not be lost as long as at least one in-sync replica
+              remains alive. This is the strongest available guarantee.
+
+            If unset, defaults to ``acks=1``.
+            If `enable_idempotence` is :data:`True`, defaults to ``acks=all``.
+        compression_type (str): The compression type for all data generated by
+            the producer. Valid values are ``gzip``, ``snappy``, ``lz4``, ``zstd``
+            or :data:`None`.
+            Compression is of full batches of data, so the efficacy of batching
+            will also impact the compression ratio (more batching means better
+            compression). Default: :data:`None`.
+        max_batch_size (int): Maximum size of buffered data per partition.
+            After this amount, the :meth:`send` coroutine will block until the
+            batch is drained.
+            Default: 16384
+        linger_ms (int): The producer groups together any records that arrive
+            in between request transmissions into a single batched request.
+            Normally this occurs only under load when records arrive faster
+            than they can be sent out. However, in some circumstances the
+            client may want to reduce the number of requests even under
+            moderate load. This setting accomplishes this by adding a small
+            amount of artificial delay; that is, if the first request is
+            processed faster than ``linger_ms``, the producer will wait
+            ``linger_ms - process_time``.
+            Default: 0 (i.e. no delay).
+        partitioner (Callable): Callable used to determine which partition
+            each message is assigned to. Called (after key serialization):
+            ``partitioner(key_bytes, all_partitions, available_partitions)``.
+            The default partitioner implementation hashes each non-None key
+            using the same murmur2 algorithm as the Java client so that
+            messages with the same key are assigned to the same partition.
+            When a key is :data:`None`, the message is delivered to a random partition
+            (filtered to partitions with available leaders only, if possible).
+        max_request_size (int): The maximum size of a request. This is also
+            effectively a cap on the maximum record size. Note that the server
+            has its own cap on record size which may be different from this.
+            This setting will limit the number of record batches the producer
+            will send in a single request to avoid sending huge requests.
+            Default: 1048576.
+        metadata_max_age_ms (int): The period of time in milliseconds after
+            which we force a refresh of metadata even if we haven't seen any
+            partition leadership changes to proactively discover any new
+            brokers or partitions. Default: 300000
+        request_timeout_ms (int): Produce request timeout in milliseconds.
+            As it's sent as part of
+            :class:`~aiokafka.protocol.produce.ProduceRequest` (it's a blocking
+            call), maximum waiting time can be up to ``2 *
+            request_timeout_ms``.
+            Default: 40000.
+        retry_backoff_ms (int): Milliseconds to backoff when retrying on
+            errors. Default: 100.
+        api_version (str): specify which kafka API version to use.
+            If set to ``auto``, will attempt to infer the broker version by
+            probing various APIs. Default: ``auto``
+        security_protocol (str): Protocol used to communicate with brokers.
+            Valid values are: ``PLAINTEXT``, ``SSL``, ``SASL_PLAINTEXT``,
+            ``SASL_SSL``. Default: ``PLAINTEXT``.
+        ssl_context (ssl.SSLContext): pre-configured :class:`~ssl.SSLContext`
+            for wrapping socket connections. Directly passed into asyncio's
+            :meth:`~asyncio.loop.create_connection`. For more
+            information see :ref:`ssl_auth`.
+            Default: :data:`None`
+        connections_max_idle_ms (int): Close idle connections after the number
+            of milliseconds specified by this config. Specifying :data:`None` will
+            disable idle checks. Default: 540000 (9 minutes).
+        enable_idempotence (bool): When set to :data:`True`, the producer will
+            ensure that exactly one copy of each message is written in the
+            stream. If :data:`False`, producer retries due to broker failures,
+            etc., may write duplicates of the retried message in the stream.
+            Note that enabling idempotence requires ``acks`` to be set to
+            ``all``. If it is not explicitly set by the user, it will be
+            chosen automatically. If incompatible values are set, a
+            :exc:`ValueError` will be thrown.
+            New in version 0.5.0.
+        sasl_mechanism (str): Authentication mechanism when security_protocol
+            is configured for ``SASL_PLAINTEXT`` or ``SASL_SSL``. Valid values
+            are: ``PLAIN``, ``GSSAPI``, ``SCRAM-SHA-256``, ``SCRAM-SHA-512``,
+            ``OAUTHBEARER``.
+            Default: ``PLAIN``
+        sasl_plain_username (str): username for SASL ``PLAIN`` authentication.
+            Default: :data:`None`
+        sasl_plain_password (str): password for SASL ``PLAIN`` authentication.
+            Default: :data:`None`
+        sasl_oauth_token_provider (:class:`~aiokafka.abc.AbstractTokenProvider`):
+            OAuthBearer token provider instance.
+            Default: :data:`None`
+
+    Note:
+        Many configuration parameters are taken from the Java client:
+        https://kafka.apache.org/documentation.html#producerconfigs
+    """
+
+    _PRODUCER_CLIENT_ID_SEQUENCE = ...
+    _COMPRESSORS = ...
+    _closed = ...
+    _source_traceback = ...
+    def __init__(
+        self,
+        *,
+        loop: asyncio.AbstractEventLoop | None = ...,
+        bootstrap_servers: str | list[str] = ...,
+        client_id: str | None = ...,
+        metadata_max_age_ms: int = ...,
+        request_timeout_ms: int = ...,
+        api_version: str = ...,
+        acks: Literal[0] | Literal[1] | Literal["all"] | object = ...,
+        key_serializer: Callable[[KT_contra], bytes] = _identity,
+        value_serializer: Callable[[VT_contra], bytes] = _identity,
+        compression_type: (
+            Literal["gzip"]
+            | Literal["snappy"]
+            | Literal["lz4"]
+            | Literal["zstd"]
+            | None
+        ) = ...,
+        max_batch_size: int = ...,
+        partitioner: Callable[[bytes, list[int], list[int]], int] = ...,
+        max_request_size: int = ...,
+        linger_ms: int = ...,
+        retry_backoff_ms: int = ...,
+        security_protocol: (
+            Literal["PLAINTEXT"]
+            | Literal["SSL"]
+            | Literal["SASL_PLAINTEXT"]
+            | Literal["SASL_SSL"]
+        ) = ...,
+        ssl_context: SSLContext | None = ...,
+        connections_max_idle_ms: int = ...,
+        enable_idempotence: bool = ...,
+        transactional_id: int | str | None = ...,
+        transaction_timeout_ms: int = ...,
+        sasl_mechanism: (
+            Literal["PLAIN"]
+            | Literal["GSSAPI"]
+            | Literal["SCRAM-SHA-256"]
+            | Literal["SCRAM-SHA-512"]
+            | Literal["OAUTHBEARER"]
+        ) = ...,
+        sasl_plain_password: str | None = ...,
+        sasl_plain_username: str | None = ...,
+        sasl_kerberos_service_name: str = ...,
+        sasl_kerberos_domain_name: str | None = ...,
+        sasl_oauth_token_provider: AbstractTokenProvider | None = ...,
+    ) -> None: ...
+    def __del__(self, _warnings: ModuleType = ...) -> None: ...
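+
+    # Typical lifecycle (a sketch; topic and server names are illustrative):
+    #     producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
+    #     await producer.start()
+    #     try:
+    #         await producer.send_and_wait("my_topic", b"some message")
+    #     finally:
+    #         await producer.stop()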
+ async def start(self) -> None: + """Connect to Kafka cluster and check server version""" + + async def flush(self) -> None: + """Wait until all batches are Delivered and futures resolved""" + + async def stop(self) -> None: + """Flush all pending data and close all connections to kafka cluster""" + + async def partitions_for(self, topic: str) -> set[int]: + """Returns set of all known partitions for the topic.""" + + async def send( + self, + topic: str, + value: VT_contra | None = ..., + key: KT_contra | None = ..., + partition: int | None = ..., + timestamp_ms: int | None = ..., + headers: Iterable[tuple[str, bytes]] | None = ..., + ) -> asyncio.Future[RecordMetadata]: + """Publish a message to a topic. + + Arguments: + topic (str): topic where the message will be published + value (Optional): message value. Must be type :class:`bytes`, or be + serializable to :class:`bytes` via configured `value_serializer`. If + value is :data:`None`, key is required and message acts as a + ``delete``. + + See `Kafka compaction documentation + `__ for + more details. (compaction requires kafka >= 0.8.1) + partition (int, Optional): optionally specify a partition. If not + set, the partition will be selected using the configured + `partitioner`. + key (Optional): a key to associate with the message. Can be used to + determine which partition to send the message to. If partition + is :data:`None` (and producer's partitioner config is left as default), + then messages with the same key will be delivered to the same + partition (but if key is :data:`None`, partition is chosen randomly). + Must be type :class:`bytes`, or be serializable to bytes via configured + `key_serializer`. + timestamp_ms (int, Optional): epoch milliseconds (from Jan 1 1970 + UTC) to use as the message timestamp. Defaults to current time. + headers (Optional): Kafka headers to be included in the message using + the format ``[("key", b"value")]``. Iterable of tuples where key + is a normal string and value is a byte string. + + Returns: + asyncio.Future: object that will be set when message is + processed + + Raises: + ~aiokafka.errors.KafkaTimeoutError: if we can't schedule this record + (pending buffer is full) in up to `request_timeout_ms` + milliseconds. + + Note: + The returned future will wait based on `request_timeout_ms` + setting. Cancelling the returned future **will not** stop event + from being sent, but cancelling the :meth:`send` coroutine itself + **will**. + """ + + async def send_and_wait( + self, + topic: str, + value: VT_contra | None = ..., + key: KT_contra | None = ..., + partition: int | None = ..., + timestamp_ms: int | None = ..., + headers: Iterable[tuple[str, bytes]] | None = ..., + ) -> RecordMetadata: + """Publish a message to a topic and wait the result""" + + def create_batch(self) -> BatchBuilder: + """Create and return an empty :class:`.BatchBuilder`. + + The batch is not queued for send until submission to :meth:`send_batch`. + + Returns: + BatchBuilder: empty batch to be filled and submitted by the caller. + """ + + async def send_batch( + self, batch: BatchBuilder, topic: str, *, partition: int + ) -> asyncio.Future[RecordMetadata]: + """Submit a BatchBuilder for publication. + + Arguments: + batch (BatchBuilder): batch object to be published. + topic (str): topic where the batch will be published. + partition (int): partition where this batch will be published. + + Returns: + asyncio.Future: object that will be set when the batch is + delivered. 
+ """ + + async def begin_transaction(self) -> None: ... + async def commit_transaction(self) -> None: ... + async def abort_transaction(self) -> None: ... + def transaction(self) -> TransactionContext: + """Start a transaction context""" + + async def send_offsets_to_transaction( + self, + offsets: dict[TopicPartition, int | tuple[int, str] | OffsetAndMetadata], + group_id: str, + ) -> None: ... + async def __aenter__(self) -> AIOKafkaProducer[KT_contra, VT_contra]: ... + async def __aexit__( + self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None + ) -> None: ... + +class TransactionContext: + def __init__(self, producer: AIOKafkaProducer[KT_contra, VT_contra]) -> None: ... + async def __aenter__(self) -> TransactionContext: ... + async def __aexit__( + self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None + ) -> None: ... diff --git a/aiokafka/protocol/abstract.pyi b/aiokafka/protocol/abstract.pyi new file mode 100644 index 00000000..ab31d219 --- /dev/null +++ b/aiokafka/protocol/abstract.pyi @@ -0,0 +1,15 @@ +import abc +from io import BytesIO +from typing import Generic, TypeVar + +T = TypeVar("T") + +class AbstractType(Generic[T], metaclass=abc.ABCMeta): + @classmethod + @abc.abstractmethod + def encode(cls, value: T) -> bytes: ... + @classmethod + @abc.abstractmethod + def decode(cls, data: BytesIO) -> T: ... + @classmethod + def repr(cls, value: T) -> str: ... diff --git a/aiokafka/protocol/api.pyi b/aiokafka/protocol/api.pyi new file mode 100644 index 00000000..ebf9c608 --- /dev/null +++ b/aiokafka/protocol/api.pyi @@ -0,0 +1,86 @@ +import abc +from io import BytesIO +from typing import Any, ClassVar + +from .struct import Struct +from .types import Schema + +class RequestHeader_v0(Struct): + SCHEMA = ... + def __init__( + self, request: Request, correlation_id: int = ..., client_id: str = ... + ) -> None: ... + +class RequestHeader_v1(Struct): + SCHEMA = ... + def __init__( + self, + request: Request, + correlation_id: int = ..., + client_id: str = ..., + tags: dict[int, bytes] | None = ..., + ) -> None: ... + +class ResponseHeader_v0(Struct): + SCHEMA = ... + +class ResponseHeader_v1(Struct): + SCHEMA = ... + +class Request(Struct, metaclass=abc.ABCMeta): + FLEXIBLE_VERSION: ClassVar[bool] = ... + @property + @abc.abstractmethod + def API_KEY(self) -> int: + """Integer identifier for api request""" + ... + + @property + @abc.abstractmethod + def API_VERSION(self) -> int: + """Integer of api request version""" + ... + + @property + @abc.abstractmethod + def RESPONSE_TYPE(self) -> type[Response]: + """The Response class associated with the api request""" + ... + + @property + @abc.abstractmethod + def SCHEMA(self) -> Schema: + """An instance of Schema() representing the request structure""" + ... + + def expect_response(self) -> bool: + """Override this method if an api request does not always generate a response""" + + def to_object(self) -> dict[str, Any]: ... + def build_request_header( + self, correlation_id: int, client_id: str + ) -> RequestHeader_v0 | RequestHeader_v1: ... + def parse_response_header( + self, read_buffer: BytesIO | bytes + ) -> ResponseHeader_v0 | ResponseHeader_v1: ... + +class Response(Struct, metaclass=abc.ABCMeta): + @property + @abc.abstractmethod + def API_KEY(self) -> int: + """Integer identifier for api request/response""" + ... + + @property + @abc.abstractmethod + def API_VERSION(self) -> int: + """Integer of api request/response version""" + ... 
+
+    @property
+    @abc.abstractmethod
+    def SCHEMA(self) -> Schema:
+        """An instance of Schema() representing the response structure"""
+        ...
+
+    def to_object(self) -> dict[str, Any]: ...
diff --git a/aiokafka/protocol/commit.pyi b/aiokafka/protocol/commit.pyi
new file mode 100644
index 00000000..6135e8ec
--- /dev/null
+++ b/aiokafka/protocol/commit.pyi
@@ -0,0 +1,71 @@
+from .api import Request, Response
+
+class OffsetCommitResponse_v0(Response):
+    pass
+
+class OffsetCommitResponse_v1(Response):
+    pass
+
+class OffsetCommitResponse_v2(Response):
+    pass
+
+class OffsetCommitResponse_v3(Response):
+    pass
+
+class OffsetCommitRequest_v0(Request):
+    pass
+
+class OffsetCommitRequest_v1(Request):
+    pass
+
+class OffsetCommitRequest_v2(Request):
+    DEFAULT_GENERATION_ID = ...
+    DEFAULT_RETENTION_TIME = ...
+
+class OffsetCommitRequest_v3(Request):
+    pass
+
+OffsetCommitRequest = ...
+OffsetCommitResponse = ...
+
+class OffsetFetchResponse_v0(Response):
+    pass
+
+class OffsetFetchResponse_v1(Response):
+    pass
+
+class OffsetFetchResponse_v2(Response):
+    pass
+
+class OffsetFetchResponse_v3(Response):
+    pass
+
+class OffsetFetchRequest_v0(Request):
+    pass
+
+class OffsetFetchRequest_v1(Request):
+    pass
+
+class OffsetFetchRequest_v2(Request):
+    pass
+
+class OffsetFetchRequest_v3(Request):
+    pass
+
+OffsetFetchRequest = ...
+OffsetFetchResponse = ...
+
+class GroupCoordinatorResponse_v0(Response):
+    pass
+
+class GroupCoordinatorResponse_v1(Response):
+    pass
+
+class GroupCoordinatorRequest_v0(Request):
+    pass
+
+class GroupCoordinatorRequest_v1(Request):
+    pass
+
+GroupCoordinatorRequest = ...
+GroupCoordinatorResponse = ...
diff --git a/aiokafka/protocol/metadata.pyi b/aiokafka/protocol/metadata.pyi
new file mode 100644
index 00000000..0e3bb0fe
--- /dev/null
+++ b/aiokafka/protocol/metadata.pyi
@@ -0,0 +1,43 @@
+from .api import Request, Response
+
+class MetadataResponse_v0(Response):
+    pass
+
+class MetadataResponse_v1(Response):
+    pass
+
+class MetadataResponse_v2(Response):
+    pass
+
+class MetadataResponse_v3(Response):
+    pass
+
+class MetadataResponse_v4(Response):
+    pass
+
+class MetadataResponse_v5(Response):
+    pass
+
+class MetadataRequest_v0(Request):
+    pass
+
+class MetadataRequest_v1(Request):
+    pass
+
+class MetadataRequest_v2(Request):
+    pass
+
+class MetadataRequest_v3(Request):
+    pass
+
+class MetadataRequest_v4(Request):
+    pass
+
+class MetadataRequest_v5(Request):
+    """
+    The v5 metadata request is the same as v4. An additional field for
+    offline_replicas has been added to the v5 metadata response.
+    """
+
+MetadataRequest = ...
+MetadataResponse = ...
diff --git a/aiokafka/protocol/struct.pyi b/aiokafka/protocol/struct.pyi
new file mode 100644
index 00000000..3049d9a9
--- /dev/null
+++ b/aiokafka/protocol/struct.pyi
@@ -0,0 +1,14 @@
+from io import BytesIO
+from typing import Any, ClassVar
+
+from typing_extensions import Self
+
+class Struct:
+    SCHEMA: ClassVar = ...
+    def __init__(self, *args: Any, **kwargs: Any) -> None: ...
+    def encode(self) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO | bytes) -> Self: ...
+    def get_item(self, name: str) -> Any: ...
+    def __repr__(self) -> str: ...
+    def __eq__(self, other: object) -> bool: ...
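+
+# Usage sketch (names are illustrative): concrete subclasses define SCHEMA,
+# serialize with encode() and parse bytes symmetrically with decode():
+#     raw = SomeRequest(...).encode()
+#     parsed = SomeResponse.decode(raw)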
diff --git a/aiokafka/protocol/types.pyi b/aiokafka/protocol/types.pyi
new file mode 100644
index 00000000..f737e328
--- /dev/null
+++ b/aiokafka/protocol/types.pyi
@@ -0,0 +1,147 @@
+from collections.abc import Sequence
+from io import BytesIO
+from typing import Any, TypeVar, overload
+
+from typing_extensions import TypeAlias
+
+from .abstract import AbstractType
+
+T = TypeVar("T")
+ValueT: TypeAlias = type[AbstractType[Any]] | "String" | "Array" | "Schema"
+
+class Int8(AbstractType[int]):
+    _pack = ...
+    _unpack = ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+
+class Int16(AbstractType[int]):
+    _pack = ...
+    _unpack = ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+
+class Int32(AbstractType[int]):
+    _pack = ...
+    _unpack = ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+
+class UInt32(AbstractType[int]):
+    _pack = ...
+    _unpack = ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+
+class Int64(AbstractType[int]):
+    _pack = ...
+    _unpack = ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+
+class Float64(AbstractType[float]):
+    _pack = ...
+    _unpack = ...
+    @classmethod
+    def encode(cls, value: float) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> float: ...
+
+class String:
+    def __init__(self, encoding: str = ...) -> None: ...
+    def encode(self, value: str | None) -> bytes: ...
+    def decode(self, data: BytesIO) -> str | None: ...
+    @classmethod
+    def repr(cls, value: str) -> str: ...
+
+class Bytes(AbstractType[bytes | None]):
+    @classmethod
+    def encode(cls, value: bytes | None) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> bytes | None: ...
+    @classmethod
+    def repr(cls, value: bytes | None) -> str: ...
+
+class Boolean(AbstractType[bool]):
+    _pack = ...
+    _unpack = ...
+    @classmethod
+    def encode(cls, value: bool) -> bytes: ...
+    @classmethod
+    def decode(cls, data: BytesIO) -> bool: ...
+
+class Schema:
+    names: tuple[str, ...]
+    fields: tuple[ValueT, ...]
+    def __init__(self, *fields: tuple[str, ValueT]) -> None: ...
+    def encode(self, item: Sequence[Any]) -> bytes: ...
+    def decode(
+        self, data: BytesIO
+    ) -> tuple[Any | str | None | list[Any | tuple[Any, ...]], ...]: ...
+    def __len__(self) -> int: ...
+    def repr(self, value: Any) -> str: ...
+
+class Array:
+    array_of: ValueT
+    @overload
+    def __init__(self, array_of_0: ValueT) -> None: ...
+    @overload
+    def __init__(
+        self, array_of_0: tuple[str, ValueT], *array_of: tuple[str, ValueT]
+    ) -> None: ...
+    def encode(self, items: Sequence[Any] | None) -> bytes: ...
+    def decode(self, data: BytesIO) -> list[Any | tuple[Any, ...]] | None: ...
+    def repr(self, list_of_items: Sequence[Any] | None) -> str: ...
+
+class UnsignedVarInt32(AbstractType[int]):
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
+
+class VarInt32(AbstractType[int]):
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
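+
+# Illustrative round-trip values (assuming Kafka's zigzag varint encoding for
+# the signed variants; editor-added examples, not part of the API surface):
+#     UnsignedVarInt32.encode(1)  == b"\x01"
+#     VarInt32.encode(1)          == b"\x02"   # zigzag: 1 -> 2
+#     VarInt32.encode(-1)         == b"\x01"   # zigzag: -1 -> 1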
+
+class VarInt64(AbstractType[int]):
+    @classmethod
+    def decode(cls, data: BytesIO) -> int: ...
+    @classmethod
+    def encode(cls, value: int) -> bytes: ...
+
+class CompactString(String):
+    def decode(self, data: BytesIO) -> str | None: ...
+    def encode(self, value: str | None) -> bytes: ...
+
+class TaggedFields(AbstractType[dict[int, bytes]]):
+    @classmethod
+    def decode(cls, data: BytesIO) -> dict[int, bytes]: ...
+    @classmethod
+    def encode(cls, value: dict[int, bytes]) -> bytes: ...
+
+class CompactBytes(AbstractType[bytes | None]):
+    @classmethod
+    def decode(cls, data: BytesIO) -> bytes | None: ...
+    @classmethod
+    def encode(cls, value: bytes | None) -> bytes: ...
+
+class CompactArray(Array):
+    def encode(self, items: Sequence[Any] | None) -> bytes: ...
+    def decode(self, data: BytesIO) -> list[Any | tuple[Any, ...]] | None: ...
diff --git a/aiokafka/py.typed b/aiokafka/py.typed
new file mode 100644
index 00000000..e69de29b
diff --git a/aiokafka/structs.pyi b/aiokafka/structs.pyi
new file mode 100644
index 00000000..dc2922eb
--- /dev/null
+++ b/aiokafka/structs.pyi
@@ -0,0 +1,82 @@
+from collections.abc import Sequence
+from dataclasses import dataclass
+from typing import Generic, NamedTuple, TypeVar
+
+from aiokafka.errors import KafkaError
+
+__all__ = [
+    "OffsetAndMetadata",
+    "TopicPartition",
+    "RecordMetadata",
+    "ConsumerRecord",
+    "BrokerMetadata",
+    "PartitionMetadata",
+]
+
+class TopicPartition(NamedTuple):
+    """A topic and partition tuple"""
+
+    topic: str
+    partition: int
+
+class BrokerMetadata(NamedTuple):
+    """Kafka broker metadata, as used by admin tools"""
+
+    nodeId: int | str
+    host: str
+    port: int
+    rack: str | None
+
+class PartitionMetadata(NamedTuple):
+    """Topic partition metadata describing the state in the MetadataResponse"""
+
+    topic: str
+    partition: int
+    leader: int
+    replicas: list[int]
+    isr: list[int]
+    error: KafkaError | None
+
+class OffsetAndMetadata(NamedTuple):
+    """An offset and metadata pair for the Kafka offset commit API
+
+    The Kafka offset commit API allows users to provide additional metadata
+    (in the form of a string) when an offset is committed. This can be useful
+    (for example) to store information about which node made the commit,
+    what time the commit was made, etc.
+    """

+    offset: int
+    metadata: str
+
+class RecordMetadata(NamedTuple):
+    """Returned when a :class:`~.AIOKafkaProducer` sends a message"""
+
+    topic: str
+    partition: int
+    topic_partition: TopicPartition
+    offset: int
+    timestamp: int | None
+    timestamp_type: int
+    log_start_offset: int | None
+
+KT_co = TypeVar("KT_co", covariant=True)
+VT_co = TypeVar("VT_co", covariant=True)
+
+@dataclass(frozen=True)
+class ConsumerRecord(Generic[KT_co, VT_co]):
+    topic: str
+    partition: int
+    offset: int
+    timestamp: int
+    timestamp_type: int
+    key: KT_co | None
+    value: VT_co | None
+    checksum: int | None
+    serialized_key_size: int
+    serialized_value_size: int
+    headers: Sequence[tuple[str, bytes]]
+
+class OffsetAndTimestamp(NamedTuple):
+    offset: int
+    timestamp: int | None
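+
+# Note: the ConsumerRecord type parameters reflect the deserialized key/value
+# types; a consumer created without deserializers yields
+# ConsumerRecord[bytes, bytes].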