diff --git a/docs/AdminTools.md b/docs/AdminTools.md index 0c175887d7e47..66690fe6dd366 100644 --- a/docs/AdminTools.md +++ b/docs/AdminTools.md @@ -14,6 +14,9 @@ - [Brokers](#brokers) - [list of active brokers](#list-of-active-brokers) - [list of namespaces owned by a given broker](#list-of-namespaces-owned-by-a-given-broker) + - [update dynamic configuration](#update-dynamic-configuration) + - [get list of dynamic configuration name](#get-list-of-dynamic-configuration-name) + - [get value of dynamic configurations](#get-value-of-dynamic-configurations) - [Properties](#properties) - [list existing properties](#list-existing-properties) - [create property](#create-property) @@ -233,6 +236,157 @@ GET /admin/brokers/{cluster}/{broker}/ownedNamespaces admin.brokers().getOwnedNamespaces(cluster,brokerUrl) ``` +#### update dynamic configuration +Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally. + +###### CLI + +``` +$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100 +``` + +``` +N/A +``` + +###### REST + +``` +GET /admin/brokers/configuration/{configName}/{configValue} +``` + +###### Java + +```java +admin.brokers().updateDynamicConfiguration(configName, configValue) +``` + +#### get list of dynamic configuration name +It gives list of updatable dynamic service-configuration name. + +###### CLI + +``` +$ pulsar-admin brokers list-dynamic-config +``` + +``` +brokerShutdownTimeoutMs +``` + +###### REST + +``` +GET /admin/brokers/configuration +``` + +###### Java + +```java +admin.brokers().getDynamicConfigurationNames() +``` + +#### get value of dynamic configurations +It gives value of all dynamic configurations stored in zookeeper + +###### CLI + +``` +$ pulsar-admin brokers get-all-dynamic-config +``` + +``` +brokerShutdownTimeoutMs:100 +``` + +###### REST + +``` +GET /admin/brokers/configuration/values +``` + +###### Java + +```java +admin.brokers().getAllDynamicConfigurations() +``` + +#### Update dynamic configuration +Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally. + +###### CLI + +``` +$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100 +``` + +``` +N/A +``` + +###### REST + +``` +GET /admin/brokers/configuration/{configName}/{configValue} +``` + +###### Java + +```java +admin.brokers().updateDynamicConfiguration(configName, configValue) +``` + +#### Get list of dynamic configuration name +It gives list of updatable dynamic service-configuration name. + +###### CLI + +``` +$ pulsar-admin brokers list-dynamic-config +``` + +``` +brokerShutdownTimeoutMs +``` + +###### REST + +``` +GET /admin/brokers/configuration +``` + +###### Java + +```java +admin.brokers().getDynamicConfigurationNames() +``` + +#### Get value of dynamic configurations +It gives value of all dynamic configurations stored in zookeeper + +###### CLI + +``` +$ pulsar-admin brokers get-all-dynamic-config +``` + +``` +brokerShutdownTimeoutMs:100 +``` + +###### REST + +``` +GET /admin/brokers/configuration/values +``` + +###### Java + +```java +admin.brokers().getAllDynamicConfigurations() +``` + + ### Properties diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java index ab3099362e5d5..87553b3fe13a8 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java @@ -62,6 +62,7 @@ public class ServiceConfiguration implements PulsarConfiguration{ private long zooKeeperSessionTimeoutMillis = 30000; // Time to wait for broker graceful shutdown. After this time elapses, the // process will be killed + @FieldContext(dynamic = true) private long brokerShutdownTimeoutMs = 3000; // Enable backlog quota check. Enforces action on topic when the quota is // reached diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java index cf693239b5484..465d35cfa2784 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java @@ -56,4 +56,11 @@ * @return character length of field */ public int maxCharLength() default Integer.MAX_VALUE; + + /** + * allow field to be updated dynamically + * + * @return + */ + public boolean dynamic() default false; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java index bce449bddafba..3fea37d54bc4b 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java @@ -15,32 +15,48 @@ */ package com.yahoo.pulsar.broker.admin; +import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; + +import java.util.List; import java.util.Map; import java.util.Set; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; +import com.yahoo.pulsar.broker.ServiceConfiguration; +import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import com.yahoo.pulsar.broker.service.BrokerService; +import com.yahoo.pulsar.broker.web.RestException; +import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus; +import com.yahoo.pulsar.common.util.ObjectMapperFactory; +import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; -import com.yahoo.pulsar.broker.web.RestException; -import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus; + @Path("/brokers") @Api(value = "/brokers", description = "Brokers admin apis", tags = "brokers") @Produces(MediaType.APPLICATION_JSON) public class Brokers extends AdminResource { private static final Logger LOG = LoggerFactory.getLogger(Brokers.class); - + private int serviceConfigZkVersion = -1; + @GET @Path("/{cluster}") @ApiOperation(value = "Get the list of active brokers (web service addresses) in the cluster.", response = String.class, responseContainer = "Set") @@ -79,4 +95,91 @@ public Map getOwnedNamespaes(@PathParam("clust throw new RestException(e); } } + + @POST + @Path("/configuration/{configName}/{configValue}") + @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.") + @ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"), + @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"), + @ApiResponse(code = 404, message = "Configuration not found"), + @ApiResponse(code = 412, message = "Configuration can't be updated dynamically") }) + public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{ + validateSuperUserAccess(); + updateDynamicConfigurationOnZk(configName, configValue); + } + + @GET + @Path("/configuration/values") + @ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config") + @ApiResponses(value = { @ApiResponse(code = 404, message = "Configuration not found") }) + public Map getAllDynamicConfigurations() throws Exception { + ZooKeeperDataCache> dynamicConfigurationCache = pulsar().getBrokerService() + .getDynamicConfigurationCache(); + Map configurationMap = null; + try { + configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk")); + } catch (RestException e) { + LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e); + throw e; + } catch (Exception e) { + LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e); + throw new RestException(e); + } + return configurationMap; + } + + @GET + @Path("/configuration") + @ApiOperation(value = "Get all updatable dynamic configurations's name") + public List getDynamicConfigurationName() { + return BrokerService.getDynamicConfigurationMap().keys(); + } + + /** + * if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so + * all other brokers get the watch and can see the change and take appropriate action on the change. + * + * @param configName + * : configuration key + * @param configValue + * : configuration value + */ + private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) { + try { + if (BrokerService.getDynamicConfigurationMap().containsKey(configName)) { + ZooKeeperDataCache> dynamicConfigurationCache = pulsar().getBrokerService() + .getDynamicConfigurationCache(); + Map configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH) + .orElse(null); + if (configurationMap != null) { + configurationMap.put(configName, configValue); + byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap); + dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH); + serviceConfigZkVersion = localZk() + .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion(); + } else { + configurationMap = Maps.newHashMap(); + configurationMap.put(configName, configValue); + byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap); + ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content, + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName, + configValue); + } + throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"); + } + } catch (RestException re) { + throw re; + } catch (Exception ie) { + LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), configName, configValue, + ie.getMessage(), ie); + throw new RestException(ie); + } + } + } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index c999790df3958..aa10c9331aead 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -19,14 +19,17 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -41,7 +44,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.commons.lang.SystemUtils; -import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -66,6 +68,7 @@ import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.client.impl.PulsarClientImpl; import com.yahoo.pulsar.client.util.FutureUtil; +import com.yahoo.pulsar.common.configuration.FieldContext; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; import com.yahoo.pulsar.common.naming.NamespaceBundleFactory; @@ -77,8 +80,12 @@ import com.yahoo.pulsar.common.policies.data.Policies; import com.yahoo.pulsar.common.policies.data.RetentionPolicies; import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; +import com.yahoo.pulsar.common.util.FieldParser; +import com.yahoo.pulsar.common.util.ObjectMapperFactory; import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap; +import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet; import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener; +import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -115,6 +122,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener offlineTopicStatCache; + private static final ConcurrentOpenHashMap dynamicConfigurationMap = prepareDynamicConfigurationMap(); + private final ConcurrentOpenHashMap configRegisteredListeners; private AuthorizationManager authorizationManager = null; private final ScheduledExecutorService statsUpdater; @@ -132,6 +141,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener> dynamicConfigurationCache; public BrokerService(PulsarService pulsar) throws Exception { this.pulsar = pulsar; @@ -141,6 +153,7 @@ public BrokerService(PulsarService pulsar) throws Exception { this.topics = new ConcurrentOpenHashMap<>(); this.replicationClients = new ConcurrentOpenHashMap<>(); this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds(); + this.configRegisteredListeners = new ConcurrentOpenHashMap<>(); this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>(); this.pulsarStats = new PulsarStats(pulsar); @@ -186,6 +199,14 @@ public BrokerService(PulsarService pulsar) throws Exception { this.backlogQuotaChecker = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker")); this.authenticationService = new AuthenticationService(pulsar.getConfiguration()); + + this.dynamicConfigurationCache = new ZooKeeperDataCache>(pulsar().getLocalZkCache()) { + @Override + public Map deserialize(String key, byte[] content) throws Exception { + return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class); + } + }; + updateConfigurationAndRegisterListeners(); PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize()); } @@ -824,4 +845,117 @@ public AuthenticationService getAuthenticationService() { public List getAllTopicsFromNamespaceBundle(String namespace, String bundle) { return multiLayerTopicsMap.get(namespace).get(bundle).values(); } + + public ZooKeeperDataCache> getDynamicConfigurationCache() { + return dynamicConfigurationCache; + } + + /** + * Update dynamic-ServiceConfiguration with value present into zk-configuration-map and register listeners on + * dynamic-ServiceConfiguration field to take appropriate action on change of zk-configuration-map. + */ + private void updateConfigurationAndRegisterListeners() { + // update ServiceConfiguration value by reading zk-configuration-map + updateDynamicServiceConfiguration(); + //add more listeners here + } + + /** + * Allows a listener to listen on update of {@link ServiceConfiguration} change, so listener can take appropriate + * action if any specific config-field value has been changed. + *

+ * On notification, listener should first check if config value has been changed and after taking appropriate + * action, listener should update config value with new value if it has been changed (so, next time listener can + * compare values on configMap change). + * @param + * + * @param configKey + * : configuration field name + * @param listener + * : listener which takes appropriate action on config-value change + */ + public void registerConfigurationListener(String configKey, Consumer listener) { + configRegisteredListeners.put(configKey, listener); + dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() { + @SuppressWarnings("unchecked") + @Override + public void onUpdate(String path, Map data, Stat stat) { + if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null + && data.containsKey(configKey)) { + log.info("Updating configuration {}/{}", configKey, data.get(configKey)); + listener.accept((T) FieldParser.value(data.get(configKey), dynamicConfigurationMap.get(configKey))); + } + } + }); + } + + private void updateDynamicServiceConfiguration() { + try { + Optional> data = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH); + if (data.isPresent() && data.get() != null) { + data.get().forEach((key,value)-> { + try { + Field field = ServiceConfiguration.class.getDeclaredField(key); + if (field != null && field.isAnnotationPresent(FieldContext.class)) { + field.setAccessible(true); + field.set(pulsar().getConfiguration(), FieldParser.value(value,field)); + log.info("Successfully updated {}/{}", key, value); + } + } catch (Exception e) { + log.warn("Failed to update service configuration {}/{}, {}",key,value,e.getMessage()); + } + }); + } + // register a listener: it updates field value and triggers appropriate registered field-listener only if + // field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration + dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() { + @SuppressWarnings("unchecked") + @Override + public void onUpdate(String path, Map data, Stat stat) { + if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) { + data.forEach((configKey, value) -> { + Field configField = dynamicConfigurationMap.get(configKey); + Object newValue = FieldParser.value(data.get(configKey), configField); + if (configField != null) { + Consumer listener = configRegisteredListeners.get(configKey); + try { + Object existingValue = configField.get(pulsar.getConfiguration()); + configField.set(pulsar.getConfiguration(), newValue); + log.info("Successfully updated configuration {}/{}", configKey, + data.get(configKey)); + if (listener != null && !existingValue.equals(newValue)) { + listener.accept(newValue); + } + } catch (Exception e) { + log.error("Failed to update config {}/{}", configKey, newValue); + } + } else { + log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue); + } + }); + } + } + }); + } catch (Exception e) { + log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e); + } + } + + public static ConcurrentOpenHashMap getDynamicConfigurationMap() { + return dynamicConfigurationMap; + } + + private static ConcurrentOpenHashMap prepareDynamicConfigurationMap() { + ConcurrentOpenHashMap dynamicConfigurationMap = new ConcurrentOpenHashMap<>(); + for (Field field : ServiceConfiguration.class.getDeclaredFields()) { + if (field != null && field.isAnnotationPresent(FieldContext.class)) { + field.setAccessible(true); + if (((FieldContext) field.getAnnotation(FieldContext.class)).dynamic()) { + dynamicConfigurationMap.put(field.getName(), field); + } + } + } + return dynamicConfigurationMap; + } + } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java index f909dcc21d6f3..cadb2fa111449 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java @@ -15,12 +15,15 @@ */ package com.yahoo.pulsar.broker.admin; +import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.EnumSet; @@ -36,6 +39,9 @@ import javax.ws.rs.client.WebTarget; import org.apache.bookkeeper.test.PortManager; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -56,6 +62,7 @@ import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest; import com.yahoo.pulsar.broker.namespace.NamespaceEphemeralData; import com.yahoo.pulsar.broker.namespace.NamespaceService; +import com.yahoo.pulsar.broker.service.BrokerService; import com.yahoo.pulsar.client.admin.PulsarAdmin; import com.yahoo.pulsar.client.admin.PulsarAdminException; import com.yahoo.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -377,8 +384,116 @@ public void brokers() throws Exception { admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); admin.clusters().deleteCluster("use"); assertEquals(admin.clusters().getClusters(), Lists.newArrayList()); + } + + /** + *
+     * Verifies: zk-update configuration updates service-config
+     * 1. create znode for dynamic-config
+     * 2. start pulsar service so, pulsar can set the watch on that znode
+     * 3. update the configuration with new value
+     * 4. wait and verify that new value has been updated
+     * 
+ * + * @throws Exception + */ + @Test + public void testUpdateDynamicConfigurationWithZkWatch() throws Exception { + // create configuration znode + ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // Now, znode is created: set the watch and listener on the znode + Method updateConfigListenerMethod = BrokerService.class + .getDeclaredMethod("updateConfigurationAndRegisterListeners"); + updateConfigListenerMethod.setAccessible(true); + updateConfigListenerMethod.invoke(pulsar.getBrokerService()); + pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000); + // (1) try to update dynamic field + final long shutdownTime = 10; + // update configuration + admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime)); + // wait config to be updated + for (int i = 0; i < 5; i++) { + if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != shutdownTime) { + Thread.sleep(100 + (i * 10)); + } else { + break; + } + } + // verify value is updated + assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime); + // (2) try to update non-dynamic field + try { + admin.brokers().updateDynamicConfiguration("zookeeperServers", "test-zk:1234"); + } catch (Exception e) { + assertTrue(e instanceof PreconditionFailedException); + } + + // (3) try to update non-existent field + try { + admin.brokers().updateDynamicConfiguration("test", Long.toString(shutdownTime)); + } catch (Exception e) { + assertTrue(e instanceof PreconditionFailedException); + } + + } + + /** + *
+     * verifies: that registerListener updates pulsar.config value with newly updated zk-dynamic config
+     * NOTE: pulsar can't set the watch on non-existing znode
+     * So, when pulsar starts it is not able to set the watch on non-existing znode of dynamicConfiguration
+     * So, here, after creating znode we will trigger register explicitly
+     * 1.start pulsar
+     * 2.update zk-config with admin api
+     * 3. trigger watch and listener
+     * 4. verify that config is updated
+     * 
+ * @throws Exception + */ + @Test + public void testUpdateDynamicLocalConfiguration() throws Exception { + // (1) try to update dynamic field + final long shutdownTime = 10; + pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000); + // update configuration + admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime)); + // Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated + Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("updateConfigurationAndRegisterListeners"); + getPermitZkNodeMethod.setAccessible(true); + getPermitZkNodeMethod.invoke(pulsar.getBrokerService()); + // verify value is updated + assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime); + } + + @Test + public void testUpdatableConfigurationName() throws Exception { + // (1) try to update dynamic field + final String configName = "brokerShutdownTimeoutMs"; + assertTrue(admin.brokers().getDynamicConfigurationNames().contains(configName)); + } + + @Test + public void testGetDynamicLocalConfiguration() throws Exception { + // (1) try to update dynamic field + final String configName = "brokerShutdownTimeoutMs"; + final long shutdownTime = 10; + pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000); + try { + admin.brokers().getAllDynamicConfigurations(); + fail("should have fail as configuration is not exist"); + } catch (PulsarAdminException.NotFoundException ne) { + // ok : expected + } + assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime); + // update configuration + admin.brokers().updateDynamicConfiguration(configName, Long.toString(shutdownTime)); + // Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated + assertEquals(Long.parseLong(admin.brokers().getAllDynamicConfigurations().get(configName)), shutdownTime); + } + @Test(enabled = true) public void properties() throws PulsarAdminException { Set allowedClusters = Sets.newHashSet("use"); diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java index 4e07b61ff97b4..008f7e35334e0 100644 --- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java @@ -67,4 +67,32 @@ public interface Brokers { * @throws PulsarAdminException */ Map getOwnedNamespaces(String cluster, String brokerUrl) throws PulsarAdminException; + + /** + * It updates dynamic configuration value in to Zk that triggers watch on + * brokers and all brokers can update {@link ServiceConfiguration} value + * locally + * + * @param key + * @param value + * @throws PulsarAdminException + */ + void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException; + + /** + * Get list of updatable configuration name + * + * @return + * @throws PulsarAdminException + */ + List getDynamicConfigurationNames() throws PulsarAdminException; + + /** + * Get values of all overridden dynamic-configs + * + * @return + * @throws PulsarAdminException + */ + Map getAllDynamicConfigurations() throws PulsarAdminException; + } diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java index 3051a89c27e0c..2829b8a1a2e8d 100644 --- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java @@ -18,12 +18,14 @@ import java.util.List; import java.util.Map; +import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import com.yahoo.pulsar.client.admin.Brokers; import com.yahoo.pulsar.client.admin.PulsarAdminException; import com.yahoo.pulsar.client.api.Authentication; +import com.yahoo.pulsar.common.policies.data.ErrorData; import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus; public class BrokersImpl extends BaseResource implements Brokers { @@ -56,4 +58,34 @@ public Map getOwnedNamespaces(String cluster, } } + @Override + public void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException { + try { + request(brokers.path("/configuration/").path(configName).path(configValue)).post(Entity.json(""), + ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public Map getAllDynamicConfigurations() throws PulsarAdminException { + try { + return request(brokers.path("/configuration/").path("values")).get(new GenericType>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public List getDynamicConfigurationNames() throws PulsarAdminException { + try { + return request(brokers.path("/configuration")).get(new GenericType>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + } diff --git a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java index 82890d7ce3389..4eaa1d0688a05 100644 --- a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java +++ b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java @@ -48,9 +48,43 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Update dynamic-serviceConfiguration of broker") + private class UpdateConfigurationCmd extends CliCommand { + @Parameter(names = "--config", description = "service-configuration name", required = true) + private String configName; + @Parameter(names = "--value", description = "service-configuration value", required = true) + private String configValue; + + @Override + void run() throws Exception { + admin.brokers().updateDynamicConfiguration(configName, configValue); + } + } + + @Parameters(commandDescription = "Get all overridden dynamic-configuration values") + private class GetAllConfigurationsCmd extends CliCommand { + + @Override + void run() throws Exception { + print(admin.brokers().getAllDynamicConfigurations()); + } + } + + @Parameters(commandDescription = "Get list of updatable configuration name") + private class GetUpdatableConfigCmd extends CliCommand { + + @Override + void run() throws Exception { + print(admin.brokers().getDynamicConfigurationNames()); + } + } + CmdBrokers(PulsarAdmin admin) { super("brokers", admin); jcommander.addCommand("list", new List()); jcommander.addCommand("namespaces", new Namespaces()); + jcommander.addCommand("update-dynamic-config", new UpdateConfigurationCmd()); + jcommander.addCommand("list-dynamic-config", new GetUpdatableConfigCmd()); + jcommander.addCommand("get-all-dynamic-config", new GetAllConfigurationsCmd()); } } diff --git a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java index 328da033d18ff..3dbf536737509 100644 --- a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java @@ -59,6 +59,15 @@ void brokers() throws Exception { brokers.run(split("list use")); verify(mockBrokers).getActiveBrokers("use"); + + brokers.run(split("get-all-dynamic-config")); + verify(mockBrokers).getAllDynamicConfigurations(); + + brokers.run(split("list-dynamic-config")); + verify(mockBrokers).getDynamicConfigurationNames(); + + brokers.run(split("update-dynamic-config --config brokerShutdownTimeoutMs --value 100")); + verify(mockBrokers).updateDynamicConfiguration("brokerShutdownTimeoutMs", "100"); } @Test diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java index a0bc1d1f8a066..e1ddf8e02a3e4 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java @@ -135,7 +135,7 @@ public static void update(Map properties, T obj) throws Ille * : field of the attribute * @return */ - private static Object value(String strValue, Field field) { + public static Object value(String strValue, Field field) { checkNotNull(field); // if field is not primitive type if (field.getGenericType() instanceof ParameterizedType) {