Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions docs/IConfigLoader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
title: IConfigLoader
layout: documentation
documentation: true
---


### Introduction
IConfigLoader is an interface designed to allow dynamic loading of scheduler resource constraints. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResoureAwareScheduler uses the interface to dynamically load per user resource guarantees.

The following interface is provided for users to create an IConfigLoader instance based on the scheme of the `scheduler.config.loader.uri`.
```
ConfigLoaderFactoryService.createConfigLoader(Map<String, Object> conf)
```

------

### Interface
```
public interface IConfigLoader {
Map<?,?> load();
};
```
#### Description
- load is called by the scheduler whenever it wishes to retrieve the most recent configuration map.

#### Loader Configuration
The loaders are dynamically selected and dynamically configured through configuration items in the scheduler implementations.

##### Example
```
scheduler.config.loader.uri: "artifactory+http://artifactory.my.company.com:8000/artifactory/configurations/clusters/my_cluster/ras_pools"
scheduler.config.loader.timeout.sec: 30
```
Or
```
scheduler.config.loader.uri: "file:///path/to/my/config.yaml"
```
### Implementations

There are currently two implemenations of IConfigLoader
- org.apache.storm.scheduler.utils.ArtifactoryConfigLoader: Load configurations from an Artifactory server.
It will be used if users add `artifactory+` to the scheme of the real URI and set to `scheduler.config.loader.uri`.
- org.apache.storm.scheduler.utils.FileConfigLoader: Load configurations from a local file. It will be used if users use `file` scheme.

#### Configurations
- scheduler.config.loader.uri: For `ArtifactoryConfigLoader`, this can either be a reference to an individual file in Artifactory or to a directory. If it is a directory, the file with the largest lexographic name will be returned.
For `FileConfigLoader`, this is the URI pointing to a file.
- scheduler.config.loader.timeout.secs: Currently only used in `ArtifactoryConfigLoader`. It is the amount of time an http connection to the artifactory server will wait before timing out. The default is 10.
- scheduler.config.loader.polltime.secs: Currently only used in `ArtifactoryConfigLoader`. It is the frequency at which the plugin will call out to artifactory instead of returning the most recently cached result. The default is 600 seconds.
- scheduler.config.loader.artifactory.base.directory: Only used in `ArtifactoryConfigLoader`. It is the part of the uri, configurable in Artifactory, which represents the top of the directory tree. It defaults to "/artifactory".
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@
<disruptor.version>3.3.2</disruptor.version>
<jgrapht.version>0.9.0</jgrapht.version>
<guava.version>16.0.1</guava.version>
<auto-service.version>1.0-rc3</auto-service.version>
<netty.version>3.9.0.Final</netty.version>
<sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
<log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
Expand Down Expand Up @@ -899,6 +900,12 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>${auto-service.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions storm-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
</dependency>

<!-- test -->
<dependency>
Expand Down
94 changes: 63 additions & 31 deletions storm-server/src/main/java/org/apache/storm/DaemonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public class DaemonConfig implements Validated {
public static final String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";

/**
* Class name for authorization plugin for Nimbus
* Class name for authorization plugin for Nimbus.
*/
@isString
public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
Expand Down Expand Up @@ -271,7 +271,7 @@ public class DaemonConfig implements Validated {
public static final String UI_CENTRAL_LOGGING_URL = "ui.central.logging.url";

/**
* HTTP UI port for log viewer
* HTTP UI port for log viewer.
*/
@isInteger
@isPositiveNumber
Expand All @@ -284,46 +284,46 @@ public class DaemonConfig implements Validated {
public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts";

/**
* How often to clean up old log files
* How often to clean up old log files.
*/
@isInteger
@isPositiveNumber
public static final String LOGVIEWER_CLEANUP_INTERVAL_SECS = "logviewer.cleanup.interval.secs";

/**
* How many minutes since a log was last modified for the log to be considered for clean-up
* How many minutes since a log was last modified for the log to be considered for clean-up.
*/
@isInteger
@isPositiveNumber
public static final String LOGVIEWER_CLEANUP_AGE_MINS = "logviewer.cleanup.age.mins";

/**
* The maximum number of bytes all worker log files can take up in MB
* The maximum number of bytes all worker log files can take up in MB.
*/
@isPositiveNumber
public static final String LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB = "logviewer.max.sum.worker.logs.size.mb";

/**
* The maximum number of bytes per worker's files can take up in MB
* The maximum number of bytes per worker's files can take up in MB.
*/
@isPositiveNumber
public static final String LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB = "logviewer.max.per.worker.logs.size.mb";

/**
* Storm Logviewer HTTPS port
* Storm Logviewer HTTPS port.
*/
@isInteger
@isPositiveNumber
public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port";

/**
* Path to the keystore containing the certs used by Storm Logviewer for HTTPS communications
* Path to the keystore containing the certs used by Storm Logviewer for HTTPS communications.
*/
@isString
public static final String LOGVIEWER_HTTPS_KEYSTORE_PATH = "logviewer.https.keystore.path";

/**
* Password for the keystore for HTTPS for Storm Logviewer
* Password for the keystore for HTTPS for Storm Logviewer.
*/
@isString
public static final String LOGVIEWER_HTTPS_KEYSTORE_PASSWORD = "logviewer.https.keystore.password";
Expand All @@ -342,13 +342,13 @@ public class DaemonConfig implements Validated {
public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = "logviewer.https.key.password";

/**
* Path to the truststore containing the certs used by Storm Logviewer for HTTPS communications
* Path to the truststore containing the certs used by Storm Logviewer for HTTPS communications.
*/
@isString
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PATH = "logviewer.https.truststore.path";

/**
* Password for the truststore for HTTPS for Storm Logviewer
* Password for the truststore for HTTPS for Storm Logviewer.
*/
@isString
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD = "logviewer.https.truststore.password";
Expand All @@ -370,13 +370,13 @@ public class DaemonConfig implements Validated {
public static final String LOGVIEWER_HTTPS_NEED_CLIENT_AUTH = "logviewer.https.need.client.auth";

/**
* A list of users allowed to view logs via the Log Viewer
* A list of users allowed to view logs via the Log Viewer.
*/
@isStringList
public static final String LOGS_USERS = "logs.users";

/**
* A list of groups allowed to view logs via the Log Viewer
* A list of groups allowed to view logs via the Log Viewer.
*/
@isStringList
public static final String LOGS_GROUPS = "logs.groups";
Expand All @@ -394,19 +394,19 @@ public class DaemonConfig implements Validated {
public static final String UI_CHILDOPTS = "ui.childopts";

/**
* A class implementing javax.servlet.Filter for authenticating/filtering UI requests
* A class implementing javax.servlet.Filter for authenticating/filtering UI requests.
*/
@isString
public static final String UI_FILTER = "ui.filter";

/**
* Initialization parameters for the javax.servlet.Filter
* Initialization parameters for the javax.servlet.Filter.
*/
@isMapEntryType(keyType = String.class, valueType = String.class)
public static final String UI_FILTER_PARAMS = "ui.filter.params";

/**
* The size of the header buffer for the UI in bytes
* The size of the header buffer for the UI in bytes.
*/
@isInteger
@isPositiveNumber
Expand Down Expand Up @@ -555,7 +555,7 @@ public class DaemonConfig implements Validated {
public static final String DRPC_HTTPS_NEED_CLIENT_AUTH = "drpc.https.need.client.auth";

/**
* Class name for authorization plugin for DRPC client
* Class name for authorization plugin for DRPC client.
*/
@isString
public static final String DRPC_AUTHORIZER = "drpc.authorizer";
Expand All @@ -578,7 +578,7 @@ public class DaemonConfig implements Validated {
public static final String DRPC_CHILDOPTS = "drpc.childopts";

/**
* the metadata configured on the supervisor
* the metadata configured on the supervisor.
*/
@isMapEntryType(keyType = String.class, valueType = String.class)
public static final String SUPERVISOR_SCHEDULER_META = "supervisor.scheduler.meta";
Expand All @@ -590,7 +590,7 @@ public class DaemonConfig implements Validated {
*/
@isNoDuplicateInList
@NotNull
@isListEntryCustom(entryValidatorClasses={ConfigValidation.IntegerValidator.class,ConfigValidation.PositiveNumberValidator.class})
@isListEntryCustom(entryValidatorClasses = {ConfigValidation.IntegerValidator.class,ConfigValidation.PositiveNumberValidator.class})
public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";

/**
Expand Down Expand Up @@ -652,14 +652,14 @@ public class DaemonConfig implements Validated {
public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";

/**
* A class implementing javax.servlet.Filter for DRPC HTTP requests
* A class implementing javax.servlet.Filter for DRPC HTTP requests.
*/
@isString
public static final String DRPC_HTTP_FILTER = "drpc.http.filter";

/**
* Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
* service
* service.
*/
@isMapEntryType(keyType = String.class, valueType = String.class)
public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
Expand All @@ -679,7 +679,7 @@ public class DaemonConfig implements Validated {
public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";

/**
* How many seconds to sleep for before shutting down threads on worker
* How many seconds to sleep for before shutting down threads on worker.
*/
@isInteger
@isPositiveNumber
Expand Down Expand Up @@ -739,7 +739,7 @@ public class DaemonConfig implements Validated {
/**
* The command launched supervisor with worker arguments
* pid, action and [target_directory]
* Where action is - start profile, stop profile, jstack, heapdump and kill against pid
* Where action is - start profile, stop profile, jstack, heapdump and kill against pid.
*
*/
@isString
Expand All @@ -762,7 +762,7 @@ public class DaemonConfig implements Validated {
public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = "storm.cluster.metrics.consumer.publish.interval.secs";

/**
* Enables user-first classpath. See topology.classpath.beginning
* Enables user-first classpath. See topology.classpath.beginning.
*/
@isBoolean
public static final String STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED="storm.topology.classpath.beginning.enabled";
Expand Down Expand Up @@ -791,6 +791,38 @@ public class DaemonConfig implements Validated {
@isMapEntryType(keyType = String.class, valueType = Number.class)
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";

/**
* For ArtifactoryConfigLoader, this can either be a reference to an individual file in Artifactory or to a directory.
* If it is a directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" to the beginning of
* the real URI to use ArtifactoryConfigLoader.
* For FileConfigLoader, this is the URI pointing to a file.
*/
@isString
public static final String SCHEDULER_CONFIG_LOADER_URI = "scheduler.config.loader.uri";

/**
* It is the frequency at which the plugin will call out to artifactory instead of returning the most recently cached result.
* Currently it's only used in ArtifactoryConfigLoader.
*/
@isInteger
@isPositiveNumber
public static final String SCHEDULER_CONFIG_LOADER_POLLTIME_SECS = "scheduler.config.loader.polltime.secs";

/**
* It is the amount of time an http connection to the artifactory server will wait before timing out.
* Currently it's only used in ArtifactoryConfigLoader.
*/
@isInteger
@isPositiveNumber
public static final String SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS = "scheduler.config.loader.timeout.secs";

/**
* It is the part of the uri, configurable in Artifactory, which represents the top of the directory tree.
* It's only used in ArtifactoryConfigLoader.
*/
@isString
public static final String SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY = "scheduler.config.loader.artifactory.base.directory";

/**
* A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler
* to org.apache.storm.scheduler.multitenant.MultitenantScheduler
Expand All @@ -806,14 +838,14 @@ public class DaemonConfig implements Validated {
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";

/**
* The class that specifies the eviction strategy to use in ResourceAwareScheduler
* The class that specifies the eviction strategy to use in ResourceAwareScheduler.
*/
@NotNull
@isImplementationOfClass(implementsClass = IEvictionStrategy.class)
public static final String RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY = "resource.aware.scheduler.eviction.strategy";

/**
* the class that specifies the scheduling priority strategy to use in ResourceAwareScheduler
* the class that specifies the scheduling priority strategy to use in ResourceAwareScheduler.
*/
@NotNull
@isImplementationOfClass(implementsClass = ISchedulingPriorityStrategy.class)
Expand Down Expand Up @@ -842,7 +874,7 @@ public class DaemonConfig implements Validated {
public static final String STORM_CGROUP_RESOURCES = "storm.cgroup.resources";

/**
* name for the cgroup hierarchy
* name for the cgroup hierarchy.
*/
@isString
public static final String STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name";
Expand All @@ -856,25 +888,25 @@ public class DaemonConfig implements Validated {
public static final String STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE = "storm.resource.isolation.plugin.enable";

/**
* root directory for cgoups
* root directory for cgoups.
*/
@isString
public static String STORM_SUPERVISOR_CGROUP_ROOTDIR = "storm.supervisor.cgroup.rootdir";

/**
* the manually set memory limit (in MB) for each CGroup on supervisor node
* the manually set memory limit (in MB) for each CGroup on supervisor node.
*/
@isPositiveNumber
public static String STORM_WORKER_CGROUP_MEMORY_MB_LIMIT = "storm.worker.cgroup.memory.mb.limit";

/**
* the manually set cpu share for each CGroup on supervisor node
* the manually set cpu share for each CGroup on supervisor node.
*/
@isPositiveNumber
public static String STORM_WORKER_CGROUP_CPU_LIMIT = "storm.worker.cgroup.cpu.limit";

/**
* full path to cgexec command
* full path to cgexec command.
*/
@isString
public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";
Expand Down
Loading