diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/Cluster.java b/storm-client/src/jvm/org/apache/storm/scheduler/Cluster.java index 968c68acae8..286f599713e 100644 --- a/storm-client/src/jvm/org/apache/storm/scheduler/Cluster.java +++ b/storm-client/src/jvm/org/apache/storm/scheduler/Cluster.java @@ -27,9 +27,10 @@ import org.apache.storm.Config; import org.apache.storm.networktopography.DNSToSwitchMapping; -import org.apache.storm.utils.Utils; +import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ReflectionUtils; +import org.apache.storm.utils.Utils; public class Cluster { /** @@ -541,57 +542,36 @@ public void setNetworkTopography(Map> networkTopography) { this.networkTopography = networkTopography; } - private String getStringFromStringList(Object o) { - StringBuilder sb = new StringBuilder(); - for (String s : (List) o) { - sb.append(s); - sb.append(" "); - } - return sb.toString(); - } - - /* - * Get heap memory usage for a worker's main process and logwriter process - * */ - public Double getAssignedMemoryForSlot(Map topConf) { + /** + * Get heap memory usage for a worker's main process and logwriter process. + * @param topConf - the topology config + * @return the assigned memory (in MB) + */ + public static Double getAssignedMemoryForSlot(final Map topConf) { Double totalWorkerMemory = 0.0; - final Integer TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION = 768; - - String topologyWorkerGcChildopts = null; - if (topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS) instanceof List) { - topologyWorkerGcChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS)); - } else { - topologyWorkerGcChildopts = ObjectReader.getString(topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), null); - } - - String workerGcChildopts = null; - if (topConf.get(Config.WORKER_GC_CHILDOPTS) instanceof List) { - workerGcChildopts = getStringFromStringList(topConf.get(Config.WORKER_GC_CHILDOPTS)); - } else { - workerGcChildopts = ObjectReader.getString(topConf.get(Config.WORKER_GC_CHILDOPTS), null); - } + final Integer topologyWorkerDefaultMemoryAllocation = 768; + List topologyWorkerGcChildopts = ConfigUtils.getValueAsList( + Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf); + List workerGcChildopts = ConfigUtils.getValueAsList( + Config.WORKER_GC_CHILDOPTS, topConf); Double memGcChildopts = null; - memGcChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerGcChildopts, null); + memGcChildopts = Utils.parseJvmHeapMemByChildOpts( + topologyWorkerGcChildopts, null); if (memGcChildopts == null) { - memGcChildopts = Utils.parseJvmHeapMemByChildOpts(workerGcChildopts, null); + memGcChildopts = Utils.parseJvmHeapMemByChildOpts( + workerGcChildopts, null); } - String topologyWorkerChildopts = null; - if (topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS) instanceof List) { - topologyWorkerChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS)); - } else { - topologyWorkerChildopts = ObjectReader.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null); - } - Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerChildopts, null); + List topologyWorkerChildopts = ConfigUtils.getValueAsList( + Config.TOPOLOGY_WORKER_CHILDOPTS, topConf); + Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts( + topologyWorkerChildopts, null); - String workerChildopts = null; - if (topConf.get(Config.WORKER_CHILDOPTS) instanceof List) { - workerChildopts = getStringFromStringList(topConf.get(Config.WORKER_CHILDOPTS)); - } else { - workerChildopts = ObjectReader.getString(topConf.get(Config.WORKER_CHILDOPTS), null); - } - Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(workerChildopts, null); + List workerChildopts = ConfigUtils.getValueAsList( + Config.WORKER_CHILDOPTS, topConf); + Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts( + workerChildopts, null); if (memGcChildopts != null) { totalWorkerMemory += memGcChildopts; @@ -600,17 +580,17 @@ public Double getAssignedMemoryForSlot(Map topConf) { } else if (memWorkerChildopts != null) { totalWorkerMemory += memWorkerChildopts; } else { - totalWorkerMemory += ObjectReader.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB), TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION); + Object workerHeapMemoryMb = topConf.get( + Config.WORKER_HEAP_MEMORY_MB); + totalWorkerMemory += ObjectReader.getInt( + workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation); } - String topoWorkerLwChildopts = null; - if (topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS) instanceof List) { - topoWorkerLwChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)); - } else { - topoWorkerLwChildopts = ObjectReader.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); - } + List topoWorkerLwChildopts = ConfigUtils.getValueAsList( + Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf); if (topoWorkerLwChildopts != null) { - totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(topoWorkerLwChildopts, 0.0); + totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts( + topoWorkerLwChildopts, 0.0); } return totalWorkerMemory; } diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java b/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java index 9d6331195c0..c2ddc15cf8a 100644 --- a/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java +++ b/storm-client/src/jvm/org/apache/storm/scheduler/TopologyDetails.java @@ -91,7 +91,7 @@ public String getName() { return (String) this.topologyConf.get(Config.TOPOLOGY_NAME); } - public Map getConf() { + public Map getConf() { return this.topologyConf; } diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 911d01596e9..6ea79dd5ad4 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -18,7 +18,6 @@ package org.apache.storm.utils; -import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.daemon.supervisor.AdvancedFSOps; @@ -28,10 +27,12 @@ import java.io.File; import java.io.IOException; import java.net.URLEncoder; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Collection; import java.util.HashSet; import java.util.concurrent.Callable; import java.util.stream.Collectors; @@ -343,6 +344,33 @@ public static File getWorkerDirFromRoot(String logRoot, String id, Integer port) return new File((logRoot + FILE_SEPARATOR + id + FILE_SEPARATOR + port)); } + /** + * Get the given config value as a List <String>, if possible. + * @param name - the config key + * @param conf - the config map + * @return - the config value converted to a List <String> if found, otherwise null. + * @throws IllegalArgumentException if conf is null + * @throws NullPointerException if name is null and the conf map doesn't support null keys + */ + public static List getValueAsList(String name, Map conf) { + if (null == conf) { + throw new IllegalArgumentException("Conf is required"); + } + Object value = conf.get(name); + List listValue; + if (value == null) { + listValue = null; + } else if (value instanceof Collection) { + listValue = ((Collection) value) + .stream() + .map(ObjectReader::getString) + .collect(Collectors.toList()); + } else { + listValue = Arrays.asList(ObjectReader.getString(value).split("\\s+")); + } + return listValue; + } + public StormTopology readSupervisorTopologyImpl(Map conf, String stormId, AdvancedFSOps ops) throws IOException { String stormRoot = supervisorStormDistRoot(conf, stormId); String topologyPath = supervisorStormCodePath(stormRoot); diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index d9b6684eec9..28f7e1060fa 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -903,32 +903,36 @@ public static HashMap reverseMap(List listSeq) { * @param defaultValue * @return the value of the JVM heap memory setting (in MB) in a java command. */ - public static Double parseJvmHeapMemByChildOpts(String input, Double defaultValue) { - if (input != null) { - Pattern optsPattern = Pattern.compile("Xmx[0-9]+[mkgMKG]"); - Matcher m = optsPattern.matcher(input); - String memoryOpts = null; - while (m.find()) { - memoryOpts = m.group(); - } - if (memoryOpts != null) { - int unit = 1; - memoryOpts = memoryOpts.toLowerCase(); - - if (memoryOpts.endsWith("k")) { - unit = 1024; - } else if (memoryOpts.endsWith("m")) { - unit = 1024 * 1024; - } else if (memoryOpts.endsWith("g")) { - unit = 1024 * 1024 * 1024; + public static Double parseJvmHeapMemByChildOpts(List options, Double defaultValue) { + if (options != null) { + Pattern optsPattern = Pattern.compile("Xmx([0-9]+)([mkgMKG])"); + for (String option : options) { + if (option == null) { + continue; + } + Matcher m = optsPattern.matcher(option); + while (m.find()) { + int value = Integer.parseInt(m.group(1)); + char unitChar = m.group(2).toLowerCase().charAt(0); + int unit; + switch (unitChar) { + case 'k': + unit = 1024; + break; + case 'm': + unit = 1024 * 1024; + break; + case 'g': + unit = 1024 * 1024 * 1024; + break; + default: + unit = 1; + } + Double result = value * unit / 1024.0 / 1024.0; + return (result < 1.0) ? 1.0 : result; } - - memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", ""); - Double result = Double.parseDouble(memoryOpts) * unit / 1024.0 / 1024.0; - return (result < 1.0) ? 1.0 : result; - } else { - return defaultValue; } + return defaultValue; } else { return defaultValue; } diff --git a/storm-client/test/jvm/org/apache/storm/scheduler/ClusterTest.java b/storm-client/test/jvm/org/apache/storm/scheduler/ClusterTest.java new file mode 100644 index 00000000000..9c01a2a3ba0 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/scheduler/ClusterTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.Config; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit tests for {@link Cluster}. + */ +public class ClusterTest { + + /** This should match the value in Cluster.getAssignedMemoryForSlot. */ + final Double TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION = 768.0; + + private Map getConfig(String key, Object value) { + Map topConf = getEmptyConfig(); + topConf.put(key, value); + return topConf; + } + + private Map getEmptyConfig() { + Map topConf = new HashMap<>(); + return topConf; + } + + private Map getPopulatedConfig() { + Map topConf = new HashMap<>(); + topConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m"); + topConf.put(Config.WORKER_GC_CHILDOPTS, "-Xmx256m"); + topConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m"); + topConf.put(Config.WORKER_CHILDOPTS, "-Xmx768m"); + topConf.put(Config.WORKER_HEAP_MEMORY_MB, 1024); + topConf.put(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m"); + return topConf; + } + + /** + * Test Cluster.getAssignedMemoryForSlot with a single config value set. + * @param key - the config key to set + * @param value - the config value to set + * @param expectedValue - the expected result + */ + private void singleValueTest(String key, String value, double expectedValue) { + Map topConf = getConfig(key, value); + Assert.assertEquals(expectedValue, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0); + } + + @Test + public void getAssignedMemoryForSlot_allNull() { + Map topConf = getEmptyConfig(); + Assert.assertEquals(TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION, Cluster.getAssignedMemoryForSlot(topConf)); + } + + @Test + public void getAssignedMemoryForSlot_topologyWorkerGcChildopts() { + singleValueTest(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m", 128.0); + } + + @Test + public void getAssignedMemoryForSlot_workerGcChildopts() { + singleValueTest(Config.WORKER_GC_CHILDOPTS, "-Xmx256m", 256.0); + } + + @Test + public void getAssignedMemoryForSlot_topologyWorkerChildopts() { + singleValueTest(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m", 512.0); + } + + @Test + public void getAssignedMemoryForSlot_workerChildopts() { + singleValueTest(Config.WORKER_CHILDOPTS, "-Xmx768m", 768.0); + } + + @Test + public void getAssignedMemoryForSlot_workerHeapMemoryMb() { + Map topConf = getConfig(Config.WORKER_HEAP_MEMORY_MB, 1024); + Assert.assertEquals(1024.0, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0); + } + + @Test + public void getAssignedMemoryForSlot_topologyWorkerLwChildopts() { + singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m", + TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0); + } + + @Test + public void getAssignedMemoryForSlot_all() { + Map topConf = getPopulatedConfig(); + Assert.assertEquals(128.0 + 64.0, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0); + } +} diff --git a/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java new file mode 100644 index 00000000000..ba9eec94506 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.utils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import org.apache.storm.Config; +import org.junit.Assert; +import org.junit.Test; + +public class ConfigUtilsTest { + + private Map mockMap(String key, Object value) { + Map map = new HashMap(); + map.put(key, value); + return map; + } + + @Test + public void getValueAsList_nullKeySupported() { + String key = null; + List value = Arrays.asList("test"); + Map map = mockMap(key, value); + Assert.assertEquals(value, ConfigUtils.getValueAsList(key, map)); + } + + @Test(expected=NullPointerException.class) + public void getValueAsList_nullKeyNotSupported() { + String key = null; + Map map = new Hashtable<>(); + ConfigUtils.getValueAsList(key, map); + } + + @Test(expected=IllegalArgumentException.class) + public void getValueAsList_nullConfig() { + ConfigUtils.getValueAsList(Config.WORKER_CHILDOPTS, null); + } + + @Test + public void getValueAsList_nullValue() { + String key = Config.WORKER_CHILDOPTS; + Map map = mockMap(key, null); + Assert.assertNull(ConfigUtils.getValueAsList(key, map)); + } + + @Test + public void getValueAsList_nonStringValue() { + String key = Config.WORKER_CHILDOPTS; + List expectedValue = Arrays.asList("1"); + Map map = mockMap(key, 1); + Assert.assertEquals(expectedValue, ConfigUtils.getValueAsList(key, map)); + } + + @Test + public void getValueAsList_spaceSeparatedString() { + String key = Config.WORKER_CHILDOPTS; + String value = "-Xms1024m -Xmx1024m"; + List expectedValue = Arrays.asList("-Xms1024m", "-Xmx1024m"); + Map map = mockMap(key, value); + Assert.assertEquals(expectedValue, ConfigUtils.getValueAsList(key, map)); + } + + @Test + public void getValueAsList_stringList() { + String key = Config.WORKER_CHILDOPTS; + List values = Arrays.asList("-Xms1024m", "-Xmx1024m"); + Map map = mockMap(key, values); + Assert.assertEquals(values, ConfigUtils.getValueAsList(key, map)); + } + + @Test + public void getValueAsList_nonStringList() { + String key = Config.WORKER_CHILDOPTS; + List values = Arrays.asList(1, 2); + List expectedValue = Arrays.asList("1", "2"); + Map map = mockMap(key, values); + Assert.assertEquals(expectedValue, ConfigUtils.getValueAsList(key, map)); + } +} diff --git a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java index 8024820005d..496b7cfd9dc 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java @@ -23,7 +23,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; public class UtilsTest { @@ -64,28 +66,61 @@ private Map serverMockMap(String value) { private Map emptyMockMap() { return new HashMap(); } - - @Test - public void parseJvmHeapMemByChildOptsTest() { - Assert.assertEquals( - "1024K results in 1 MB", - Utils.parseJvmHeapMemByChildOpts("Xmx1024K", 0.0).doubleValue(), 1.0, 0); - - Assert.assertEquals( - "100M results in 100 MB", - Utils.parseJvmHeapMemByChildOpts("Xmx100M", 0.0).doubleValue(), 100.0, 0); - - Assert.assertEquals( - "1G results in 1024 MB", - Utils.parseJvmHeapMemByChildOpts("Xmx1G", 0.0).doubleValue(), 1024.0, 0); - - Assert.assertEquals( - "Unmatched value results in default", - Utils.parseJvmHeapMemByChildOpts("Xmx1T", 123.0).doubleValue(), 123.0, 0); - + + private void doParseJvmHeapMemByChildOptsTest(String message, String opt, double expected) { + doParseJvmHeapMemByChildOptsTest(message, Arrays.asList(opt), expected); + } + + private void doParseJvmHeapMemByChildOptsTest(String message, List opts, double expected) { Assert.assertEquals( - "Null value results in default", - Utils.parseJvmHeapMemByChildOpts(null, 123.0).doubleValue(), 123.0, 0); + message, + Utils.parseJvmHeapMemByChildOpts(opts, 123.0).doubleValue(), expected, 0); + } + + @Test + public void parseJvmHeapMemByChildOptsTestK() { + doParseJvmHeapMemByChildOptsTest("Xmx1024k results in 1 MB", "Xmx1024k", 1.0); + doParseJvmHeapMemByChildOptsTest("Xmx1024K results in 1 MB", "Xmx1024K", 1.0); + doParseJvmHeapMemByChildOptsTest("-Xmx1024k results in 1 MB", "-Xmx1024k", 1.0); + } + + @Test + public void parseJvmHeapMemByChildOptsTestM() { + doParseJvmHeapMemByChildOptsTest("Xmx100M results in 100 MB", "Xmx100m", 100.0); + doParseJvmHeapMemByChildOptsTest("Xmx100m results in 100 MB", "Xmx100M", 100.0); + doParseJvmHeapMemByChildOptsTest("-Xmx100M results in 100 MB", "-Xmx100m", 100.0); + } + + @Test + public void parseJvmHeapMemByChildOptsTestG() { + doParseJvmHeapMemByChildOptsTest("Xmx1g results in 1024 MB", "Xmx1g", 1024.0); + doParseJvmHeapMemByChildOptsTest("Xmx1G results in 1024 MB", "Xmx1G", 1024.0); + doParseJvmHeapMemByChildOptsTest("-Xmx1g results in 1024 MB", "-Xmx1g", 1024.0); + } + + @Test + public void parseJvmHeapMemByChildOptsTestNoMatch() { + doParseJvmHeapMemByChildOptsTest("Unmatched unit results in default", "Xmx1t", 123.0); + doParseJvmHeapMemByChildOptsTest("Unmatched option results in default", "Xms1g", 123.0); + } + + @Test + public void parseJvmHeapMemByChildOptsTestNulls() { + doParseJvmHeapMemByChildOptsTest("Null value results in default", (String) null, 123.0); + doParseJvmHeapMemByChildOptsTest("Null list results in default", (List) null, 123.0); + } + + @Test + public void parseJvmHeapMemByChildOptsTestExtraChars() { + doParseJvmHeapMemByChildOptsTest("Leading characters are ignored", "---Xmx1g", 1024.0); + doParseJvmHeapMemByChildOptsTest("Trailing characters are ignored", "Xmx1ggggg", 1024.0); + } + + @Test + public void parseJvmHeapMemByChildOptsTestFirstMatch() { + doParseJvmHeapMemByChildOptsTest("First valid match is used", + Arrays.asList(null, "Xmx1t", "Xmx1g", "Xms1024k Xmx1024k", "Xmx100m"), + 1024.0); } public void getConfiguredClientThrowsRuntimeExceptionOnBadArgsTest () throws TTransportException { diff --git a/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java b/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java index 6074291045e..6b0ac46a644 100644 --- a/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java +++ b/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java @@ -17,14 +17,39 @@ */ package org.apache.storm.command; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.Utils; -public class ConfigValue { - public static void main(String [] args) { +/** + * Read a value from the topology config map. + */ +public final class ConfigValue { + + /** + * Utility classes should not have a public constructor. + */ + private ConfigValue() { + } + + /** + * Read the topology config and return the value for the given key. + * @param args - an array of length 1 containing the key to fetch. + */ + public static void main(final String[] args) { String name = args[0]; Map conf = Utils.readStormConfig(); - System.out.println("VALUE: " + conf.get(name)); + Object value = conf.get(name); + if (value instanceof List) { + List stringValues = ((List) value) + .stream() + .map(ObjectReader::getString) + .collect(Collectors.toList()); + value = String.join(" ", stringValues); + } + System.out.println("VALUE: " + value); } } diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index 7bc90b6ebe3..0311ba0966a 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -142,7 +142,7 @@ public class DaemonConfig implements Validated { * This parameter is used by the storm-deploy project to configure the * jvm options for the nimbus daemon. */ - @isString + @isStringOrStringList public static final String NIMBUS_CHILDOPTS = "nimbus.childopts"; @@ -279,7 +279,7 @@ public class DaemonConfig implements Validated { /** * Childopts for log viewer java process. */ - @isString + @isStringOrStringList public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts"; /** @@ -389,7 +389,7 @@ public class DaemonConfig implements Validated { /** * Childopts for Storm UI Java process. */ - @isString + @isStringOrStringList public static final String UI_CHILDOPTS = "ui.childopts"; /** @@ -484,7 +484,7 @@ public class DaemonConfig implements Validated { * This parameter is used by the storm-deploy project to configure the * jvm options for the pacemaker daemon. */ - @isString + @isStringOrStringList public static final String PACEMAKER_CHILDOPTS = "pacemaker.childopts"; @@ -573,7 +573,7 @@ public class DaemonConfig implements Validated { /** * Childopts for Storm DRPC Java process. */ - @isString + @isStringOrStringList public static final String DRPC_CHILDOPTS = "drpc.childopts"; /** @@ -674,7 +674,7 @@ public class DaemonConfig implements Validated { * This parameter is used by the storm-deploy project to configure the * jvm options for the supervisor daemon. */ - @isString + @isStringOrStringList public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts"; /** diff --git a/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java b/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java new file mode 100644 index 00000000000..2900111c46d --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +import org.apache.storm.validation.ConfigValidation; +import org.junit.Assert; +import org.junit.Test; + +public class DaemonConfigTest { + + private void stringOrStringListTest(String key) { + Map conf = new HashMap(); + Collection passCases = new LinkedList(); + Collection failCases = new LinkedList(); + + passCases.add(null); + passCases.add("some string"); + String[] stuff = {"some", "string", "list"}; + passCases.add(Arrays.asList(stuff)); + + failCases.add(42); + Integer[] wrongStuff = {1, 2, 3}; + failCases.add(Arrays.asList(wrongStuff)); + + //worker.childopts validates + for (Object value : passCases) { + conf.put(key, value); + ConfigValidation.validateFields(conf); + } + + for (Object value : failCases) { + try { + conf.put(key, value); + ConfigValidation.validateFields(conf); + Assert.fail("Expected Exception not Thrown for value: " + value); + } catch (IllegalArgumentException Ex) { + } + } + } + + @Test + public void testNimbusChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + stringOrStringListTest(DaemonConfig.NIMBUS_CHILDOPTS); + } + + @Test + public void testLogviewerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + stringOrStringListTest(DaemonConfig.LOGVIEWER_CHILDOPTS); + } + + @Test + public void testUiChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + stringOrStringListTest(DaemonConfig.UI_CHILDOPTS); + } + + @Test + public void testPacemakerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + stringOrStringListTest(DaemonConfig.PACEMAKER_CHILDOPTS); + } + + @Test + public void testDrpcChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + stringOrStringListTest(DaemonConfig.DRPC_CHILDOPTS); + } + + @Test + public void testSupervisorChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + stringOrStringListTest(DaemonConfig.SUPERVISOR_CHILDOPTS); + } +} diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java index 587831be677..e81f7bfa88f 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java @@ -104,7 +104,7 @@ public void testRASNodeSlotAssign() { Assert.assertEquals(0, node.totalSlotsUsed()); Assert.assertEquals(4, node.totalSlots()); - TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0); + TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap<>(), 1, 0, 2, 0, 0, 0); List executors11 = new ArrayList<>(); executors11.add(new ExecutorDetails(1, 1)); @@ -124,7 +124,7 @@ public void testRASNodeSlotAssign() { Assert.assertEquals(2, node.totalSlotsUsed()); Assert.assertEquals(4, node.totalSlots()); - TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0); + TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap<>(), 1, 0, 2, 0, 0, 0); List executors21 = new ArrayList<>(); executors21.add(new ExecutorDetails(1, 1));