Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 33 additions & 53 deletions storm-client/src/jvm/org/apache/storm/scheduler/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@

import org.apache.storm.Config;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;

public class Cluster {
/**
Expand Down Expand Up @@ -541,57 +542,36 @@ public void setNetworkTopography(Map<String, List<String>> networkTopography) {
this.networkTopography = networkTopography;
}

private String getStringFromStringList(Object o) {
StringBuilder sb = new StringBuilder();
for (String s : (List<String>) o) {
sb.append(s);
sb.append(" ");
}
return sb.toString();
}

/*
* Get heap memory usage for a worker's main process and logwriter process
* */
public Double getAssignedMemoryForSlot(Map topConf) {
/**
* Get heap memory usage for a worker's main process and logwriter process.
* @param topConf - the topology config
* @return the assigned memory (in MB)
*/
public static Double getAssignedMemoryForSlot(final Map<String, Object> topConf) {
Double totalWorkerMemory = 0.0;
final Integer TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION = 768;

String topologyWorkerGcChildopts = null;
if (topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS) instanceof List) {
topologyWorkerGcChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS));
} else {
topologyWorkerGcChildopts = ObjectReader.getString(topConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), null);
}

String workerGcChildopts = null;
if (topConf.get(Config.WORKER_GC_CHILDOPTS) instanceof List) {
workerGcChildopts = getStringFromStringList(topConf.get(Config.WORKER_GC_CHILDOPTS));
} else {
workerGcChildopts = ObjectReader.getString(topConf.get(Config.WORKER_GC_CHILDOPTS), null);
}
final Integer topologyWorkerDefaultMemoryAllocation = 768;

List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList(
Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf);
List<String> workerGcChildopts = ConfigUtils.getValueAsList(
Config.WORKER_GC_CHILDOPTS, topConf);
Double memGcChildopts = null;
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerGcChildopts, null);
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
topologyWorkerGcChildopts, null);
if (memGcChildopts == null) {
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(workerGcChildopts, null);
memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
workerGcChildopts, null);
}

String topologyWorkerChildopts = null;
if (topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS) instanceof List) {
topologyWorkerChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS));
} else {
topologyWorkerChildopts = ObjectReader.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null);
}
Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerChildopts, null);
List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList(
Config.TOPOLOGY_WORKER_CHILDOPTS, topConf);
Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
topologyWorkerChildopts, null);

String workerChildopts = null;
if (topConf.get(Config.WORKER_CHILDOPTS) instanceof List) {
workerChildopts = getStringFromStringList(topConf.get(Config.WORKER_CHILDOPTS));
} else {
workerChildopts = ObjectReader.getString(topConf.get(Config.WORKER_CHILDOPTS), null);
}
Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(workerChildopts, null);
List<String> workerChildopts = ConfigUtils.getValueAsList(
Config.WORKER_CHILDOPTS, topConf);
Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
workerChildopts, null);

if (memGcChildopts != null) {
totalWorkerMemory += memGcChildopts;
Expand All @@ -600,17 +580,17 @@ public Double getAssignedMemoryForSlot(Map topConf) {
} else if (memWorkerChildopts != null) {
totalWorkerMemory += memWorkerChildopts;
} else {
totalWorkerMemory += ObjectReader.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB), TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION);
Object workerHeapMemoryMb = topConf.get(
Config.WORKER_HEAP_MEMORY_MB);
totalWorkerMemory += ObjectReader.getInt(
workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation);
}

String topoWorkerLwChildopts = null;
if (topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS) instanceof List) {
topoWorkerLwChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS));
} else {
topoWorkerLwChildopts = ObjectReader.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null);
}
List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList(
Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf);
if (topoWorkerLwChildopts != null) {
totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(topoWorkerLwChildopts, 0.0);
totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(
topoWorkerLwChildopts, 0.0);
}
return totalWorkerMemory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public String getName() {
return (String) this.topologyConf.get(Config.TOPOLOGY_NAME);
}

public Map getConf() {
public Map<String, Object> getConf() {
return this.topologyConf;
}

Expand Down
32 changes: 30 additions & 2 deletions storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.storm.utils;

import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
Expand All @@ -28,10 +27,12 @@
import java.io.File;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -343,6 +344,33 @@ public static File getWorkerDirFromRoot(String logRoot, String id, Integer port)
return new File((logRoot + FILE_SEPARATOR + id + FILE_SEPARATOR + port));
}

/**
* Get the given config value as a List &lt;String&gt;, if possible.
* @param name - the config key
* @param conf - the config map
* @return - the config value converted to a List &lt;String&gt; if found, otherwise null.
* @throws IllegalArgumentException if conf is null
* @throws NullPointerException if name is null and the conf map doesn't support null keys
*/
public static List<String> getValueAsList(String name, Map<String, Object> conf) {
if (null == conf) {
throw new IllegalArgumentException("Conf is required");
}
Object value = conf.get(name);
List<String> listValue;
if (value == null) {
listValue = null;
} else if (value instanceof Collection) {
listValue = ((Collection<?>) value)
.stream()
.map(ObjectReader::getString)
.collect(Collectors.toList());
} else {
listValue = Arrays.asList(ObjectReader.getString(value).split("\\s+"));
}
return listValue;
}

public StormTopology readSupervisorTopologyImpl(Map<String, Object> conf, String stormId, AdvancedFSOps ops) throws IOException {
String stormRoot = supervisorStormDistRoot(conf, stormId);
String topologyPath = supervisorStormCodePath(stormRoot);
Expand Down
52 changes: 28 additions & 24 deletions storm-client/src/jvm/org/apache/storm/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -903,32 +903,36 @@ public static HashMap reverseMap(List listSeq) {
* @param defaultValue
* @return the value of the JVM heap memory setting (in MB) in a java command.
*/
public static Double parseJvmHeapMemByChildOpts(String input, Double defaultValue) {
if (input != null) {
Pattern optsPattern = Pattern.compile("Xmx[0-9]+[mkgMKG]");
Matcher m = optsPattern.matcher(input);
String memoryOpts = null;
while (m.find()) {
memoryOpts = m.group();
}
if (memoryOpts != null) {
int unit = 1;
memoryOpts = memoryOpts.toLowerCase();

if (memoryOpts.endsWith("k")) {
unit = 1024;
} else if (memoryOpts.endsWith("m")) {
unit = 1024 * 1024;
} else if (memoryOpts.endsWith("g")) {
unit = 1024 * 1024 * 1024;
public static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue) {
if (options != null) {
Pattern optsPattern = Pattern.compile("Xmx([0-9]+)([mkgMKG])");
for (String option : options) {
if (option == null) {
continue;
}
Matcher m = optsPattern.matcher(option);
while (m.find()) {
int value = Integer.parseInt(m.group(1));
char unitChar = m.group(2).toLowerCase().charAt(0);
int unit;
switch (unitChar) {
case 'k':
unit = 1024;
break;
case 'm':
unit = 1024 * 1024;
break;
case 'g':
unit = 1024 * 1024 * 1024;
break;
default:
unit = 1;
}
Double result = value * unit / 1024.0 / 1024.0;
return (result < 1.0) ? 1.0 : result;
}

memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", "");
Double result = Double.parseDouble(memoryOpts) * unit / 1024.0 / 1024.0;
return (result < 1.0) ? 1.0 : result;
} else {
return defaultValue;
}
return defaultValue;
} else {
return defaultValue;
}
Expand Down
111 changes: 111 additions & 0 deletions storm-client/test/jvm/org/apache/storm/scheduler/ClusterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.junit.Assert;
import org.junit.Test;

/**
* Unit tests for {@link Cluster}.
*/
public class ClusterTest {

/** This should match the value in Cluster.getAssignedMemoryForSlot. */
final Double TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION = 768.0;

private Map<String, Object> getConfig(String key, Object value) {
Map<String, Object> topConf = getEmptyConfig();
topConf.put(key, value);
return topConf;
}

private Map<String, Object> getEmptyConfig() {
Map<String, Object> topConf = new HashMap<>();
return topConf;
}

private Map<String, Object> getPopulatedConfig() {
Map<String, Object> topConf = new HashMap<>();
topConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m");
topConf.put(Config.WORKER_GC_CHILDOPTS, "-Xmx256m");
topConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m");
topConf.put(Config.WORKER_CHILDOPTS, "-Xmx768m");
topConf.put(Config.WORKER_HEAP_MEMORY_MB, 1024);
topConf.put(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m");
return topConf;
}

/**
* Test Cluster.getAssignedMemoryForSlot with a single config value set.
* @param key - the config key to set
* @param value - the config value to set
* @param expectedValue - the expected result
*/
private void singleValueTest(String key, String value, double expectedValue) {
Map<String, Object> topConf = getConfig(key, value);
Assert.assertEquals(expectedValue, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0);
}

@Test
public void getAssignedMemoryForSlot_allNull() {
Map<String, Object> topConf = getEmptyConfig();
Assert.assertEquals(TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION, Cluster.getAssignedMemoryForSlot(topConf));
}

@Test
public void getAssignedMemoryForSlot_topologyWorkerGcChildopts() {
singleValueTest(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m", 128.0);
}

@Test
public void getAssignedMemoryForSlot_workerGcChildopts() {
singleValueTest(Config.WORKER_GC_CHILDOPTS, "-Xmx256m", 256.0);
}

@Test
public void getAssignedMemoryForSlot_topologyWorkerChildopts() {
singleValueTest(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m", 512.0);
}

@Test
public void getAssignedMemoryForSlot_workerChildopts() {
singleValueTest(Config.WORKER_CHILDOPTS, "-Xmx768m", 768.0);
}

@Test
public void getAssignedMemoryForSlot_workerHeapMemoryMb() {
Map<String, Object> topConf = getConfig(Config.WORKER_HEAP_MEMORY_MB, 1024);
Assert.assertEquals(1024.0, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0);
}

@Test
public void getAssignedMemoryForSlot_topologyWorkerLwChildopts() {
singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m",
TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0);
}

@Test
public void getAssignedMemoryForSlot_all() {
Map<String, Object> topConf = getPopulatedConfig();
Assert.assertEquals(128.0 + 64.0, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0);
}
}
Loading