From 3e2788c773408b995c7cb5b2834015819bc941ae Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 21 Oct 2016 23:20:07 -0700 Subject: [PATCH 1/2] Remapping (almost) all RPC status codes to our exceptions in datastore. Fixes #2497. --- .../google/cloud/datastore/connection.py | 94 +++++++++--- datastore/unit_tests/test_connection.py | 141 +++++++++--------- 2 files changed, 137 insertions(+), 98 deletions(-) diff --git a/datastore/google/cloud/datastore/connection.py b/datastore/google/cloud/datastore/connection.py index 74070b142355..2d038eff28c2 100644 --- a/datastore/google/cloud/datastore/connection.py +++ b/datastore/google/cloud/datastore/connection.py @@ -23,19 +23,35 @@ from google.cloud import connection as connection_module from google.cloud.environment_vars import DISABLE_GRPC from google.cloud.environment_vars import GCD_HOST -from google.cloud.exceptions import BadRequest -from google.cloud.exceptions import Conflict -from google.cloud.exceptions import GrpcRendezvous -from google.cloud.exceptions import make_exception +from google.cloud import exceptions from google.cloud.datastore._generated import datastore_pb2 as _datastore_pb2 try: from grpc import StatusCode from google.cloud.datastore._generated import datastore_grpc_pb2 except ImportError: # pragma: NO COVER + _GRPC_ERROR_MAPPING = {} _HAVE_GRPC = False datastore_grpc_pb2 = None StatusCode = None else: + # NOTE: We don't include OK -> 200 or CANCELLED -> 499 + _GRPC_ERROR_MAPPING = { + StatusCode.UNKNOWN: exceptions.InternalServerError, + StatusCode.INVALID_ARGUMENT: exceptions.BadRequest, + StatusCode.DEADLINE_EXCEEDED: exceptions.GatewayTimeout, + StatusCode.NOT_FOUND: exceptions.NotFound, + StatusCode.ALREADY_EXISTS: exceptions.Conflict, + StatusCode.PERMISSION_DENIED: exceptions.Forbidden, + StatusCode.UNAUTHENTICATED: exceptions.Unauthorized, + StatusCode.RESOURCE_EXHAUSTED: exceptions.TooManyRequests, + StatusCode.FAILED_PRECONDITION: exceptions.PreconditionFailed, + StatusCode.ABORTED: exceptions.Conflict, + StatusCode.OUT_OF_RANGE: exceptions.BadRequest, + StatusCode.UNIMPLEMENTED: exceptions.MethodNotImplemented, + StatusCode.INTERNAL: exceptions.InternalServerError, + StatusCode.UNAVAILABLE: exceptions.ServiceUnavailable, + StatusCode.DATA_LOSS: exceptions.InternalServerError, + } _HAVE_GRPC = True @@ -93,7 +109,8 @@ def _request(self, project, method, data): status = headers['status'] if status != '200': error_status = status_pb2.Status.FromString(content) - raise make_exception(headers, error_status.message, use_json=False) + raise exceptions.make_exception( + headers, error_status.message, use_json=False) return content @@ -220,6 +237,44 @@ def allocate_ids(self, project, request_pb): _datastore_pb2.AllocateIdsResponse) +def _grpc_catch_rendezvous(to_call, *args, **kwargs): + """Call a method/function and re-map gRPC exceptions. + + .. _code.proto: https://github.com/googleapis/googleapis/blob/\ + master/google/rpc/code.proto + + Remaps gRPC exceptions to the classes defined in + :mod:`~google.cloud.exceptions` (according to the description + in `code.proto`_). + + :type to_call: callable + :param to_call: Callable that makes a request which may raise a + :class:`~google.cloud.exceptions.GrpcRendezvous`. + + :type args: tuple + :param args: Positional arugments to the callable. + + :type kwargs: dict + :param kwargs: Keyword arguments to the callable. + + :rtype: object + :returns: The value returned from ``to_call``. + :raises: :class:`~google.cloud.exceptions.GrpcRendezvous` if one + is encountered that can't be re-mapped, otherwise maps + to a :class:`~google.cloud.exceptions.GoogleCloudError` + subclass. + """ + try: + return to_call(*args, **kwargs) + except exceptions.GrpcRendezvous as exc: + error_code = exc.code() + error_class = _GRPC_ERROR_MAPPING.get(error_code) + if error_class is None: + raise + else: + raise error_class(exc.details()) + + class _DatastoreAPIOverGRPC(object): """Helper mapping datastore API methods. @@ -276,13 +331,8 @@ def run_query(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - try: - return self._stub.RunQuery(request_pb) - except GrpcRendezvous as exc: - error_code = exc.code() - if error_code == StatusCode.INVALID_ARGUMENT: - raise BadRequest(exc.details()) - raise + return _grpc_catch_rendezvous( + self._stub.RunQuery, request_pb) def begin_transaction(self, project, request_pb): """Perform a ``beginTransaction`` request. @@ -299,7 +349,8 @@ def begin_transaction(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return self._stub.BeginTransaction(request_pb) + return _grpc_catch_rendezvous( + self._stub.BeginTransaction, request_pb) def commit(self, project, request_pb): """Perform a ``commit`` request. @@ -315,15 +366,8 @@ def commit(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - try: - return self._stub.Commit(request_pb) - except GrpcRendezvous as exc: - error_code = exc.code() - if error_code == StatusCode.ABORTED: - raise Conflict(exc.details()) - if error_code == StatusCode.INVALID_ARGUMENT: - raise BadRequest(exc.details()) - raise + return _grpc_catch_rendezvous( + self._stub.Commit, request_pb) def rollback(self, project, request_pb): """Perform a ``rollback`` request. @@ -339,7 +383,8 @@ def rollback(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return self._stub.Rollback(request_pb) + return _grpc_catch_rendezvous( + self._stub.Rollback, request_pb) def allocate_ids(self, project, request_pb): """Perform an ``allocateIds`` request. @@ -355,7 +400,8 @@ def allocate_ids(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return self._stub.AllocateIds(request_pb) + return _grpc_catch_rendezvous( + self._stub.AllocateIds, request_pb) class Connection(connection_module.Connection): diff --git a/datastore/unit_tests/test_connection.py b/datastore/unit_tests/test_connection.py index cbafc72ac3f0..c7577cedb568 100644 --- a/datastore/unit_tests/test_connection.py +++ b/datastore/unit_tests/test_connection.py @@ -106,6 +106,67 @@ def test__request_not_200(self): [{'method': METHOD, 'project': PROJECT}]) +@unittest.skipUnless(_HAVE_GRPC, 'No gRPC') +class Test__grpc_catch_rendezvous(unittest.TestCase): + + def _callFUT(self, to_call, *args, **kwargs): + from google.cloud.datastore.connection import _grpc_catch_rendezvous + return _grpc_catch_rendezvous(to_call, *args, **kwargs) + + @staticmethod + def _fake_method(exc, result=None): + if exc is None: + return result + else: + raise exc + + def test_success(self): + expected = object() + result = self._callFUT(self._fake_method, None, expected) + self.assertIs(result, expected) + + def test_failure_aborted(self): + from grpc import StatusCode + from grpc._channel import _RPCState + from google.cloud.exceptions import Conflict + from google.cloud.exceptions import GrpcRendezvous + + details = 'Bad things.' + exc_state = _RPCState((), None, None, StatusCode.ABORTED, details) + exc = GrpcRendezvous(exc_state, None, None, None) + with self.assertRaises(Conflict): + self._callFUT(self._fake_method, exc) + + def test_failure_invalid_argument(self): + from grpc import StatusCode + from grpc._channel import _RPCState + from google.cloud.exceptions import BadRequest + from google.cloud.exceptions import GrpcRendezvous + + details = ('Cannot have inequality filters on multiple ' + 'properties: [created, priority]') + exc_state = _RPCState((), None, None, + StatusCode.INVALID_ARGUMENT, details) + exc = GrpcRendezvous(exc_state, None, None, None) + with self.assertRaises(BadRequest): + self._callFUT(self._fake_method, exc) + + def test_failure_cancelled(self): + from grpc import StatusCode + from grpc._channel import _RPCState + from google.cloud.exceptions import GrpcRendezvous + + exc_state = _RPCState((), None, None, StatusCode.CANCELLED, None) + exc = GrpcRendezvous(exc_state, None, None, None) + with self.assertRaises(GrpcRendezvous): + self._callFUT(self._fake_method, exc) + + def test_commit_failure_non_grpc_err(self): + exc = RuntimeError('Not a gRPC error') + with self.assertRaises(RuntimeError): + self._callFUT(self._fake_method, exc) + + class Test_DatastoreAPIOverGRPC(unittest.TestCase): def _getTargetClass(self): @@ -227,16 +288,6 @@ def test_run_query_invalid_argument(self): exc = GrpcRendezvous(exc_state, None, None, None) self._run_query_failure_helper(exc, BadRequest) - @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') - def test_run_query_cancelled(self): - from grpc import StatusCode - from grpc._channel import _RPCState - from google.cloud.exceptions import GrpcRendezvous - - exc_state = _RPCState((), None, None, StatusCode.CANCELLED, None) - exc = GrpcRendezvous(exc_state, None, None, None) - self._run_query_failure_helper(exc, GrpcRendezvous) - def test_begin_transaction(self): return_val = object() stub = _GRPCStub(return_val) @@ -264,59 +315,6 @@ def test_commit_success(self): self.assertEqual(stub.method_calls, [(request_pb, 'Commit')]) - def _commit_failure_helper(self, exc, err_class): - stub = _GRPCStub(side_effect=exc) - datastore_api = self._makeOne(stub=stub) - - request_pb = _RequestPB() - project = 'PROJECT' - with self.assertRaises(err_class): - datastore_api.commit(project, request_pb) - - self.assertEqual(request_pb.project_id, project) - self.assertEqual(stub.method_calls, - [(request_pb, 'Commit')]) - - @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') - def test_commit_failure_aborted(self): - from grpc import StatusCode - from grpc._channel import _RPCState - from google.cloud.exceptions import Conflict - from google.cloud.exceptions import GrpcRendezvous - - details = 'Bad things.' - exc_state = _RPCState((), None, None, StatusCode.ABORTED, details) - exc = GrpcRendezvous(exc_state, None, None, None) - self._commit_failure_helper(exc, Conflict) - - @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') - def test_commit_failure_invalid_argument(self): - from grpc import StatusCode - from grpc._channel import _RPCState - from google.cloud.exceptions import BadRequest - from google.cloud.exceptions import GrpcRendezvous - - details = 'Too long content.' - exc_state = _RPCState((), None, None, - StatusCode.INVALID_ARGUMENT, details) - exc = GrpcRendezvous(exc_state, None, None, None) - self._commit_failure_helper(exc, BadRequest) - - @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') - def test_commit_failure_cancelled(self): - from grpc import StatusCode - from grpc._channel import _RPCState - from google.cloud.exceptions import GrpcRendezvous - - exc_state = _RPCState((), None, None, StatusCode.CANCELLED, None) - exc = GrpcRendezvous(exc_state, None, None, None) - self._commit_failure_helper(exc, GrpcRendezvous) - - @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') - def test_commit_failure_non_grpc_err(self): - exc = RuntimeError('Not a gRPC error') - self._commit_failure_helper(exc, RuntimeError) - def test_rollback(self): return_val = object() stub = _GRPCStub(return_val) @@ -1161,27 +1159,22 @@ def __init__(self, return_val=None, side_effect=Exception): def _method(self, request_pb, name): self.method_calls.append((request_pb, name)) - return self.return_val + if self.side_effect is Exception: + return self.return_val + else: + raise self.side_effect def Lookup(self, request_pb): return self._method(request_pb, 'Lookup') def RunQuery(self, request_pb): - result = self._method(request_pb, 'RunQuery') - if self.side_effect is Exception: - return result - else: - raise self.side_effect + return self._method(request_pb, 'RunQuery') def BeginTransaction(self, request_pb): return self._method(request_pb, 'BeginTransaction') def Commit(self, request_pb): - result = self._method(request_pb, 'Commit') - if self.side_effect is Exception: - return result - else: - raise self.side_effect + return self._method(request_pb, 'Commit') def Rollback(self, request_pb): return self._method(request_pb, 'Rollback') From b62c95fb941a0c78d5e7007e92ef17ace57abcb7 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Mon, 24 Oct 2016 09:45:12 -0700 Subject: [PATCH 2/2] Switching _grpc_catch_rendezvous to a context manager. --- .../google/cloud/datastore/connection.py | 45 +++++++------------ datastore/unit_tests/test_connection.py | 19 +++++--- 2 files changed, 27 insertions(+), 37 deletions(-) diff --git a/datastore/google/cloud/datastore/connection.py b/datastore/google/cloud/datastore/connection.py index 2d038eff28c2..aac5c85e0a88 100644 --- a/datastore/google/cloud/datastore/connection.py +++ b/datastore/google/cloud/datastore/connection.py @@ -14,6 +14,7 @@ """Connections to Google Cloud Datastore API servers.""" +import contextlib import os from google.rpc import status_pb2 @@ -237,8 +238,9 @@ def allocate_ids(self, project, request_pb): _datastore_pb2.AllocateIdsResponse) -def _grpc_catch_rendezvous(to_call, *args, **kwargs): - """Call a method/function and re-map gRPC exceptions. +@contextlib.contextmanager +def _grpc_catch_rendezvous(): + """Re-map gRPC exceptions that happen in context. .. _code.proto: https://github.com/googleapis/googleapis/blob/\ master/google/rpc/code.proto @@ -246,26 +248,9 @@ def _grpc_catch_rendezvous(to_call, *args, **kwargs): Remaps gRPC exceptions to the classes defined in :mod:`~google.cloud.exceptions` (according to the description in `code.proto`_). - - :type to_call: callable - :param to_call: Callable that makes a request which may raise a - :class:`~google.cloud.exceptions.GrpcRendezvous`. - - :type args: tuple - :param args: Positional arugments to the callable. - - :type kwargs: dict - :param kwargs: Keyword arguments to the callable. - - :rtype: object - :returns: The value returned from ``to_call``. - :raises: :class:`~google.cloud.exceptions.GrpcRendezvous` if one - is encountered that can't be re-mapped, otherwise maps - to a :class:`~google.cloud.exceptions.GoogleCloudError` - subclass. """ try: - return to_call(*args, **kwargs) + yield except exceptions.GrpcRendezvous as exc: error_code = exc.code() error_class = _GRPC_ERROR_MAPPING.get(error_code) @@ -331,8 +316,8 @@ def run_query(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return _grpc_catch_rendezvous( - self._stub.RunQuery, request_pb) + with _grpc_catch_rendezvous(): + return self._stub.RunQuery(request_pb) def begin_transaction(self, project, request_pb): """Perform a ``beginTransaction`` request. @@ -349,8 +334,8 @@ def begin_transaction(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return _grpc_catch_rendezvous( - self._stub.BeginTransaction, request_pb) + with _grpc_catch_rendezvous(): + return self._stub.BeginTransaction(request_pb) def commit(self, project, request_pb): """Perform a ``commit`` request. @@ -366,8 +351,8 @@ def commit(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return _grpc_catch_rendezvous( - self._stub.Commit, request_pb) + with _grpc_catch_rendezvous(): + return self._stub.Commit(request_pb) def rollback(self, project, request_pb): """Perform a ``rollback`` request. @@ -383,8 +368,8 @@ def rollback(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return _grpc_catch_rendezvous( - self._stub.Rollback, request_pb) + with _grpc_catch_rendezvous(): + return self._stub.Rollback(request_pb) def allocate_ids(self, project, request_pb): """Perform an ``allocateIds`` request. @@ -400,8 +385,8 @@ def allocate_ids(self, project, request_pb): :returns: The returned protobuf response object. """ request_pb.project_id = project - return _grpc_catch_rendezvous( - self._stub.AllocateIds, request_pb) + with _grpc_catch_rendezvous(): + return self._stub.AllocateIds(request_pb) class Connection(connection_module.Connection): diff --git a/datastore/unit_tests/test_connection.py b/datastore/unit_tests/test_connection.py index c7577cedb568..973a3241506e 100644 --- a/datastore/unit_tests/test_connection.py +++ b/datastore/unit_tests/test_connection.py @@ -109,9 +109,9 @@ def test__request_not_200(self): @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') class Test__grpc_catch_rendezvous(unittest.TestCase): - def _callFUT(self, to_call, *args, **kwargs): + def _callFUT(self): from google.cloud.datastore.connection import _grpc_catch_rendezvous - return _grpc_catch_rendezvous(to_call, *args, **kwargs) + return _grpc_catch_rendezvous() @staticmethod def _fake_method(exc, result=None): @@ -122,7 +122,8 @@ def _fake_method(exc, result=None): def test_success(self): expected = object() - result = self._callFUT(self._fake_method, None, expected) + with self._callFUT(): + result = self._fake_method(None, expected) self.assertIs(result, expected) def test_failure_aborted(self): @@ -135,7 +136,8 @@ def test_failure_aborted(self): exc_state = _RPCState((), None, None, StatusCode.ABORTED, details) exc = GrpcRendezvous(exc_state, None, None, None) with self.assertRaises(Conflict): - self._callFUT(self._fake_method, exc) + with self._callFUT(): + self._fake_method(exc) def test_failure_invalid_argument(self): from grpc import StatusCode @@ -149,7 +151,8 @@ def test_failure_invalid_argument(self): StatusCode.INVALID_ARGUMENT, details) exc = GrpcRendezvous(exc_state, None, None, None) with self.assertRaises(BadRequest): - self._callFUT(self._fake_method, exc) + with self._callFUT(): + self._fake_method(exc) def test_failure_cancelled(self): from grpc import StatusCode @@ -159,12 +162,14 @@ def test_failure_cancelled(self): exc_state = _RPCState((), None, None, StatusCode.CANCELLED, None) exc = GrpcRendezvous(exc_state, None, None, None) with self.assertRaises(GrpcRendezvous): - self._callFUT(self._fake_method, exc) + with self._callFUT(): + self._fake_method(exc) def test_commit_failure_non_grpc_err(self): exc = RuntimeError('Not a gRPC error') with self.assertRaises(RuntimeError): - self._callFUT(self._fake_method, exc) + with self._callFUT(): + self._fake_method(exc) class Test_DatastoreAPIOverGRPC(unittest.TestCase):