Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Broker service configuration dynamically #186

Merged
merged 5 commits into from
Mar 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions docs/AdminTools.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
- [Brokers](#brokers)
- [list of active brokers](#list-of-active-brokers)
- [list of namespaces owned by a given broker](#list-of-namespaces-owned-by-a-given-broker)
- [update dynamic configuration](#update-dynamic-configuration)
- [get list of dynamic configuration name](#get-list-of-dynamic-configuration-name)
- [get value of dynamic configurations](#get-value-of-dynamic-configurations)
- [Properties](#properties)
- [list existing properties](#list-existing-properties)
- [create property](#create-property)
Expand Down Expand Up @@ -233,6 +236,157 @@ GET /admin/brokers/{cluster}/{broker}/ownedNamespaces
admin.brokers().getOwnedNamespaces(cluster,brokerUrl)
```

#### update dynamic configuration
Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.

###### CLI

```
$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
```

```
N/A
```

###### REST

```
GET /admin/brokers/configuration/{configName}/{configValue}
```

###### Java

```java
admin.brokers().updateDynamicConfiguration(configName, configValue)
```

#### get list of dynamic configuration name
It gives list of updatable dynamic service-configuration name.

###### CLI

```
$ pulsar-admin brokers list-dynamic-config
```

```
brokerShutdownTimeoutMs
```

###### REST

```
GET /admin/brokers/configuration
```

###### Java

```java
admin.brokers().getDynamicConfigurationNames()
```

#### get value of dynamic configurations
It gives value of all dynamic configurations stored in zookeeper

###### CLI

```
$ pulsar-admin brokers get-all-dynamic-config
```

```
brokerShutdownTimeoutMs:100
```

###### REST

```
GET /admin/brokers/configuration/values
```

###### Java

```java
admin.brokers().getAllDynamicConfigurations()
```

#### Update dynamic configuration
Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.

###### CLI

```
$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
```

```
N/A
```

###### REST

```
GET /admin/brokers/configuration/{configName}/{configValue}
```

###### Java

```java
admin.brokers().updateDynamicConfiguration(configName, configValue)
```

#### Get list of dynamic configuration name
It gives list of updatable dynamic service-configuration name.

###### CLI

```
$ pulsar-admin brokers list-dynamic-config
```

```
brokerShutdownTimeoutMs
```

###### REST

```
GET /admin/brokers/configuration
```

###### Java

```java
admin.brokers().getDynamicConfigurationNames()
```

#### Get value of dynamic configurations
It gives value of all dynamic configurations stored in zookeeper

###### CLI

```
$ pulsar-admin brokers get-all-dynamic-config
```

```
brokerShutdownTimeoutMs:100
```

###### REST

```
GET /admin/brokers/configuration/values
```

###### Java

```java
admin.brokers().getAllDynamicConfigurations()
```



### Properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ServiceConfiguration implements PulsarConfiguration{
private long zooKeeperSessionTimeoutMillis = 30000;
// Time to wait for broker graceful shutdown. After this time elapses, the
// process will be killed
@FieldContext(dynamic = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd leave this PR to implement the "mechanism" and then we can make the actual variables dynamic in subsequent PRs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but in order to write test-cases and perform testing of this feature I take one of this simple attribute in this PR. It helps to write e2e testcases for this feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

private long brokerShutdownTimeoutMs = 3000;
// Enable backlog quota check. Enforces action on topic when the quota is
// reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@
* @return character length of field
*/
public int maxCharLength() default Integer.MAX_VALUE;

/**
* allow field to be updated dynamically
*
* @return
*/
public boolean dynamic() default false;
}
111 changes: 107 additions & 4 deletions pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,48 @@
*/
package com.yahoo.pulsar.broker.admin;

import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;

import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;

import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;


@Path("/brokers")
@Api(value = "/brokers", description = "Brokers admin apis", tags = "brokers")
@Produces(MediaType.APPLICATION_JSON)
public class Brokers extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(Brokers.class);

private int serviceConfigZkVersion = -1;

@GET
@Path("/{cluster}")
@ApiOperation(value = "Get the list of active brokers (web service addresses) in the cluster.", response = String.class, responseContainer = "Set")
Expand Down Expand Up @@ -79,4 +95,91 @@ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaes(@PathParam("clust
throw new RestException(e);
}
}

@POST
@Path("/configuration/{configName}/{configValue}")
@ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
validateSuperUserAccess();
updateDynamicConfigurationOnZk(configName, configValue);
}

@GET
@Path("/configuration/values")
@ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
@ApiResponses(value = { @ApiResponse(code = 404, message = "Configuration not found") })
public Map<String, String> getAllDynamicConfigurations() throws Exception {
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = null;
try {
configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
} catch (RestException e) {
LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
throw e;
} catch (Exception e) {
LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
return configurationMap;
}

@GET
@Path("/configuration")
@ApiOperation(value = "Get all updatable dynamic configurations's name")
public List<String> getDynamicConfigurationName() {
return BrokerService.getDynamicConfigurationMap().keys();
}

/**
* if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so
* all other brokers get the watch and can see the change and take appropriate action on the change.
*
* @param configName
* : configuration key
* @param configValue
* : configuration value
*/
private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
try {
if (BrokerService.getDynamicConfigurationMap().containsKey(configName)) {
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElse(null);
if (configurationMap != null) {
configurationMap.put(configName, configValue);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
serviceConfigZkVersion = localZk()
.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
} else {
configurationMap = Maps.newHashMap();
configurationMap.put(configName, configValue);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
configValue);
}
throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
}
} catch (RestException re) {
throw re;
} catch (Exception ie) {
LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), configName, configValue,
ie.getMessage(), ie);
throw new RestException(ie);
}
}

}
Loading