Skip to content

Commit

Permalink
[improve][broker] Recover susbcription creation on the broken schema …
Browse files Browse the repository at this point in the history
…ledger topic (#22469)
  • Loading branch information
rdhabalia authored Apr 11, 2024
1 parent cea1a9b commit b42d941
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer;
Expand Down Expand Up @@ -1291,7 +1292,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.schemaType(schema == null ? null : schema.getType())
.build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
return ignoreUnrecoverableBKException
(topic.addSchemaIfIdleOrCheckCompatible(schema))
.thenCompose(v -> topic.subscribe(option));
} else {
return topic.subscribe(option);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand Down Expand Up @@ -716,6 +717,7 @@ public static <T> CompletableFuture<T> ignoreUnrecoverableBKException(Completabl
return source.exceptionally(t -> {
if (t.getCause() != null
&& (t.getCause() instanceof SchemaException)
&& !(t.getCause() instanceof IncompatibleSchemaException)
&& !((SchemaException) t.getCause()).isRecoverable()) {
// Meeting NoSuchLedgerExistsException, NoSuchEntryException or
// NoSuchLedgerExistsOnMetadataServerException when reading schemas in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -58,6 +59,8 @@
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat;
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -87,6 +90,9 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -1410,4 +1416,74 @@ public User(String name) {
}
}

/**
* This test validates that consumer/producers should recover on topic whose
* schema ledgers are not able to open due to non-recoverable error.
*
* @throws Exception
*/
@Test
public void testDeletedSchemaLedgerRecovery() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicOne = "test-multi-version-schema-one";
final String subName = "test";
final String topicName = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();

admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));

// (1) create schema
Producer<Schemas.PersonTwo> producer = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
.topic(topicName).create();

Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
personTwo.setId(1);
personTwo.setName("Tom");

Consumer<Schemas.PersonTwo> consumer = pulsarClient
.newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
.subscriptionName(subName).topic(topicName).subscribe();

producer.send(personTwo);
producer.close();
consumer.close();

// (2) Delete schema ledger
MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache = pulsar.getLocalMetadataStore()
.getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
@Override
public byte[] serialize(String path, SchemaStorageFormat.SchemaLocator value) {
return value.toByteArray();
}

@Override
public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content, Stat stat)
throws IOException {
return SchemaStorageFormat.SchemaLocator.parseFrom(content);
}
});
String path = "/schemas/public/" + namespace + "/test-multi-version-schema-one";
SchemaLocator schema = locatorEntryCache.get(path).get().get();
schema = locatorEntryCache.get(path).get().get();
long ledgerId = schema.getInfo().getPosition().getLedgerId();
pulsar.getBookKeeperClient().deleteLedger(ledgerId);

// (3) Topic should recover from deleted schema and should allow to create consumer and producer
consumer = pulsarClient
.newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
.subscriptionName(subName).topic(topicName).subscribe();

producer = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
.topic(topicName).create();
assertNotNull(consumer);
assertNotNull(producer);
consumer.close();
producer.close();
}
}

0 comments on commit b42d941

Please sign in to comment.