From d70714e8778c44ef9eeac746a10444ee5f69f800 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Mon, 28 May 2018 18:00:11 +0100 Subject: [PATCH] Problem: can't process ZMTP 3.1 cancel/subscribe commands Solution: add some msg helpers to parse commands, and check for subscribe or cancel commands and process them accordingly in the xpub and xsub classes. --- .gitignore | 1 + Makefile.am | 5 + src/msg.cpp | 37 ++++++ src/msg.hpp | 31 +++++ src/session_base.cpp | 4 +- src/stream_engine.cpp | 61 +++++++--- src/xpub.cpp | 116 +++++++++++------- src/xsub.cpp | 22 +++- tests/CMakeLists.txt | 1 + tests/test_mock_pub_sub.cpp | 226 ++++++++++++++++++++++++++++++++++++ 10 files changed, 439 insertions(+), 65 deletions(-) create mode 100644 tests/test_mock_pub_sub.cpp diff --git a/.gitignore b/.gitignore index 794282e870..032847477f 100644 --- a/.gitignore +++ b/.gitignore @@ -147,6 +147,7 @@ test_app_meta test_security_zap test_socket_null test_xpub_verbose +test_mock_pub_sub unittest_ip_resolver unittest_mtrie unittest_poller diff --git a/Makefile.am b/Makefile.am index 7670fcf117..4b7f08bd40 100644 --- a/Makefile.am +++ b/Makefile.am @@ -445,6 +445,7 @@ test_apps = \ tests/test_bind_after_connect_tcp \ tests/test_sodium \ tests/test_reconnect_ivl \ + tests/test_mock_pub_sub \ tests/test_socket_null UNITY_CPPFLAGS = -I$(top_srcdir)/external/unity -DUNITY_USE_COMMAND_LINE_ARGS -DUNITY_EXCLUDE_FLOAT @@ -699,6 +700,10 @@ tests_test_reconnect_ivl_SOURCES = tests/test_reconnect_ivl.cpp tests_test_reconnect_ivl_LDADD = src/libzmq.la ${UNITY_LIBS} tests_test_reconnect_ivl_CPPFLAGS = ${UNITY_CPPFLAGS} +tests_test_mock_pub_sub_SOURCES = tests/test_mock_pub_sub.cpp +tests_test_mock_pub_sub_LDADD = src/libzmq.la ${UNITY_LIBS} +tests_test_mock_pub_sub_CPPFLAGS = ${UNITY_CPPFLAGS} + if HAVE_CURVE test_apps += \ diff --git a/src/msg.cpp b/src/msg.cpp index 16ad1e541e..5e32341bd0 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -444,6 +444,43 @@ bool zmq::msg_t::is_leave () const return _u.base.type == type_leave; } +bool zmq::msg_t::is_ping () const +{ + return (_u.base.flags & CMD_TYPE_MASK) == ping; +} + +bool zmq::msg_t::is_pong () const +{ + return (_u.base.flags & CMD_TYPE_MASK) == pong; +} + +size_t zmq::msg_t::command_body_size () const +{ + if (this->is_ping () || this->is_pong ()) + return this->size () - ping_cmd_name_size; + if (this->is_subscribe ()) + return this->size () - sub_cmd_name_size; + if (this->is_cancel ()) + return this->size () - cancel_cmd_name_size; + + return 0; +} + +void *zmq::msg_t::command_body () +{ + unsigned char *data = NULL; + if (this->is_ping () || this->is_pong ()) + data = + static_cast (this->data ()) + ping_cmd_name_size; + if (this->is_subscribe ()) + data = static_cast (this->data ()) + sub_cmd_name_size; + if (this->is_cancel ()) + data = + static_cast (this->data ()) + cancel_cmd_name_size; + + return data; +} + void zmq::msg_t::add_refs (int refs_) { zmq_assert (refs_ >= 0); diff --git a/src/msg.hpp b/src/msg.hpp index e595ef3f9f..c4407c286c 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -39,6 +39,9 @@ #include "atomic_counter.hpp" #include "metadata.hpp" +// bits 2-5 +#define CMD_TYPE_MASK 0x1c + // Signature for free function to deallocate the message content. // Note that it has to be declared as "C" so that it is the same as // zmq_free_fn defined in zmq.h. @@ -75,6 +78,12 @@ class msg_t { more = 1, // Followed by more parts command = 2, // Command frame (see ZMTP spec) + // Command types, use only bits 2-5 and compare with ==, not bitwise, + // a command can never be of more that one type at the same time + ping = 4, + pong = 8, + subscribe = 12, + cancel = 16, credential = 32, routing_id = 64, shared = 128 @@ -115,6 +124,22 @@ class msg_t bool is_delimiter () const; bool is_join () const; bool is_leave () const; + bool is_ping () const; + bool is_pong () const; + + // These are called on each message received by the session_base class, + // so get them inlined to avoid the overhead of 2 function calls per msg + inline bool is_subscribe () const + { + return (_u.base.flags & CMD_TYPE_MASK) == subscribe; + } + inline bool is_cancel () const + { + return (_u.base.flags & CMD_TYPE_MASK) == cancel; + } + + size_t command_body_size () const; + void *command_body (); bool is_vsm () const; bool is_cmsg () const; bool is_lmsg () const; @@ -145,6 +170,12 @@ class msg_t max_vsm_size = msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t)) }; + enum + { + ping_cmd_name_size = 5, // 4PING + cancel_cmd_name_size = 7, // 6CANCEL + sub_cmd_name_size = 10 // 9SUBSCRIBE + }; private: zmq::atomic_counter_t *refcnt (); diff --git a/src/session_base.cpp b/src/session_base.cpp index db899eab30..c924a81494 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -163,7 +163,9 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) int zmq::session_base_t::push_msg (msg_t *msg_) { - if (msg_->flags () & msg_t::command) + // pass subscribe/cancel to the sockets + if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe () + && !msg_->is_cancel ()) return 0; if (_pipe && _pipe->write (msg_)) { int rc = msg_->init (); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 94ceff20df..9dc966961f 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -1038,18 +1038,20 @@ void zmq::stream_engine_t::timer_event (int id_) int zmq::stream_engine_t::produce_ping_message (msg_t *msg_) { int rc = 0; + // 16-bit TTL + \4PING == 7 + const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; zmq_assert (_mechanism != NULL); - // 16-bit TTL + \4PING == 7 - rc = msg_->init_size (7); + rc = msg_->init_size (ping_ttl_len); errno_assert (rc == 0); msg_->set_flags (msg_t::command); // Copy in the command message - memcpy (msg_->data (), "\4PING", 5); + memcpy (msg_->data (), "\4PING", msg_t::ping_cmd_name_size); uint16_t ttl_val = htons (_options.heartbeat_ttl); - memcpy ((static_cast (msg_->data ())) + 5, &ttl_val, - sizeof (ttl_val)); + memcpy ((static_cast (msg_->data ())) + + msg_t::ping_cmd_name_size, + &ttl_val, sizeof (ttl_val)); rc = _mechanism->encode (msg_); _next_msg = &stream_engine_t::pull_and_encode; @@ -1075,11 +1077,17 @@ int zmq::stream_engine_t::produce_pong_message (msg_t *msg_) int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_) { - if (memcmp (msg_->data (), "\4PING", 5) == 0) { + if (msg_->is_ping ()) { + // 16-bit TTL + \4PING == 7 + const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; + const size_t ping_max_ctx_len = 16; uint16_t remote_heartbeat_ttl; + // Get the remote heartbeat TTL to setup the timer memcpy (&remote_heartbeat_ttl, - static_cast (msg_->data ()) + 5, 2); + static_cast (msg_->data ()) + + msg_t::ping_cmd_name_size, + ping_ttl_len - msg_t::ping_cmd_name_size); remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl); // The remote heartbeat is in 10ths of a second // so we multiply it by 100 to get the timer interval in ms. @@ -1095,14 +1103,18 @@ int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_) // here and store it. Truncate it if it's too long. // Given the engine goes straight to out_event, sequential PINGs will // not be a problem. - size_t context_len = msg_->size () - 7 > 16 ? 16 : msg_->size () - 7; - int rc = _pong_msg.init_size (5 + context_len); + size_t context_len = msg_->size () - ping_ttl_len > ping_max_ctx_len + ? ping_max_ctx_len + : msg_->size () - ping_ttl_len; + int rc = _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len); errno_assert (rc == 0); _pong_msg.set_flags (msg_t::command); - memcpy (_pong_msg.data (), "\4PONG", 5); + memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size); if (context_len > 0) - memcpy ((static_cast (_pong_msg.data ())) + 5, - (static_cast (msg_->data ())) + 7, context_len); + memcpy ((static_cast (_pong_msg.data ())) + + msg_t::ping_cmd_name_size, + (static_cast (msg_->data ())) + ping_ttl_len, + context_len); _next_msg = &stream_engine_t::produce_pong_message; out_event (); @@ -1115,15 +1127,28 @@ int zmq::stream_engine_t::process_command_message (msg_t *msg_) { const uint8_t cmd_name_size = *(static_cast (msg_->data ())); + const size_t ping_name_size = msg_t::ping_cmd_name_size - 1; + const size_t sub_name_size = msg_t::sub_cmd_name_size - 1; + const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1; // Malformed command - if (msg_->size () < cmd_name_size + sizeof (cmd_name_size)) + if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size))) return -1; - const uint8_t *cmd_name = - (static_cast (msg_->data ())) + 1; - if (cmd_name_size == 4 - && (memcmp (cmd_name, "PING", cmd_name_size) == 0 - || memcmp (cmd_name, "PONG", cmd_name_size) == 0)) + uint8_t *cmd_name = (static_cast (msg_->data ())) + 1; + if (cmd_name_size == ping_name_size + && memcmp (cmd_name, "PING", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::ping); + if (cmd_name_size == ping_name_size + && memcmp (cmd_name, "PONG", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::pong); + if (cmd_name_size == sub_name_size + && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::subscribe); + if (cmd_name_size == cancel_name_size + && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0) + msg_->set_flags (zmq::msg_t::cancel); + + if (msg_->is_ping () || msg_->is_pong ()) return process_heartbeat_message (msg_); return 0; diff --git a/src/xpub.cpp b/src/xpub.cpp index d35a26c083..c63939ff39 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -88,57 +88,89 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // There are some subscriptions waiting. Let's process them. msg_t sub; while (pipe_->read (&sub)) { - // Apply the subscription to the trie - const unsigned char *const data = - static_cast (sub.data ()); - const size_t size = sub.size (); metadata_t *metadata = sub.metadata (); - if (size > 0 && (*data == 0 || *data == 1)) { - if (_manual) { - // Store manual subscription to use on termination - if (*data == 0) - _manual_subscriptions.rm (data + 1, size - 1, pipe_); - else - _manual_subscriptions.add (data + 1, size - 1, pipe_); + unsigned char *msg_data = static_cast (sub.data ()), + *data = NULL; + size_t size = 0; + bool subscribe = false; - _pending_pipes.push_back (pipe_); - _pending_data.push_back (blob_t (data, size)); - if (metadata) - metadata->add_ref (); - _pending_metadata.push_back (metadata); - _pending_flags.push_back (0); - } else { - bool notify; - if (*data == 0) { - mtrie_t::rm_result rm_result = - _subscriptions.rm (data + 1, size - 1, pipe_); - // TODO reconsider what to do if rm_result == mtrie_t::not_found - notify = - rm_result != mtrie_t::values_remain || _verbose_unsubs; - } else { - bool first_added = - _subscriptions.add (data + 1, size - 1, pipe_); - notify = first_added || _verbose_subs; - } - - // If the request was a new subscription, or the subscription - // was removed, or verbose mode is enabled, store it so that - // it can be passed to the user on next recv call. - if (options.type == ZMQ_XPUB && notify) { - _pending_data.push_back (blob_t (data, size)); - if (metadata) - metadata->add_ref (); - _pending_metadata.push_back (metadata); - _pending_flags.push_back (0); - } + // Apply the subscription to the trie + if (sub.is_subscribe () || sub.is_cancel ()) { + data = static_cast (sub.command_body ()); + size = sub.command_body_size (); + subscribe = sub.is_subscribe (); + } else if (sub.size () > 0) { + unsigned char first = *msg_data; + if (first == 0 || first == 1) { + data = msg_data + 1; + size = sub.size () - 1; + subscribe = first == 1; } } else { // Process user message coming upstream from xsub socket - _pending_data.push_back (blob_t (data, size)); + _pending_data.push_back (blob_t (msg_data, sub.size ())); if (metadata) metadata->add_ref (); _pending_metadata.push_back (metadata); _pending_flags.push_back (sub.flags ()); + sub.close (); + continue; + } + + if (_manual) { + // Store manual subscription to use on termination + if (!subscribe) + _manual_subscriptions.rm (data, size, pipe_); + else + _manual_subscriptions.add (data, size, pipe_); + + _pending_pipes.push_back (pipe_); + + // ZMTP 3.1 hack: we need to support sub/cancel commands, but + // we can't give them back to userspace as it would be an API + // breakage since the payload of the message is completely + // different. Manually craft an old-style message instead. + data = data - 1; + size = size + 1; + if (subscribe) + *data = 1; + else + *data = 0; + + _pending_data.push_back (blob_t (data, size)); + if (metadata) + metadata->add_ref (); + _pending_metadata.push_back (metadata); + _pending_flags.push_back (0); + } else { + bool notify; + if (!subscribe) { + mtrie_t::rm_result rm_result = + _subscriptions.rm (data, size, pipe_); + // TODO reconsider what to do if rm_result == mtrie_t::not_found + notify = rm_result != mtrie_t::values_remain || _verbose_unsubs; + } else { + bool first_added = _subscriptions.add (data, size, pipe_); + notify = first_added || _verbose_subs; + } + + // If the request was a new subscription, or the subscription + // was removed, or verbose mode is enabled, store it so that + // it can be passed to the user on next recv call. + if (options.type == ZMQ_XPUB && notify) { + data = data - 1; + size = size + 1; + if (subscribe) + *data = 1; + else + *data = 0; + + _pending_data.push_back (blob_t (data, size)); + if (metadata) + metadata->add_ref (); + _pending_metadata.push_back (metadata); + _pending_flags.push_back (0); + } } sub.close (); } diff --git a/src/xsub.cpp b/src/xsub.cpp index 2de77a49d7..a9ff5f816c 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -96,18 +96,32 @@ int zmq::xsub_t::xsend (msg_t *msg_) size_t size = msg_->size (); unsigned char *data = static_cast (msg_->data ()); - if (size > 0 && *data == 1) { + if (msg_->is_subscribe () || (size > 0 && *data == 1)) { // Process subscribe message // This used to filter out duplicate subscriptions, // however this is alread done on the XPUB side and // doing it here as well breaks ZMQ_XPUB_VERBOSE // when there are forwarding devices involved. - _subscriptions.add (data + 1, size - 1); + if (msg_->is_subscribe ()) { + data = static_cast (msg_->command_body ()); + size = msg_->command_body_size (); + } else { + data = data + 1; + size = size - 1; + } + _subscriptions.add (data, size); return _dist.send_to_all (msg_); } - if (size > 0 && *data == 0) { + if (msg_->is_cancel () || (size > 0 && *data == 0)) { // Process unsubscribe message - if (_subscriptions.rm (data + 1, size - 1)) + if (msg_->is_cancel ()) { + data = static_cast (msg_->command_body ()); + size = msg_->command_body_size (); + } else { + data = data + 1; + size = size - 1; + } + if (_subscriptions.rm (data, size)) return _dist.send_to_all (msg_); } else // User message sent upstream to XPUB socket diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index df7cb1d287..591da07fa2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -72,6 +72,7 @@ set(tests test_monitor test_socket_null test_reconnect_ivl + test_mock_pub_sub ) if(ZMQ_HAVE_CURVE) list(APPEND tests diff --git a/tests/test_mock_pub_sub.cpp b/tests/test_mock_pub_sub.cpp new file mode 100644 index 0000000000..13ca7fd2e9 --- /dev/null +++ b/tests/test_mock_pub_sub.cpp @@ -0,0 +1,226 @@ +/* + Copyright (c) 2018 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" +#if defined(ZMQ_HAVE_WINDOWS) +#include +#include +#include +#define close closesocket +typedef SOCKET raw_socket; +#else +#include +typedef int raw_socket; +#endif + + +void setUp () +{ + setup_test_context (); +} + +void tearDown () +{ + teardown_test_context (); +} + +// Read one event off the monitor socket; return value and address +// by reference, if not null, and event number by value. Returns -1 +// in case of error. + +static int get_monitor_event (void *monitor_) +{ + for (int i = 0; i < 2; i++) { + // First frame in message contains event number and value + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); + if (zmq_msg_recv (&msg, monitor_, ZMQ_DONTWAIT) == -1) { + msleep (SETTLE_TIME); + continue; // Interrupted, presumably + } + TEST_ASSERT_TRUE (zmq_msg_more (&msg)); + + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint16_t event = *(uint16_t *) (data); + + // Second frame in message contains event address + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); + if (zmq_msg_recv (&msg, monitor_, 0) == -1) { + return -1; // Interrupted, presumably + } + TEST_ASSERT_FALSE (zmq_msg_more (&msg)); + + return event; + } + return -1; +} + +static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_) +{ + int received = 0; + while (true) { + int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + recv (fd_, buffer_ + received, bytes_ - received, 0)); + TEST_ASSERT_GREATER_THAN_INT (0, rc); + received += rc; + TEST_ASSERT_LESS_OR_EQUAL_INT (bytes_, received); + if (received == bytes_) + break; + } +} + +static void mock_handshake (raw_socket fd_) +{ + const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0, + 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0}; + char buffer[128]; + memset (buffer, 0, sizeof (buffer)); + memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting)); + + int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0)); + TEST_ASSERT_EQUAL_INT (64, rc); + + recv_with_retry (fd_, buffer, 64); + + const uint8_t zmtp_ready[27] = { + 4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', + 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'}; + + memset (buffer, 0, sizeof (buffer)); + memcpy (buffer, zmtp_ready, 27); + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 27, 0)); + TEST_ASSERT_EQUAL_INT (27, rc); + + // greeting - XPUB so one extra byte + recv_with_retry (fd_, buffer, 28); +} + +static void prep_server_socket (void **server_out_, + void **mon_out_, + char *endpoint_, + size_t ep_length_) +{ + // We'll be using this socket in raw mode + void *server = test_context_socket (ZMQ_XPUB); + + int value = 0; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value))); + + bind_loopback_ipv4 (server, endpoint_, ep_length_); + + // Create and connect a socket for collecting monitor events on xpub + void *server_mon = test_context_socket (ZMQ_PAIR); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor ( + server, "inproc://monitor-dealer", + ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED)); + + // Connect to the inproc endpoint so we'll get events + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_connect (server_mon, "inproc://monitor-dealer")); + + *server_out_ = server; + *mon_out_ = server_mon; +} + +static void test_mock_sub (bool sub_command) +{ + int rc; + char my_endpoint[MAX_SOCKET_STRING]; + + void *server, *server_mon; + prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING); + + struct sockaddr_in ip4addr; + raw_socket s; + + ip4addr.sin_family = AF_INET; + ip4addr.sin_port = htons (atoi (strrchr (my_endpoint, ':') + 1)); +#if defined(ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600) + ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1"); +#else + inet_pton (AF_INET, "127.0.0.1", &ip4addr.sin_addr); +#endif + + s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + connect (s, (struct sockaddr *) &ip4addr, sizeof ip4addr)); + TEST_ASSERT_GREATER_THAN_INT (-1, rc); + + // Mock a ZMTP 3 client so we can forcibly try sub commands + mock_handshake (s); + + // By now everything should report as connected + rc = get_monitor_event (server_mon); + TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc); + + if (sub_command) { + const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S', + 'C', 'R', 'I', 'B', 'E', 'A'}; + rc = + TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 13, 0)); + TEST_ASSERT_EQUAL_INT (13, rc); + } else { + const uint8_t sub[4] = {0, 2, 1, 'A'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 4, 0)); + TEST_ASSERT_EQUAL_INT (4, rc); + } + + char buffer[16]; + memset (buffer, 0, sizeof (buffer)); + rc = zmq_recv (server, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2)); + + rc = zmq_send (server, "ALOL", 4, 0); + TEST_ASSERT_EQUAL_INT (4, rc); + + memset (buffer, 0, sizeof (buffer)); + recv_with_retry (s, buffer, 6); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6)); + + close (s); + + test_context_socket_close (server); + test_context_socket_close (server_mon); +} + +void test_mock_sub_command () +{ + test_mock_sub (true); +} + +void test_mock_sub_legacy () +{ + test_mock_sub (false); +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + + RUN_TEST (test_mock_sub_command); + RUN_TEST (test_mock_sub_legacy); + + return UNITY_END (); +}