Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions google/cloud/pubsublite/admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from typing import Optional, List, Union

from overrides import overrides
if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import overrides as override

from google.api_core.client_options import ClientOptions
from google.api_core.operation import Operation
from google.auth.credentials import Credentials
Expand Down Expand Up @@ -78,39 +83,39 @@ def __init__(
region,
)

@overrides
@override
def region(self) -> CloudRegion:
return self._impl.region()

@overrides
@override
def create_topic(self, topic: Topic) -> Topic:
return self._impl.create_topic(topic)

@overrides
@override
def get_topic(self, topic_path: TopicPath) -> Topic:
return self._impl.get_topic(topic_path)

@overrides
@override
def get_topic_partition_count(self, topic_path: TopicPath) -> int:
return self._impl.get_topic_partition_count(topic_path)

@overrides
@override
def list_topics(self, location_path: LocationPath) -> List[Topic]:
return self._impl.list_topics(location_path)

@overrides
@override
def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic:
return self._impl.update_topic(topic, update_mask)

@overrides
@override
def delete_topic(self, topic_path: TopicPath):
return self._impl.delete_topic(topic_path)

@overrides
@override
def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]:
return self._impl.list_topic_subscriptions(topic_path)

@overrides
@override
def create_subscription(
self,
subscription: Subscription,
Expand All @@ -119,55 +124,55 @@ def create_subscription(
) -> Subscription:
return self._impl.create_subscription(subscription, target, starting_offset)

@overrides
@override
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
return self._impl.get_subscription(subscription_path)

@overrides
@override
def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]:
return self._impl.list_subscriptions(location_path)

@overrides
@override
def update_subscription(
self, subscription: Subscription, update_mask: FieldMask
) -> Subscription:
return self._impl.update_subscription(subscription, update_mask)

@overrides
@override
def seek_subscription(
self,
subscription_path: SubscriptionPath,
target: Union[BacklogLocation, PublishTime, EventTime],
) -> Operation:
return self._impl.seek_subscription(subscription_path, target)

@overrides
@override
def delete_subscription(self, subscription_path: SubscriptionPath):
return self._impl.delete_subscription(subscription_path)

@overrides
@override
def create_reservation(self, reservation: Reservation) -> Reservation:
return self._impl.create_reservation(reservation)

@overrides
@override
def get_reservation(self, reservation_path: ReservationPath) -> Reservation:
return self._impl.get_reservation(reservation_path)

@overrides
@override
def list_reservations(self, location_path: LocationPath) -> List[Reservation]:
return self._impl.list_reservations(location_path)

@overrides
@override
def update_reservation(
self, reservation: Reservation, update_mask: FieldMask
) -> Reservation:
return self._impl.update_reservation(reservation, update_mask)

@overrides
@override
def delete_reservation(self, reservation_path: ReservationPath):
return self._impl.delete_reservation(reservation_path)

@overrides
@override
def list_reservation_topics(
self, reservation_path: ReservationPath
) -> List[TopicPath]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from typing import Callable, Union, Mapping

from google.api_core.exceptions import GoogleAPICallError
Expand All @@ -26,7 +27,11 @@
AsyncPublisherClientInterface,
)
from google.cloud.pubsublite.types import TopicPath
from overrides import overrides

if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import overrides as override


AsyncPublisherFactory = Callable[[TopicPath], AsyncSinglePublisher]
Expand All @@ -47,7 +52,7 @@ async def _create_and_open(self, topic: TopicPath):
await client.__aenter__()
return client

@overrides
@override
async def publish(
self,
topic: Union[TopicPath, str],
Expand All @@ -67,11 +72,11 @@ async def publish(
await self._multiplexer.try_erase(topic, publisher)
raise e

@overrides
@override
async def __aenter__(self):
await self._multiplexer.__aenter__()
return self

@overrides
@override
async def __aexit__(self, exc_type, exc_value, traceback):
await self._multiplexer.__aexit__(exc_type, exc_value, traceback)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from typing import (
Union,
AsyncIterator,
Expand All @@ -35,7 +36,11 @@
FlowControlSettings,
Partition,
)
from overrides import overrides

if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import overrides as override


async def _iterate_subscriber(
Expand All @@ -59,7 +64,7 @@ def __init__(self, underlying_factory: AsyncSubscriberFactory):
self._underlying_factory = underlying_factory
self._live_clients = set()

@overrides
@override
async def subscribe(
self,
subscription: Union[SubscriptionPath, str],
Expand All @@ -79,7 +84,7 @@ async def subscribe(
subscriber, lambda: self._try_remove_client(subscriber)
)

@overrides
@override
async def __aenter__(self):
return self

Expand All @@ -88,7 +93,7 @@ async def _try_remove_client(self, client: AsyncSingleSubscriber):
self._live_clients.remove(client)
await client.__aexit__(None, None, None)

@overrides
@override
async def __aexit__(self, exc_type, exc_value, traceback):
live_clients = self._live_clients
self._live_clients = set()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from concurrent.futures import Future
import sys
from typing import Callable, Union, Mapping

from google.api_core.exceptions import GoogleAPICallError
Expand All @@ -27,7 +28,11 @@
PublisherClientInterface,
)
from google.cloud.pubsublite.types import TopicPath
from overrides import overrides

if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import overrides as override

PublisherFactory = Callable[[TopicPath], SinglePublisher]

Expand All @@ -42,7 +47,7 @@ def __init__(self, publisher_factory: PublisherFactory):
lambda topic: self._create_and_start_publisher(topic)
)

@overrides
@override
def publish(
self,
topic: Union[TopicPath, str],
Expand Down Expand Up @@ -80,11 +85,11 @@ def _on_future_completion(
except GoogleAPICallError:
self._multiplexer.try_erase(topic, publisher)

@overrides
@override
def __enter__(self):
self._multiplexer.__enter__()
return self

@overrides
@override
def __exit__(self, exc_type, exc_value, traceback):
self._multiplexer.__exit__(exc_type, exc_value, traceback)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from concurrent.futures.thread import ThreadPoolExecutor
import sys
from typing import Union, Optional, Set
from threading import Lock

Expand All @@ -31,7 +32,11 @@
FlowControlSettings,
Partition,
)
from overrides import overrides

if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import overrides as override


class MultiplexedSubscriberClient(SubscriberClientInterface):
Expand All @@ -49,7 +54,7 @@ def __init__(
self._lock = Lock()
self._live_clients = set()

@overrides
@override
def subscribe(
self,
subscription: Union[SubscriptionPath, str],
Expand Down Expand Up @@ -84,12 +89,12 @@ def _try_remove_client(self, future: StreamingPullFuture):
self._live_clients.remove(future)
self._cancel_streaming_pull_future(future)

@overrides
@override
def __enter__(self):
self._executor.__enter__()
return self

@overrides
@override
def __exit__(self, exc_type, exc_value, traceback):
with self._lock:
live_clients = self._live_clients
Expand Down
9 changes: 7 additions & 2 deletions google/cloud/pubsublite/cloudpubsub/message_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
# limitations under the License.

from abc import ABC, abstractmethod
import sys
from typing import Callable

from google.pubsub_v1 import PubsubMessage
from overrides import overrides

if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import overrides as override

from google.cloud.pubsublite_v1 import SequencedMessage

Expand All @@ -41,7 +46,7 @@ def transform(self, source: SequencedMessage) -> PubsubMessage:
@staticmethod
def of_callable(transformer: Callable[[SequencedMessage], PubsubMessage]):
class CallableTransformer(MessageTransformer):
@overrides
@override
def transform(self, source: SequencedMessage) -> PubsubMessage:
return transformer(source)

Expand Down
Loading
Loading