Skip to content

Commit

Permalink
[Improve][broker] Support clear old bookie data for BKCluster (#16744)
Browse files Browse the repository at this point in the history
(cherry picked from commit eb5725a)
  • Loading branch information
coderzc authored and liangyepianzhou committed Feb 10, 2023
1 parent e59aac7 commit eabc2cd
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ public class EmbeddedPulsarCluster implements AutoCloseable {
private final PulsarAdmin admin;

@Builder
private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataStoreUrl) throws Exception {
private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataStoreUrl,
boolean clearOldData) throws Exception {
this.numBrokers = numBrokers;
this.numBookies = numBookies;
this.metadataStoreUrl = metadataStoreUrl;
this.bkCluster = new BKCluster(metadataStoreUrl, numBookies);
this.bkCluster = new BKCluster(metadataStoreUrl, numBookies, clearOldData);

for (int i = 0; i < numBrokers; i++) {
PulsarService s = new PulsarService(getConf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,50 @@
package org.apache.pulsar.broker;

import static org.testng.Assert.assertEquals;
import java.io.File;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
public class EndToEndMetadataTest extends BaseMetadataStoreTest {

private File tempDir;

@BeforeClass(alwaysRun = true)
@Override
public void setup() throws Exception {
super.setup();
tempDir = IOUtils.createTempDir("bookies", "test");
}

@AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.cleanup();
FileUtils.deleteDirectory(tempDir);
}

@Test(dataProvider = "impl")
public void testPublishConsume(String provider, Supplier<String> urlSupplier) throws Exception {

@Cleanup
EmbeddedPulsarCluster epc = EmbeddedPulsarCluster.builder()
.numBrokers(1)
.numBookies(1)
.metadataStoreUrl(urlSupplier.get())
.clearOldData(true)
.build();

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.bookkeeper;

import static org.apache.commons.io.FileUtils.cleanDirectory;
import java.io.File;
import java.io.IOException;
import java.net.NetworkInterface;
Expand Down Expand Up @@ -67,9 +68,10 @@ public class BKCluster implements AutoCloseable {

protected final ServerConfiguration baseConf = newBaseServerConfiguration();
protected final ClientConfiguration baseClientConf = newBaseClientConfiguration();
private final boolean clearOldData;


public BKCluster(String metadataServiceUri, int numBookies) throws Exception {
public BKCluster(String metadataServiceUri, int numBookies, boolean clearOldData) throws Exception {
this.clearOldData = clearOldData;
this.metadataServiceUri = metadataServiceUri;
this.store = MetadataStoreExtended.create(metadataServiceUri, MetadataStoreConfig.builder().build());
baseConf.setJournalRemovePagesFromCache(false);
Expand All @@ -87,7 +89,6 @@ public BKCluster(String metadataServiceUri, int numBookies) throws Exception {

@Override
public void close() throws Exception {
boolean failed = false;
// stop bookkeeper service
try {
stopBKCluster();
Expand Down Expand Up @@ -161,6 +162,10 @@ protected void cleanupTempDirs() throws Exception {
private ServerConfiguration newServerConfiguration() throws Exception {
File f = createTempDir("bookie", "test");

if (clearOldData) {
cleanDirectory(f);
}

int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
port = PortManager.nextFreePort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport {

@BeforeClass(alwaysRun = true)
@Override
public final void setup() throws Exception {
public void setup() throws Exception {
incrementSetupNumber();
zks = new TestZKServer();
}

@AfterClass(alwaysRun = true)
@Override
public final void cleanup() throws Exception {
public void cleanup() throws Exception {
markCurrentSetupNumberCleaned();
if (zks != null) {
zks.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class EndToEndTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testBasic(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
BKCluster bktc = new BKCluster(urlSupplier.get(), 1);
BKCluster bktc = new BKCluster(urlSupplier.get(), 1, true);

@Cleanup
BookKeeper bkc = bktc.newClient();
Expand Down Expand Up @@ -85,7 +85,7 @@ public void testBasic(String provider, Supplier<String> urlSupplier) throws Exce
@Test(dataProvider = "impl")
public void testWithLedgerRecovery(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
BKCluster bktc = new BKCluster(urlSupplier.get(), 3);
BKCluster bktc = new BKCluster(urlSupplier.get(), 3, true);

@Cleanup
BookKeeper bkc = bktc.newClient();
Expand Down

0 comments on commit eabc2cd

Please sign in to comment.