Skip to content

Commit

Permalink
Fixed recursive locking on broker fail+buf retry (issue #27)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Dec 3, 2013
1 parent 3131b75 commit 2c2370c
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,26 +221,32 @@ static void rd_kafka_bufq_init (rd_kafka_bufq_t *rkbufq) {
TAILQ_INIT(&rkbufq->rkbq_bufs);
rkbufq->rkbq_cnt = 0;
}

/**
* Concat all buffers from 'src' to tail of 'dst'
*/
static void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
rd_atomic_add(&dst->rkbq_cnt, src->rkbq_cnt);
rd_kafka_bufq_init(src);
}

/**
* Purge the wait-response queue.
* NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
* or rkb_outbufs since buffers may be re-enqueued on those queues.
*/
static void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
rd_kafka_bufq_t *rkbufq,
rd_kafka_resp_err_t err) {
rd_kafka_buf_t *rkbuf, *tmp;
rd_kafka_bufq_t tmpq;

assert(pthread_self() == rkb->rkb_thread);

/* Move messages to temporary queue to avoid infinite loops if they
* are requeued on the same queue in the rkbuf_cb callback. */
rd_kafka_bufq_init(&tmpq);
TAILQ_MOVE(&tmpq.rkbq_bufs, &rkbufq->rkbq_bufs, rkbuf_link);
rd_kafka_bufq_init(rkbufq);
rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
rkbufq->rkbq_cnt);

rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq");

TAILQ_FOREACH_SAFE(rkbuf, &tmpq.rkbq_bufs, rkbuf_link, tmp)
TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp)
rkbuf->rkbuf_cb(rkb, err, NULL, rkbuf, rkbuf->rkbuf_opaque);
}

Expand Down Expand Up @@ -286,6 +292,7 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
va_list ap;
int errno_save = errno;
rd_kafka_toppar_t *rktp;
rd_kafka_bufq_t tmpq;

assert(pthread_self() == rkb->rkb_thread);
rd_kafka_broker_lock(rkb);
Expand Down Expand Up @@ -338,11 +345,21 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
strlen(rkb->rkb_err.msg));
}

rd_kafka_bufq_purge(rkb, &rkb->rkb_waitresps, err);
rd_kafka_bufq_purge(rkb, &rkb->rkb_outbufs, err);
/*
* Purge all buffers
* (put on a temporary queue since bufs may be requeued)
*/
rd_kafka_bufq_init(&tmpq);
rd_kafka_bufq_concat(&tmpq, &rkb->rkb_waitresps);
rd_kafka_bufq_concat(&tmpq, &rkb->rkb_outbufs);

/* Unlock broker since a requeue will try to lock it. */
rd_kafka_broker_unlock(rkb);

/* Purge the buffers */
rd_kafka_bufq_purge(rkb, &tmpq, err);


/* Undelegate all toppars from this broker. */
rd_kafka_broker_toppars_wrlock(rkb);
while ((rktp = TAILQ_FIRST(&rkb->rkb_toppars))) {
Expand Down

0 comments on commit 2c2370c

Please sign in to comment.