From a222a8293e2be7802f74d6b40120d738ac22d6ce Mon Sep 17 00:00:00 2001 From: Paul Poulosky Date: Fri, 16 Sep 2016 14:11:12 -0500 Subject: [PATCH 1/4] Add dynamic scheduler configuration loading. This has an interface and two implementations, one that will load from a local file, and another that will load a config from artifactory. Merge pull request #807 from ppoulosk/YSTORM-3095-redux [YSTORM-3095] [YSTORM-3779] Re-merge and fix Artifactory scheduler plugins Move to org.apache --- pom.xml | 6 + storm-core/pom.xml | 29 +- .../src/jvm/org/apache/storm/Config.java | 24 ++ .../multitenant/MultitenantScheduler.java | 16 + .../resource/ResourceAwareScheduler.java | 46 ++- .../utils/ArtifactoryConfigLoader.java | 388 ++++++++++++++++++ .../scheduler/utils/FileConfigLoader.java | 65 +++ .../storm/scheduler/utils/IConfigLoader.java | 33 ++ .../storm/scheduler/utils/SchedulerUtils.java | 64 +++ .../utils/TestArtifactoryConfigLoader.java | 360 ++++++++++++++++ .../scheduler/utils/TestFileConfigLoader.java | 134 ++++++ 11 files changed, 1153 insertions(+), 12 deletions(-) create mode 100644 storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java create mode 100644 storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java create mode 100644 storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java create mode 100644 storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java create mode 100644 storm-core/test/jvm/org/apache/storm/scheduler/utils/TestArtifactoryConfigLoader.java create mode 100644 storm-core/test/jvm/org/apache/storm/scheduler/utils/TestFileConfigLoader.java diff --git a/pom.xml b/pom.xml index 896d735f52e..86c6c6d28cc 100644 --- a/pom.xml +++ b/pom.xml @@ -238,6 +238,7 @@ 0.3.1 1.1.9 0.3.6 + 1.2 1.4.1 2.5 2.5 @@ -625,6 +626,11 @@ commons-io ${commons-io.version} + + commons-logging + commons-logging + ${commons-logging.version} + org.apache.commons commons-compress diff --git a/storm-core/pom.xml b/storm-core/pom.xml index d423767c257..bcecc72c0ce 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -159,6 +159,10 @@ org.apache.commons commons-compress + + commons-logging + commons-logging + org.apache.commons commons-exec @@ -372,6 +376,10 @@ + + org.apache.httpcomponents + httpclient + src/jvm @@ -516,10 +524,12 @@ commons-collections:commons-collections org.apache.hadoop:hadoop-auth commons-cli:commons-cli + org.apache.httpcomponents:httpclient commons-io:commons-io commons-codec:commons-codec commons-fileupload:commons-fileupload commons-lang:commons-lang + commons-logging:commons-logging com.googlecode.json-simple:json-simple org.clojure:math.numeric-tower org.clojure:tools.cli @@ -657,6 +667,10 @@ org.apache.commons.cli org.apache.storm.shade.org.apache.commons.cli + + org.apache.commons.logging + org.apache.storm.shade.org.apache.commons.logging + org.apache.commons.io org.apache.storm.shade.org.apache.commons.io @@ -682,8 +696,12 @@ org.apache.storm.shade.org.apache.commons.lang - org.apache.commons.collections - org.apache.storm.shade.org.apache.commons.collections + org.apache.commons.collections + org.apache.storm.shade.org.apache.commons.collections + + + org.apache.httpcomponents + org.apache.storm.shade.org.apache.httpcomponents org.json.simple @@ -798,6 +816,13 @@ LICENSE.txt + + commons-logging:commons-logging + + META-INF/LICENSE.txt + META-INF/NOTICE.txt + + org.apache.commons:commons-exec diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 0379a181cb7..635dec9b6f8 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -2186,6 +2186,18 @@ public class Config extends HashMap { @isMapEntryType(keyType = String.class, valueType = Number.class) public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools"; + /** + * A plugin that should load the user pools for the multitenant scheduler + */ + @isString + public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER = "multitenant.scheduler.user.pools.loader"; + + /** + * Configuration elements for scheduler config loader + */ + @isMapEntryType(keyType = String.class, valueType = String.class) + public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS = "multitenant.scheduler.user.pools.loader.params"; + /** * A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure * per user resource guarantees. @@ -2193,6 +2205,18 @@ public class Config extends HashMap { @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {UserResourcePoolEntryValidator.class}) public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools"; + /** + * A plugin that should load the user pools for the resource aware scheduler + */ + @isString + public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER = "resource.aware.scheduler.user.pools.loader"; + + /** + * Configuration elements for scheduler config loader + */ + @isMapEntryType(keyType = String.class, valueType = String.class) + public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER_PARAMS = "resource.aware.scheduler.user.pools.loader.params"; + /** * The class that specifies the eviction strategy to use in ResourceAwareScheduler */ diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index 60a5259fe73..cfa0a161a02 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -29,19 +29,35 @@ import org.apache.storm.scheduler.IScheduler; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.utils.IConfigLoader; +import org.apache.storm.scheduler.utils.SchedulerUtils; import org.apache.storm.utils.Utils; public class MultitenantScheduler implements IScheduler { private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class); @SuppressWarnings("rawtypes") private Map _conf; + IConfigLoader _configLoader; @Override public void prepare(@SuppressWarnings("rawtypes") Map conf) { _conf = conf; + _configLoader = SchedulerUtils.getConfigLoader(conf, Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER, + Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS); } private Map getUserConf() { + // Try the loader plugin, if configured + if (_configLoader != null) { + Map ret = (Map)_configLoader.load(); + if (ret != null) { + return ret; + } else { + LOG.debug("Config loader returned null"); + } + } + + // If that fails, fall back on config Map ret = (Map)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS); if (ret == null) { ret = new HashMap<>(); diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index db0e2633da5..10742947f5f 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -32,6 +32,8 @@ import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.utils.IConfigLoader; +import org.apache.storm.scheduler.utils.SchedulerUtils; import java.util.Collection; import java.util.HashMap; @@ -46,6 +48,7 @@ public class ResourceAwareScheduler implements IScheduler { @SuppressWarnings("rawtypes") private Map conf; + private IConfigLoader configLoader; private static final Logger LOG = LoggerFactory .getLogger(ResourceAwareScheduler.class); @@ -53,7 +56,8 @@ public class ResourceAwareScheduler implements IScheduler { @Override public void prepare(Map conf) { this.conf = conf; - + this.configLoader = SchedulerUtils.getConfigLoader(conf, Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER, + Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER_PARAMS); } @Override @@ -335,6 +339,15 @@ public Map getUserMap() { return this.schedulingState.userMap; } + private Object readFromLoader() { + // If loader plugin is not configured, then leave and fall back + if (this.configLoader == null) { + return null; + } + + return configLoader.load(); + } + /** * Intialize scheduling and running queues * @@ -376,18 +389,12 @@ private void initialize(Topologies topologies, Cluster cluster) { this.schedulingState = new SchedulingState(userMap, cluster, topologies, this.conf); } - /** - * Get resource guarantee configs - * - * @return a map that contains resource guarantees of every user of the following format - * {userid->{resourceType->amountGuaranteed}} - */ - private Map> getUserResourcePools() { - Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS); + private Map> convertToDouble(Map> raw) { + Map> ret = new HashMap>(); if (raw != null) { - for (Map.Entry> userPoolEntry : ((Map>) raw).entrySet()) { + for (Map.Entry> userPoolEntry : raw.entrySet()) { String user = userPoolEntry.getKey(); ret.put(user, new HashMap()); for (Map.Entry resourceEntry : userPoolEntry.getValue().entrySet()) { @@ -396,6 +403,25 @@ private Map> getUserResourcePools() { } } + return ret; + } + + /** + * Get resource guarantee configs + * + * @return a map that contains resource guarantees of every user of the following format + * {userid->{resourceType->amountGuaranteed}} + */ + private Map> getUserResourcePools() { + Object raw = readFromLoader(); + if (raw != null) { + return convertToDouble((Map>) raw); + } + + raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS); + + Map> ret = convertToDouble((Map>) raw); + Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false); Map> tmp = (Map>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS); if (tmp != null) { diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java new file mode 100644 index 00000000000..4e6a7197ec8 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import org.apache.storm.utils.Time; +import org.apache.storm.Config; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.Comparator; +import java.util.Map; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.ParseException; +import org.json.simple.parser.JSONParser; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.client.HttpClient; +import org.apache.http.util.EntityUtils; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ArtifactoryConfigLoader implements IConfigLoader { + /** + * Configuration items for this config loader are passed in via confg settings in + * each scheduler that has a configurable loader. + * + * For example, the resource aware scheduler has configuration items defined in Config.java + * that allow a user to configure which implementation of IConfigLoader to use to load + * specific scheduler configs as well as any parameters to pass into the prepare method of + * tht configuration. + * + * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader + * + * and then + * + * resource.aware.scheduler.user.pools.loader.params can be set to any of the following + * + * {"artifactory.config.loader.uri": "http://artifactory.example.org:9989/artifactory/confs/my_cluster/mt_user_pool"} + * + * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool"} + * + * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool", "artifactory.config.loader.timeout.secs" : "60"} + **/ + protected static final String ARTIFACTORY_URI = "artifactory.config.loader.uri"; + protected static final String ARTIFACTORY_TIMEOUT_SECS="artifactory.config.loader.timeout.secs"; + protected static final String ARTIFACTORY_POLL_TIME_SECS="artifactory.config.loader.polltime.secs"; + protected static final String ARTIFACTORY_SCHEME="artifactory.config.loader.scheme"; + protected static final String ARTIFACTORY_BASE_DIRECTORY="artifactory.config.loader.base.directory"; + protected static final String LOCAL_ARTIFACT_DIR="scheduler_artifacts"; + static final String cacheFilename = "latest.yaml"; + + private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoader.class); + + @SuppressWarnings("rawtypes") + private Map _conf; + private int _artifactoryPollTimeSecs = 600; + private boolean _cacheInitialized = false; + // Location of the file in the artifactory archive. Also used to name file in cache. + private String _localCacheDir; + private String _artifactoryScheme = "http"; + private String _baseDirectory = "/artifactory"; + private int _lastReturnedTime = 0; + private int _timeoutSeconds = 10; + private Map _lastReturnedValue; + + private static class OurResponseHandler implements ResponseHandler { + private static OurResponseHandler singleton = null; + + public static OurResponseHandler getInstance() { + if (singleton == null) { + singleton = new OurResponseHandler(); + } + return singleton; + } + + @Override + public String handleResponse(final HttpResponse response) throws IOException { + int status = response.getStatusLine().getStatusCode(); + if (status >= 200 && status < 300) { + HttpEntity entity = response.getEntity(); + return entity != null ? EntityUtils.toString(entity) : null; + } else { + LOG.error("Got unexpected response code {}", status); + return null; + } + } + }; + + // Protected so we can override this in testing + protected String doGet(String api, String artifact, String host, Integer port) { + URIBuilder builder = new URIBuilder().setScheme(_artifactoryScheme).setHost(host).setPort(port); + + if (api != null) { + builder.setPath(_baseDirectory + api + artifact); + } else { + builder.setPath(_baseDirectory + artifact); + } + + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(_timeoutSeconds * 1000).build(); + HttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); + + String returnValue; + try { + LOG.debug("About to issue a GET to {}", builder); + HttpGet httpget = new HttpGet(builder.build()); + + String responseBody; + responseBody = httpclient.execute(httpget, OurResponseHandler.getInstance()); + returnValue = responseBody; + } catch (Exception e) { + LOG.error("Received exception while connecting to Artifactory", e); + returnValue=null; + } + + LOG.debug("Returning {}",returnValue); + return returnValue; + } + + private JSONObject getArtifactMetadata(String location, String host, Integer port) { + String metadataStr = null; + + metadataStr = doGet("/api/storage", location, host, port); + + if (metadataStr == null) { + return null; + } + + JSONObject returnValue; + try { + returnValue = (JSONObject) new JSONParser().parse(metadataStr); + } catch (ParseException e) { + LOG.error("Could not parse JSON string {}", metadataStr, e); + return null; + } + + return returnValue; + } + + private class DirEntryCompare implements Comparator { + @Override + public int compare(JSONObject o1, JSONObject o2) { + return ((String)o1.get("uri")).compareTo((String)o2.get("uri")); + } + } + + private String loadMostRecentArtifact(String location, String host, Integer port) { + // Is this a directory or is it a file? + JSONObject json = getArtifactMetadata(location, host, port); + if (json == null) { + LOG.error("got null metadata"); + return null; + } + String downloadURI = (String) json.get("downloadUri"); + + // This means we are pointing at a file. + if (downloadURI != null) { + // Then get it and return the file as string. + String returnValue = doGet(null, location, host, port); + saveInArtifactoryCache(returnValue); + return returnValue; + } + + // This should mean that we were pointed at a directory. + // Find the most recent child and load that. + JSONArray msg = (JSONArray) json.get("children"); + if (msg == null || msg.size() == 0) { + LOG.error("Expected directory children not present"); + return null; + } + JSONObject newest = (JSONObject) Collections.max(msg, new DirEntryCompare()); + if (newest == null) { + LOG.error("Failed to find most recent artifact uri in {}", location); + return null; + } + + String uri = (String) newest.get("uri"); + if (uri == null) { + LOG.error("Expected directory uri not present"); + return null; + } + String returnValue = doGet(null, location + uri, host, port); + + saveInArtifactoryCache(returnValue); + + return returnValue; + } + + private Map loadFromFile(File file) { + Map ret = SchedulerUtils.loadYamlFromFile(file); + + if (ret != null) { + try { + LOG.debug("returning a new map from file {}", file.getCanonicalPath()); + } catch (java.io.IOException e) { + LOG.debug("Could not get PATH from file object in debug print. Ignoring"); + } + _lastReturnedTime = Time.currentTimeSecs(); + _lastReturnedValue = ret; + return _lastReturnedValue; + } + + return null; + } + + + private Map getLatestFromCache() { + String localFileName = _localCacheDir + File.separator + cacheFilename; + return loadFromFile(new File(localFileName)); + } + + private void saveInArtifactoryCache(String yamlData) { + String localFileName = _localCacheDir + File.separator + cacheFilename; + + File cacheFile = new File(localFileName); + try (FileOutputStream fos = new FileOutputStream(cacheFile)) { + fos.write(yamlData.getBytes()); + fos.flush(); + } catch (IOException e) { + LOG.error("Received exception when writing file {}. Attempting delete", localFileName, e); + try { + cacheFile.delete(); + } catch (Exception deleteException) { + LOG.error("Received exception when deleting file {}.", localFileName, deleteException); + } + } + } + + private void makeArtifactoryCache(String location) { + // Just make sure approprite directories exist + String localDirName = (String)_conf.get(Config.STORM_LOCAL_DIR); + if (localDirName == null) { + return; + } + + // First make the cache dir + localDirName = localDirName + File.separator + "nimbus" + File.separator + LOCAL_ARTIFACT_DIR; + File dir = new File(localDirName); + if (! dir.exists()) { + dir.mkdir(); + } + + _localCacheDir = localDirName + File.separator + location.replaceAll(File.separator, "_"); + dir = new File(_localCacheDir); + if (! dir.exists()) { + dir.mkdir(); + } + _cacheInitialized = true; + } + + @Override + public void prepare(Map conf) { + _conf = conf; + String thisTimeout = (String)_conf.get(ARTIFACTORY_TIMEOUT_SECS); + if (thisTimeout != null) { + _timeoutSeconds = Integer.parseInt(thisTimeout); + } + String thisPollTime = (String)_conf.get(ARTIFACTORY_POLL_TIME_SECS); + if (thisPollTime != null) { + _artifactoryPollTimeSecs = Integer.parseInt(thisPollTime); + } + String thisScheme = (String)_conf.get(ARTIFACTORY_SCHEME); + if (thisScheme != null) { + _artifactoryScheme = thisScheme; + } + String thisBase = (String)_conf.get(ARTIFACTORY_BASE_DIRECTORY); + if (thisBase != null) { + _baseDirectory = thisBase; + } + } + + @Override + public Map load() { + // Check for new file every so often + if (_lastReturnedValue != null && ((Time.currentTimeSecs() - _lastReturnedTime) < _artifactoryPollTimeSecs)) { + LOG.debug("returning our last map"); + return _lastReturnedValue; + } + + String myScheme = null; + String location = null; + String host = null; + Integer port = null; + String uriString = (String)_conf.get(ARTIFACTORY_URI); + String filePath = null; + + if (uriString != null) { + URI uri = null; + try { + uri = new URI(uriString); + myScheme = uri.getScheme().toLowerCase(); + if (myScheme.equals("http")) { + host = uri.getHost(); + port = uri.getPort(); + location = uri.getPath(); + if (location.toLowerCase().startsWith(_baseDirectory.toLowerCase())) { + location = location.substring(_baseDirectory.length()); + } + } else if (myScheme.equals("file")) { + filePath = uri.getPath(); + } + } catch (java.net.URISyntaxException e) { + LOG.error("Failed to parse uri={}", uriString); + return null; + } + } else { + LOG.error("URI is null"); + return null; + } + + // host, port, location are only non-null if uri's scheme is set to + // http. If uri's scheme is "file" then these members will be null + // If urs has a file scheme, filePath will be non-null. We should not + // be in a state where filePath is non null, while host, port, and location + // are also non-null + if (myScheme.equals("http")) { + + if (!_cacheInitialized) { + makeArtifactoryCache(location); + } + + // Get the most recent artifact as a String, and then parse the yaml + String yamlConfig = loadMostRecentArtifact(location, host, port); + + // If we failed to get anything from Artifactory try to get it from our local cache + if (yamlConfig == null) { + return getLatestFromCache(); + } + + // Now parse it and return the map. + Yaml yaml = new Yaml(new SafeConstructor()); + Map ret = null; + try { + ret = (Map) yaml.load(yamlConfig); + } catch (Exception e) { + LOG.error("Could not parse yaml."); + return null; + } + + if (ret != null) { + LOG.debug("returning a new map from Artifactory"); + _lastReturnedTime = Time.currentTimeSecs(); + _lastReturnedValue = ret; + return _lastReturnedValue; + } + } else if (myScheme.equals("file")) { + File file = new File(filePath); + Map ret = loadFromFile(file); + + if (ret != null) { + LOG.debug("returning a new map from file {}", filePath); + _lastReturnedTime = Time.currentTimeSecs(); + _lastReturnedValue = ret; + return _lastReturnedValue; + } + } else { + LOG.error("Unhandled scheme {}", myScheme); + } + + return null; + } +} diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java new file mode 100644 index 00000000000..e42d027cf26 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.utils; + +import java.io.File; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileConfigLoader implements IConfigLoader { + /** + * Configuration items for this config loader are passed in via confg settings in + * each scheduler that has a configurable loader. + * + * For example, the resource aware scheduler has configuration items defined in Config.java + * that allow a user to configure which implementation of IConfigLoader to use to load + * specific scheduler configs as well as any parameters to pass into the prepare method of + * tht configuration. + * + * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader + * + * and then + * + * resource.aware.scheduler.user.pools.loader.params can be set to the following + * + * {"file.config.loader.local.file.yaml": "/path/to/my/config.yaml"} + * + **/ + + @SuppressWarnings("rawtypes") + Map conf; + protected static final String LOCAL_FILE_YAML="file.config.loader.local.file.yaml"; + private static final Logger LOG = LoggerFactory.getLogger(FileConfigLoader.class); + + @Override + public void prepare(Map conf) { + this.conf = conf; + } + + @Override + public Map load() { + String localFileName = (String) conf.get(LOCAL_FILE_YAML); + if (localFileName == null) { + LOG.warn("No yaml file defined in configuration."); + return null; + } + + return SchedulerUtils.loadYamlFromFile(new File(localFileName)); + } +} diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java new file mode 100644 index 00000000000..e6f0967c56d --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.utils; + +import java.util.Map; + + +public interface IConfigLoader { + + /** + * Pass in configurable parameters for the location of the data, (filename / artifactory url). + * + * See implementing classes for configuration details. + */ + void prepare(Map conf); + + Map load(); +} diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java new file mode 100644 index 00000000000..5a675b42b47 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.utils; + +import org.apache.storm.utils.Utils; +import java.util.Map; +import java.io.File; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SchedulerUtils { + private static final Logger LOG = LoggerFactory.getLogger(SchedulerUtils.class); + + public static IConfigLoader getConfigLoader(Map conf, String loaderClassConfig, String loaderConfConfig) { + if (conf.get(loaderClassConfig) != null) { + Map loaderConf = (Map)conf.get(loaderConfConfig); + String clazz = (String)conf.get(loaderClassConfig); + if (clazz != null) { + IConfigLoader loader = (IConfigLoader)Utils.newInstance(clazz); + if (loader != null) { + loader.prepare(loaderConf); + return loader; + } + } + } + return null; + } + + public static Map loadYamlFromFile(File file) { + Map ret = null; + String pathString="Invalid"; + try { + pathString = file.getCanonicalPath(); + Yaml yaml = new Yaml(new SafeConstructor()); + try (FileInputStream fis = new FileInputStream(file)) { + ret = (Map) yaml.load(new InputStreamReader(fis)); + } + } catch (Exception e) { + LOG.error("Failed to load from file {}", pathString, e); + return null; + } + + return ret; + } +} diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestArtifactoryConfigLoader.java b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestArtifactoryConfigLoader.java new file mode 100644 index 00000000000..bc5b34f3392 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestArtifactoryConfigLoader.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import org.apache.storm.Config; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.scheduler.utils.ArtifactoryConfigLoader; +import org.junit.Assert; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.nio.file.Path; +import java.nio.file.Files; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +public class TestArtifactoryConfigLoader { + + private static final Logger LOG = LoggerFactory.getLogger(TestArtifactoryConfigLoader.class); + private Path tmpDirPath; + + private class TestClass extends ArtifactoryConfigLoader { + String getData; + HashMap getDataMap = new HashMap(); + + public void setData(String api, String artifact, String data) { + if (api == null) { + getData = data; + } else { + getDataMap.put(artifact, data); + } + } + + @Override + protected String doGet(String api, String artifact, String host, Integer port) { + if (api == null) { + return getData; + } + return getDataMap.get(artifact); + } + }; + + @Before + public void createTempDir() throws Exception { + tmpDirPath = Files.createTempDirectory("TestArtifactoryConfigLoader"); + File f = tmpDirPath.toFile(); + f.mkdir(); + File dir=new File(f, "nimbus"); + dir.mkdir(); + } + + private void recursiveDeleteFile(File dir) { + File[] listing = dir.listFiles(); + if (listing != null) { + for (File f : listing) { + recursiveDeleteFile(f); + } + } + dir.delete(); + } + + @After + public void removeTempDir() throws Exception { + recursiveDeleteFile(tmpDirPath.toFile()); + } + + @Test + public void testInvalid() { + Config conf = new Config(); + TestClass testClass = new TestClass(); + testClass.prepare(conf); + Map ret = testClass.load(); + Assert.assertNull("Unexpectedly returned not null", ret); + } + + @Test + public void testPointingAtDirectory() { + // This is a test where we are configured to point right at an artifact dir + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/this/dir"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/this/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", \"folder\" : false }]}" ); + testClass.setData(null, null, "{one: 1, two: 2, three: 3, four : 4}"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertEquals(4, ret.get("four")); + + // Now let's load w/o setting up gets and we should still get valid map back + TestClass tc2 = new TestClass(); + tc2.prepare(conf); + Map ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertEquals(4, ret2.get("four")); + } + + @Test + public void testArtifactUpdate() { + // This is a test where we are configured to point right at an artifact dir + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/test/dir"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + Time.startSimulating(); + + try { + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/test/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", \"folder\" : false }]}" ); + testClass.setData(null, null, "{one: 1, two: 2, three: 3}"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertNull("Unexpectedly contained \"four\"", ret.get("four")); + + // Now let's load w/o setting up gets and we should still get valid map back + TestClass tc2 = new TestClass(); + tc2.prepare(conf); + Map ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertNull("Unexpectedly did not return null", ret2.get("four")); + + // Now let's update it, but not advance time. Should get old map again. + testClass.setData("Anything", "/location/of/test/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204999.yaml\", \"folder\" : false }]}"); + testClass.setData(null, null, "{one: 1, two: 2, three: 3, four: 4}"); + ret = testClass.load(); + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertNull("Unexpectedly did not return null, not enough time passed!", ret.get("four")); + + // Re-load from cached' file. + ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertNull("Unexpectedly did not return null, last cached result should not have \"four\"", ret2.get("four")); + + // Now, let's advance time. + Time.advanceTime(11*60*1000); + + ret = testClass.load(); + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertEquals(4, ret.get("four")); + + // Re-load from cached' file. + ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertEquals(4, ret2.get("four")); + } finally { + Time.stopSimulating(); + } + } + + @Test + public void testPointingAtSpecificArtifact() { + // This is a test where we are configured to point right at a single artifact + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/this/artifact"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}"); + testClass.setData(null, null, "{one: 1, two: 2, three: 3}"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + + // Now let's load w/o setting up gets and we should still get valid map back + TestClass tc2 = new TestClass(); + tc2.prepare(conf); + Map ret2 = tc2.load(); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + } + + @Test + public void testInvalidConfig() throws Exception { + Config conf = new Config(); + + TestClass testClass = new TestClass(); + testClass.prepare(conf); + + Map result = testClass.load(); + + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testMalformedYaml() throws Exception { + // This is a test where we are configured to point right at a single artifact + Config conf = new Config(); + conf.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "http://bogushost.yahoo.com:9999/location/of/this/artifact"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + TestClass testClass = new TestClass(); + + testClass.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}"); + testClass.setData(null, null, "ThisIsNotValidYaml"); + testClass.prepare(conf); + Map ret = testClass.load(); + + Assert.assertNull("Unexpectedly returned a map", ret); + } + + @Test + public void testValidFileChange() throws Exception { + Time.startSimulating(); + + try { + LOG.error("Doing testValidFileChange"); + File temp = File.createTempFile("FileLoader", ".yaml"); + temp.deleteOnExit(); + Map testMap = new HashMap(); + + testMap.put("a", 1); + testMap.put("b", 2); + testMap.put("c", 3); + testMap.put("d", 4); + + Yaml yaml = new Yaml(); + FileWriter fw = new FileWriter(temp); + yaml.dump(testMap, fw); + fw.flush(); + fw.close(); + + Config config = new Config(); + config.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "file://"+temp.getCanonicalPath()); + + TestClass testClass = new TestClass(); + testClass.prepare(config); + + Map result = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", result); + + Assert.assertEquals("Maps are a different size", testMap.keySet().size(), result.keySet().size()); + + for (String key : testMap.keySet() ) { + Integer expectedValue = (Integer)testMap.get(key); + Integer returnedValue = (Integer)result.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + + File temp2 = File.createTempFile("FileLoader", ".yaml"); + temp2.deleteOnExit(); + Map testMap2 = new HashMap(); + testMap2.put("a", 1); + testMap2.put("b", 2); + testMap2.put("c", 3); + testMap2.put("d", 4); + testMap2.put("e", 5); + + FileWriter fw2 = new FileWriter(temp2); + yaml.dump(testMap2, fw2); + fw2.flush(); + fw2.close(); + + Config config2 = new Config(); + config2.put(ArtifactoryConfigLoader.ARTIFACTORY_URI, "file://"+temp2.getCanonicalPath()); + + testClass.prepare(config2); + + Map result2 = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", result2); + + // Shouldn't change yet + Assert.assertEquals("Maps are a different size", testMap.keySet().size(), result2.keySet().size()); + + for (String key : testMap.keySet() ) { + Integer expectedValue = (Integer)testMap.get(key); + Integer returnedValue = (Integer)result2.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + + Time.advanceTime(11*60*1000); + + // Now it should + result2 = testClass.load(); + + Assert.assertNotNull("Unexpectedly returned null", result2); + + // Shouldn't change yet + Assert.assertEquals("Maps are a different size", testMap2.keySet().size(), result2.keySet().size()); + + for (String key : testMap2.keySet() ) { + Integer expectedValue = (Integer)testMap2.get(key); + Integer returnedValue = (Integer)result2.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + + } finally { + Time.stopSimulating(); + } + } +} diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestFileConfigLoader.java b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestFileConfigLoader.java new file mode 100644 index 00000000000..97135d554c6 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/scheduler/utils/TestFileConfigLoader.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.utils.FileConfigLoader; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.nio.file.Files; +import java.nio.file.Files; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +public class TestFileConfigLoader { + + private static final Logger LOG = LoggerFactory.getLogger(TestFileConfigLoader.class); + + @Test + public void testFileNotThere() { + + Config config = new Config(); + config.put(FileConfigLoader.LOCAL_FILE_YAML, "/this/is/an/invalid/path"); + + FileConfigLoader testLoader = new FileConfigLoader(); + + testLoader.prepare(config); + + Map result = testLoader.load(); + + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testInvalidConfig() throws Exception { + Config config = new Config(); + + FileConfigLoader testLoader = new FileConfigLoader(); + + testLoader.prepare(config); + + Map result = testLoader.load(); + + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testMalformedYaml() throws Exception { + + File temp = File.createTempFile("FileLoader", ".yaml"); + temp.deleteOnExit(); + + FileWriter fw = new FileWriter(temp); + String outputData = "ThisIsNotValidYaml"; + fw.write(outputData, 0, outputData.length()); + fw.flush(); + fw.close(); + + Config config = new Config(); + config.put(FileConfigLoader.LOCAL_FILE_YAML, temp.getCanonicalPath()); + + FileConfigLoader testLoader = new FileConfigLoader(); + + testLoader.prepare(config); + + Map result = testLoader.load(); + Assert.assertNull("Unexpectedly returned a map", result); + } + + @Test + public void testValidFile() throws Exception { + FileConfigLoader loader = new FileConfigLoader(); + File temp = File.createTempFile("FileLoader", ".yaml"); + temp.deleteOnExit(); + Map testMap = new HashMap(); + + testMap.put("a", 1); + testMap.put("b", 2); + testMap.put("c", 3); + testMap.put("d", 4); + testMap.put("e", 5); + + Yaml yaml = new Yaml(); + FileWriter fw = new FileWriter(temp); + yaml.dump(testMap, fw); + fw.flush(); + fw.close(); + + Config config = new Config(); + config.put(FileConfigLoader.LOCAL_FILE_YAML, temp.getCanonicalPath()); + + loader.prepare(config); + + Map result = loader.load(); + + Assert.assertNotNull("Unexpectedly returned null", result); + + Assert.assertEquals("Maps are a different size", testMap.keySet().size(), result.keySet().size()); + + for (String key : testMap.keySet() ) { + Integer expectedValue = (Integer)testMap.get(key); + Integer returnedValue = (Integer)result.get(key); + Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue); + } + } +} From 7242b6dcf223dbec1ad44a42b40409b76cc145db Mon Sep 17 00:00:00 2001 From: Paul Poulosky Date: Mon, 5 Dec 2016 09:23:18 -0600 Subject: [PATCH 2/4] Rework for code review from evans --- .../src/jvm/org/apache/storm/Config.java | 4 +- .../multitenant/MultitenantScheduler.java | 5 +- .../resource/ResourceAwareScheduler.java | 3 +- .../utils/ArtifactoryConfigLoader.java | 103 ++++++++++++------ .../scheduler/utils/FileConfigLoader.java | 2 +- .../storm/scheduler/utils/IConfigLoader.java | 54 ++++++++- .../storm/scheduler/utils/SchedulerUtils.java | 64 ----------- 7 files changed, 130 insertions(+), 105 deletions(-) delete mode 100644 storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 635dec9b6f8..1cb8d1b4586 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -2189,7 +2189,7 @@ public class Config extends HashMap { /** * A plugin that should load the user pools for the multitenant scheduler */ - @isString + @isImplementationOfClass(implementsClass = org.apache.storm.scheduler.utils.IConfigLoader.class) public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER = "multitenant.scheduler.user.pools.loader"; /** @@ -2208,7 +2208,7 @@ public class Config extends HashMap { /** * A plugin that should load the user pools for the resource aware scheduler */ - @isString + @isImplementationOfClass(implementsClass = org.apache.storm.scheduler.utils.IConfigLoader.class) public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER = "resource.aware.scheduler.user.pools.loader"; /** diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index cfa0a161a02..b9a020c8dd4 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -30,7 +30,6 @@ import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.utils.IConfigLoader; -import org.apache.storm.scheduler.utils.SchedulerUtils; import org.apache.storm.utils.Utils; public class MultitenantScheduler implements IScheduler { @@ -42,7 +41,7 @@ public class MultitenantScheduler implements IScheduler { @Override public void prepare(@SuppressWarnings("rawtypes") Map conf) { _conf = conf; - _configLoader = SchedulerUtils.getConfigLoader(conf, Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER, + _configLoader = IConfigLoader.getConfigLoader(conf, Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER, Config.MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS); } @@ -53,7 +52,7 @@ private Map getUserConf() { if (ret != null) { return ret; } else { - LOG.debug("Config loader returned null"); + LOG.warn("Config loader returned null"); } } diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index 10742947f5f..732a8ceb8b6 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -33,7 +33,6 @@ import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; import org.apache.storm.scheduler.utils.IConfigLoader; -import org.apache.storm.scheduler.utils.SchedulerUtils; import java.util.Collection; import java.util.HashMap; @@ -56,7 +55,7 @@ public class ResourceAwareScheduler implements IScheduler { @Override public void prepare(Map conf) { this.conf = conf; - this.configLoader = SchedulerUtils.getConfigLoader(conf, Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER, + this.configLoader = IConfigLoader.getConfigLoader(conf, Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER, Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER_PARAMS); } diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java index 4e6a7197ec8..b2c6d58dc1a 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java @@ -46,28 +46,41 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * ArtifactoryConfigLoader.java - A dynamic loader for that can load + * scheduler configurations for user + * resource guarantees from Artifactory. + * + *

+ * Configuration items for this config loader are passed in via config settings in + * each scheduler that has a configurable loader. + * + *

+ * For example, the resource aware scheduler has configuration items defined in Config.java + * that allow a user to configure which implementation of IConfigLoader to use to load + * specific scheduler configs as well as any parameters to pass into the prepare method of + * that configuration. + * + *

+ * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader + * + *

+ * and then + * + *

+ * resource.aware.scheduler.user.pools.loader.params can be set to any of the following + * + *

+ * + * {"artifactory.config.loader.uri": "http://artifactory.example.org:9989/artifactory/confs/my_cluster/mt_user_pool"} + * + * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool"} + * + * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool", "artifactory.config.loader.timeout.secs" : "60"} + * + * + */ public class ArtifactoryConfigLoader implements IConfigLoader { - /** - * Configuration items for this config loader are passed in via confg settings in - * each scheduler that has a configurable loader. - * - * For example, the resource aware scheduler has configuration items defined in Config.java - * that allow a user to configure which implementation of IConfigLoader to use to load - * specific scheduler configs as well as any parameters to pass into the prepare method of - * tht configuration. - * - * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader - * - * and then - * - * resource.aware.scheduler.user.pools.loader.params can be set to any of the following - * - * {"artifactory.config.loader.uri": "http://artifactory.example.org:9989/artifactory/confs/my_cluster/mt_user_pool"} - * - * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool"} - * - * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool", "artifactory.config.loader.timeout.secs" : "60"} - **/ protected static final String ARTIFACTORY_URI = "artifactory.config.loader.uri"; protected static final String ARTIFACTORY_TIMEOUT_SECS="artifactory.config.loader.timeout.secs"; protected static final String ARTIFACTORY_POLL_TIME_SECS="artifactory.config.loader.polltime.secs"; @@ -90,17 +103,30 @@ public class ArtifactoryConfigLoader implements IConfigLoader { private int _timeoutSeconds = 10; private Map _lastReturnedValue; - private static class OurResponseHandler implements ResponseHandler { - private static OurResponseHandler singleton = null; - - public static OurResponseHandler getInstance() { + /** + * GETStringResponseHandler - A private class used to check the response + * coming back from httpclient. + * + */ + private static class GETStringResponseHandler implements ResponseHandler { + private static GETStringResponseHandler singleton = null; + + /** + * @return a singleton httpclient GET response handler + */ + public static GETStringResponseHandler getInstance() { if (singleton == null) { - singleton = new OurResponseHandler(); + singleton = new GETStringResponseHandler(); } return singleton; } @Override + /** + * @param response The http response to verify. + * + * @return null on failure or the response string if return code is in 200 range + */ public String handleResponse(final HttpResponse response) throws IOException { int status = response.getStatusLine().getStatusCode(); if (status >= 200 && status < 300) { @@ -113,15 +139,30 @@ public String handleResponse(final HttpResponse response) throws IOException { } }; - // Protected so we can override this in testing + /** + * @param api null if we are trying to download artifact, otherwise a string to call REST api, + * e.g. "/api/storage" + * @param artifact location of artifact + * @param host Artifactory hostname + * @param port Artifactory port + * + * @return null on failure or the response string if return code is in 200 range + *

+ * Protected so we can override this in unit tests + */ protected String doGet(String api, String artifact, String host, Integer port) { URIBuilder builder = new URIBuilder().setScheme(_artifactoryScheme).setHost(host).setPort(port); + String path = null; if (api != null) { - builder.setPath(_baseDirectory + api + artifact); + path = _baseDirectory + "/" + api + "/" + artifact; } else { - builder.setPath(_baseDirectory + artifact); + path = _baseDirectory + "/" + artifact; } + + // Get rid of multiple '/' in url + path = path.replaceAll("/[/]+", "/"); + builder.setPath(path); RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(_timeoutSeconds * 1000).build(); HttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); @@ -132,7 +173,7 @@ protected String doGet(String api, String artifact, String host, Integer port) { HttpGet httpget = new HttpGet(builder.build()); String responseBody; - responseBody = httpclient.execute(httpget, OurResponseHandler.getInstance()); + responseBody = httpclient.execute(httpget, GETStringResponseHandler.getInstance()); returnValue = responseBody; } catch (Exception e) { LOG.error("Received exception while connecting to Artifactory", e); @@ -213,7 +254,7 @@ private String loadMostRecentArtifact(String location, String host, Integer port } private Map loadFromFile(File file) { - Map ret = SchedulerUtils.loadYamlFromFile(file); + Map ret = IConfigLoader.loadYamlConfigFromFile(file); if (ret != null) { try { diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java index e42d027cf26..fdfaaddde2e 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/FileConfigLoader.java @@ -60,6 +60,6 @@ public Map load() { return null; } - return SchedulerUtils.loadYamlFromFile(new File(localFileName)); + return IConfigLoader.loadYamlConfigFromFile(new File(localFileName)); } } diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java index e6f0967c56d..bc2bce82b5f 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/utils/IConfigLoader.java @@ -17,7 +17,15 @@ */ package org.apache.storm.scheduler.utils; +import org.apache.storm.utils.Utils; import java.util.Map; +import java.io.File; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface IConfigLoader { @@ -27,7 +35,49 @@ public interface IConfigLoader { * * See implementing classes for configuration details. */ - void prepare(Map conf); + void prepare(Map conf); - Map load(); + Map load(); + + public static IConfigLoader getConfigLoader(Map conf, String loaderClassConfig, String loaderConfConfig) { + if (conf.get(loaderClassConfig) != null) { + Map loaderConf = (Map)conf.get(loaderConfConfig); + String clazz = (String)conf.get(loaderClassConfig); + if (clazz != null) { + IConfigLoader loader = (IConfigLoader)Utils.newInstance(clazz); + if (loader != null) { + loader.prepare(loaderConf); + return loader; + } + } + } + return null; + } + + /** + * @param file File object referring to the local file containing the configuration + * in yaml format. + * + * @return null on failure or a map containing the loaded configuration + */ + public static Map loadYamlConfigFromFile(File file) { + Map ret = null; + String pathString="Invalid"; + try { + pathString = file.getCanonicalPath(); + Yaml yaml = new Yaml(new SafeConstructor()); + try (FileInputStream fis = new FileInputStream(file)) { + ret = (Map) yaml.load(new InputStreamReader(fis)); + } + } catch (Exception e) { + IConfigLoaderLogger.LOG.error("Failed to load from file {}", pathString, e); + return null; + } + + return ret; + } +} + +final class IConfigLoaderLogger { + static final Logger LOG = LoggerFactory.getLogger(IConfigLoader.class); } diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java b/storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java deleted file mode 100644 index 5a675b42b47..00000000000 --- a/storm-core/src/jvm/org/apache/storm/scheduler/utils/SchedulerUtils.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.scheduler.utils; - -import org.apache.storm.utils.Utils; -import java.util.Map; -import java.io.File; -import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.constructor.SafeConstructor; -import java.io.FileInputStream; -import java.io.InputStreamReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SchedulerUtils { - private static final Logger LOG = LoggerFactory.getLogger(SchedulerUtils.class); - - public static IConfigLoader getConfigLoader(Map conf, String loaderClassConfig, String loaderConfConfig) { - if (conf.get(loaderClassConfig) != null) { - Map loaderConf = (Map)conf.get(loaderConfConfig); - String clazz = (String)conf.get(loaderClassConfig); - if (clazz != null) { - IConfigLoader loader = (IConfigLoader)Utils.newInstance(clazz); - if (loader != null) { - loader.prepare(loaderConf); - return loader; - } - } - } - return null; - } - - public static Map loadYamlFromFile(File file) { - Map ret = null; - String pathString="Invalid"; - try { - pathString = file.getCanonicalPath(); - Yaml yaml = new Yaml(new SafeConstructor()); - try (FileInputStream fis = new FileInputStream(file)) { - ret = (Map) yaml.load(new InputStreamReader(fis)); - } - } catch (Exception e) { - LOG.error("Failed to load from file {}", pathString, e); - return null; - } - - return ret; - } -} From 135ac954568c6bcc7b611d19a6f1ac55dabdeebf Mon Sep 17 00:00:00 2001 From: Paul Poulosky Date: Fri, 16 Dec 2016 09:42:29 -0600 Subject: [PATCH 3/4] Add md file --- docs/IConfigLoader.md | 58 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 docs/IConfigLoader.md diff --git a/docs/IConfigLoader.md b/docs/IConfigLoader.md new file mode 100644 index 00000000000..238d5aaecd3 --- /dev/null +++ b/docs/IConfigLoader.md @@ -0,0 +1,58 @@ +--- +title: IConfigLoader +layout: documentation +documentation: true +--- + + +### Introduction +IConfigLoader is an interface designed to allow way to dynamically load scheduler resource constraints into scheduler implementations. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResoureAwareScheduler uses the interface to dynamically load per user resource guarantees. + +------ + +### Interface +``` +public interface IConfigLoader { + void prepare(Map conf); + Map load(); + public static IConfigLoader getConfigLoader(Map conf, String loaderClassConfig, String loaderConfConfig); + public static Map loadYamlConfigFromFile(File file); +}; +``` +#### Description + - prepare is called upon class loading, and allows schedulers to pass in configuation items to the classes that implement IConfigLoader + - load is called by the scheduler whenever it wishes to retrieve the most recent configuration map + - getConfigLoader is a static helper class that will return the implementation of IConfigLoader that the scheduler requests + - loadYamlConfigFromFile is a utility function used by implemenations of IConfigLoader that will load in a local yaml file and return a map + +#### Loader Configuration +The loaders are dynamically selected and dynamically configured through configuration items in the scheduler implementations. + +##### Example +``` +resource.aware.scheduler.user.pools.loader: "org.apache.storm.scheduler.utils.ArtifactoryConfigLoader" +resource.aware.scheduler.user.pools.loader.params: + artifactory.config.loader.uri: "http://artifactory.my.company.com:8000/artifactory/configurations/clusters/my_cluster/ras_pools" + artifactory.config.loader.timeout.secs: "30" +multitenant.scheduler.user.pools.loader: "org.apache.storm.scheduler.utils.FileConfigLoader" +multitenant.scheduler.user.pools.loader.params: + file.config.loader.local.file.yaml: "/path/to/my/config.yaml" +``` +### Implementations + +There are currently two implemenations of IConfigLoader + - org.apache.storm.scheduler.utils.ArtifactoryConfigLoader: Load configurations from an Artifactory server + - org.apache.storm.scheduler.utils.FileConfigLoader: Load configurations from a local file + +Each of these have configurations that the scheduler can pass into the implemenation when the prepare method is called + +#### FileConfigLoader + - file.config.loader.local.file.yaml: A path to a local yaml file that represents the map the scheduler will use + +#### ArtifactoryConfigLoader + + - artifactory.config.loader.uri: This can either be a reference to an individual file in Artifactory or to a directory. If it is a directory, the file with the largest lexographic name will be returned. + - artifactory.config.loader.timeout.secs: This is the amount of time an http connection to the artifactory server will wait before timing out. The default is 10. + - artifactory.config.loader.polltime.secs: This is the frequency at which the plugin will call out to artifactory instead of returning the most recently cached result. The default is 600 seconds. + - artifactory.config.loader.scheme: This is the scheme to use when connecting to the Artifactory server. The default is http. + - artifactory.config.loader.base.directory: This is the part of the uri, configurable in Artifactory, which represents the top of the directory tree. It defaults to "/artifactory". From 63abe63a8742064526bd1e39540a1885c51219fb Mon Sep 17 00:00:00 2001 From: Paul Poulosky Date: Thu, 30 Mar 2017 10:06:15 -0500 Subject: [PATCH 4/4] Address review comments --- docs/IConfigLoader.md | 2 +- pom.xml | 6 ------ storm-core/pom.xml | 4 ---- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/docs/IConfigLoader.md b/docs/IConfigLoader.md index 238d5aaecd3..1dbd98ac1a1 100644 --- a/docs/IConfigLoader.md +++ b/docs/IConfigLoader.md @@ -6,7 +6,7 @@ documentation: true ### Introduction -IConfigLoader is an interface designed to allow way to dynamically load scheduler resource constraints into scheduler implementations. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResoureAwareScheduler uses the interface to dynamically load per user resource guarantees. +IConfigLoader is an interface designed to allow dynamic loading of scheduler resource constraints. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResoureAwareScheduler uses the interface to dynamically load per user resource guarantees. ------ diff --git a/pom.xml b/pom.xml index 86c6c6d28cc..896d735f52e 100644 --- a/pom.xml +++ b/pom.xml @@ -238,7 +238,6 @@ 0.3.1 1.1.9 0.3.6 - 1.2 1.4.1 2.5 2.5 @@ -626,11 +625,6 @@ commons-io ${commons-io.version} - - commons-logging - commons-logging - ${commons-logging.version} - org.apache.commons commons-compress diff --git a/storm-core/pom.xml b/storm-core/pom.xml index bcecc72c0ce..0361e7fc3a5 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -159,10 +159,6 @@ org.apache.commons commons-compress - - commons-logging - commons-logging - org.apache.commons commons-exec