From 625b6187767451444fb68035883f442393b87f9f Mon Sep 17 00:00:00 2001 From: Frederic Tregon Date: Sat, 2 Apr 2016 18:30:35 +0200 Subject: [PATCH] Fixed ZMQ_REQ_CORRELATE (see pull request #1730) Problem: Since pull request #1730 was merged, protocol for REQ socket is checked at the session level and this check does not take into account the possibility of a request_id being part of the message. Thus the option ZMQ_REQ_CORRELATE would no longer work. This is now fixed: the possiblity of a 4 bytes integer being present before the delimiter frame is taken into account (whether or not this breaks the REQ/REP RFC is another issue). --- Makefile.am | 3 +- src/req.cpp | 15 ++++++++++ src/req.hpp | 1 + tests/test_req_correlate.cpp | 55 ++---------------------------------- tests/test_req_relaxed.cpp | 24 ++++++---------- 5 files changed, 27 insertions(+), 71 deletions(-) diff --git a/Makefile.am b/Makefile.am index c61a43d94a..21140d8974 100644 --- a/Makefile.am +++ b/Makefile.am @@ -732,8 +732,7 @@ check_PROGRAMS = ${test_apps} # Run the test cases TESTS = $(test_apps) -XFAIL_TESTS = tests/test_req_correlate \ - tests/test_req_relaxed +XFAIL_TESTS = if !ON_LINUX XFAIL_TESTS += tests/test_abstract_ipc diff --git a/src/req.cpp b/src/req.cpp index 23263f07a5..dd2448a1c1 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -279,6 +279,21 @@ int zmq::req_session_t::push_msg (msg_t *msg_) { switch (state) { case bottom: + if (msg_->flags () == msg_t::more) { + // In case option ZMQ_CORRELATE is on, allow request_id to be + // transfered as first frame (would be too cumbersome to check + // whether the option is actually on or not). + if (msg_->size () == sizeof (uint32_t)) { + state = request_id; + return session_base_t::push_msg (msg_); + } + else if (msg_->size () == 0) { + state = body; + return session_base_t::push_msg (msg_); + } + } + break; + case request_id: if (msg_->flags () == msg_t::more && msg_->size () == 0) { state = body; return session_base_t::push_msg (msg_); diff --git a/src/req.hpp b/src/req.hpp index 6462720c09..db3f011d40 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -108,6 +108,7 @@ namespace zmq enum { bottom, + request_id, body } state; diff --git a/tests/test_req_correlate.cpp b/tests/test_req_correlate.cpp index 6e4aef5716..0466becffc 100644 --- a/tests/test_req_correlate.cpp +++ b/tests/test_req_correlate.cpp @@ -93,58 +93,7 @@ int main (void) // Receive the rest. s_recv_seq (router, 0, "ABC", "DEF", SEQ_END); - // Send back a bad reply: correct req id - zmq_msg_copy (&msg, &peer_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - zmq_msg_copy (&msg, &req_id_msg); - rc = zmq_msg_send (&msg, router, 0); - assert (rc != -1); - - // Send back a bad reply: wrong req id - zmq_msg_copy (&msg, &peer_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); uint32_t bad_req_id = req_id + 1; - zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL); - rc = zmq_msg_send (&msg, router, 0); - assert (rc != -1); - - // Send back a bad reply: correct req id, 0 - zmq_msg_copy (&msg, &peer_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - zmq_msg_copy (&msg, &req_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - s_send_seq (router, 0, SEQ_END); - - // Send back a bad reply: correct req id, garbage - zmq_msg_copy (&msg, &peer_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - zmq_msg_copy (&msg, &req_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - s_send_seq (router, "FOO", SEQ_END); - - // Send back a bad reply: wrong req id, 0 - zmq_msg_copy (&msg, &peer_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - s_send_seq (router, 0, SEQ_END); - - // Send back a bad reply: correct req id, garbage, data - zmq_msg_copy (&msg, &peer_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - zmq_msg_copy (&msg, &req_id_msg); - rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); - assert (rc != -1); - s_send_seq (router, "FOO", "DATA", SEQ_END); // Send back a bad reply: wrong req id, 0, data zmq_msg_copy (&msg, &peer_id_msg); @@ -155,7 +104,7 @@ int main (void) assert (rc != -1); s_send_seq (router, 0, "DATA", SEQ_END); - // Send back a good reply. + // Send back a good reply: good req id, 0, data zmq_msg_copy (&msg, &peer_id_msg); rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); assert (rc != -1); @@ -164,7 +113,7 @@ int main (void) assert (rc != -1); s_send_seq (router, 0, "GHI", SEQ_END); - // Receive reply. If any of the other messages got through, we wouldn't see + // Receive reply. If bad reply got through, we wouldn't see // this particular data. s_recv_seq (req, "GHI", SEQ_END); diff --git a/tests/test_req_relaxed.cpp b/tests/test_req_relaxed.cpp index 3b5c1cb455..c530ad136f 100644 --- a/tests/test_req_relaxed.cpp +++ b/tests/test_req_relaxed.cpp @@ -32,7 +32,7 @@ static void bounce (void *socket) { int more; - size_t more_size = sizeof(more); + size_t more_size = sizeof (more); do { zmq_msg_t recv_part, sent_part; int rc = zmq_msg_init (&recv_part); @@ -50,13 +50,13 @@ static void bounce (void *socket) rc = zmq_msg_send (&sent_part, socket, more ? ZMQ_SNDMORE : 0); assert (rc != -1); - zmq_msg_close(&recv_part); + zmq_msg_close (&recv_part); } while (more); } int main (void) { - setup_test_environment(); + setup_test_environment (); void *ctx = zmq_ctx_new (); assert (ctx); @@ -70,10 +70,6 @@ int main (void) rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int)); assert (rc == 0); - int rcvtimeo = 100; - rc = zmq_setsockopt (req, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int)); - assert (rc == 0); - rc = zmq_bind (req, "tcp://127.0.0.1:5555"); assert (rc == 0); @@ -127,7 +123,7 @@ int main (void) s_recv_seq (rep [3], "H", SEQ_END); s_send_seq (rep [3], "BAD", SEQ_END); - // Wait for message to be there. + // Wait for message to be there. msleep (SETTLE_TIME); // Without receiving that reply, send another request on the REQ socket @@ -142,8 +138,8 @@ int main (void) // communication pipes. For example pipe from req to rep[0] should not be // closed after executing Case 1. So rep[0] should be the next to receive, // not rep[1]. - s_send_seq(req, "J", SEQ_END); - s_recv_seq(rep [0], "J", SEQ_END); + s_send_seq (req, "J", SEQ_END); + s_recv_seq (rep [0], "J", SEQ_END); close_zero_linger (req); for (size_t peer = 0; peer < services; peer++) @@ -171,18 +167,14 @@ int main (void) // Setup ROUTER socket as server but do not bind it just yet void *router = zmq_socket (ctx, ZMQ_ROUTER); - assert(router); - - int timeout = 1000; - rc = zmq_setsockopt (router, ZMQ_RCVTIMEO, &timeout, sizeof(int)); - assert (rc == 0); + assert (router); // Send two requests s_send_seq (req, "TO_BE_DISCARDED", SEQ_END); s_send_seq (req, "TO_BE_ANSWERED", SEQ_END); // Bind server allowing it to receive messages - rc = zmq_bind(router, "tcp://127.0.0.1:5555"); + rc = zmq_bind (router, "tcp://127.0.0.1:5555"); assert (rc == 0); // Read the two messages and send them back as is