Skip to content

Commit 2620450

Browse files
authored
[fix][schema]ledger handle leak when update schema (apache#17283)
### Motivation in the schema update, will create a `ledgerHandle` and write data to BK, after that `ledgerHandle` is no longer useful and no other object holds references to it. `ledgerHandle` will be recycled with GC, but `ledgerHandle` also hold external connections, which will cause leakage. https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456 ### Modifications after the schema is updated, close the `ledgerHandle`, just like schema-read: https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525
1 parent bf6e815 commit 2620450

File tree

3 files changed

+47
-5
lines changed

3 files changed

+47
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,14 @@ private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToS
437437
byte[] data
438438
) {
439439
SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
440-
return createLedger(schemaId).thenCompose(ledgerHandle ->
441-
addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
442-
Functions.newPositionInfo(ledgerHandle.getId(), entryId)
443-
)
444-
);
440+
return createLedger(schemaId).thenCompose(ledgerHandle -> {
441+
final long ledgerId = ledgerHandle.getId();
442+
return addEntry(ledgerHandle, schemaEntry)
443+
.thenApply(entryId -> {
444+
ledgerHandle.closeAsync();
445+
return Functions.newPositionInfo(ledgerId, entryId);
446+
});
447+
});
445448
}
446449

447450
@NotNull

pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java

+35
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.collect.Sets;
2626
import java.util.Collections;
2727
import java.util.concurrent.ThreadLocalRandom;
28+
import java.util.stream.Collectors;
2829
import lombok.extern.slf4j.Slf4j;
2930
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
3031
import org.apache.pulsar.client.api.Consumer;
@@ -478,7 +479,41 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS
478479

479480
consumerOne.close();
480481
producerOne.close();
482+
}
481483

484+
@Test
485+
public void testSchemaLedgerAutoRelease() throws Exception {
486+
String namespaceName = PUBLIC_TENANT + "/default";
487+
String topicName = "persistent://" + namespaceName + "/tp";
488+
admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
489+
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
490+
// Update schema 100 times.
491+
for (int i = 0; i < 100; i++){
492+
Schema schema = Schema.JSON(SchemaDefinition.builder()
493+
.withJsonDef(String.format("""
494+
{
495+
"type": "record",
496+
"name": "Test_Pojo",
497+
"namespace": "org.apache.pulsar.schema.compatibility",
498+
"fields": [{
499+
"name": "prop_%s",
500+
"type": ["null", "string"],
501+
"default": null
502+
}]
503+
}
504+
""", i))
505+
.build());
506+
Producer producer = pulsarClient
507+
.newProducer(schema)
508+
.topic(topicName)
509+
.create();
510+
producer.close();
511+
}
512+
// The other ledgers are about 5.
513+
Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
514+
.filter(ledger -> !ledger.isFenced())
515+
.collect(Collectors.toList()).size() < 20);
516+
admin.topics().delete(topicName, true);
482517
}
483518

484519
@Test

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.client;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.collect.Lists;
2223
import io.netty.buffer.ByteBuf;
2324
import io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@
2930
import java.util.List;
3031
import java.util.Queue;
3132
import java.util.concurrent.CompletableFuture;
33+
import lombok.Getter;
3234
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
3335
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
3436
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
@@ -58,6 +60,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
5860
final byte[] passwd;
5961
final ReadHandle readHandle;
6062
long lastEntry = -1;
63+
@VisibleForTesting
64+
@Getter
6165
boolean fenced = false;
6266

6367
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,

0 commit comments

Comments
 (0)