【zmq 4.3.4】pub-sub(tcp protocol) mode crash by _io_error #4364

Open
ABlueLight opened this issue Apr 2, 2022 · 5 comments
Comments

@ABlueLight

ABlueLight commented Apr 2, 2022


Issue description

When using ZMQ_PUB and ZMQ_SUB sockets over TCP with ZMQ_HEARTBEAT_TIMEOUT set, the process may crash on the !_io_error assertion.
Error message:
Assertion failed: !_io_error (/home/users/jiangkuan.liu/tools/zeromq-4.3.4/src/stream_engine_base.cpp:331)

Environment

  • libzmq version (commit hash if unreleased): 4.3.4
  • OS: ubuntu 16.04 gcc 5.4.0

Minimal test code / Steps to reproduce the issue

1. Minimal test code

PUB code

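#include <atomic>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
#include <zmq.h>

void* sockets;
std::mutex mtx;
std::atomic_int item_nums;
void *context = nullptr;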

int main() {
  std::vector <std::string> zmq_protocol_items;
  std::string ip_port = "tcp://127.0.0.1:4124";
  zmq_protocol_items.push_back(ip_port);

  context = zmq_ctx_new();
  if (context == nullptr) {
    std::cout << "init zmq context failed!\n";
    return -1;
  }
  for (auto &proto : zmq_protocol_items) {
    std::cout << proto << "\n";
    void *socket = zmq_socket(context, ZMQ_PUB);

    int zmq_sndhwm = 100;
    zmq_setsockopt(socket,
                       ZMQ_SNDHWM,
                       &zmq_sndhwm,
                       sizeof(int));
    
    int zmq_heart_beat_ivl = 5;
    zmq_setsockopt(socket,
                     ZMQ_HEARTBEAT_IVL,
                     &zmq_heart_beat_ivl,
                     sizeof(int));

    int zmq_heart_beat_timeout = 5;
    zmq_setsockopt(socket,
                     ZMQ_HEARTBEAT_TIMEOUT,
                     &zmq_heart_beat_timeout,
                     sizeof(int));
    int ret = zmq_bind(socket, proto.c_str());
    if (ret < 0) {
      std::cout << "zmq bind failed!\n";
      continue;
    }
    sockets = socket;
  }
  std::vector<char *> data_vec;
  std::vector<int> data_size_vec;
  data_size_vec.push_back(10);
  data_size_vec.push_back(10);
  data_size_vec.push_back(100);
  char *data_0 = new char[10];
  data_0[0] = 1;
  char *data_1 = new char[10];
   data_1[0] = 2;
  char *data_2 = new char[100];
   data_2[0] = 3;
  data_vec.push_back(data_0);
  data_vec.push_back(data_1);
  data_vec.push_back(data_2);
  int send_indx = 0;
  while (1) {    
    int nums = 0;
    int flag = ZMQ_NOBLOCK | ZMQ_SNDMORE;
    int send_ret = 0;
    for (int i = 0; i < data_vec.size(); i++) {
      zmq_msg_t data_msg;
      if (zmq_msg_init_data(&data_msg, data_vec[i], data_size_vec[i],
                            NULL, NULL) !=0) {
        std::cout << "zmq_msg_init_data failed!\n";
        continue;                        
      }
      if (i == 2) {
        flag = ZMQ_NOBLOCK;  // last frame: no ZMQ_SNDMORE, which ends the multipart message
      }
      send_ret = zmq_msg_send(&data_msg, sockets, flag);
      zmq_msg_close(&data_msg);
    }
    send_indx++;
  }
  zmq_close(sockets);
  zmq_ctx_shutdown(context);
  zmq_ctx_term(context);
}

SUB code

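#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <zmq.h>

std::vector <void*> sockets;
std::mutex mtx;
std::atomic_int item_nums;
void *context = nullptr;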

int main() {
  std::vector <std::string> zmq_protocol_items;
  std::string ip_port = "tcp://127.0.0.1:4124";
  zmq_protocol_items.push_back(ip_port);
  context = zmq_ctx_new();
  if (context == nullptr) {
    std::cout << "init zmq context failed!\n";
    return -1;
  }
  for (auto &proto : zmq_protocol_items) {
    std::cout << proto << "\n";
    void *socket = zmq_socket(context, ZMQ_SUB);
    zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
    int hwm = 100;
    zmq_setsockopt(socket,
                     ZMQ_RCVHWM,
                     &hwm,
                     sizeof(int));
    int zmq_heart_beat_ivl = 5;
    zmq_setsockopt(socket,
                     ZMQ_HEARTBEAT_IVL,
                     &zmq_heart_beat_ivl,
                     sizeof(int));


    int zmq_heart_beat_timeout = 5;
    zmq_setsockopt(socket,
                     ZMQ_HEARTBEAT_TIMEOUT,
                     &zmq_heart_beat_timeout,
                     sizeof(int));
    int ret = zmq_connect(socket, proto.c_str());
    if (ret < 0) {
      std::cout << "zmq connect failed!\n";
      continue;
    }
    std::unique_lock<std::mutex> lk(mtx);
    sockets.push_back(socket);
    lk.unlock();
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));

  while (1) {
    zmq_msg_t msg;
    if (zmq_msg_init(&msg) < 0) {
      std::cout<< "rpc server recv msg init failed\n";
    }   
    if (zmq_msg_recv(&msg, sockets[0], 0) < 0) {
      zmq_msg_close(&msg);
    } else {
      zmq_msg_close(&msg);
    }
    // std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
  zmq_ctx_shutdown(context);
  zmq_ctx_term(context);
  return 0;
}

2. Steps

Run the SUB program first, then run the PUB program.
Starting two SUB programs before the PUB program makes the error easier to reproduce.

If I set ZMQ_HEARTBEAT_TIMEOUT to 0, or do not set it at all, it does not crash.
If I set ZMQ_HEARTBEAT_TIMEOUT to a larger value, the bug becomes very difficult to reproduce.
I know that setting ZMQ_HEARTBEAT_TIMEOUT equal to ZMQ_HEARTBEAT_IVL and using a very small value is unusual, but it makes the bug easy to reproduce.
With reasonable values for ZMQ_HEARTBEAT_TIMEOUT and ZMQ_HEARTBEAT_IVL the bug is much harder to trigger, but it can still happen.

What's the actual result? (include assertion message & call stack if applicable)

Crash on failed assertion !_io_error:

Assertion failed: !_io_error (/home/users/jiangkuan.liu/tools/zeromq-4.3.4/src/stream_engine_base.cpp:331)

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7ffff62b7700 (LWP 26901)]
0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
54      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
#1  0x00007ffff6cf533a in __GI_abort () at abort.c:89
#2  0x00007ffff7b65e5e in zmq::zmq_abort(char const*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#3  0x00007ffff7b92992 in zmq::stream_engine_base_t::out_event() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#4  0x00007ffff7bb04fa in zmq::zmtp_engine_t::process_heartbeat_message(zmq::msg_t*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#5  0x00007ffff7b9247b in zmq::stream_engine_base_t::decode_and_push(zmq::msg_t*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#6  0x00007ffff7b93036 in zmq::stream_engine_base_t::restart_input() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#7  0x00007ffff7b66b6c in zmq::io_thread_t::in_event() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#8  0x00007ffff7b6560e in zmq::epoll_t::loop() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#9  0x00007ffff7b99ac9 in thread_routine () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#10 0x00007ffff70676e4 in start_thread (arg=0x7ffff62b7700) at pthread_create.c:333
#11 0x00007ffff6da729d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

Crash on failed assertion _input_stopped:

Assertion failed: _input_stopped (/home/users/jiangkuan.liu/tools/zeromq-4.3.4/src/stream_engine_base.cpp:417)

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7ffff62b7700 (LWP 3216)]
0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
54      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  0x00007ffff6cf3f37 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
#1  0x00007ffff6cf533a in __GI_abort () at abort.c:89
#2  0x00007ffff7b65e5e in zmq::zmq_abort(char const*) () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#3  0x00007ffff7b93212 in zmq::stream_engine_base_t::restart_input() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#4  0x00007ffff7b66b6c in zmq::io_thread_t::in_event() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#5  0x00007ffff7b6560e in zmq::epoll_t::loop() () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#6  0x00007ffff7b99ac9 in thread_routine () from /home/users/jiangkuan.liu/project/zmq_test/zmq/lib64/libzmq.so.5
#7  0x00007ffff70676e4 in start_thread (arg=0x7ffff62b7700) at pthread_create.c:333
#8  0x00007ffff6da729d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb)

What's the expected result?

It should not crash in this situation.

@ABlueLight
Author

Any advice on how to avoid this bug?

@ABlueLight ABlueLight changed the title 【zmq 4.3.4】pub-sub mode crash by _io_error 【zmq 4.3.4】pub-sub(tcp protocol) mode crash by _io_error Apr 2, 2022
@ABlueLight
Author

Can anyone offer help? Thanks.

@ABlueLight
Author

#4229
#3937

These seem to have the same cause.

@jmwample

Not sure if this is still relevant, but leaving a comment just in case. I ran into this through the Go wrapper for this library (github.com/pebbe/zmq4). I don't think there is a direct fix, but in my case the cause was thread starvation, which is likely the cause in general. Tuning the heartbeat interval and timeout values can help delay the problem, but if your zmq thread starves it will trigger this assertion.

My suggestion for a "work-around" would be to review where things are blocking (i.e. on mutexes), where threads are being launched, and when threads are being scheduled, to try to ensure that any zmq socket threads never block for longer than the heartbeat duration.

@ABlueLight
Author

@jmwample thanks for your suggestion
