IPC sockets do not reconnect to active bind #2764
Sounds similar to #2763. I can't remember right now, but I think AF_UNIX sockets have the same behaviour as AF_INET w.r.t. dead peer detection. Could you please try with the ZMQ_HEARTBEAT* options?
@bluca I'll try that. But shouldn't a 0-length read from a socket indicate that the remote end is closed? While debugging I also came across the fact that ZeroMQ allows the same process to bind to the same IPC address twice. I am not sure that should be allowed.
Yes, a 0-length read closes the socket: https://github.com/zeromq/libzmq/blob/master/src/stream_engine.cpp#L316 The problem is that the kernel won't give you that in a lot of cases. For AF_UNIX, as I mentioned, I can't remember the exact behaviour right now. Regarding double binding: as is customary, the library unlinks the socket file before binding. That's because those files are never automatically cleaned up - that's just how AF_UNIX works, I'm afraid.
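To make the unlink-before-bind rationale concrete, here is a small sketch using plain Python sockets (the temp path is hypothetical): the AF_UNIX socket file outlives its socket, and a naive rebind to the stale path fails with EADDRINUSE until the file is unlinked first, which is why libzmq unlinks before binding.

```python
import errno
import os
import socket
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.sock")

s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s1.bind(path)
s1.close()

# The filesystem entry is not cleaned up when the socket is closed.
stale_file_remains = os.path.exists(path)

# Binding to the stale path fails with EADDRINUSE...
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
    s2.bind(path)
    rebind_errno = None
except OSError as e:
    rebind_errno = e.errno

# ...until the stale file is unlinked first, as libzmq does before binding.
os.unlink(path)
s2.bind(path)
s2.close()
os.unlink(path)

print(stale_file_remains, rebind_errno == errno.EADDRINUSE)
```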
I wonder if that problem also happens due to unlinking. I just tried the same test, but instead of running 2 parallel processes I bound 2 sockets to the same address around a single connect. Do you think the socket should treat "unlink" as a close (e.g. this could be implemented via the inode)?
Perhaps it can be solved at least at the context level by keeping a registry of in-use sockets?
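A sketch of the inode idea with plain Python sockets (the temp path is hypothetical): after another binder unlinks and rebinds the same path, os.stat on the path reports a different inode, so a bound socket could in principle detect that the path no longer refers to its own file.

```python
import os
import socket
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.sock")

s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s1.bind(path)
ino_at_bind = os.stat(path).st_ino  # remember the inode we created

# Another binder unlinks and rebinds the same path (what zmq_bind does).
os.unlink(path)
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s2.bind(path)

# The path now points at a different file: the inodes differ.
inode_changed = os.stat(path).st_ino != ino_at_bind

s1.close()
s2.close()
os.unlink(path)
print(inode_changed)
```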
Well, at least plain sockets handle this situation properly.

Server:

```python
import os
import socket

s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind('/tmp/socket')
s.listen(1)
client, addr = s.accept()
client.send(b'123')
os.unlink('/tmp/socket')
client.send(b'456')
client.close()
```

Client:

```python
import socket

s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect('/tmp/socket')
s.recv(16)  # b'123'
s.recv(16)  # b'456'
s.recv(16)  # b''
```
Here is a test that uses the C API:

```cpp
#include <fstream>
#include <sys/stat.h>

#include "testutil.hpp"

static const char *SOCKET_PATH = "/tmp/tester";
static const char *SOCKET_ADDR = "ipc:///tmp/tester";

void test_close_and_unlink ()
{
    int timeout = 100;

    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void *sb = zmq_socket (ctx, ZMQ_PUSH);
    assert (sb);
    int rc = zmq_setsockopt (sb, ZMQ_SNDTIMEO, &timeout, sizeof (int));
    assert (rc == 0);
    rc = zmq_bind (sb, SOCKET_ADDR);
    assert (rc == 0);

    void *sc = zmq_socket (ctx, ZMQ_PULL);
    assert (sc);
    rc = zmq_setsockopt (sc, ZMQ_RCVTIMEO, &timeout, sizeof (int));
    assert (rc == 0);
    rc = zmq_connect (sc, SOCKET_ADDR);
    assert (rc == 0);

    rc = zmq_send (sb, "42", 2, 0);
    assert (rc == 2);
    char buffer[2];
    rc = zmq_recv (sc, buffer, 2, 0);
    assert (rc == 2);

    rc = zmq_close (sb);
    assert (rc == 0);
    //  Closing the bound socket unlinks the file.
    assert (!std::ifstream (SOCKET_PATH).good ());

    sb = zmq_socket (ctx, ZMQ_PUSH);
    assert (sb);
    rc = zmq_setsockopt (sb, ZMQ_SNDTIMEO, &timeout, sizeof (int));
    assert (rc == 0);
    rc = zmq_bind (sb, SOCKET_ADDR);
    assert (rc == 0);

    rc = zmq_send (sb, "42", 2, 0);
    assert (rc == 2);
    rc = zmq_recv (sc, buffer, 2, 0);
    assert (rc == 2);

    rc = zmq_close (sc);
    assert (rc == 0);
    rc = zmq_close (sb);
    assert (rc == 0);
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}

void test_unlink_and_close ()
{
    int timeout = 100;

    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void *sb = zmq_socket (ctx, ZMQ_PUSH);
    assert (sb);
    int rc = zmq_setsockopt (sb, ZMQ_SNDTIMEO, &timeout, sizeof (int));
    assert (rc == 0);
    rc = zmq_bind (sb, SOCKET_ADDR);
    assert (rc == 0);

    void *sc = zmq_socket (ctx, ZMQ_PULL);
    assert (sc);
    rc = zmq_setsockopt (sc, ZMQ_RCVTIMEO, &timeout, sizeof (int));
    assert (rc == 0);
    rc = zmq_connect (sc, SOCKET_ADDR);
    assert (rc == 0);

    rc = zmq_send (sb, "42", 2, 0);
    assert (rc == 2);
    char buffer[2];
    rc = zmq_recv (sc, buffer, 2, 0);
    assert (rc == 2);

    struct stat stat_before_rebind = {};
    rc = stat (SOCKET_PATH, &stat_before_rebind);
    assert (rc == 0);

    //  Keep the old handle so it can be closed later; otherwise
    //  zmq_ctx_term would block on the leaked socket.
    void *old_sb = sb;
    sb = zmq_socket (ctx, ZMQ_PUSH);
    assert (sb);
    rc = zmq_setsockopt (sb, ZMQ_SNDTIMEO, &timeout, sizeof (int));
    assert (rc == 0);
    rc = zmq_bind (sb, SOCKET_ADDR); //  implicitly unlinks the old socket file
    assert (rc == 0);

    struct stat stat_after_rebind = {};
    rc = stat (SOCKET_PATH, &stat_after_rebind);
    assert (rc == 0);
    assert (stat_before_rebind.st_ino != stat_after_rebind.st_ino);

    rc = zmq_send (sb, "42", 2, 0);
    assert (rc == 2);
    rc = zmq_recv (sc, buffer, 2, 0);
    assert (rc == 2);

    rc = zmq_close (sc);
    assert (rc == 0);
    rc = zmq_close (old_sb);
    assert (rc == 0);
    rc = zmq_close (sb);
    assert (rc == 0);
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}

int main (void)
{
    setup_test_environment ();
    test_close_and_unlink ();
    test_unlink_and_close ();
    return 0;
}
```
It appears there is a race condition due to unlink: the close sequence of one bound socket can interleave with the bind sequence of another, in which case the close of socket [1] will actually unlink the file that belongs to socket [2] (assuming they are bound to the same address).
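The race can be reproduced step by step with plain Python sockets (the temp path is hypothetical): the unconditional unlink-on-close performed for socket [1] removes the file that by then belongs to socket [2].

```python
import os
import socket
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.sock")

# [1] binds the path.
s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s1.bind(path)

# [2] rebinds the same address: unlink then bind, as zmq_bind does.
os.unlink(path)
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s2.bind(path)

# [1] now closes and unconditionally unlinks "its" path,
# which actually removes the file belonging to [2].
s1.close()
os.unlink(path)

file_of_2_gone = not os.path.exists(path)
s2.close()
print(file_of_2_gone)
```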
Why does ZeroMQ unlink on close in the first place? If that can be avoided, many other issues can be easily avoided as well.
Otherwise it's possible to close a file that does not belong to the current socket. Refs zeromq#2764
IIRC the problem is that if the socket file is there, binding to it will give an "address in use" error. And if it's not removed before binding, an application that crashes and is restarted (e.g. by the init system) will fail with the same error. In the end I think that semantically double binding should not be allowed, as it really doesn't make much sense with SOCK_STREAM sockets.
That's correct, but the current behavior is to unlink before bind (acceptable) and on close (incorrect). It is incorrect because on close it's possible to unlink a file that no longer belongs to the socket.
Unlink-and-bind looks like a legitimate approach and should stay for backward compatibility. Perhaps an option should be added for users that desire stricter control over this.
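One conceivable guard (a hedged sketch with plain Python sockets and a hypothetical temp path, not necessarily what the eventual fix implements) is to record the inode at bind time and unlink on close only if the path still refers to that same file:

```python
import os
import socket
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.sock")

s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s1.bind(path)
ino_at_bind = os.stat(path).st_ino  # recorded at bind time

# Meanwhile another socket takes over the address (unlink + bind).
os.unlink(path)
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s2.bind(path)

# Guarded close for s1: only unlink if the path is still our file.
s1.close()
try:
    still_ours = os.stat(path).st_ino == ino_at_bind
except FileNotFoundError:
    still_ours = False
if still_ours:
    os.unlink(path)

survived = os.path.exists(path)  # s2's file was left alone
s2.close()
os.unlink(path)
print(survived)
```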
I see, you are right and it makes sense. Having an additional option sounds good as well. Thanks!
Fixed by #2765
Issue description

Environment

Minimal test code / Steps to reproduce the issue

Here are 2 python files:

To reproduce the issue, run sub.py and pub.py simultaneously.

What's the actual result? (include assertion message & call stack if applicable)

sub.py received only one message (expected), but subsequent runs of pub.py will not cause sub.py to receive anything.

What's the expected result?

sub.py should notice that the previously connected socket is closed, reconnect, and be able to receive messages from subsequent pub.py runs.