diff --git a/docs/IConfigLoader.md b/docs/IConfigLoader.md
new file mode 100644
index 00000000000..3c2f4de1ed9
--- /dev/null
+++ b/docs/IConfigLoader.md
@@ -0,0 +1,51 @@
+---
+title: IConfigLoader
+layout: documentation
+documentation: true
+---
+
+
+### Introduction
+IConfigLoader is an interface designed to allow dynamic loading of scheduler resource constraints. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResourceAwareScheduler uses the interface to dynamically load per-user resource guarantees.
+
+The following factory method is provided for users to create an IConfigLoader instance based on the scheme of `scheduler.config.loader.uri`.
+```
+ConfigLoaderFactoryService.createConfigLoader(Map conf)
+```
+
+------
+
+### Interface
+```
+public interface IConfigLoader {
+    Map load(String configKey);
+}
+```
+#### Description
+ - load is called by the scheduler whenever it wishes to retrieve the most recent configuration map for the given config key.
+
+#### Loader Configuration
+The loaders are dynamically selected and dynamically configured through configuration items in the scheduler implementations.
+
+##### Example
+```
+scheduler.config.loader.uri: "artifactory+http://artifactory.my.company.com:8000/artifactory/configurations/clusters/my_cluster/ras_pools"
+scheduler.config.loader.timeout.secs: 30
+```
+Or
+```
+scheduler.config.loader.uri: "file:///path/to/my/config.yaml"
+```
+### Implementations
+
+There are currently two implementations of IConfigLoader
+ - org.apache.storm.scheduler.utils.ArtifactoryConfigLoader: Loads configurations from an Artifactory server.
+   It is used when the scheme of the real URI is prefixed with `artifactory+` and the result is set as `scheduler.config.loader.uri`.
+ - org.apache.storm.scheduler.utils.FileConfigLoader: Loads configurations from a local file. It is used when the URI has the `file` scheme.
+
+#### Configurations
+ - scheduler.config.loader.uri: For `ArtifactoryConfigLoader`, this can either be a reference to an individual file in Artifactory or to a directory. If it is a directory, the file with the largest lexicographic name will be returned.
+   For `FileConfigLoader`, this is the URI pointing to a file.
+ - scheduler.config.loader.timeout.secs: Currently only used in `ArtifactoryConfigLoader`. It is the amount of time an HTTP connection to the Artifactory server will wait before timing out. The default is 10 seconds.
+ - scheduler.config.loader.polltime.secs: Currently only used in `ArtifactoryConfigLoader`. It is the frequency at which the plugin will call out to Artifactory instead of returning the most recently cached result. The default is 600 seconds.
+ - scheduler.config.loader.artifactory.base.directory: Only used in `ArtifactoryConfigLoader`. It is the part of the URI, configurable in Artifactory, which represents the top of the directory tree. It defaults to "/artifactory".
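### Writing a custom loader
Beyond the two bundled implementations, the factory mechanism is pluggable. The sketch below is illustrative only: the `com.mycompany.scheduler` package, `MyConfigLoaderFactory`, its nested loader class, and the `my+file` scheme are hypothetical and not part of Storm. It shows how a third-party loader could implement `IConfigLoader` and register itself through `@AutoService(IConfigLoaderFactory.class)` so that `ConfigLoaderFactoryService.createConfigLoader` can discover it at runtime.
```
package com.mycompany.scheduler; // hypothetical package

import com.google.auto.service.AutoService;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.scheduler.utils.IConfigLoader;
import org.apache.storm.scheduler.utils.IConfigLoaderFactory;

// Registers itself so ConfigLoaderFactoryService can find it via ServiceLoader.
@AutoService(IConfigLoaderFactory.class)
public class MyConfigLoaderFactory implements IConfigLoaderFactory {

    @Override
    public IConfigLoader createIfSupported(URI uri, Map conf) {
        // Claim only URIs whose scheme matches this loader, e.g. "my+file:///...".
        if ("my+file".equalsIgnoreCase(uri.getScheme())) {
            return new MyConfigLoader(conf);
        }
        return null;
    }

    // Hypothetical loader that returns a fixed in-memory map for any config key.
    public static class MyConfigLoader implements IConfigLoader {
        private final Map conf;

        public MyConfigLoader(Map conf) {
            this.conf = conf;
        }

        @Override
        public Map load(String configKey) {
            Map pools = new HashMap();
            pools.put("alice", 3); // e.g. guarantee user "alice" three isolated nodes
            return pools;
        }
    }
}
```
With such a factory on the classpath, setting `scheduler.config.loader.uri: "my+file:///anything"` would make `ConfigLoaderFactoryService.createConfigLoader(conf)` return the custom loader, and the schedulers would then call its `load` method with the appropriate config key.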
\ No newline at end of file diff --git a/pom.xml b/pom.xml index c3a04b9d6e4..9ea2501ab13 100644 --- a/pom.xml +++ b/pom.xml @@ -261,6 +261,7 @@ 3.3.2 0.9.0 16.0.1 + 1.0-rc3 3.9.0.Final 1.0.2 1.6.6 @@ -899,6 +900,12 @@ guava ${guava.version} + + com.google.auto.service + auto-service + ${auto-service.version} + true + org.apache.logging.log4j log4j-api diff --git a/storm-server/pom.xml b/storm-server/pom.xml index 311b58f5159..b2a2c0e47dc 100644 --- a/storm-server/pom.xml +++ b/storm-server/pom.xml @@ -59,6 +59,11 @@ org.apache.commons commons-compress + + com.google.auto.service + auto-service + true + diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index a1292e0513a..75a9cad6617 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -221,7 +221,7 @@ public class DaemonConfig implements Validated { public static final String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator"; /** - * Class name for authorization plugin for Nimbus + * Class name for authorization plugin for Nimbus. */ @isString public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer"; @@ -271,7 +271,7 @@ public class DaemonConfig implements Validated { public static final String UI_CENTRAL_LOGGING_URL = "ui.central.logging.url"; /** - * HTTP UI port for log viewer + * HTTP UI port for log viewer. */ @isInteger @isPositiveNumber @@ -284,46 +284,46 @@ public class DaemonConfig implements Validated { public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts"; /** - * How often to clean up old log files + * How often to clean up old log files. */ @isInteger @isPositiveNumber public static final String LOGVIEWER_CLEANUP_INTERVAL_SECS = "logviewer.cleanup.interval.secs"; /** - * How many minutes since a log was last modified for the log to be considered for clean-up + * How many minutes since a log was last modified for the log to be considered for clean-up. */ @isInteger @isPositiveNumber public static final String LOGVIEWER_CLEANUP_AGE_MINS = "logviewer.cleanup.age.mins"; /** - * The maximum number of bytes all worker log files can take up in MB + * The maximum number of bytes all worker log files can take up in MB. */ @isPositiveNumber public static final String LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB = "logviewer.max.sum.worker.logs.size.mb"; /** - * The maximum number of bytes per worker's files can take up in MB + * The maximum number of bytes per worker's files can take up in MB. */ @isPositiveNumber public static final String LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB = "logviewer.max.per.worker.logs.size.mb"; /** - * Storm Logviewer HTTPS port + * Storm Logviewer HTTPS port. */ @isInteger @isPositiveNumber public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port"; /** - * Path to the keystore containing the certs used by Storm Logviewer for HTTPS communications + * Path to the keystore containing the certs used by Storm Logviewer for HTTPS communications. */ @isString public static final String LOGVIEWER_HTTPS_KEYSTORE_PATH = "logviewer.https.keystore.path"; /** - * Password for the keystore for HTTPS for Storm Logviewer + * Password for the keystore for HTTPS for Storm Logviewer. 
*/ @isString public static final String LOGVIEWER_HTTPS_KEYSTORE_PASSWORD = "logviewer.https.keystore.password"; @@ -342,13 +342,13 @@ public class DaemonConfig implements Validated { public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = "logviewer.https.key.password"; /** - * Path to the truststore containing the certs used by Storm Logviewer for HTTPS communications + * Path to the truststore containing the certs used by Storm Logviewer for HTTPS communications. */ @isString public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PATH = "logviewer.https.truststore.path"; /** - * Password for the truststore for HTTPS for Storm Logviewer + * Password for the truststore for HTTPS for Storm Logviewer. */ @isString public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD = "logviewer.https.truststore.password"; @@ -370,13 +370,13 @@ public class DaemonConfig implements Validated { public static final String LOGVIEWER_HTTPS_NEED_CLIENT_AUTH = "logviewer.https.need.client.auth"; /** - * A list of users allowed to view logs via the Log Viewer + * A list of users allowed to view logs via the Log Viewer. */ @isStringList public static final String LOGS_USERS = "logs.users"; /** - * A list of groups allowed to view logs via the Log Viewer + * A list of groups allowed to view logs via the Log Viewer. */ @isStringList public static final String LOGS_GROUPS = "logs.groups"; @@ -394,19 +394,19 @@ public class DaemonConfig implements Validated { public static final String UI_CHILDOPTS = "ui.childopts"; /** - * A class implementing javax.servlet.Filter for authenticating/filtering UI requests + * A class implementing javax.servlet.Filter for authenticating/filtering UI requests. */ @isString public static final String UI_FILTER = "ui.filter"; /** - * Initialization parameters for the javax.servlet.Filter + * Initialization parameters for the javax.servlet.Filter. */ @isMapEntryType(keyType = String.class, valueType = String.class) public static final String UI_FILTER_PARAMS = "ui.filter.params"; /** - * The size of the header buffer for the UI in bytes + * The size of the header buffer for the UI in bytes. */ @isInteger @isPositiveNumber @@ -555,7 +555,7 @@ public class DaemonConfig implements Validated { public static final String DRPC_HTTPS_NEED_CLIENT_AUTH = "drpc.https.need.client.auth"; /** - * Class name for authorization plugin for DRPC client + * Class name for authorization plugin for DRPC client. */ @isString public static final String DRPC_AUTHORIZER = "drpc.authorizer"; @@ -578,7 +578,7 @@ public class DaemonConfig implements Validated { public static final String DRPC_CHILDOPTS = "drpc.childopts"; /** - * the metadata configured on the supervisor + * the metadata configured on the supervisor. 
*/ @isMapEntryType(keyType = String.class, valueType = String.class) public static final String SUPERVISOR_SCHEDULER_META = "supervisor.scheduler.meta"; @@ -590,7 +590,7 @@ public class DaemonConfig implements Validated { */ @isNoDuplicateInList @NotNull - @isListEntryCustom(entryValidatorClasses={ConfigValidation.IntegerValidator.class,ConfigValidation.PositiveNumberValidator.class}) + @isListEntryCustom(entryValidatorClasses = {ConfigValidation.IntegerValidator.class,ConfigValidation.PositiveNumberValidator.class}) public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports"; /** @@ -652,14 +652,14 @@ public class DaemonConfig implements Validated { public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology"; /** - * A class implementing javax.servlet.Filter for DRPC HTTP requests + * A class implementing javax.servlet.Filter for DRPC HTTP requests. */ @isString public static final String DRPC_HTTP_FILTER = "drpc.http.filter"; /** * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP - * service + * service. */ @isMapEntryType(keyType = String.class, valueType = String.class) public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params"; @@ -679,7 +679,7 @@ public class DaemonConfig implements Validated { public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts"; /** - * How many seconds to sleep for before shutting down threads on worker + * How many seconds to sleep for before shutting down threads on worker. */ @isInteger @isPositiveNumber @@ -739,7 +739,7 @@ public class DaemonConfig implements Validated { /** * The command launched supervisor with worker arguments * pid, action and [target_directory] - * Where action is - start profile, stop profile, jstack, heapdump and kill against pid + * Where action is - start profile, stop profile, jstack, heapdump and kill against pid. * */ @isString @@ -762,7 +762,7 @@ public class DaemonConfig implements Validated { public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = "storm.cluster.metrics.consumer.publish.interval.secs"; /** - * Enables user-first classpath. See topology.classpath.beginning + * Enables user-first classpath. See topology.classpath.beginning. */ @isBoolean public static final String STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED="storm.topology.classpath.beginning.enabled"; @@ -791,6 +791,38 @@ public class DaemonConfig implements Validated { @isMapEntryType(keyType = String.class, valueType = Number.class) public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines"; + /** + * For ArtifactoryConfigLoader, this can either be a reference to an individual file in Artifactory or to a directory. + * If it is a directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" to the beginning of + * the real URI to use ArtifactoryConfigLoader. + * For FileConfigLoader, this is the URI pointing to a file. + */ + @isString + public static final String SCHEDULER_CONFIG_LOADER_URI = "scheduler.config.loader.uri"; + + /** + * It is the frequency at which the plugin will call out to artifactory instead of returning the most recently cached result. + * Currently it's only used in ArtifactoryConfigLoader. 
+ */ + @isInteger + @isPositiveNumber + public static final String SCHEDULER_CONFIG_LOADER_POLLTIME_SECS = "scheduler.config.loader.polltime.secs"; + + /** + * It is the amount of time an http connection to the artifactory server will wait before timing out. + * Currently it's only used in ArtifactoryConfigLoader. + */ + @isInteger + @isPositiveNumber + public static final String SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS = "scheduler.config.loader.timeout.secs"; + + /** + * It is the part of the uri, configurable in Artifactory, which represents the top of the directory tree. + * It's only used in ArtifactoryConfigLoader. + */ + @isString + public static final String SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY = "scheduler.config.loader.artifactory.base.directory"; + /** * A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler * to org.apache.storm.scheduler.multitenant.MultitenantScheduler @@ -806,14 +838,14 @@ public class DaemonConfig implements Validated { public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools"; /** - * The class that specifies the eviction strategy to use in ResourceAwareScheduler + * The class that specifies the eviction strategy to use in ResourceAwareScheduler. */ @NotNull @isImplementationOfClass(implementsClass = IEvictionStrategy.class) public static final String RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY = "resource.aware.scheduler.eviction.strategy"; /** - * the class that specifies the scheduling priority strategy to use in ResourceAwareScheduler + * the class that specifies the scheduling priority strategy to use in ResourceAwareScheduler. */ @NotNull @isImplementationOfClass(implementsClass = ISchedulingPriorityStrategy.class) @@ -842,7 +874,7 @@ public class DaemonConfig implements Validated { public static final String STORM_CGROUP_RESOURCES = "storm.cgroup.resources"; /** - * name for the cgroup hierarchy + * name for the cgroup hierarchy. */ @isString public static final String STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name"; @@ -856,25 +888,25 @@ public class DaemonConfig implements Validated { public static final String STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE = "storm.resource.isolation.plugin.enable"; /** - * root directory for cgoups + * root directory for cgoups. */ @isString public static String STORM_SUPERVISOR_CGROUP_ROOTDIR = "storm.supervisor.cgroup.rootdir"; /** - * the manually set memory limit (in MB) for each CGroup on supervisor node + * the manually set memory limit (in MB) for each CGroup on supervisor node. */ @isPositiveNumber public static String STORM_WORKER_CGROUP_MEMORY_MB_LIMIT = "storm.worker.cgroup.memory.mb.limit"; /** - * the manually set cpu share for each CGroup on supervisor node + * the manually set cpu share for each CGroup on supervisor node. */ @isPositiveNumber public static String STORM_WORKER_CGROUP_CPU_LIMIT = "storm.worker.cgroup.cpu.limit"; /** - * full path to cgexec command + * full path to cgexec command. 
*/ @isString public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd"; diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index 95cbf63095d..8bd9035650c 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -20,44 +20,66 @@ import java.util.HashMap; import java.util.Map; - import org.apache.storm.DaemonConfig; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.Config; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.IScheduler; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService; +import org.apache.storm.scheduler.utils.IConfigLoader; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MultitenantScheduler implements IScheduler { private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class); @SuppressWarnings("rawtypes") - private Map _conf; + private Map conf; + protected IConfigLoader configLoader; @Override - public void prepare(@SuppressWarnings("rawtypes") Map conf) { - _conf = conf; + public void prepare(Map conf) { + this.conf = conf; + configLoader = ConfigLoaderFactoryService.createConfigLoader(conf); + } - + + /** + * Load from configLoaders first; if no config available, read from multitenant-scheduler.yaml; + * if no config available from multitenant-scheduler.yaml, get configs from conf. Only one will be used. + * @return User pool configs. + */ private Map getUserConf() { - Map ret = (Map)_conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); - if (ret == null) { - ret = new HashMap<>(); - } else { - ret = new HashMap<>(ret); + Map ret; + + // Try the loader plugin, if configured + if (configLoader != null) { + ret = (Map) configLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + if (ret != null) { + return ret; + } else { + LOG.warn("Config loader returned null. Will try to read from multitenant-scheduler.yaml"); + } } + // If that fails, fall back on the multitenant-scheduler.yaml file Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false); - Map tmp = (Map)fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); - if (tmp != null) { - ret.putAll(tmp); + ret = (Map)fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + if (ret != null) { + return ret; + } else { + LOG.warn("Reading from multitenant-scheduler.yaml returned null. This could because the file is not available. 
" + + "Will load configs from storm configuration"); } - return ret; - } + // If that fails, use config + ret = (Map) conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + if (ret == null) { + return new HashMap<>(); + } else { + return ret; + } + } @Override public Map config() { diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index 19838f452fa..669079b6854 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.scheduler.Cluster; @@ -31,8 +30,10 @@ import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; +import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService; import org.apache.storm.utils.ReflectionUtils; import org.apache.storm.utils.DisallowedStrategyException; +import org.apache.storm.scheduler.utils.IConfigLoader; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +43,16 @@ public class ResourceAwareScheduler implements IScheduler { private Map conf; private ISchedulingPriorityStrategy schedulingPrioritystrategy; private IEvictionStrategy evictionStrategy; + private IConfigLoader configLoader; @Override public void prepare(Map conf) { this.conf = conf; schedulingPrioritystrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance( - (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY)); + (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY)); evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance( - (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY)); + (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY)); + configLoader = ConfigLoaderFactoryService.createConfigLoader(conf); } @Override @@ -75,7 +78,7 @@ public void schedule(Topologies topologies, Cluster cluster) { td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap); } catch (Exception ex) { LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled!", - schedulingPrioritystrategy.getClass().getName(), ex); + schedulingPrioritystrategy.getClass().getName(), ex); break; } if (td == null) { @@ -104,8 +107,8 @@ public void scheduleTopology(TopologyDetails td, Cluster cluster, final User top } catch (DisallowedStrategyException e) { topologySubmitter.markTopoUnsuccess(td); cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass() - + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY - + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString()); + + " is not an allowed strategy. 
Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY + + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString()); return; } catch (RuntimeException e) { LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.", @@ -115,7 +118,7 @@ public void scheduleTopology(TopologyDetails td, Cluster cluster, final User top + strategyConf + ". Please check logs for details"); return; } - + while (true) { // A copy of the cluster that restricts the strategy to only modify a single topology SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId()); @@ -139,8 +142,8 @@ public void scheduleTopology(TopologyDetails td, Cluster cluster, final User top LOG.error("Unsuccessful attempting to assign executors to nodes.", ex); topologySubmitter.markTopoUnsuccess(td); cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " - + "IllegalStateException thrown when attempting to assign executors to nodes. Please check" - + " log for details."); + + "IllegalStateException thrown when attempting to assign executors to nodes. Please check" + + " log for details."); } return; } else { @@ -151,8 +154,8 @@ public void scheduleTopology(TopologyDetails td, Cluster cluster, final User top madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap); } catch (Exception ex) { LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}." - + " No evictions will be done!", evictionStrategy.getClass().getName(), - td.getName(), ex); + + " No evictions will be done!", evictionStrategy.getClass().getName(), + td.getName(), ex); topologySubmitter.markTopoUnsuccess(td); return; } @@ -160,7 +163,7 @@ public void scheduleTopology(TopologyDetails td, Cluster cluster, final User top LOG.debug("Could not make space for topo {} will move to attempted", td); topologySubmitter.markTopoUnsuccess(td); cluster.setStatus(td.getId(), "Not enough resources to schedule - " - + result.getErrorMessage()); + + result.getErrorMessage()); return; } continue; @@ -171,7 +174,7 @@ public void scheduleTopology(TopologyDetails td, Cluster cluster, final User top } } else { LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", - td.getName()); + td.getName()); topologySubmitter.markTopoUnsuccess(td, cluster); return; } @@ -202,19 +205,11 @@ private Map getUsers(Cluster cluster) { return userMap; } - /** - * Get resource guarantee configs. 
- * - * @return a map that contains resource guarantees of every user of the following format - * {userid->{resourceType->amountGuaranteed}} - */ - private Map> getUserResourcePools() { - Map> raw = - (Map>) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS); - Map> ret = new HashMap<>(); + private Map> convertToDouble(Map> raw) { + Map> ret = new HashMap>(); if (raw != null) { - for (Map.Entry> userPoolEntry : raw.entrySet()) { + for (Map.Entry> userPoolEntry : raw.entrySet()) { String user = userPoolEntry.getKey(); ret.put(user, new HashMap()); for (Map.Entry resourceEntry : userPoolEntry.getValue().entrySet()) { @@ -223,18 +218,42 @@ private Map> getUserResourcePools() { } } - Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false); - Map> tmp = - (Map>) fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS); - if (tmp != null) { - for (Map.Entry> userPoolEntry : tmp.entrySet()) { - String user = userPoolEntry.getKey(); - ret.put(user, new HashMap()); - for (Map.Entry resourceEntry : userPoolEntry.getValue().entrySet()) { - ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue()); - } + return ret; + } + + /** + * Get resource guarantee configs. + * Load from configLoaders first; if no config available, read from user-resource-pools.yaml; + * if no config available from user-resource-pools.yaml, get configs from conf. Only one will be used. + * @return a map that contains resource guarantees of every user of the following format + * {userid->{resourceType->amountGuaranteed}} + */ + private Map> getUserResourcePools() { + Map> raw; + + // Try the loader plugin, if configured + if (configLoader != null) { + raw = (Map>) configLoader.load(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS); + if (raw != null) { + return convertToDouble(raw); + } else { + LOG.warn("Config loader returned null. Will try to read from user-resource-pools.yaml"); } } - return ret; + + // if no configs from loader, try to read from user-resource-pools.yaml + Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false); + raw = (Map>) fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS); + if (raw != null) { + return convertToDouble(raw); + } else { + LOG.warn("Reading from user-resource-pools.yaml returned null. This could because the file is not available. " + + "Will load configs from storm configuration"); + } + + // if no configs from user-resource-pools.yaml, get configs from conf + raw = (Map>) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS); + + return convertToDouble(raw); } } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java new file mode 100644 index 00000000000..3afa9034311 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Comparator; +import java.util.Map; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.storm.Config; +import org.apache.storm.DaemonConfig; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +/** + * A dynamic loader that can load scheduler configurations for user resource guarantees from Artifactory (an artifact repository manager). + */ +public class ArtifactoryConfigLoader implements IConfigLoader { + protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts"; + static final String cacheFilename = "latest.yaml"; + private static final String DEFAULT_ARTIFACTORY_BASE_DIRECTORY = "/artifactory"; + private static final int DEFAULT_POLLTIME_SECS = 600; + private static final int DEFAULT_TIMEOUT_SECS = 10; + private static final String ARTIFACTORY_SCHEME_PREFIX = "artifactory+"; + + private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoader.class); + + private Map conf; + private int artifactoryPollTimeSecs = DEFAULT_POLLTIME_SECS; + private boolean cacheInitialized = false; + // Location of the file in the artifactory archive. Also used to name file in cache. 
+ private String localCacheDir; + private String baseDirectory = DEFAULT_ARTIFACTORY_BASE_DIRECTORY; + private int lastReturnedTime = 0; + private int timeoutSeconds = DEFAULT_TIMEOUT_SECS; + private Map lastReturnedValue; + private URI targetURI = null; + private JSONParser jsonParser; + private String scheme; + + public ArtifactoryConfigLoader(Map conf) { + this.conf = conf; + Integer thisTimeout = (Integer) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS); + if (thisTimeout != null) { + timeoutSeconds = thisTimeout; + } + Integer thisPollTime = (Integer) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_POLLTIME_SECS); + if (thisPollTime != null) { + artifactoryPollTimeSecs =thisPollTime; + } + String thisBase = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY); + if (thisBase != null) { + baseDirectory = thisBase; + } + String uriString = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI); + if (uriString == null) { + LOG.error("No URI defined in {} configuration.", DaemonConfig.SCHEDULER_CONFIG_LOADER_URI); + } else { + try { + targetURI = new URI(uriString); + scheme = targetURI.getScheme().substring(ARTIFACTORY_SCHEME_PREFIX.length()); + } catch(URISyntaxException e) { + LOG.error("Failed to parse uri={}", uriString); + } + } + jsonParser = new JSONParser(); + } + + /** + * Load the configs associated with the configKey from the targetURI. + * @param configKey The key from which we want to get the scheduler config. + * @return The scheduler configuration if exists; null otherwise. + */ + @Override + public Map load(String configKey) { + if (targetURI == null) { + return null; + } + + // Check for new file every so often + int currentTimeSecs = Time.currentTimeSecs(); + if (lastReturnedValue != null && ((currentTimeSecs - lastReturnedTime) < artifactoryPollTimeSecs)) { + LOG.debug("currentTimeSecs: {}; lastReturnedTime {}; artifactoryPollTimeSecs: {}. Returning our last map.", + currentTimeSecs, lastReturnedTime, artifactoryPollTimeSecs); + return (Map) lastReturnedValue.get(configKey); + } + + try { + Map raw = loadFromURI(targetURI); + if (raw != null) { + return (Map) raw.get(configKey); + } + } catch (Exception e) { + LOG.error("Failed to load from uri {}", targetURI); + } + return null; + } + + + /** + * A private class used to check the response coming back from httpclient. + */ + private static class GETStringResponseHandler implements ResponseHandler { + private static GETStringResponseHandler singleton = null; + + /** + * @return a singleton httpclient GET response handler + */ + public static GETStringResponseHandler getInstance() { + if (singleton == null) { + singleton = new GETStringResponseHandler(); + } + return singleton; + } + + /** + * @param response The http response to verify. + * @return null on failure or the response string if return code is in 200 range + */ + @Override + public String handleResponse(final HttpResponse response) throws IOException { + int status = response.getStatusLine().getStatusCode(); + HttpEntity entity = response.getEntity(); + String entityString = (entity != null ? EntityUtils.toString(entity) : null); + if (status >= 200 && status < 300) { + return entityString; + } else { + LOG.error("Got unexpected response code {}; entity: {}", status, entityString); + return null; + } + } + } + + /** + * @param api null if we are trying to download artifact, otherwise a string to call REST api, + * e.g. 
"/api/storage" + * @param artifact location of artifact + * @param host Artifactory hostname + * @param port Artifactory port + * @return null on failure or the response string if return code is in 200 range + * + *
Protected so we can override this in unit tests + */ + protected String doGet(String api, String artifact, String host, Integer port) { + URIBuilder builder = new URIBuilder().setScheme(scheme).setHost(host).setPort(port); + + String path = null; + if (api != null) { + path = baseDirectory + "/" + api + "/" + artifact; + } else { + path = baseDirectory + "/" + artifact; + } + + // Get rid of multiple '/' in url + path = path.replaceAll("/[/]+", "/"); + builder.setPath(path); + + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(timeoutSeconds * 1000).build(); + HttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); + + String returnValue; + try { + LOG.debug("About to issue a GET to {}", builder); + HttpGet httpget = new HttpGet(builder.build()); + String responseBody; + responseBody = httpclient.execute(httpget, GETStringResponseHandler.getInstance()); + returnValue = responseBody; + } catch (Exception e) { + LOG.error("Received exception while connecting to Artifactory", e); + returnValue = null; + } + + LOG.debug("Returning {}", returnValue); + return returnValue; + } + + private JSONObject getArtifactMetadata(String location, String host, Integer port) { + String metadataStr = null; + + metadataStr = doGet("/api/storage", location, host, port); + + if (metadataStr == null) { + return null; + } + + JSONObject returnValue; + try { + returnValue = (JSONObject) jsonParser.parse(metadataStr); + } catch (ParseException e) { + LOG.error("Could not parse JSON string {}", metadataStr, e); + return null; + } + + return returnValue; + } + + private class DirEntryCompare implements Comparator { + @Override + public int compare(JSONObject o1, JSONObject o2) { + return ((String)o1.get("uri")).compareTo((String)o2.get("uri")); + } + } + + private String loadMostRecentArtifact(String location, String host, Integer port) { + // Is this a directory or is it a file? + JSONObject json = getArtifactMetadata(location, host, port); + if (json == null) { + LOG.error("got null metadata"); + return null; + } + String downloadURI = (String) json.get("downloadUri"); + + // This means we are pointing at a file. + if (downloadURI != null) { + // Then get it and return the file as string. + String returnValue = doGet(null, location, host, port); + saveInArtifactoryCache(returnValue); + return returnValue; + } + + // This should mean that we were pointed at a directory. + // Find the most recent child and load that. + JSONArray msg = (JSONArray) json.get("children"); + if (msg == null || msg.size() == 0) { + LOG.error("Expected directory children not present"); + return null; + } + JSONObject newest = (JSONObject) Collections.max(msg, new DirEntryCompare()); + if (newest == null) { + LOG.error("Failed to find most recent artifact uri in {}", location); + return null; + } + + String uri = (String) newest.get("uri"); + if (uri == null) { + LOG.error("Expected directory uri not present"); + return null; + } + String returnValue = doGet(null, location + uri, host, port); + saveInArtifactoryCache(returnValue); + return returnValue; + } + + private void updateLastReturned(Map ret) { + lastReturnedTime = Time.currentTimeSecs(); + lastReturnedValue = ret; + } + + private Map loadFromFile(File file) { + Map ret = null; + + try { + ret = (Map) Utils.readYamlFile(file.getCanonicalPath()); + } catch (IOException e) { + LOG.error("Filed to load from file. 
Exception: {}", e.getMessage()); + } + + if (ret != null) { + try { + LOG.debug("returning a new map from file {}", file.getCanonicalPath()); + } catch (java.io.IOException e) { + LOG.debug("Could not get PATH from file object in debug print. Ignoring"); + } + return ret; + } + + return null; + } + + private Map getLatestFromCache() { + String localFileName = localCacheDir + File.separator + cacheFilename; + return loadFromFile(new File(localFileName)); + } + + private void saveInArtifactoryCache(String yamlData) { + if (yamlData == null) { + LOG.warn("Will not save null data into the artifactory cache"); + return; + } + + String localFileName = localCacheDir + File.separator + cacheFilename; + + File cacheFile = new File(localFileName); + try (FileOutputStream fos = new FileOutputStream(cacheFile)) { + fos.write(yamlData.getBytes()); + fos.flush(); + } catch (IOException e) { + LOG.error("Received exception when writing file {}. Attempting delete", localFileName, e); + try { + cacheFile.delete(); + } catch (Exception deleteException) { + LOG.error("Received exception when deleting file {}.", localFileName, deleteException); + } + } + } + + private void makeArtifactoryCache(String location) { + // Just make sure appropriate directories exist + String localDirName = (String) conf.get(Config.STORM_LOCAL_DIR); + if (localDirName == null) { + return; + } + + // First make the cache dir + localDirName = localDirName + File.separator + "nimbus" + File.separator + LOCAL_ARTIFACT_DIR; + File dir = new File(localDirName); + if (! dir.exists()) { + dir.mkdirs(); + } + + localCacheDir = localDirName + File.separator + location.replaceAll(File.separator, "_"); + dir = new File(localCacheDir); + if (! dir.exists()) { + dir.mkdir(); + } + cacheInitialized = true; + } + + private Map loadFromURI(URI uri) { + String host = uri.getHost(); + Integer port = uri.getPort(); + String location = uri.getPath(); + if (location.toLowerCase().startsWith(baseDirectory.toLowerCase())) { + location = location.substring(baseDirectory.length()); + } + + if (!cacheInitialized) { + makeArtifactoryCache(location); + } + + // Get the most recent artifact as a String, and then parse the yaml + String yamlConfig = loadMostRecentArtifact(location, host, port); + + // If we failed to get anything from Artifactory try to get it from our local cache + if (yamlConfig == null) { + Map ret = getLatestFromCache(); + updateLastReturned(ret); + return ret; + } + + // Now parse it and return the map. + Yaml yaml = new Yaml(new SafeConstructor()); + Map ret = null; + try { + ret = (Map) yaml.load(yamlConfig); + } catch (Exception e) { + LOG.error("Could not parse yaml."); + return null; + } + + if (ret != null) { + LOG.debug("returning a new map from Artifactory"); + updateLastReturned(ret); + return ret; + } + + return null; + } +} \ No newline at end of file diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java new file mode 100644 index 00000000000..6494ae23c65 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import com.google.auto.service.AutoService; +import java.net.URI; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory class for ArtifactoryConfigLoader. + */ +@AutoService(IConfigLoaderFactory.class) +public class ArtifactoryConfigLoaderFactory implements IConfigLoaderFactory { + private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoaderFactory.class); + + /** + * Create a ArtifactoryConfigLoader if the scheme of the URI is "artifactory+http" or "artifactory+https"; otherwise return null. + * @param uri The URI which pointing to the config file/directory location on Artifactory server. + * @param conf The storm configuration. + * @return A ArtifactoryConfigLoader if the scheme is "artifactory+http" or "artifactory+https"; otherwise, null. + */ + @Override + public IConfigLoader createIfSupported(URI uri, Map conf) { + String scheme = uri.getScheme(); + if ("artifactory+http".equalsIgnoreCase(scheme) || "artifactory+https".equalsIgnoreCase(scheme)) { + return new ArtifactoryConfigLoader(conf); + } else { + LOG.debug("scheme {} not supported in this factory.", scheme); + return null; + } + } +} diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java new file mode 100644 index 00000000000..53df78b6eb5 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.storm.DaemonConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The user interface to create a concrete IConfigLoader instance for use. 
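+ * <p>Illustrative usage from a scheduler (sketch, not part of this patch):
+ * <pre>
+ * IConfigLoader loader = ConfigLoaderFactoryService.createConfigLoader(conf);
+ * Map pools = (loader == null) ? null : loader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+ * </pre>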
+ */ +public class ConfigLoaderFactoryService { + + private static Logger LOG = LoggerFactory.getLogger(ConfigLoaderFactoryService.class); + + /** + * ServiceLoader loads all the implementations of IConfigLoaderFactory for use. + */ + private static ServiceLoader serviceLoader = ServiceLoader.load(IConfigLoaderFactory.class); + + /** + * The user interface to create an IConfigLoader instance. + * It iterates all the implementations of IConfigLoaderFactory and finds the one which supports the + * specific scheme of the URI and then uses it to create an IConfigLoader instance. + * @param conf The storm configuration. + * @return A concrete IConfigLoader implementation which supports the scheme of the URI. + * If multiple implementations are available, return the first one; otherwise, return null. + */ + public static IConfigLoader createConfigLoader(Map conf) { + String uriString = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI); + if (null != uriString) { + try { + URI uri = new URI(uriString); + for (IConfigLoaderFactory factory : serviceLoader) { + IConfigLoader ret = factory.createIfSupported(uri, conf); + if (ret != null) { + return ret; + } + } + } catch (URISyntaxException e) { + LOG.error("Failed to parse uri={}", uriString); + } + } else { + LOG.debug("Config {} is not set.", DaemonConfig.SCHEDULER_CONFIG_LOADER_URI); + } + return null; + } +} diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java new file mode 100644 index 00000000000..e785e5cb16f --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Scheduler configuration loader which loads configs from a file. 
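+ * <p>The target file is expected to be a YAML map keyed by config name, for example (illustrative values):
+ * <pre>
+ * multitenant.scheduler.user.pools:
+ *     alice: 3
+ *     bob: 2
+ * </pre>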
+ */ +public class FileConfigLoader implements IConfigLoader { + private static final Logger LOG = LoggerFactory.getLogger(FileConfigLoader.class); + + private Map conf; + private String targetFilePath = null; + + public FileConfigLoader(Map conf) { + this.conf = conf; + String uriString = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI); + if (uriString == null) { + LOG.error("No URI defined in {} configuration.", DaemonConfig.SCHEDULER_CONFIG_LOADER_URI); + } else { + try { + targetFilePath = new URI(uriString).getPath(); + } catch (URISyntaxException e) { + LOG.error("Failed to parse uri={}", uriString); + } + } + } + + /** + * Load the configs associated with the configKey from the targetFilePath. + * @param configKey The key from which we want to get the scheduler config. + * @return The scheduler configuration if exists; null otherwise. + */ + @Override + public Map load(String configKey) { + if (targetFilePath != null) { + try { + Map raw = (Map) Utils.readYamlFile(targetFilePath); + if (raw != null) { + return (Map) raw.get(configKey); + } + } catch (Exception e) { + LOG.error("Failed to load from file {}", targetFilePath); + } + } + return null; + } +} \ No newline at end of file diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java new file mode 100644 index 00000000000..9a2ac5c9fcd --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import com.google.auto.service.AutoService; +import java.net.URI; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory class for FileConfigLoader. + */ +@AutoService(IConfigLoaderFactory.class) +public class FileConfigLoaderFactory implements IConfigLoaderFactory { + + private static final Logger LOG = LoggerFactory.getLogger(FileConfigLoaderFactory.class); + + /** + * Create a FileConfigLoader if the scheme of the URI is "file"; else return null. + * @param uri The URI which pointing to the config file location. + * @param conf The storm configuration. + * @return A FileConfigLoader if the scheme is "file"; otherwise, null. 
+ */ + @Override + public IConfigLoader createIfSupported(URI uri, Map conf) { + String scheme = uri.getScheme(); + if ("file".equalsIgnoreCase(scheme)) { + return new FileConfigLoader(conf); + } else { + LOG.debug("scheme {} not supported in this factory.", scheme); + return null; + } + } +} diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java new file mode 100644 index 00000000000..50711337eb9 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import java.util.Map; + +public interface IConfigLoader { + + /** + * Load scheduler configs associated with the configKey. + * @param configKey The key from which we want to get the scheduler config. + * @return The scheduler configs + */ + Map load(String configKey); + +} diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java new file mode 100644 index 00000000000..a0398703e49 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import java.net.URI; +import java.util.Map; + +public interface IConfigLoaderFactory { + + /** + * Create an IConfigLoader implementation if the scheme of the URI is supported; otherwise returns null. + * + * @param uri The URI of the config location. + * @param conf The storm configuration. + * @return An concrete implementation if the scheme is supported, or null if not. 
+ */ + IConfigLoader createIfSupported(URI uri, Map conf); + +} \ No newline at end of file diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java new file mode 100644 index 00000000000..009f479a117 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.utils; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.DaemonConfig; +import org.apache.storm.utils.Time; +import org.junit.Assert; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; + +public class ArtifactoryConfigLoaderTest { + + private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoaderTest.class); + private Path tmpDirPath; + private static final String ARTIFACTORY_HTTP_SCHEME_PREFIX = "artifactory+http://"; + + private class ArtifactoryConfigLoaderMock extends ArtifactoryConfigLoader { + String getData; + HashMap getDataMap = new HashMap(); + + public ArtifactoryConfigLoaderMock(Map conf) { + super(conf); + } + + public void setData(String api, String artifact, String data) { + if (api == null) { + getData = data; + } else { + getDataMap.put(artifact, data); + } + } + + @Override + protected String doGet(String api, String artifact, String host, Integer port) { + if (api == null) { + return getData; + } + return getDataMap.get(artifact); + } + }; + + @Before + public void createTempDir() throws Exception { + tmpDirPath = Files.createTempDirectory("TestArtifactoryConfigLoader"); + File f = tmpDirPath.toFile(); + f.mkdir(); + File dir = new File(f, "nimbus"); + dir.mkdir(); + } + + @After + public void removeTempDir() throws Exception { + FileUtils.deleteDirectory(tmpDirPath.toFile()); + } + + @Test + public void testInvalidConfig() { + Config conf = new Config(); + ArtifactoryConfigLoaderMock loaderMock = new ArtifactoryConfigLoaderMock(conf); + Map ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + Assert.assertNull("Unexpectedly returned not null", ret); + } + + @Test + public void testPointingAtDirectory() { + // This is a test where we are configured to point right at an artifact dir + Config conf = new Config(); + conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, ARTIFACTORY_HTTP_SCHEME_PREFIX + "bogushost.yahoo.com:9999/location/of/this/dir"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + ArtifactoryConfigLoaderMock loaderMock = new ArtifactoryConfigLoaderMock(conf); + + loaderMock.setData("Anything", "/location/of/this/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", \"folder\" : false }]}" ); + loaderMock.setData(null, null, "{ \"" + DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 3, four : 4}}"); + + Map ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertEquals(4, ret.get("four")); + + // Now let's load w/o setting up gets and we should still get valid map back + ArtifactoryConfigLoaderMock tc2 = new ArtifactoryConfigLoaderMock(conf); + + Map ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + 
Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertEquals(4, ret2.get("four")); + } + + @Test + public void testArtifactUpdate() { + // This is a test where we are configured to point right at an artifact dir + Config conf = new Config(); + conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, ARTIFACTORY_HTTP_SCHEME_PREFIX + "bogushost.yahoo.com:9999/location/of/test/dir"); + conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString()); + + Time.startSimulating(); + + try { + ArtifactoryConfigLoaderMock loaderMock = new ArtifactoryConfigLoaderMock(conf); + + loaderMock.setData("Anything", "/location/of/test/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", \"folder\" : false }]}" ); + loaderMock.setData(null, null, "{ \"" + DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 3}}"); + Map ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertNull("Unexpectedly contained \"four\"", ret.get("four")); + + // Now let's load w/o setting up gets and we should still get valid map back + ArtifactoryConfigLoaderMock tc2 = new ArtifactoryConfigLoaderMock(conf); + Map ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertNull("Unexpectedly did not return null", ret2.get("four")); + + // Now let's update it, but not advance time. Should get old map again. + loaderMock.setData("Anything", "/location/of/test/dir", + "{\"children\" : [ { \"uri\" : \"/20160621204999.yaml\", \"folder\" : false }]}"); + loaderMock.setData(null, null, "{ \"" + DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 3, four : 4}}"); + ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertNull("Unexpectedly did not return null, not enough time passed!", ret.get("four")); + + // Re-load from cached' file. + ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + Assert.assertNotNull("Unexpectedly returned null", ret2); + Assert.assertEquals(1, ret2.get("one")); + Assert.assertEquals(2, ret2.get("two")); + Assert.assertEquals(3, ret2.get("three")); + Assert.assertNull("Unexpectedly did not return null, last cached result should not have \"four\"", ret2.get("four")); + + // Now, let's advance time. + Time.advanceTime(11*60*1000); + + ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS); + Assert.assertNotNull("Unexpectedly returned null", ret); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("two")); + Assert.assertEquals(3, ret.get("three")); + Assert.assertEquals(4, ret.get("four")); + + // Re-load from cached' file. 
+            ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+            Assert.assertNotNull("Unexpectedly returned null", ret2);
+            Assert.assertEquals(1, ret2.get("one"));
+            Assert.assertEquals(2, ret2.get("two"));
+            Assert.assertEquals(3, ret2.get("three"));
+            Assert.assertEquals(4, ret2.get("four"));
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+
+    @Test
+    public void testPointingAtSpecificArtifact() {
+        // This is a test where we are configured to point right at a single artifact
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, ARTIFACTORY_HTTP_SCHEME_PREFIX + "bogushost.yahoo.com:9999/location/of/this/artifact");
+        conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());
+
+        ArtifactoryConfigLoaderMock loaderMock = new ArtifactoryConfigLoaderMock(conf);
+
+        loaderMock.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}");
+        loaderMock.setData(null, null, "{ \"" + DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 3}}");
+        Map ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+
+        Assert.assertNotNull("Unexpectedly returned null", ret);
+        Assert.assertEquals(1, ret.get("one"));
+        Assert.assertEquals(2, ret.get("two"));
+        Assert.assertEquals(3, ret.get("three"));
+
+        // Now let's load w/o setting up gets and we should still get valid map back
+        ArtifactoryConfigLoaderMock tc2 = new ArtifactoryConfigLoaderMock(conf);
+        Map ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNotNull("Unexpectedly returned null", ret2);
+        Assert.assertEquals(1, ret2.get("one"));
+        Assert.assertEquals(2, ret2.get("two"));
+        Assert.assertEquals(3, ret2.get("three"));
+    }
+
+    @Test
+    public void testMalformedYaml() throws Exception {
+        // This is a test where we are configured to point right at a single artifact
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, ARTIFACTORY_HTTP_SCHEME_PREFIX + "bogushost.yahoo.com:9999/location/of/this/artifact");
+        conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());
+
+        ArtifactoryConfigLoaderMock loaderMock = new ArtifactoryConfigLoaderMock(conf);
+        loaderMock.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}");
+        loaderMock.setData(null, null, "ThisIsNotValidYaml");
+
+        Map ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", ret);
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/utils/FileConfigLoaderTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/utils/FileConfigLoaderTest.java
new file mode 100644
index 00000000000..33df1a187f3
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/utils/FileConfigLoaderTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileWriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.yaml.snakeyaml.Yaml;
+
+public class FileConfigLoaderTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileConfigLoaderTest.class);
+
+    private static final String FILE_SCHEME_PREFIX = "file://";
+
+    @Test
+    public void testFileNotThere() {
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX + "/file/not/exist/");
+        FileConfigLoader testLoader = new FileConfigLoader(conf);
+        Map result = testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", result);
+    }
+
+    @Test
+    public void testInvalidConfig() throws Exception {
+        Config conf = new Config();
+        FileConfigLoader testLoader = new FileConfigLoader(conf);
+        Map result = testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", result);
+    }
+
+    @Test
+    public void testMalformedYaml() throws Exception {
+
+        File temp = File.createTempFile("FileLoader", ".yaml");
+        temp.deleteOnExit();
+
+        FileWriter fw = new FileWriter(temp);
+        String outputData = "ThisIsNotValidYaml";
+        fw.write(outputData, 0, outputData.length());
+        fw.flush();
+        fw.close();
+
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX + temp.getCanonicalPath());
+
+        FileConfigLoader testLoader = new FileConfigLoader(conf);
+        Map result = testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", result);
+    }
+
+    @Test
+    public void testValidFile() throws Exception {
+
+        File temp = File.createTempFile("FileLoader", ".yaml");
+        temp.deleteOnExit();
+
+        Map<String, Integer> testMap = new HashMap<>();
+        testMap.put("a", 1);
+        testMap.put("b", 2);
+        testMap.put("c", 3);
+        testMap.put("d", 4);
+        testMap.put("e", 5);
+
+        Map<String, Map<String, Integer>> confMap = new HashMap<>();
+        confMap.put(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS, testMap);
+
+        Yaml yaml = new Yaml();
+        FileWriter fw = new FileWriter(temp);
+        yaml.dump(confMap, fw);
+        fw.flush();
+        fw.close();
+
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX + temp.getCanonicalPath());
+        FileConfigLoader loader = new FileConfigLoader(conf);
+
+        Map result = loader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+
+        Assert.assertNotNull("Unexpectedly returned null", result);
+
+        Assert.assertEquals("Maps are a different size", testMap.keySet().size(), result.keySet().size());
+
+        for (String key : testMap.keySet()) {
+            Integer expectedValue = testMap.get(key);
+            Integer returnedValue = (Integer) result.get(key);
+            Assert.assertEquals("Bad value for key=" + key, expectedValue, returnedValue);
+        }
+    }
+}
\ No newline at end of file