Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-848] HB Error Code, Partial ack flow, OffsetCommit Request, Response and various fixes #4634

Merged
merged 8 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 38 additions & 38 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1972,44 +1972,44 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------| ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |
| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------|-----------|----------------|
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 9 | 7 |
emasab marked this conversation as resolved.
Show resolved Hide resolved
| 9 | OffsetFetch | 9 | 9 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |


# Recommendations for language binding developers
Expand Down
93 changes: 93 additions & 0 deletions examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,60 @@ static int is_printable(const char *buf, size_t size) {
return 1;
}

static void
emasab marked this conversation as resolved.
Show resolved Hide resolved
print_partition_list(FILE *fp,
const rd_kafka_topic_partition_list_t *partitions) {
int i;
for (i = 0; i < partitions->cnt; i++) {
fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64,
i > 0 ? "," : "", partitions->elems[i].topic,
partitions->elems[i].partition,
partitions->elems[i].offset);
}
fprintf(fp, "\n");
}

static void rebalance_cb(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque) {
rd_kafka_error_t *error = NULL;
rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;

fprintf(stderr, "%% Consumer group rebalanced: ");

switch (err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
fprintf(stderr, "assigned (%s):\n",
rd_kafka_rebalance_protocol(rk));
print_partition_list(stderr, partitions);

error = rd_kafka_incremental_assign(rk, partitions);
break;

case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
fprintf(stderr, "revoked (%s):\n",
rd_kafka_rebalance_protocol(rk));
print_partition_list(stderr, partitions);

error = rd_kafka_incremental_unassign(rk, partitions);
break;

default:
fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
rd_kafka_assign(rk, NULL);
break;
}

if (error) {
fprintf(stderr, "incremental assign failure: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
} else if (ret_err) {
fprintf(stderr, "assign failure: %s\n",
rd_kafka_err2str(ret_err));
}
}

int main(int argc, char **argv) {
rd_kafka_t *rk; /* Consumer instance handle */
Expand Down Expand Up @@ -127,6 +181,45 @@ int main(int argc, char **argv) {
return 1;
}

if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

if (rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

if (rd_kafka_conf_set(conf, "max.poll.interval.ms", "20000", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

/* Callback called on partition assignment changes */
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);


// if (rd_kafka_conf_set(conf, "debug", "all", errstr,
// sizeof(errstr)) != RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// rd_kafka_conf_destroy(conf);
// return 1;
// }

/* If there is no previously committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
* in the partition to start fetching messages.
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"),
_ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
"Broker: The member epoch is fenced by the group coordinator"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID,
"Broker: The instance ID is still used by another member in the "
"consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR,
"Broker: The assignor or its version range is not supported by "
"the consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
"Broker: The member epoch is stale"),
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
Expand Down
15 changes: 13 additions & 2 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,20 @@ typedef enum {
RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97,
/** Unknown Topic Id */
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100,
/** The member epoch is fenced by the group coordinator */
/** The member epoch is fenced by the group coordinator. The member must
* abandon all its partitions and rejoin. */
emasab marked this conversation as resolved.
Show resolved Hide resolved
RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110,
/** The member epoch is stale */
/** The instance ID is still used by another member in the
* consumer group. That member must leave first.
*/
emasab marked this conversation as resolved.
Show resolved Hide resolved
RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111,
/** The assignor or its version range is not supported by
* the consumer group.
*/
emasab marked this conversation as resolved.
Show resolved Hide resolved
RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112,
/** The member epoch is stale.
* The member must retry after receiving its updated member epoch
* via the ConsumerGroupHeartbeat API. */
emasab marked this conversation as resolved.
Show resolved Hide resolved
RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113,
RD_KAFKA_RESP_ERR_END_ALL,
} rd_kafka_resp_err_t;
Expand Down
Loading