Skip to content

Commit

Permalink
fix: file-worker任务状态更新请求无序到达导致第三方源文件偶现分发失败 TencentBlueKing#2434
Browse files Browse the repository at this point in the history
统计子任务完成状态时加共享锁,防止应用统计结果的过程中数据被修改
  • Loading branch information
jsonwan committed Sep 15, 2023
1 parent 935de87 commit 3d3a640
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface FileSourceTaskDAO {

FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id);

Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status);
Long countFileSourceTasksByBatchTaskIdForUpdate(String batchTaskId, Byte status);

List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte>

List<FileTaskDTO> listFileTasks(String fileSourceTaskId);

Long countFileTask(String fileSourceTaskId, Byte status);
Long countFileTaskForUpdate(String fileSourceTaskId, Byte status);
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,23 +190,23 @@ val record = dslContext.selectFrom(defaultTable).where(
}

@Override
public Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status) {
public Long countFileSourceTasksByBatchTaskIdForUpdate(String batchTaskId, Byte status) {
List<Condition> conditions = new ArrayList<>();
if (batchTaskId != null) {
conditions.add(defaultTable.BATCH_TASK_ID.eq(batchTaskId));
}
if (status != null) {
conditions.add(defaultTable.STATUS.eq(status));
}
return countFileSourceTasksByConditions(conditions);
return countFileSourceTasksByConditionsForUpdate(conditions);
}

public Long countFileSourceTasksByConditions(Collection<Condition> conditions) {
public Long countFileSourceTasksByConditionsForUpdate(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
).from(defaultTable)
.where(conditions);
return query.fetchOne(0, Long.class);
return query.forUpdate().fetchOne(0, Long.class);
}

public List<FileSourceTaskDTO> listByConditions(Collection<Condition> conditions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ val record = dslContext.selectFrom(defaultTable).where(
}
}

public Long countFileTasksByConditions(Collection<Condition> conditions) {
public Long countFileTasksByConditionsForUpdate(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
).from(defaultTable)
.where(conditions);
return query.fetchOne(0, Long.class);
return query.forUpdate().fetchOne(0, Long.class);
}

@Override
Expand Down Expand Up @@ -196,15 +196,15 @@ public List<FileTaskDTO> listFileTasks(String fileSourceTaskId) {
}

@Override
public Long countFileTask(String fileSourceTaskId, Byte status) {
public Long countFileTaskForUpdate(String fileSourceTaskId, Byte status) {
List<Condition> conditions = new ArrayList<>();
if (fileSourceTaskId != null) {
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
}
if (status != null) {
conditions.add(defaultTable.STATUS.eq(status));
}
return countFileTasksByConditions(conditions);
return countFileTasksByConditionsForUpdate(conditions);
}

private FileTaskDTO convertRecordToDto(FileTaskRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,10 @@ private void writeLog(FileSourceTaskDTO fileSourceTaskDTO,
redisTemplate.expireAt(PREFIX_REDIS_TASK_LOG + taskId, new Date(System.currentTimeMillis() + 3600 * 1000));
}

private void notifyFileTaskStatusChangeListeners(FileTaskDTO fileTaskDTO, FileSourceTaskDTO fileSourceTaskDTO,
FileWorkerDTO fileWorkerDTO, TaskStatusEnum previousStatus,
private void notifyFileTaskStatusChangeListeners(FileTaskDTO fileTaskDTO,
FileSourceTaskDTO fileSourceTaskDTO,
FileWorkerDTO fileWorkerDTO,
TaskStatusEnum previousStatus,
TaskStatusEnum currentStatus) {
TaskContext context = new DefaultTaskContext(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO);
if (!fileTaskStatusChangeListenerList.isEmpty()) {
Expand Down Expand Up @@ -293,8 +295,7 @@ public String updateFileSourceTask(String taskId, String filePath, String downlo
}
// 通知关注者
if (status != previousStatus) {
notifyFileTaskStatusChangeListeners(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO,
TaskStatusEnum.valueOf(fileTaskDTO.getStatus()), status);
notifyFileTaskStatusChangeListeners(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO, previousStatus, status);
}
// 进度上报
writeLog(fileSourceTaskDTO, fileWorkerDTO, filePath, fileSize, speed, progress, content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

public interface FileSourceTaskStatusChangeListener {
/**
* Invoked when FileSourceTask status change
* 在单个文件源任务状态改变时被调用
*
* @param context
* @param previousStatus
* @param currentStatus
* @return true if need to stop event dispatch to later listeners
* @param context 任务上下文
* @param previousStatus 上一个状态
* @param currentStatus 当前状态
* @return true 如果需要终止事件向后续的Listener传递则返回true
*/
boolean onStatusChange(TaskContext context, TaskStatusEnum previousStatus, TaskStatusEnum currentStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

public interface FileTaskStatusChangeListener {
/**
* Invoked when FileTask status change
* 在单个文件任务状态改变时被调用
*
* @param context
* @param previousStatus
* @param currentStatus
* @return true if need to stop event dispatch to later listeners
* @param context 任务上下文
* @param previousStatus 上一个状态
* @param currentStatus 当前状态
* @return true 如果需要终止事件向后续的Listener传递则返回true
*/
boolean onStatusChange(TaskContext context, TaskStatusEnum previousStatus, TaskStatusEnum currentStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public boolean onStatusChange(TaskContext context, TaskStatusEnum previousStatus
}
if (TaskStatusEnum.SUCCESS.equals(currentStatus)) {
//检查批量任务是否成功
if (fileSourceTaskDAO.countFileSourceTasksByBatchTaskId(batchTaskId, null)
.equals(fileSourceTaskDAO.countFileSourceTasksByBatchTaskId(
if (fileSourceTaskDAO.countFileSourceTasksByBatchTaskIdForUpdate(batchTaskId, null)
.equals(fileSourceTaskDAO.countFileSourceTasksByBatchTaskIdForUpdate(
batchTaskId, TaskStatusEnum.SUCCESS.getStatus()))) {
// 批量任务成功
fileSourceBatchTaskDTO.setStatus(TaskStatusEnum.SUCCESS.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ public boolean onStatusChange(TaskContext context, TaskStatusEnum previousStatus
}
} else if (TaskStatusEnum.SUCCESS.equals(currentStatus)) {
//检查主任务是否成功
if (fileTaskDAO.countFileTask(fileSourceTaskId, null).equals(fileTaskDAO.countFileTask(fileSourceTaskId,
TaskStatusEnum.SUCCESS.getStatus()))) {
if (fileTaskDAO.countFileTaskForUpdate(fileSourceTaskId, null).equals(
fileTaskDAO.countFileTaskForUpdate(fileSourceTaskId, TaskStatusEnum.SUCCESS.getStatus()))) {
// 主任务成功
fileSourceTaskDTO.setStatus(TaskStatusEnum.SUCCESS.getStatus());
fileSourceTaskDAO.updateFileSourceTask(fileSourceTaskDTO);
Expand Down

0 comments on commit 3d3a640

Please sign in to comment.