Skip to content

Commit

Permalink
removed conversion decorators
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Jul 11, 2024
1 parent 48bb06f commit 245bd08
Show file tree
Hide file tree
Showing 11 changed files with 0 additions and 211 deletions.
11 changes: 0 additions & 11 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ class _EntryWithProto: # noqa: F811
proto: types_pb.MutateRowsRequest.Entry


@CrossSync.export_sync(
path="google.cloud.bigtable.data._sync._mutate_rows._MutateRowsOperation",
)
class _MutateRowsOperationAsync:
"""
MutateRowsOperation manages the logic of sending a set of row mutations,
Expand All @@ -84,12 +81,6 @@ class _MutateRowsOperationAsync:
If not specified, the request will run until operation_timeout is reached.
"""

@CrossSync.convert(
replace_symbols={
"BigtableAsyncClient": "BigtableClient",
"TableAsync": "Table",
}
)
def __init__(
self,
gapic_client: "BigtableAsyncClient",
Expand Down Expand Up @@ -141,7 +132,6 @@ def __init__(
self.remaining_indices = list(range(len(self.mutations)))
self.errors: dict[int, list[Exception]] = {}

@CrossSync.convert
async def start(self):
"""
Start the operation, and run until completion
Expand Down Expand Up @@ -174,7 +164,6 @@ async def start(self):
if all_errors:
raise MutationsExceptionGroup(all_errors, len(self.mutations))

@CrossSync.convert
async def _run_attempt(self):
"""
Run a single attempt of the mutate_rows rpc.
Expand Down
8 changes: 0 additions & 8 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ def __init__(self, chunk):
self.chunk = chunk


@CrossSync.export_sync(
path="google.cloud.bigtable.data._sync._read_rows._ReadRowsOperation",
)
class _ReadRowsOperationAsync:
"""
ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
Expand Down Expand Up @@ -85,7 +82,6 @@ class _ReadRowsOperationAsync:
"_remaining_count",
)

@CrossSync.convert(replace_symbols={"TableAsync": "Table"})
def __init__(
self,
query: ReadRowsQuery,
Expand Down Expand Up @@ -166,7 +162,6 @@ def _read_rows_attempt(self) -> CrossSync.Iterable[Row]:
chunked_stream = self.chunk_stream(gapic_stream)
return self.merge_rows(chunked_stream)

@CrossSync.convert
async def chunk_stream(
self, stream: CrossSync.Awaitable[CrossSync.Iterable[ReadRowsResponsePB]]
) -> CrossSync.Iterable[ReadRowsResponsePB.CellChunk]:
Expand Down Expand Up @@ -219,9 +214,6 @@ async def chunk_stream(
current_key = None

@staticmethod
@CrossSync.convert(
replace_symbols={"__aiter__": "__iter__", "__anext__": "__next__"}
)
async def merge_rows(
chunks: CrossSync.Iterable[ReadRowsResponsePB.CellChunk] | None,
) -> CrossSync.Iterable[Row]:
Expand Down
43 changes: 0 additions & 43 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,7 @@
from google.cloud.bigtable.data._helpers import ShardedQuery


@CrossSync.export_sync(
path="google.cloud.bigtable.data._sync.client.BigtableDataClient",
)
class BigtableDataClientAsync(ClientWithProject):
@CrossSync.convert(
replace_symbols={
"BigtableAsyncClient": "BigtableClient",
"PooledBigtableGrpcAsyncIOTransport": "PooledBigtableGrpcTransport",
"AsyncPooledChannel": "PooledChannel",
}
)
def __init__(
self,
*,
Expand Down Expand Up @@ -266,7 +256,6 @@ def _start_background_channel_refresh(self) -> None:
lambda _: self._channel_refresh_tasks.remove(refresh_task) if refresh_task in self._channel_refresh_tasks else None
)

@CrossSync.convert
async def close(self, timeout: float | None = None):
"""
Cancel all background tasks
Expand All @@ -279,7 +268,6 @@ async def close(self, timeout: float | None = None):
self._executor.shutdown(wait=False)
await CrossSync.wait(self._channel_refresh_tasks, timeout=timeout)

@CrossSync.convert
async def _ping_and_warm_instances(
self, channel: Channel, instance_key: _helpers._WarmedInstanceKey | None = None
) -> list[BaseException | None]:
Expand Down Expand Up @@ -321,7 +309,6 @@ async def _ping_and_warm_instances(
)
return [r or None for r in result_list]

@CrossSync.convert
async def _manage_channel(
self,
channel_idx: int,
Expand Down Expand Up @@ -378,7 +365,6 @@ async def _manage_channel(
next_refresh = random.uniform(refresh_interval_min, refresh_interval_max)
next_sleep = next_refresh - (time.monotonic() - start_timestamp)

@CrossSync.convert(replace_symbols={"TableAsync": "Table"})
async def _register_instance(self, instance_id: str, owner: TableAsync) -> None:
"""
Registers an instance with the client, and warms the channel pool
Expand Down Expand Up @@ -409,7 +395,6 @@ async def _register_instance(self, instance_id: str, owner: TableAsync) -> None:
# refresh tasks aren't active. start them as background tasks
self._start_background_channel_refresh()

@CrossSync.convert(replace_symbols={"TableAsync": "Table"})
async def _remove_instance_registration(
self, instance_id: str, owner: TableAsync
) -> bool:
Expand Down Expand Up @@ -440,7 +425,6 @@ async def _remove_instance_registration(
except KeyError:
return False

@CrossSync.convert(replace_symbols={"TableAsync": "Table"})
def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAsync:
"""
Returns a table instance for making data API requests. All arguments are passed
Expand Down Expand Up @@ -482,18 +466,15 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAs
"""
return TableAsync(self, instance_id, table_id, *args, **kwargs)

@CrossSync.convert(sync_name="__enter__")
async def __aenter__(self):
self._start_background_channel_refresh()
return self

@CrossSync.convert(sync_name="__exit__", replace_symbols={"__aexit__": "__exit__"})
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
await self._gapic_client.__aexit__(exc_type, exc_val, exc_tb)


@CrossSync.export_sync(path="google.cloud.bigtable.data._sync.client.Table")
class TableAsync:
"""
Main Data API surface
Expand All @@ -502,9 +483,6 @@ class TableAsync:
each call
"""

@CrossSync.convert(
replace_symbols={"BigtableDataClientAsync": "BigtableDataClient"}
)
def __init__(
self,
client: BigtableDataClientAsync,
Expand Down Expand Up @@ -625,12 +603,6 @@ def __init__(
f"{self.__class__.__name__} must be created within an async event loop context."
) from e

@CrossSync.convert(
replace_symbols={
"AsyncIterable": "Iterable",
"_ReadRowsOperationAsync": "_ReadRowsOperation",
}
)
async def read_rows_stream(
self,
query: ReadRowsQuery,
Expand Down Expand Up @@ -681,7 +653,6 @@ async def read_rows_stream(
)
return row_merger.start_operation()

@CrossSync.convert
async def read_rows(
self,
query: ReadRowsQuery,
Expand Down Expand Up @@ -729,7 +700,6 @@ async def read_rows(
)
return [row async for row in row_generator]

@CrossSync.convert
async def read_row(
self,
row_key: str | bytes,
Expand Down Expand Up @@ -779,7 +749,6 @@ async def read_row(
return None
return results[0]

@CrossSync.convert
async def read_rows_sharded(
self,
sharded_query: ShardedQuery,
Expand Down Expand Up @@ -879,7 +848,6 @@ async def read_rows_with_semaphore(query):
)
return results_list

@CrossSync.convert
async def row_exists(
self,
row_key: str | bytes,
Expand Down Expand Up @@ -928,7 +896,6 @@ async def row_exists(
)
return len(results) > 0

@CrossSync.convert
async def sample_row_keys(
self,
*,
Expand Down Expand Up @@ -1001,7 +968,6 @@ async def execute_rpc():
exception_factory=_helpers._retry_exception_factory,
)

@CrossSync.convert(replace_symbols={"MutationsBatcherAsync": "MutationsBatcher"})
def mutations_batcher(
self,
*,
Expand Down Expand Up @@ -1051,7 +1017,6 @@ def mutations_batcher(
batch_retryable_errors=batch_retryable_errors,
)

@CrossSync.convert
async def mutate_row(
self,
row_key: str | bytes,
Expand Down Expand Up @@ -1130,9 +1095,6 @@ async def mutate_row(
exception_factory=_helpers._retry_exception_factory,
)

@CrossSync.convert(
replace_symbols={"_MutateRowsOperationAsync": "_MutateRowsOperation"}
)
async def bulk_mutate_rows(
self,
mutation_entries: list[RowMutationEntry],
Expand Down Expand Up @@ -1188,7 +1150,6 @@ async def bulk_mutate_rows(
)
await operation.start()

@CrossSync.convert
async def check_and_mutate_row(
self,
row_key: str | bytes,
Expand Down Expand Up @@ -1255,7 +1216,6 @@ async def check_and_mutate_row(
)
return result.predicate_matched

@CrossSync.convert
async def read_modify_write_row(
self,
row_key: str | bytes,
Expand Down Expand Up @@ -1306,7 +1266,6 @@ async def read_modify_write_row(
# construct Row from result
return Row._from_pb(result.row)

@CrossSync.convert
async def close(self):
"""
Called to close the Table instance and release any resources held by it.
Expand All @@ -1315,7 +1274,6 @@ async def close(self):
self._register_instance_future.cancel()
await self.client._remove_instance_registration(self.instance_id, self)

@CrossSync.convert(sync_name="__enter__")
async def __aenter__(self):
"""
Implement async context manager protocol
Expand All @@ -1327,7 +1285,6 @@ async def __aenter__(self):
await self._register_instance_future
return self

@CrossSync.convert(sync_name="__exit__")
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
Implement async context manager protocol
Expand Down
22 changes: 0 additions & 22 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@
from google.cloud.bigtable.data._sync.client import Table # noqa: F401


@CrossSync.export_sync(
path="google.cloud.bigtable.data._sync.mutations_batcher._FlowControl"
)
class _FlowControlAsync:
"""
Manages flow control for batched mutations. Mutations are registered against
Expand Down Expand Up @@ -110,7 +107,6 @@ def _has_capacity(self, additional_count: int, additional_size: int) -> bool:
new_count = self._in_flight_mutation_count + additional_count
return new_size <= acceptable_size and new_count <= acceptable_count

@CrossSync.convert
async def remove_from_flow(
self, mutations: RowMutationEntry | list[RowMutationEntry]
) -> None:
Expand All @@ -132,7 +128,6 @@ async def remove_from_flow(
async with self._capacity_condition:
self._capacity_condition.notify_all()

@CrossSync.convert
async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry]):
"""
Generator function that registers mutations with flow control. As mutations
Expand Down Expand Up @@ -182,10 +177,6 @@ async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry]
yield mutations[start_idx:end_idx]


@CrossSync.export_sync(
path="google.cloud.bigtable.data._sync.mutations_batcher.MutationsBatcher",
mypy_ignore=["unreachable"],
)
class MutationsBatcherAsync:
"""
Allows users to send batches using context manager API:
Expand Down Expand Up @@ -217,9 +208,6 @@ class MutationsBatcherAsync:
Defaults to the Table's default_mutate_rows_retryable_errors.
"""

@CrossSync.convert(
replace_symbols={"TableAsync": "Table", "_FlowControlAsync": "_FlowControl"}
)
def __init__(
self,
table: TableAsync,
Expand Down Expand Up @@ -275,7 +263,6 @@ def __init__(
# clean up on program exit
atexit.register(self._on_exit)

@CrossSync.convert
async def _timer_routine(self, interval: float | None) -> None:
"""
Set up a background task to flush the batcher every interval seconds
Expand All @@ -296,7 +283,6 @@ async def _timer_routine(self, interval: float | None) -> None:
if not self._closed.is_set() and self._staged_entries:
self._schedule_flush()

@CrossSync.convert
async def append(self, mutation_entry: RowMutationEntry):
"""
Add a new set of mutations to the internal queue
Expand Down Expand Up @@ -346,7 +332,6 @@ def _schedule_flush(self) -> CrossSync.Future[None] | None:
return new_task
return None

@CrossSync.convert
async def _flush_internal(self, new_entries: list[RowMutationEntry]):
"""
Flushes a set of mutations to the server, and updates internal state
Expand All @@ -367,9 +352,6 @@ async def _flush_internal(self, new_entries: list[RowMutationEntry]):
self._entries_processed_since_last_raise += len(new_entries)
self._add_exceptions(found_exceptions)

@CrossSync.convert(
replace_symbols={"_MutateRowsOperationAsync": "_MutateRowsOperation"}
)
async def _execute_mutate_rows(
self, batch: list[RowMutationEntry]
) -> list[FailedMutationEntryError]:
Expand Down Expand Up @@ -450,12 +432,10 @@ def _raise_exceptions(self):
entry_count=entry_count,
)

@CrossSync.convert(sync_name="__enter__")
async def __aenter__(self):
"""Allow use of context manager API"""
return self

@CrossSync.convert(sync_name="__exit__")
async def __aexit__(self, exc_type, exc, tb):
"""
Allow use of context manager API.
Expand All @@ -472,7 +452,6 @@ def closed(self) -> bool:
"""
return self._closed.is_set()

@CrossSync.convert
async def close(self):
"""
Flush queue and clean up resources
Expand Down Expand Up @@ -500,7 +479,6 @@ def _on_exit(self):
)

@staticmethod
@CrossSync.convert
async def _wait_for_batch_results(
*tasks: CrossSync.Future[list[FailedMutationEntryError]]
| CrossSync.Future[None],
Expand Down
Loading

0 comments on commit 245bd08

Please sign in to comment.