Skip to content

Commit

Permalink
zmq_poll implemented on Win32 platform
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown committed Dec 10, 2009
1 parent 986ab66 commit d4fdc26
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 7 deletions.
7 changes: 7 additions & 0 deletions bindings/c/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ extern "C" {

#include <errno.h>
#include <stddef.h>
#if defined _WIN32
#include "winsock2.h"
#endif

// Microsoft Visual Studio uses non-standard way to export/import symbols.
#if defined ZMQ_BUILDING_LIBZMQ_WITH_MSVC
Expand Down Expand Up @@ -185,7 +188,11 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
typedef struct
{
void *socket;
#if defined _WIN32
SOCKET fd;
#else
int fd;
#endif
short events;
short revents;
} zmq_pollitem_t;
Expand Down
125 changes: 118 additions & 7 deletions src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "platform.hpp"
#include "stdint.hpp"
#include "err.hpp"
#include "fd.hpp"

#if defined ZMQ_HAVE_LINUX
#include <poll.h>
Expand Down Expand Up @@ -263,12 +264,10 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)

int zmq_poll (zmq_pollitem_t *items_, int nitems_)
{
// TODO: Replace the polling mechanism by the virtualised framework
// used in 0MQ I/O threads. That'll make the thing work on all platforms.
#if !defined ZMQ_HAVE_LINUX
errno = ENOTSUP;
return -1;
#else
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX

pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
Expand Down Expand Up @@ -368,6 +367,119 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
free (pollfds);
return nevents;

#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS

fd_set pollset_in;
FD_ZERO (&pollset_in);
fd_set pollset_out;
FD_ZERO (&pollset_out);
fd_set pollset_err;
FD_ZERO (&pollset_err);

zmq::app_thread_t *app_thread = NULL;
int nsockets = 0;
zmq::fd_t maxfd = zmq::retired_fd;
zmq::fd_t notify_fd = zmq::retired_fd;

for (int i = 0; i != nitems_; i++) {

// 0MQ sockets.
if (items_ [i].socket) {

// Get the app_thread the socket is living in. If there are two
// sockets in the same pollset with different app threads, fail.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
if (app_thread) {
if (app_thread != s->get_thread ()) {
errno = EFAULT;
return -1;
}
}
else
app_thread = s->get_thread ();

nsockets++;
continue;
}

// Raw file descriptors.
if (items_ [i].events & ZMQ_POLLIN)
FD_SET (items_ [i].fd, &pollset_in);
if (items_ [i].events & ZMQ_POLLOUT)
FD_SET (items_ [i].fd, &pollset_out);
if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd)
maxfd = items_ [i].fd;
}

// If there's at least one 0MQ socket in the pollset we have to poll
// for 0MQ commands. If ZMQ_POLL was not set, fail.
if (nsockets) {
notify_fd = app_thread->get_signaler ()->get_fd ();
if (notify_fd == zmq::retired_fd) {
errno = ENOTSUP;
return -1;
}
FD_SET (notify_fd, &pollset_in);
if (maxfd == zmq::retired_fd || maxfd < notify_fd)
maxfd = notify_fd;
}

int nevents = 0;
bool initial = true;
while (!nevents) {

// Wait for activity. In the first iteration just check for events,
// don't wait. Waiting would prevent exiting on any events that may
// already be signaled on 0MQ sockets.
timeval timeout = {0, 0};
int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err,
initial ? &timeout : NULL);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
if (rc == -1 && errno == EINTR)
continue;
#endif

errno_assert (rc >= 0);
initial = false;

// Process 0MQ commands if needed.
if (nsockets && FD_ISSET (notify_fd, &pollset_in))
app_thread->process_commands (false, false);

// Check for the events.
int pollfd_pos = 0;
for (int i = 0; i != nitems_; i++) {

// If the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
if (!items_ [i].socket) {
items_ [i].revents =
(FD_ISSET (items_ [i].fd, &pollset_in) ? ZMQ_POLLIN : 0) |
(FD_ISSET (items_ [i].fd, &pollset_out) ? ZMQ_POLLOUT : 0);
if (items_ [i].revents)
nevents++;
continue;
}

// The poll item is a 0MQ socket.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
items_ [i].revents = 0;
if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
}

return nevents;

#else
errno = ENOTSUP;
return -1;
#endif
}

Expand Down Expand Up @@ -428,4 +540,3 @@ unsigned long zmq_stopwatch_stop (void *watch_)
free (watch_);
return (unsigned long) (end - start);
}

0 comments on commit d4fdc26

Please sign in to comment.