Skip to content

Commit

Permalink
[SchemaRegistry] update type of schema in serialize (Azure#20683)
Browse files Browse the repository at this point in the history
* update type

* update sample naming
  • Loading branch information
swathipil authored Sep 14, 2021
1 parent 599a099 commit 080f88d
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- `schema_registry` parameter in the `SchemaRegistryAvroSerializer` constructor has been renamed `client`.
- `schema_group` parameter in the `SchemaRegistryAvroSerializer` constructor has been renamed `group_name`.
- `data` parameter in the `serialize` and `deserialize` methods on `SchemaRegistryAvroSerializer` has been renamed `value`.
- `schema` parameter in the `serialize` method on `SchemaRegistryAvroSerializer` no longer accepts argument of type `bytes`.
- `SchemaRegistryAvroSerializer` constructor no longer takes in the `codec` keyword argument.

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def serialize(self, value, schema, **kwargs):
:param value: The data to be encoded.
:type value: Dict[str, Any]
:param schema: The schema used to encode the data.
:type schema: Union[str, bytes]
:type schema: str
:rtype: bytes
"""
raw_input_schema = schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
CLIENT_SECRET=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']

SCHEMA_REGISTRY_ENDPOINT=os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP=os.environ['SCHEMA_REGISTRY_GROUP']
GROUP_NAME=os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMA_STRING = """
{"namespace": "example.avro",
"type": "record",
Expand Down Expand Up @@ -80,7 +80,7 @@ def deserialize(serializer, bytes_payload):

if __name__ == '__main__':
schema_registry = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_ENDPOINT, credential=token_credential)
serializer = SchemaRegistryAvroSerializer(schema_registry, SCHEMA_GROUP)
serializer = SchemaRegistryAvroSerializer(schema_registry, GROUP_NAME)
bytes_data_ben, bytes_data_alice = serialize(serializer)
dict_data_ben = deserialize(serializer, bytes_data_ben)
dict_data_alice = deserialize(serializer, bytes_data_alice)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']


def on_event(partition_context, event):
Expand Down Expand Up @@ -50,7 +50,7 @@ def on_event(partition_context, event):
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
group_name=SCHEMA_GROUP
group_name=GROUP_NAME
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']

SCHEMA_STRING = """
{"namespace": "example.avro",
Expand Down Expand Up @@ -64,7 +64,7 @@ def send_event_data_batch(producer, serializer):
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
group_name=SCHEMA_GROUP
group_name=GROUP_NAME
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,32 @@ def create_client():

async def register_schema(schema_registry_client):
# [START register_schema_async]
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMA_NAME = 'your-schema-name'
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
NAME = 'your-schema-name'
SERIALIZATION_TYPE = SerializationType.AVRO
SCHEMA_CONTENT = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema_properties = await schema_registry_client.register_schema(SCHEMA_GROUP, SCHEMA_NAME, SCHEMA_CONTENT, SERIALIZATION_TYPE)
CONTENT = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema_properties = await schema_registry_client.register_schema(GROUP_NAME, NAME, CONTENT, SERIALIZATION_TYPE)
schema_id = schema_properties.id
# [END register_schema_async]
return schema_id


async def get_schema(schema_registry_client, schema_id):
async def get_schema(schema_registry_client, id):
# [START get_schema_async]
schema = await schema_registry_client.get_schema(schema_id)
schema = await schema_registry_client.get_schema(id)
schema_content = schema.content
# [END get_schema_async]
return schema_content


async def get_schema_id(schema_registry_client):
schema_group = os.environ['SCHEMA_REGISTRY_GROUP']
schema_name = 'your-schema-name'
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = 'your-schema-name'
serialization_type = SerializationType.AVRO
schema_content = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
content = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""

# [START get_schema_id_async]
schema_properties = await schema_registry_client.get_schema_properties(schema_group, schema_name, schema_content, serialization_type)
schema_properties = await schema_registry_client.get_schema_properties(group_name, name, content, serialization_type)
schema_id = schema_properties.id
# [END get_schema_id_async]
return schema_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,31 @@
CLIENT_SECRET = os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMA_NAME = 'your-schema-name'
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
NAME = 'your-schema-name'
SERIALIZATION_TYPE = SerializationType.AVRO
SCHEMA_STRING = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""


async def register_schema(client, schema_group, schema_name, schema_string, serialization_type):
async def register_schema(client, group_name, name, schema_string, serialization_type):
print("Registering schema...")
schema_properties = await client.register_schema(schema_group, schema_name, schema_string, serialization_type)
schema_properties = await client.register_schema(group_name, name, schema_string, serialization_type)
print("Schema registered, returned schema id is {}".format(schema_properties.id))
print("Schema properties are {}".format(schema_properties))
return schema_properties.id


async def get_schema_by_id(client, schema_id):
async def get_schema_by_id(client, id):
print("Getting schema by id...")
schema = await client.get_schema(schema_id)
print("The schema string of schema id: {} string is {}".format(schema_id, schema.content))
print("Schema properties are {}".format(schema_id))
schema = await client.get_schema(id)
print("The schema string of schema id: {} string is {}".format(id, schema.content))
print("Schema properties are {}".format(id))
return schema.content


async def get_schema_id(client, schema_group, schema_name, schema_string, serialization_type):
async def get_schema_id(client, group_name, name, schema_string, serialization_type):
print("Getting schema id...")
schema_properties = await client.get_schema_properties(schema_group, schema_name, schema_string, serialization_type)
schema_properties = await client.get_schema_properties(group_name, name, schema_string, serialization_type)
print("The schema id is: {}".format(schema_properties.id))
print("Schema properties are {}".format(schema_properties))
return schema_properties.id
Expand All @@ -61,9 +61,9 @@ async def main():
)
schema_registry_client = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_ENDPOINT, credential=token_credential)
async with token_credential, schema_registry_client:
schema_id = await register_schema(schema_registry_client, SCHEMA_GROUP, SCHEMA_NAME, SCHEMA_STRING, SERIALIZATION_TYPE)
schema_id = await register_schema(schema_registry_client, GROUP_NAME, NAME, SCHEMA_STRING, SERIALIZATION_TYPE)
schema_str = await get_schema_by_id(schema_registry_client, schema_id)
schema_id = await get_schema_id(schema_registry_client, SCHEMA_GROUP, SCHEMA_NAME, SCHEMA_STRING, SERIALIZATION_TYPE)
schema_id = await get_schema_id(schema_registry_client, GROUP_NAME, NAME, SCHEMA_STRING, SERIALIZATION_TYPE)


loop = asyncio.get_event_loop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def create_client():

def register_schema(schema_registry_client):
# [START register_schema_sync]
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMA_NAME = 'your-schema-name'
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
NAME = 'your-schema-name'
SERIALIZATION_TYPE = SerializationType.AVRO
SCHEMA_CONTENT = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema_properties = schema_registry_client.register_schema(SCHEMA_GROUP, SCHEMA_NAME, SCHEMA_CONTENT, SERIALIZATION_TYPE)
CONTENT = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema_properties = schema_registry_client.register_schema(GROUP_NAME, NAME, CONTENT, SERIALIZATION_TYPE)
schema_id = schema_properties.id
# [END register_schema_sync]

Expand All @@ -63,9 +63,9 @@ def register_schema(schema_registry_client):
return schema_id


def get_schema(schema_registry_client, schema_id):
def get_schema(schema_registry_client, id):
# [START get_schema_sync]
schema = schema_registry_client.get_schema(schema_id)
schema = schema_registry_client.get_schema(id)
schema_content = schema.content
# [END get_schema_sync]

Expand All @@ -77,12 +77,12 @@ def get_schema(schema_registry_client, schema_id):


def get_schema_id(schema_registry_client):
schema_group = os.environ['SCHEMA_REGISTRY_GROUP']
schema_name = 'your-schema-name'
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = 'your-schema-name'
serialization_type = SerializationType.AVRO
schema_content = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
content = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
# [START get_schema_id_sync]
schema_properties = schema_registry_client.get_schema_properties(schema_group, schema_name, schema_content, serialization_type)
schema_properties = schema_registry_client.get_schema_properties(group_name, name, content, serialization_type)
schema_id = schema_properties.id
# [END get_schema_id_sync]
return schema_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
CLIENT_SECRET = os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMA_NAME = 'your-schema-name'
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
NAME = 'your-schema-name'
SERIALIZATION_TYPE = SerializationType.AVRO

SCHEMA_JSON = {
Expand All @@ -69,25 +69,25 @@
SCHEMA_STRING = json.dumps(SCHEMA_JSON, separators=(',', ':'))


def register_schema(client, schema_group, schema_name, schema_string, serialization_type):
def register_schema(client, group_name, name, schema_string, serialization_type):
print("Registering schema...")
schema_properties = client.register_schema(schema_group, schema_name, schema_string, serialization_type)
schema_properties = client.register_schema(group_name, name, schema_string, serialization_type)
print("Schema registered, returned schema id is {}".format(schema_properties.id))
print("Schema properties are {}".format(schema_properties))
return schema_properties.id


def get_schema_by_id(client, schema_id):
def get_schema_by_id(client, id):
print("Getting schema by id...")
schema = client.get_schema(schema_id)
print("The schema string of schema id: {} string is {}".format(schema_id, schema.content))
print("Schema properties are {}".format(schema_id))
schema = client.get_schema(id)
print("The schema string of schema id: {} string is {}".format(id, schema.content))
print("Schema properties are {}".format(id))
return schema.content


def get_schema_id(client, schema_group, schema_name, schema_string, serialization_type):
def get_schema_id(client, group_name, name, schema_string, serialization_type):
print("Getting schema id...")
schema_properties = client.get_schema_properties(schema_group, schema_name, schema_string, serialization_type)
schema_properties = client.get_schema_properties(group_name, name, schema_string, serialization_type)
print("The schema id is: {}".format(schema_properties.id))
print("Schema properties are {}".format(schema_properties))
return schema_properties.id
Expand All @@ -101,7 +101,6 @@ def get_schema_id(client, schema_group, schema_name, schema_string, serializatio
)
schema_registry_client = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_ENDPOINT, credential=token_credential)
with schema_registry_client:
schema_id = register_schema(schema_registry_client, SCHEMA_GROUP, SCHEMA_NAME, SCHEMA_STRING, SERIALIZATION_TYPE)
schema_str = get_schema_by_id(schema_registry_client, schema_id=schema_id)
schema_id = get_schema_id(schema_registry_client, SCHEMA_GROUP, SCHEMA_NAME, SCHEMA_STRING, SERIALIZATION_TYPE)

schema_id = register_schema(schema_registry_client, GROUP_NAME, NAME, SCHEMA_STRING, SERIALIZATION_TYPE)
schema_str = get_schema_by_id(schema_registry_client, schema_id)
schema_id = get_schema_id(schema_registry_client, GROUP_NAME, NAME, SCHEMA_STRING, SERIALIZATION_TYPE)

0 comments on commit 080f88d

Please sign in to comment.