Suddenly receive nothing when I use ZeroMQ with OpenPGM on Windows #56

lqxandxl opened this issue Jan 11, 2019 · 5 comments

lqxandxl commented Jan 11, 2019

I use the ZeroMQ pub/sub pattern with epgm on Windows, built with VS2015.

My pub code:

int s_send(void * socket, char * string){
    int size = zmq_send(socket, string, strlen(string), 0);
    return size;
}

void * context = zmq_ctx_new();
void * publisher = zmq_socket(context, ZMQ_PUB);
int rate = 200000;                      // ZMQ_RATE is in kbit/s
int rc = zmq_setsockopt(publisher, ZMQ_RATE, &rate, sizeof(rate));
const int hwm = 600000;
rc = zmq_setsockopt(publisher, ZMQ_SNDHWM, &hwm, sizeof(hwm));
const int sndbuf = 5000;
rc = zmq_setsockopt(publisher, ZMQ_SNDBUF, &sndbuf, sizeof(sndbuf));
const int one = 1;
rc = zmq_setsockopt(publisher, ZMQ_XPUB_NODROP, &one, sizeof(one));
rc = zmq_setsockopt(publisher, ZMQ_LINGER, &one, sizeof(one));

rc = zmq_bind(publisher, "epgm://70.3.1.13;239.0.0.13:5566");
Sleep(1000);

string send_str = "01234567890123456789";                          // 20 bytes
send_str = send_str + send_str + send_str + send_str + send_str;   // 100 bytes
send_str = send_str + send_str;                                    // 200 bytes

long long send_count = 0;
for(long long i = 0; i < x; ++i){       // x = total number of messages to send
    char update[250];
    sprintf(update, "%s", (char *) send_str.c_str());
    update[send_str.size()] = '\0';
    int msgsize = s_send(publisher, update);
    send_count++;
    if(send_count % 100 == 0){          // throttle to roughly 100,000 messages/s
        Sleep(1);
    }
}
zmq_close(publisher);
zmq_ctx_destroy(context);

My sub code:

char * s_recv(void * socket){
    char buffer[256];

    int size = zmq_recv(socket, buffer, 255, 0);
    if(size == -1)
        return NULL;
    if(size > 255)
        size = 255;                     // message was truncated to fit the buffer
    buffer[size] = '\0';
    return _strdup(buffer);
}

int main(){
    void * context = zmq_ctx_new();
    zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
    void * subscriber = zmq_socket(context, ZMQ_SUB);
    int rc = 0;
    const int rcvbuf = 200000;
    rc = zmq_setsockopt(subscriber, ZMQ_RCVBUF, &rcvbuf, sizeof(rcvbuf));
    rc = zmq_bind(subscriber, "epgm://70.3.1.14;239.0.0.13:5566");
    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    while(1){
        char * s1 = s_recv(subscriber);
        if(s1 == NULL)
            continue;                   // recv failed; try again
        string res(s1);
        free(s1);
    }

    zmq_close(subscriber);
    zmq_ctx_destroy(context);
    return 0;
}

Everything went smoothly until the receiver started getting corrupted messages and then stopped receiving anything for a short time.

I tried to find out what went wrong and traced it to this code in pgm_receiver.cpp:

void zmq::pgm_receiver_t::in_event ()
{
    // If active_tsi is not null, there is a pending restart_input.
    // Keep the internal state as is so that restart_input would process the right data
    if (active_tsi) {
        return;
    }

    // Read data from the underlying pgm_socket.
    const pgm_tsi_t *tsi = NULL;

    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

    //  TODO: This loop can effectively block other engines in the same I/O
    //  thread in the case of high load.
    while (true) {
        //  Get new batch of data.
        //  Note the workaround made not to break strict-aliasing rules.
        insize = 0;
        void *tmp = NULL;
        ssize_t received = pgm_socket.receive (&tmp, &tsi);

        //  No data to process. This may happen if the packet received is
        //  neither ODATA nor RDATA.
        if (received == 0) {
            if (errno == ENOMEM || errno == EBUSY) {
                const long timeout = pgm_socket.get_rx_timeout ();
                add_timer (timeout, rx_timer_id);
                has_rx_timer = true;
            }
            break;
        }
        ...
    }
}

The branch being hit is errno == EBUSY, which is set in pgm_socket.cpp:
ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_){
    ...

    const int status = pgm_recvmsgv (sock, pgm_msgv, pgm_msgv_len,
                                     MSG_ERRQUEUE, &nbytes_rec, &pgm_error);

    //  Invalid parameters.
    zmq_assert (status != PGM_IO_STATUS_ERROR);

    last_rx_status = status;

    //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)
    //  pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
    if (status == PGM_IO_STATUS_TIMER_PENDING) {
        zmq_assert (nbytes_rec == 0);

        //  In case if no RDATA/ODATA caused POLLIN 0 is
        //  returned.
        nbytes_rec = 0;
        errno = EBUSY;
        return 0;
    }
    ...
}


It seems like pgm_recvmsgv always returns PGM_IO_STATUS_TIMER_PENDING, and I don't know why. What does this status mean?

When I lowered the sending speed, the error appeared noticeably later. For example, at 20,000 messages per second it can run for several hours before failing. When I slow down to 1,000 messages per second, there is no error for at least a few days (perhaps it never goes wrong).

Is my sending speed too fast, or can OpenPGM not be used like this on Windows? Is there a good way to achieve high throughput in multicast situations?


steve-o commented Jan 11, 2019

On Windows you have to use larger packet sizes and a significantly lower packet count. With jumbograms and fewer than 10,000 packets per second you will be able to saturate gigabit Ethernet. ZeroMQ does not currently expose all OpenPGM socket options. Jumbogram here means setting PGM_MTU to larger than 1,500 bytes, e.g. 64KB.
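
As a rough, untested sketch of what that could look like from the ZeroMQ side, assuming a libzmq new enough (4.2+) to expose ZMQ_MULTICAST_MAXTPDU (which maps onto the PGM MTU) and a build with OpenPGM support; the endpoint and option values below are only illustrative:

// Sketch only: raise the PGM transport PDU size and cap the multicast rate.
// Assumes libzmq >= 4.2 (ZMQ_MULTICAST_MAXTPDU) built with OpenPGM support.
#include <zmq.h>
#include <assert.h>

int main (void)
{
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);

    int maxtpdu = 65535;    // "jumbogram" TPDU instead of the 1500-byte default
    int rc = zmq_setsockopt (publisher, ZMQ_MULTICAST_MAXTPDU, &maxtpdu, sizeof (maxtpdu));
    assert (rc == 0);

    int rate = 800000;      // ZMQ_RATE is in kbit/s; ~800 Mbit/s here
    rc = zmq_setsockopt (publisher, ZMQ_RATE, &rate, sizeof (rate));
    assert (rc == 0);

    rc = zmq_bind (publisher, "epgm://70.3.1.13;239.0.0.13:5566");
    assert (rc == 0);

    // ... send fewer, larger messages here ...

    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}

The idea is fewer, larger sends, keeping the packet count well below the ~10,000 packets per second figure mentioned above.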


lqxandxl commented Feb 3, 2021

Thank you for your reply. I have another question: the network MTU cannot be that large. If PGM_MTU is set to a large value, can the packets still be transmitted across routers, or will they simply be truncated?


steve-o commented Feb 3, 2021

When the MTU is large, routers will fragment the packets. This is not recommended, because the recovery mechanism works on the PGM packet, not on the individual fragments.


lqxandxl commented Feb 3, 2021

If fragmentation is the responsibility of the IP layer, what is the point of setting PGM_MTU to 64KB? Is it correct to understand that if PGM_MTU is set to 1472, then when I send more than 1472 bytes of data, OpenPGM is responsible for the fragmentation? Or does it just refuse to send?


steve-o commented Feb 3, 2021

The goal with 64KB MTU is for maximum local LAN performance, especially for Windows hosts that have below-par UDP performance. You are highly unlikely to saturate a 1GbE on Windows otherwise, unfortunately.

PGM has the concept of an APDU (application protocol data unit): a large buffer that is fragmented and managed by the protocol itself. To use this, just send a large buffer to the API.
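
To illustrate that last point, here is a minimal sketch (not taken from this issue's code, and assuming the enlarged MTU discussed above): send one large buffer and let PGM fragment and reassemble it as a single APDU; a subscriber using zmq_msg_recv gets the whole reassembled message back in one piece. Endpoint and sizes are illustrative only.

// Sketch: one large APDU per send; OpenPGM fragments/reassembles it internally.
#include <zmq.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

int main (void)
{
    void *context = zmq_ctx_new ();

    // Publisher: a single 1 MB send instead of thousands of 200-byte sends.
    void *publisher = zmq_socket (context, ZMQ_PUB);
    assert (zmq_bind (publisher, "epgm://70.3.1.13;239.0.0.13:5566") == 0);

    size_t apdu_size = 1024 * 1024;
    char *buffer = (char *) malloc (apdu_size);
    memset (buffer, 'x', apdu_size);
    int sent = zmq_send (publisher, buffer, apdu_size, 0);
    assert (sent == (int) apdu_size);
    free (buffer);

    // Receiver side (on the other host): zmq_msg_recv delivers the whole
    // reassembled APDU as one message, whatever its size.
    //
    //     zmq_msg_t msg;
    //     zmq_msg_init (&msg);
    //     zmq_msg_recv (&msg, subscriber, 0);
    //     size_t size = zmq_msg_size (&msg);
    //     zmq_msg_close (&msg);

    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}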
