Skip to content

Commit

Permalink
[KIP-848] Added Heartbeat Errorcodes, OffsetCommit Request,
Browse files Browse the repository at this point in the history
OffsetCommit Response and fixed multiple issues including segfaults

- Added error handling to ConsumerGroupHeartbeat API
- Added type new errors - UNRELEASED_INSTANCE_ID and UNSUPPORTED_ASSIGNOR
- Added partial acknowledgement flow
- Upgraded OffsetCommit Request and response to v9
- Fixed metadata being called with duplicate topic id
- Fixed next_target_assignment not getting reset to NULL
- Fixed member stuck if fenced during rebalancing
- Fixed segfault with current and target assignment while resetting consumer group
- Fixed segfault due to deleted topic in metadata
- Fixed leave not being called if the consumer without any assignment leaves
  • Loading branch information
pranavrth committed Mar 6, 2024
1 parent 8b0d538 commit 4565547
Show file tree
Hide file tree
Showing 16 changed files with 519 additions and 165 deletions.
76 changes: 38 additions & 38 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1972,44 +1972,44 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------| ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |
| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------|-----------|----------------|
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 9 | 7 |
| 9 | OffsetFetch | 9 | 9 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |


# Recommendations for language binding developers
Expand Down
93 changes: 93 additions & 0 deletions examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,60 @@ static int is_printable(const char *buf, size_t size) {
return 1;
}

static void
print_partition_list(FILE *fp,
const rd_kafka_topic_partition_list_t *partitions) {
int i;
for (i = 0; i < partitions->cnt; i++) {
fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64,
i > 0 ? "," : "", partitions->elems[i].topic,
partitions->elems[i].partition,
partitions->elems[i].offset);
}
fprintf(fp, "\n");
}

static void rebalance_cb(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque) {
rd_kafka_error_t *error = NULL;
rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;

fprintf(stderr, "%% Consumer group rebalanced: ");

switch (err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
fprintf(stderr, "assigned (%s):\n",
rd_kafka_rebalance_protocol(rk));
print_partition_list(stderr, partitions);

error = rd_kafka_incremental_assign(rk, partitions);
break;

case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
fprintf(stderr, "revoked (%s):\n",
rd_kafka_rebalance_protocol(rk));
print_partition_list(stderr, partitions);

error = rd_kafka_incremental_unassign(rk, partitions);
break;

default:
fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
rd_kafka_assign(rk, NULL);
break;
}

if (error) {
fprintf(stderr, "incremental assign failure: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
} else if (ret_err) {
fprintf(stderr, "assign failure: %s\n",
rd_kafka_err2str(ret_err));
}
}

int main(int argc, char **argv) {
rd_kafka_t *rk; /* Consumer instance handle */
Expand Down Expand Up @@ -127,6 +181,45 @@ int main(int argc, char **argv) {
return 1;
}

if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

if (rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

if (rd_kafka_conf_set(conf, "max.poll.interval.ms", "20000", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

/* Callback called on partition assignment changes */
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);


// if (rd_kafka_conf_set(conf, "debug", "all", errstr,
// sizeof(errstr)) != RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// rd_kafka_conf_destroy(conf);
// return 1;
// }

/* If there is no previously committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
* in the partition to start fetching messages.
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"),
_ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
"Broker: The member epoch is fenced by the group coordinator"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID,
"Broker: The instance ID is still used by another member in the "
"consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR,
"Broker: The assignor or its version range is not supported by "
"the consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
"Broker: The member epoch is stale"),
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
Expand Down
15 changes: 13 additions & 2 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,20 @@ typedef enum {
RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97,
/** Unknown Topic Id */
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100,
/** The member epoch is fenced by the group coordinator */
/** The member epoch is fenced by the group coordinator. The member must
* abandon all its partitions and rejoin. */
RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110,
/** The member epoch is stale */
/** The instance ID is still used by another member in the
* consumer group. That member must leave first.
*/
RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111,
/** The assignor or its version range is not supported by
* the consumer group.
*/
RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112,
/** The member epoch is stale.
* The member must retry after receiving its updated member epoch
* via the ConsumerGroupHeartbeat API. */
RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113,
RD_KAFKA_RESP_ERR_END_ALL,
} rd_kafka_resp_err_t;
Expand Down
Loading

0 comments on commit 4565547

Please sign in to comment.