diff --git a/docs/IConfigLoader.md b/docs/IConfigLoader.md new file mode 100644 index 00000000000..1dbd98ac1a1 --- /dev/null +++ b/docs/IConfigLoader.md @@ -0,0 +1,58 @@ +--- +title: IConfigLoader +layout: documentation +documentation: true +--- + + +### Introduction +IConfigLoader is an interface designed to allow dynamic loading of scheduler resource constraints. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResoureAwareScheduler uses the interface to dynamically load per user resource guarantees. + +------ + +### Interface +``` +public interface IConfigLoader { + void prepare(Map conf); + Map load(); + public static IConfigLoader getConfigLoader(Map conf, String loaderClassConfig, String loaderConfConfig); + public static Map loadYamlConfigFromFile(File file); +}; +``` +#### Description + - prepare is called upon class loading, and allows schedulers to pass in configuation items to the classes that implement IConfigLoader + - load is called by the scheduler whenever it wishes to retrieve the most recent configuration map + - getConfigLoader is a static helper class that will return the implementation of IConfigLoader that the scheduler requests + - loadYamlConfigFromFile is a utility function used by implemenations of IConfigLoader that will load in a local yaml file and return a map + +#### Loader Configuration +The loaders are dynamically selected and dynamically configured through configuration items in the scheduler implementations. + +##### Example +``` +resource.aware.scheduler.user.pools.loader: "org.apache.storm.scheduler.utils.ArtifactoryConfigLoader" +resource.aware.scheduler.user.pools.loader.params: + artifactory.config.loader.uri: "http://artifactory.my.company.com:8000/artifactory/configurations/clusters/my_cluster/ras_pools" + artifactory.config.loader.timeout.secs: "30" +multitenant.scheduler.user.pools.loader: "org.apache.storm.scheduler.utils.FileConfigLoader" +multitenant.scheduler.user.pools.loader.params: + file.config.loader.local.file.yaml: "/path/to/my/config.yaml" +``` +### Implementations + +There are currently two implemenations of IConfigLoader + - org.apache.storm.scheduler.utils.ArtifactoryConfigLoader: Load configurations from an Artifactory server + - org.apache.storm.scheduler.utils.FileConfigLoader: Load configurations from a local file + +Each of these have configurations that the scheduler can pass into the implemenation when the prepare method is called + +#### FileConfigLoader + - file.config.loader.local.file.yaml: A path to a local yaml file that represents the map the scheduler will use + +#### ArtifactoryConfigLoader + + - artifactory.config.loader.uri: This can either be a reference to an individual file in Artifactory or to a directory. If it is a directory, the file with the largest lexographic name will be returned. + - artifactory.config.loader.timeout.secs: This is the amount of time an http connection to the artifactory server will wait before timing out. The default is 10. + - artifactory.config.loader.polltime.secs: This is the frequency at which the plugin will call out to artifactory instead of returning the most recently cached result. The default is 600 seconds. + - artifactory.config.loader.scheme: This is the scheme to use when connecting to the Artifactory server. The default is http. + - artifactory.config.loader.base.directory: This is the part of the uri, configurable in Artifactory, which represents the top of the directory tree. It defaults to "/artifactory". diff --git a/storm-core/pom.xml b/storm-core/pom.xml index d423767c257..0361e7fc3a5 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -372,6 +372,10 @@ + + org.apache.httpcomponents + httpclient + src/jvm @@ -516,10 +520,12 @@ commons-collections:commons-collections org.apache.hadoop:hadoop-auth commons-cli:commons-cli + org.apache.httpcomponents:httpclient commons-io:commons-io commons-codec:commons-codec commons-fileupload:commons-fileupload commons-lang:commons-lang + commons-logging:commons-logging com.googlecode.json-simple:json-simple org.clojure:math.numeric-tower org.clojure:tools.cli @@ -657,6 +663,10 @@ org.apache.commons.cli org.apache.storm.shade.org.apache.commons.cli + + org.apache.commons.logging + org.apache.storm.shade.org.apache.commons.logging + org.apache.commons.io org.apache.storm.shade.org.apache.commons.io @@ -682,8 +692,12 @@ org.apache.storm.shade.org.apache.commons.lang - org.apache.commons.collections - org.apache.storm.shade.org.apache.commons.collections + org.apache.commons.collections + org.apache.storm.shade.org.apache.commons.collections + + + org.apache.httpcomponents + org.apache.storm.shade.org.apache.httpcomponents org.json.simple @@ -798,6 +812,13 @@ LICENSE.txt + + commons-logging:commons-logging + + META-INF/LICENSE.txt + META-INF/NOTICE.txt + + org.apache.commons:commons-exec diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 0379a181cb7..1cb8d1b4586 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -2186,6 +2186,18 @@ public class Config extends HashMap { @isMapEntryType(keyType = String.class, valueType = Number.class) public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools"; + /** + * A plugin that should load the user pools for the multitenant scheduler + */ + @isImplementationOfClass(implementsClass = org.apache.storm.scheduler.utils.IConfigLoader.class) + public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER = "multitenant.scheduler.user.pools.loader"; + + /** + * Configuration elements for scheduler config loader + */ + @isMapEntryType(keyType = String.class, valueType = String.class) + public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS = "multitenant.scheduler.user.pools.loader.params"; + /** * A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure * per user resource guarantees. @@ -2193,6 +2205,18 @@ public class Config extends HashMap { @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {UserResourcePoolEntryValidator.class}) public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools"; + /** + * A plugin that should load the user pools for the resource aware scheduler + */ + @isImplementationOfClass(implementsClass = org.apache.storm.scheduler.utils.IConfigLoader.class) + public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER = "resource.aware.scheduler.user.pools.loader"; + + /** + * Configuration elements for scheduler config loader + */ + @isMapEntryType(keyType = String.class, valueType = String.class) + public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER_PARAMS = "resource.aware.scheduler.user.pools.loader.params"; + /** * The class that specifies the eviction strategy to use in ResourceAwareScheduler */ diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index 60a5259fe73..b9a020c8dd4 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -29,19 +29,34 @@ import org.apache.storm.scheduler.IScheduler; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.utils.IConfigLoader; import org.apache.storm.utils.Utils; public class MultitenantScheduler implements IScheduler { private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class); @SuppressWarnings("rawtypes") private Map _conf; + IConfigLoader _configLoader; @Override public void prepare(@SuppressWarnings("rawtypes") Map conf) { _conf = conf; + _configLoader = IConfigLoader.getConfigLoader(conf, Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER, + Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS); } private Map getUserConf() { + // Try the loader plugin, if configured + if (_configLoader != null) { + Map ret = (Map)_configLoader.load(); + if (ret != null) { + return ret; + } else { + LOG.warn("Config loader returned null"); + } + } + + // If that fails, fall back on config Map ret = (Map)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS); if (ret == null) { ret = new HashMap<>(); diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index db0e2633da5..732a8ceb8b6 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -32,6 +32,7 @@ import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.utils.IConfigLoader; import java.util.Collection; import java.util.HashMap; @@ -46,6 +47,7 @@ public class ResourceAwareScheduler implements IScheduler { @SuppressWarnings("rawtypes") private Map conf; + private IConfigLoader configLoader; private static final Logger LOG = LoggerFactory .getLogger(ResourceAwareScheduler.class); @@ -53,7 +55,8 @@ public class ResourceAwareScheduler implements IScheduler { @Override public void prepare(Map conf) { this.conf = conf; - + this.configLoader = IConfigLoader.getConfigLoader(conf, Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER, + Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER_PARAMS); } @Override @@ -335,6 +338,15 @@ public Map getUserMap() { return this.schedulingState.userMap; } + private Object readFromLoader() { + // If loader plugin is not configured, then leave and fall back + if (this.configLoader == null) { + return null; + } + + return configLoader.load(); + } + /** * Intialize scheduling and running queues * @@ -376,18 +388,12 @@ private void initialize(Topologies topologies, Cluster cluster) { this.schedulingState = new SchedulingState(userMap, cluster, topologies, this.conf); } - /** - * Get resource guarantee configs - * - * @return a map that contains resource guarantees of every user of the following format - * {userid->{resourceType->amountGuaranteed}} - */ - private Map> getUserResourcePools() { - Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS); + private Map> convertToDouble(Map> raw) { + Map> ret = new HashMap>(); if (raw != null) { - for (Map.Entry> userPoolEntry : ((Map>) raw).entrySet()) { + for (Map.Entry> userPoolEntry : raw.entrySet()) { String user = userPoolEntry.getKey(); ret.put(user, new HashMap()); for (Map.Entry resourceEntry : userPoolEntry.getValue().entrySet()) { @@ -396,6 +402,25 @@ private Map> getUserResourcePools() { } } + return ret; + } + + /** + * Get resource guarantee configs + * + * @return a map that contains resource guarantees of every user of the following format + * {userid->{resourceType->amountGuaranteed}} + */ + private Map> getUserResourcePools() { + Object raw = readFromLoader(); + if (raw != null) { + return convertToDouble((Map>) raw); + } + + raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS); + + Map> ret = convertToDouble((Map>) raw); + Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false); Map> tmp = (Map>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS); if (tmp != null) { diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java new file mode 100644 index 00000000000..b2c6d58dc1a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java @@ -0,0 +1,429 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import org.apache.storm.utils.Time; +import org.apache.storm.Config; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.Comparator; +import java.util.Map; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.ParseException; +import org.json.simple.parser.JSONParser; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.client.HttpClient; +import org.apache.http.util.EntityUtils; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ArtifactoryConfigLoader.java - A dynamic loader for that can load + * scheduler configurations for user + * resource guarantees from Artifactory. + * + *

+ * Configuration items for this config loader are passed in via config settings in + * each scheduler that has a configurable loader. + * + *

+ * For example, the resource aware scheduler has configuration items defined in Config.java + * that allow a user to configure which implementation of IConfigLoader to use to load + * specific scheduler configs as well as any parameters to pass into the prepare method of + * that configuration. + * + *

+ * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader + * + *

+ * and then + * + *

+ * resource.aware.scheduler.user.pools.loader.params can be set to any of the following + * + *

+ * + * {"artifactory.config.loader.uri": "http://artifactory.example.org:9989/artifactory/confs/my_cluster/mt_user_pool"} + * + * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool"} + * + * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool", "artifactory.config.loader.timeout.secs" : "60"} + * + * + */ +public class ArtifactoryConfigLoader implements IConfigLoader { + protected static final String ARTIFACTORY_URI = "artifactory.config.loader.uri"; + protected static final String ARTIFACTORY_TIMEOUT_SECS="artifactory.config.loader.timeout.secs"; + protected static final String ARTIFACTORY_POLL_TIME_SECS="artifactory.config.loader.polltime.secs"; + protected static final String ARTIFACTORY_SCHEME="artifactory.config.loader.scheme"; + protected static final String ARTIFACTORY_BASE_DIRECTORY="artifactory.config.loader.base.directory"; + protected static final String LOCAL_ARTIFACT_DIR="scheduler_artifacts"; + static final String cacheFilename = "latest.yaml"; + + private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoader.class); + + @SuppressWarnings("rawtypes") + private Map _conf; + private int _artifactoryPollTimeSecs = 600; + private boolean _cacheInitialized = false; + // Location of the file in the artifactory archive. Also used to name file in cache. + private String _localCacheDir; + private String _artifactoryScheme = "http"; + private String _baseDirectory = "/artifactory"; + private int _lastReturnedTime = 0; + private int _timeoutSeconds = 10; + private Map _lastReturnedValue; + + /** + * GETStringResponseHandler - A private class used to check the response + * coming back from httpclient. + * + */ + private static class GETStringResponseHandler implements ResponseHandler { + private static GETStringResponseHandler singleton = null; + + /** + * @return a singleton httpclient GET response handler + */ + public static GETStringResponseHandler getInstance() { + if (singleton == null) { + singleton = new GETStringResponseHandler(); + } + return singleton; + } + + @Override + /** + * @param response The http response to verify. + * + * @return null on failure or the response string if return code is in 200 range + */ + public String handleResponse(final HttpResponse response) throws IOException { + int status = response.getStatusLine().getStatusCode(); + if (status >= 200 && status < 300) { + HttpEntity entity = response.getEntity(); + return entity != null ? EntityUtils.toString(entity) : null; + } else { + LOG.error("Got unexpected response code {}", status); + return null; + } + } + }; + + /** + * @param api null if we are trying to download artifact, otherwise a string to call REST api, + * e.g. "/api/storage" + * @param artifact location of artifact + * @param host Artifactory hostname + * @param port Artifactory port + * + * @return null on failure or the response string if return code is in 200 range + *

+ * Protected so we can override this in unit tests + */ + protected String doGet(String api, String artifact, String host, Integer port) { + URIBuilder builder = new URIBuilder().setScheme(_artifactoryScheme).setHost(host).setPort(port); + + String path = null; + if (api != null) { + path = _baseDirectory + "/" + api + "/" + artifact; + } else { + path = _baseDirectory + "/" + artifact; + } + + // Get rid of multiple '/' in url + path = path.replaceAll("/[/]+", "/"); + builder.setPath(path); + + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(_timeoutSeconds * 1000).build(); + HttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); + + String returnValue; + try { + LOG.debug("About to issue a GET to {}", builder); + HttpGet httpget = new HttpGet(builder.build()); + + String responseBody; + responseBody = httpclient.execute(httpget, GETStringResponseHandler.getInstance()); + returnValue = responseBody; + } catch (Exception e) { + LOG.error("Received exception while connecting to Artifactory", e); + returnValue=null; + } + + LOG.debug("Returning {}",returnValue); + return returnValue; + } + + private JSONObject getArtifactMetadata(String location, String host, Integer port) { + String metadataStr = null; + + metadataStr = doGet("/api/storage", location, host, port); + + if (metadataStr == null) { + return null; + } + + JSONObject returnValue; + try { + returnValue = (JSONObject) new JSONParser().parse(metadataStr); + } catch (ParseException e) { + LOG.error("Could not parse JSON string {}", metadataStr, e); + return null; + } + + return returnValue; + } + + private class DirEntryCompare implements Comparator { + @Override + public int compare(JSONObject o1, JSONObject o2) { + return ((String)o1.get("uri")).compareTo((String)o2.get("uri")); + } + } + + private String loadMostRecentArtifact(String location, String host, Integer port) { + // Is this a directory or is it a file? + JSONObject json = getArtifactMetadata(location, host, port); + if (json == null) { + LOG.error("got null metadata"); + return null; + } + String downloadURI = (String) json.get("downloadUri"); + + // This means we are pointing at a file. + if (downloadURI != null) { + // Then get it and return the file as string. + String returnValue = doGet(null, location, host, port); + saveInArtifactoryCache(returnValue); + return returnValue; + } + + // This should mean that we were pointed at a directory. + // Find the most recent child and load that. + JSONArray msg = (JSONArray) json.get("children"); + if (msg == null || msg.size() == 0) { + LOG.error("Expected directory children not present"); + return null; + } + JSONObject newest = (JSONObject) Collections.max(msg, new DirEntryCompare()); + if (newest == null) { + LOG.error("Failed to find most recent artifact uri in {}", location); + return null; + } + + String uri = (String) newest.get("uri"); + if (uri == null) { + LOG.error("Expected directory uri not present"); + return null; + } + String returnValue = doGet(null, location + uri, host, port); + + saveInArtifactoryCache(returnValue); + + return returnValue; + } + + private Map loadFromFile(File file) { + Map ret = IConfigLoader.loadYamlConfigFromFile(file); + + if (ret != null) { + try { + LOG.debug("returning a new map from file {}", file.getCanonicalPath()); + } catch (java.io.IOException e) { + LOG.debug("Could not get PATH from file object in debug print. Ignoring"); + } + _lastReturnedTime = Time.currentTimeSecs(); + _lastReturnedValue = ret; + return _lastReturnedValue; + } + + return null; + } + + + private Map getLatestFromCache() { + String localFileName = _localCacheDir + File.separator + cacheFilename; + return loadFromFile(new File(localFileName)); + } + + private void saveInArtifactoryCache(String yamlData) { + String localFileName = _localCacheDir + File.separator + cacheFilename; + + File cacheFile = new File(localFileName); + try (FileOutputStream fos = new FileOutputStream(cacheFile)) { + fos.write(yamlData.getBytes()); + fos.flush(); + } catch (IOException e) { + LOG.error("Received exception when writing file {}. Attempting delete", localFileName, e); + try { + cacheFile.delete(); + } catch (Exception deleteException) { + LOG.error("Received exception when deleting file {}.", localFileName, deleteException); + } + } + } + + private void makeArtifactoryCache(String location) { + // Just make sure approprite directories exist + String localDirName = (String)_conf.get(Config.STORM_LOCAL_DIR); + if (localDirName == null) { + return; + } + + // First make the cache dir + localDirName = localDirName + File.separator + "nimbus" + File.separator + LOCAL_ARTIFACT_DIR; + File dir = new File(localDirName); + if (! dir.exists()) { + dir.mkdir(); + } + + _localCacheDir = localDirName + File.separator + location.replaceAll(File.separator, "_"); + dir = new File(_localCacheDir); + if (! dir.exists()) { + dir.mkdir(); + } + _cacheInitialized = true; + } + + @Override + public void prepare(Map conf) { + _conf = conf; + String thisTimeout = (String)_conf.get(ARTIFACTORY_TIMEOUT_SECS); + if (thisTimeout != null) { + _timeoutSeconds = Integer.parseInt(thisTimeout); + } + String thisPollTime = (String)_conf.get(ARTIFACTORY_POLL_TIME_SECS); + if (thisPollTime != null) { + _artifactoryPollTimeSecs = Integer.parseInt(thisPollTime); + } + String thisScheme = (String)_conf.get(ARTIFACTORY_SCHEME); + if (thisScheme != null) { + _artifactoryScheme = thisScheme; + } + String thisBase = (String)_conf.get(ARTIFACTORY_BASE_DIRECTORY); + if (thisBase != null) { + _baseDirectory = thisBase; + } + } + + @Override + public Map load() { + // Check for new file every so often + if (_lastReturnedValue != null && ((Time.currentTimeSecs() - _lastReturnedTime) < _artifactoryPollTimeSecs)) { + LOG.debug("returning our last map"); + return _lastReturnedValue; + } + + String myScheme = null; + String location = null; + String host = null; + Integer port = null; + String uriString = (String)_conf.get(ARTIFACTORY_URI); + String filePath = null; + + if (uriString != null) { + URI uri = null; + try { + uri = new URI(uriString); + myScheme = uri.getScheme().toLowerCase(); + if (myScheme.equals("http")) { + host = uri.getHost(); + port = uri.getPort(); + location = uri.getPath(); + if (location.toLowerCase().startsWith(_baseDirectory.toLowerCase())) { + location = location.substring(_baseDirectory.length()); + } + } else if (myScheme.equals("file")) { + filePath = uri.getPath(); + } + } catch (java.net.URISyntaxException e) { + LOG.error("Failed to parse uri={}", uriString); + return null; + } + } else { + LOG.error("URI is null"); + return null; + } + + // host, port, location are only non-null if uri's scheme is set to + // http. If uri's scheme is "file" then these members will be null + // If urs has a file scheme, filePath will be non-null. We should not + // be in a state where filePath is non null, while host, port, and location + // are also non-null + if (myScheme.equals("http")) { + + if (!_cacheInitialized) { + makeArtifactoryCache(location); + } + + // Get the most recent artifact as a String, and then parse the yaml + String yamlConfig = loadMostRecentArtifact(location, host, port); + + // If we failed to get anything from Artifactory try to get it from our local cache + if (yamlConfig == null) { + return getLatestFromCache(); + } + + // Now parse it and return the map. + Yaml yaml = new Yaml(new SafeConstructor()); + Map ret = null; + try { + ret = (Map) yaml.load(yamlConfig); + } catch (Exception e) { + LOG.error("Could not parse yaml."); + return null; + } + + if (ret != null) { + LOG.debug("returning a new map from Artifactory"); + _lastReturnedTime = Time.currentTimeSecs(); + _lastReturnedValue = ret; + return _lastReturnedValue; + } + } else if (myScheme.equals("file")) { + File file = new File(filePath); + Map ret = loadFromFile(file); + + if (ret != null) { + LOG.debug("returning a new map from file {}", filePath); + _lastReturnedTime = Time.currentTimeSecs(); + _lastReturnedValue = ret; + return _lastReturnedValue; + } + } else { + LOG.error("Unhandled scheme {}", myScheme); + } + + return null; + } +} diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java new file mode 100644 index 00000000000..fdfaaddde2e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.utils; + +import java.io.File; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileConfigLoader implements IConfigLoader { + /** + * Configuration items for this config loader are passed in via confg settings in + * each scheduler that has a configurable loader. + * + * For example, the resource aware scheduler has configuration items defined in Config.java + * that allow a user to configure which implementation of IConfigLoader to use to load + * specific scheduler configs as well as any parameters to pass into the prepare method of + * tht configuration. + * + * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader + * + * and then + * + * resource.aware.scheduler.user.pools.loader.params can be set to the following + * + * {"file.config.loader.local.file.yaml": "/path/to/my/config.yaml"} + * + **/ + + @SuppressWarnings("rawtypes") + Map conf; + protected static final String LOCAL_FILE_YAML="file.config.loader.local.file.yaml"; + private static final Logger LOG = LoggerFactory.getLogger(FileConfigLoader.class); + + @Override + public void prepare(Map conf) { + this.conf = conf; + } + + @Override + public Map load() { + String localFileName = (String) conf.get(LOCAL_FILE_YAML); + if (localFileName == null) { + LOG.warn("No yaml file defined in configuration."); + return null; + } + + return IConfigLoader.loadYamlConfigFromFile(new File(localFileName)); + } +} diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java new file mode 100644 index 00000000000..bc2bce82b5f --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.utils; + +import org.apache.storm.utils.Utils; +import java.util.Map; +import java.io.File; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public interface IConfigLoader { + + /** + * Pass in configurable parameters for the location of the data, (filename / artifactory url). + * + * See implementing classes for configuration details. + */ + void prepare(Map conf); + + Map load(); + + public static IConfigLoader getConfigLoader(Map conf, String loaderClassConfig, String loaderConfConfig) { + if (conf.get(loaderClassConfig) != null) { + Map loaderConf = (Map)conf.get(loaderConfConfig); + String clazz = (String)conf.get(loaderClassConfig); + if (clazz != null) { + IConfigLoader loader = (IConfigLoader)Utils.newInstance(clazz); + if (loader != null) { + loader.prepare(loaderConf); + return loader; + } + } + } + return null; + } + + /** + * @param file File object referring to the local file containing the configuration + * in yaml format. + * + * @return null on failure or a map containing the loaded configuration + */ + public static Map loadYamlConfigFromFile(File file) { + Map ret = null; + String pathString="Invalid"; + try { + pathString = file.getCanonicalPath(); + Yaml yaml = new Yaml(new SafeConstructor()); + try (FileInputStream fis = new FileInputStream(file)) { + ret = (Map) yaml.load(new InputStreamReader(fis)); + } + } catch (Exception e) { + IConfigLoaderLogger.LOG.error("Failed to load from file {}", pathString, e); + return null; + } + + return ret; + } +} + +final class IConfigLoaderLogger { + static final Logger LOG = LoggerFactory.getLogger(IConfigLoader.class); +} diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestArtifactoryConfigLoader.java b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestArtifactoryConfigLoader.java new file mode 100644 index 00000000000..bc5b34f3392 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestArtifactoryConfigLoader.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import org.apache.storm.Config; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.scheduler.utils.ArtifactoryConfigLoader; +import org.junit.Assert; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.nio.file.Path; +import java.nio.file.Files; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +public class TestArtifactoryConfigLoader { + + private static final Logger LOG = LoggerFactory.getLogger(TestArtifactoryConfigLoader.class); + private Path tmpDirPath; + + private class TestClass extends ArtifactoryConfigLoader { + String getData; + HashMap getDataMap = new HashMap(); + + public void setData(String api, String artifact, String data) { + if (api == null) { + getData = data; + } else { + getDataMap.put(artifact, data); + } + } + + @Override + protected String doGet(String api, String artifact, String host, Integer port) { + if (api == null) { + return getData; + } + return getDataMap.get(artifact); + } + }; + + @Before + public void createTempDir() throws Exception { + tmpDirPath = Files.createTempDirectory("TestArtifactoryConfigLoader"); + File f = tmpDirPath.toFile(); + f.mkdir(); + File dir=new File(f, "nimbus"); + dir.mkdir(); + } + + private void recursiveDeleteFile(File dir) { + File[] listing = dir.listFiles(); + if (listing != null) { + for (File f : listing) { + recursiveDeleteFile(f); + } + } + dir.delete(); + } + + @After + public void removeTempDir() throws Exception { + recursiveDeleteFile(tmpDirPath.toFile()); + } + + @Test + public void testInvalid() { + Config conf = new Config(); + TestClass testClass = new TestClass(); + testClass.prepare(conf); + Map ret = testClass.load(); + Assert.assertNull("Unexpectedly returned not null", ret); + } + + @Test + public void testPointingAtDirectory() { + // This is a test where we are configured to point right at an artifact dir + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/this/dir"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/this/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", \"folder\" : false }]}" ); + testClass.setData(null, null, "{one: 1, two: 2, three: 3, four : 4}"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertEquals(4, ret.get("four")); + + // Now let's load w/o setting up gets and we should still get valid map back + TestClass tc2 = new TestClass(); + tc2.prepare(conf); + Map ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertEquals(4, ret2.get("four")); + } + + @Test + public void testArtifactUpdate() { + // This is a test where we are configured to point right at an artifact dir + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/test/dir"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + Time.startSimulating(); + + try { + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/test/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", \"folder\" : false }]}" ); + testClass.setData(null, null, "{one: 1, two: 2, three: 3}"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertNull("Unexpectedly contained \"four\"", ret.get("four")); + + // Now let's load w/o setting up gets and we should still get valid map back + TestClass tc2 = new TestClass(); + tc2.prepare(conf); + Map ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertNull("Unexpectedly did not return null", ret2.get("four")); + + // Now let's update it, but not advance time. Should get old map again. + testClass.setData("Anything", "/location/of/test/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204999.yaml\", \"folder\" : false }]}"); + testClass.setData(null, null, "{one: 1, two: 2, three: 3, four: 4}"); + ret = testClass.load(); + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertNull("Unexpectedly did not return null, not enough time passed!", ret.get("four")); + + // Re-load from cached' file. + ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertNull("Unexpectedly did not return null, last cached result should not have \"four\"", ret2.get("four")); + + // Now, let's advance time. + Time.advanceTime(11*60*1000); + + ret = testClass.load(); + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertEquals(4, ret.get("four")); + + // Re-load from cached' file. + ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertEquals(4, ret2.get("four")); + } finally { + Time.stopSimulating(); + } + } + + @Test + public void testPointingAtSpecificArtifact() { + // This is a test where we are configured to point right at a single artifact + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/this/artifact"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}"); + testClass.setData(null, null, "{one: 1, two: 2, three: 3}"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + + // Now let's load w/o setting up gets and we should still get valid map back + TestClass tc2 = new TestClass(); + tc2.prepare(conf); + Map ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + } + + @Test + public void testInvalidConfig() throws Exception { + Config conf = new Config(); + + TestClass testClass = new TestClass(); + testClass.prepare(conf); + + Map result = testClass.load(); + + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testMalformedYaml() throws Exception { + // This is a test where we are configured to point right at a single artifact + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/this/artifact"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}"); + testClass.setData(null, null, "ThisIsNotValidYaml"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNull("Unexpectedly returned a map", ret); + } + + @Test + public void testValidFileChange() throws Exception { + Time.startSimulating(); + + try { + LOG.error("Doing testValidFileChange"); + File temp = File.createTempFile("FileLoader", ".yaml"); + temp.deleteOnExit(); + Map testMap = new HashMap(); + + testMap.put("a", 1); + testMap.put("b", 2); + testMap.put("c", 3); + testMap.put("d", 4); + + Yaml yaml = new Yaml(); + FileWriter fw = new FileWriter(temp); + yaml.dump(testMap, fw); + fw.flush(); + fw.close(); + + Config config = new Config(); + config.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "file://"+temp.getCanonicalPath()); + + TestClass testClass = new TestClass(); + testClass.prepare(config); + + Map result = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", result); + + Assert.assertEquals("Maps are a different size", testMap.keySet().size(), result.keySet().size()); + + for (String key : testMap.keySet() ) { + Integer expectedValue = (Integer)testMap.get(key); + Integer returnedValue = (Integer)result.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + + File temp2 = File.createTempFile("FileLoader", ".yaml"); + temp2.deleteOnExit(); + Map testMap2 = new HashMap(); + testMap2.put("a", 1); + testMap2.put("b", 2); + testMap2.put("c", 3); + testMap2.put("d", 4); + testMap2.put("e", 5); + + FileWriter fw2 = new FileWriter(temp2); + yaml.dump(testMap2, fw2); + fw2.flush(); + fw2.close(); + + Config config2 = new Config(); + config2.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "file://"+temp2.getCanonicalPath()); + + testClass.prepare(config2); + + Map result2 = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", result2); + + // Shouldn't change yet + Assert.assertEquals("Maps are a different size", testMap.keySet().size(), result2.keySet().size()); + + for (String key : testMap.keySet() ) { + Integer expectedValue = (Integer)testMap.get(key); + Integer returnedValue = (Integer)result2.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + + Time.advanceTime(11*60*1000); + + // Now it should + result2 = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", result2); + + // Shouldn't change yet + Assert.assertEquals("Maps are a different size", testMap2.keySet().size(), result2.keySet().size()); + + for (String key : testMap2.keySet() ) { + Integer expectedValue = (Integer)testMap2.get(key); + Integer returnedValue = (Integer)result2.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + + } finally { + Time.stopSimulating(); + } + } +} diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestFileConfigLoader.java b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestFileConfigLoader.java new file mode 100644 index 00000000000..97135d554c6 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestFileConfigLoader.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.utils.FileConfigLoader; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.nio.file.Files; +import java.nio.file.Files; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +public class TestFileConfigLoader { + + private static final Logger LOG = LoggerFactory.getLogger(TestFileConfigLoader.class); + + @Test + public void testFileNotThere() { + + Config config = new Config(); + config.put(FileConfigLoader.LOCAL_FILE_YAML, "/this/is/an/invalid/path"); + + FileConfigLoader testLoader = new FileConfigLoader(); + + testLoader.prepare(config); + + Map result = testLoader.load(); + + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testInvalidConfig() throws Exception { + Config config = new Config(); + + FileConfigLoader testLoader = new FileConfigLoader(); + + testLoader.prepare(config); + + Map result = testLoader.load(); + + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testMalformedYaml() throws Exception { + + File temp = File.createTempFile("FileLoader", ".yaml"); + temp.deleteOnExit(); + + FileWriter fw = new FileWriter(temp); + String outputData = "ThisIsNotValidYaml"; + fw.write(outputData, 0, outputData.length()); + fw.flush(); + fw.close(); + + Config config = new Config(); + config.put(FileConfigLoader.LOCAL_FILE_YAML, temp.getCanonicalPath()); + + FileConfigLoader testLoader = new FileConfigLoader(); + + testLoader.prepare(config); + + Map result = testLoader.load(); + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testValidFile() throws Exception { + FileConfigLoader loader = new FileConfigLoader(); + File temp = File.createTempFile("FileLoader", ".yaml"); + temp.deleteOnExit(); + Map testMap = new HashMap(); + + testMap.put("a", 1); + testMap.put("b", 2); + testMap.put("c", 3); + testMap.put("d", 4); + testMap.put("e", 5); + + Yaml yaml = new Yaml(); + FileWriter fw = new FileWriter(temp); + yaml.dump(testMap, fw); + fw.flush(); + fw.close(); + + Config config = new Config(); + config.put(FileConfigLoader.LOCAL_FILE_YAML, temp.getCanonicalPath()); + + loader.prepare(config); + + Map result = loader.load(); + + Assert.assertNotNull("Unexpectedly returned null", result); + + Assert.assertEquals("Maps are a different size", testMap.keySet().size(), result.keySet().size()); + + for (String key : testMap.keySet() ) { + Integer expectedValue = (Integer)testMap.get(key); + Integer returnedValue = (Integer)result.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + } +}