diff --git a/examples/adminapi.py b/examples/adminapi.py index 0645b9b7a..f2924415d 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -503,7 +503,8 @@ def example_describe_consumer_groups(a, args): print(" Is Simple : {}".format(g.is_simple_consumer_group)) print(" State : {}".format(g.state)) print(" Partition Assignor : {}".format(g.partition_assignor)) - print(f" Coordinator : ({g.coordinator.id}) {g.coordinator.host}:{g.coordinator.port} {f'(Rack - {g.coordinator.rack})' if g.coordinator.rack else ''}") + print( + f" Coordinator : {g.coordinator}") print(" Members: ") for member in g.members: print(" Id : {}".format(member.member_id)) @@ -514,7 +515,7 @@ def example_describe_consumer_groups(a, args): print(" Assignments :") for toppar in member.assignment.topic_partitions: print(" {} [{}]".format(toppar.topic, toppar.partition)) - if(include_auth_ops): + if (include_auth_ops): print(" Authorized operations: ") op_string = "" for acl_op in g.authorized_operations: @@ -539,10 +540,10 @@ def example_describe_topics(a, args): try: t = future.result() print("Topic name : {}".format(t.name)) - if(t.is_internal): + if (t.is_internal): print("Topic is Internal") - if(include_auth_ops): + if (include_auth_ops): print("Authorized operations : ") op_string = "" for acl_op in t.authorized_operations: @@ -553,13 +554,13 @@ def example_describe_topics(a, args): for partition in t.partitions: print(" Id : {}".format(partition.id)) leader = partition.leader - print(f" Leader : ({leader.id}) {leader.host}:{leader.port} {f'(Rack - {leader.rack})' if leader.rack else ''}") + print(f" Leader : {leader}") print(" Replicas : {}".format(len(partition.replicas))) for replica in partition.replicas: - print(f" Replica : ({replica.id}) {replica.host}:{replica.port} {f'(Rack - {replica.rack})' if replica.rack else ''}") + print(f" Replica : {replica}") print(" In-Sync Replicas : {}".format(len(partition.isr))) for isr in partition.isr: - print(f" In-Sync Replica : ({isr.id}) {isr.host}:{isr.port} {f'(Rack - {isr.rack})' if isr.rack else ''}") + print(f" In-Sync Replica : {isr}") print("") print("") @@ -580,16 +581,16 @@ def example_describe_cluster(a, args): c = future.result() print("Cluster_id : {}".format(c.cluster_id)) - if(c.controller): - print(f"Controller: ({c.controller.id}) {c.controller.host}:{c.controller.port} {f'(Rack - {c.controller.rack})' if c.controller.rack else ''}") + if (c.controller): + print(f"Controller: {c.controller}") else: print("No Controller Information Available") print("Nodes :") for node in c.nodes: - print(f" Node: ({node.id}) {node.host}:{node.port} {f'(Rack - {node.rack})' if node.rack else ''}") + print(f" Node: {node}") - if(include_auth_ops): + if (include_auth_ops): print("Authorized operations: ") op_string = "" for acl_op in c.authorized_operations: diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index f903b3276..3f09e84ee 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -20,9 +20,9 @@ from .serializing_producer import SerializingProducer from .error import KafkaException, KafkaError from ._model import (Node, # noqa: F401 - ConsumerGroupTopicPartitions, - ConsumerGroupState, - TopicCollection, + ConsumerGroupTopicPartitions, + ConsumerGroupState, + TopicCollection, TopicPartitionInfo) from .cimpl import (Producer, diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 82e25d5c9..e806b9a41 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -34,6 +34,7 @@ class Node: rack: str The rack for this node. """ + def __init__(self, id, host, port, rack=None): self.id = id self.id_string = str(id) @@ -41,6 +42,9 @@ def __init__(self, id, host, port, rack=None): self.port = port self.rack = rack + def __str__(self): + return f"({self.id}) {self.host}:{self.port} {f'(Rack - {self.rack})' if self.rack else ''}" + class ConsumerGroupTopicPartitions: """ @@ -55,6 +59,7 @@ class ConsumerGroupTopicPartitions: topic_partitions: list(TopicPartition) List of topic partitions information. """ + def __init__(self, group_id, topic_partitions=None): self.group_id = group_id self.topic_partitions = topic_partitions @@ -126,4 +131,4 @@ def __init__(self, id, leader, replicas, isr): self.id = id self.leader = leader self.replicas = replicas - self.isr = isr \ No newline at end of file + self.isr = isr diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 2eed4cba2..b7ea7efff 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -890,7 +890,7 @@ def describe_topics(self, topics, **kwargs): if not isinstance(topics, _TopicCollection): raise TypeError("Expected input to be instance of TopicCollection") - + topic_names = topics.topic_names if not isinstance(topic_names, list): diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py index 4326b6f8d..9b5209909 100644 --- a/src/confluent_kafka/schema_registry/avro.py +++ b/src/confluent_kafka/schema_registry/avro.py @@ -164,7 +164,8 @@ class AvroSerializer(Serializer): Args: schema_registry_client (SchemaRegistryClient): Schema Registry client instance. - schema_str (str or Schema): Avro `Schema Declaration. `_ + schema_str (str or Schema): + Avro `Schema Declaration. `_ Accepts either a string or a :py:class:`Schema` instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a :py:class:`Schema` instance. @@ -327,9 +328,9 @@ class AvroDeserializer(Deserializer): client instance. schema_str (str, Schema, optional): Avro reader schema declaration Accepts - either a string or a :py:class:`Schema` instance. If not provided, the + either a string or a :py:class:`Schema` instance. If not provided, the writer schema will be used as the reader schema. Note that string - definitions cannot reference other schemas. For referencing other schemas, + definitions cannot reference other schemas. For referencing other schemas, use a :py:class:`Schema` instance. from_dict (callable, optional): Callable(dict, SerializationContext) -> object. diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index 7e973690f..656937c24 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -141,7 +141,8 @@ class JSONSerializer(Serializer): callable with JSONSerializer. Args: - schema_str (str, Schema): `JSON Schema definition. `_ + schema_str (str, Schema): + `JSON Schema definition. `_ Accepts schema as either a string or a :py:class:`Schema` instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a :py:class:`Schema` instance. @@ -295,7 +296,8 @@ class JSONDeserializer(Deserializer): framing. Args: - schema_str (str, Schema): `JSON schema definition `_ + schema_str (str, Schema): + `JSON schema definition `_ Accepts schema as either a string or a :py:class:`Schema` instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a :py:class:`Schema` instance. diff --git a/tests/integration/admin/test_basic_operations.py b/tests/integration/admin/test_basic_operations.py index aee829e56..70595e131 100644 --- a/tests/integration/admin/test_basic_operations.py +++ b/tests/integration/admin/test_basic_operations.py @@ -107,6 +107,7 @@ def verify_admin_acls(admin_client, "ACL bindings don't match, actual: {} expected: {}".format(acl_bindings, expected_acl_bindings) + def verify_topic_metadata(client, exp_topics, *args, **kwargs): """ Verify that exp_topics (dict) is reported in metadata. diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 72593689d..ef5d94987 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -21,6 +21,7 @@ topic_prefix = "test-topic" + def verify_commit_result(err, _): assert err is not None @@ -70,12 +71,12 @@ def get_future_key(*arg): elif arg_type is TopicCollection: return get_future_key_TopicCollection(arg[0]) return None - + def perform_admin_operation_sync(operation, *arg, **kwargs): future_key = get_future_key(*arg) fs = operation(*arg, **kwargs) - fs = fs[future_key] if future_key else fs + fs = fs[future_key] if future_key else fs return fs.result() @@ -87,14 +88,21 @@ def delete_acls(admin_client, acl_binding_filters): perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters) -def verify_provided_describe_for_authorized_operations(admin_client, describe_fn, operation_to_allow, operation_to_check, restype, resname, *arg): +def verify_provided_describe_for_authorized_operations( + admin_client, + describe_fn, + operation_to_allow, + operation_to_check, + restype, + resname, + *arg): kwargs = {} kwargs['request_timeout'] = 10 # Check with include_authorized_operations as False kwargs['include_authorized_operations'] = False desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs) - assert desc.authorized_operations == None + assert desc.authorized_operations is None # Check with include_authorized_operations as True kwargs['include_authorized_operations'] = True @@ -105,7 +113,7 @@ def verify_provided_describe_for_authorized_operations(admin_client, describe_fn # Update Authorized Operation by creating new ACLs acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL, - "User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW) + "User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW) create_acls(admin_client, [acl_binding]) # Check with updated authorized operations @@ -116,18 +124,19 @@ def verify_provided_describe_for_authorized_operations(admin_client, describe_fn # Delete Updated ACLs acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY, - None, None, AclOperation.ANY, AclPermissionType.ANY) + None, None, AclOperation.ANY, AclPermissionType.ANY) delete_acls(admin_client, [acl_binding_filter]) return desc + def verify_describe_topics(admin_client, topic_name): - desc = verify_provided_describe_for_authorized_operations(admin_client, - admin_client.describe_topics, - AclOperation.READ, - AclOperation.DELETE, - ResourceType.TOPIC, - topic_name, - TopicCollection([topic_name])) + desc = verify_provided_describe_for_authorized_operations(admin_client, + admin_client.describe_topics, + AclOperation.READ, + AclOperation.DELETE, + ResourceType.TOPIC, + topic_name, + TopicCollection([topic_name])) assert desc.name == topic_name assert len(desc.partitions) == 1 assert not desc.is_internal @@ -151,12 +160,12 @@ def verify_describe_groups(cluster, admin_client, topic): # Verify Describe Consumer Groups desc = verify_provided_describe_for_authorized_operations(admin_client, - admin_client.describe_consumer_groups, - AclOperation.READ, - AclOperation.DELETE, - ResourceType.GROUP, - group, - [group]) + admin_client.describe_consumer_groups, + AclOperation.READ, + AclOperation.DELETE, + ResourceType.GROUP, + group, + [group]) assert group == desc.group_id assert desc.is_simple_consumer_group is False assert desc.state == ConsumerGroupState.EMPTY @@ -166,13 +175,13 @@ def verify_describe_groups(cluster, admin_client, topic): def verify_describe_cluster(admin_client): - desc = verify_provided_describe_for_authorized_operations(admin_client, - admin_client.describe_cluster, - AclOperation.ALTER, - AclOperation.ALTER_CONFIGS, - ResourceType.BROKER, - "kafka-cluster") - assert type(desc.cluster_id) is str + desc = verify_provided_describe_for_authorized_operations(admin_client, + admin_client.describe_cluster, + AclOperation.ALTER, + AclOperation.ALTER_CONFIGS, + ResourceType.BROKER, + "kafka-cluster") + assert isinstance(desc.cluster_id, str) assert len(desc.nodes) > 0 assert desc.controller is not None @@ -188,13 +197,13 @@ def test_describe_operations(sasl_cluster): # Create Topic topic_config = {"compression.type": "gzip"} our_topic = sasl_cluster.create_topic(topic_prefix, - { - "num_partitions": 1, - "config": topic_config, - "replication_factor": 1, - }, - validate_only=False - ) + { + "num_partitions": 1, + "config": topic_config, + "replication_factor": 1, + }, + validate_only=False + ) # Verify Authorized Operations in Describe Topics verify_describe_topics(admin_client, our_topic)