DM-17058: Initial PR #1
Conversation
This includes all the standard practices used in other SQuaRE Python packages. The encrypted variables for PyPI and LTD deployments are real.
This is a generic function that wraps uritemplate and also adds the ability to prepend a hostname. This will be a fairly common pattern in this library because relative paths are well known, but the hostname needs to be configured. This is based on gidgethub. I've added the gidgethub license (licenses/gidgethub.txt) along with attribution in the module docstring to credit gidgethub for the sans-io design and the basis for this implementation.
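A minimal sketch of that helper (the function name and keyword arguments here are illustrative, not necessarily kafkit's exact API), assuming the `uritemplate` package's `expand` function:

```python
from uritemplate import expand


def format_url(*, host: str, url: str, url_vars: dict) -> str:
    """Expand a URI template and, if the result is relative, prefix the
    configured registry hostname."""
    url = expand(url, var_dict=url_vars)
    if not url.startswith(("http://", "https://")):
        # Relative paths are well known; the host is configured by the user.
        url = host.rstrip("/") + url
    return url


# Example:
# format_url(host="http://registry:8081",
#            url="/subjects{/subject}/versions",
#            url_vars={"subject": "mytopic-value"})
# -> "http://registry:8081/subjects/mytopic-value/versions"
```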
I'm trying out the idea of creating a sans-io layer for the registry interface, inspired by gidgethub (https://github.com/brettcannon/gidgethub) and https://sans-io.readthedocs.io.
This client is based on the sans-io architecture (https://sans-io.readthedocs.io) and is heavily influenced by the gidgethub implementation. The idea is that the sansio.RegistryApi client knows how to format requests and process responses for the Confluent Schema Registry API, but doesn't include an HTTP client itself. This means it's easy to subclass RegistryApi with a mock implementation for unit testing, and with other subclasses to integrate with specific HTTP clients. Some things that can be improved from this starting point:

- More tests (I've got cursory tests set up)
- A clear exception hierarchy for the different failure modes of the schema registry

This also adds pytest-asyncio, which provides a useful decorator for testing coroutines.
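A rough sketch of the sans-io pattern described above (class and method names are illustrative, not necessarily kafkit's exact API): the base class formats requests and parses responses, while subclasses supply the actual HTTP transport.

```python
import abc
import json
from typing import Any, Mapping, Tuple


class BaseRegistryApi(abc.ABC):
    def __init__(self, *, host: str) -> None:
        self.host = host

    @abc.abstractmethod
    async def _request(
        self, method: str, url: str, headers: Mapping[str, str], body: bytes
    ) -> Tuple[int, Mapping[str, str], bytes]:
        """Perform the HTTP request; implemented by I/O-specific subclasses."""

    async def get(self, url: str) -> Any:
        status, headers, body = await self._request("GET", self.host + url, {}, b"")
        # Real code would inspect the status and raise registry-specific errors.
        return json.loads(body) if body else None


class MockRegistryApi(BaseRegistryApi):
    """A test double that returns canned responses instead of doing I/O."""

    def __init__(self, *, host: str, body: bytes = b"") -> None:
        super().__init__(host=host)
        self._body = body

    async def _request(self, method, url, headers, body):
        return 200, {}, self._body
```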
This client subclass integrates with aiohttp itself and is what I expect most users to actually use. There's not a lot to it given that the sansio base client has all the logic. I've added an aiohttp setuptools extra to make it easy to install aiohttp alongside kafkit, though because the session is created by the user, there isn't (yet) an import dependency on aiohttp inside kafkit.
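A hedged sketch of what the aiohttp integration could look like, building on the hypothetical BaseRegistryApi sketched above; the aiohttp.ClientSession is created and owned by the user.

```python
import aiohttp


class RegistryApi(BaseRegistryApi):
    def __init__(self, *, session: aiohttp.ClientSession, host: str) -> None:
        super().__init__(host=host)
        self._session = session

    async def _request(self, method, url, headers, body):
        async with self._session.request(
            method, url, headers=dict(headers), data=body
        ) as response:
            return response.status, response.headers, await response.read()
```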
- decipher_message automatically raises these errors for users.
- The classes correspond to 3xx, 4xx, and 5xx errors.
- The exceptions include information from the error message in the registry's response, when possible.
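A sketch of the kind of hierarchy described above (class names and the helper function are illustrative, standing in for the deciphering behaviour mentioned here): one subclass per status-code family, carrying the registry's own error message when the response body provides one.

```python
import json


class RegistryApiError(Exception):
    """Base class for errors from the Schema Registry API."""


class RegistryRedirectionError(RegistryApiError):
    """A 3xx response."""


class RegistryBadRequestError(RegistryApiError):
    """A 4xx response."""


class RegistryBrokenError(RegistryApiError):
    """A 5xx response."""


def raise_for_status(status: int, body: bytes) -> None:
    if status < 300:
        return
    try:
        # The registry's error responses usually carry a "message" field.
        message = json.loads(body).get("message", "")
    except Exception:
        message = ""
    if status < 400:
        raise RegistryRedirectionError(message)
    elif status < 500:
        raise RegistryBadRequestError(message)
    else:
        raise RegistryBrokenError(message)
```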
The Confluent Wire Format is documented at https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
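For reference, a minimal sketch of packing and unpacking that format: a zero "magic byte", a 4-byte big-endian schema ID, then the Avro-encoded message body.

```python
import struct


def pack_wire_format(schema_id: int, avro_body: bytes) -> bytes:
    # 1-byte magic (0) + 4-byte big-endian schema ID + Avro payload.
    return struct.pack(">bI", 0, schema_id) + avro_body


def unpack_wire_format(message: bytes) -> tuple:
    magic, schema_id = struct.unpack(">bI", message[:5])
    assert magic == 0, "Unknown wire format magic byte"
    return schema_id, message[5:]
```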
This cache specifically maps schemas to their IDs in the registry. Given a schema, you can look up its ID, and given an ID you can look up its schema. This is expected to be the main usage pattern, rather than referring to schemas by their subject name or version number.
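A sketch of a bidirectional schema/ID cache along these lines (kafkit's real SchemaCache may differ in detail). Schemas are keyed by a canonical JSON serialization; a real implementation would normalize the schema first (for example with fastavro.parse_schema) so that equivalent schemas share a key.

```python
import json


class SchemaCache:
    def __init__(self):
        self._id_to_schema = {}
        self._key_to_id = {}

    @staticmethod
    def _key(schema):
        # Canonical-ish key; real code would normalize the schema first.
        return json.dumps(schema, sort_keys=True)

    def insert(self, schema, schema_id):
        self._id_to_schema[schema_id] = schema
        self._key_to_id[self._key(schema)] = schema_id

    def get_id(self, schema):
        return self._key_to_id[self._key(schema)]

    def get_schema(self, schema_id):
        return self._id_to_schema[schema_id]
```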
This is an important method because it lets us register schemas in the first place, and also lets us get schema IDs for already registered schemas based on the schema itself. By integrating with the SchemaCache, this method should be fast.
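A rough sketch of how that could fit together (hypothetical names; it also assumes the client exposes a post() helper alongside get()): return the cached ID when the schema is already known, otherwise register it via POST /subjects/{subject}/versions and cache the result.

```python
import json


async def register_schema(api, subject: str, schema) -> int:
    try:
        # Fast path: the schema was registered or seen before.
        return api.schema_cache.get_id(schema)
    except KeyError:
        pass
    data = await api.post(
        f"/subjects/{subject}/versions", data={"schema": json.dumps(schema)}
    )
    api.schema_cache.insert(schema, data["id"])
    return data["id"]
```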
fastavro is the 'blessed' package for parsing and normalizing Avro schemas, as well as encoding and decoding messages.
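A short example of the fastavro calls this relies on: parse_schema to validate and normalize a schema, and schemaless_writer/schemaless_reader to encode and decode single messages without an embedded schema header.

```python
from io import BytesIO

import fastavro

schema = fastavro.parse_schema(
    {
        "type": "record",
        "name": "example",
        "fields": [{"name": "value", "type": "int"}],
    }
)

buffer = BytesIO()
fastavro.schemaless_writer(buffer, schema, {"value": 42})
encoded = buffer.getvalue()

decoded = fastavro.schemaless_reader(BytesIO(encoded), schema)
assert decoded == {"value": 42}
```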
This makes it easier to use the mock client in many different test contexts.
This wraps the registry's GET /schemas/ids/{int: id} endpoint and is used frequently by deserializers, since the schema ID is included in the Confluent Wire Format. The method is cached.
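An illustrative sketch of that pattern, reusing the hypothetical client and SchemaCache from the earlier sketches: check the local cache first and only call GET /schemas/ids/{id} on a miss.

```python
import json


async def get_schema_by_id(api, schema_id: int):
    try:
        return api.schema_cache.get_schema(schema_id)
    except KeyError:
        pass
    data = await api.get(f"/schemas/ids/{schema_id}")
    # The registry returns the schema document as a serialized JSON string.
    schema = json.loads(data["schema"])
    api.schema_cache.insert(schema, schema_id)
    return schema
```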
This adds Serializer and Deserializer classes that can be used by producers and consumers to send and receive messages in the Confluent Wire Format (https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format). This means that the schema's ID in the schema registry is included as a prefix to the Avro-encoded message. The Serializer is dedicated to working with a specific schema. This way it can be used as a value_serializer or key_serializer by the Producer (see https://aiokafka.readthedocs.io/en/stable/examples/serialize_and_compress.html#serialization-and-compression). The Deserializer needs to react to whatever schema ID gets sent to it. For that reason, it can't implement a __call__ (it'd need to be a coroutine), and thus must always be used externally to the consumer.
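A hedged usage sketch (the exact construction of the Serializer/Deserializer and the shape of the deserialized result are assumptions, not kafkit's documented interface): a Serializer bound to one schema is passed to aiokafka as value_serializer, while the Deserializer is applied explicitly to each consumed message because it must be awaited.

```python
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def produce(serializer) -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=serializer,  # a plain callable returning wire-format bytes
    )
    await producer.start()
    try:
        await producer.send_and_wait("mytopic", {"value": 42})
    finally:
        await producer.stop()


async def consume(deserializer) -> None:
    consumer = AIOKafkaConsumer("mytopic", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        async for message in consumer:
            # The Deserializer is a coroutine, so it can't be passed as
            # value_deserializer; call it on the raw bytes instead.
            data = await deserializer.deserialize(message.value)
            print(data)
    finally:
        await consumer.stop()
```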
Whereas Serializer serializes for a single schema (so it can be a callable), the PolySerializer can serialize messages in any schema.
I think this is more accurate, because it's expected to have the scheme, the hostname, and the port.
This method is a high-level interface to the GET /subjects{/subject}/versions{/version} endpoint.
This makes it possible to cache subject version information. The schema itself is still stored in the existing SchemaCache; this now associates a subject-version tuple with a schema ID. This makes it possible to cache the RegistryApi.get_schema_by_subject() method.
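A sketch of the subject-version cache described above (illustrative, layered on the hypothetical SchemaCache from the earlier sketch): (subject, version) pairs map to schema IDs, and the schema itself still lives in the SchemaCache.

```python
class SubjectCache:
    def __init__(self, schema_cache) -> None:
        self._schema_cache = schema_cache
        self._versions = {}  # (subject, version) -> schema ID

    def insert(self, subject: str, version: int, schema_id: int, schema) -> None:
        self._schema_cache.insert(schema, schema_id)
        self._versions[(subject, version)] = schema_id

    def get(self, subject: str, version: int):
        schema_id = self._versions[(subject, version)]
        return schema_id, self._schema_cache.get_schema(schema_id)
```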
This is required to easily get the full set of schema information, rather than calling both get_id and get_schema.
Now the get_schema_by_subject method gets caching.
This adds properties to RegistryApi for the SchemaCache and SubjectCache, rather than having general attributes. This effectively renames RegistryApi.schemas -> RegistryApi.schema_cache.
The endpoints like GET /schemas/ids/{int: id} and GET /subjects/(string: subject)/versions/(versionId: version) provide the schemas in a serialized string form, rather than as a JSON object. The tests misrepresented this, but now it's all fixed up :)
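For illustration (the payload here is made up), the schema string needs a second round of JSON parsing:

```python
import json

response_body = '{"schema": "{\\"type\\": \\"string\\"}"}'
data = json.loads(response_body)      # {'schema': '{"type": "string"}'}
schema = json.loads(data["schema"])   # {'type': 'string'}
```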
API documentation powered by numpydoc and automodapi.
This will get shortened and details moved to conceptual documentation pages around serializers, deserializers, and the registry client.
Put common core APIs in the main kafkit.registry namespace. Leave sansio stuff in kafkit.registry.sansio so it's clear that that code is sans IO. And leave kafkit.registry.aiohttp in its own namespace because of the optional aiohttp dependency.
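How that layout plays out for imports (a sketch of the intended namespaces; the exported names shown are assumptions based on the description above):

```python
from kafkit.registry import Deserializer, Serializer   # common core APIs
from kafkit.registry.aiohttp import RegistryApi        # needs the aiohttp extra
from kafkit.registry.sansio import MockRegistryApi     # sans-IO layer, handy for tests
```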
@cbanek care to look at this (maybe just scan the docs, https://kafkit.lsst.io/v/DM-17058/, and code) and let me know if this looks like a reasonable architecture to you?
kafkit/registry/sansio.py
except Exception:
    # If the schema couldn't be parsed, it's not going to be a
    # valid key anyhow.
    raise KeyError
You may want to print out the serialization error, or at least some message with this, as it might be hard for the user to tell the difference between a key not existing, and maybe when there's something troubling in the cache?
It looks like the schema registry client catches the KeyError and then just continues as if the schema isn't in the cache, and might add it back in or something?
Good idea. The client should recover mostly by doing the actual HTTP request, but it doesn't seem like there's any harm in forming an error message for anyone who's using the cache directly.
kafkit/registry/sansio.py
if schema is None:
    raise ValueError(
        'Trying to cache the schema ID for subject '
        f'{subject!r}, version {version}, but it\'s schema ID '
its / it's? then you can get rid of the escaped backslash? ;)
Looks amazing!
Just wondering, and this is somewhat of an out of band question, but what do you think the workflow of publishing versions will be? It looks like the same library can both make new versions and use any version, which is good, but it seems like maybe the CI system will make new versions with the latest XML schema? And then the callers using the schemas will only be consuming them? This is somewhat important since it doesn't look like there's any strong ordering of versions, or pushing new versions, or even that version numbers have to be sequential?
Thanks! That's a really good question. For SAL/EFD, I have no idea. It is a little spooky that the Schema Registry's register endpoint is used both to register a schema and to look up a schema's ID from the schema document if the schema is already registered. For SQuaRE Events, I was thinking of having a policy something like this:
Does that make any sense?
That makes sense, although I'm not sure if there's any backward compatibility between the versions; it seems like they're completely different from each other. So that means you need to update everything in lock-step. That is, if a producer starts publishing a new version of a schema, the consumer is basically broken because it's listening for a version that will no longer be produced until it is updated.

A single source of truth might help with this, depending on how deep the knowledge of the schemas has to be. Plus, having multiple copies of what versions are used might be hard to keep in sync. It seems reasonable that the producer controls what schemas are published, although you might be able to do this at build time (like publishing the latest version of the schemas when you make the containers or a release, and then having some kind of post-test hook that commits the new schema).

It's also possible to just try a few things out until they break or become unwieldy; then we'll know the problems and can figure out a better way of doing things incrementally. I'm not even sure how often we'll update new versions, rather than just add new schemas... Just something to keep in mind.
This is what I'm thinking about for schema backwards compatibility:

`reader = schemaless_reader(fh, writer_schema, reader_schema=preferred_schema)`

So the writer_schema comes from the ID in the Confluent Wire Format, and the reader_schema is the schema the application prefers to work with. To make this work, we'd have to turn on the Schema Registry feature that requires a subject to have transitive forwards compatibility.

Hmm, the Confluent default is to have backwards compatibility, so that consumers upgrade first and producers follow. I wonder if that really is better than changing producers first?
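A concrete sketch of that schema-resolution idea (assuming the schemas are forward compatible): the message was written with a newer writer schema, but the consumer reads it into the older schema it was coded against, and Avro drops the field the reader doesn't know about.

```python
from io import BytesIO

import fastavro

writer_schema = fastavro.parse_schema(
    {
        "type": "record",
        "name": "event",
        "fields": [
            {"name": "value", "type": "int"},
            {"name": "units", "type": "string", "default": ""},
        ],
    }
)
reader_schema = fastavro.parse_schema(
    {
        "type": "record",
        "name": "event",
        "fields": [{"name": "value", "type": "int"}],
    }
)

buffer = BytesIO()
fastavro.schemaless_writer(buffer, writer_schema, {"value": 1, "units": "adu"})

message = fastavro.schemaless_reader(
    BytesIO(buffer.getvalue()), writer_schema, reader_schema=reader_schema
)
assert message == {"value": 1}
```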
This PR sets up the package, implements a RegistryAPI client that works with the Confluent Schema Registry (and caches schemas locally when possible), and adds Avro serializers and deserializers that work with schemas in a Confluent Schema Registry.
The package is pretty young still, but there are API docs at https://kafkit.lsst.io/v/DM-17058/. Those docs explain the scope of the work and the aim of the package.