libzmq binding the sub socket then connecting with pub sockets in a pub/sub pattern doesn't work #1650

Closed
Jeducious opened this issue Jan 19, 2022 · 5 comments

@Jeducious

What's up?

I've been experimenting with creating an event bus using pyzmq's pub/sub pattern and sockets.

Whilst experimenting with zmq.asyncio based sockets, I have attempted to build a simple test case where the pub sub pattern is implemented so that the zmq.SUB type socket is bound first, then a zmq.PUB socket is created and connected to the sub socket.

When executing a simple test script which sends msgs on the pub and then tries to recv them on the sub, no messages are received. However, if I use purely synchronous code this pattern works just fine.

Also, if I create a forwarder device from zmq.devices and use zmq.asyncio.Context.instance() as the context factory, I can successfully bind the 'in' socket of the forwarder when set to zmq.SUB and successfully receive messages.

How can this be reproduced?

I've written a somewhat simple py script to reproduce the problem. The script tests three different implementations of connecting the pub socket to a bound sub socket.

  1. Synchronous example which works
  2. Asynchronous example which fails due to no msgs reaching the sub socket
  3. Asynchronous example using a zmq.devices forwarder which works

Running the script is simple: ensure you have pyzmq installed in your python3 env, then run the following:

python3 test_script.py
""" Contents of test_script.py 

pyzmq pub/sub testing 

This file is a simple test of binding a sub socket and then trying to connect 
a pub socket to that sub socket and sending data from pub to sub """

import asyncio
import time

import zmq
import zmq.asyncio

from zmq.devices import ThreadDevice as Device

loop = asyncio.get_event_loop()
EVENT_BUS_PUB_ADDR = "127.0.0.1:9996"
EVENT_BUS_SUB_ADDR = "127.0.0.1:9995"

#Create a fake event bus 
class FakeEventBus:

    def __init__(self):

        self.internal_event_forwarder = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
        self.internal_event_forwarder.context_factory = self.context_factory
        self.internal_event_forwarder.bind_out(f'tcp://{EVENT_BUS_PUB_ADDR}')
        self.internal_event_forwarder.setsockopt_in(zmq.SUBSCRIBE, b'')
        self.internal_event_forwarder.bind_in(f'tcp://{EVENT_BUS_SUB_ADDR}')
        self.internal_event_forwarder.daemon = True
        self.internal_event_forwarder.start()
        print(f"DEBUG:{__class__}, starting incoming event forwarder")

    def context_factory(self):

        return zmq.asyncio.Context.instance()

    def close(self):

        del self.internal_event_forwarder

#create a context

z_ctx = zmq.Context()

#create pub and sub sockets
sub = z_ctx.socket(zmq.SUB)
pub = z_ctx.socket(zmq.PUB)

#bind the subscriber socket
sub.bind("tcp://127.0.0.1:9998")
sub.subscribe(b'')

#connect the pub socket to the subscriber
pub.connect("tcp://127.0.0.1:9998")

time.sleep(2)

sequence = [i for i in range(0,100)]
counter = 0

for seq in sequence:

    try:

        print(f"Sending {seq}, how do you read?")
        pub.send_string(f"{seq}")
        print(f"Reading you at :{sub.recv_string(zmq.NOBLOCK)} ")
        
    except zmq.ZMQError as e:

        if e.errno == zmq.EAGAIN: 
            print("Failed to read sequence")
            #send another message
            
            time.sleep(1)
            counter +=1
            
print(f"FINISHED! Missed {counter} messages")
    
sub.close()
pub.close()
z_ctx.destroy()

print("NEXT TEST WITH zmq.asyncio sockets")

#get a context
z_ctx_async = zmq.asyncio.Context()

#create a socket for subscribing
sub = z_ctx_async.socket(zmq.SUB)
sub.bind("tcp://127.0.0.1:9997")
sub.subscribe(b'')

#create a socket for publishing
pub = z_ctx_async.socket(zmq.PUB)
pub.connect("tcp://127.0.0.1:9997")

time.sleep(1)

#define a producer to keep sending pub msgs
async def runner():
    
    seq = [i for i in range(0,100)]
    for num in seq: 
        await pub.send_string(f"{num}")
        print(f"Sending {num} how do you read?")
        print(f"Reading you at :{await sub.recv_string()}")

async def test():

    try:
        await asyncio.wait_for(runner(), 3.0)
    except asyncio.TimeoutError:
        print("TEST FAILED for using zmq.asyncio based sockets")
         
test = loop.create_task(test())

loop.run_until_complete(test)

sub.close()
pub.close()
z_ctx_async.destroy()

print("#NEXT TEST WITH zmq forwarder device and zmq.asyncio sockets")

#get a context
z_ctx_async = zmq.asyncio.Context()

#create a forwarder
forwarder =  FakeEventBus()

#create a socket for subscribing
sub = z_ctx_async.socket(zmq.SUB)
sub.connect(f"tcp://{EVENT_BUS_PUB_ADDR}")
sub.subscribe(b'')

#create a socket for publishing
pub = z_ctx_async.socket(zmq.PUB)
pub.connect(f"tcp://{EVENT_BUS_SUB_ADDR}")

time.sleep(1)

#define a producer to keep sending pub msgs
async def runner():
    
    seq = [i for i in range(0,100)]
    for num in seq: 
        await pub.send_string(f"{num}")
        print(f"Sending {num} how do you read?")
        print(f"Reading you at :{await sub.recv_string()}")

async def test():

    try:
        await asyncio.wait_for(runner(), 3.0)
    except asyncio.TimeoutError:
        print("TEST FAILED for using zmq.asyncio based sockets")
    
test = loop.create_task(test())


loop.run_until_complete(test)

sub.close()
pub.close()
z_ctx_async.destroy()

Ideal outcome for this issue

I think this should be investigated as a possible bug.

I cannot see a reason why this should not work. The documentation also makes no mention of limitations on binding sub sockets as far as I can see. Also it is perfectly possible to bind sub/connect pub when using a forwarder device on zmq.asyncio sockets. I doubt this is by design since sockets created with zmq.Context work fine when binding the sub socket.

If I've missed something in the docs, or this is an undocumented feature of the design, it would be great to make it more widely known. I'd recommend updating the online docs to mention that zmq.asyncio based sockets have this quirk.

Thanks!

@minrk
Member

minrk commented Jan 19, 2022

I think there is a bug in libzmq when PUB connects and SUB binds, where the subscription does not propagate until a poll or recv is called for the first time. This is not a pyzmq issue, but a libzmq one. You can work around it by attempting a sub.poll(timeout=0) before you get started.

There is one main difference between the two versions, though, and it means the sync code actually fails to receive most of the time:

The sync code doesn't wait for messages to arrive, due to the use of NOBLOCK:

        pub.send_string(f"{seq}")
        print(f"Reading you at :{sub.recv_string(zmq.NOBLOCK)} ") # raises EAGAIN if the message is not _already waiting in the SUB's queue_

whereas the async implementation waits forever for each to arrive (no NOBLOCK):

        await pub.send_string(f"{num}")
        print(f"Sending {num} how do you read?")
        print(f"Reading you at :{await sub.recv_string()}") # waits indefinitely for a message to arrive

This means the sync code will drop messages, but never get stuck, whereas the async code will block forever if it misses any message.
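One way to keep the async side from hanging is to put a time limit on each receive. The sketch below is illustrative only: `recv_string_or_none` and `bounded_recv_demo` are hypothetical names, and the demo uses a PAIR socket pair over an assumed inproc endpoint purely to isolate the timeout behaviour from the pub/sub subscription issue discussed in this thread.

```python
import asyncio

import zmq
import zmq.asyncio


async def recv_string_or_none(sock, timeout_ms):
    """Return the next message, or None if nothing arrives within timeout_ms."""
    # poll() resolves to 0 when the timeout expires with nothing to read.
    if await sock.poll(timeout=timeout_ms):
        return await sock.recv_string()
    return None


async def bounded_recv_demo():
    ctx = zmq.asyncio.Context()
    rx = ctx.socket(zmq.PAIR)
    tx = ctx.socket(zmq.PAIR)
    rx.bind("inproc://bounded-recv-demo")      # endpoint name is an assumption
    tx.connect("inproc://bounded-recv-demo")
    try:
        nothing = await recv_string_or_none(rx, 100)      # nothing sent yet
        await tx.send_string("hello")
        something = await recv_string_or_none(rx, 1000)   # message is on its way
    finally:
        ctx.destroy(linger=0)   # closes both sockets, then terminates the context
    return nothing, something


if __name__ == "__main__":
    print(asyncio.run(bounded_recv_demo()))
```

With a helper like this, a missed message costs one timeout rather than stalling the whole coroutine; the caller decides whether to retry, count the miss, or give up.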

But in general, there's an issue with calling exactly one recv per send: if the recv fails because the message isn't ready yet, you never try again to recv that message. A recv with NOBLOCK should expect to miss messages, because it takes a finite time for each message to arrive, and there's no catch-up step where you recv a potential backlog of messages.
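A catch-up step could look something like the sketch below, which keeps receiving with NOBLOCK until libzmq reports that the queue is empty. `drain` and `drain_demo` are hypothetical names, and the demo again uses PAIR sockets over an assumed inproc endpoint so that it does not depend on subscription timing.

```python
import zmq


def drain(sock):
    """Receive every message already queued on sock without blocking."""
    messages = []
    while True:
        try:
            messages.append(sock.recv_string(zmq.NOBLOCK))
        except zmq.Again:
            return messages  # queue is empty for now


def drain_demo(count=3):
    ctx = zmq.Context()
    rx = ctx.socket(zmq.PAIR)
    tx = ctx.socket(zmq.PAIR)
    rx.bind("inproc://drain-demo")       # endpoint name is an assumption
    tx.connect("inproc://drain-demo")
    got = []
    try:
        for i in range(count):
            tx.send_string(str(i))
        # Wait (up to 1 s per round) for something to arrive, then take the backlog.
        while len(got) < count and rx.poll(timeout=1000):
            got.extend(drain(rx))
    finally:
        ctx.destroy(linger=0)
    return got


if __name__ == "__main__":
    print(drain_demo())
```

In a send/recv loop like the one in the test script, calling `drain(sub)` after each send (instead of a single NOBLOCK recv) would pick up late arrivals from earlier iterations rather than counting them as lost.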

Here's a version of the script that completes successfully, adding only a couple calls to sub.poll(timeout=0) and removing NOBLOCK from the sync recv:

"""Contents of test_script.py

pyzmq pub/sub testing

This file is a simple test of binding a sub socket and then trying to connect
a pub socket to that sub socket and sending data from pub to sub """

import asyncio
import time

import zmq
import zmq.asyncio

from zmq.devices import ThreadDevice as Device

loop = asyncio.get_event_loop()
EVENT_BUS_PUB_ADDR = "127.0.0.1:9996"
EVENT_BUS_SUB_ADDR = "127.0.0.1:9995"

#Create a fake event bus
class FakeEventBus:

    def __init__(self):

        self.internal_event_forwarder = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
        self.internal_event_forwarder.context_factory = self.context_factory
        self.internal_event_forwarder.bind_out(f'tcp://{EVENT_BUS_PUB_ADDR}')
        self.internal_event_forwarder.setsockopt_in(zmq.SUBSCRIBE, b'')
        self.internal_event_forwarder.bind_in(f'tcp://{EVENT_BUS_SUB_ADDR}')
        self.internal_event_forwarder.daemon = True
        self.internal_event_forwarder.start()
        print(f"DEBUG:{__class__}, starting incoming event forwarder")

    def context_factory(self):

        return zmq.asyncio.Context.instance()

    def close(self):

        del self.internal_event_forwarder

#create a context

z_ctx = zmq.Context()

#create pub and sub sockets
sub = z_ctx.socket(zmq.SUB)
pub = z_ctx.socket(zmq.PUB)

#bind the subscriber socket
sub.bind("tcp://127.0.0.1:9998")
sub.subscribe(b'')

#connect the pub socket to the subscriber
pub.connect("tcp://127.0.0.1:9998")

time.sleep(2)

sequence = [i for i in range(0,100)]
counter = 0
sub.RCVTIMEO = 3000 # 3 seconds
# prod libzmq with a poll
sub.poll(timeout=0)

for seq in sequence:

    try:

        print(f"Sending {seq}, how do you read?")
        pub.send_string(f"{seq}")
        print(f"Reading you at :{sub.recv_string()} ")

    except zmq.ZMQError as e:

        if e.errno == zmq.EAGAIN:
            print("Failed to read sequence")
            #send another message

            time.sleep(1)
            counter +=1

print(f"FINISHED! Missed {counter} messages")

sub.close()
pub.close()
z_ctx.destroy()

print("NEXT TEST WITH zmq.asyncio sockets")

#get a context
z_ctx_async = zmq.asyncio.Context()

#create a socket for subscribing
sub = z_ctx_async.socket(zmq.SUB)
sub.bind("tcp://127.0.0.1:9997")
sub.subscribe(b'')

#create a socket for publishing
pub = z_ctx_async.socket(zmq.PUB)
pub.connect("tcp://127.0.0.1:9997")

time.sleep(1)

#define a producer to keep sending pub msgs
async def runner():
    # prod libzmq with a poll to ensure subscription has propagated
    await sub.poll(timeout=0)
    await asyncio.sleep(0.1)  # non-blocking pause; time.sleep() would stall the event loop

    seq = [i for i in range(0,100)]
    for num in seq:
        await pub.send_string(f"{num}")
        print(f"Sending {num} how do you read?")
        print(f"Reading you at :{await sub.recv_string()}")

async def test():

    try:
        await asyncio.wait_for(runner(), 3.0)
    except asyncio.TimeoutError:
        print("TEST FAILED for using zmq.asyncio based sockets")

test = loop.create_task(test())

loop.run_until_complete(test)

sub.close()
pub.close()
z_ctx_async.destroy()

print("#NEXT TEST WITH zmq forwarder device and zmq.asyncio sockets")

#get a context
z_ctx_async = zmq.asyncio.Context()

#create a forwarder
forwarder =  FakeEventBus()

#create a socket for subscribing
sub = z_ctx_async.socket(zmq.SUB)
sub.connect(f"tcp://{EVENT_BUS_PUB_ADDR}")
sub.subscribe(b'')

#create a socket for publishing
pub = z_ctx_async.socket(zmq.PUB)
pub.connect(f"tcp://{EVENT_BUS_SUB_ADDR}")

time.sleep(1)

#define a producer to keep sending pub msgs
async def runner():

    seq = [i for i in range(0,100)]
    for num in seq:
        await pub.send_string(f"{num}")
        print(f"Sending {num} how do you read?")
        print(f"Reading you at :{await sub.recv_string()}")

async def test():

    try:
        await asyncio.wait_for(runner(), 3.0)
    except asyncio.TimeoutError:
        print("TEST FAILED for using zmq.asyncio based sockets")

test = loop.create_task(test())


loop.run_until_complete(test)

sub.close()
pub.close()
z_ctx_async.destroy()
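
Reduced to its essentials, the synchronous workaround from the script above might be sketched as follows. This is a minimal illustration, not a definitive fix: `bound_sub_roundtrip` is a hypothetical helper, the random port and the `settle` pauses are assumptions chosen for the demo, and the right delays will depend on the environment.

```python
import time

import zmq


def bound_sub_roundtrip(n_messages=5, settle=0.5):
    """Send n_messages from a connected PUB to a bound SUB; return what arrived."""
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    pub = ctx.socket(zmq.PUB)
    received = []
    try:
        # The SUB side binds (to a free port, to avoid clashes); the PUB side connects.
        port = sub.bind_to_random_port("tcp://127.0.0.1")
        sub.subscribe(b"")
        pub.connect(f"tcp://127.0.0.1:{port}")

        time.sleep(settle)     # give the TCP connection time to establish
        sub.poll(timeout=0)    # prod libzmq so the subscription is sent to the PUB
        time.sleep(settle)     # give the subscription time to reach the PUB

        sub.RCVTIMEO = 3000    # raise zmq.Again after 3 s instead of blocking forever
        for i in range(n_messages):
            pub.send_string(str(i))
            received.append(sub.recv_string())
    except zmq.Again:
        pass  # a message was missed; return whatever arrived before the timeout
    finally:
        ctx.destroy(linger=0)  # closes both sockets, then terminates the context
    return received


if __name__ == "__main__":
    print(bound_sub_roundtrip())
```

If the poll line is removed, the first message is the one most likely to go missing, which matches the behaviour described in this thread.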

@Jeducious
Author

Hi Min, makes sense and I've verified that this code works for me. I think the current operation of the system would be acceptable, but the docs for zmq.asyncio really need to mention this as I believe this will save others time if someone else decides to use a similar approach.

Though I'd not be surprised if this path is not particularly well trodden and the population of devs affected by this has a size of one :)

@minrk
Member

minrk commented Jan 20, 2022

It isn't specific to asyncio or even pyzmq. I'm generally reluctant to include docs that are generic to libzmq caveats on pyzmq, since they can easily become out of sync. It's a longstanding known issue on libzmq itself.

I do agree that it's an issue that should probably be in the libzmq docs as a known violation of the libzmq concept that connection direction generally shouldn't matter.
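
For comparison, here is a minimal sketch of the conventional direction, where the PUB binds and the SUB connects. `conventional_roundtrip` is a hypothetical helper and the `settle` pause is an assumed value for the usual slow-joiner delay. No poll prod is included, on the assumption that this direction is not affected by the issue described in this thread.

```python
import time

import zmq


def conventional_roundtrip(n_messages=3, settle=0.5):
    """PUB binds, SUB connects; return the messages the SUB received."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    sub = ctx.socket(zmq.SUB)
    received = []
    try:
        port = pub.bind_to_random_port("tcp://127.0.0.1")
        sub.connect(f"tcp://127.0.0.1:{port}")
        sub.subscribe(b"")

        time.sleep(settle)     # let the connection and subscription settle
        sub.RCVTIMEO = 3000    # raise zmq.Again after 3 s instead of blocking forever
        for i in range(n_messages):
            pub.send_string(str(i))
            received.append(sub.recv_string())
    except zmq.Again:
        pass  # a message was missed; return whatever arrived before the timeout
    finally:
        ctx.destroy(linger=0)
    return received


if __name__ == "__main__":
    print(conventional_roundtrip())
```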

@frobnitzem
Contributor

I noticed that the async code can work just fine if you separate the PUB and SUB routines, then add a sleep() to the SUB socket.

"""pub/sub testing

In this example, we challenge the notion that the server should
always bind.  A SUB socket binds an address
and then the PUB socket tries connecting and publishing.
"""

import asyncio

import zmq
from zmq.asyncio import Context

from aiowire import EventLoop, Wire, Call

ctx = Context.instance()

sub = ctx.socket(zmq.SUB)
pub = ctx.socket(zmq.PUB)

url = 'tcp://127.0.0.1:9998'

## note: normally, we'd do this the other way 'round
#bind the subscriber socket
sub.bind("tcp://127.0.0.1:9998")
sub.subscribe(b'')

#connect the pub socket to the subscriber
pub.connect("tcp://127.0.0.1:9998")

#define a producer to keep sending pub msgs
async def produce(ev):
    #print(f"Sending... how do you read?")
    await pub.send_string(f"...")

class Consumer:
    def __init__(self, do_poll):
        self.recvs = 0
        self.do_poll = do_poll

    async def consume(self, ev):
        if self.do_poll and self.recvs == 0:
            ret = await sub.poll(timeout=0)
            print(f"Doing a poll, poll result = {ret}")

        msg = await sub.recv_string()
        print(f"Reading you at: {msg}")
        self.recvs += 1

async def test():
    C = Consumer(False)
    async with EventLoop(3.0) as ev:
        ev.start( Call(asyncio.sleep, 0.5) >> (Wire(produce) * 10) )
        ev.start( Wire(C.consume) * 10 )

    if C.recvs < 10:
        print(f"TEST FAILED, received only {C.recvs}")
    else:
        print("Test passed!")

loop = asyncio.get_event_loop()
test = loop.create_task(test())
loop.run_until_complete(test)

sub.close()
pub.close()
ctx.destroy()

@minrk
Member

minrk commented Feb 9, 2022

@frobnitzem Thanks for the example! I don't believe it's the separate routines that makes it work, but rather this poll:

ret = await sub.poll(timeout=0)

which 'prods' the connection, a workaround for zeromq/libzmq#2267
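
The same idea can be sketched with plain asyncio tasks and no extra dependencies. This is an illustration under stated assumptions rather than a reference implementation: `consume`, `produce` and `run_demo` are hypothetical names, and the delays (0.2 s before the poll, 0.5 s before the first send, 5 s overall limit) are arbitrary values picked for the demo.

```python
import asyncio

import zmq
import zmq.asyncio


async def consume(sub, count, received):
    await asyncio.sleep(0.2)     # let the PUB's connection arrive first
    await sub.poll(timeout=0)    # prod libzmq so the subscription is sent
    while len(received) < count:
        received.append(await sub.recv_string())


async def produce(pub, count, delay):
    await asyncio.sleep(delay)   # non-blocking pause while the subscription propagates
    for i in range(count):
        await pub.send_string(str(i))


async def run_demo(count=10):
    ctx = zmq.asyncio.Context()
    sub = ctx.socket(zmq.SUB)
    pub = ctx.socket(zmq.PUB)
    received = []
    try:
        # SUB binds to a free port, PUB connects to it.
        port = sub.bind_to_random_port("tcp://127.0.0.1")
        sub.subscribe(b"")
        pub.connect(f"tcp://127.0.0.1:{port}")
        try:
            await asyncio.wait_for(
                asyncio.gather(
                    consume(sub, count, received),
                    produce(pub, count, 0.5),
                ),
                timeout=5.0,
            )
        except asyncio.TimeoutError:
            pass  # report whatever was received before the deadline
    finally:
        ctx.destroy(linger=0)
    return received


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Because the received list is shared with the caller, a timeout still shows how many messages got through, which makes it easier to see whether only the first few were lost.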

minrk changed the title from "zmq.asyncio sockets : binding the sub socket then connecting with pub sockets in a pub/sub pattern doesn't work" to "libzmq binding the sub socket then connecting with pub sockets in a pub/sub pattern doesn't work" on Mar 18, 2022
minrk added the question label on Mar 18, 2022
minrk closed this as completed on Feb 21, 2024
3 participants