Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/IConfigLoader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
title: IConfigLoader
layout: documentation
documentation: true
---


### Introduction
IConfigLoader is an interface designed to allow dynamic loading of scheduler resource constraints. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResoureAwareScheduler uses the interface to dynamically load per user resource guarantees.

------

### Interface
```
public interface IConfigLoader {
void prepare(Map<String, Object> conf);
Map<?,?> load();
public static IConfigLoader getConfigLoader(Map conf, String loaderClassConfig, String loaderConfConfig);
public static Map<?,?> loadYamlConfigFromFile(File file);
};
```
#### Description
- prepare is called upon class loading, and allows schedulers to pass in configuation items to the classes that implement IConfigLoader
- load is called by the scheduler whenever it wishes to retrieve the most recent configuration map
- getConfigLoader is a static helper class that will return the implementation of IConfigLoader that the scheduler requests
- loadYamlConfigFromFile is a utility function used by implemenations of IConfigLoader that will load in a local yaml file and return a map

#### Loader Configuration
The loaders are dynamically selected and dynamically configured through configuration items in the scheduler implementations.

##### Example
```
resource.aware.scheduler.user.pools.loader: "org.apache.storm.scheduler.utils.ArtifactoryConfigLoader"
resource.aware.scheduler.user.pools.loader.params:
artifactory.config.loader.uri: "http://artifactory.my.company.com:8000/artifactory/configurations/clusters/my_cluster/ras_pools"
artifactory.config.loader.timeout.secs: "30"
multitenant.scheduler.user.pools.loader: "org.apache.storm.scheduler.utils.FileConfigLoader"
multitenant.scheduler.user.pools.loader.params:
file.config.loader.local.file.yaml: "/path/to/my/config.yaml"
```
### Implementations

There are currently two implemenations of IConfigLoader
- org.apache.storm.scheduler.utils.ArtifactoryConfigLoader: Load configurations from an Artifactory server
- org.apache.storm.scheduler.utils.FileConfigLoader: Load configurations from a local file

Each of these have configurations that the scheduler can pass into the implemenation when the prepare method is called

#### FileConfigLoader
- file.config.loader.local.file.yaml: A path to a local yaml file that represents the map the scheduler will use

#### ArtifactoryConfigLoader

- artifactory.config.loader.uri: This can either be a reference to an individual file in Artifactory or to a directory. If it is a directory, the file with the largest lexographic name will be returned.
- artifactory.config.loader.timeout.secs: This is the amount of time an http connection to the artifactory server will wait before timing out. The default is 10.
- artifactory.config.loader.polltime.secs: This is the frequency at which the plugin will call out to artifactory instead of returning the most recently cached result. The default is 600 seconds.
- artifactory.config.loader.scheme: This is the scheme to use when connecting to the Artifactory server. The default is http.
- artifactory.config.loader.base.directory: This is the part of the uri, configurable in Artifactory, which represents the top of the directory tree. It defaults to "/artifactory".
25 changes: 23 additions & 2 deletions storm-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
Expand Down Expand Up @@ -516,10 +520,12 @@
<include>commons-collections:commons-collections</include>
<include>org.apache.hadoop:hadoop-auth</include>
<include>commons-cli:commons-cli</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>commons-io:commons-io</include>
<include>commons-codec:commons-codec</include>
<include>commons-fileupload:commons-fileupload</include>
<include>commons-lang:commons-lang</include>
<include>commons-logging:commons-logging</include>
<include>com.googlecode.json-simple:json-simple</include>
<include>org.clojure:math.numeric-tower</include>
<include>org.clojure:tools.cli</include>
Expand Down Expand Up @@ -657,6 +663,10 @@
<pattern>org.apache.commons.cli</pattern>
<shadedPattern>org.apache.storm.shade.org.apache.commons.cli</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.logging</pattern>
<shadedPattern>org.apache.storm.shade.org.apache.commons.logging</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.io</pattern>
<shadedPattern>org.apache.storm.shade.org.apache.commons.io</shadedPattern>
Expand All @@ -682,8 +692,12 @@
<shadedPattern>org.apache.storm.shade.org.apache.commons.lang</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.collections</pattern>
<shadedPattern>org.apache.storm.shade.org.apache.commons.collections</shadedPattern>
<pattern>org.apache.commons.collections</pattern>
<shadedPattern>org.apache.storm.shade.org.apache.commons.collections</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.httpcomponents</pattern>
<shadedPattern>org.apache.storm.shade.org.apache.httpcomponents</shadedPattern>
</relocation>
<relocation>
<pattern>org.json.simple</pattern>
Expand Down Expand Up @@ -798,6 +812,13 @@
<exclude>LICENSE.txt</exclude>
</excludes>
</filter>
<filter>
<artifact>commons-logging:commons-logging</artifact>
<excludes>
<exclude>META-INF/LICENSE.txt</exclude>
<exclude>META-INF/NOTICE.txt</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.commons:commons-exec</artifact>
<excludes>
Expand Down
24 changes: 24 additions & 0 deletions storm-core/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2186,13 +2186,37 @@ public class Config extends HashMap<String, Object> {
@isMapEntryType(keyType = String.class, valueType = Number.class)
public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools";

/**
* A plugin that should load the user pools for the multitenant scheduler
*/
@isImplementationOfClass(implementsClass = org.apache.storm.scheduler.utils.IConfigLoader.class)
public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER = "multitenant.scheduler.user.pools.loader";

/**
* Configuration elements for scheduler config loader
*/
@isMapEntryType(keyType = String.class, valueType = String.class)
public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS = "multitenant.scheduler.user.pools.loader.params";

/**
* A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure
* per user resource guarantees.
*/
@isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {UserResourcePoolEntryValidator.class})
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";

/**
* A plugin that should load the user pools for the resource aware scheduler
*/
@isImplementationOfClass(implementsClass = org.apache.storm.scheduler.utils.IConfigLoader.class)
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER = "resource.aware.scheduler.user.pools.loader";

/**
* Configuration elements for scheduler config loader
*/
@isMapEntryType(keyType = String.class, valueType = String.class)
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER_PARAMS = "resource.aware.scheduler.user.pools.loader.params";

/**
* The class that specifies the eviction strategy to use in ResourceAwareScheduler
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,34 @@
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.utils.IConfigLoader;
import org.apache.storm.utils.Utils;

public class MultitenantScheduler implements IScheduler {
private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
@SuppressWarnings("rawtypes")
private Map _conf;
IConfigLoader _configLoader;

@Override
public void prepare(@SuppressWarnings("rawtypes") Map conf) {
_conf = conf;
_configLoader = IConfigLoader.getConfigLoader(conf, Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER,
Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS);
}

private Map<String, Number> getUserConf() {
// Try the loader plugin, if configured
if (_configLoader != null) {
Map<String, Number> ret = (Map<String, Number>)_configLoader.load();
if (ret != null) {
return ret;
} else {
LOG.warn("Config loader returned null");
}
}

// If that fails, fall back on config
Map<String, Number> ret = (Map<String, Number>)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
if (ret == null) {
ret = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.utils.IConfigLoader;

import java.util.Collection;
import java.util.HashMap;
Expand All @@ -46,14 +47,16 @@ public class ResourceAwareScheduler implements IScheduler {

@SuppressWarnings("rawtypes")
private Map conf;
private IConfigLoader configLoader;

private static final Logger LOG = LoggerFactory
.getLogger(ResourceAwareScheduler.class);

@Override
public void prepare(Map conf) {
this.conf = conf;

this.configLoader = IConfigLoader.getConfigLoader(conf, Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER,
Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER_PARAMS);
}

@Override
Expand Down Expand Up @@ -335,6 +338,15 @@ public Map<String, User> getUserMap() {
return this.schedulingState.userMap;
}

private Object readFromLoader() {
// If loader plugin is not configured, then leave and fall back
if (this.configLoader == null) {
return null;
}

return configLoader.load();
}

/**
* Intialize scheduling and running queues
*
Expand Down Expand Up @@ -376,18 +388,12 @@ private void initialize(Topologies topologies, Cluster cluster) {
this.schedulingState = new SchedulingState(userMap, cluster, topologies, this.conf);
}

/**
* Get resource guarantee configs
*
* @return a map that contains resource guarantees of every user of the following format
* {userid->{resourceType->amountGuaranteed}}
*/
private Map<String, Map<String, Double>> getUserResourcePools() {
Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
private Map<String, Map<String, Double>> convertToDouble(Map<String, Map<String, Number>> raw) {

Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();

if (raw != null) {
for (Map.Entry<String, Map<String, Number>> userPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
for (Map.Entry<String, Map<String, Number>> userPoolEntry : raw.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch

String user = userPoolEntry.getKey();
ret.put(user, new HashMap<String, Double>());
for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
Expand All @@ -396,6 +402,25 @@ private Map<String, Map<String, Double>> getUserResourcePools() {
}
}

return ret;
}

/**
* Get resource guarantee configs
*
* @return a map that contains resource guarantees of every user of the following format
* {userid->{resourceType->amountGuaranteed}}
*/
private Map<String, Map<String, Double>> getUserResourcePools() {
Object raw = readFromLoader();
if (raw != null) {
return convertToDouble((Map<String, Map<String, Number>>) raw);
}

raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);

Map<String, Map<String, Double>> ret = convertToDouble((Map<String, Map<String, Number>>) raw);

Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
if (tmp != null) {
Expand Down
Loading