Add stub typing for clients #1075

Open · wants to merge 15 commits into base: master
21 changes: 21 additions & 0 deletions aiokafka/__init__.pyi
@@ -0,0 +1,21 @@
from .abc import ConsumerRebalanceListener
from .consumer import AIOKafkaConsumer
from .errors import ConsumerStoppedError, IllegalOperation
from .structs import (
ConsumerRecord,
OffsetAndMetadata,
OffsetAndTimestamp,
TopicPartition,
)

__version__ = ...
__all__ = [
"AIOKafkaConsumer",
"ConsumerRebalanceListener",
"ConsumerStoppedError",
"IllegalOperation",
"ConsumerRecord",
"TopicPartition",
"OffsetAndTimestamp",
"OffsetAndMetadata",
]
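
As a quick illustration of what these typed exports cover (a sketch, not part of the diff; the topic name and bootstrap address are placeholders):

import asyncio

from aiokafka import AIOKafkaConsumer, TopicPartition

async def main() -> None:
    consumer = AIOKafkaConsumer("my-topic", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        async for msg in consumer:
            # msg is a ConsumerRecord; its topic and partition identify a TopicPartition.
            tp = TopicPartition(msg.topic, msg.partition)
            print(tp, msg.offset, msg.value)
    finally:
        await consumer.stop()

asyncio.run(main())

With the stubs in place, a type checker such as mypy should be able to verify the msg.topic / msg.partition / msg.value accesses above.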
139 changes: 139 additions & 0 deletions aiokafka/abc.pyi
@@ -0,0 +1,139 @@
import abc

from aiokafka.structs import TopicPartition

class ConsumerRebalanceListener(abc.ABC):
"""
A callback interface that the user can implement to trigger custom actions
when the set of partitions assigned to the consumer changes.

This is applicable when the consumer is letting Kafka auto-manage group
membership. If the consumer directly assigns partitions, those
partitions will never be reassigned and this callback is not applicable.

When Kafka is managing the group membership, a partition re-assignment will
be triggered any time the members of the group change or the subscription
of the members changes. This can occur when processes die, new process
instances are added or old instances come back to life after failure.
Rebalances can also be triggered by changes affecting the subscribed
topics (e.g. when the number of partitions is administratively adjusted).

There are many uses for this functionality. One common use is saving
offsets in a custom store. By saving offsets in the
:meth:`on_partitions_revoked` call, we can ensure that any time partition
assignment changes the offsets get saved.

Another use is flushing out any kind of cache of intermediate results the
consumer may be keeping. For example, consider a case where the consumer is
subscribed to a topic containing user page views, and the goal is to count
the number of page views per user for each five-minute window. Let's say
the topic is partitioned by the user id so that all events for a particular
user go to a single consumer instance. The consumer can keep in memory
a running tally of actions per user and only flush these out to a remote
data store when its cache gets too big. However, if a partition is
reassigned it may want to automatically trigger a flush of this cache
before the new owner takes over consumption.

This callback will execute during the rebalance process, and the consumer
will wait for callbacks to finish before proceeding with the group join.

It is guaranteed that all consumer processes will invoke
:meth:`on_partitions_revoked` prior to any process invoking
:meth:`on_partitions_assigned`. So if offsets or other state is saved in the
:meth:`on_partitions_revoked` call, it should be saved by the time the process
taking over that partition has its :meth:`on_partitions_assigned` callback
called to load the state.
"""

@abc.abstractmethod
def on_partitions_revoked(self, revoked: list[TopicPartition]) -> None:
"""
A coroutine or function the user can implement to provide cleanup or
custom state save on the start of a rebalance operation.

This method will be called *before* a rebalance operation starts and
*after* the consumer stops fetching data.

If you are using manual commit you have to commit all consumed offsets
here, to avoid duplicate message delivery after the rebalance is finished.

.. note:: This method is only called before rebalances. It is not
    called prior to :meth:`.AIOKafkaConsumer.stop`.

Arguments:
    revoked (list(TopicPartition)): the partitions that were assigned
        to the consumer on the last rebalance
"""
...

@abc.abstractmethod
def on_partitions_assigned(self, assigned: list[TopicPartition]) -> None:
"""
A coroutine or function the user can implement to provide load of
custom consumer state or cache warmup on completion of a successful
partition re-assignment.

This method will be called *after* partition re-assignment completes
and *before* the consumer starts fetching data again.

It is guaranteed that all the processes in a consumer group will
execute their :meth:`on_partitions_revoked` callback before any instance
executes its :meth:`on_partitions_assigned` callback.

Arguments:
    assigned (list(TopicPartition)): the partitions assigned to the
        consumer (may include partitions that were previously assigned)
"""
...
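
For reviewers, a minimal sketch of the offset-saving pattern the class docstring describes (not part of the diff); the store object and its save/load methods are hypothetical stand-ins for a custom offset store:

from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener, TopicPartition

class SaveOffsetsOnRebalance(ConsumerRebalanceListener):
    def __init__(self, consumer: AIOKafkaConsumer, store) -> None:
        self._consumer = consumer
        self._store = store  # hypothetical external offset store

    async def on_partitions_revoked(self, revoked: list[TopicPartition]) -> None:
        # Persist the current position of every partition we are about to lose.
        for tp in revoked:
            self._store.save(tp, await self._consumer.position(tp))

    async def on_partitions_assigned(self, assigned: list[TopicPartition]) -> None:
        # Resume newly assigned partitions from the externally saved offsets.
        for tp in assigned:
            offset = self._store.load(tp)
            if offset is not None:
                self._consumer.seek(tp, offset)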

class AbstractTokenProvider(abc.ABC):
"""
A Token Provider must be used for the `SASL OAuthBearer`_ protocol.

The implementation should ensure token reuse so that multiple
calls at connect time do not create multiple tokens.
The implementation should also periodically refresh the token in order to
guarantee that each call returns an unexpired token.

A timeout error should be returned after a short period of inactivity so
that the broker can log debugging info and retry.

Token Providers MUST implement the :meth:`token` method.

.. _SASL OAuthBearer:
    https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_oauth.html
"""

@abc.abstractmethod
async def token(self) -> str:
"""
An async callback returning a :class:`str` ID/Access Token to be sent to
the Kafka client. In cases where a synchronous callback is needed, an
implementation like the following can be used:

.. code-block:: python

    import asyncio

    from aiokafka.abc import AbstractTokenProvider

    class CustomTokenProvider(AbstractTokenProvider):
        async def token(self):
            return await asyncio.get_running_loop().run_in_executor(
                None, self._token)

        def _token(self):
            # The actual synchronous token callback.
            ...
"""
...

def extensions(self) -> dict[str, str]:
"""
This is an OPTIONAL method that may be implemented.

Returns a map of key-value pairs that can be sent with the
SASL/OAUTHBEARER initial client request. If not implemented, the values
are ignored.

This feature is only available in Kafka >= 2.1.0.
"""

__all__ = ["ConsumerRebalanceListener", "AbstractTokenProvider"]
7 changes: 7 additions & 0 deletions aiokafka/client.pyi
@@ -0,0 +1,7 @@
from enum import IntEnum

log = ...

class CoordinationType(IntEnum):
GROUP = ...
TRANSACTION = ...
198 changes: 198 additions & 0 deletions aiokafka/cluster.pyi
@@ -0,0 +1,198 @@
from collections.abc import Sequence
from concurrent.futures import Future
from typing import Any, Callable, TypedDict

from aiokafka.client import CoordinationType
from aiokafka.protocol.commit import (
GroupCoordinatorResponse_v0,
GroupCoordinatorResponse_v1,
)
from aiokafka.protocol.metadata import (
MetadataResponse_v0,
MetadataResponse_v1,
MetadataResponse_v2,
MetadataResponse_v3,
MetadataResponse_v4,
MetadataResponse_v5,
)
from aiokafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition

log = ...
MetadataResponse = (
MetadataResponse_v0
| MetadataResponse_v1
| MetadataResponse_v2
| MetadataResponse_v3
| MetadataResponse_v4
| MetadataResponse_v5
)
GroupCoordinatorResponse = GroupCoordinatorResponse_v0 | GroupCoordinatorResponse_v1

class ClusterConfig(TypedDict):
retry_backoff_ms: int
metadata_max_age_ms: int
bootstrap_servers: str | list[str]
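
For reference (not part of the diff), a literal that would type-check against this TypedDict; the values mirror the defaults documented in ClusterMetadata below:

config: ClusterConfig = {
    "retry_backoff_ms": 100,
    "metadata_max_age_ms": 300000,
    "bootstrap_servers": "localhost:9092",
}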

class ClusterMetadata:
"""
A class to manage Kafka cluster metadata.

This class does not perform any IO. It simply updates internal state
given API responses (MetadataResponse, GroupCoordinatorResponse).

Keyword Arguments:
    retry_backoff_ms (int): Milliseconds to backoff when retrying on
        errors. Default: 100.
    metadata_max_age_ms (int): The period of time in milliseconds after
        which we force a refresh of metadata even if we haven't seen any
        partition leadership changes to proactively discover any new
        brokers or partitions. Default: 300000.
    bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
        strings) that the client should contact to bootstrap initial
        cluster metadata. This does not have to be the full node list.
        It just needs to have at least one broker that will respond to a
        Metadata API Request. Default port is 9092. If no servers are
        specified, will default to localhost:9092.
"""

DEFAULT_CONFIG: ClusterConfig = ...
def __init__(self, **configs: int | str | list[str]) -> None: ...
def is_bootstrap(self, node_id: str) -> bool: ...
def brokers(self) -> set[BrokerMetadata]:
"""Get all BrokerMetadata
Returns:
set: {BrokerMetadata, ...}
"""

def broker_metadata(self, broker_id: str) -> BrokerMetadata | None:
"""Get BrokerMetadata
Arguments:
broker_id (str): node_id for a broker to check
Returns:
BrokerMetadata or None if not found
"""

def partitions_for_topic(self, topic: str) -> set[int] | None:
"""Return set of all partitions for topic (whether available or not)
Arguments:
topic (str): topic to check for partitions
Returns:
set: {partition (int), ...}
"""

def available_partitions_for_topic(self, topic: str) -> set[int] | None:
"""Return set of partitions with known leaders
Arguments:
topic (str): topic to check for partitions
Returns:
set: {partition (int), ...}
None if topic not found.
"""

def leader_for_partition(self, partition: PartitionMetadata) -> int | None:
"""Return node_id of leader, -1 unavailable, None if unknown."""

def partitions_for_broker(self, broker_id: int | str) -> set[TopicPartition] | None:
"""Return TopicPartitions for which the broker is a leader.
Arguments:
broker_id (int): node id for a broker
Returns:
set: {TopicPartition, ...}
None if the broker either has no partitions or does not exist.
"""

def coordinator_for_group(self, group: str) -> int | str | None:
"""Return node_id of group coordinator.
Arguments:
group (str): name of consumer group
Returns:
int: node_id for group coordinator
None if the group does not exist.
"""

def request_update(self) -> Future[ClusterMetadata]:
"""Flags metadata for update, return Future()
Actual update must be handled separately. This method will only
change the reported ttl()
Returns:
Future (value will be the cluster object after update)
"""

def topics(self, exclude_internal_topics: bool = ...) -> set[str]:
"""Get set of known topics.
Arguments:
exclude_internal_topics (bool): Whether records from internal topics
    (such as offsets) should be exposed to the consumer. If set to True
    the only way to receive records from an internal topic is
    subscribing to it. Default: True.
Returns:
set: {topic (str), ...}
"""

def failed_update(self, exception: BaseException) -> None:
"""Update cluster state given a failed MetadataRequest."""

def update_metadata(self, metadata: MetadataResponse) -> None:
"""Update cluster state given a MetadataResponse.
Arguments:
metadata (MetadataResponse): broker response to a metadata request
Returns: None
"""

def add_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Add a callback function to be called on each metadata update"""

def remove_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Remove a previously added listener callback"""

def add_group_coordinator(
self, group: str, response: GroupCoordinatorResponse
) -> str | None:
"""Update with metadata for a group coordinator
Arguments:
group (str): name of group from GroupCoordinatorRequest
response (GroupCoordinatorResponse): broker response
Returns:
string: coordinator node_id if metadata is updated, None on error
"""

def with_partitions(
self, partitions_to_add: Sequence[PartitionMetadata]
) -> ClusterMetadata:
"""Returns a copy of cluster metadata with partitions added"""

def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: ...
def add_coordinator(
self,
node_id: int | str,
host: str,
port: int,
rack: str | None = ...,
*,
purpose: tuple[CoordinationType, str],
) -> None:
"""Keep track of all coordinator nodes separately and remove them if
a new one was elected for the same purpose (For example group
coordinator for group X).
"""

def __str__(self) -> str: ...
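
A brief usage sketch against the surface typed above (a sketch only; the constructor kwargs and defaults are taken from the docstring, and no network I/O happens here because the client drives the actual metadata requests):

from aiokafka.cluster import ClusterMetadata

cluster = ClusterMetadata(
    bootstrap_servers="localhost:9092",
    retry_backoff_ms=100,
    metadata_max_age_ms=300000,
)
print(cluster.brokers())           # only bootstrap brokers until a MetadataResponse is applied
future = cluster.request_update()  # flags a refresh; the client performs the actual request
print(cluster.topics(exclude_internal_topics=True))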
3 changes: 3 additions & 0 deletions aiokafka/consumer/__init__.pyi
@@ -0,0 +1,3 @@
from .consumer import AIOKafkaConsumer

__all__ = ["AIOKafkaConsumer"]