Skip to content
This repository has been archived by the owner on Dec 10, 2018. It is now read-only.

Multiplex using service_name #117

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions examples/multiplexer/multiplexed_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,31 @@

import thriftpy
from thriftpy.rpc import client_context
from thriftpy.protocol import (
TBinaryProtocolFactory,
TMultiplexingProtocolFactory
)

dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift")
pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift")

DD_SERVICE_NAME = "dd_thrift"
PP_SERVICE_NAME = "pp_thrift"



def main():
with client_context(dd_thrift.DingService, '127.0.0.1', 9090) as c:
binary_factory = TBinaryProtocolFactory()
dd_factory = TMultiplexingProtocolFactory(binary_factory, DD_SERVICE_NAME)
with client_context(dd_thrift.DingService, '127.0.0.1', 9090,
proto_factory=dd_factory) as c:
# ring that doorbell
dong = c.ding()
print(dong)

with client_context(pp_thrift.PingService, '127.0.0.1', 9090) as c:
pp_factory = TMultiplexingProtocolFactory(binary_factory, PP_SERVICE_NAME)
with client_context(pp_thrift.PingService, '127.0.0.1', 9090,
proto_factory=pp_factory) as c:
# play table tennis like a champ
pong = c.ping()
print(pong)
Expand Down
6 changes: 4 additions & 2 deletions examples/multiplexer/multiplexed_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift")
pp_thrift = thriftpy.load("pingpong.thrift", module_name="pp_thrift")

DD_SERVICE_NAME = "dd_thrift"
PP_SERVICE_NAME = "pp_thrift"

class DingDispatcher(object):
def ding(self):
Expand All @@ -28,8 +30,8 @@ def main():
pp_proc = TProcessor(pp_thrift.PingService, PingDispatcher())

mux_proc = TMultiplexingProcessor()
mux_proc.register_processor(dd_proc)
mux_proc.register_processor(pp_proc)
mux_proc.register_processor(DD_SERVICE_NAME, dd_proc)
mux_proc.register_processor(PP_SERVICE_NAME, pp_proc)

server = TThreadedServer(mux_proc, TServerSocket(),
iprot_factory=TBinaryProtocolFactory(),
Expand Down
21 changes: 16 additions & 5 deletions tests/test_multiplexed.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import pytest

import thriftpy
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.protocol import (
TBinaryProtocolFactory,
TMultiplexingProtocolFactory
)
from thriftpy.rpc import client_context
from thriftpy.server import TThreadedServer
from thriftpy.thrift import TProcessor, TMultiplexingProcessor
Expand Down Expand Up @@ -37,8 +40,8 @@ def server(request):
p2 = TProcessor(mux.ThingTwoService, DispatcherTwo())

mux_proc = TMultiplexingProcessor()
mux_proc.register_processor(p1)
mux_proc.register_processor(p2)
mux_proc.register_processor("ThingOneService", p1)
mux_proc.register_processor("ThingTwoService", p2)

_server = TThreadedServer(mux_proc, TServerSocket(unix_socket=sock_path),
iprot_factory=TBinaryProtocolFactory(),
Expand All @@ -58,13 +61,21 @@ def fin():


def client_one(timeout=3000):
binary_factory = TBinaryProtocolFactory()
multiplexing_factory = TMultiplexingProtocolFactory(binary_factory,
"ThingOneService")
return client_context(mux.ThingOneService, unix_socket=sock_path,
timeout=timeout)
timeout=timeout,
proto_factory=multiplexing_factory)


def client_two(timeout=3000):
binary_factory = TBinaryProtocolFactory()
multiplexing_factory = TMultiplexingProtocolFactory(binary_factory,
"ThingTwoService")
return client_context(mux.ThingTwoService, unix_socket=sock_path,
timeout=timeout)
timeout=timeout,
proto_factory=multiplexing_factory)


def test_multiplexed_server(server):
Expand Down
4 changes: 3 additions & 1 deletion thriftpy/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .binary import TBinaryProtocol, TBinaryProtocolFactory
from .json import TJSONProtocol, TJSONProtocolFactory
from .multiplex import TMultiplexingProtocol, TMultiplexingProtocolFactory

from thriftpy._compat import PYPY, CYTHON
if not PYPY:
Expand All @@ -19,4 +20,5 @@

__all__ = ['TBinaryProtocol', 'TBinaryProtocolFactory',
'TCyBinaryProtocol', 'TCyBinaryProtocolFactory',
'TJSONProtocol', 'TJSONProtocolFactory']
'TJSONProtocol', 'TJSONProtocolFactory',
'TMultiplexingProtocol', 'TMultiplexingProtocolFactory']
37 changes: 37 additions & 0 deletions thriftpy/protocol/multiplex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from thriftpy.thrift import TMultiplexingProcessor


class TMultiplexingProtocol(object):

"""

Multiplex protocol

for writing message begin, it prepend the service name to the api
for other functions, it simply delegate to the original protocol

"""

def __init__(self, proto, service_name):
self.service_name = service_name
self.proto = proto

def __getattr__(self, name):
return getattr(self.proto, name)

def write_message_begin(self, name, ttype, seqid):
self.proto.write_message_begin(
self.service_name + TMultiplexingProcessor.SEPARATOR + name,
ttype, seqid)


class TMultiplexingProtocolFactory(object):

def __init__(self, proto_factory, service_name):
self.proto_factory = proto_factory
self.service_name = service_name

def get_protocol(self, trans):
proto = self.proto_factory.get_protocol(trans)
multi_proto = TMultiplexingProtocol(proto, self.service_name)
return multi_proto
34 changes: 13 additions & 21 deletions thriftpy/thrift.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from __future__ import absolute_import

import functools
import inspect

from ._compat import init_func_generator, with_metaclass

Expand Down Expand Up @@ -248,41 +247,34 @@ def process(self, iprot, oprot):


class TMultiplexingProcessor(TProcessor):
processors = {}
service_map = {}
SEPARATOR = ":"

def __init__(self):
self.processors = {}
pass

def register_processor(self, processor):
service = processor._service
module = inspect.getmodule(processor)
name = '{0}:{1}'.format(module.__name__, service.__name__)
if name in self.processors:
def register_processor(self, service_name, processor):

if service_name in self.processors:
raise TApplicationException(
type=TApplicationException.INTERNAL_ERROR,
message='processor for `{0}` already registered'.format(name))

for srv in service.thrift_services:
if srv in self.service_map:
raise TApplicationException(
type=TApplicationException.INTERNAL_ERROR,
message='cannot multiplex processor for `{0}`; '
'`{1}` is already a registered method for `{2}`'
.format(name, srv, self.service_map[srv]))
self.service_map[srv] = name
message='processor for `{0}` already registered'
.format(service_name))

self.processors[name] = processor
self.processors[service_name] = processor

def process_in(self, iprot):
api, type, seqid = iprot.read_message_begin()
if api not in self.service_map:

service_name, api = api.split(TMultiplexingProcessor.SEPARATOR)

if service_name not in self.processors:
iprot.skip(TType.STRUCT)
iprot.read_message_end()
e = TApplicationException(TApplicationException.UNKNOWN_METHOD)
return api, seqid, e, None # noqa

proc = self.processors[self.service_map[api]]
proc = self.processors[service_name]
args = getattr(proc._service, api + "_args")()
args.read(iprot)
iprot.read_message_end()
Expand Down