From 5981a26376c6bcbac6f9e90f2e756beac5cbdd2f Mon Sep 17 00:00:00 2001 From: Sappo Date: Fri, 22 Nov 2013 20:26:41 +0100 Subject: [PATCH 1/2] wrapped message data into an own structure --- include/zyre.h | 33 +++++++++- src/zyre.c | 164 +++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 167 insertions(+), 30 deletions(-) diff --git a/include/zyre.h b/include/zyre.h index 40c49ee022..f25f5bd5a9 100644 --- a/include/zyre.h +++ b/include/zyre.h @@ -52,6 +52,8 @@ extern "C" { typedef struct _zyre_t zyre_t; +typedef struct _zyre_msg_t zyre_msg_t; + // @interface // Constructor, creates a new Zyre node. Note that until you start the // node it is silent and invisible to other nodes on the network. @@ -63,6 +65,10 @@ CZMQ_EXPORT zyre_t * CZMQ_EXPORT void zyre_destroy (zyre_t **self_p); +// Destructor, destroys a Zyre message. +CZMQ_EXPORT void + zyre_msg_destroy (zyre_msg_t **self_p); + // Set node header; these are provided to other nodes during discovery // and come in each ENTER message. CZMQ_EXPORT void @@ -86,17 +92,17 @@ CZMQ_EXPORT int // Receive next message from network; the message may be a control // message (ENTER, EXIT, JOIN, LEAVE) or data (WHISPER, SHOUT). // Returns zmsg_t object, or NULL if interrupted -CZMQ_EXPORT zmsg_t * +CZMQ_EXPORT zyre_msg_t * zyre_recv (zyre_t *self); // Send message to single peer; peer ID is first frame in message // Destroys message after sending CZMQ_EXPORT int - zyre_whisper (zyre_t *self, zmsg_t **msg_p); + zyre_whisper (zyre_t *self, zmsg_t **msg_p, char *peerid); // Send message to a group of peers CZMQ_EXPORT int - zyre_shout (zyre_t *self, zmsg_t **msg_p); + zyre_shout (zyre_t *self, zmsg_t **msg_p, char *group); // Return handle to the Zyre node, for polling CZMQ_EXPORT void * @@ -105,6 +111,27 @@ CZMQ_EXPORT void * // Self test of this class CZMQ_EXPORT void zyre_test (bool verbose); + +// Gets the message type +CZMQ_EXPORT char * + zyre_msg_cmd (zyre_msg_t *self); + +// Gets the peer that did send the message +CZMQ_EXPORT char * + zyre_msg_peerid (zyre_msg_t *self); + +// Gets the headers that enter send +CZMQ_EXPORT zhash_t * + zyre_msg_headers (zyre_msg_t *self); + +// Gets the group name that shout send +CZMQ_EXPORT char * + zyre_msg_group (zyre_msg_t *self); + +// Gets the actual message data +CZMQ_EXPORT zmsg_t * + zyre_msg_data (zyre_msg_t *self); + // @end #ifdef __cplusplus diff --git a/src/zyre.c b/src/zyre.c index 3c6f89b233..897ad264a4 100644 --- a/src/zyre.c +++ b/src/zyre.c @@ -69,6 +69,17 @@ struct _zyre_t { void *pipe; // Pipe through to node }; +// --------------------------------------------------------------------- +// Structure of zyre_msg class + +struct _zyre_msg_t { + char *command; // command type of the message + char *peerid; // uuid from router + zhash_t *headers; // headers send by enter + char *group; // group name send by shout + zmsg_t *data; // actual message data +}; + // --------------------------------------------------------------------- // Constructor, creates a new Zyre node. Note that until you start the @@ -115,6 +126,37 @@ zyre_destroy (zyre_t **self_p) } } +// --------------------------------------------------------------------- +// Constructor, creates a new Zyre message. + +zyre_msg_t * +zyre_msg_new () +{ + zyre_msg_t *self = (zyre_msg_t *) zmalloc(sizeof (zyre_t)); + assert (self); + + return self; +} + +// --------------------------------------------------------------------- +// Destructor, destroys a Zyre message. + +void +zyre_msg_destroy (zyre_msg_t **self_p) +{ + assert (self_p); + if (*self_p) { + zyre_msg_t *self = *self_p; + free (self->command); + free (self->peerid); + if (self->headers) + zhash_destroy (&self->headers); + if (self->data) + zmsg_destroy (&self->data); + free (self->group); + } +} + // --------------------------------------------------------------------- // Set node header; these are provided to other nodes during discovery @@ -182,12 +224,32 @@ zyre_leave (zyre_t *self, const char *group) // message (ENTER, EXIT, JOIN, LEAVE) or data (WHISPER, SHOUT). // Returns zmsg_t object, or NULL if interrupted -zmsg_t * +zyre_msg_t * zyre_recv (zyre_t *self) { assert (self); zmsg_t *msg = zmsg_recv (self->pipe); - return msg; + + // recveice message, get command and peerid + zyre_msg_t *zyre_msg = zyre_msg_new (); + zyre_msg->command = zmsg_popstr (msg); + zyre_msg->peerid = zmsg_popstr (msg); + + if (streq (zyre_msg->command, "ENTER")) { + // get and unpack headers + zframe_t *headers_packed = zmsg_pop (msg); + zyre_msg->headers = zhash_dup (zhash_unpack (headers_packed)); + // cleanup + zframe_destroy (&headers_packed); + } else if (streq (zyre_msg->command, "SHOUT")) { + zyre_msg->group = zmsg_popstr (msg); + } + + + // rest of the message is data + zyre_msg->data = msg; + + return zyre_msg; } @@ -196,10 +258,11 @@ zyre_recv (zyre_t *self) // Destroys message after sending int -zyre_whisper (zyre_t *self, zmsg_t **msg_p) +zyre_whisper (zyre_t *self, zmsg_t **msg_p, char *peerid) { assert (self); zstr_sendm (self->pipe, "WHISPER"); + zmsg_pushstr (*msg_p, peerid); zmsg_send (msg_p, self->pipe); return 0; } @@ -209,10 +272,11 @@ zyre_whisper (zyre_t *self, zmsg_t **msg_p) // Send message to a group of peers int -zyre_shout (zyre_t *self, zmsg_t **msg_p) +zyre_shout (zyre_t *self, zmsg_t **msg_p, char *group) { assert (self); zstr_sendm (self->pipe, "SHOUT"); + zmsg_pushstr (*msg_p, group); // push group in fron of message zmsg_send (msg_p, self->pipe); return 0; } @@ -228,6 +292,55 @@ zyre_socket (zyre_t *self) return self->pipe; } +// --------------------------------------------------------------------- +// Gets the message type + +char * +zyre_msg_cmd (zyre_msg_t *self) +{ + assert (self); + return self->command; +} + +// --------------------------------------------------------------------- +// Gets the message peer uuid + +char * +zyre_msg_peerid (zyre_msg_t *self) +{ + assert (self); + return self->peerid; +} + +// --------------------------------------------------------------------- +// Gets the message headers + +zhash_t * +zyre_msg_headers (zyre_msg_t *self) +{ + assert (self); + return self->headers; +} + +// --------------------------------------------------------------------- +// Gets the message group in case of shout + +char * +zyre_msg_group (zyre_msg_t *self) +{ + assert (self); + return self->group; +} + +// --------------------------------------------------------------------- +// Gets the actual message data + +zmsg_t * +zyre_msg_data (zyre_msg_t *self) +{ + assert (self); + return self->data; +} // -------------------------------------------------------------------------- // Self test of this class @@ -254,37 +367,33 @@ zyre_test (bool verbose) // One node shouts to GLOBAL zmsg_t *msg = zmsg_new (); - zmsg_addstr (msg, "GLOBAL"); zmsg_addstr (msg, "Hello, World"); - zyre_shout (node1, &msg); + zyre_shout (node1, &msg, "GLOBAL"); // TODO: should timeout and not hang if there's no networking // ALSO why doesn't this work with localhost? zbeacon? // Second node should receive ENTER, JOIN, and SHOUT - msg = zyre_recv (node2); - char *command = zmsg_popstr (msg); - assert (streq (command, "ENTER")); - free (command); - char *peerid = zmsg_popstr (msg); - free (peerid); - zframe_t *headers_packed = zmsg_pop (msg); - zhash_t *headers = zhash_unpack (headers_packed); - zframe_destroy (&headers_packed); + + // parse ENTER + zyre_msg_t *zyre_msg = zyre_recv (node2); + msg = zyre_msg_data (zyre_msg); + assert (streq (zyre_msg_cmd (zyre_msg) , "ENTER")); + char *peerid = zyre_msg_peerid (zyre_msg); + zhash_t *headers = zyre_msg_headers (zyre_msg); assert (streq (zhash_lookup (headers, "X-HELLO"), "World")); - zhash_destroy (&headers); - zmsg_destroy (&msg); + zyre_msg_destroy (&zyre_msg); - msg = zyre_recv (node2); - command = zmsg_popstr (msg); - assert (streq (command, "JOIN")); - free (command); - zmsg_destroy (&msg); + // parse JOIN + zyre_msg = zyre_recv (node2); + assert (streq (zyre_msg_cmd (zyre_msg), "JOIN")); + zyre_msg_destroy (&zyre_msg); - msg = zyre_recv (node2); - command = zmsg_popstr (msg); - assert (streq (command, "SHOUT")); - free (command); - zmsg_destroy (&msg); + // parse SHOUT + zyre_msg = zyre_recv (node2); + msg = zyre_msg_data (zyre_msg); + assert (streq (zyre_msg_cmd (zyre_msg), "SHOUT")); + assert (streq (zyre_msg_group (zyre_msg), "GLOBAL")); + zyre_msg_destroy (&zyre_msg); zyre_destroy (&node1); zyre_destroy (&node2); @@ -292,3 +401,4 @@ zyre_test (bool verbose) // @end printf ("OK\n"); } + From eb9dc9115c21f6a3286dd54a7f915f9dc781a30c Mon Sep 17 00:00:00 2001 From: Sappo Date: Sat, 23 Nov 2013 23:40:31 +0100 Subject: [PATCH 2/2] moved message wrapper into own c file --- include/zyre.h | 34 +------ include/zyre_msg.h | 72 +++++++++++++++ src/Makefile.am | 2 + src/zyre.c | 163 ++++++--------------------------- src/zyre_classes.h | 1 + src/zyre_msg.c | 218 ++++++++++++++++++++++++++++++++++++++++++++ src/zyre_selftest.c | 1 + 7 files changed, 325 insertions(+), 166 deletions(-) create mode 100644 include/zyre_msg.h create mode 100644 src/zyre_msg.c diff --git a/include/zyre.h b/include/zyre.h index f25f5bd5a9..19d2e9bfc4 100644 --- a/include/zyre.h +++ b/include/zyre.h @@ -52,8 +52,6 @@ extern "C" { typedef struct _zyre_t zyre_t; -typedef struct _zyre_msg_t zyre_msg_t; - // @interface // Constructor, creates a new Zyre node. Note that until you start the // node it is silent and invisible to other nodes on the network. @@ -65,10 +63,6 @@ CZMQ_EXPORT zyre_t * CZMQ_EXPORT void zyre_destroy (zyre_t **self_p); -// Destructor, destroys a Zyre message. -CZMQ_EXPORT void - zyre_msg_destroy (zyre_msg_t **self_p); - // Set node header; these are provided to other nodes during discovery // and come in each ENTER message. CZMQ_EXPORT void @@ -92,17 +86,17 @@ CZMQ_EXPORT int // Receive next message from network; the message may be a control // message (ENTER, EXIT, JOIN, LEAVE) or data (WHISPER, SHOUT). // Returns zmsg_t object, or NULL if interrupted -CZMQ_EXPORT zyre_msg_t * +CZMQ_EXPORT zmsg_t * zyre_recv (zyre_t *self); // Send message to single peer; peer ID is first frame in message // Destroys message after sending CZMQ_EXPORT int - zyre_whisper (zyre_t *self, zmsg_t **msg_p, char *peerid); + zyre_whisper (zyre_t *self, zmsg_t **msg_p); // Send message to a group of peers CZMQ_EXPORT int - zyre_shout (zyre_t *self, zmsg_t **msg_p, char *group); + zyre_shout (zyre_t *self, zmsg_t **msg_p); // Return handle to the Zyre node, for polling CZMQ_EXPORT void * @@ -111,27 +105,6 @@ CZMQ_EXPORT void * // Self test of this class CZMQ_EXPORT void zyre_test (bool verbose); - -// Gets the message type -CZMQ_EXPORT char * - zyre_msg_cmd (zyre_msg_t *self); - -// Gets the peer that did send the message -CZMQ_EXPORT char * - zyre_msg_peerid (zyre_msg_t *self); - -// Gets the headers that enter send -CZMQ_EXPORT zhash_t * - zyre_msg_headers (zyre_msg_t *self); - -// Gets the group name that shout send -CZMQ_EXPORT char * - zyre_msg_group (zyre_msg_t *self); - -// Gets the actual message data -CZMQ_EXPORT zmsg_t * - zyre_msg_data (zyre_msg_t *self); - // @end #ifdef __cplusplus @@ -139,3 +112,4 @@ CZMQ_EXPORT zmsg_t * #endif #endif + diff --git a/include/zyre_msg.h b/include/zyre_msg.h new file mode 100644 index 0000000000..0eab82a58e --- /dev/null +++ b/include/zyre_msg.h @@ -0,0 +1,72 @@ +/* ========================================================================= + zyre_msg.h - Parsing Zyre messages + + ------------------------------------------------------------------------- + Copyright (c) 1991-2013 iMatix Corporation + Copyright other contributors as noted in the AUTHORS file. + + This file is part of Zyre, an open-source framework for proximity-based + peer-to-peer applications -- See http://zyre.org. + + This is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or (at + your option) any later version. + + This software is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this program. If not, see + . + ========================================================================= +*/ + +#ifndef __ZYRE_MSG_H_INCLUDED__ +#define __ZYRE_MSG_H_INCLUDED__ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _zyre_msg_t zyre_msg_t; + +// Destructor, destroys a Zyre message. +CZMQ_EXPORT void + zyre_msg_destroy (zyre_msg_t **self_p); + +// Wrapper for zyre_recv +CZMQ_EXPORT zyre_msg_t * + zyre_msg_recv (zyre_t *self); + +// Gets the message type +CZMQ_EXPORT char * + zyre_msg_cmd (zyre_msg_t *self); + +// Gets the peer that did send the message +CZMQ_EXPORT char * + zyre_msg_peerid (zyre_msg_t *self); + +// Gets the headers that enter send +CZMQ_EXPORT zhash_t * + zyre_msg_headers (zyre_msg_t *self); + +// Gets the group name that shout send +CZMQ_EXPORT char * + zyre_msg_group (zyre_msg_t *self); + +// Gets the actual message data +CZMQ_EXPORT zmsg_t * + zyre_msg_data (zyre_msg_t *self); + +// Self test of this class +CZMQ_EXPORT void + zyre_msg_test (bool verbose); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/Makefile.am b/src/Makefile.am index 5b253b2908..50b7c910b5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -11,6 +11,7 @@ libzyre_la_include_HEADERS = \ zyre_node.h \ ../include/zre_msg.h \ ../include/zre_log_msg.h \ + ../include/zyre_msg.h \ ../include/zyre.h libzyre_la_SOURCES = \ @@ -20,6 +21,7 @@ libzyre_la_SOURCES = \ zyre_node.c \ zre_msg.c \ zre_log_msg.c \ + zyre_msg.c \ zyre.c INCLUDES = -I$(top_srcdir)/include diff --git a/src/zyre.c b/src/zyre.c index 897ad264a4..3a76018c16 100644 --- a/src/zyre.c +++ b/src/zyre.c @@ -69,17 +69,6 @@ struct _zyre_t { void *pipe; // Pipe through to node }; -// --------------------------------------------------------------------- -// Structure of zyre_msg class - -struct _zyre_msg_t { - char *command; // command type of the message - char *peerid; // uuid from router - zhash_t *headers; // headers send by enter - char *group; // group name send by shout - zmsg_t *data; // actual message data -}; - // --------------------------------------------------------------------- // Constructor, creates a new Zyre node. Note that until you start the @@ -126,37 +115,6 @@ zyre_destroy (zyre_t **self_p) } } -// --------------------------------------------------------------------- -// Constructor, creates a new Zyre message. - -zyre_msg_t * -zyre_msg_new () -{ - zyre_msg_t *self = (zyre_msg_t *) zmalloc(sizeof (zyre_t)); - assert (self); - - return self; -} - -// --------------------------------------------------------------------- -// Destructor, destroys a Zyre message. - -void -zyre_msg_destroy (zyre_msg_t **self_p) -{ - assert (self_p); - if (*self_p) { - zyre_msg_t *self = *self_p; - free (self->command); - free (self->peerid); - if (self->headers) - zhash_destroy (&self->headers); - if (self->data) - zmsg_destroy (&self->data); - free (self->group); - } -} - // --------------------------------------------------------------------- // Set node header; these are provided to other nodes during discovery @@ -224,32 +182,12 @@ zyre_leave (zyre_t *self, const char *group) // message (ENTER, EXIT, JOIN, LEAVE) or data (WHISPER, SHOUT). // Returns zmsg_t object, or NULL if interrupted -zyre_msg_t * +zmsg_t * zyre_recv (zyre_t *self) { assert (self); zmsg_t *msg = zmsg_recv (self->pipe); - - // recveice message, get command and peerid - zyre_msg_t *zyre_msg = zyre_msg_new (); - zyre_msg->command = zmsg_popstr (msg); - zyre_msg->peerid = zmsg_popstr (msg); - - if (streq (zyre_msg->command, "ENTER")) { - // get and unpack headers - zframe_t *headers_packed = zmsg_pop (msg); - zyre_msg->headers = zhash_dup (zhash_unpack (headers_packed)); - // cleanup - zframe_destroy (&headers_packed); - } else if (streq (zyre_msg->command, "SHOUT")) { - zyre_msg->group = zmsg_popstr (msg); - } - - - // rest of the message is data - zyre_msg->data = msg; - - return zyre_msg; + return msg; } @@ -258,11 +196,10 @@ zyre_recv (zyre_t *self) // Destroys message after sending int -zyre_whisper (zyre_t *self, zmsg_t **msg_p, char *peerid) +zyre_whisper (zyre_t *self, zmsg_t **msg_p) { assert (self); zstr_sendm (self->pipe, "WHISPER"); - zmsg_pushstr (*msg_p, peerid); zmsg_send (msg_p, self->pipe); return 0; } @@ -272,11 +209,10 @@ zyre_whisper (zyre_t *self, zmsg_t **msg_p, char *peerid) // Send message to a group of peers int -zyre_shout (zyre_t *self, zmsg_t **msg_p, char *group) +zyre_shout (zyre_t *self, zmsg_t **msg_p) { assert (self); zstr_sendm (self->pipe, "SHOUT"); - zmsg_pushstr (*msg_p, group); // push group in fron of message zmsg_send (msg_p, self->pipe); return 0; } @@ -292,55 +228,6 @@ zyre_socket (zyre_t *self) return self->pipe; } -// --------------------------------------------------------------------- -// Gets the message type - -char * -zyre_msg_cmd (zyre_msg_t *self) -{ - assert (self); - return self->command; -} - -// --------------------------------------------------------------------- -// Gets the message peer uuid - -char * -zyre_msg_peerid (zyre_msg_t *self) -{ - assert (self); - return self->peerid; -} - -// --------------------------------------------------------------------- -// Gets the message headers - -zhash_t * -zyre_msg_headers (zyre_msg_t *self) -{ - assert (self); - return self->headers; -} - -// --------------------------------------------------------------------- -// Gets the message group in case of shout - -char * -zyre_msg_group (zyre_msg_t *self) -{ - assert (self); - return self->group; -} - -// --------------------------------------------------------------------- -// Gets the actual message data - -zmsg_t * -zyre_msg_data (zyre_msg_t *self) -{ - assert (self); - return self->data; -} // -------------------------------------------------------------------------- // Self test of this class @@ -367,33 +254,37 @@ zyre_test (bool verbose) // One node shouts to GLOBAL zmsg_t *msg = zmsg_new (); + zmsg_addstr (msg, "GLOBAL"); zmsg_addstr (msg, "Hello, World"); - zyre_shout (node1, &msg, "GLOBAL"); + zyre_shout (node1, &msg); // TODO: should timeout and not hang if there's no networking // ALSO why doesn't this work with localhost? zbeacon? // Second node should receive ENTER, JOIN, and SHOUT - - // parse ENTER - zyre_msg_t *zyre_msg = zyre_recv (node2); - msg = zyre_msg_data (zyre_msg); - assert (streq (zyre_msg_cmd (zyre_msg) , "ENTER")); - char *peerid = zyre_msg_peerid (zyre_msg); - zhash_t *headers = zyre_msg_headers (zyre_msg); + msg = zyre_recv (node2); + char *command = zmsg_popstr (msg); + assert (streq (command, "ENTER")); + free (command); + char *peerid = zmsg_popstr (msg); + free (peerid); + zframe_t *headers_packed = zmsg_pop (msg); + zhash_t *headers = zhash_unpack (headers_packed); + zframe_destroy (&headers_packed); assert (streq (zhash_lookup (headers, "X-HELLO"), "World")); - zyre_msg_destroy (&zyre_msg); + zhash_destroy (&headers); + zmsg_destroy (&msg); - // parse JOIN - zyre_msg = zyre_recv (node2); - assert (streq (zyre_msg_cmd (zyre_msg), "JOIN")); - zyre_msg_destroy (&zyre_msg); + msg = zyre_recv (node2); + command = zmsg_popstr (msg); + assert (streq (command, "JOIN")); + free (command); + zmsg_destroy (&msg); - // parse SHOUT - zyre_msg = zyre_recv (node2); - msg = zyre_msg_data (zyre_msg); - assert (streq (zyre_msg_cmd (zyre_msg), "SHOUT")); - assert (streq (zyre_msg_group (zyre_msg), "GLOBAL")); - zyre_msg_destroy (&zyre_msg); + msg = zyre_recv (node2); + command = zmsg_popstr (msg); + assert (streq (command, "SHOUT")); + free (command); + zmsg_destroy (&msg); zyre_destroy (&node1); zyre_destroy (&node2); diff --git a/src/zyre_classes.h b/src/zyre_classes.h index 110f44e825..94c21bb54b 100644 --- a/src/zyre_classes.h +++ b/src/zyre_classes.h @@ -31,6 +31,7 @@ #include "../include/zre_msg.h" #include "../include/zre_log_msg.h" #include "../include/zyre.h" +#include "../include/zyre_msg.h" #include "zyre_peer.h" #include "zyre_group.h" #include "zyre_log.h" diff --git a/src/zyre_msg.c b/src/zyre_msg.c new file mode 100644 index 0000000000..91f33deb0a --- /dev/null +++ b/src/zyre_msg.c @@ -0,0 +1,218 @@ +/* ========================================================================= + zyre_msg.h - Parsing Zyre messages + + ------------------------------------------------------------------------- + Copyright (c) 1991-2013 iMatix Corporation + Copyright other contributors as noted in the AUTHORS file. + + This file is part of Zyre, an open-source framework for proximity-based + peer-to-peer applications -- See http://zyre.org. + + This is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or (at + your option) any later version. + + This software is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this program. If not, see + . + ========================================================================= +*/ + +/* +@header +@discuss +@end +*/ + +#include "zyre_classes.h" + +// --------------------------------------------------------------------- +// Structure of zyre_msg class + +struct _zyre_msg_t { + char *command; // command type of the message + char *peerid; // uuid from router + zhash_t *headers; // headers send by enter + char *group; // group name send by shout + zmsg_t *data; // actual message data +}; + +// --------------------------------------------------------------------- +// Constructor, creates a new Zyre message. + +zyre_msg_t * +zyre_msg_new () +{ + zyre_msg_t *self = (zyre_msg_t *) zmalloc (sizeof (zyre_msg_t)); + assert (self); + + return self; +} + +// --------------------------------------------------------------------- +// Destructor, destroys a Zyre message. + +void +zyre_msg_destroy (zyre_msg_t **self_p) +{ + assert (self_p); + if (*self_p) { + zyre_msg_t *self = *self_p; + free (self->command); + free (self->peerid); + if (self->headers) + zhash_destroy (&self->headers); + if (self->data) + zmsg_destroy (&self->data); + free (self->group); + } +} + +// --------------------------------------------------------------------- +// Receive next message from network; the message may be a control +// message (ENTER, EXIT, JOIN, LEAVE) or data (WHISPER, SHOUT). +// Returns zmsg_t object, or NULL if interrupted + +zyre_msg_t * +zyre_msg_recv (zyre_t *self) +{ + assert (self); + zmsg_t *msg = zyre_recv (self); + zyre_msg_t *zyre_msg = zyre_msg_new (); + zyre_msg->command = zmsg_popstr (msg); + zyre_msg->peerid = zmsg_popstr (msg); + + if (streq (zyre_msg->command, "ENTER")) { + // get and unpack headers + zframe_t *headers_packed = zmsg_pop (msg); + zyre_msg->headers = zhash_dup (zhash_unpack (headers_packed)); + // cleanup + zframe_destroy (&headers_packed); + } else if (streq (zyre_msg->command, "SHOUT")) { + zyre_msg->group = zmsg_popstr (msg); + } + + + // rest of the message is data + zyre_msg->data = msg; + + return zyre_msg; +} + +// --------------------------------------------------------------------- +// Gets the message type + +char * +zyre_msg_cmd (zyre_msg_t *self) +{ + assert (self); + return self->command; +} + +// --------------------------------------------------------------------- +// Gets the message peer uuid + +char * +zyre_msg_peerid (zyre_msg_t *self) +{ + assert (self); + return self->peerid; +} + +// --------------------------------------------------------------------- +// Gets the message headers + +zhash_t * +zyre_msg_headers (zyre_msg_t *self) +{ + assert (self); + return self->headers; +} + +// --------------------------------------------------------------------- +// Gets the message group in case of shout + +char * +zyre_msg_group (zyre_msg_t *self) +{ + assert (self); + return self->group; +} + +// --------------------------------------------------------------------- +// Gets the actual message data + +zmsg_t * +zyre_msg_data (zyre_msg_t *self) +{ + assert (self); + return self->data; +} + +// -------------------------------------------------------------------------- +// Self test of this class + +void +zyre_msg_test (bool verbose) +{ + printf (" * zyre_msg: "); + + // @selftest + zctx_t *ctx = zctx_new (); + // Create two nodes + zyre_t *node1 = zyre_new (ctx); + zyre_t *node2 = zyre_new (ctx); + zyre_set_header (node1, "X-FILEMQ", "tcp://128.0.0.1:6777"); + zyre_set_header (node1, "X-HELLO", "World"); + zyre_start (node1); + zyre_start (node2); + zyre_join (node1, "GLOBAL"); + zyre_join (node2, "GLOBAL"); + + // Give time for them to interconnect + zclock_sleep (250); + + // One node shouts to GLOBAL + zmsg_t *msg = zmsg_new (); + zmsg_addstr (msg, "GLOBAL"); + zmsg_addstr (msg, "Hello, World"); + zyre_shout (node1, &msg); + + // TODO: should timeout and not hang if there's no networking + // ALSO why doesn't this work with localhost? zbeacon? + // Second node should receive ENTER, JOIN, and SHOUT + + // parse ENTER + zyre_msg_t *zyre_msg = zyre_msg_recv (node2); + msg = zyre_msg_data (zyre_msg); + assert (streq (zyre_msg_cmd (zyre_msg) , "ENTER")); + char *peerid = zyre_msg_peerid (zyre_msg); + zhash_t *headers = zyre_msg_headers (zyre_msg); + assert (streq (zhash_lookup (headers, "X-HELLO"), "World")); + zyre_msg_destroy (&zyre_msg); + + // parse JOIN + zyre_msg = zyre_msg_recv (node2); + assert (streq (zyre_msg_cmd (zyre_msg), "JOIN")); + zyre_msg_destroy (&zyre_msg); + + // parse SHOUT + zyre_msg = zyre_msg_recv (node2); + msg = zyre_msg_data (zyre_msg); + assert (streq (zyre_msg_cmd (zyre_msg), "SHOUT")); + assert (streq (zyre_msg_group (zyre_msg), "GLOBAL")); + zyre_msg_destroy (&zyre_msg); + + zyre_destroy (&node1); + zyre_destroy (&node2); + zctx_destroy (&ctx); + // @end + printf ("OK\n"); +} + diff --git a/src/zyre_selftest.c b/src/zyre_selftest.c index cd1d380dd2..cbcd296858 100644 --- a/src/zyre_selftest.c +++ b/src/zyre_selftest.c @@ -41,6 +41,7 @@ int main (int argc, char *argv []) zyre_group_test (verbose); zyre_node_test (verbose); zyre_test (verbose); + zyre_msg_test (verbose); printf ("Tests passed OK\n"); return 0; }