[KIP-848] Mock handler and Integration tests passing #4662
For src files.
src/rdkafka.h
Outdated
/**
 * @brief Get member epoch of a group metadata.
 *        Corresponds to the generation id in consumer protocol classic;
 *
 * @param group_metadata The group metadata
 *
 * @returns The member epoch id contained in the passed \p group_metadata.
 */
RD_EXPORT
int32_t rd_kafka_consumer_group_metadata_member_epoch(
    const rd_kafka_consumer_group_metadata_t *group_metadata);
Do we have a method for retrieving the generation id? If not, do we need one for the member epoch? If we really need this for the member epoch, let's create one for the generation id separately.
In Java GroupMetadata they haven't added a new one for member epoch; maybe the naming can be changed in a future major release.
What I meant was: shall we add a new method rd_kafka_consumer_group_metadata_generation_id as well?
OK, I think I'll leave only rd_kafka_consumer_group_metadata_generation_id, as they didn't add the memberEpoch method in Java.
/**
* @brief Get the generation id (classic protocol)
* or member epoch (consumer protocol) of a group
Is this description fine?
Yes, correct
src/rdkafka_mock_cgrp.c
Outdated
if (!RD_KAFKAP_STR_LEN(MemberId)) {
        /* Generate a member id */
        rd_kafka_Uuid_t member_id = rd_kafka_Uuid_random();
        member->id =
            rd_strdup(rd_kafka_Uuid_base64str(&member_id));
} else
        member->id = RD_KAFKAP_STR_DUP(MemberId);
Using the user-provided MemberId is not supported by the real broker.
I think it needs to be kept, because when the broker assigns a MemberId and the member is then fenced, it needs to rejoin with the same MemberId. As the KIP says: "It is used in all the communication with the group coordinator and must be kept during the entirely life span of the member (e.g. the consumer)." So even if the broker has forgotten the member, it must accept the MemberId the member sends. We have to check with David whether this is possible with the real implementation too. Without this, do_test_consumer_group_heartbeat_fenced_error:431: UNKNOWN_MEMBER_ID, variation 1 is failing.
I think it should be possible with static membership. We can keep it, but let's make sure that we are replicating the real broker's behaviour.
Yes, we confirmed in the call that the broker sometimes accepts the member id from the client: this happens when the member is fenced and forgotten but still has a member id previously given by the broker.
static void rd_kafka_mock_cgrp_consumer_session_tmr_cb(rd_kafka_timers_t *rkts,
                                                       void *arg) {
        rd_kafka_mock_cgrp_consumer_t *mcgrp = arg;
        rd_kafka_mock_cgrp_consumer_member_t *member, *tmp;
        rd_ts_t now = rd_clock();
        rd_kafka_mock_cluster_t *mcluster = mcgrp->cluster;

        /* Take the cluster lock before walking the member list. */
        mtx_lock(&mcluster->lock);
        TAILQ_FOREACH_SAFE(member, &mcgrp->members, link, tmp) {
                if (member->ts_last_activity +
                        (mcgrp->session_timeout_ms * 1000) >
                    now)
                        continue;

                rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
                             "Member %s session timed out for group %s",
                             member->id, mcgrp->id);

                rd_kafka_mock_cgrp_consumer_member_fenced(mcgrp, member);
        }
        mtx_unlock(&mcluster->lock);
}
Where do we send the session timeout error?
I am still not clear on where we are sending the actual error from the broker to the client. I think we can discuss this in a call.
The session times out on the broker, then the member is removed, and the member receives a FENCED_MEMBER_EPOCH error on its next HB.
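To make that flow concrete, here is a minimal sketch of it, not the mock cluster code itself. All `sketch_*` names and the simplified member struct are hypothetical, and the sketch keeps a `fenced` flag on the member instead of removing it. The point it illustrates is that the timer sends nothing to the client; the error only surfaces in the response to the next heartbeat.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical error codes, named after the error mentioned above. */
typedef enum {
        SKETCH_ERR_NO_ERROR,
        SKETCH_ERR_FENCED_MEMBER_EPOCH
} sketch_err_t;

/* Hypothetical, simplified member state (not the librdkafka struct). */
typedef struct {
        int64_t ts_last_activity_us; /* Last heartbeat time, microseconds */
        int32_t epoch;               /* Current member epoch */
        bool fenced;                 /* Set by the session timer */
} sketch_member_t;

/* Session timer: fence every member whose last activity is older than
 * the session timeout. Nothing is sent to the client from here. */
static void sketch_session_timer(sketch_member_t *members,
                                 size_t cnt,
                                 int64_t now_us,
                                 int session_timeout_ms) {
        size_t i;
        for (i = 0; i < cnt; i++) {
                if (members[i].ts_last_activity_us +
                        (int64_t)session_timeout_ms * 1000 >
                    now_us)
                        continue; /* Still within its session */
                members[i].fenced = true;
        }
}

/* Heartbeat handler: this is where a fenced member learns about it. */
static sketch_err_t sketch_heartbeat(sketch_member_t *member, int64_t now_us) {
        if (member->fenced)
                return SKETCH_ERR_FENCED_MEMBER_EPOCH;
        member->ts_last_activity_us = now_us;
        return SKETCH_ERR_NO_ERROR;
}
```

With a 6000 ms session timeout and a clock at 10 s, a member last seen at 0 s is fenced by the timer and gets the error on its next heartbeat, while a member last seen at 9 s keeps heartbeating normally.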
src/rdkafka_mock_cgrp.c
Outdated
        mtx_unlock(&mcluster->lock);
}

void rd_kafka_mock_set_default_session_timeout(
Why default instead of normal session timeout?
I renamed them to rd_kafka_mock_group_consumer_session_timeout_ms and rd_kafka_mock_group_consumer_heartbeat_interval_ms, following the convention introduced with rd_kafka_mock_group_initial_rebalance_delay_ms: rd_kafka_mock + configuration name.
We should add _set_: rd_kafka_mock_set_group_consumer_session_timeout_ms
OK. Even if it differs from rd_kafka_mock_group_initial_rebalance_delay_ms, it avoids collisions with the getter, which should be named without get.
src/rdkafka_mock_cgrp.c
Outdated
        mtx_unlock(&mcluster->lock);
}

void rd_kafka_mock_set_default_heartbeat_interval(
Same
src/rdkafka_mock_handlers.c
Outdated
rd_kafka_buf_read_str(rkbuf, &ServerAssignor);

/* #TopicPartitions */
rd_kafka_buf_read_arraycnt(rkbuf, &TopicPartitionsCnt,
I think we should use rd_kafka_buf_read_topic_partitions here.
If you do that, then you need to scan the topic partition list and, for each topic partition, compare the topic id to see whether it has changed before calling rd_kafka_mock_topic_find_by_id. If it returned something like a Map<Topic, List<Partition>>, or if we didn't have to do an operation at each topic change, I'd be more inclined to do that. What's your view on this?
I think it's better to use two loops here: the first to read the data from the RPC and the second for our business logic. This ensures readability, reusability and separation of concerns.
ok, done
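For readers following the two-loop suggestion, a rough sketch of the shape being discussed might look like this. It is not the code in this PR: the `sketch_*` names are hypothetical, topic ids are plain integers instead of UUIDs, and the "wire" is a flat array rather than a request buffer. The first function only decodes; the second only applies logic, doing one topic lookup each time the topic id changes.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical parsed entry: one (topic id, partition) pair. */
typedef struct {
        int32_t topic_id; /* Stand-in for a topic UUID */
        int32_t partition;
} sketch_toppar_t;

/* Loop 1: decode only. Reads `cnt` pairs from a flat buffer laid out
 * as [topic_id, partition, topic_id, partition, ...]. */
static size_t sketch_read_toppars(const int32_t *wire,
                                  size_t cnt,
                                  sketch_toppar_t *out) {
        size_t i;
        for (i = 0; i < cnt; i++) {
                out[i].topic_id  = wire[2 * i];
                out[i].partition = wire[2 * i + 1];
        }
        return cnt;
}

/* Loop 2: business logic only. Counts how many topic lookups are needed
 * when a lookup is done each time the topic id changes between entries. */
static int sketch_count_topic_lookups(const sketch_toppar_t *toppars,
                                      size_t cnt) {
        size_t i;
        int lookups = 0;
        for (i = 0; i < cnt; i++) {
                if (i == 0 || toppars[i].topic_id != toppars[i - 1].topic_id)
                        lookups++; /* Where the mock would find the topic */
        }
        return lookups;
}
```

The cost raised earlier in the thread shows up in the second loop as the comparison against the previous entry; the benefit is that the decoding loop can be shared with other handlers.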
Some 14 more files remain, but the main one is tests/0147-consumer_group_consumer_mock.c.
do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0);
/* FIXME: KRaft async CreateTopics is working differently than
 * with Zookeeper
 * do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0); */
Does the AK team know about it?
Let's have another function to determine whether it's a ZooKeeper or KRaft cluster, and disable this only when it's KRaft.
Sure, I opened this issue.
Then we talked about it internally. I think it was a bug and not a feature, given they didn't acknowledge it as a feature to replicate with KRaft. We could remove the test for both when that issue is closed.
if (test_consumer_group_protocol_classic() &&
    !(rebalance_cb1.assign_call_cnt == expected_cb1_assign_call_cnt &&
      rebalance_cb2.assign_call_cnt == expected_cb2_assign_call_cnt))
We should have similar calculations for the new protocol as well.
The same applies to the others below.
With KIP-848 the callback count isn't fixed as it is in the classic protocol, so we can only verify the expected final assignment, not the number of callbacks.
I think we can do that deterministically, but it might be more complex.
} else if (0) {
        /* FIXME: enable this once new errors are handled in OffsetCommit. */
        t_consumer_group_consumer_retry_commit_on_fenced_member_epoch();
}
I will enable this in my other open PR.
I checked it and I think we should remove this test, as OffsetCommit now returns STALE_MEMBER_EPOCH, which is not retried automatically, and FENCED_MEMBER_EPOCH is returned by the Heartbeat only.
if (test_consumer_group_protocol_classic()) {
        ApiKey = RD_KAFKAP_Heartbeat;
} else {
        ApiKey = RD_KAFKAP_ConsumerGroupHeartbeat;
}
This should be the opposite condition: the if condition should be for consumer, as that is the new one.
I've always used test_consumer_group_protocol_classic(), even though I implemented the other function (which I can remove), so that it's consistent for search. It also isolates the parts that we want to remove in the distant future.
Also, if there were a third protocol (we hope not), it should start from the assumptions and tests of the consumer protocol, not from those of the classic protocol.
We should be using test_consumer_group_protocol_consumer in all places instead of test_consumer_group_protocol_classic. The reason behind this is:
- Let's say we add a new protocol in the future. We want all the tests to run. We make sure of that by using !test_consumer_group_protocol_consumer() here instead.
- With the !test_consumer_group_protocol_classic() condition, we will skip those tests once the new protocol is added.
No, in this case we're running the RD_KAFKAP_ConsumerGroupHeartbeat test, which is more appropriate than running the classic one. If we use test_consumer_group_protocol_consumer ... else, then in case of a new protocol we run the classic tests, while running the newer ones would be more appropriate.
I am talking about all the places. Not just here. Basically the idea is to make sure that we run all the test cases when we add new protocols. Let's ensure that when there is a new protocol, all the tests are executed.
Yes, all tests about the new protocol. The tests about the classic protocol don't have to be executed, as they won't be applicable. Probably the tests about the 848 protocol won't be applicable either, but if a new protocol does happen, it's more probable that it will be similar to 848.
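The disagreement above comes down to which branch a not-yet-existing protocol would fall into. A small sketch, assuming hypothetical helpers that take the protocol name as an argument (the real test helpers read it from the test configuration) and a made-up protocol name "future":

```c
#include <string.h>

/* Hypothetical stand-ins for the test helpers discussed above: they take
 * the protocol name as an argument instead of reading test configuration. */
static int sketch_protocol_classic(const char *protocol) {
        return !strcmp(protocol, "classic");
}

static int sketch_protocol_consumer(const char *protocol) {
        return !strcmp(protocol, "consumer");
}

/* Branching on "classic": anything that is not classic takes the
 * KIP-848 style branch. Returns 1 if the classic branch would run. */
static int sketch_branch_on_classic(const char *protocol) {
        return sketch_protocol_classic(protocol) ? 1 : 0;
}

/* Branching on "consumer": anything that is not consumer takes the
 * classic branch. Returns 1 if the classic branch would run. */
static int sketch_branch_on_consumer(const char *protocol) {
        return sketch_protocol_consumer(protocol) ? 0 : 1;
}
```

For "classic" and "consumer" both forms pick the same branch. For "future", branching on classic runs the KIP-848 style code, while branching on consumer runs the classic code. Neither form skips a branch outright; the choice only decides which set of assumptions a new protocol inherits by default.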
Some more comments
tests/test.c
Outdated
va_start(ap, member_cnt);
for (i = 0; i < member_cnt; i++) {
        rd_kafka_consumer_group_metadata_t *cgmetadata = NULL;
        rd_kafka_t *c = va_arg(ap, rd_kafka_t *);
Better consumer_rk instead of c.
Let's use just consumer, OK?
tests/trivup/trivup-0.12.6.tar.gz
Outdated
We should release a new trivup version. Let's check with Magnus.
🎉 All Contributor License Agreements have been signed. Ready to merge.
rd_kafka_topic_partition_list_t *
    target_assignment; /**< Target assignment,
rd_kafka_topic_partition_list_t *
    target_assignment; /**< Target assignment,
rd_kafka_topic_partition_list_t
    *target_assignment; /**< Target assignment,
It may be a problem with the current clang-format version: when formatting, it moves the pointer there.
src/rdkafka_mock_cgrp.c
Outdated
    rd_kafka_mock_cgrp_consumer_member_t *member,
    rd_kafka_topic_partition_list_t *current_assignment,
    int *member_epoch) {
        rd_kafka_topic_partition_list_t *returned_assignment = NULL;
Better to name it assignments_to_return.
I can change it to assignment_to_return, since an assignment is a set while assignments are a set of sets: for example, the assignments are for all the group members, while an assignment is for a single member.
} else {
        rd_kafka_mock_cgrp_consumer_member_leave(
            mcgrp, member);
        member = NULL;
}
In the static membership case (-2), the member can rejoin. We shouldn't destroy the member. You can add a FIXME for now here.
I've implemented it and added some tests for static group membership with the mock cluster
tests/0120-asymmetric_subscription.c
Outdated
if (test_consumer_group_protocol_classic()) {
        do_test_asymmetric("range", bootstraps);
        do_test_asymmetric("roundrobin", bootstraps);
        do_test_asymmetric("cooperative-sticky", bootstraps);
}
We are not checking anything with this test now for the new protocol. I think we should enable "cooperative-sticky" test for the new protocol.
Before, it was the range assignor that was tested, but I removed it because, as said, we don't need to test the mock implementation unless it's during development, to make debugging the client implementation easier. For example, I added the unit tests for that at the beginning to make sure we tested with a mock implementation without flaws.
If you prefer, I can keep the range test as before, as the simple mock implementation is range based.
Done
I have not reviewed this file. I wanted to review this separately. Can you add a new PR for this test?
Could you review it here? We need to merge it too and it's related to this PR
Moved it to #4920
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- export TEST_KAFKA_VERSION=4.0.0
Where is this being used? 4.0.0 is not released yet.
This is used in tests to check whether a test can run or not, for example when it requires a version >= a specific one. Given we're using master, I used the next planned release here, but I could have used 9.9.9 too.
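As an illustration of that kind of gate, here is a minimal sketch of a "version at least" check on a string such as the one exported in TEST_KAFKA_VERSION. The `sketch_*` functions are hypothetical and are not the helpers the test framework actually uses; the packing assumes each version component is below 256.

```c
#include <stdio.h>

/* Pack "major.minor.patch" into one comparable integer.
 * Missing components default to 0; returns -1 if nothing could be parsed.
 * Assumes every component is below 256. */
static int sketch_parse_version(const char *str) {
        int major = 0, minor = 0, patch = 0;
        if (sscanf(str, "%d.%d.%d", &major, &minor, &patch) < 1)
                return -1;
        return (major << 16) | (minor << 8) | patch;
}

/* Returns 1 if the broker version is at least the required one,
 * 0 otherwise or if either string cannot be parsed. */
static int sketch_version_at_least(const char *broker, const char *required) {
        int b = sketch_parse_version(broker);
        int r = sketch_parse_version(required);
        return b >= 0 && r >= 0 && b >= r;
}
```

Under this scheme both 4.0.0 and 9.9.9 satisfy a ">= 3.8.0" requirement, which is why either value works as a placeholder for an unreleased trunk build.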
Do we really want to expose these functions? Are we exposing them for the tests only?
No, in Java they're public too; see ConsumerGroupMetadata.
- Mock handler implementation
- Rename current consumer protocol from generic to classic
- Mock handler with automatic or manual assignment
- More consumer group metadata getters
- Test helpers
- Expedite next HB after FindCoordinator doing it with an exponential backoff to avoid tight loops
- Configurable session timeout and HB interval
- Fix mock handler ListOffsets response LeaderEpoch instead of CurrentLeaderEpoch
- Integration tests passing with AK trunk
- Improve documentation and KIP 848 specific mock tests
- Add mock tests for unknown topic id in metadata request and partial reconciliation
- Make test 0147 more reliable
- Fix test 0106 after HB timeout change
- Exclude test case with AK trunk
- Rename rd_kafka_buf_write_tags to rd_kafka_buf_write_tags_empty
- Trivup 0.12.5 can run a KafkaCluster directly with KRaft and AK trunk
- Trivup 0.12.6 build with a specific commit
and Py 3.12
an issue with apache/kafka#16464 on AK > 3.8.0
Address comment about using rd_kafka_buf_read_topic_partitions
Asserts on non-nullable fields