From f4df20f6aa2062552c85278bb7a88edc8efefa9a Mon Sep 17 00:00:00 2001 From: xvblack Date: Thu, 26 Mar 2015 11:20:10 -0400 Subject: [PATCH 1/4] Multiplex using service_name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change the signature of register_processor to (service_name, processor) and add TMultiplexingProtocol, which use separator (“:”) to join service_name and function as the api Modified test and example --- examples/multiplexer/multiplexed_client.py | 10 ++++-- examples/multiplexer/multiplexed_server.py | 4 +-- tests/test_multiplexed.py | 16 +++++++--- thriftpy/protocol/__init__.py | 4 ++- thriftpy/protocol/multiplex.py | 36 ++++++++++++++++++++++ thriftpy/thrift.py | 30 ++++++++---------- 6 files changed, 73 insertions(+), 27 deletions(-) create mode 100644 thriftpy/protocol/multiplex.py diff --git a/examples/multiplexer/multiplexed_client.py b/examples/multiplexer/multiplexed_client.py index e4612af..0691889 100644 --- a/examples/multiplexer/multiplexed_client.py +++ b/examples/multiplexer/multiplexed_client.py @@ -2,18 +2,24 @@ import thriftpy from thriftpy.rpc import client_context +from thriftpy.protocol import TBinaryProtocolFactory, TMultiplexingProtocolFactory dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift") pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift") def main(): - with client_context(dd_thrift.DingService, '127.0.0.1', 9090) as c: + binary_factory = TBinaryProtocolFactory() + dd_factory = TMultiplexingProtocolFactory(binary_factory, "dd_thrift") + with client_context(dd_thrift.DingService, '127.0.0.1', 9090, + proto_factory=dd_factory) as c: # ring that doorbell dong = c.ding() print(dong) - with client_context(pp_thrift.PingService, '127.0.0.1', 9090) as c: + pp_factory = TMultiplexingProtocolFactory(binary_factory, "pp_thrift") + with client_context(pp_thrift.PingService, '127.0.0.1', 9090, + proto_factory=pp_factory) as c: # play table tennis like a champ pong = c.ping() print(pong) diff --git a/examples/multiplexer/multiplexed_server.py b/examples/multiplexer/multiplexed_server.py index 90f051b..e649134 100644 --- a/examples/multiplexer/multiplexed_server.py +++ b/examples/multiplexer/multiplexed_server.py @@ -28,8 +28,8 @@ def main(): pp_proc = TProcessor(pp_thrift.PingService, PingDispatcher()) mux_proc = TMultiplexingProcessor() - mux_proc.register_processor(dd_proc) - mux_proc.register_processor(pp_proc) + mux_proc.register_processor("dd_thrift", dd_proc) + mux_proc.register_processor("pp_thrift", pp_proc) server = TThreadedServer(mux_proc, TServerSocket(), iprot_factory=TBinaryProtocolFactory(), diff --git a/tests/test_multiplexed.py b/tests/test_multiplexed.py index ad53ea2..8cfbd29 100644 --- a/tests/test_multiplexed.py +++ b/tests/test_multiplexed.py @@ -9,7 +9,7 @@ import pytest import thriftpy -from thriftpy.protocol import TBinaryProtocolFactory +from thriftpy.protocol import TBinaryProtocolFactory, TMultiplexingProtocolFactory from thriftpy.rpc import client_context from thriftpy.server import TThreadedServer from thriftpy.thrift import TProcessor, TMultiplexingProcessor @@ -37,8 +37,8 @@ def server(request): p2 = TProcessor(mux.ThingTwoService, DispatcherTwo()) mux_proc = TMultiplexingProcessor() - mux_proc.register_processor(p1) - mux_proc.register_processor(p2) + mux_proc.register_processor("ThingOneService", p1) + mux_proc.register_processor("ThingTwoService", p2) _server = TThreadedServer(mux_proc, TServerSocket(unix_socket=sock_path), iprot_factory=TBinaryProtocolFactory(), @@ -58,13 +58,19 @@ def fin(): def client_one(timeout=3000): + binary_factory = TBinaryProtocolFactory() + multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, "ThingOneService") return client_context(mux.ThingOneService, unix_socket=sock_path, - timeout=timeout) + timeout=timeout, + proto_factory=multiplexing_factory) def client_two(timeout=3000): + binary_factory = TBinaryProtocolFactory() + multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, "ThingTwoService") return client_context(mux.ThingTwoService, unix_socket=sock_path, - timeout=timeout) + timeout=timeout, + proto_factory=multiplexing_factory) def test_multiplexed_server(server): diff --git a/thriftpy/protocol/__init__.py b/thriftpy/protocol/__init__.py index 7277adc..78c7adc 100644 --- a/thriftpy/protocol/__init__.py +++ b/thriftpy/protocol/__init__.py @@ -4,6 +4,7 @@ from .binary import TBinaryProtocol, TBinaryProtocolFactory from .json import TJSONProtocol, TJSONProtocolFactory +from .multiplex import TMultiplexingProtocol, TMultiplexingProtocolFactory from thriftpy._compat import PYPY, CYTHON if not PYPY: @@ -19,4 +20,5 @@ __all__ = ['TBinaryProtocol', 'TBinaryProtocolFactory', 'TCyBinaryProtocol', 'TCyBinaryProtocolFactory', - 'TJSONProtocol', 'TJSONProtocolFactory'] + 'TJSONProtocol', 'TJSONProtocolFactory', + 'TMultiplexingProtocol'] diff --git a/thriftpy/protocol/multiplex.py b/thriftpy/protocol/multiplex.py new file mode 100644 index 0000000..7f7947c --- /dev/null +++ b/thriftpy/protocol/multiplex.py @@ -0,0 +1,36 @@ +from thriftpy.thrift import TMultiplexingProcessor + + +class TMultiplexingProtocol(object): + + """ + + Multiplex protocol + + for writing message begin, it prepend the service name to the api + for other functions, it simply delegate to the original protocol + + """ + + def __init__(self, proto, service_name): + self.service_name = service_name + self.proto = proto + + def __getattr__(self, name): + return getattr(self.proto, name) + + def write_message_begin(self, name, ttype, seqid): + self.proto.write_message_begin( + self.service_name + TMultiplexingProcessor.Separator + name, + ttype, seqid) + +class TMultiplexingProtocolFactory(object): + + def __init__(self, proto_factory, service_name): + self.proto_factory = proto_factory + self.service_name = service_name + + def get_protocol(self, trans): + proto = self.proto_factory.get_protocol(trans) + multi_proto = TMultiplexingProtocol(proto, self.service_name) + return multi_proto diff --git a/thriftpy/thrift.py b/thriftpy/thrift.py index 9b64ed9..0a69c56 100644 --- a/thriftpy/thrift.py +++ b/thriftpy/thrift.py @@ -248,41 +248,37 @@ def process(self, iprot, oprot): class TMultiplexingProcessor(TProcessor): + Separator = ":" + processors = {} service_map = {} def __init__(self): pass - def register_processor(self, processor): + def register_processor(self, service_name, processor): service = processor._service - module = inspect.getmodule(processor) - name = '{0}:{1}'.format(module.__name__, service.__name__) - if name in self.processors: + + if service_name in self.processors: raise TApplicationException( type=TApplicationException.INTERNAL_ERROR, - message='processor for `{0}` already registered'.format(name)) - - for srv in service.thrift_services: - if srv in self.service_map: - raise TApplicationException( - type=TApplicationException.INTERNAL_ERROR, - message='cannot multiplex processor for `{0}`; ' - '`{1}` is already a registered method for `{2}`' - .format(name, srv, self.service_map[srv])) - self.service_map[srv] = name + message='processor for `{0}` already registered' + .format(service_name)) - self.processors[name] = processor + self.processors[service_name] = processor def process_in(self, iprot): api, type, seqid = iprot.read_message_begin() - if api not in self.service_map: + + service_name, api = api.split(TMultiplexingProcessor.Separator) + + if service_name not in self.processors: iprot.skip(TType.STRUCT) iprot.read_message_end() e = TApplicationException(TApplicationException.UNKNOWN_METHOD) return api, seqid, e, None # noqa - proc = self.processors[self.service_map[api]] + proc = self.processors[service_name] args = getattr(proc._service, api + "_args")() args.read(iprot) iprot.read_message_end() From f904bf34505f07f0a6ce901e32c762c54e2e4f6a Mon Sep 17 00:00:00 2001 From: xvblack Date: Thu, 26 Mar 2015 15:29:54 -0400 Subject: [PATCH 2/4] Fix flake8 test --- examples/multiplexer/multiplexed_client.py | 13 ++++++++----- tests/test_multiplexed.py | 9 ++++++--- thriftpy/protocol/__init__.py | 2 +- thriftpy/protocol/multiplex.py | 1 + thriftpy/thrift.py | 2 -- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/examples/multiplexer/multiplexed_client.py b/examples/multiplexer/multiplexed_client.py index 0691889..05531db 100644 --- a/examples/multiplexer/multiplexed_client.py +++ b/examples/multiplexer/multiplexed_client.py @@ -2,7 +2,10 @@ import thriftpy from thriftpy.rpc import client_context -from thriftpy.protocol import TBinaryProtocolFactory, TMultiplexingProtocolFactory +from thriftpy.protocol import ( + TBinaryProtocolFactory, + TMultiplexingProtocolFactory + ) dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift") pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift") @@ -11,15 +14,15 @@ def main(): binary_factory = TBinaryProtocolFactory() dd_factory = TMultiplexingProtocolFactory(binary_factory, "dd_thrift") - with client_context(dd_thrift.DingService, '127.0.0.1', 9090, - proto_factory=dd_factory) as c: + with client_context(dd_thrift.DingService, '127.0.0.1', 9090, + proto_factory=dd_factory) as c: # ring that doorbell dong = c.ding() print(dong) pp_factory = TMultiplexingProtocolFactory(binary_factory, "pp_thrift") - with client_context(pp_thrift.PingService, '127.0.0.1', 9090, - proto_factory=pp_factory) as c: + with client_context(pp_thrift.PingService, '127.0.0.1', 9090, + proto_factory=pp_factory) as c: # play table tennis like a champ pong = c.ping() print(pong) diff --git a/tests/test_multiplexed.py b/tests/test_multiplexed.py index 8cfbd29..173ac46 100644 --- a/tests/test_multiplexed.py +++ b/tests/test_multiplexed.py @@ -9,7 +9,8 @@ import pytest import thriftpy -from thriftpy.protocol import TBinaryProtocolFactory, TMultiplexingProtocolFactory +from thriftpy.protocol import TBinaryProtocolFactory +from thrift.protocol import TMultiplexingProtocolFactory from thriftpy.rpc import client_context from thriftpy.server import TThreadedServer from thriftpy.thrift import TProcessor, TMultiplexingProcessor @@ -59,7 +60,8 @@ def fin(): def client_one(timeout=3000): binary_factory = TBinaryProtocolFactory() - multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, "ThingOneService") + multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, + "ThingOneService") return client_context(mux.ThingOneService, unix_socket=sock_path, timeout=timeout, proto_factory=multiplexing_factory) @@ -67,7 +69,8 @@ def client_one(timeout=3000): def client_two(timeout=3000): binary_factory = TBinaryProtocolFactory() - multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, "ThingTwoService") + multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, + "ThingTwoService") return client_context(mux.ThingTwoService, unix_socket=sock_path, timeout=timeout, proto_factory=multiplexing_factory) diff --git a/thriftpy/protocol/__init__.py b/thriftpy/protocol/__init__.py index 78c7adc..37c5d1e 100644 --- a/thriftpy/protocol/__init__.py +++ b/thriftpy/protocol/__init__.py @@ -21,4 +21,4 @@ __all__ = ['TBinaryProtocol', 'TBinaryProtocolFactory', 'TCyBinaryProtocol', 'TCyBinaryProtocolFactory', 'TJSONProtocol', 'TJSONProtocolFactory', - 'TMultiplexingProtocol'] + 'TMultiplexingProtocol', 'TMultiplexingProtocolFactory'] diff --git a/thriftpy/protocol/multiplex.py b/thriftpy/protocol/multiplex.py index 7f7947c..baaada9 100644 --- a/thriftpy/protocol/multiplex.py +++ b/thriftpy/protocol/multiplex.py @@ -24,6 +24,7 @@ def write_message_begin(self, name, ttype, seqid): self.service_name + TMultiplexingProcessor.Separator + name, ttype, seqid) + class TMultiplexingProtocolFactory(object): def __init__(self, proto_factory, service_name): diff --git a/thriftpy/thrift.py b/thriftpy/thrift.py index 0a69c56..ff5534f 100644 --- a/thriftpy/thrift.py +++ b/thriftpy/thrift.py @@ -10,7 +10,6 @@ from __future__ import absolute_import import functools -import inspect from ._compat import init_func_generator, with_metaclass @@ -257,7 +256,6 @@ def __init__(self): pass def register_processor(self, service_name, processor): - service = processor._service if service_name in self.processors: raise TApplicationException( From c1054f59d67db71e698ca13a22685181b9d17102 Mon Sep 17 00:00:00 2001 From: xvblack Date: Thu, 26 Mar 2015 15:38:02 -0400 Subject: [PATCH 3/4] Fix flake8 --- examples/multiplexer/multiplexed_client.py | 8 ++++++-- examples/multiplexer/multiplexed_server.py | 6 ++++-- tests/test_multiplexed.py | 6 ++++-- thriftpy/protocol/multiplex.py | 2 +- thriftpy/thrift.py | 4 ++-- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/examples/multiplexer/multiplexed_client.py b/examples/multiplexer/multiplexed_client.py index 05531db..fc27f33 100644 --- a/examples/multiplexer/multiplexed_client.py +++ b/examples/multiplexer/multiplexed_client.py @@ -10,17 +10,21 @@ dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift") pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift") +DD_SERVICE_NAME = "dd_thrift" +PP_SERVICE_NAME = "pp_thrift" + + def main(): binary_factory = TBinaryProtocolFactory() - dd_factory = TMultiplexingProtocolFactory(binary_factory, "dd_thrift") + dd_factory = TMultiplexingProtocolFactory(binary_factory, DD_SERVICE_NAME) with client_context(dd_thrift.DingService, '127.0.0.1', 9090, proto_factory=dd_factory) as c: # ring that doorbell dong = c.ding() print(dong) - pp_factory = TMultiplexingProtocolFactory(binary_factory, "pp_thrift") + pp_factory = TMultiplexingProtocolFactory(binary_factory, PP_SERVICE_NAME) with client_context(pp_thrift.PingService, '127.0.0.1', 9090, proto_factory=pp_factory) as c: # play table tennis like a champ diff --git a/examples/multiplexer/multiplexed_server.py b/examples/multiplexer/multiplexed_server.py index e649134..485571a 100644 --- a/examples/multiplexer/multiplexed_server.py +++ b/examples/multiplexer/multiplexed_server.py @@ -10,6 +10,8 @@ dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift") pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift") +DD_SERVICE_NAME = "dd_thrift" +PP_SERVICE_NAME = "pp_thrift" class DingDispatcher(object): def ding(self): @@ -28,8 +30,8 @@ def main(): pp_proc = TProcessor(pp_thrift.PingService, PingDispatcher()) mux_proc = TMultiplexingProcessor() - mux_proc.register_processor("dd_thrift", dd_proc) - mux_proc.register_processor("pp_thrift", pp_proc) + mux_proc.register_processor(DD_SERVICE_NAME, dd_proc) + mux_proc.register_processor(PP_SERVICE_NAME, pp_proc) server = TThreadedServer(mux_proc, TServerSocket(), iprot_factory=TBinaryProtocolFactory(), diff --git a/tests/test_multiplexed.py b/tests/test_multiplexed.py index 173ac46..6757a23 100644 --- a/tests/test_multiplexed.py +++ b/tests/test_multiplexed.py @@ -9,8 +9,10 @@ import pytest import thriftpy -from thriftpy.protocol import TBinaryProtocolFactory -from thrift.protocol import TMultiplexingProtocolFactory +from thriftpy.protocol import ( + TBinaryProtocolFactory, + TMultiplexingProtocolFactory + ) from thriftpy.rpc import client_context from thriftpy.server import TThreadedServer from thriftpy.thrift import TProcessor, TMultiplexingProcessor diff --git a/thriftpy/protocol/multiplex.py b/thriftpy/protocol/multiplex.py index baaada9..4a7cce7 100644 --- a/thriftpy/protocol/multiplex.py +++ b/thriftpy/protocol/multiplex.py @@ -21,7 +21,7 @@ def __getattr__(self, name): def write_message_begin(self, name, ttype, seqid): self.proto.write_message_begin( - self.service_name + TMultiplexingProcessor.Separator + name, + self.service_name + TMultiplexingProcessor.SEPARATOR + name, ttype, seqid) diff --git a/thriftpy/thrift.py b/thriftpy/thrift.py index ff5534f..7801b82 100644 --- a/thriftpy/thrift.py +++ b/thriftpy/thrift.py @@ -247,7 +247,7 @@ def process(self, iprot, oprot): class TMultiplexingProcessor(TProcessor): - Separator = ":" + SEPARATOR = ":" processors = {} service_map = {} @@ -268,7 +268,7 @@ def register_processor(self, service_name, processor): def process_in(self, iprot): api, type, seqid = iprot.read_message_begin() - service_name, api = api.split(TMultiplexingProcessor.Separator) + service_name, api = api.split(TMultiplexingProcessor.SEPARATOR) if service_name not in self.processors: iprot.skip(TType.STRUCT) From 73fac66987cefdccc8a693d7d8962b0fc43ae736 Mon Sep 17 00:00:00 2001 From: xvblack Date: Sat, 28 Mar 2015 13:25:46 -0400 Subject: [PATCH 4/4] Fix multiplex Moved service map from class variable to instance variable --- thriftpy/thrift.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/thriftpy/thrift.py b/thriftpy/thrift.py index 7801b82..b570595 100644 --- a/thriftpy/thrift.py +++ b/thriftpy/thrift.py @@ -249,10 +249,8 @@ def process(self, iprot, oprot): class TMultiplexingProcessor(TProcessor): SEPARATOR = ":" - processors = {} - service_map = {} - def __init__(self): + self.processors = {} pass def register_processor(self, service_name, processor):