Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Tencent/bk-job into issue…
Browse files Browse the repository at this point in the history
…_261
  • Loading branch information
wangyu096 committed Jun 14, 2024
2 parents 8fd13f4 + 4985e50 commit f882244
Show file tree
Hide file tree
Showing 63 changed files with 2,139 additions and 561 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.crontab.listener.event;
package com.tencent.bk.job.common.event;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.tencent.bk.job.common.util.date.DateUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ public static HttpReq genSimpleJsonReq(String url, Object body) {
httpReq.setHeaders(headerList.toArray(headers));
return httpReq;
}

public static HttpReq genUrlGetReq(String url) {
HttpReq httpReq = new HttpReq();
httpReq.setUrl(url);
return httpReq;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
*/
public interface JobHttpClient {

String get(HttpReq req);

String post(HttpReq req);

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ public JobHttpClientImpl(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}

@Override
public String get(HttpReq req) {
logReq(req);
ResponseEntity<String> respEntity = restTemplate.getForEntity(
req.getUrl(),
String.class
);
if (respEntity.getStatusCode() == HttpStatus.OK) {
String respStr = respEntity.getBody();
logRespStr(respStr);
return respStr;
}
logAndThrow(respEntity);
return null;
}

@Override
public String post(HttpReq req) {
logReq(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.tencent.bk.job.crontab.listener.event;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.tencent.bk.job.common.event.Event;
import com.tencent.bk.job.crontab.constant.CrontabActionEnum;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@

public interface FileSourceTaskLogDAO {

void insertOrUpdateFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog);
int insertFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog);

int updateFileSourceTaskLogByStepInstance(FileSourceTaskLogDTO fileSourceTaskLog);

FileSourceTaskLogDTO getFileSourceTaskLog(long stepInstanceId, int executeCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.tencent.bk.job.execute.model.FileSourceTaskLogDTO;
import com.tencent.bk.job.execute.model.tables.FileSourceTaskLog;
import com.tencent.bk.job.execute.model.tables.records.FileSourceTaskLogRecord;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.UpdateSetFirstStep;
Expand All @@ -37,6 +38,9 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.List;

@Repository
public class FileSourceTaskLogDAOImpl implements FileSourceTaskLogDAO {
FileSourceTaskLog defaultTable = FileSourceTaskLog.FILE_SOURCE_TASK_LOG;
Expand Down Expand Up @@ -65,9 +69,9 @@ private FileSourceTaskLogDTO extractInfo(Record record) {
}

@Override
public void insertOrUpdateFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog) {
public int insertFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog) {
FileSourceTaskLog t = FileSourceTaskLog.FILE_SOURCE_TASK_LOG;
defaultContext.insertInto(
return defaultContext.insertInto(
t,
t.STEP_INSTANCE_ID,
t.EXECUTE_COUNT,
Expand All @@ -84,27 +88,37 @@ public void insertOrUpdateFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskL
fileSourceTaskLog.getTotalTime(),
JooqDataTypeUtil.toByte(fileSourceTaskLog.getStatus()),
fileSourceTaskLog.getFileSourceBatchTaskId()
).onDuplicateKeyUpdate()
.set(t.START_TIME, fileSourceTaskLog.getStartTime())
.set(t.END_TIME, fileSourceTaskLog.getEndTime())
.set(t.TOTAL_TIME, fileSourceTaskLog.getTotalTime())
.set(t.STATUS, JooqDataTypeUtil.toByte(fileSourceTaskLog.getStatus()))
.set(t.FILE_SOURCE_BATCH_TASK_ID, fileSourceTaskLog.getFileSourceBatchTaskId())
).execute();
}

@Override
public int updateFileSourceTaskLogByStepInstance(FileSourceTaskLogDTO fileSourceTaskLog) {
List<Condition> conditionList = new ArrayList<>();
conditionList.add(defaultTable.STEP_INSTANCE_ID.eq(fileSourceTaskLog.getStepInstanceId()));
conditionList.add(defaultTable.EXECUTE_COUNT.eq(fileSourceTaskLog.getExecuteCount()));
return defaultContext.update(defaultTable)
.set(defaultTable.START_TIME, fileSourceTaskLog.getStartTime())
.set(defaultTable.END_TIME, fileSourceTaskLog.getEndTime())
.set(defaultTable.TOTAL_TIME, fileSourceTaskLog.getTotalTime())
.set(defaultTable.STATUS, JooqDataTypeUtil.toByte(fileSourceTaskLog.getStatus()))
.set(defaultTable.FILE_SOURCE_BATCH_TASK_ID, fileSourceTaskLog.getFileSourceBatchTaskId())
.where(conditionList)
.limit(1)
.execute();
}

@Override
public FileSourceTaskLogDTO getFileSourceTaskLog(long stepInstanceId, int executeCount) {
FileSourceTaskLog t = FileSourceTaskLog.FILE_SOURCE_TASK_LOG;
Record record = defaultContext.select(
t.STEP_INSTANCE_ID,
t.EXECUTE_COUNT,
t.START_TIME,
t.END_TIME,
t.TOTAL_TIME,
t.STATUS,
t.FILE_SOURCE_BATCH_TASK_ID
).from(t)
t.STEP_INSTANCE_ID,
t.EXECUTE_COUNT,
t.START_TIME,
t.END_TIME,
t.TOTAL_TIME,
t.STATUS,
t.FILE_SOURCE_BATCH_TASK_ID
).from(t)
.where(t.STEP_INSTANCE_ID.eq(stepInstanceId))
.and(t.EXECUTE_COUNT.eq(executeCount))
.fetchOne();
Expand All @@ -114,8 +128,8 @@ public FileSourceTaskLogDTO getFileSourceTaskLog(long stepInstanceId, int execut
@Override
public FileSourceTaskLogDTO getFileSourceTaskLogByBatchTaskId(String fileSourceBatchTaskId) {
Record record = defaultContext.select(defaultTable.STEP_INSTANCE_ID, defaultTable.EXECUTE_COUNT,
defaultTable.START_TIME, defaultTable.END_TIME, defaultTable.TOTAL_TIME,
defaultTable.STATUS, defaultTable.FILE_SOURCE_BATCH_TASK_ID).from(defaultTable)
defaultTable.START_TIME, defaultTable.END_TIME, defaultTable.TOTAL_TIME,
defaultTable.STATUS, defaultTable.FILE_SOURCE_BATCH_TASK_ID).from(defaultTable)
.where(defaultTable.FILE_SOURCE_BATCH_TASK_ID.eq(fileSourceBatchTaskId))
.fetchOne();
return extractInfo(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.exception.DataAccessException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -119,6 +120,12 @@ private void setTaskInfoIntoThirdFileSource(StepInstanceBaseDTO stepInstance,
taskInfoDTO.getIpProtocol(),
taskInfoDTO.getIp()
);
log.info(
"[{}]: fileSourceTaskId={} start, sourceHost={}",
stepInstance.getUniqueKey(),
fileSourceTaskId,
hostDTO
);
hostDTOList.add(hostDTO);
fileSourceDTO.getServers().setStaticIpList(hostDTOList);
fileSourceDTO.getServers().buildMergedExecuteObjects(stepInstance.isSupportExecuteObjectFeature());
Expand Down Expand Up @@ -228,7 +235,7 @@ public void prepareThirdFileAsync(
log.debug("[{}]: fileSourceList={}", stepInstance.getUniqueKey(), fileSourceList);
// 放进文件源下载任务进度表中
FileSourceTaskLogDTO fileSourceTaskLogDTO = buildInitFileSourceTaskLog(stepInstance, batchTaskInfoDTO);
fileSourceTaskLogDAO.insertOrUpdateFileSourceTaskLog(fileSourceTaskLogDTO);
insertOrUpdateFileSourceTaskLog(fileSourceTaskLogDTO);
// 更新文件源任务状态
stepInstanceService.updateResolvedSourceFile(stepInstance.getId(), fileSourceList);
// 异步轮询文件下载任务
Expand All @@ -242,6 +249,28 @@ public void prepareThirdFileAsync(
taskMap.put(stepInstance.getUniqueKey(), task);
}

private void insertOrUpdateFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLogDTO) {
boolean shouldRetry;
do {
try {
int insertedNum = fileSourceTaskLogDAO.insertFileSourceTaskLog(fileSourceTaskLogDTO);
log.info("{} fileSourceTaskLog inserted", insertedNum);
return;
} catch (DataAccessException e) {
String message = e.getMessage();
if (message != null && message.equalsIgnoreCase("Deadlock found")) {
log.info("Deadlock found when insert fileSourceTaskLog, retry", e);
shouldRetry = true;
} else {
log.info("Fail to insert fileSourceTaskLog, update instead", e);
shouldRetry = false;
}
}
} while (shouldRetry);
int updatedNum = fileSourceTaskLogDAO.updateFileSourceTaskLogByStepInstance(fileSourceTaskLogDTO);
log.info("{} fileSourceTaskLog updated", updatedNum);
}

/**
* 立即继续步骤
*
Expand Down
Loading

0 comments on commit f882244

Please sign in to comment.