Skip to content

Commit

Permalink
[KIP-848] Assign, revoke, leave group flows
Browse files Browse the repository at this point in the history
- Rebased with master
- WIP: assignment, revocation, leave group flow
- Remove print statements
- Remove print statement left
- Separate rd_kafka_cgrp_consumer_assignment_done
- Allow changing subscription to empty
- Expedite next heartbeat
- Static group membership
  and max poll interval checks
- Expedite next heartbeat
- Fix existing protocol
- Partial implementation
  of reconciliation and next assignment handling
- Uniform tests handling across scripts and KRaft mode
- Run tests with group.protocol=consumer
  and reusable condition to skip
  mock cluster
- Test 0113 partial
- Test 0018
- Test 0113 stickiness
- Test 0113 complete
  except regex subscription and
  u_multiple_subscription_changes(true)
- Skip some tests, fix subscription change
- Test 0029 exclusion clarified
- Debug statements
- Introduce current assignment
  rename rkcg_current_target_assignments to rkcg_target_assignment
  rename rkcg_next_target_assignments to rkcg_next_target_assignment
- Change to ConsumerGroupHeartbeat in logs
- Add remote assignor to debug log
- Fix rd_kafka_buf_write_topic_partitions not
  using topic ids for comparison
  • Loading branch information
emasab committed Mar 4, 2024
1 parent f7a82ff commit 8b0d538
Show file tree
Hide file tree
Showing 43 changed files with 1,617 additions and 821 deletions.
4 changes: 2 additions & 2 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Configuration properties
## Global configuration properties

Property | C/P | Range | Default | Importance | Description
Property | C/P | Range | Default | Importance | Description
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
client.id | * | | rdkafka | low | Client identifier. <br>*Type: string*
Expand Down Expand Up @@ -158,7 +158,7 @@ client.dns.lookup | * | use_all_dns_ips, resolve_canoni

## Topic configuration properties

Property | C/P | Range | Default | Importance | Description
Property | C/P | Range | Default | Importance | Description
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
request.required.acks | P | -1 .. 1000 | -1 | high | This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *-1* or *all*=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than `min.insync.replicas` (broker configuration) in the ISR set the produce request will fail. <br>*Type: integer*
acks | P | -1 .. 1000 | -1 | high | Alias for `request.required.acks`: This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *-1* or *all*=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than `min.insync.replicas` (broker configuration) in the ISR set the produce request will fail. <br>*Type: integer*
Expand Down
14 changes: 0 additions & 14 deletions examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,6 @@ int main(int argc, char **argv) {
return 1;
}

if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

// if (rd_kafka_conf_set(conf, "debug", "all", errstr,
// sizeof(errstr)) != RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// rd_kafka_conf_destroy(conf);
// return 1;
// }

/* If there is no previously committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
* in the partition to start fetching messages.
Expand Down
22 changes: 22 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,7 @@ static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
[RD_KAFKAP_BrokerHeartbeat] = rd_true,
[RD_KAFKAP_UnregisterBroker] = rd_true,
[RD_KAFKAP_AllocateProducerIds] = rd_true,
[RD_KAFKAP_ConsumerGroupHeartbeat] = rd_true,
},
[3 /*hide-unless-non-zero*/] = {
/* Hide Admin requests unless they've been used */
Expand Down Expand Up @@ -2172,6 +2173,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
int ret_errno = 0;
const char *conf_err;
rd_kafka_assignor_t *cooperative_assignor;
#ifndef _WIN32
sigset_t newset, oldset;
#endif
Expand Down Expand Up @@ -2363,6 +2365,26 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
goto fail;
}

/* Detect if chosen assignor is cooperative */
cooperative_assignor = rd_kafka_assignor_find(rk, "cooperative-sticky");
rk->rk_conf.partition_assignors_cooperative =
!rk->rk_conf.partition_assignors.rl_cnt ||
(cooperative_assignor && cooperative_assignor->rkas_enabled);

if (!rk->rk_conf.group_remote_assignor) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
rk->rk_conf.group_remote_assignor =
rd_strdup("uniform");
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor && range_assignor->rkas_enabled)
rk->rk_conf.group_remote_assignor =
rd_strdup("range");
}
}

/* Create Mock cluster */
rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
if (rk->rk_conf.mock.broker_cnt > 0) {
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3939,7 +3939,7 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
offsets = rd_kafka_buf_read_topic_partitions(
reply, rd_false /* don't use topic_id */, rd_true, 0, fields);
reply, rd_false /*don't use topic_id*/, rd_true, 0, fields);
if (!offsets)
rd_kafka_buf_parse_fail(reply,
"Failed to parse topic partitions");
Expand Down Expand Up @@ -4926,7 +4926,7 @@ rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
partitions = rd_kafka_buf_read_topic_partitions(
reply, rd_false /* don't use topic_id */, rd_true, 16, fields);
reply, rd_false /*don't use topic_id*/, rd_true, 16, fields);
if (!partitions) {
rd_snprintf(errstr, errstr_size,
"Failed to parse OffsetDeleteResponse partitions");
Expand Down Expand Up @@ -8114,7 +8114,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
{RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
partitions = rd_kafka_buf_read_topic_partitions(
rkbuf, rd_false /* don't use topic_id */,
rkbuf, rd_false /*don't use topic_id*/,
rd_true, 0, fields);
rd_kafka_buf_destroy(rkbuf);
if (!partitions)
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
rd_kafka_buf_write_topic_partitions(
rkbuf, owned_partitions,
rd_false /*don't skip invalid offsets*/,
rd_false /*any offset*/, rd_false /* use_topic name */,
rd_true, fields);
rd_false /*any offset*/, rd_false /*don't use topic id*/,
rd_true /*use topic name*/, fields);
}

/* Following data is ignored by consumer version < 2 */
Expand Down
Loading

0 comments on commit 8b0d538

Please sign in to comment.