Skip to content

Commit

Permalink
Re-delegate all affected topic+partitions when a broker fails (issue #14
Browse files Browse the repository at this point in the history
)
  • Loading branch information
edenhill committed Aug 20, 2013
1 parent 4c745a4 commit edf5336
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
40 changes: 33 additions & 7 deletions rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
const char *fmt, ...) {
va_list ap;
int errno_save = errno;
rd_kafka_toppar_t *rktp;

pthread_mutex_lock(&rkb->rkb_lock);

Expand All @@ -246,7 +247,13 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
rkb->rkb_s = -1;
rkb->rkb_pfd.fd = rkb->rkb_s;
}


/* The caller may omit the format if it thinks this is a recurring
* failure, in which case the following things are omitted:
* - log message
* - application OP_ERR
* - metadata request
*/
if (fmt) {
int of;

Expand All @@ -272,6 +279,28 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
rd_kafka_broker_waitresp_purge(rkb, err);

pthread_mutex_unlock(&rkb->rkb_lock);

/* Undelegate all toppars from this broker. */
rd_kafka_broker_toppars_wrlock(rkb);
while ((rktp = TAILQ_FIRST(&rkb->rkb_toppars))) {
rd_kafka_broker_toppars_unlock(rkb);
rd_rkb_dbg(rkb, "BRKTP",
"Undelegating %.*s [%"PRId32"]",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition);

/* Undelegate */
rd_kafka_topic_wrlock(rktp->rktp_rkt);
rd_kafka_toppar_broker_delegate(rktp, NULL);
rd_kafka_topic_unlock(rktp->rktp_rkt);

rd_kafka_broker_toppars_wrlock(rkb);
}
rd_kafka_broker_toppars_unlock(rkb);

/* Query for the topic leaders (async) */
if (fmt)
rd_kafka_topic_leader_query(rkb->rkb_rk, NULL);
}

static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb,
Expand Down Expand Up @@ -1054,9 +1083,7 @@ static int rd_kafka_recv (rd_kafka_broker_t *rkb) {
return 1;

err:
/* FIXME */
rd_kafka_dbg(rkb->rkb_rk, "RECV", "Receive failed");
rd_kafka_broker_fail(rkb, err_code, "%s", errstr);
rd_kafka_broker_fail(rkb, err_code, "Receive failed: %s", errstr);
return -1;
}

Expand Down Expand Up @@ -1096,7 +1123,7 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
RD_SOCKADDR2STR_F_FAMILY),
strerror(errno));
/* Avoid duplicate log messages */
if (rkb->rkb_err.err == errno && 0/*FIXME*/)
if (rkb->rkb_err.err == errno)
rd_kafka_broker_fail(rkb,
RD_KAFKA_RESP_ERR__FAIL, NULL);
else
Expand Down Expand Up @@ -1132,8 +1159,7 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
rkb->rkb_pfd.events = EPOLLIN;

/* Request metadata (async) */
rd_kafka_broker_metadata_req(rkb, 1 /*all topics*/, NULL);

rd_kafka_broker_metadata_req(rkb, 1 /* all topics */, NULL);
return 0;
}

Expand Down
9 changes: 6 additions & 3 deletions rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ const char *rd_kafka_topic_name (const rd_kafka_topic_t *rkt) {
*
* Locks: Caller must have topic_lock held.
*/
static void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb) {
void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb) {

if (rktp->rktp_leader == rkb)
return;
Expand All @@ -259,7 +259,7 @@ static void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,

if (rkb) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, "BRKDELGT",
"Broker %s is leader for topic %.*s "
"Broker %s is now leader for topic %.*s "
"[%"PRId32"] with %i messages "
"(%"PRIu64" bytes) queued",
rkb->rkb_name,
Expand Down Expand Up @@ -314,6 +314,9 @@ void rd_kafka_topic_update (rd_kafka_t *rk,
/* Topic lost its leader */
rd_kafka_toppar_broker_delegate(rktp, NULL);
rd_kafka_topic_unlock(rkt);

/* Query for the topic leader (async) */
rd_kafka_topic_leader_query(rk, rkt);
return;
}

Expand Down
2 changes: 2 additions & 0 deletions rdkafka_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
#define rd_kafka_topic_keep(rkt) rd_atomic_add(&(rkt->rkt_refcnt), 1)
void rd_kafka_topic_destroy0 (rd_kafka_topic_t *rkt);

void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb);

void rd_kafka_topic_update (rd_kafka_t *rk,
const char *topic, int32_t partition,
Expand Down

0 comments on commit edf5336

Please sign in to comment.