From f12bb2b275c2db822724d594d3fb86ec04fda0e8 Mon Sep 17 00:00:00 2001 From: 0xn4utilus Date: Tue, 1 Oct 2024 14:57:39 -0700 Subject: [PATCH] fix: update schema registry cache after deleting subject Signed-off-by: 0xn4utilus --- .../schema_registry/schema_registry_client.py | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 4860f070d..ae6c55116 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -215,6 +215,26 @@ def set(self, schema_id, schema, subject_name=None): if subject_name is not None: self.subject_schemas[subject_name].add(schema) + def remove_by_subject(self, subject_name): + """ + Remove a Schema from the cache. + + Args: + subject_name (str): Subject name the schema is registered under. + """ + + + with self.lock: + if subject_name in self.subject_schemas: + for schema in self.subject_schemas[subject_name]: + schema_id = self.schema_index.get(schema, None) + if schema_id is not None: + self.schema_id_index.pop(schema_id, None) + self.schema_index.pop(Schema, None) + + del self.subject_schemas[subject_name] + + def get_schema(self, schema_id): """ Get the schema instance associated with schema_id from the cache. @@ -546,7 +566,9 @@ def delete_subject(self, subject_name, permanent=False): if permanent: self._rest_client.delete('subjects/{}?permanent=true' .format(_urlencode(subject_name))) - + + self._cache.remove_by_subject(subject_name) + return list def get_latest_version(self, subject_name):