Skip to content

KAFKA-19156: Streamlined share group configs, with usage in ShareSessionCache #19505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,7 @@ class BrokerServer(
))
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))

val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize
)
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())

sharePartitionManager = new SharePartitionManager(
replicaManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,6 @@ class KafkaConfigTest {
case ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

public class ShareGroupConfig {
Expand All @@ -51,10 +50,6 @@ public class ShareGroupConfig {
public static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT = 5;
public static final String SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC = "The maximum number of delivery attempts for a record delivered to a share group.";

public static final String SHARE_GROUP_MAX_GROUPS_CONFIG = "group.share.max.groups";
public static final short SHARE_GROUP_MAX_GROUPS_DEFAULT = 10;
public static final String SHARE_GROUP_MAX_GROUPS_DOC = "The maximum number of share groups.";

public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG = "group.share.record.lock.duration.ms";
public static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT = 30000;
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock duration in milliseconds for share groups.";
Expand Down Expand Up @@ -86,7 +81,6 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 3600000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC)
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
Expand All @@ -95,25 +89,25 @@ public class ShareGroupConfig {
private final boolean isShareGroupEnabled;
private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupDeliveryCountLimit;
private final short shareGroupMaxGroups;
private final int shareGroupRecordLockDurationMs;
private final int shareGroupMaxRecordLockDurationMs;
private final int shareGroupMinRecordLockDurationMs;
private final int shareFetchPurgatoryPurgeIntervalRequests;
private final int shareGroupMaxShareSessions;
private final String shareGroupPersisterClassName;
private final AbstractConfig config;

public ShareGroupConfig(AbstractConfig config) {
// Share groups are enabled in two cases:
// 1. The internal configuration to enable it is explicitly set
this.config = config;
// Share groups are enabled in either of the two following cases:
// 1. The internal configuration to enable it is explicitly set; or
// 2. the share rebalance protocol is enabled.
Set<String> protocols = config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
.stream().map(String::toUpperCase).collect(Collectors.toSet());
isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) ||
protocols.contains(GroupType.SHARE.name());
shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG);
shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG);
shareGroupRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
Expand All @@ -136,10 +130,6 @@ public int shareGroupDeliveryCountLimit() {
return shareGroupDeliveryCountLimit;
}

public short shareGroupMaxGroups() {
return shareGroupMaxGroups;
}

public int shareGroupRecordLockDurationMs() {
return shareGroupRecordLockDurationMs;
}
Expand Down Expand Up @@ -171,6 +161,9 @@ private void validate() {
Utils.require(shareGroupMaxRecordLockDurationMs >= shareGroupRecordLockDurationMs,
String.format("%s must be greater than or equal to %s",
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG));
Utils.require(shareGroupMaxShareSessions >= config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG),
String.format("%s must be greater than or equal to %s",
SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class GroupConfigTest {
private static final boolean SHARE_GROUP_ENABLE = true;
private static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS = 200;
private static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT = 5;
private static final short SHARE_GROUP_MAX_GROUPS = 10;
private static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS = 30000;
private static final int SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS = 15000;
private static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS = 60000;
Expand Down Expand Up @@ -284,6 +283,6 @@ private GroupCoordinatorConfig createGroupCoordinatorConfig() {

private ShareGroupConfig createShareGroupConfig() {
return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS, SHARE_GROUP_DELIVERY_COUNT_LIMIT,
SHARE_GROUP_MAX_GROUPS, SHARE_GROUP_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
SHARE_GROUP_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public void testConfigs() {
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, true);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 200);
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 5);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short) 10);
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 30000);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 15000);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 60000);
Expand All @@ -51,7 +50,6 @@ public void testConfigs() {
assertTrue(config.isShareGroupEnabled());
assertEquals(200, config.shareGroupPartitionMaxRecordLocks());
assertEquals(5, config.shareGroupDeliveryCountLimit());
assertEquals(10, config.shareGroupMaxGroups());
assertEquals(30000, config.shareGroupRecordLockDurationMs());
assertEquals(15000, config.shareGroupMinRecordLockDurationMs());
assertEquals(60000, config.shareGroupMaxRecordLockDurationMs());
Expand Down Expand Up @@ -89,24 +87,6 @@ public void testInvalidConfigs() {
assertEquals("Invalid value 11 for configuration group.share.delivery.count.limit: Value must be no more than 10",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());

configs.clear();
// test for when SHARE_GROUP_MAX_GROUPS_CONFIG is of incorrect data type
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, 10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add unit test for SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is atleast(1) validation itself for the config. And KIP defines no maximum, so do we need any other validation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is atleast(1) validation itself for the config. And KIP defines no maximum, so do we need any other validation?

That is interesting. Should we increase the min value? Maybe it should be equal to or larger than group.share.max.size?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we increase the min value? Maybe it should be equal to or larger than group.share.max.size?

Yeah, that is something to consider. One should consider to have group.share.max.share.sessions >= group.share.max.size. @AndrewJSchofield wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 I think that makes a lot of sense.

assertEquals("Invalid value 10 for configuration group.share.max.groups: Expected value to be a 16-bit integer (short), but it was a java.lang.Integer",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());

configs.clear();
// test for when SHARE_GROUP_MAX_GROUPS_CONFIG is out of bounds
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short) 0);
assertEquals("Invalid value 0 for configuration group.share.max.groups: Value must be at least 1",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());

configs.clear();
// test for when SHARE_GROUP_MAX_GROUPS_CONFIG is out of bounds
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short) 110);
assertEquals("Invalid value 110 for configuration group.share.max.groups: Value must be no more than 100",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());

configs.clear();
// test for when SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG is out of bounds
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 50);
Expand All @@ -118,13 +98,19 @@ public void testInvalidConfigs() {
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 20000);
assertEquals("Invalid value 20000 for configuration group.share.partition.max.record.locks: Value must be no more than 10000",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());

configs.clear();
// test for when SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG is less than SHARE_GROUP_MAX_SIZE_CONFIG
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, 2000);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, 1000);
assertEquals("Invalid value 2000 for configuration group.share.max.size: Value must be no more than 1000",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
}

public static ShareGroupConfig createShareGroupConfig(
boolean shareGroupEnable,
int shareGroupPartitionMaxRecordLocks,
int shareGroupDeliveryCountLimit,
short shareGroupsMaxGroups,
int shareGroupRecordLockDurationsMs,
int shareGroupMinRecordLockDurationMs,
int shareGroupMaxRecordLockDurationMs
Expand All @@ -133,7 +119,6 @@ public static ShareGroupConfig createShareGroupConfig(
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, shareGroupEnable);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, shareGroupPartitionMaxRecordLocks);
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, shareGroupDeliveryCountLimit);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, shareGroupsMaxGroups);
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupRecordLockDurationsMs);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupMinRecordLockDurationMs);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupMaxRecordLockDurationMs);
Expand Down