Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
Expand Down Expand Up @@ -411,10 +413,17 @@ protected List<String> substituteChildopts(Object value, int memOnheap) {
if (value instanceof String) {
String string = substituteChildOptsInternal((String) value, memOnheap);
if (StringUtils.isNotBlank(string)) {
String[] strings = string.split("\\s+");
for (String s: strings) {
if (StringUtils.isNotBlank(s)) {
rets.add(s);
/* This pattern matches
* 1.everything starts with -XX:\w+ or -D[\w.]+ and followed with quoted (both ' and ") strings
* 2.everything without \s, ' and " in it
* This will solve the problem which params like -XX:OnError="pstack %p >~/pstack%p.log" will be split into pieces
*/
Matcher m= Pattern.compile("(?:-XX:\\w+|-D[\\w.]+)=((?<![\\\\])['\\\"])((?:.(?!(?<![\\\\])\\1))*.?)\\1|[^\\s\\\"']+")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have various test cases to cover whether it's working?

.matcher(string);
while (m.find()){
String opt=m.group();
if(StringUtils.isNotBlank(opt)){
rets.add(opt);
}
}
}
Expand Down Expand Up @@ -479,22 +488,22 @@ private static class DependencyLocations {
private final String _topologyId;
private final AdvancedFSOps _ops;
private final String _stormRoot;

public DependencyLocations(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) {
_conf = conf;
_topologyId = topologyId;
_ops = ops;
_stormRoot = stormRoot;
}

public String toString() {
List<String> data;
synchronized(this) {
data = _data;
}
return "DEP_LOCS for " + _topologyId +" => " + data;
}

public synchronized List<String> get() throws IOException {
if (_data != null) {
return _data;
Expand All @@ -519,15 +528,15 @@ public synchronized List<String> get() throws IOException {

static class DepLRUCache {
public final int _maxSize = 100; //We could make this configurable in the future...

@SuppressWarnings("serial")
private LinkedHashMap<String, DependencyLocations> _cache = new LinkedHashMap<String, DependencyLocations>() {
@Override
protected boolean removeEldestEntry(Map.Entry<String,DependencyLocations> eldest) {
return (size() > _maxSize);
}
};

public synchronized DependencyLocations get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
//Only go off of the topology id for now.
DependencyLocations dl = _cache.get(topologyId);
Expand All @@ -537,18 +546,18 @@ public synchronized DependencyLocations get(final Map<String, Object> conf, fina
}
return dl;
}

public synchronized void clear() {
_cache.clear();
}
}

static final DepLRUCache DEP_LOC_CACHE = new DepLRUCache();

public static List<String> getDependencyLocationsFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException {
return DEP_LOC_CACHE.get(conf, topologyId, ops, stormRoot).get();
}

/**
* Get parameters for the class path of the worker process. Also used by the
* log Writer
Expand Down Expand Up @@ -611,7 +620,7 @@ private List<String> getWorkerProfilerChildOpts(int memOnheap) {
}
return workerProfilerChildopts;
}

protected String javaCmd(String cmd) {
String ret = null;
String javaHome = System.getenv().get("JAVA_HOME");
Expand Down Expand Up @@ -700,7 +709,7 @@ public void launch() throws IOException {
if (_resourceIsolationManager != null) {
int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
int cpu = (int) Math.ceil(resources.get_cpu());

int cGroupMem = (int) (Math.ceil((double) _conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
int memoryValue = memoffheap + memOnheap + cGroupMem;
int cpuValue = cpu;
Expand Down