Skip to content

Commit

Permalink
Merge pull request #1132 from wangyu096/feature/rolling
Browse files Browse the repository at this point in the history
feature: 原子操作能力支持滚动执行 #446
  • Loading branch information
jsonwan authored Jul 15, 2022
2 parents 321265f + 552bd15 commit 82df0d6
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ public void afterCompletion(HttpServletRequest request, HttpServletResponse resp
log.warn("status {} given by {}", response.getStatus(), handler);
}
if (ex != null) {
log.error("After completion|{}|{}|{}|{}|{}|{}", JobContextUtil.getRequestId(), response.getStatus(),
log.error("After completion|{}|{}|{}|{}|{}|{}|{}", JobContextUtil.getRequestId(), response.getStatus(),
JobContextUtil.getAppResourceScope(),
JobContextUtil.getUsername(), System.currentTimeMillis() - JobContextUtil.getStartTime(),
request.getRequestURI(), ex);
request.getRequestURI(), ex.getMessage());
} else {
log.debug("After completion|{}|{}|{}|{}|{}|{}", JobContextUtil.getRequestId(), response.getStatus(),
JobContextUtil.getAppResourceScope(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static HostDTO fromHostIdAndAgentId(Long hostId, String agentId) {
public static HostDTO fromHostIdAndCloudIp(Long hostId, String cloudIp) {
HostDTO hostDTO = new HostDTO();
hostDTO.setHostId(hostId);
if (StringUtils.isNotEmpty(cloudIp)) {
if (StringUtils.isNotBlank(cloudIp)) {
String[] ipProps = cloudIp.split(IpUtils.COLON);
hostDTO.setBkCloudId(Long.valueOf(ipProps[0]));
hostDTO.setIp(ipProps[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Response<LogExportJobInfoVO> requestDownloadLogFile(
Long hostId,
@ApiParam(value = "ip", name = "ip")
@RequestParam(value = "ip", required = false)
String ip,
String cloudIp,
@ApiParam(value = "重新打包", name = "repackage")
@RequestParam(value = "repackage", required = false)
Boolean repackage
Expand Down Expand Up @@ -106,6 +106,6 @@ ResponseEntity<StreamingResponseBody> downloadLogFile(
Long hostId,
@ApiParam(value = "ip", name = "ip")
@RequestParam(value = "ip", required = false)
String ip
String cloudIp
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public Response<LogExportJobInfoVO> requestDownloadLogFile(String username,
String scopeId,
Long stepInstanceId,
Long hostId,
String ip,
String cloudIp,
Boolean repackage) {
Long appId = appResourceScope.getAppId();

Expand All @@ -133,7 +133,7 @@ public Response<LogExportJobInfoVO> requestDownloadLogFile(String username,

if (!repackage) {
log.debug("Do not need repackage, check exist job");
LogExportJobInfoDTO exportInfo = logExportService.getExportInfo(appId, stepInstanceId, hostId, ip);
LogExportJobInfoDTO exportInfo = logExportService.getExportInfo(appId, stepInstanceId, hostId, cloudIp);
if (exportInfo != null) {
log.debug("Find exist job info|{}", exportInfo);
switch (exportInfo.getStatus()) {
Expand Down Expand Up @@ -165,18 +165,18 @@ public Response<LogExportJobInfoVO> requestDownloadLogFile(String username,

int executeCount = stepInstance.getExecuteCount();

String logFileName = getLogFileName(stepInstanceId, hostId, executeCount);
String logFileName = getLogFileName(stepInstanceId, hostId, cloudIp, executeCount);
if (StringUtils.isBlank(logFileName)) {
throw new InternalException(ErrorCode.EXPORT_STEP_EXECUTION_LOG_FAIL);
}

LogExportJobInfoDTO exportInfo = logExportService.packageLogFile(username, appId, stepInstanceId, hostId, ip,
LogExportJobInfoDTO exportInfo = logExportService.packageLogFile(username, appId, stepInstanceId, hostId, cloudIp,
executeCount, logFileDir, logFileName, repackage);
return Response.buildSuccessResp(LogExportJobInfoDTO.toVO(exportInfo));
}

private String getLogFileName(Long stepInstanceId, Long hostId, int executeCount) {
String fileName = makeExportLogFileName(stepInstanceId, executeCount, hostId);
private String getLogFileName(Long stepInstanceId, Long hostId, String cloudIp, int executeCount) {
String fileName = makeExportLogFileName(stepInstanceId, executeCount, hostId, cloudIp);
String logFileName = fileName + ".log";

File dir = new File(logFileDir);
Expand Down Expand Up @@ -244,28 +244,29 @@ public ResponseEntity<StreamingResponseBody> downloadLogFile(HttpServletResponse
String scopeId,
Long stepInstanceId,
Long hostId,
String ip) {
String cloudIp) {
Long appId = appResourceScope.getAppId();

StepInstanceBaseDTO stepInstance = taskInstanceService.getBaseStepInstance(stepInstanceId);
if (!stepInstance.getAppId().equals(appId)) {
log.info("StepInstance: {} is not in app: {}", stepInstance.getId(), appResourceScope.getAppId());
return ResponseEntity.notFound().build();
}

int executeCount = stepInstance.getExecuteCount();

LogExportJobInfoDTO exportInfo;

boolean isGetByHost = hostId != null;
boolean isGetByHost = hostId != null || StringUtils.isNotBlank(cloudIp);
if (isGetByHost) {
String logFileName = getLogFileName(stepInstanceId, hostId, executeCount);
String logFileName = getLogFileName(stepInstanceId, hostId, cloudIp, executeCount);
if (StringUtils.isBlank(logFileName)) {
return ResponseEntity.notFound().build();
}
exportInfo = logExportService.packageLogFile(username, appId, stepInstanceId, hostId, ip, executeCount,
exportInfo = logExportService.packageLogFile(username, appId, stepInstanceId, hostId, cloudIp, executeCount,
logFileDir, logFileName, false);
} else {
exportInfo = logExportService.getExportInfo(appId, stepInstanceId, hostId, ip);
exportInfo = logExportService.getExportInfo(appId, stepInstanceId, hostId, cloudIp);
}

if (exportInfo != null) {
Expand Down Expand Up @@ -311,12 +312,14 @@ public ResponseEntity<StreamingResponseBody> downloadLogFile(HttpServletResponse
return ResponseEntity.notFound().build();
}

private String makeExportLogFileName(Long stepInstanceId, Integer executeCount, Long hostId) {
private String makeExportLogFileName(Long stepInstanceId, Integer executeCount, Long hostId, String cloudIp) {
StringBuilder fileName = new StringBuilder();
fileName.append("bk_job_export_log_");
fileName.append("step_").append(stepInstanceId).append("_").append(executeCount).append("_");
if (hostId != null) {
fileName.append(hostId).append("_");
} else if (StringUtils.isNotBlank(cloudIp)) {
fileName.append(cloudIp).append("_");
}
fileName.append(DateUtils.formatLocalDateTime(LocalDateTime.now(), "yyyyMMddHHmmssSSS"));
return fileName.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,16 @@ public String getDisplayIp() {
}
}

public HostDTO getHost() {
HostDTO host = new HostDTO();
host.setHostId(getHostId());
host.setIp(getIp());
host.setBkCloudId(getBkCloudId());
host.setBkCloudName(getBkCloudName());
host.setDisplayIp(getDisplayIp());
host.setAgentId(getAgentId());
return host;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public LogExportServiceImpl(LogService logService,

@Override
public LogExportJobInfoDTO packageLogFile(String username, Long appId, Long stepInstanceId, Long hostId,
String ip, int executeCount,
String cloudIp, int executeCount,
String logFileDir, String logFileName, Boolean repackage) {
log.debug("Package log file for {}|{}|{}|{}|{}|{}|{}|{}", username, appId, stepInstanceId, hostId, executeCount,
logFileDir, logFileName, repackage);
LogExportJobInfoDTO exportJobInfo = new LogExportJobInfoDTO();
exportJobInfo.setJobKey(getExportJobKey(appId, stepInstanceId, hostId, ip));
exportJobInfo.setJobKey(getExportJobKey(appId, stepInstanceId, hostId, cloudIp));
exportJobInfo.setStatus(LogExportStatusEnum.INIT);

if (repackage) {
Expand All @@ -125,10 +125,10 @@ public LogExportJobInfoDTO packageLogFile(String username, Long appId, Long step
}
saveExportInfo(exportJobInfo);

boolean isGetByHost = hostId != null;
boolean isGetByHost = hostId != null || StringUtils.isNotBlank(cloudIp);

if (isGetByHost) {
doPackage(exportJobInfo, stepInstanceId, hostId, ip, executeCount, logFileDir, logFileName);
doPackage(exportJobInfo, stepInstanceId, hostId, cloudIp, executeCount, logFileDir, logFileName);
} else {
logExportExecutor.execute(() -> {
String requestId = UUID.randomUUID().toString();
Expand All @@ -141,7 +141,7 @@ public LogExportJobInfoDTO packageLogFile(String username, Long appId, Long step
exportJobInfo.setStatus(LogExportStatusEnum.PROCESSING);
saveExportInfo(exportJobInfo);

doPackage(exportJobInfo, stepInstanceId, hostId, ip, executeCount, logFileDir, logFileName);
doPackage(exportJobInfo, stepInstanceId, hostId, cloudIp, executeCount, logFileDir, logFileName);
} else {
log.error("Job already running!|{}|{}|{}|{}", requestId, appId, stepInstanceId, hostId);
}
Expand Down Expand Up @@ -177,29 +177,27 @@ private void deleteExportInfo(String jobKey) {
redisTemplate.delete(jobKey);
}

private String getExportJobKey(Long appId, Long stepInstanceId, Long hostId, String ip) {
private String getExportJobKey(Long appId, Long stepInstanceId, Long hostId, String cloudIp) {
String key = EXPORT_KEY_PREFIX + appId + ":" + stepInstanceId + ":";
if (hostId != null) {
key = key + hostId;
} else {
key = key + ip;
key = key + cloudIp;
}
return key;
}

private void doPackage(LogExportJobInfoDTO exportJobInfo, long stepInstanceId, Long hostId,
String ip, int executeCount, String logFileDir, String logFileName) {
StepInstanceBaseDTO stepInstance = taskInstanceService.getBaseStepInstance(stepInstanceId);
boolean isGetByHost = hostId != null;
boolean isGetByHost = hostId != null || StringUtils.isNotBlank(ip);
File logFile = new File(logFileDir + logFileName);

StopWatch watch = new StopWatch("exportJobLog");
watch.start("listJobIps");
List<AgentTaskDTO> gseAgentTasks = new ArrayList<>();
if (isGetByHost) {
HostDTO host = new HostDTO();
host.setHostId(hostId);
host.setIp(ip);
HostDTO host = HostDTO.fromHostIdAndCloudIp(hostId, ip);
AgentTaskDTO agentTask = scriptAgentTaskService.getAgentTaskByHost(stepInstance, executeCount, null,
host);
if (agentTask != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,18 @@
import com.tencent.bk.job.execute.dao.StepInstanceDAO;
import com.tencent.bk.job.execute.dao.TaskInstanceDAO;
import com.tencent.bk.job.execute.engine.consts.AgentTaskStatus;
import com.tencent.bk.job.execute.model.AgentTaskDTO;
import com.tencent.bk.job.execute.model.AgentTaskDetailDTO;
import com.tencent.bk.job.execute.model.AgentTaskResultGroupBaseDTO;
import com.tencent.bk.job.execute.model.AgentTaskResultGroupDTO;
import com.tencent.bk.job.execute.model.ConfirmStepInstanceDTO;
import com.tencent.bk.job.execute.model.FileSourceTaskLogDTO;
import com.tencent.bk.job.execute.model.OperationLogDTO;
import com.tencent.bk.job.execute.model.RollingConfigDTO;
import com.tencent.bk.job.execute.model.StepExecutionDTO;
import com.tencent.bk.job.execute.model.StepExecutionDetailDTO;
import com.tencent.bk.job.execute.model.StepExecutionRecordDTO;
import com.tencent.bk.job.execute.model.StepExecutionResultQuery;
import com.tencent.bk.job.execute.model.StepInstanceBaseDTO;
import com.tencent.bk.job.execute.model.StepInstanceRollingTaskDTO;
import com.tencent.bk.job.execute.model.TaskExecuteResultDTO;
import com.tencent.bk.job.execute.model.TaskExecutionDTO;
import com.tencent.bk.job.execute.model.TaskInstanceDTO;
import com.tencent.bk.job.execute.model.TaskInstanceQuery;
import com.tencent.bk.job.execute.model.*;
import com.tencent.bk.job.execute.model.inner.CronTaskExecuteResult;
import com.tencent.bk.job.execute.model.inner.ServiceCronTaskExecuteResultStatistics;
import com.tencent.bk.job.execute.service.FileAgentTaskService;
import com.tencent.bk.job.execute.service.GseTaskService;
import com.tencent.bk.job.execute.service.HostService;
import com.tencent.bk.job.execute.service.LogService;
import com.tencent.bk.job.execute.service.RollingConfigService;
import com.tencent.bk.job.execute.service.ScriptAgentTaskService;
import com.tencent.bk.job.execute.service.StepInstanceRollingTaskService;
import com.tencent.bk.job.execute.service.TaskInstanceService;
import com.tencent.bk.job.execute.service.TaskOperationLogService;
import com.tencent.bk.job.execute.service.TaskResultService;
import com.tencent.bk.job.execute.service.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

import static com.tencent.bk.job.common.constant.Order.DESCENDING;
Expand Down Expand Up @@ -958,7 +925,7 @@ public List<HostDTO> getHostsByResultType(String username,
return Collections.emptyList();
}
List<HostDTO> hosts = agentTaskGroupByResultType.stream()
.map(agentTask -> HostDTO.fromHostId(agentTask.getHostId()))
.map(AgentTaskDetailDTO::getHost)
.collect(Collectors.toList());
if (filterByKeyword && CollectionUtils.isNotEmpty(matchHostIds)) {
List<HostDTO> finalHosts = new ArrayList<>();
Expand Down

0 comments on commit 82df0d6

Please sign in to comment.