From 4fd17d412c7368f79bf6c82497403e70d17474a6 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Mon, 10 Feb 2020 08:31:21 +0800 Subject: [PATCH] Expose bookkeeper expose explicit lac in broker.conf (#5822) ### Motivation Expose bookkeeper expose explicit lac configuration in broker.conf It's related to #3828 #4976, some Pulsar SQL users need to enable the explicitLacInterval, so that they can get the last message in Pulsar SQL. --- conf/broker.conf | 4 ++++ conf/standalone.conf | 4 ++++ .../org/apache/pulsar/broker/ServiceConfiguration.java | 3 +++ .../pulsar/broker/BookKeeperClientFactoryImpl.java | 1 + .../java/org/apache/pulsar/PulsarBrokerStarterTest.java | 2 ++ .../pulsar/broker/BookKeeperClientFactoryImplTest.java | 9 +++++++++ 6 files changed, 23 insertions(+) diff --git a/conf/broker.conf b/conf/broker.conf index 798fbacf271ca..47eb3357a3507 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -528,6 +528,10 @@ bookkeeperTLSTrustCertsFilePath= # Enable/disable disk weight based placement. Default is false bookkeeperDiskWeightBasedPlacementEnabled=false +# Set the interval to check the need for sending an explicit LAC +# A value of '0' disables sending any explicit LACs. Default is 0. +bookkeeperExplicitLacIntervalInMills=0; + ### --- Managed Ledger --- ### # Number of bookies to use when creating a ledger diff --git a/conf/standalone.conf b/conf/standalone.conf index 6d1023bb40274..138525e223ce1 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -334,6 +334,10 @@ bookkeeperTLSTrustCertsFilePath= # Enable/disable disk weight based placement. Default is false bookkeeperDiskWeightBasedPlacementEnabled=false +# Set the interval to check the need for sending an explicit LAC +# A value of '0' disables sending any explicit LACs. Default is 0. +bookkeeperExplicitLacIntervalInMills=0; + ### --- Managed Ledger --- ### # Number of bookies to use when creating a ledger diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2771692b82f30..52c226a2b1b50 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -879,6 +879,9 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable disk weight based placement. Default is false") private boolean bookkeeperDiskWeightBasedPlacementEnabled = false; + @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to check the need for sending an explicit LAC") + private int bookkeeperExplicitLacIntervalInMills = 0; + /**** --- Managed Ledger --- ****/ @FieldContext( minValue = 1, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index f48c915de379d..0d1d7a8d6f8f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -115,6 +115,7 @@ ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) { } bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled()); + bkConf.setExplictLacInterval(conf.getBookkeeperExplicitLacIntervalInMills()); return bkConf; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java index ba4b212571a91..95711405ac28e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java @@ -78,6 +78,7 @@ private File createValidBrokerConfigFile() throws FileNotFoundException { printWriter.println("bookkeeperClientTimeoutInSeconds=12345"); printWriter.println("bookkeeperClientSpeculativeReadTimeoutInMillis=3000"); printWriter.println("enableRunBookieTogether=true"); + printWriter.println("bookkeeperExplicitLacIntervalInMills=5"); printWriter.close(); testConfigFile.deleteOnExit(); @@ -127,6 +128,7 @@ public void testLoadConfig() throws SecurityException, NoSuchMethodException, IO assertEquals(serviceConfig.getBookkeeperClientIsolationGroups(), "group1,group2"); assertEquals(serviceConfig.getBookkeeperClientSpeculativeReadTimeoutInMillis(), 3000); assertEquals(serviceConfig.getBookkeeperClientTimeoutInSeconds(), 12345); + assertEquals(serviceConfig.getBookkeeperExplicitLacIntervalInMills(), 5); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java index f06e791a12e4b..36ce41f53d883 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -159,4 +159,13 @@ public void testSetDiskWeightBasedPlacementEnabled() { assertTrue(factory.createBkClientConfiguration(conf).getDiskWeightBasedPlacementEnabled()); } + @Test + public void testSetExplicitLacInterval() { + BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl(); + ServiceConfiguration conf = new ServiceConfiguration(); + assertEquals(factory.createBkClientConfiguration(conf).getExplictLacInterval(), 0); + conf.setBookkeeperExplicitLacIntervalInMills(5); + assertEquals(factory.createBkClientConfiguration(conf).getExplictLacInterval(), 5); + } + }