
Commit

Fixed styling issue
pranavrth committed Oct 19, 2023
1 parent 2ccd6d1 commit 453d0fa
Showing 8 changed files with 73 additions and 54 deletions.
23 changes: 12 additions & 11 deletions examples/adminapi.py
@@ -503,7 +503,8 @@ def example_describe_consumer_groups(a, args):
print(" Is Simple : {}".format(g.is_simple_consumer_group))
print(" State : {}".format(g.state))
print(" Partition Assignor : {}".format(g.partition_assignor))
print(f" Coordinator : ({g.coordinator.id}) {g.coordinator.host}:{g.coordinator.port} {f'(Rack - {g.coordinator.rack})' if g.coordinator.rack else ''}")
print(
f" Coordinator : {g.coordinator}")
print(" Members: ")
for member in g.members:
print(" Id : {}".format(member.member_id))
@@ -514,7 +515,7 @@ def example_describe_consumer_groups(a, args):
print(" Assignments :")
for toppar in member.assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
if(include_auth_ops):
if (include_auth_ops):
print(" Authorized operations: ")
op_string = ""
for acl_op in g.authorized_operations:
@@ -539,10 +540,10 @@ def example_describe_topics(a, args):
try:
t = future.result()
print("Topic name : {}".format(t.name))
if(t.is_internal):
if (t.is_internal):
print("Topic is Internal")

if(include_auth_ops):
if (include_auth_ops):
print("Authorized operations : ")
op_string = ""
for acl_op in t.authorized_operations:
@@ -553,13 +554,13 @@ def example_describe_topics(a, args):
for partition in t.partitions:
print(" Id : {}".format(partition.id))
leader = partition.leader
print(f" Leader : ({leader.id}) {leader.host}:{leader.port} {f'(Rack - {leader.rack})' if leader.rack else ''}")
print(f" Leader : {leader}")
print(" Replicas : {}".format(len(partition.replicas)))
for replica in partition.replicas:
print(f" Replica : ({replica.id}) {replica.host}:{replica.port} {f'(Rack - {replica.rack})' if replica.rack else ''}")
print(f" Replica : {replica}")
print(" In-Sync Replicas : {}".format(len(partition.isr)))
for isr in partition.isr:
print(f" In-Sync Replica : ({isr.id}) {isr.host}:{isr.port} {f'(Rack - {isr.rack})' if isr.rack else ''}")
print(f" In-Sync Replica : {isr}")
print("")
print("")

@@ -580,16 +581,16 @@ def example_describe_cluster(a, args):
c = future.result()
print("Cluster_id : {}".format(c.cluster_id))

if(c.controller):
print(f"Controller: ({c.controller.id}) {c.controller.host}:{c.controller.port} {f'(Rack - {c.controller.rack})' if c.controller.rack else ''}")
if (c.controller):
print(f"Controller: {c.controller}")
else:
print("No Controller Information Available")

print("Nodes :")
for node in c.nodes:
print(f" Node: ({node.id}) {node.host}:{node.port} {f'(Rack - {node.rack})' if node.rack else ''}")
print(f" Node: {node}")

if(include_auth_ops):
if (include_auth_ops):
print("Authorized operations: ")
op_string = ""
for acl_op in c.authorized_operations:
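
The repeated inline f-strings in this example are replaced with plain prints of the node objects, which rely on the Node.__str__ added below in src/confluent_kafka/_model/__init__.py. A minimal sketch of the simplified pattern outside the example script, assuming a client version that provides AdminClient.describe_cluster() and a placeholder bootstrap address:

    from confluent_kafka.admin import AdminClient

    # Placeholder address; point this at a real cluster.
    admin = AdminClient({"bootstrap.servers": "localhost:9092"})

    # describe_cluster() returns a single future.
    cluster = admin.describe_cluster(request_timeout=10).result()

    print("Cluster Id : {}".format(cluster.cluster_id))
    # Node.__str__ renders "(id) host:port (Rack - rack)", so no inline formatting is needed.
    print(f"Controller : {cluster.controller}")
    for node in cluster.nodes:
        print(f"  Node : {node}")
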
6 changes: 3 additions & 3 deletions src/confluent_kafka/__init__.py
@@ -20,9 +20,9 @@
from .serializing_producer import SerializingProducer
from .error import KafkaException, KafkaError
from ._model import (Node, # noqa: F401
ConsumerGroupTopicPartitions,
ConsumerGroupState,
TopicCollection,
ConsumerGroupTopicPartitions,
ConsumerGroupState,
TopicCollection,
TopicPartitionInfo)

from .cimpl import (Producer,
7 changes: 6 additions & 1 deletion src/confluent_kafka/_model/__init__.py
@@ -34,13 +34,17 @@ class Node:
rack: str
The rack for this node.
"""

def __init__(self, id, host, port, rack=None):
self.id = id
self.id_string = str(id)
self.host = host
self.port = port
self.rack = rack

def __str__(self):
return f"({self.id}) {self.host}:{self.port} {f'(Rack - {self.rack})' if self.rack else ''}"


class ConsumerGroupTopicPartitions:
"""
@@ -55,6 +59,7 @@ class ConsumerGroupTopicPartitions:
topic_partitions: list(TopicPartition)
List of topic partitions information.
"""

def __init__(self, group_id, topic_partitions=None):
self.group_id = group_id
self.topic_partitions = topic_partitions
@@ -126,4 +131,4 @@ def __init__(self, id, leader, replicas, isr):
self.id = id
self.leader = leader
self.replicas = replicas
self.isr = isr
self.isr = isr
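
With the new __str__, a Node prints as "(id) host:port" plus an optional rack suffix. A quick illustration with made-up values:

    from confluent_kafka import Node

    node = Node(1, "broker-1", 9092, rack="rack-a")
    print(f"Leader : {node}")          # -> Leader : (1) broker-1:9092 (Rack - rack-a)

    # Without a rack the suffix is simply omitted.
    print(Node(2, "broker-2", 9092))   # -> (2) broker-2:9092
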
2 changes: 1 addition & 1 deletion src/confluent_kafka/admin/__init__.py
@@ -890,7 +890,7 @@ def describe_topics(self, topics, **kwargs):

if not isinstance(topics, _TopicCollection):
raise TypeError("Expected input to be instance of TopicCollection")

topic_names = topics.topic_names

if not isinstance(topic_names, list):
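
For context, describe_topics() rejects anything that is not a TopicCollection. A rough sketch of the call shape with placeholder names, assuming the returned futures are keyed by topic name as the integration test further down expects:

    from confluent_kafka import TopicCollection
    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder address

    # One future per requested topic, keyed by topic name.
    futures = admin.describe_topics(TopicCollection(["example-topic"]), request_timeout=10)
    topic_desc = futures["example-topic"].result()

    print("Topic name : {}".format(topic_desc.name))
    for partition in topic_desc.partitions:
        print(f"  Leader : {partition.leader}")   # rendered via Node.__str__
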
7 changes: 4 additions & 3 deletions src/confluent_kafka/schema_registry/avro.py
@@ -164,7 +164,8 @@ class AvroSerializer(Serializer):
Args:
schema_registry_client (SchemaRegistryClient): Schema Registry client instance.
schema_str (str or Schema): Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
schema_str (str or Schema):
Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
Accepts either a string or a :py:class:`Schema` instance. Note that string
definitions cannot reference other schemas. For referencing other schemas,
use a :py:class:`Schema` instance.
@@ -327,9 +328,9 @@ class AvroDeserializer(Deserializer):
client instance.
schema_str (str, Schema, optional): Avro reader schema declaration Accepts
either a string or a :py:class:`Schema` instance. If not provided, the
either a string or a :py:class:`Schema` instance. If not provided, the
writer schema will be used as the reader schema. Note that string
definitions cannot reference other schemas. For referencing other schemas,
definitions cannot reference other schemas. For referencing other schemas,
use a :py:class:`Schema` instance.
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
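
The reflowed docstring keeps an important distinction: a plain schema string cannot reference other schemas, while a Schema instance can. A hedged construction sketch, with an illustrative registry URL and schema:

    from confluent_kafka.schema_registry import Schema, SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroSerializer

    sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})  # placeholder URL

    user_schema = Schema(
        '{"type": "record", "name": "User",'
        ' "fields": [{"name": "name", "type": "string"}]}',
        schema_type="AVRO")

    # Passing a Schema instance (rather than a bare string) also allows schema references.
    avro_serializer = AvroSerializer(sr_client, user_schema)
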
6 changes: 4 additions & 2 deletions src/confluent_kafka/schema_registry/json_schema.py
@@ -141,7 +141,8 @@ class JSONSerializer(Serializer):
callable with JSONSerializer.
Args:
schema_str (str, Schema): `JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
schema_str (str, Schema):
`JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
Accepts schema as either a string or a :py:class:`Schema` instance.
Note that string definitions cannot reference other schemas. For
referencing other schemas, use a :py:class:`Schema` instance.
@@ -295,7 +296,8 @@ class JSONDeserializer(Deserializer):
framing.
Args:
schema_str (str, Schema): `JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
schema_str (str, Schema):
`JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
Accepts schema as either a string or a :py:class:`Schema` instance.
Note that string definitions cannot reference other schemas. For referencing other schemas,
use a :py:class:`Schema` instance.
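
The JSON Schema serializer docstrings get the same reflow. A similar sketch, assuming the JSONSerializer(schema_str, schema_registry_client, ...) argument order used by this client:

    from confluent_kafka.schema_registry import Schema, SchemaRegistryClient
    from confluent_kafka.schema_registry.json_schema import JSONSerializer

    sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})  # placeholder URL

    product_schema = Schema(
        '{"type": "object", "properties": {"name": {"type": "string"}}}',
        schema_type="JSON")

    # Note: here the schema comes first and the client second (the reverse of AvroSerializer).
    json_serializer = JSONSerializer(product_schema, sr_client)
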
1 change: 1 addition & 0 deletions tests/integration/admin/test_basic_operations.py
@@ -107,6 +107,7 @@ def verify_admin_acls(admin_client,
"ACL bindings don't match, actual: {} expected: {}".format(acl_bindings,
expected_acl_bindings)


def verify_topic_metadata(client, exp_topics, *args, **kwargs):
"""
Verify that exp_topics (dict<topicname,partcnt>) is reported in metadata.
75 changes: 42 additions & 33 deletions tests/integration/admin/test_describe_operations.py
@@ -21,6 +21,7 @@

topic_prefix = "test-topic"


def verify_commit_result(err, _):
assert err is not None

@@ -70,12 +71,12 @@ def get_future_key(*arg):
elif arg_type is TopicCollection:
return get_future_key_TopicCollection(arg[0])
return None


def perform_admin_operation_sync(operation, *arg, **kwargs):
future_key = get_future_key(*arg)
fs = operation(*arg, **kwargs)
fs = fs[future_key] if future_key else fs
fs = fs[future_key] if future_key else fs
return fs.result()


@@ -87,14 +88,21 @@ def delete_acls(admin_client, acl_binding_filters):
perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters)


def verify_provided_describe_for_authorized_operations(admin_client, describe_fn, operation_to_allow, operation_to_check, restype, resname, *arg):
def verify_provided_describe_for_authorized_operations(
admin_client,
describe_fn,
operation_to_allow,
operation_to_check,
restype,
resname,
*arg):
kwargs = {}
kwargs['request_timeout'] = 10

# Check with include_authorized_operations as False
kwargs['include_authorized_operations'] = False
desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs)
assert desc.authorized_operations == None
assert desc.authorized_operations is None

# Check with include_authorized_operations as True
kwargs['include_authorized_operations'] = True
@@ -105,7 +113,7 @@ def verify_provided_describe_for_authorized_operations(admin_client, describe_fn

# Update Authorized Operation by creating new ACLs
acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL,
"User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW)
"User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW)
create_acls(admin_client, [acl_binding])

# Check with updated authorized operations
@@ -116,18 +124,19 @@ def verify_provided_describe_for_authorized_operations(admin_client, describe_fn

# Delete Updated ACLs
acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY,
None, None, AclOperation.ANY, AclPermissionType.ANY)
None, None, AclOperation.ANY, AclPermissionType.ANY)
delete_acls(admin_client, [acl_binding_filter])
return desc


def verify_describe_topics(admin_client, topic_name):
desc = verify_provided_describe_for_authorized_operations(admin_client,
admin_client.describe_topics,
AclOperation.READ,
AclOperation.DELETE,
ResourceType.TOPIC,
topic_name,
TopicCollection([topic_name]))
desc = verify_provided_describe_for_authorized_operations(admin_client,
admin_client.describe_topics,
AclOperation.READ,
AclOperation.DELETE,
ResourceType.TOPIC,
topic_name,
TopicCollection([topic_name]))
assert desc.name == topic_name
assert len(desc.partitions) == 1
assert not desc.is_internal
@@ -151,12 +160,12 @@ def verify_describe_groups(cluster, admin_client, topic):

# Verify Describe Consumer Groups
desc = verify_provided_describe_for_authorized_operations(admin_client,
admin_client.describe_consumer_groups,
AclOperation.READ,
AclOperation.DELETE,
ResourceType.GROUP,
group,
[group])
admin_client.describe_consumer_groups,
AclOperation.READ,
AclOperation.DELETE,
ResourceType.GROUP,
group,
[group])
assert group == desc.group_id
assert desc.is_simple_consumer_group is False
assert desc.state == ConsumerGroupState.EMPTY
@@ -166,13 +175,13 @@ def verify_describe_cluster(admin_client):


def verify_describe_cluster(admin_client):
desc = verify_provided_describe_for_authorized_operations(admin_client,
admin_client.describe_cluster,
AclOperation.ALTER,
AclOperation.ALTER_CONFIGS,
ResourceType.BROKER,
"kafka-cluster")
assert type(desc.cluster_id) is str
desc = verify_provided_describe_for_authorized_operations(admin_client,
admin_client.describe_cluster,
AclOperation.ALTER,
AclOperation.ALTER_CONFIGS,
ResourceType.BROKER,
"kafka-cluster")
assert isinstance(desc.cluster_id, str)
assert len(desc.nodes) > 0
assert desc.controller is not None

@@ -188,13 +197,13 @@ def test_describe_operations(sasl_cluster):
# Create Topic
topic_config = {"compression.type": "gzip"}
our_topic = sasl_cluster.create_topic(topic_prefix,
{
"num_partitions": 1,
"config": topic_config,
"replication_factor": 1,
},
validate_only=False
)
{
"num_partitions": 1,
"config": topic_config,
"replication_factor": 1,
},
validate_only=False
)

# Verify Authorized Operations in Describe Topics
verify_describe_topics(admin_client, our_topic)
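
Outside the test harness, the include_authorized_operations flag exercised above looks roughly like this; the group id and broker address are placeholders:

    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder address
    group = "example-group"

    # One future per requested group, keyed by group id.
    futures = admin.describe_consumer_groups([group],
                                             include_authorized_operations=True,
                                             request_timeout=10)
    desc = futures[group].result()

    print("State : {}".format(desc.state))
    print(f"Coordinator : {desc.coordinator}")   # Node.__str__ once more
    print("Authorized operations : {}".format(desc.authorized_operations))
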
