Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generic and specific avro examples #1381

Merged
merged 2 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
user_pb2.py: user.proto
protoc -I=. --python_out=. ./user.proto;
user_pb2.py: protobuf/user.proto
cd protobuf && protoc -I=. --python_out=. ./user.proto;

clean:
rm -f $(TARGET_DIR)/*_pb2.py
rm -f $(TARGET_DIR)/protobuf/*_pb2.py
16 changes: 4 additions & 12 deletions examples/avro-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,17 @@
#

import argparse
import os
from uuid import uuid4

from six.moves import input

from confluent_kafka import avro

# Parse Schema used for serializing User class
record_schema = avro.loads("""
{
"namespace": "confluent.io.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
""")
path = os.path.realpath(os.path.dirname(__file__))
with open(f"{path}/avro/user_specific.avsc") as f:
record_schema = avro.loads(f.read())


class User(object):
Expand Down
18 changes: 18 additions & 0 deletions examples/avro/user_generic.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "User",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": "long"
},
{
"name": "favorite_color",
"type": "string"
}
]
}
19 changes: 19 additions & 0 deletions examples/avro/user_specific.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"namespace": "confluent.io.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": "long"
},
{
"name": "favorite_color",
"type": "string"
}
]
}
28 changes: 14 additions & 14 deletions examples/avro_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# This is a simple example of the SerializingProducer using Avro.
#
import argparse
import os

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
Expand Down Expand Up @@ -66,19 +67,16 @@ def dict_to_user(obj, ctx):

def main(args):
topic = args.topic
is_specific = args.specific == "true"

schema_str = """
{
"namespace": "confluent.io.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
"""
if is_specific:
schema = "user_specific.avsc"
else:
schema = "user_generic.avsc"

path = os.path.realpath(os.path.dirname(__file__))
with open(f"{path}/avro/{schema}") as f:
schema_str = f.read()

sr_conf = {'url': args.schema_registry}
schema_registry_client = SchemaRegistryClient(sr_conf)
Expand Down Expand Up @@ -110,8 +108,8 @@ def main(args):
"\tfavorite_number: {}\n"
"\tfavorite_color: {}\n"
.format(msg.key(), user.name,
user.favorite_color,
user.favorite_number))
user.favorite_number,
user.favorite_color))
except KeyboardInterrupt:
break

Expand All @@ -129,5 +127,7 @@ def main(args):
help="Topic name")
parser.add_argument('-g', dest="group", default="example_serde_avro",
help="Consumer group")
parser.add_argument('-p', dest="specific", default="true",
help="Avro specific record")

main(parser.parse_args())
25 changes: 13 additions & 12 deletions examples/avro_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# This is a simple example of the SerializingProducer using Avro.
#
import argparse
import os
from uuid import uuid4

from six.moves import input
Expand Down Expand Up @@ -99,19 +100,17 @@ def delivery_report(err, msg):

def main(args):
topic = args.topic
is_specific = args.specific == "true"

if is_specific:
schema = "user_specific.avsc"
else:
schema = "user_generic.avsc"

path = os.path.realpath(os.path.dirname(__file__))
with open(f"{path}/avro/{schema}") as f:
schema_str = f.read()

schema_str = """
{
"namespace": "confluent.io.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
"""
schema_registry_conf = {'url': args.schema_registry}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

Expand Down Expand Up @@ -158,5 +157,7 @@ def main(args):
help="Schema Registry (http(s)://host[:port]")
parser.add_argument('-t', dest="topic", default="example_serde_avro",
help="Topic name")
parser.add_argument('-p', dest="specific", default="true",
help="Avro specific record")

main(parser.parse_args())
4 changes: 2 additions & 2 deletions examples/json_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ def main(args):
"\tfavorite_number: {}\n"
"\tfavorite_color: {}\n"
.format(msg.key(), user.name,
user.favorite_color,
user.favorite_number))
user.favorite_number,
user.favorite_color))
except KeyboardInterrupt:
break

Expand Down
File renamed without changes.
25 changes: 25 additions & 0 deletions examples/protobuf/user_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions examples/protobuf_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
#
import argparse

# Protobuf generated class; resides at ./user_pb2.py
import user_pb2
# Protobuf generated class; resides at ./protobuf/user_pb2.py
import protobuf.user_pb2 as user_pb2
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
from confluent_kafka.serialization import StringDeserializer
Expand Down
6 changes: 3 additions & 3 deletions examples/protobuf_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@

from six.moves import input

# Protobuf generated class; resides at ./user_pb2.py
import user_pb2
# Protobuf generated class; resides at ./protobuf/user_pb2.py
import protobuf.user_pb2 as user_pb2
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
Expand Down Expand Up @@ -75,7 +75,7 @@ def main(args):

protobuf_serializer = ProtobufSerializer(user_pb2.User,
schema_registry_client,
{'use.deprecated.format': True})
{'use.deprecated.format': False})

producer_conf = {'bootstrap.servers': args.bootstrap_servers,
'key.serializer': StringSerializer('utf_8'),
Expand Down
83 changes: 0 additions & 83 deletions examples/user_pb2.py

This file was deleted.

24 changes: 16 additions & 8 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,14 @@ def describe_acls(self, acl_binding_filter, **kwargs):
must match.
String attributes match exact values or any string if set to None.
Enums attributes match exact values or any value if equal to `ANY`.
If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH` returns ACL bindings with:
:attr:`ResourcePatternType.LITERAL` pattern type with resource name equal to the given resource name;
:attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name that matches the given resource name;
:attr:`ResourcePatternType.PREFIXED` pattern type with resource name that is a prefix of the given resource name
If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH`
returns ACL bindings with:
:attr:`ResourcePatternType.LITERAL` pattern type with resource name equal
to the given resource name;
:attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name
that matches the given resource name;
:attr:`ResourcePatternType.PREFIXED` pattern type with resource name
that is a prefix of the given resource name
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
Expand Down Expand Up @@ -433,10 +437,14 @@ def delete_acls(self, acl_binding_filters, **kwargs):
to match ACLs to delete.
String attributes match exact values or any string if set to None.
Enums attributes match exact values or any value if equal to `ANY`.
If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH` deletes ACL bindings with:
:attr:`ResourcePatternType.LITERAL` pattern type with resource name equal to the given resource name;
:attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name that matches the given resource name;
:attr:`ResourcePatternType.PREFIXED` pattern type with resource name that is a prefix of the given resource name
If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH`
deletes ACL bindings with:
:attr:`ResourcePatternType.LITERAL` pattern type with resource name
equal to the given resource name;
:attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name
that matches the given resource name;
:attr:`ResourcePatternType.PREFIXED` pattern type with resource name
that is a prefix of the given resource name
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
Expand Down