[improve][broker] Recover subscription creation on the broken schema ledger topic #22469
LGTM
@rdhabalia @lhotari @Technoboy-

    /**
     * This test validates that consumer/producers should recover on topic whose
     * schema ledgers are not able to open due to non-recoverable error.
     *
     * @throws Exception
     */
    @Test
    public void testDeletedSchemaLedgerRecovery() throws Exception {
        final String tenant = PUBLIC_TENANT;
        final String namespace = "test-namespace-" + randomName(16);
        final String topicOne = "test-multi-version-schema-one";
        final String subName = "test";
        final String topicName = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();
        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));

        // (1) create schema
        Producer<Schemas.PersonTwo> producer = pulsarClient
                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
                        .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
                .topic(topicName).create();
        Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
        personTwo.setId(1);
        personTwo.setName("Pulsar");
        Consumer<Schemas.PersonTwo> consumer = pulsarClient
                .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
                        .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
                .subscriptionName(subName).subscriptionType(SubscriptionType.Shared).topic(topicName).subscribe();
        producer.send(personTwo);
        // producer.close();
        // consumer.close();

        // (2) Delete schema ledger
        MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache = pulsar.getLocalMetadataStore()
                .getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
                    @Override
                    public byte[] serialize(String path, SchemaStorageFormat.SchemaLocator value) {
                        return value.toByteArray();
                    }

                    @Override
                    public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content, Stat stat)
                            throws IOException {
                        return SchemaStorageFormat.SchemaLocator.parseFrom(content);
                    }
                });
        String path = "/schemas/public/" + namespace + "/test-multi-version-schema-one";
        SchemaLocator schema = locatorEntryCache.get(path).get().get();
        long ledgerId = schema.getInfo().getPosition().getLedgerId();
        pulsar.getBookKeeperClient().deleteLedger(ledgerId);

        // (3) Topic should recover from deleted schema and should allow to create consumer and producer?????????
        Consumer<Schemas.PersonOne> consumer1 = pulsarClient
                .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne> builder().withAlwaysAllowNull(false)
                        .withSupportSchemaVersioning(true).withPojo(Schemas.PersonOne.class).build()))
                .subscriptionName(subName).subscriptionType(SubscriptionType.Shared).topic(topicName).subscribe();
        Producer<Schemas.PersonOne> producer1 = pulsarClient
                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne> builder().withAlwaysAllowNull(false)
                        .withSupportSchemaVersioning(true).withPojo(Schemas.PersonOne.class).build()))
                .topic(topicName).create();
        assertNotNull(consumer1);
        assertNotNull(producer1);
        Schemas.PersonOne personOne = new Schemas.PersonOne();
        personOne.setId(1);
        producer1.send(personOne);
        Assert.assertEquals(personOne, consumer1.receive().getValue());

        // Old "version" consumer cannot be consumed.
        producer.send(personTwo);
        Assert.assertEquals(personTwo, consumer.receive().getValue());

        consumer.close();
        producer.close();
        consumer1.close();
        producer1.close();
    }
Compared with #18010, there is no configuration control here. The default behavior is to skip automatically when the integrity of the schema is destroyed. Whether or not skipping is the right response, there should be a configuration option that lets users choose between skipping and manually repairing when schema integrity is damaged. Admittedly, there is currently no way to manually repair a lost schema, but I have a PR that tries to repair the missing schema (#20415). I will send an email later to discuss how to deal with a lost schema. The approach in the current PR does not seem to be a good one.
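To make the request above concrete, here is a minimal sketch of what such a switch could look like. Everything in it is hypothetical: `BrokenSchemaPolicy`, `BrokenSchemaPolicySketch`, and `resolve` are illustration names only, and neither this PR nor the broker is known to expose a setting like this.

```java
import java.util.Optional;

// Hypothetical policy, for illustration only; this PR adds no such setting.
enum BrokenSchemaPolicy {
    SKIP, // treat the topic as having no schema and let clients connect
    FAIL  // reject the request so an operator can repair the schema manually
}

class BrokenSchemaPolicySketch {
    /**
     * Resolves the schema for a topic under the chosen policy.
     * The ledgerMissing flag stands in for "the schema ledger cannot be opened".
     */
    static Optional<String> resolve(BrokenSchemaPolicy policy, String topic,
                                    boolean ledgerMissing, String storedSchema) {
        if (!ledgerMissing) {
            return Optional.ofNullable(storedSchema);
        }
        if (policy == BrokenSchemaPolicy.SKIP) {
            // Data is already lost; continue as if no schema had been registered.
            return Optional.empty();
        }
        throw new IllegalStateException(
                "Schema ledger missing for " + topic + "; manual repair required");
    }
}
```

With `SKIP` the topic stays available, which matches the behavior this PR makes the default; `FAIL` keeps the topic blocked until someone repairs it.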
If the schema ledger itself is deleted, that is similar to having a topic without a schema, and in that case we should not make the topic unavailable. This PR simply handles the non-recoverable error instead of swallowing the exception and preventing the client from connecting. Yes, I also agree: we should allow the client to connect and create a new schema ledger if needed.
That is because the schema ledger is already deleted. The topic then has no schema, so it should be allowed to create a new version. Schema deletion means data loss, and there is not much you can do about it other than allowing a new schema to be added and making the topic available to client applications. Also, this PR doesn't add any additional logic; it just handles the non-recoverable error at the top, which is fundamentally correct. We can always enhance the schema module with smarter handling to avoid incompatibility issues. Previously, the broker treated IncompatibleException as non-recoverable, which is incorrect, and this PR fixes that as well.
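The distinction drawn above, between a ledger that is gone for good and a schema that is merely incompatible, can be sketched roughly as follows. The two exception classes are hypothetical stand-ins, not the actual Pulsar or BookKeeper types, and `SchemaErrorClassifierSketch` is an illustration name.

```java
// Hypothetical stand-in for "the schema ledger no longer exists".
class MissingLedgerSketchException extends Exception {
    MissingLedgerSketchException(String message) { super(message); }
}

// Hypothetical stand-in for a schema compatibility failure.
class IncompatibleSchemaSketchException extends Exception {
    IncompatibleSchemaSketchException(String message) { super(message); }
}

class SchemaErrorClassifierSketch {
    /**
     * Returns true only for storage-level failures that retrying cannot fix.
     * An incompatible schema is a client-side problem, so it is not classified
     * as non-recoverable here.
     */
    static boolean isNonRecoverable(Throwable error) {
        // Walk the cause chain, since async code often wraps the original failure.
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof MissingLedgerSketchException) {
                return true;
            }
            if (t instanceof IncompatibleSchemaSketchException) {
                return false;
            }
        }
        return false;
    }
}
```

Only errors classified as non-recoverable would then trigger the fallback to a fresh schema; everything else, including incompatibility, would still be reported to the client.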
Motivation
Right now, the broker fails to create a schema consumer if the topic's schema ledger has already been deleted and the broker is not able to recover that ledger. In that case, the client keeps receiving the error message below and is never able to consume from that topic.
Modifications
The schema should be recovered if the schema ledger fails to open due to a non-recoverable ledger error.
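As a rough illustration of that idea (not the code in this PR), the sketch below wraps an asynchronous schema lookup so that a non-recoverable ledger failure resolves to "no schema" while other failures still propagate. `NonRecoverableLedgerSketchException`, `SchemaRecoverySketch`, and `readSchemaOrEmpty` are hypothetical names, and the schema is modeled as a plain `String` for brevity.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for a "ledger is gone for good" failure.
class NonRecoverableLedgerSketchException extends RuntimeException {
    NonRecoverableLedgerSketchException(String message) { super(message); }
}

class SchemaRecoverySketch {
    /**
     * Turns a non-recoverable ledger failure into an empty result so the caller
     * can proceed as if the topic had no schema. Any other failure is rethrown.
     */
    static CompletableFuture<Optional<String>> readSchemaOrEmpty(
            CompletableFuture<Optional<String>> lookup) {
        return lookup.handle((schema, error) -> {
            if (error == null) {
                return schema;
            }
            // Dependent stages wrap the original failure in CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (cause instanceof NonRecoverableLedgerSketchException) {
                return Optional.<String>empty();
            }
            throw new CompletionException(cause);
        });
    }
}
```

A caller that receives an empty result can then register the incoming schema as the first version, which is what the test in this PR expects after the ledger is deleted.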
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: