diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index 4860f070d..ae6c55116 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -215,6 +215,26 @@ def set(self, schema_id, schema, subject_name=None): if subject_name is not None: self.subject_schemas[subject_name].add(schema) + def remove_by_subject(self, subject_name): + """ + Remove a Schema from the cache. + + Args: + subject_name (str): Subject name the schema is registered under. + """ + + + with self.lock: + if subject_name in self.subject_schemas: + for schema in self.subject_schemas[subject_name]: + schema_id = self.schema_index.get(schema, None) + if schema_id is not None: + self.schema_id_index.pop(schema_id, None) + self.schema_index.pop(Schema, None) + + del self.subject_schemas[subject_name] + + def get_schema(self, schema_id): """ Get the schema instance associated with schema_id from the cache. @@ -546,7 +566,9 @@ def delete_subject(self, subject_name, permanent=False): if permanent: self._rest_client.delete('subjects/{}?permanent=true' .format(_urlencode(subject_name))) - + + self._cache.remove_by_subject(subject_name) + return list def get_latest_version(self, subject_name):