Skip to content

Commit

Permalink
#697 job not runnig告警优化
Browse files Browse the repository at this point in the history
  • Loading branch information
RolfHeG authored and rolf.he committed Jun 28, 2020
1 parent f76b2ce commit 7df48c2
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/zh-cn/3.x/saturn-console-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ JDK 1.8:
| server.port | JVM参数(-D) | 启动端口,默认9088 | N |
| authentication.enabled | JVM参数(-D) | 是否启用用户认证。默认为false。详情参阅认证和授权一节 | N |
| authorization.enabled.default | JVM参数(-D) | 是否默认启用用户鉴权。默认为false。详情参阅认证和授权一节 | N |
| VIP_SATURN_DASHBOARD_NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING | 都支持 | 当job处于running时的job not running告警延时,单位是毫秒。默认值是2小时。 | N |

使用浏览器访问 http://localhost:9088 即可看到你的Saturn Console!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public class DashboardConstants {
public static int REFRESH_INTERVAL_IN_MINUTE = 7;
public static long ALLOW_DELAY_MILLIONSECONDS = 60L * 1000L * REFRESH_INTERVAL_IN_MINUTE;
public static long ALLOW_CONTAINER_DELAY_MILLIONSECONDS = 60L * 1000L * 3;
/**
* 当JOB有其他item处于running时,job not running的告警延迟
*/
public static long NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING = 1000L * 60L * 60L * 2L;

static {
String refreshInterval = System.getProperty("VIP_SATURN_DASHBOARD_REFRESH_INTERVAL_MINUTE",
Expand All @@ -25,6 +29,16 @@ public class DashboardConstants {
log.error(e.getMessage(), e);
}
}

String notRunningWarnDelay = System.getProperty("VIP_SATURN_DASHBOARD_NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING",
System.getenv("VIP_SATURN_DASHBOARD_NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING"));
if (notRunningWarnDelay != null) {
try {
NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING = Long.parseLong(notRunningWarnDelay);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private void doCheckAndHandleOutdatedNoRunningJobByShardingItem(List<AbnormalJob
int cversion = getCversion(curatorFrameworkOp, JobNodePath.getExecutionItemNodePath(jobName, item));
long nextFireTime = checkShardingItemState(curatorFrameworkOp, abnormalJob, enabledPath, item);
if (nextFireTime != -1 && doubleCheckShardingState(abnormalJob, item, cversion)
&& !hasOtherItemRunningBefore(curatorFrameworkOp, abnormalJob, nextFireTime)) {
&& !mayBlockWaitingRunningItemEnd(curatorFrameworkOp, abnormalJob, nextFireTime, item)) {
if (abnormalJob.getCause() == null) {
abnormalJob.setCause(AbnormalJob.Cause.NOT_RUN.name());
}
Expand All @@ -249,27 +249,52 @@ private void doCheckAndHandleOutdatedNoRunningJobByShardingItem(List<AbnormalJob
}

/**
* 判断是否有其他分片在nextFireTime之前就已经开始运行到现在
* 假如有,说明可能处于以下两种情况,作业正常:
* 1.有重新分片任务下发到/necessary节点,当前分片机器正在block等待running的分片运行结束
* 2.当前分片被failover,但是其他executor都有该job的分片任务并处于running状态,failover无法立即运行
* @return
* 判断是否可能在等待其他处于running状态的item运行结束
*
* 假如当前job有其它item在running,在以下情形下会导致当前item无法准点运行
* 1.当console下发重新分片时,会导致已经跑完的item等待running的item
* 2.executor会优先运行failover的item,可能会导致自己的item无法准点运行
* 3.需要被failover的item可能由于所有的executor都在running自己的分片,导致不能被及时failover
* 可能不止以上的情况
*
* 处理方式为:
* 在当前job有其它running的item时
* 1.考虑failover节点mtime
* 2.进一步增加告警延迟时间
* @return 返回false会触发告警
*/
private boolean hasOtherItemRunningBefore(CuratorRepository.CuratorFrameworkOp curatorFrameworkOp,
AbnormalJob abnormalJob, long nextFireTime) {
private boolean mayBlockWaitingRunningItemEnd(CuratorRepository.CuratorFrameworkOp curatorFrameworkOp,
AbnormalJob abnormalJob, long nextFireTime, String shardingItemStr) {
List<String> executionItems = curatorFrameworkOp
.getChildren(JobNodePath.getExecutionNodePath(abnormalJob.getJobName()));

boolean hasRunningItem = false;
if (!CollectionUtils.isEmpty(executionItems)) {
for (String item : executionItems) {
String runningNodePath = JobNodePath.getRunningNodePath(abnormalJob.getJobName(), item);
Stat stat = curatorFrameworkOp.getStat(runningNodePath);
if (stat != null && stat.getCtime() < nextFireTime) {
return true;
if (curatorFrameworkOp.checkExists(runningNodePath)) {
hasRunningItem = true;
break;
}
}
}
return false;

if (!hasRunningItem) {
return false;
}

//假如failover节点mtime大于nextFireTime,则更新nextFireTime
long currentTime = System.currentTimeMillis();
long failoverMtime;
String failoverNodePath = JobNodePath.getFailoverNodePath(abnormalJob.getJobName(), shardingItemStr);
if ((failoverMtime = curatorFrameworkOp.getMtime(failoverNodePath)) <= 0) {
String leaderFailoverItemPath = JobNodePath
.getLeaderFailoverItemPath(abnormalJob.getJobName(), shardingItemStr);
failoverMtime = curatorFrameworkOp.getMtime(leaderFailoverItemPath);
}

long nextFireTimeTmp = failoverMtime > nextFireTime ? failoverMtime : nextFireTime;
return nextFireTimeTmp + DashboardConstants.NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING > currentTime;
}

private int getCversion(CuratorRepository.CuratorFrameworkOp curatorFrameworkOp, String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,12 @@ public static String getEnabledReportNodePath(final String jobName) {
return JobNodePath.getConfigNodePath(jobName, "enabledReport");
}

public static String getLeaderFailoverNodePath(final String jobName) {
return JobNodePath.getLeaderNodePath(jobName, "failover");
}

public static String getLeaderFailoverItemPath(final String jobName, final String item) {
return String.format("%s/items/%s", getLeaderFailoverNodePath(jobName), item);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.vip.saturn.job.console.service.impl.statistics.analyzer;

import com.vip.saturn.job.console.domain.AbnormalJob;
import com.vip.saturn.job.console.repository.zookeeper.CuratorRepository;
import com.vip.saturn.job.console.service.helper.DashboardConstants;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.util.Arrays;

import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class OutdatedNoRunningJobAnalyzerTest {

@Mock
private CuratorRepository.CuratorFrameworkOp curatorFrameworkOp;

@InjectMocks
private OutdatedNoRunningJobAnalyzer outdatedNoRunningJobAnalyzer;

private String namespace = "saturn-job-test.vip.com";

private String jobName = "testJob";

/**
* 测试mayBlockWaitingRunningItemEnd方法
*/
@Test
public void testMayBlockWaitingRunningItemEnd() throws Exception {
long currentTime = System.currentTimeMillis();
long nextFireTime = currentTime - 1000L * 60L * 60L * 24L * 5L;
long failoverMTime = currentTime - DashboardConstants.NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING;

//反射参数
Class[] parameterTyps = {CuratorRepository.CuratorFrameworkOp.class, AbnormalJob.class, long.class, String.class};
AbnormalJob abnormalJob = new AbnormalJob(jobName, null, null, null);
Object[] invokeParams = {curatorFrameworkOp, abnormalJob, nextFireTime, "2"};
String methodName = "mayBlockWaitingRunningItemEnd";

//1.没有running的item,返回false
when(curatorFrameworkOp.getChildren(eq("/$Jobs/testJob/execution"))).thenReturn(Arrays.asList("0", "1", "2"));
when(curatorFrameworkOp.checkExists(anyString())).thenReturn(false);
Object result = MethodUtils
.invokeMethod(outdatedNoRunningJobAnalyzer, true, methodName, invokeParams, parameterTyps);
Assert.assertEquals(false, result);

//2.加上NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING也已过期,返回false
when(curatorFrameworkOp.checkExists("/$Jobs/testJob/execution/0/running")).thenReturn(true);
when(curatorFrameworkOp.getMtime("/$Jobs/testJob/execution/2/failover")).thenReturn(failoverMTime);
result = MethodUtils
.invokeMethod(outdatedNoRunningJobAnalyzer, true, methodName, invokeParams, parameterTyps);
Assert.assertEquals(false, result);

//3.加上NOT_RUNNING_WARN_DELAY_MS_WHEN_JOB_RUNNING后未过期,返回true
when(curatorFrameworkOp.getMtime("/$Jobs/testJob/execution/2/failover")).thenReturn(failoverMTime + 1000);
result = MethodUtils
.invokeMethod(outdatedNoRunningJobAnalyzer, true, methodName, invokeParams, parameterTyps);
Assert.assertEquals(true, result);
}

}

0 comments on commit 7df48c2

Please sign in to comment.