diff --git a/README.md b/README.md
index 51d3e6b56..86abdb544 100644
Binary files a/README.md and b/README.md differ
diff --git a/example/sequence-split-merge/conf/conf.prop b/example/sequence-split-merge/conf/conf.prop
index c4332be16..f3e005515 100644
--- a/example/sequence-split-merge/conf/conf.prop
+++ b/example/sequence-split-merge/conf/conf.prop
@@ -8,10 +8,13 @@ topology.debug=false
spout.parallel=1
-bolt.parallel=2
+bolt.parallel=1
#send.sleep.second=100
check.sequence=true
kryo.enable=false
fall.back.on.java.serialization=true
enable.split=false
+
+storm.cluster.mode=local
+#topology.enable.classloader=true
diff --git a/example/sequence-split-merge/pom.xml b/example/sequence-split-merge/pom.xml
index 660035c43..71f515421 100644
--- a/example/sequence-split-merge/pom.xml
+++ b/example/sequence-split-merge/pom.xml
@@ -11,7 +11,7 @@
UTF-8
- 0.9.6.1
+ 0.9.6.3
storm-0.9.2-incubating
@@ -29,15 +29,9 @@
-
-
+
+
com.alibaba.jstorm
jstorm-client-extension
@@ -45,12 +39,18 @@
provided
-
+
com.alibaba.jstorm
jstorm-client
${jstorm.version}
provided
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
@@ -58,9 +58,28 @@
jstorm-server
${jstorm.version}
provided
+
+
+
+
+ ch.qos.logback
+ logback-classic
+ 1.0.13
+
+
+
+ org.slf4j
+ log4j-over-slf4j
+ 1.7.10
-
+
+ junit
+ junit
+ 4.10
+ test
+
+
+ -->
4.0.0
com.alibaba.jstorm
jstorm-client-extension
- 0.9.6.2
+ 0.9.6.3
jar
${project.artifactId}-${project.version}
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
index 00524036a..f71a4ced3 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
@@ -162,6 +162,10 @@ public static boolean isEnableCgroup(Map conf) {
public static void setUseOldAssignment(Map conf, boolean useOld) {
conf.put(USE_OLD_ASSIGNMENT, Boolean.valueOf(useOld));
}
+
+ public static boolean isUseOldAssignment(Map conf) {
+ return JStormUtils.parseBoolean(conf.get(USE_OLD_ASSIGNMENT), false);
+ }
/**
* The supervisor's hostname
@@ -195,6 +199,16 @@ public static boolean isEnableTopologyClassLoader(Map conf) {
public static void setEnableTopologyClassLoader(Map conf, boolean enable) {
conf.put(TOPOLOGY_ENABLE_CLASSLOADER, Boolean.valueOf(enable));
}
+
+ protected static String CLASSLOADER_DEBUG = "classloader.debug";
+
+ public static boolean isEnableClassloaderDebug(Map conf) {
+ return JStormUtils.parseBoolean(conf.get(CLASSLOADER_DEBUG), false);
+ }
+
+ public static void setEnableClassloaderDebug(Map conf, boolean enable) {
+ conf.put(CLASSLOADER_DEBUG, enable);
+ }
protected static final String CONTAINER_NIMBUS_HEARTBEAT = "container.nimbus.heartbeat";
@@ -313,6 +327,16 @@ public static void setUserDefineAssignment(Map conf,
}
conf.put(USE_USERDEFINE_ASSIGNMENT, ret);
}
+
+ public static List getUserDefineAssignment(Map conf) {
+ List ret = new ArrayList();
+ if (conf.get(USE_USERDEFINE_ASSIGNMENT) == null)
+ return ret;
+ for (String worker : (List) conf.get(USE_USERDEFINE_ASSIGNMENT)) {
+ ret.add(WorkerAssignment.parseFromObj(Utils.from_json(worker)));
+ }
+ return ret;
+ }
protected static final String MEMSIZE_PER_WORKER = "worker.memory.size";
@@ -334,11 +358,23 @@ public static void setMemSizePerWorkerByGB(Map conf, long memSize) {
long size = memSize * 1024l;
setMemSizePerWorkerByMB(conf, size);
}
+
+ public static long getMemSizePerWorker(Map conf) {
+ long size = JStormUtils.parseLong(conf.get(MEMSIZE_PER_WORKER),
+ JStormUtils.SIZE_1_G * 2);
+ return size > 0 ? size : JStormUtils.SIZE_1_G * 2;
+ }
+
protected static final String CPU_SLOT_PER_WORKER = "worker.cpu.slot.num";
public static void setCpuSlotNumPerWorker(Map conf, int slotNum) {
conf.put(CPU_SLOT_PER_WORKER, slotNum);
}
+
+ public static int getCpuSlotPerWorker(Map conf) {
+ int slot = JStormUtils.parseInt(conf.get(CPU_SLOT_PER_WORKER), 1);
+ return slot > 0 ? slot : 1;
+ }
protected static String TOPOLOGY_PERFORMANCE_METRICS = "topology.performance.metrics";
@@ -550,7 +586,57 @@ public static boolean getTopologyBufferSizeLimited(Map conf) {
return true;
}
- return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BUFFER_SIZE_LIMITED), false);
+ return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BUFFER_SIZE_LIMITED), true);
}
+
+ protected static String SUPERVISOR_SLOTS_PORTS_BASE = "supervisor.slots.ports.base";
+
+ public static int getSupervisorSlotsPortsBase(Map conf) {
+ return JStormUtils.parseInt(conf.get(SUPERVISOR_SLOTS_PORTS_BASE), 6800);
+ }
+
+ // SUPERVISOR_SLOTS_PORTS_BASE don't provide setting function, it must be set by configuration
+
+ protected static String SUPERVISOR_SLOTS_PORT_CPU_WEIGHT = "supervisor.slots.port.cpu.weight";
+ public static double getSupervisorSlotsPortCpuWeight(Map conf) {
+ Object value = conf.get(SUPERVISOR_SLOTS_PORT_CPU_WEIGHT);
+ Double ret = JStormUtils.convertToDouble(value);
+ if (ret == null) {
+ return 1.0;
+ }else {
+ return ret;
+ }
+ }
+ // SUPERVISOR_SLOTS_PORT_CPU_WEIGHT don't provide setting function, it must be set by configuration
+
+ protected static String USER_DEFINED_LOG4J_CONF = "user.defined.log4j.conf";
+
+ public static String getUserDefinedLog4jConf(Map conf) {
+ return (String)conf.get(USER_DEFINED_LOG4J_CONF);
+ }
+
+ public static void setUserDefinedLog4jConf(Map conf, String fileName) {
+ conf.put(USER_DEFINED_LOG4J_CONF, fileName);
+ }
+
+ protected static String USER_DEFINED_LOGBACK_CONF = "user.defined.logback.conf";
+
+ public static String getUserDefinedLogbackConf(Map conf) {
+ return (String)conf.get(USER_DEFINED_LOGBACK_CONF);
+ }
+
+ public static void setUserDefinedLogbackConf(Map conf, String fileName) {
+ conf.put(USER_DEFINED_LOGBACK_CONF, fileName);
+ }
+
+ protected static String TASK_ERROR_INFO_REPORT_INTERVAL = "topology.task.error.report.interval";
+
+ public static Integer getTaskErrorReportInterval(Map conf) {
+ return JStormUtils.parseInt(conf.get(TASK_ERROR_INFO_REPORT_INTERVAL), 60);
+ }
+
+ public static void setTaskErrorReportInterval(Map conf, Integer interval) {
+ conf.put(TASK_ERROR_INFO_REPORT_INTERVAL, interval);
+ }
}
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
index eb46e6262..9eac3269e 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
@@ -92,7 +92,7 @@ public void setCpu(int cpu) {
@Override
public String toJSONString() {
- StringBuilder sb = new StringBuilder();
+// StringBuilder sb = new StringBuilder();
// sb.append("[");
// sb.append("\"" + this.getNodeId() + "\"");
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
index ef8cb4eb7..95224f0dd 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
@@ -18,6 +18,7 @@
import com.alibaba.jstorm.callback.ClusterStateCallback;
import com.alibaba.jstorm.callback.WatcherCallBack;
+import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.zk.Zookeeper;
@@ -127,6 +128,8 @@ public void mkdirs(String path) throws Exception {
@Override
public void set_data(String path, byte[] data) throws Exception {
+ if (data.length > (JStormUtils.SIZE_1_K * 800))
+ throw new Exception("Writing 800k+ data into ZK is not allowed!");
if (zkobj.exists(zk, path, false)) {
zkobj.setData(zk, path, data);
} else {
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java
index cab04e0d7..7ca0860cf 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java
@@ -5,22 +5,24 @@
import java.util.Map.Entry;
import java.io.Serializable;
-import com.codahale.metrics.Metric;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.log4j.Logger;
+
import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Sampling;
-import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
-import com.alibaba.jstorm.client.metric.MetricCallback;
import com.alibaba.jstorm.metric.metrdata.*;
+import com.alibaba.jstorm.utils.JStormUtils;
/**
* /storm-zk-root/Monitor/{topologyid}/user/{workerid} data
*/
public class UserDefMetricData implements Serializable {
+ private static final Logger LOG = Logger.getLogger(UserDefMetricData.class);
private static final long serialVersionUID = 954727168057659270L;
@@ -55,9 +57,13 @@ public Map getHistogramDataMap() {
public void updateFromGauge(Map> gaugeMap) {
for(Entry> entry : gaugeMap.entrySet()) {
- GaugeData gaugeData = new GaugeData();
- gaugeData.setValue((Double)(entry.getValue().getValue()));
- gaugeDataMap.put(entry.getKey(), gaugeData);
+ try {
+ GaugeData gaugeData = new GaugeData();
+ gaugeData.setValue(JStormUtils.parseDouble(entry.getValue().getValue()));
+ gaugeDataMap.put(entry.getKey(), gaugeData);
+ } catch (Throwable e) {
+ LOG.error("updateFromGauge exception ", e);
+ }
}
}
@@ -123,4 +129,10 @@ public void updateFromTimerData(Map timerMap) {
timerDataMap.put(entry.getKey(), timerData);
}
}
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this,
+ ToStringStyle.SHORT_PREFIX_STYLE);
+ }
}
\ No newline at end of file
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
index 76c41a033..ee7376dcb 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
@@ -12,6 +12,8 @@ public class HttpserverUtils {
public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK = "jstack";
+ public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF = "showConf";
+
public static final String HTTPSERVER_LOGVIEW_PARAM_LOGFILE = "log";
public static final String HTTPSERVER_LOGVIEW_PARAM_POS = "pos";
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
index 390b2603e..17b28cff3 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
@@ -34,10 +34,12 @@
import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
+import backtype.storm.Config;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
import com.alibaba.jstorm.callback.RunnableCallback;
+import com.alibaba.jstorm.client.ConfigExtension;
/**
* JStorm utility
@@ -58,6 +60,8 @@ public class JStormUtils {
public static final int MIN_30 = MIN_1 * 30;
public static final int HOUR_1 = MIN_30 * 2;
public static final int DAY_1 = HOUR_1 * 24;
+
+ public static final String osName = System.getProperty("os.name");
public static String getErrorInfo(String baseInfo, Exception e) {
try {
@@ -140,15 +144,35 @@ public static int byteToInt2(byte[] b) {
return iOutcome;
}
+
+ /**
+ * LocalMode variable isn't clean, it make the JStormUtils ugly
+ */
+ public static boolean localMode = false;
+
+ public static boolean isLocalMode() {
+ return localMode;
+ }
+
+ public static void setLocalMode(boolean localMode) {
+ JStormUtils.localMode = localMode;
+ }
+
+ public static void haltProcess(int val) {
+ Runtime.getRuntime().halt(val);
+ }
public static void halt_process(int val, String msg) {
LOG.info("Halting process: " + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- LOG.error("halt_process", e);
}
- Runtime.getRuntime().halt(val);
+ if (localMode && val == 0) {
+ //throw new RuntimeException(msg);
+ }else {
+ haltProcess(val);
+ }
}
/**
@@ -218,7 +242,7 @@ public static void extract_dir_from_jar(String jarpath, String dir,
try {
exec_command(cmd);
} catch (Exception e) {
- LOG.warn("No " + dir + " from " + jarpath + "by cmd:" + cmd + "!"
+ LOG.warn("No " + dir + " from " + jarpath + " by cmd:" + cmd + "!\n"
+ e.getMessage());
}
@@ -273,6 +297,28 @@ public static void kill_signal(Integer pid, String signal) {
}
}
+ /**
+ * This function is only for linux
+ *
+ * @param pid
+ * @return
+ */
+ public static boolean isProcDead(String pid) {
+ if (osName.equalsIgnoreCase("Linux") == false) {
+ return false;
+ }
+
+ String path = "/proc/" + pid;
+ File file = new File(path);
+
+ if (file.exists() == false) {
+ LOG.info("Process " + pid + " is dead");
+ return true;
+ }
+
+ return false;
+ }
+
/**
* If it is backend, please set resultHandler, such as DefaultExecuteResultHandler
* If it is frontend, ByteArrayOutputStream.toString get the result
@@ -371,7 +417,7 @@ public void run() {
try {
launchProcess(cmdlist, environment);
} catch (IOException e) {
- LOG.error("Failed to run " + cmdlist + ":" + e.getCause(), e);
+ LOG.error("Failed to run " + command + ":" + e.getCause(), e);
}
}
}).start();
@@ -552,6 +598,27 @@ public static Long parseLong(Object o) {
+ o.getClass().getName() + " " + o);
}
}
+
+ public static Double parseDouble(Object o) {
+ if (o == null) {
+ return null;
+ }
+
+ if (o instanceof String) {
+ return Double.valueOf(String.valueOf(o));
+ } else if (o instanceof Integer) {
+ Number value = (Integer) o;
+ return value.doubleValue();
+ } else if (o instanceof Long) {
+ Number value = (Long) o;
+ return value.doubleValue();
+ } else if (o instanceof Double) {
+ return (Double) o;
+ } else {
+ throw new RuntimeException("Invalid value "
+ + o.getClass().getName() + " " + o);
+ }
+ }
public static Long parseLong(Object o, long defaultValue) {
@@ -950,4 +1017,60 @@ public static HashMap filter_val(RunnableCallback fn,
}
return rtn;
}
+
+ public static List getSupervisorPortList(Map conf) {
+ List portList = (List) conf
+ .get(Config.SUPERVISOR_SLOTS_PORTS);
+ if (portList != null && portList.size() > 0) {
+ return portList;
+ }
+
+ LOG.info("Generate port list through CPU cores and system memory size");
+
+ double cpuWeight = ConfigExtension.getSupervisorSlotsPortCpuWeight(conf);
+ int sysCpuNum = 4;
+ try {
+ sysCpuNum = Runtime.getRuntime().availableProcessors();
+ }catch(Exception e) {
+ LOG.info("Failed to get CPU cores, set cpu cores as 4");
+ sysCpuNum = 4;
+ }
+ int cpuPortNum = (int)(sysCpuNum/cpuWeight);
+ if (cpuPortNum < 1) {
+
+ LOG.info("Invalid supervisor.slots.port.cpu.weight setting :"
+ + cpuWeight + ", cpu cores:" + sysCpuNum);
+ cpuPortNum = 1;
+ }
+
+ int memPortNum = Integer.MAX_VALUE;
+ Long physicalMemSize = JStormUtils.getPhysicMemorySize();
+ if (physicalMemSize == null) {
+ LOG.info("Failed to get memory size");
+ }else {
+ LOG.info("Get system memory size :" + physicalMemSize);
+ long workerMemSize = ConfigExtension.getMemSizePerWorker(conf);
+ memPortNum = (int)(physicalMemSize/workerMemSize);
+ if (memPortNum < 1) {
+ LOG.info("Invalide worker.memory.size setting:" + workerMemSize );
+ memPortNum = 4;
+ }else if (memPortNum < 4){
+ LOG.info("System memory is too small for jstorm");
+ memPortNum = 4;
+ }
+ }
+
+ int portNum = Math.min(cpuPortNum, memPortNum);
+ if (portNum < 1) {
+ portNum = 1;
+ }
+
+ int portBase = ConfigExtension.getSupervisorSlotsPortsBase(conf);
+ portList = new ArrayList();
+ for(int i = 0; i < portNum; i++) {
+ portList.add(portBase + i);
+ }
+
+ return portList;
+ }
}
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
index 51483a97f..1a6c649b8 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
@@ -4,7 +4,9 @@
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
+import java.security.InvalidParameterException;
+import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
/**
@@ -97,4 +99,22 @@ public static String ip2Host(String ip) {
return address.getHostName();
}
+ public static boolean equals(String host1, String host2) {
+
+
+ if (StringUtils.equalsIgnoreCase(host1, host2) == true) {
+ return true;
+ }
+
+ if (host1 == null || host2 == null) {
+ return false;
+ }
+
+ String ip1 = host2Ip(host1);
+ String ip2 = host2Ip(host2);
+
+ return StringUtils.equalsIgnoreCase(ip1, ip2);
+
+ }
+
}
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/PathUtils.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
index 87bffd9fb..26cddf589 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
@@ -110,5 +110,21 @@ public static List read_dir_contents(String dir) {
}
return rtn;
}
+
+ public static String getCanonicalPath(String fileName) {
+ String ret = null;
+ File file = new File(fileName);
+ if (file.exists()) {
+ try {
+ ret = file.getCanonicalPath();
+ } catch (IOException e) {
+ LOG.error("", e);
+ }
+ }else {
+ LOG.warn(fileName + " doesn't exist ");
+ }
+
+ return ret;
+ }
}
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/zk/ZkTool.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
index 130afff6f..2d504d92d 100644
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
+++ b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
@@ -34,7 +34,7 @@ public static String getData(DistributedClusterState zkClusterState,
return null;
}
- Object obj = Utils.deserialize(data);
+ Object obj = Utils.deserialize(data, null);
return obj.toString();
}
diff --git a/jstorm-client/pom.xml b/jstorm-client/pom.xml
index 9eaa8652e..6a0f4658c 100644
--- a/jstorm-client/pom.xml
+++ b/jstorm-client/pom.xml
@@ -5,18 +5,18 @@
com.alibaba.jstorm
jstorm-all
- 0.9.6.2
+ 0.9.6.3
..
+ -->
4.0.0
com.alibaba.jstorm
jstorm-client
- 0.9.6.2
+ 0.9.6.3
jar
${project.artifactId}-${project.version}
@@ -180,6 +180,11 @@
plexus-compiler-javac
1.8.1
+
+ com.google.code.gson
+ gson
+ 2.3.1
+
+ -->
4.0.0
com.alibaba.jstorm
jstorm-server
- 0.9.6.2
+ 0.9.6.3
jar
${project.artifactId}-${project.version}
jstorm server modules
diff --git a/jstorm-server/src/main/java/backtype/storm/LocalCluster.java b/jstorm-server/src/main/java/backtype/storm/LocalCluster.java
index d0a0c93b7..303bc11bc 100644
--- a/jstorm-server/src/main/java/backtype/storm/LocalCluster.java
+++ b/jstorm-server/src/main/java/backtype/storm/LocalCluster.java
@@ -1,5 +1,6 @@
package backtype.storm;
+import java.util.Enumeration;
import java.util.Map;
import org.apache.log4j.BasicConfigurator;
@@ -16,15 +17,37 @@
import backtype.storm.generated.TopologyInfo;
import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.utils.JStormUtils;
+
public class LocalCluster implements ILocalCluster {
public static Logger LOG = Logger.getLogger(LocalCluster.class);
private LocalClusterMap state;
+ protected void setLogger() {
+ boolean needReset = true;
+ Logger rootLogger = Logger.getRootLogger();
+ if (rootLogger != null) {
+ Enumeration appenders = rootLogger.getAllAppenders();
+ if (appenders.hasMoreElements() == true) {
+ needReset = false;
+ }
+ }
+
+ if (needReset == true) {
+ BasicConfigurator.configure();
+ rootLogger.setLevel(Level.INFO);
+ }
+
+ }
+
public LocalCluster() {
- BasicConfigurator.configure();
- Logger.getRootLogger().setLevel(Level.INFO);
+ setLogger();
+
+ // fix in zk occur Address family not supported by protocol family: connect
+ System.setProperty("java.net.preferIPv4Stack", "true");
+
this.state = LocalUtils.prepareLocalCluster();
if (this.state == null)
throw new RuntimeException("prepareLocalCluster error");
@@ -42,6 +65,8 @@ public void submitTopologyWithOpts(String topologyName, Map conf,
// TODO Auto-generated method stub
if (!Utils.isValidConf(conf))
throw new RuntimeException("Topology conf is not json-serializable");
+ JStormUtils.setLocalMode(true);
+
try {
if (submitOpts == null) {
state.getNimbus().submitTopology(topologyName, null,
@@ -62,7 +87,10 @@ public void submitTopologyWithOpts(String topologyName, Map conf,
public void killTopology(String topologyName) {
// TODO Auto-generated method stub
try {
- state.getNimbus().killTopology(topologyName);
+ // kill topology quickly
+ KillOptions killOps = new KillOptions();
+ killOps.set_wait_secs(0);
+ state.getNimbus().killTopologyWithOpts(topologyName, killOps);
} catch (Exception e) {
// TODO Auto-generated catch block
LOG.error("fail to kill Topology " + topologyName, e);
@@ -121,6 +149,9 @@ public void rebalance(String name, RebalanceOptions options){
@Override
public void shutdown() {
// TODO Auto-generated method stub
+ // in order to avoid kill topology's command competition
+ // it take 10 seconds to remove topology's node
+ JStormUtils.sleepMs(10 * 1000);
this.state.clean();
}
diff --git a/jstorm-server/src/main/java/backtype/storm/LocalClusterMap.java b/jstorm-server/src/main/java/backtype/storm/LocalClusterMap.java
index bedc219f0..7c29cdbda 100644
--- a/jstorm-server/src/main/java/backtype/storm/LocalClusterMap.java
+++ b/jstorm-server/src/main/java/backtype/storm/LocalClusterMap.java
@@ -102,7 +102,7 @@ public void clean() {
PathUtils.rmr(dir);
} catch (IOException e) {
// TODO Auto-generated catch block
- LOG.error("fail to delete " + dir, e);
+ LOG.error("Fail to delete " + dir);
}
}
}
diff --git a/jstorm-server/src/main/java/backtype/storm/LocalUtils.java b/jstorm-server/src/main/java/backtype/storm/LocalUtils.java
index f84aae79a..30b6daa12 100644
--- a/jstorm-server/src/main/java/backtype/storm/LocalUtils.java
+++ b/jstorm-server/src/main/java/backtype/storm/LocalUtils.java
@@ -13,6 +13,7 @@
import backtype.storm.messaging.IContext;
import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.nimbus.DefaultInimbus;
import com.alibaba.jstorm.daemon.nimbus.NimbusServer;
import com.alibaba.jstorm.daemon.supervisor.Supervisor;
@@ -93,6 +94,8 @@ private static Map getLocalConf(int port) {
conf.put(Config.ZMQ_LINGER_MILLIS, 0);
conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
+ ConfigExtension.setSpoutDelayRunSeconds(conf, 0);
+ ConfigExtension.setTaskCleanupTimeoutSec(conf, 0);
return conf;
}
diff --git a/jstorm-server/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java b/jstorm-server/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
index c2c49e898..ed82071e4 100644
--- a/jstorm-server/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
+++ b/jstorm-server/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
@@ -73,7 +73,7 @@ public int getDelaySeconds(Object[] args) {
delaySecs = JStormUtils.parseInt(args[0]);
}
- if (delaySecs == null || delaySecs <= 0) {
+ if (delaySecs == null || delaySecs < 0) {
delaySecs = DelayStatusTransitionCallback.DEFAULT_DELAY_SECONDS;
}
diff --git a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Cluster.java b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Cluster.java
index 753336261..53e2cf815 100644
--- a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Cluster.java
+++ b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Cluster.java
@@ -143,7 +143,7 @@ public static Object maybe_deserialize(byte[] data) {
if (data == null) {
return null;
}
- return Utils.deserialize(data);
+ return Utils.deserialize(data, null);
}
@SuppressWarnings("rawtypes")
diff --git a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Common.java b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Common.java
index eedd2bc10..3ab090399 100644
--- a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Common.java
+++ b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/Common.java
@@ -11,6 +11,7 @@
import java.util.Set;
import backtype.storm.Config;
+import backtype.storm.Constants;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.ComponentObject;
@@ -23,6 +24,7 @@
import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
+import backtype.storm.metric.SystemBolt;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.ShellBolt;
@@ -304,7 +306,11 @@ public static Map acker_inputs(
* @param num_tasks
* @param ret
*/
- public static void add_acker(Integer ackerNum, StormTopology ret) {
+ public static void add_acker(Map stormConf, StormTopology ret) {
+ String key = Config.TOPOLOGY_ACKER_EXECUTORS;
+
+ Integer ackerNum = JStormUtils.parseInt(stormConf.get(key), 0);
+
// generate outputs
HashMap outputs = new HashMap();
ArrayList fields = new ArrayList();
@@ -319,8 +325,7 @@ public static void add_acker(Integer ackerNum, StormTopology ret) {
Map inputs = acker_inputs(ret);
// generate acker which will be stored in topology
- Bolt acker_bolt = Thrift.mkAckerBolt(inputs, ackerbolt, outputs,
- ackerNum);
+ Bolt acker_bolt = Thrift.mkBolt(inputs, ackerbolt, outputs, ackerNum);
// add every bolt two output stream
// ACKER_ACK_STREAM_ID/ACKER_FAIL_STREAM_ID
@@ -413,19 +418,75 @@ public static void add_system_streams(StormTopology topology) {
}
}
+ public static StormTopology add_system_components(StormTopology topology) {
+ // generate inputs
+ Map inputs = new HashMap();
+
+ // generate outputs
+ HashMap outputs = new HashMap();
+ ArrayList fields = new ArrayList();
+
+ outputs.put(Constants.SYSTEM_TICK_STREAM_ID,
+ Thrift.outputFields(JStormUtils.mk_list("rate_secs")));
+ outputs.put(Constants.METRICS_TICK_STREAM_ID,
+ Thrift.outputFields(JStormUtils.mk_list("interval")));
+ outputs.put(Constants.CREDENTIALS_CHANGED_STREAM_ID,
+ Thrift.outputFields(JStormUtils.mk_list("creds")));
+
+ ComponentCommon common = new ComponentCommon(inputs, outputs);
+
+ IBolt ackerbolt = new SystemBolt();
+
+ Bolt bolt = Thrift.mkBolt(inputs, ackerbolt, outputs,
+ Integer.valueOf(0));
+
+ topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, bolt);
+
+ add_system_streams(topology);
+
+ return topology;
+
+ }
+
+ public static StormTopology add_metrics_component(StormTopology topology) {
+
+ /**
+ * @@@ TODO Add metrics consumer bolt
+ */
+ // (defn metrics-consumer-bolt-specs [storm-conf topology]
+ // (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
+ // inputs (->> (for [comp-id component-ids-that-emit-metrics]
+ // {[comp-id METRICS-STREAM-ID] :shuffle})
+ // (into {}))
+ //
+ // mk-bolt-spec (fn [class arg p]
+ // (thrift/mk-bolt-spec*
+ // inputs
+ // (backtype.storm.metric.MetricsConsumerBolt. class arg)
+ // {} :p p :conf {TOPOLOGY-TASKS p}))]
+ //
+ // (map
+ // (fn [component-id register]
+ // [component-id (mk-bolt-spec (get register "class")
+ // (get register "argument")
+ // (or (get register "parallelism.hint") 1))])
+ //
+ // (metrics-consumer-register-ids storm-conf)
+ // (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
+ return topology;
+ }
+
@SuppressWarnings("rawtypes")
public static StormTopology system_topology(Map storm_conf,
StormTopology topology) throws InvalidTopologyException {
StormTopology ret = topology.deepCopy();
- String key = Config.TOPOLOGY_ACKER_EXECUTORS;
-
- Integer ackercount = JStormUtils.parseInt(storm_conf.get(key), 0);
+ add_acker(storm_conf, ret);
- add_acker(ackercount, ret);
+ add_metrics_component(ret);
- add_system_streams(ret);
+ add_system_components(ret);
return ret;
}
diff --git a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java
index f0871fe15..8bdcdc218 100644
--- a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java
+++ b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java
@@ -3,12 +3,9 @@
import java.util.List;
import java.util.Map;
-import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.daemon.worker.WorkerMetricInfo;
-import com.alibaba.jstorm.metric.UserDefMetric;
import com.alibaba.jstorm.metric.UserDefMetricData;
import com.alibaba.jstorm.task.Assignment;
import com.alibaba.jstorm.task.AssignmentBak;
@@ -48,6 +45,8 @@ public void update_storm(String topology_id, StormStatus new_elems)
public void remove_storm_base(String topology_id) throws Exception;
public void remove_storm(String topology_id) throws Exception;
+
+ public void try_remove_storm(String topology_id);
public List task_ids(String topology_id) throws Exception;
@@ -149,7 +148,13 @@ public List task_errors(String topology_id, int task_id)
public List monitor_user_workers(String topologyId) throws Exception;
+ public List monitors() throws Exception;
+
public TaskMetricInfo get_task_metric(String topologyId, int taskId) throws Exception;
public WorkerMetricInfo get_worker_metric(String topologyId, String workerId) throws Exception;
+
+ public List task_error_time(String topologyId, int taskId) throws Exception;
+
+ public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception;
}
diff --git a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java
index 306cbaf85..094f68e9a 100644
--- a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java
+++ b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java
@@ -2,7 +2,7 @@
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.*;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -11,14 +11,13 @@
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
-import com.alibaba.jstorm.utils.EventSampler;
-import com.alibaba.jstorm.utils.PathUtils;
-
import backtype.storm.Config;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.LocalState;
import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.utils.PathUtils;
+
public class StormConfig {
private final static Logger LOG = Logger.getLogger(StormConfig.class);
public final static String RESOURCES_SUBDIR = "resources";
diff --git a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
index 28bd495c1..3fec09546 100644
--- a/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
+++ b/jstorm-server/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
@@ -207,6 +207,11 @@ public List active_storms() throws Exception {
public List monitor_user_workers(String topologyId) throws Exception {
return cluster_state.get_children(Cluster.monitor_userdir_path(topologyId), false);
}
+
+ @Override
+ public List monitors() throws Exception {
+ return cluster_state.get_children(Cluster.MONITOR_SUBTREE, false);
+ }
@Override
public List heartbeat_storms() throws Exception {
@@ -229,6 +234,30 @@ public void remove_storm(String topologyId) throws Exception {
cluster_state.delete_node(Cluster.monitor_path(topologyId));
this.remove_storm_base(topologyId);
}
+
+ @Override
+ public void try_remove_storm(String topologyId) {
+ teardown_heartbeats(topologyId);
+ teardown_task_errors(topologyId);
+
+ try {
+ cluster_state.delete_node(Cluster.assignment_path(topologyId));
+ }catch(Exception e) {
+ LOG.warn("Failed to delete zk Assignment " + topologyId);
+ }
+
+ try {
+ cluster_state.delete_node(Cluster.storm_task_root(topologyId));
+ }catch(Exception e) {
+ LOG.warn("Failed to delete zk taskInfo " + topologyId);
+ }
+
+ try {
+ cluster_state.delete_node(Cluster.monitor_path(topologyId));
+ }catch(Exception e) {
+ LOG.warn("Failed to delete zk monitor " + topologyId);
+ }
+ }
@Override
public void remove_storm_base(String topologyId) throws Exception {
@@ -406,6 +435,21 @@ public void remove_lastErr_time(String topologyId) throws Exception {
public List task_error_storms() throws Exception {
return cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false);
}
+
+ @Override
+ public List task_error_time(String topologyId, int taskId) throws Exception {
+ String path = Cluster.taskerror_path(topologyId, taskId);
+ cluster_state.mkdirs(path);
+ return cluster_state.get_children(path, false);
+ }
+
+ @Override
+ public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception {
+ String path = Cluster.taskerror_path(topologyId, taskId);
+ cluster_state.mkdirs(path);
+ path = path + "/" + timeStamp;
+ return new String(cluster_state.get_data(path, false));
+ }
@Override
public List task_errors(String topologyId, int taskId)
diff --git a/jstorm-server/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java b/jstorm-server/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
index d1b84b5d3..864519bed 100644
--- a/jstorm-server/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
+++ b/jstorm-server/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
@@ -5,6 +5,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -62,6 +63,8 @@ public class NimbusData {
private final boolean localMode;
private volatile boolean isLeader;
+
+ private AtomicBoolean isShutdown = new AtomicBoolean(false);
@SuppressWarnings({ "unchecked", "rawtypes" })
public NimbusData(Map conf, TimeCacheMap