diff --git a/src/rdkafka.c b/src/rdkafka.c index 1766742139..2d6d03e9dd 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -1953,7 +1953,7 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf, rd_interval_init(&rk->rk_suppress.sparse_connect_random); mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain); - rd_atomic64_init(&rk->rk_ts_last_poll, rd_clock()); + rd_atomic64_init(&rk->rk_ts_last_poll, INT64_MAX); rk->rk_rep = rd_kafka_q_new(rk); rk->rk_ops = rd_kafka_q_new(rk); @@ -2609,13 +2609,18 @@ rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt, void *opaque), void *opaque) { struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque }; + rd_kafka_op_res_t res; - rd_kafka_app_polled(rkq->rkq_rk); + if (timeout_ms) + rd_kafka_app_poll_blocking(rkq->rkq_rk); + + res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, + RD_KAFKA_Q_CB_RETURN, + rd_kafka_consume_cb, &ctx); - return rd_kafka_q_serve(rkq, timeout_ms, max_cnt, - RD_KAFKA_Q_CB_RETURN, - rd_kafka_consume_cb, &ctx); + rd_kafka_app_polled(rkq->rkq_rk); + return res; } @@ -2682,7 +2687,8 @@ static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk, rd_kafka_message_t *rkmessage = NULL; rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); - rd_kafka_app_polled(rk); + if (timeout_ms) + rd_kafka_app_poll_blocking(rk); rd_kafka_yield_thread = 0; while ((rko = rd_kafka_q_pop(rkq, @@ -2696,11 +2702,12 @@ static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk, break; if (unlikely(res == RD_KAFKA_OP_RES_YIELD || - rd_kafka_yield_thread)) { + rd_kafka_yield_thread)) { /* Callback called rd_kafka_yield(), we must * stop dispatching the queue and return. */ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR); + rd_kafka_app_polled(rk); return NULL; } @@ -2712,6 +2719,7 @@ static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk, /* Timeout reached with no op returned. */ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT, ETIMEDOUT); + rd_kafka_app_polled(rk); return NULL; } @@ -2727,6 +2735,8 @@ static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk, rd_kafka_set_last_error(0, 0); + rd_kafka_app_polled(rk); + return rkmessage; } @@ -3474,17 +3484,31 @@ rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, } int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) { + int r; + + if (timeout_ms) + rd_kafka_app_poll_blocking(rk); + + r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, + RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); + rd_kafka_app_polled(rk); - return rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, - RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); + + return r; } rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) { rd_kafka_op_t *rko; - rd_kafka_app_polled(rkqu->rkqu_rk); + + if (timeout_ms) + rd_kafka_app_poll_blocking(rkqu->rkqu_rk); + rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, timeout_ms, 0, RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL); + + rd_kafka_app_polled(rkqu->rkqu_rk); + if (!rko) return NULL; @@ -3492,9 +3516,17 @@ rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) { } int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) { + int r; + + if (timeout_ms) + rd_kafka_app_poll_blocking(rkqu->rkqu_rk); + + r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0, + RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); + rd_kafka_app_polled(rkqu->rkqu_rk); - return rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0, - RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); + + return r; } diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 7f40db0b32..fdd2bb1a99 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2987,6 +2987,8 @@ rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq, break; case RD_KAFKA_OP_SUBSCRIBE: + rd_kafka_app_polled(rk); + /* New atomic subscription (may be NULL) */ err = rd_kafka_cgrp_subscribe( rkcg, rko->rko_u.subscribe.topics); diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 702f1dd923..87d05bad80 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -657,15 +657,41 @@ rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt); */ static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded (rd_kafka_t *rk) { - int exceeded = - (int)((rd_clock() - - rd_atomic64_get(&rk->rk_ts_last_poll)) / 1000ll) - + rd_ts_t last_poll = rd_atomic64_get(&rk->rk_ts_last_poll); + int exceeded; + + /* Application is blocked in librdkafka function, see + * rd_kafka_app_poll_blocking(). */ + if (last_poll == INT64_MAX) + return 0; + + exceeded = (int)((rd_clock() - last_poll) / 1000ll) - rk->rk_conf.max_poll_interval_ms; + if (unlikely(exceeded > 0)) return exceeded; + return 0; } +/** + * @brief Call on entry to blocking polling function to indicate + * that the application is blocked waiting for librdkafka + * and that max.poll.interval.ms should not be enforced. + * + * Call app_polled() Upon return from the function calling + * this function to register the application's last time of poll. + * + * @remark Only relevant for high-level consumer. + * + * @locality any + * @locks none + */ +static RD_INLINE RD_UNUSED void +rd_kafka_app_poll_blocking (rd_kafka_t *rk) { + rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX); +} + /** * @brief Set the last application poll time to now. * diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index eddc1d2359..61390ecbfa 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -526,8 +526,6 @@ int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms, rd_kafka_q_t *fwdq; struct timespec timeout_tspec; - rd_kafka_app_polled(rk); - mtx_lock(&rkq->rkq_lock); if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { /* Since the q_pop may block we need to release the parent @@ -540,6 +538,9 @@ int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms, } mtx_unlock(&rkq->rkq_lock); + if (timeout_ms) + rd_kafka_app_poll_blocking(rk); + rd_timeout_init_timespec(&timeout_tspec, timeout_ms); rd_kafka_yield_thread = 0; @@ -609,6 +610,7 @@ int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms, rd_kafka_op_destroy(rko); } + rd_kafka_app_polled(rk); return cnt; }