Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions storm-core/src/clj/org/apache/storm/command/config_value.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@

(defn -main [^String name]
(let [conf (read-storm-config)]
(println "VALUE:" (conf name))
))
(if (coll? (conf name))
(println "VALUE:" (clojure.string/join " " (conf name)))
(println "VALUE:" (conf name))
)))
12 changes: 6 additions & 6 deletions storm-core/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public class Config extends HashMap<String, Object> {
* This parameter is used by the storm-deploy project to configure the
* jvm options for the nimbus daemon.
*/
@isString
@isStringOrStringList
public static final String NIMBUS_CHILDOPTS = "nimbus.childopts";


Expand Down Expand Up @@ -740,7 +740,7 @@ public class Config extends HashMap<String, Object> {
/**
* Childopts for log viewer java process.
*/
@isString
@isStringOrStringList
public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts";

/**
Expand Down Expand Up @@ -850,7 +850,7 @@ public class Config extends HashMap<String, Object> {
/**
* Childopts for Storm UI Java process.
*/
@isString
@isStringOrStringList
public static final String UI_CHILDOPTS = "ui.childopts";

/**
Expand Down Expand Up @@ -973,7 +973,7 @@ public class Config extends HashMap<String, Object> {
* This parameter is used by the storm-deploy project to configure the
* jvm options for the pacemaker daemon.
*/
@isString
@isStringOrStringList
public static final String PACEMAKER_CHILDOPTS = "pacemaker.childopts";

/**
Expand Down Expand Up @@ -1160,7 +1160,7 @@ public class Config extends HashMap<String, Object> {
/**
* Childopts for Storm DRPC Java process.
*/
@isString
@isStringOrStringList
public static final String DRPC_CHILDOPTS = "drpc.childopts";

/**
Expand Down Expand Up @@ -1345,7 +1345,7 @@ public class Config extends HashMap<String, Object> {
* This parameter is used by the storm-deploy project to configure the
* jvm options for the supervisor daemon.
*/
@isString
@isStringOrStringList
public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";

/**
Expand Down
85 changes: 33 additions & 52 deletions storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.apache.storm.Config;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;

public class Cluster {
Expand Down Expand Up @@ -539,57 +541,36 @@ public void setNetworkTopography(Map<String, List<String>> networkTopography) {
this.networkTopography = networkTopography;
}

private String getStringFromStringList(Object o) {
StringBuilder sb = new StringBuilder();
for (String s : (List<String>) o) {
sb.append(s);
sb.append(" ");
}
return sb.toString();
}

/*
* Get heap memory usage for a worker's main process and logwriter process
* */
public Double getAssignedMemoryForSlot(Map topConf) {
/**
* Get heap memory usage for a worker's main process and logwriter process.
* @param topConf - the topology config
* @return the assigned memory (in MB)
*/
public static Double getAssignedMemoryForSlot(final Map<String, Object> topConf) {
Double totalWorkerMemory = 0.0;
final Integer TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION = 768;

String topologyWorkerGcChildopts = null;
if (topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS) instanceof List) {
topologyWorkerGcChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS));
} else {
topologyWorkerGcChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), null);
}

String workerGcChildopts = null;
if (topConf.get(Config.WORKER_GC_CHILDOPTS) instanceof List) {
workerGcChildopts = getStringFromStringList(topConf.get(Config.WORKER_GC_CHILDOPTS));
} else {
workerGcChildopts = Utils.getString(topConf.get(Config.WORKER_GC_CHILDOPTS), null);
}
final Integer topologyWorkerDefaultMemoryAllocation = 768;

List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList(
Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf);
List<String> workerGcChildopts = ConfigUtils.getValueAsList(
Config.WORKER_GC_CHILDOPTS, topConf);
Double memGcChildopts = null;
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerGcChildopts, null);
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
topologyWorkerGcChildopts, null);
if (memGcChildopts == null) {
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(workerGcChildopts, null);
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
workerGcChildopts, null);
}

String topologyWorkerChildopts = null;
if (topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS) instanceof List) {
topologyWorkerChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS));
} else {
topologyWorkerChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null);
}
Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerChildopts, null);
List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList(
Config.TOPOLOGY_WORKER_CHILDOPTS, topConf);
Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
topologyWorkerChildopts, null);

String workerChildopts = null;
if (topConf.get(Config.WORKER_CHILDOPTS) instanceof List) {
workerChildopts = getStringFromStringList(topConf.get(Config.WORKER_CHILDOPTS));
} else {
workerChildopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null);
}
Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(workerChildopts, null);
List<String> workerChildopts = ConfigUtils.getValueAsList(
Config.WORKER_CHILDOPTS, topConf);
Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
workerChildopts, null);

if (memGcChildopts != null) {
totalWorkerMemory += memGcChildopts;
Expand All @@ -598,17 +579,17 @@ public Double getAssignedMemoryForSlot(Map topConf) {
} else if (memWorkerChildopts != null) {
totalWorkerMemory += memWorkerChildopts;
} else {
totalWorkerMemory += Utils.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB), TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION);
Object workerHeapMemoryMb = topConf.get(
Config.WORKER_HEAP_MEMORY_MB);
totalWorkerMemory += ObjectReader.getInt(
workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation);
}

String topoWorkerLwChildopts = null;
if (topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS) instanceof List) {
topoWorkerLwChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS));
} else {
topoWorkerLwChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null);
}
List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList(
Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf);
if (topoWorkerLwChildopts != null) {
totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(topoWorkerLwChildopts, 0.0);
totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(
topoWorkerLwChildopts, 0.0);
}
return totalWorkerMemory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public String getName() {
return (String) this.topologyConf.get(Config.TOPOLOGY_NAME);
}

public Map getConf() {
public Map<String, Object> getConf() {
return this.topologyConf;
}

Expand Down
31 changes: 29 additions & 2 deletions storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -533,4 +533,31 @@ public static List<String> getTopoLogsGroups(Map topologyConf) {
Collections.sort(ret);
return ret;
}

/**
* Get the given config value as a List &lt;String&gt;, if possible.
* @param name - the config key
* @param conf - the config map
* @return - the config value converted to a List &lt;String&gt; if found, otherwise null.
* @throws IllegalArgumentException if conf is null
* @throws NullPointerException if name is null and the conf map doesn't support null keys
*/
public static List<String> getValueAsList(String name, Map<String, Object> conf) {
if (null == conf) {
throw new IllegalArgumentException("Conf is required");
}
Object value = conf.get(name);
List<String> listValue;
if (value == null) {
listValue = null;
} else if (value instanceof Collection) {
listValue = new ArrayList<>(((Collection<?>)value).size());
for (Object o : (Collection<?>)value) {
listValue.add(ObjectReader.getString(o));
}
} else {
listValue = Arrays.asList(ObjectReader.getString(value).split("\\s+"));
}
return listValue;
}
}
58 changes: 58 additions & 0 deletions storm-core/src/jvm/org/apache/storm/utils/ObjectReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.utils;

public class ObjectReader {

public static String getString(Object o) {
if (null == o) {
throw new IllegalArgumentException("Don't know how to convert null to String");
}
return o.toString();
}

public static Integer getInt(Object o) {
Integer result = getInt(o, null);
if (null == result) {
throw new IllegalArgumentException("Don't know how to convert null to int");
}
return result;
}

public static Integer getInt(Object o, Integer defaultValue) {
if (null == o) {
return defaultValue;
}

if (o instanceof Integer ||
o instanceof Short ||
o instanceof Byte) {
return ((Number) o).intValue();
} else if (o instanceof Long) {
final long l = (Long) o;
if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
return (int) l;
}
} else if (o instanceof String) {
return Integer.parseInt((String) o);
}

throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
}
52 changes: 28 additions & 24 deletions storm-core/src/jvm/org/apache/storm/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1437,32 +1437,36 @@ public static double zeroIfNaNOrInf(double x) {
* @param defaultValue
* @return the value of the JVM heap memory setting (in MB) in a java command.
*/
public static Double parseJvmHeapMemByChildOpts(String input, Double defaultValue) {
if (input != null) {
Pattern optsPattern = Pattern.compile("Xmx[0-9]+[mkgMKG]");
Matcher m = optsPattern.matcher(input);
String memoryOpts = null;
while (m.find()) {
memoryOpts = m.group();
}
if (memoryOpts != null) {
int unit = 1;
memoryOpts = memoryOpts.toLowerCase();

if (memoryOpts.endsWith("k")) {
unit = 1024;
} else if (memoryOpts.endsWith("m")) {
unit = 1024 * 1024;
} else if (memoryOpts.endsWith("g")) {
unit = 1024 * 1024 * 1024;
public static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue) {
if (options != null) {
Pattern optsPattern = Pattern.compile("Xmx([0-9]+)([mkgMKG])");
for (String option : options) {
if (option == null) {
continue;
}
Matcher m = optsPattern.matcher(option);
while (m.find()) {
int value = Integer.parseInt(m.group(1));
char unitChar = m.group(2).toLowerCase().charAt(0);
int unit;
switch (unitChar) {
case 'k':
unit = 1024;
break;
case 'm':
unit = 1024 * 1024;
break;
case 'g':
unit = 1024 * 1024 * 1024;
break;
default:
unit = 1;
}
Double result = value * unit / 1024.0 / 1024.0;
return (result < 1.0) ? 1.0 : result;
}

memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", "");
Double result = Double.parseDouble(memoryOpts) * unit / 1024.0 / 1024.0;
return (result < 1.0) ? 1.0 : result;
} else {
return defaultValue;
}
return defaultValue;
} else {
return defaultValue;
}
Expand Down
Loading