diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index dc1d3b9c..55ae763d 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -12,9 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys from typing import Optional, List, Union -from overrides import overrides +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override + from google.api_core.client_options import ClientOptions from google.api_core.operation import Operation from google.auth.credentials import Credentials @@ -78,39 +83,39 @@ def __init__( region, ) - @overrides + @override def region(self) -> CloudRegion: return self._impl.region() - @overrides + @override def create_topic(self, topic: Topic) -> Topic: return self._impl.create_topic(topic) - @overrides + @override def get_topic(self, topic_path: TopicPath) -> Topic: return self._impl.get_topic(topic_path) - @overrides + @override def get_topic_partition_count(self, topic_path: TopicPath) -> int: return self._impl.get_topic_partition_count(topic_path) - @overrides + @override def list_topics(self, location_path: LocationPath) -> List[Topic]: return self._impl.list_topics(location_path) - @overrides + @override def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: return self._impl.update_topic(topic, update_mask) - @overrides + @override def delete_topic(self, topic_path: TopicPath): return self._impl.delete_topic(topic_path) - @overrides + @override def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]: return self._impl.list_topic_subscriptions(topic_path) - @overrides + @override def create_subscription( self, subscription: Subscription, @@ -119,21 +124,21 @@ def create_subscription( ) -> Subscription: return self._impl.create_subscription(subscription, target, starting_offset) - @overrides + @override def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: return self._impl.get_subscription(subscription_path) - @overrides + @override def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]: return self._impl.list_subscriptions(location_path) - @overrides + @override def update_subscription( self, subscription: Subscription, update_mask: FieldMask ) -> Subscription: return self._impl.update_subscription(subscription, update_mask) - @overrides + @override def seek_subscription( self, subscription_path: SubscriptionPath, @@ -141,33 +146,33 @@ def seek_subscription( ) -> Operation: return self._impl.seek_subscription(subscription_path, target) - @overrides + @override def delete_subscription(self, subscription_path: SubscriptionPath): return self._impl.delete_subscription(subscription_path) - @overrides + @override def create_reservation(self, reservation: Reservation) -> Reservation: return self._impl.create_reservation(reservation) - @overrides + @override def get_reservation(self, reservation_path: ReservationPath) -> Reservation: return self._impl.get_reservation(reservation_path) - @overrides + @override def list_reservations(self, location_path: LocationPath) -> List[Reservation]: return self._impl.list_reservations(location_path) - @overrides + @override def update_reservation( self, reservation: Reservation, update_mask: FieldMask ) -> Reservation: return self._impl.update_reservation(reservation, update_mask) - @overrides + @override def delete_reservation(self, reservation_path: ReservationPath): return self._impl.delete_reservation(reservation_path) - @overrides + @override def list_reservation_topics( self, reservation_path: ReservationPath ) -> List[TopicPath]: diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py index 9cacd275..b1167a56 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys from typing import Callable, Union, Mapping from google.api_core.exceptions import GoogleAPICallError @@ -26,7 +27,11 @@ AsyncPublisherClientInterface, ) from google.cloud.pubsublite.types import TopicPath -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override AsyncPublisherFactory = Callable[[TopicPath], AsyncSinglePublisher] @@ -47,7 +52,7 @@ async def _create_and_open(self, topic: TopicPath): await client.__aenter__() return client - @overrides + @override async def publish( self, topic: Union[TopicPath, str], @@ -67,11 +72,11 @@ async def publish( await self._multiplexer.try_erase(topic, publisher) raise e - @overrides + @override async def __aenter__(self): await self._multiplexer.__aenter__() return self - @overrides + @override async def __aexit__(self, exc_type, exc_value, traceback): await self._multiplexer.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py index 6becdfc4..c3a80097 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys from typing import ( Union, AsyncIterator, @@ -35,7 +36,11 @@ FlowControlSettings, Partition, ) -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override async def _iterate_subscriber( @@ -59,7 +64,7 @@ def __init__(self, underlying_factory: AsyncSubscriberFactory): self._underlying_factory = underlying_factory self._live_clients = set() - @overrides + @override async def subscribe( self, subscription: Union[SubscriptionPath, str], @@ -79,7 +84,7 @@ async def subscribe( subscriber, lambda: self._try_remove_client(subscriber) ) - @overrides + @override async def __aenter__(self): return self @@ -88,7 +93,7 @@ async def _try_remove_client(self, client: AsyncSingleSubscriber): self._live_clients.remove(client) await client.__aexit__(None, None, None) - @overrides + @override async def __aexit__(self, exc_type, exc_value, traceback): live_clients = self._live_clients self._live_clients = set() diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index 4adb7a75..452e6e95 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent.futures import Future +import sys from typing import Callable, Union, Mapping from google.api_core.exceptions import GoogleAPICallError @@ -27,7 +28,11 @@ PublisherClientInterface, ) from google.cloud.pubsublite.types import TopicPath -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override PublisherFactory = Callable[[TopicPath], SinglePublisher] @@ -42,7 +47,7 @@ def __init__(self, publisher_factory: PublisherFactory): lambda topic: self._create_and_start_publisher(topic) ) - @overrides + @override def publish( self, topic: Union[TopicPath, str], @@ -80,11 +85,11 @@ def _on_future_completion( except GoogleAPICallError: self._multiplexer.try_erase(topic, publisher) - @overrides + @override def __enter__(self): self._multiplexer.__enter__() return self - @overrides + @override def __exit__(self, exc_type, exc_value, traceback): self._multiplexer.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py index 6978fd8b..4b4c7c96 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent.futures.thread import ThreadPoolExecutor +import sys from typing import Union, Optional, Set from threading import Lock @@ -31,7 +32,11 @@ FlowControlSettings, Partition, ) -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override class MultiplexedSubscriberClient(SubscriberClientInterface): @@ -49,7 +54,7 @@ def __init__( self._lock = Lock() self._live_clients = set() - @overrides + @override def subscribe( self, subscription: Union[SubscriptionPath, str], @@ -84,12 +89,12 @@ def _try_remove_client(self, future: StreamingPullFuture): self._live_clients.remove(future) self._cancel_streaming_pull_future(future) - @overrides + @override def __enter__(self): self._executor.__enter__() return self - @overrides + @override def __exit__(self, exc_type, exc_value, traceback): with self._lock: live_clients = self._live_clients diff --git a/google/cloud/pubsublite/cloudpubsub/message_transformer.py b/google/cloud/pubsublite/cloudpubsub/message_transformer.py index 50a44b1b..8e56ea1d 100644 --- a/google/cloud/pubsublite/cloudpubsub/message_transformer.py +++ b/google/cloud/pubsublite/cloudpubsub/message_transformer.py @@ -13,10 +13,15 @@ # limitations under the License. from abc import ABC, abstractmethod +import sys from typing import Callable from google.pubsub_v1 import PubsubMessage -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override from google.cloud.pubsublite_v1 import SequencedMessage @@ -41,7 +46,7 @@ def transform(self, source: SequencedMessage) -> PubsubMessage: @staticmethod def of_callable(transformer: Callable[[SequencedMessage], PubsubMessage]): class CallableTransformer(MessageTransformer): - @overrides + @override def transform(self, source: SequencedMessage) -> PubsubMessage: return transformer(source) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client.py b/google/cloud/pubsublite/cloudpubsub/publisher_client.py index 999ce103..341b3718 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent.futures import Future +import sys from typing import Optional, Mapping, Union from uuid import uuid4 @@ -43,7 +44,11 @@ DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING, ) from google.cloud.pubsublite.types import TopicPath -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override def _get_client_id(enable_idempotence: bool): @@ -98,7 +103,7 @@ def __init__( ) self._require_started = RequireStarted() - @overrides + @override def publish( self, topic: Union[TopicPath, str], @@ -111,13 +116,13 @@ def publish( topic=topic, data=data, ordering_key=ordering_key, **attrs ) - @overrides + @override def __enter__(self): self._require_started.__enter__() self._impl.__enter__() return self - @overrides + @override def __exit__(self, exc_type, exc_value, traceback): self._impl.__exit__(exc_type, exc_value, traceback) self._require_started.__exit__(exc_type, exc_value, traceback) @@ -173,7 +178,7 @@ def __init__( ) self._require_started = RequireStarted() - @overrides + @override async def publish( self, topic: Union[TopicPath, str], @@ -186,13 +191,13 @@ async def publish( topic=topic, data=data, ordering_key=ordering_key, **attrs ) - @overrides + @override async def __aenter__(self): self._require_started.__enter__() await self._impl.__aenter__() return self - @overrides + @override async def __aexit__(self, exc_type, exc_value, traceback): await self._impl.__aexit__(exc_type, exc_value, traceback) self._require_started.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py index ab2b98cd..b1a3f675 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent.futures.thread import ThreadPoolExecutor +import sys from typing import Optional, Union, Set, AsyncIterator from google.api_core.client_options import ClientOptions @@ -46,7 +47,11 @@ Partition, SubscriptionPath, ) -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override class SubscriberClient(SubscriberClientInterface, ConstructableFromServiceAccount): @@ -100,7 +105,7 @@ def __init__( ) self._require_started = RequireStarted() - @overrides + @override def subscribe( self, subscription: Union[SubscriptionPath, str], @@ -116,13 +121,13 @@ def subscribe( fixed_partitions, ) - @overrides + @override def __enter__(self): self._require_started.__enter__() self._impl.__enter__() return self - @overrides + @override def __exit__(self, exc_type, exc_value, traceback): self._impl.__exit__(exc_type, exc_value, traceback) self._require_started.__exit__(exc_type, exc_value, traceback) @@ -177,7 +182,7 @@ def __init__( ) self._require_started = RequireStarted() - @overrides + @override async def subscribe( self, subscription: Union[SubscriptionPath, str], @@ -189,13 +194,13 @@ async def subscribe( subscription, per_partition_flow_control_settings, fixed_partitions ) - @overrides + @override async def __aenter__(self): self._require_started.__enter__() await self._impl.__aenter__() return self - @overrides + @override async def __aexit__(self, exc_type, exc_value, traceback): await self._impl.__aexit__(exc_type, exc_value, traceback) self._require_started.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/internal/wire/assigner_impl.py b/google/cloud/pubsublite/internal/wire/assigner_impl.py index eb0637c0..4becfd43 100644 --- a/google/cloud/pubsublite/internal/wire/assigner_impl.py +++ b/google/cloud/pubsublite/internal/wire/assigner_impl.py @@ -13,11 +13,15 @@ # limitations under the License. import asyncio +import sys from typing import Optional, Set import logging -from overrides import overrides +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors from google.cloud.pubsublite.internal.wire.assigner import Assigner @@ -106,14 +110,14 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._stop_receiver() await self._connection.__aexit__(exc_type, exc_val, exc_tb) - @overrides + @override async def stop_processing(self, error: GoogleAPICallError): await self._stop_receiver() self._outstanding_assignment = False while not self._new_assignment.empty(): self._new_assignment.get_nowait() - @overrides + @override async def reinitialize( self, connection: Connection[PartitionAssignmentRequest, PartitionAssignment], diff --git a/google/cloud/pubsublite/internal/wire/committer_impl.py b/google/cloud/pubsublite/internal/wire/committer_impl.py index b96bf331..193a48ac 100644 --- a/google/cloud/pubsublite/internal/wire/committer_impl.py +++ b/google/cloud/pubsublite/internal/wire/committer_impl.py @@ -13,11 +13,15 @@ # limitations under the License. import asyncio +import sys from typing import Optional, List import logging -from overrides import overrides +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors from google.cloud.pubsublite.internal.wire.committer import Committer @@ -154,11 +158,11 @@ def commit(self, cursor: Cursor) -> None: raise self._connection.error() self._next_to_commit = cursor - @overrides + @override async def stop_processing(self, error: GoogleAPICallError): await self._stop_loopers() - @overrides + @override async def reinitialize( self, connection: Connection[ diff --git a/google/cloud/pubsublite/internal/wire/serial_batcher.py b/google/cloud/pubsublite/internal/wire/serial_batcher.py index abc19773..9eb78ab9 100644 --- a/google/cloud/pubsublite/internal/wire/serial_batcher.py +++ b/google/cloud/pubsublite/internal/wire/serial_batcher.py @@ -15,7 +15,12 @@ from abc import abstractmethod, ABCMeta from typing import Generic, List, NamedTuple import asyncio -from overrides import overrides +import sys + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override from google.cloud.pubsublite.internal.wire.connection import Request, Response from google.cloud.pubsublite.internal.wire.work_item import WorkItem @@ -46,7 +51,7 @@ def get_size(self, request: Request) -> BatchSize: class IgnoredRequestSizer(RequestSizer[Request]): - @overrides + @override def get_size(self, request) -> BatchSize: return BatchSize(0, 0) diff --git a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py index 4d0c258d..634f816f 100644 --- a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py +++ b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py @@ -13,9 +13,14 @@ # limitations under the License. import asyncio +import sys from typing import Optional, List, NamedTuple -from overrides import overrides +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override + import logging from google.cloud.pubsub_v1.types import BatchSettings @@ -198,11 +203,11 @@ async def publish(self, message: PubSubMessage) -> MessageMetadata: await self._flush() return MessageMetadata(self._partition, await future) - @overrides + @override async def stop_processing(self, error: GoogleAPICallError): await self._stop_loopers() - @overrides + @override async def reinitialize( self, connection: Connection[PublishRequest, PublishResponse], @@ -223,7 +228,7 @@ async def reinitialize( await connection.write(aggregate) self._start_loopers() - @overrides + @override def get_size(self, request: _MessageWithSequence) -> BatchSize: return BatchSize( element_count=1, byte_count=PubSubMessage.pb(request.message).ByteSize() diff --git a/google/cloud/pubsublite/internal/wire/subscriber_impl.py b/google/cloud/pubsublite/internal/wire/subscriber_impl.py index 7a63451e..ea902e12 100644 --- a/google/cloud/pubsublite/internal/wire/subscriber_impl.py +++ b/google/cloud/pubsublite/internal/wire/subscriber_impl.py @@ -14,10 +14,15 @@ import asyncio from copy import deepcopy +import sys from typing import Optional, List from google.api_core.exceptions import GoogleAPICallError, FailedPrecondition -from overrides import overrides + +if sys.version_info >= (3, 12): + from typing import override +else: + from overrides import overrides as override from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_errors from google.cloud.pubsublite.internal.wire.connection import ( @@ -156,7 +161,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._stop_loopers() await self._connection.__aexit__(exc_type, exc_val, exc_tb) - @overrides + @override async def stop_processing(self, error: GoogleAPICallError): await self._stop_loopers() if is_reset_signal(error): @@ -174,7 +179,7 @@ async def stop_processing(self, error: GoogleAPICallError): await self._reset_handler.handle_reset() self._last_received_offset = None - @overrides + @override async def reinitialize( self, connection: Connection[SubscribeRequest, SubscribeResponse] ): diff --git a/setup.py b/setup.py index 107e55c2..706d82d7 100644 --- a/setup.py +++ b/setup.py @@ -39,8 +39,7 @@ "google-cloud-pubsub >= 2.10.0, <3.0.0", "grpcio >= 1.38.1, <2.0.0", "grpcio-status >= 1.38.1, <2.0.0", - "overrides>=6.0.1, <8.0.0", - "overrides>=7.0.1, <8.0.0; python_version>='3.12'", + "overrides>=6.0.1, <8.0.0; python_version<'3.12'", "google-api-core[grpc] >= 1.33.2, <3.0.0,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*", "proto-plus >= 1.22.3, < 2.0.0", "proto-plus >= 1.25.0, < 2.0.0; python_version >= '3.13'",