Skip to content

Commit

Permalink
Added rd_kafka_assert()
Browse files Browse the repository at this point in the history
This assert version calls rd_kafka_dump() (if an rk is available)
thus providing some more information before abort():ing.

This is winbatch's idea from issue #84
  • Loading branch information
edenhill committed Mar 19, 2014
1 parent e92694f commit 11ae61b
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 47 deletions.
44 changes: 35 additions & 9 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <stdarg.h>
#include <syslog.h>
#include <pthread.h>
#include <stdlib.h>

#include "rdkafka_int.h"
#include "rdkafka_msg.h"
Expand Down Expand Up @@ -204,7 +205,7 @@ void rd_kafka_op_destroy (rd_kafka_op_t *rko) {
* Destroy a queue. The queue must be empty.
*/
void rd_kafka_q_destroy (rd_kafka_q_t *rkq) {
assert(TAILQ_EMPTY(&rkq->rkq_q));
rd_kafka_assert(NULL, TAILQ_EMPTY(&rkq->rkq_q));
pthread_mutex_destroy(&rkq->rkq_lock);
pthread_cond_destroy(&rkq->rkq_cond);
}
Expand Down Expand Up @@ -1340,7 +1341,7 @@ static void rd_kafka_poll_cb (rd_kafka_op_t *rko, void *opaque) {
default:
rd_kafka_dbg(rk, ALL, "POLLCB",
"cant handle op %i here", rko->rko_type);
assert(!*"cant handle op type");
rd_kafka_assert(rk, !*"cant handle op type");
break;
}
}
Expand Down Expand Up @@ -1372,20 +1373,22 @@ static void rd_kafka_toppar_dump (FILE *fp, const char *indent,
indent, rktp->rktp_c.tx_msgs, rktp->rktp_c.tx_bytes);
}

void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) {
rd_kafka_broker_t *rkb;
rd_kafka_topic_t *rkt;
rd_kafka_toppar_t *rktp;

rd_kafka_lock(rk);
if (locks)
rd_kafka_lock(rk);
fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);

fprintf(fp, " refcnt %i\n", rk->rk_refcnt);
fprintf(fp, " rk_rep reply queue: %i ops\n", rk->rk_rep.rkq_qlen);

fprintf(fp, " brokers:\n");
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rd_kafka_broker_lock(rkb);
if (locks)
rd_kafka_broker_lock(rkb);
fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32
" in state %s\n",
rkb, rkb->rkb_name, rkb->rkb_nodeid,
Expand All @@ -1406,11 +1409,14 @@ void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
rkb->rkb_c.tx_retries);

fprintf(fp, " %i toppars:\n", rkb->rkb_toppar_cnt);
rd_kafka_broker_toppars_rdlock(rkb);
if (locks)
rd_kafka_broker_toppars_rdlock(rkb);
TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
rd_kafka_toppar_dump(fp, " ", rktp);
rd_kafka_broker_toppars_unlock(rkb);
rd_kafka_broker_unlock(rkb);
if (locks) {
rd_kafka_broker_toppars_unlock(rkb);
rd_kafka_broker_unlock(rkb);
}
}

fprintf(fp, " topics:\n");
Expand All @@ -1427,7 +1433,12 @@ void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
fprintf(fp, "\n");
}
}
rd_kafka_unlock(rk);
if (locks)
rd_kafka_unlock(rk);
}

void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
return rd_kafka_dump0(fp, rk, 1/*locks*/);
}


Expand Down Expand Up @@ -1463,3 +1474,18 @@ const char *rd_kafka_version_str (void) {

return ret;
}


/**
* Assert trampoline to print some debugging information on crash.
*/
void
__attribute__((noreturn))
rd_kafka_crash (const char *file, int line, const char *function,
rd_kafka_t *rk, const char *reason) {
fprintf(stderr, "*** %s:%i:%s: %s ***\n",
file, line, function, reason);
if (rk)
rd_kafka_dump0(stderr, rk, 0/*no locks*/);
abort();
}
63 changes: 33 additions & 30 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void rd_kafka_buf_destroy (rd_kafka_buf_t *rkbuf) {
}

static void rd_kafka_buf_auxbuf_add (rd_kafka_buf_t *rkbuf, void *auxbuf) {
assert(rkbuf->rkbuf_buf2 == NULL);
rd_kafka_assert(NULL, rkbuf->rkbuf_buf2 == NULL);
rkbuf->rkbuf_buf2 = auxbuf;
}

Expand All @@ -155,8 +155,8 @@ static void rd_kafka_buf_rewind (rd_kafka_buf_t *rkbuf, int iovindex) {


static struct iovec *rd_kafka_buf_iov_next (rd_kafka_buf_t *rkbuf) {

assert(rkbuf->rkbuf_msg.msg_iovlen + 1 <= rkbuf->rkbuf_iovcnt);
rd_kafka_assert(NULL,
rkbuf->rkbuf_msg.msg_iovlen + 1 <= rkbuf->rkbuf_iovcnt);
return &rkbuf->rkbuf_iov[rkbuf->rkbuf_msg.msg_iovlen++];
}

Expand Down Expand Up @@ -187,7 +187,7 @@ static rd_kafka_buf_t *rd_kafka_buf_new (int iovcnt, size_t size) {

rkbuf->rkbuf_iov = (struct iovec *)(rkbuf+1);
rkbuf->rkbuf_iovcnt = (iovcnt+iovcnt_fixed);
assert(rkbuf->rkbuf_iovcnt <= IOV_MAX);
rd_kafka_assert(NULL, rkbuf->rkbuf_iovcnt <= IOV_MAX);
rkbuf->rkbuf_msg.msg_iov = rkbuf->rkbuf_iov;

/* save the first two iovecs for the header + clientid */
Expand Down Expand Up @@ -229,7 +229,7 @@ static void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {

static void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
assert(rkbufq->rkbq_cnt > 0);
rd_kafka_assert(NULL, rkbufq->rkbq_cnt > 0);
(void)rd_atomic_sub(&rkbufq->rkbq_cnt, 1);
}

Expand Down Expand Up @@ -257,7 +257,7 @@ static void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err) {
rd_kafka_buf_t *rkbuf, *tmp;

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
rkbufq->rkbq_cnt);
Expand All @@ -275,7 +275,7 @@ static void rd_kafka_broker_waitresp_timeout_scan (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf, *tmp;
int cnt = 0;

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

TAILQ_FOREACH_SAFE(rkbuf,
&rkb->rkb_waitresps.rkbq_bufs, rkbuf_link, tmp) {
Expand Down Expand Up @@ -310,7 +310,7 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp;
rd_kafka_bufq_t tmpq;

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);
rd_kafka_broker_lock(rkb);

rd_kafka_dbg(rkb->rkb_rk, BROKER, "BROKERFAIL",
Expand Down Expand Up @@ -411,8 +411,8 @@ static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb,
const struct msghdr *msg) {
ssize_t r;

assert(rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP);
assert(rkb->rkb_s != -1);
rd_kafka_assert(rkb->rkb_rk, rkb->rkb_state>=RD_KAFKA_BROKER_STATE_UP);
rd_kafka_assert(rkb->rkb_rk, rkb->rkb_s != -1);

r = sendmsg(rkb->rkb_s, msg, MSG_DONTWAIT
#ifdef MSG_NOSIGNAL
Expand Down Expand Up @@ -483,7 +483,7 @@ static int rd_kafka_broker_resolve (rd_kafka_broker_t *rkb) {

static void rd_kafka_broker_buf_enq0 (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf, int at_head) {
assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

if (unlikely(at_head)) {
/* Insert message at head of queue */
Expand Down Expand Up @@ -521,7 +521,7 @@ static void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
void *opaque),
void *opaque) {

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

rkbuf->rkbuf_corrid = ++rkb->rkb_corrid;

Expand Down Expand Up @@ -1058,7 +1058,7 @@ static rd_kafka_buf_t *rd_kafka_waitresp_find (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf;
rd_ts_t now = rd_clock();

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

TAILQ_FOREACH(rkbuf, &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link)
if (rkbuf->rkbuf_corrid == corrid) {
Expand All @@ -1081,7 +1081,7 @@ static int rd_kafka_req_response (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf) {
rd_kafka_buf_t *req;

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);


/* Find corresponding request message by correlation id */
Expand Down Expand Up @@ -1132,7 +1132,7 @@ static void rd_kafka_msghdr_rebuild (struct msghdr *dst, size_t dst_len,
vof = 0;

if (vof < src->msg_iov[i].iov_len) {
assert(dst->msg_iovlen < dst_len);
rd_kafka_assert(NULL, dst->msg_iovlen < dst_len);
dst->msg_iov[dst->msg_iovlen].iov_base =
(char *)src->msg_iov[i].iov_base + vof;
dst->msg_iov[dst->msg_iovlen].iov_len =
Expand Down Expand Up @@ -1217,7 +1217,7 @@ static int rd_kafka_recv (rd_kafka_broker_t *rkb) {
rkbuf->rkbuf_of);
}

assert(rd_kafka_msghdr_size(&msg) > 0);
rd_kafka_assert(rkb->rkb_rk, rd_kafka_msghdr_size(&msg) > 0);

if ((r = recvmsg(rkb->rkb_s, &msg, MSG_DONTWAIT)) == -1) {
if (errno == EAGAIN)
Expand Down Expand Up @@ -1272,7 +1272,8 @@ static int rd_kafka_recv (rd_kafka_broker_t *rkb) {
* data to be in contigious memory. */

rkbuf->rkbuf_buf2 = malloc(rkbuf->rkbuf_len);
assert(rkbuf->rkbuf_msg.msg_iovlen == 1);
rd_kafka_assert(rkb->rkb_rk,
rkbuf->rkbuf_msg.msg_iovlen == 1);
rkbuf->rkbuf_iov[1].iov_base = rkbuf->rkbuf_buf2;
rkbuf->rkbuf_iov[1].iov_len = rkbuf->rkbuf_len;
rkbuf->rkbuf_msg.msg_iovlen = 2;
Expand Down Expand Up @@ -1338,7 +1339,7 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {

sinx = rd_sockaddr_list_next(rkb->rkb_rsal);

assert(rkb->rkb_s == -1);
rd_kafka_assert(rkb->rkb_rk, rkb->rkb_s == -1);

if ((rkb->rkb_s =
rkb->rkb_rk->rk_conf.socket_cb(
Expand Down Expand Up @@ -1436,7 +1437,7 @@ static int rd_kafka_send (rd_kafka_broker_t *rkb) {
rd_kafka_buf_t *rkbuf;
unsigned int cnt = 0;

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

while (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP &&
(rkbuf = TAILQ_FIRST(&rkb->rkb_outbufs.rkbq_bufs))) {
Expand Down Expand Up @@ -1504,7 +1505,7 @@ static int rd_kafka_send (rd_kafka_broker_t *rkb) {
static void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf) {

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

rkb->rkb_c.tx_retries++;

Expand Down Expand Up @@ -1744,10 +1745,11 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
*/

if (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0)
assert(TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs));
rd_kafka_assert(rkb->rkb_rk,
TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs));
msgcnt = RD_MIN(rktp->rktp_xmit_msgq.rkmq_msg_cnt,
rkb->rkb_rk->rk_conf.batch_num_messages);
assert(msgcnt > 0);
rd_kafka_assert(rkb->rkb_rk, msgcnt > 0);
iovcnt = 3 + (4 * msgcnt);

if (iovcnt > RD_KAFKA_PAYLOAD_IOV_MAX) {
Expand Down Expand Up @@ -1979,7 +1981,8 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
goto do_send;
}

assert(strm.avail_in == 0);
rd_kafka_assert(rkb->rkb_rk,
strm.avail_in == 0);
}

/* Finish the compression */
Expand Down Expand Up @@ -2123,7 +2126,7 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
static void rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
rd_kafka_op_t *rko) {

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

switch (rko->rko_type)
{
Expand All @@ -2139,7 +2142,7 @@ static void rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
break;

default:
assert(!*"unhandled op type");
rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type");
}

rd_kafka_op_destroy(rko);
Expand Down Expand Up @@ -2200,7 +2203,7 @@ static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) {
rd_ts_t last_timeout_scan = rd_clock();
rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout);

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

rd_kafka_broker_lock(rkb);

Expand Down Expand Up @@ -2829,7 +2832,7 @@ static void rd_kafka_broker_fetch_reply (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
void *opaque) {
assert(rkb->rkb_fetching > 0);
rd_kafka_assert(rkb->rkb_rk, rkb->rkb_fetching > 0);
rkb->rkb_fetching = 0;

/* Parse and handle the messages (unless the request errored) */
Expand Down Expand Up @@ -3221,7 +3224,7 @@ static int rd_kafka_broker_fetch_toppars (rd_kafka_broker_t *rkb) {
*/
static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb) {

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_assert(rkb->rkb_rk, pthread_self() == rkb->rkb_thread);

rd_kafka_broker_lock(rkb);

Expand Down Expand Up @@ -3326,8 +3329,8 @@ void rd_kafka_broker_destroy (rd_kafka_broker_t *rkb) {
if (rd_atomic_sub(&rkb->rkb_refcnt, 1) > 0)
return;

assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
assert(TAILQ_EMPTY(&rkb->rkb_toppars));
rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_toppars));

if (rkb->rkb_recv_buf)
rd_kafka_buf_destroy(rkb->rkb_recv_buf);
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_defaultconf.c
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ rd_kafka_anyconf_set_prop0 (int scope, void *conf,
return RD_KAFKA_CONF_OK;
}
default:
assert(!*"unknown conf type");
rd_kafka_assert(NULL, !*"unknown conf type");
}

/* unreachable */
Expand Down Expand Up @@ -531,7 +531,7 @@ rd_kafka_anyconf_set_prop (int scope, void *conf,
}

default:
assert(!*"unknown conf type");
rd_kafka_assert(NULL, !*"unknown conf type");
}

/* not reachable */
Expand Down
12 changes: 12 additions & 0 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,3 +784,15 @@ extern int rd_kafka_thread_cnt_curr;
int pthread_cond_timedwait_ms (pthread_cond_t *cond,
pthread_mutex_t *mutex,
int timeout_ms);


#define rd_kafka_assert(rk, cond) do { \
if (unlikely(!(cond))) \
rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
(rk), "assert: " # cond); \
} while (0)

void
__attribute__((noreturn))
rd_kafka_crash (const char *file, int line, const char *function,
rd_kafka_t *rk, const char *reason);
2 changes: 1 addition & 1 deletion src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {

assert(rk->rk_producer.msg_cnt > 0);
rd_kafka_assert(rk, rk->rk_producer.msg_cnt > 0);
(void)rd_atomic_sub(&rk->rk_producer.msg_cnt, 1);

if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE)
Expand Down
Loading

0 comments on commit 11ae61b

Please sign in to comment.