KIP 848 ListGroups API #1775

Closed
wants to merge 29 commits

Commits (29)
6024b83
[KIP-848] Added support for testing with new 'consumer' group protocol.
pranavrth Apr 25, 2024
cda669d
Build fixes
pranavrth Apr 25, 2024
d5711f5
updated trivup
pranavrth Apr 25, 2024
9166b2b
Updated trivup install path
pranavrth Apr 25, 2024
ad33f60
Fixed failing test
pranavrth Apr 26, 2024
31b7150
Style fix
pranavrth Apr 26, 2024
4c6b058
Added more tests to be run with the new protocol
pranavrth Apr 26, 2024
15e03a6
Fixed failing tests
pranavrth Apr 26, 2024
a42534d
Added Test common for common functionalities
pranavrth Apr 26, 2024
5ea17eb
Enabling SR again
pranavrth Apr 26, 2024
2e66f2c
Style fixes
pranavrth Apr 26, 2024
df10e0d
Some refactoring
pranavrth Apr 26, 2024
c39a865
Added consumer protocol integration tests in semaphore
pranavrth Apr 26, 2024
e1f0e79
Ignoring failing admin tests
pranavrth Apr 28, 2024
7f1c795
Fix typo
pranavrth Apr 28, 2024
3380c5a
Fixed failing test case
pranavrth Apr 30, 2024
ee61203
Added new fixture for single broker and using this fixture for test_ser…
pranavrth May 3, 2024
92b4b6b
Build fixes
pranavrth May 3, 2024
23c893d
Fixed transient test failures for proto
pranavrth May 3, 2024
4917e15
Fixed another test
pranavrth May 3, 2024
0324fac
Added Test*Consumer classes instead of functions
pranavrth May 6, 2024
264be14
Build issue
pranavrth May 6, 2024
bc4896f
Added common TestUtils
pranavrth May 6, 2024
2c84234
Using specific commit for trivup
pranavrth May 7, 2024
b89e51a
Removed trivup 0.12.5
pranavrth May 7, 2024
5e13ed7
PR comments
pranavrth May 16, 2024
7544434
Style check
pranavrth May 16, 2024
beb5998
Skipping one list offsets assert for Zookeeper
pranavrth May 17, 2024
7823b51
feature/ListGroupsAPI KIP 848 (#2)
mahajanadhitya Aug 9, 2024
10 changes: 9 additions & 1 deletion .semaphore/semaphore.yml
@@ -138,12 +138,20 @@ blocks:
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
jobs:
- name: Build
- name: Build and Tests with 'classic' group protocol
commands:
- sem-version python 3.8
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
- tools/source-package-verification.sh
- name: Build and Tests with 'consumer' group protocol
commands:
- sem-version python 3.8
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- tools/source-package-verification.sh
- name: "Source package verification with Python 3 (Linux arm64)"
dependencies: []
63 changes: 56 additions & 7 deletions examples/adminapi.py
@@ -18,15 +18,17 @@
# Example use of AdminClient operations.

from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
TopicPartition, ConsumerGroupState, TopicCollection,
IsolationLevel)
TopicPartition, ConsumerGroupState,
TopicCollection, IsolationLevel)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
AclOperation, AclPermissionType, AlterConfigOpType,
ScramMechanism, ScramCredentialInfo,
UserScramCredentialUpsertion, UserScramCredentialDeletion,
OffsetSpec)
from confluent_kafka._model import ConsumerGroupType

import sys
import threading
import logging
@@ -471,18 +473,64 @@ def example_list(a, args):
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))


def getConsumerGroupState(state_string):
if state_string == "STABLE":
return ConsumerGroupState.STABLE
elif state_string == "DEAD":
return ConsumerGroupState.DEAD
elif state_string == "PREPARING_REBALANCING":
return ConsumerGroupState.PREPARING_REBALANCING
elif state_string == "COMPLETING_REBALANCING":
return ConsumerGroupState.COMPLETING_REBALANCING
elif state_string == "EMPTY":
return ConsumerGroupState.EMPTY
return ConsumerGroupState.UNKNOWN


def getConsumerGroupType(type_string):
if type_string == "CONSUMER":
return ConsumerGroupType.CONSUMER
elif type_string == "CLASSIC":
return ConsumerGroupType.CLASSIC
return ConsumerGroupType.UNKNOWN


def example_list_consumer_groups(a, args):
"""
List Consumer Groups
"""
states = {ConsumerGroupState[state] for state in args}
future = a.list_consumer_groups(request_timeout=10, states=states)
states = set()
group_types = set()
if len(args) > 0:
isType = False
isState = False
for i in range(0, len(args)):
if (args[i] == "-states"):
if (isState):
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
"[-types <grouptype1> <grouptype2> ..]")
isState = True
elif (args[i] == "-types"):
if (isType):
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
"[-types <grouptype1> <grouptype2> ..]")
isType = True
else:
if (isType):
group_types.add(getConsumerGroupType(args[i]))
elif (isState):
states.add(getConsumerGroupState(args[i]))
else:
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
"[-types <grouptype1> <grouptype2> ..]")

future = a.list_consumer_groups(request_timeout=10, states=states, group_types=group_types)
try:
list_consumer_groups_result = future.result()
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
for valid in list_consumer_groups_result.valid:
print(" id: {} is_simple: {} state: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state))
print(" id: {} is_simple: {} state: {} group_type: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.group_type))
print("{} errors".format(len(list_consumer_groups_result.errors)))
for error in list_consumer_groups_result.errors:
print(" error: {}".format(error))
@@ -867,7 +915,8 @@ def example_list_offsets(a, args):
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_pattern_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
sys.stderr.write(' list_consumer_groups [-states <state1> <state2> ..] ' +
'[-types <grouptype1> <grouptype2> ..]\n')
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
sys.stderr.write(' describe_cluster <include_authorized_operations>\n')
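The example's `list_consumer_groups` sub-command now accepts optional `-states` and `-types` flags. A minimal sketch of exercising the updated example function programmatically; the import path and broker address are illustrative assumptions, not part of the PR:

```python
from confluent_kafka.admin import AdminClient
# Assumes examples/adminapi.py is importable as a module named `adminapi`.
from adminapi import example_list_consumer_groups

# Illustrative bootstrap address; point this at a real cluster.
a = AdminClient({"bootstrap.servers": "localhost:9092"})

# Equivalent to:
#   python adminapi.py <brokers> list_consumer_groups -states STABLE EMPTY -types CONSUMER
example_list_consumer_groups(a, ["-states", "STABLE", "EMPTY", "-types", "CONSUMER"])
```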
2 changes: 1 addition & 1 deletion src/confluent_kafka/__init__.py
@@ -48,7 +48,7 @@
'Producer', 'DeserializingConsumer',
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'ConsumerGroupType', 'Uuid',
'IsolationLevel']

__version__ = version()[0]
18 changes: 18 additions & 0 deletions src/confluent_kafka/_model/__init__.py
@@ -95,6 +95,24 @@ def __lt__(self, other):
return self.value < other.value


class ConsumerGroupType(Enum):
"""
Enumerates the different types of consumer groups.

"""
#: Type is not known or not set
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
#: Consumer Type
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
#: Classic Type
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value


class TopicCollection:
"""
Represents collection of topics in the form of different identifiers
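A quick sketch of the new enum on its own; with this PR applied it is exported both from `confluent_kafka._model` and from the package root, and its integer values come from the cimpl constants shown above:

```python
from confluent_kafka import ConsumerGroupType

# Inspect the members and their underlying librdkafka constants.
for group_type in (ConsumerGroupType.UNKNOWN, ConsumerGroupType.CONSUMER, ConsumerGroupType.CLASSIC):
    print(group_type.name, group_type.value)

# __lt__ compares the underlying integer values, mirroring ConsumerGroupState.
print(ConsumerGroupType.UNKNOWN < ConsumerGroupType.CONSUMER)
```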
14 changes: 13 additions & 1 deletion src/confluent_kafka/admin/__init__.py
@@ -54,7 +54,7 @@
from ._listoffsets import (OffsetSpec, # noqa: F401
ListOffsetsResultInfo)

from .._model import TopicCollection as _TopicCollection
from .._model import TopicCollection as _TopicCollection, ConsumerGroupType as _ConsumerGroupType

from ..cimpl import (KafkaException, # noqa: F401
KafkaError,
@@ -864,6 +864,8 @@ def list_consumer_groups(self, **kwargs):
on broker, and response. Default: `socket.timeout.ms*1000.0`
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
these states.
:param set(ConsumerGroupType) group_types: only list consumer groups which are currently of
these types.

:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.

@@ -883,6 +885,16 @@
raise TypeError("All elements of states must be of type ConsumerGroupState")
kwargs["states_int"] = [state.value for state in states]
kwargs.pop("states")
if "group_types" in kwargs:
group_types = kwargs["group_types"]
if group_types is not None:
if not isinstance(group_types, set):
raise TypeError("'group_types' must be a set")
for group_type in group_types:
if not isinstance(group_type, _ConsumerGroupType):
raise TypeError("All elements of group_types must be of type ConsumerGroupType")
kwargs["group_types_int"] = [group_type.value for group_type in group_types]
kwargs.pop("group_types")

f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)

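A minimal usage sketch of the new `group_types` keyword alongside the existing `states` filter; the broker address is illustrative and both sets are optional:

```python
from confluent_kafka import ConsumerGroupState
from confluent_kafka._model import ConsumerGroupType
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # illustrative address

# Narrow the listing to stable or empty groups that use the new 'consumer' protocol.
future = admin.list_consumer_groups(
    request_timeout=10,
    states={ConsumerGroupState.STABLE, ConsumerGroupState.EMPTY},
    group_types={ConsumerGroupType.CONSUMER},
)
result = future.result()
```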
15 changes: 12 additions & 3 deletions src/confluent_kafka/admin/_group.py
@@ -14,7 +14,7 @@


from .._util import ConversionUtil
from .._model import ConsumerGroupState
from .._model import ConsumerGroupState, ConsumerGroupType
from ._acl import AclOperation


@@ -31,13 +31,17 @@ class ConsumerGroupListing:
Whether a consumer group is simple or not.
state : ConsumerGroupState
Current state of the consumer group.
group_type : ConsumerGroupType
Current type of the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, state=None):
def __init__(self, group_id, is_simple_consumer_group, state=None, group_type=None):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
if group_type is not None:
self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType)


class ListConsumerGroupsResult:
@@ -119,14 +123,16 @@ class ConsumerGroupDescription:
Partition assignor.
state : ConsumerGroupState
Current state of the consumer group.
group_type : ConsumerGroupType
Current type of the consumer group.
coordinator: Node
Consumer group coordinator.
authorized_operations: list(AclOperation)
AclOperations allowed for the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,
coordinator, authorized_operations=None):
coordinator, authorized_operations=None, group_type=None):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
self.members = members
@@ -139,4 +145,7 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign
self.partition_assignor = partition_assignor
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
if group_type is not None:
self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType)

self.coordinator = coordinator
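On the result side, each `ConsumerGroupListing` now carries an optional `group_type`. A short sketch of reading it from the future returned by the earlier listing sketch; `state` and `group_type` are only set when present in the response, hence the defensive `getattr`:

```python
result = future.result()  # future from AdminClient.list_consumer_groups(...)
for listing in result.valid:
    print(listing.group_id,
          listing.is_simple_consumer_group,
          getattr(listing, "state", None),
          getattr(listing, "group_type", None))
for error in result.errors:
    print("error:", error)
```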
56 changes: 50 additions & 6 deletions src/confluent_kafka/src/Admin.c
@@ -82,6 +82,8 @@ struct Admin_options {
rd_kafka_IsolationLevel_t isolation_level;
rd_kafka_consumer_group_state_t* states;
int states_cnt;
rd_kafka_consumer_group_type_t* group_types;
int group_types_cnt;
};

/**@brief "unset" value initializers for Admin_options
@@ -185,6 +187,13 @@
goto err;
}

if (Admin_options_is_set_ptr(options->group_types) &&
(err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types(
c_options, options->group_types, options->group_types_cnt))) {
snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj));
goto err;
}

return c_options;

err:
@@ -1698,24 +1707,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR(
* @brief List consumer groups
*/
PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *future, *states_int = NULL;
PyObject *future, *states_int = NULL, *group_types_int = NULL;
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
CallState cs;
rd_kafka_queue_t *rkqu;
rd_kafka_consumer_group_state_t *c_states = NULL;
rd_kafka_consumer_group_type_t *c_group_types = NULL;
int states_cnt = 0;
int group_types_cnt = 0;
int i = 0;

static char *kws[] = {"future",
/* options */
"states_int",
"group_types_int",
"request_timeout",
NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws,
&future,
&states_int,
&group_types_int,
&options.request_timeout)) {
goto err;
}
@@ -1746,6 +1759,32 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
}
}

if(group_types_int != NULL && group_types_int != Py_None) {
if(!PyList_Check(group_types_int)) {
PyErr_SetString(PyExc_ValueError,
"group_types must of type list");
goto err;
}

group_types_cnt = (int)PyList_Size(group_types_int);

if(group_types_cnt > 0) {
c_group_types = (rd_kafka_consumer_group_type_t *)
malloc(group_types_cnt*sizeof(rd_kafka_consumer_group_type_t));
for(i = 0 ; i < group_types_cnt ; i++) {
PyObject *group_type = PyList_GET_ITEM(group_types_int, i);
if(!cfl_PyInt_Check(group_type)) {
PyErr_SetString(PyExc_ValueError,
"Element of group_types must be a valid group type");
goto err;
}
c_group_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(group_type);
}
options.group_types = c_group_types;
options.group_types_cnt = group_types_cnt;
}
}

c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS,
&options, future);
if (!c_options) {
Expand All @@ -1760,7 +1799,6 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
/* Use librdkafka's background thread queue to automatically dispatch
* Admin_background_event_cb() when the admin operation is finished. */
rkqu = rd_kafka_queue_get_background(self->rk);

/*
* Call ListConsumerGroupOffsets
*
@@ -1774,22 +1812,27 @@
if(c_states) {
free(c_states);
}
if(c_group_types) {
free(c_group_types);
}
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
rd_kafka_AdminOptions_destroy(c_options);

Py_RETURN_NONE;
err:
if(c_states) {
free(c_states);
}
if(c_group_types) {
free(c_group_types);
}
if (c_options) {
rd_kafka_AdminOptions_destroy(c_options);
Py_DECREF(future);
}
return NULL;
}
const char Admin_list_consumer_groups_doc[] = PyDoc_STR(
".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n"
".. py:function:: list_consumer_groups(future, [states_int], [group_types_int], [request_timeout])\n"
"\n"
" List all the consumer groups.\n"
"\n"
@@ -3466,7 +3509,6 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(
size_t valid_cnt,
const rd_kafka_error_t **c_errors_responses,
size_t errors_cnt) {

PyObject *result = NULL;
PyObject *ListConsumerGroupsResult_type = NULL;
PyObject *ConsumerGroupListing_type = NULL;
@@ -3509,6 +3551,8 @@

cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i]));

cfl_PyDict_SetInt(kwargs, "group_type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i]));

args = PyTuple_New(0);

valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs);
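For reference, the C entry point parses the keywords `future`, `states_int`, `group_types_int` and `request_timeout`, where the two `*_int` arguments are plain lists of enum integer values. A sketch of the Python-side conversion that `AdminClient.list_consumer_groups` performs before calling into the binding:

```python
from confluent_kafka import ConsumerGroupState
from confluent_kafka._model import ConsumerGroupType

states = {ConsumerGroupState.STABLE}
group_types = {ConsumerGroupType.CONSUMER}

states_int = [s.value for s in states]            # forwarded to the binding as "states_int"
group_types_int = [t.value for t in group_types]  # forwarded to the binding as "group_types_int"
```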