From d480709eae26c18f9ba04a519d122a6e4b9c9d9f Mon Sep 17 00:00:00 2001 From: narro wizard Date: Tue, 22 Oct 2024 10:45:54 +0800 Subject: [PATCH] feat: not update sub task progress if progress less than 1 pct [Refactor][core]Data inflation when using postgres #8142 --- backend/core/runner/run_task.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go index 98b913f284b..543538557dc 100644 --- a/backend/core/runner/run_task.go +++ b/backend/core/runner/run_task.go @@ -20,10 +20,11 @@ package runner import ( gocontext "context" "fmt" - "github.com/apache/incubator-devlake/core/models/common" "strings" "time" + "github.com/apache/incubator-devlake/core/models/common" + "github.com/apache/incubator-devlake/core/context" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -354,6 +355,7 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta Model: common.Model{ID: taskId}, } subtask := &models.Subtask{} + originalFinishedRecords := progressDetail.FinishedRecords switch p.Type { case plugin.TaskSetProgress: progressDetail.TotalSubTasks = p.Total @@ -374,14 +376,20 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta progressDetail.SubTaskName = p.SubTaskName progressDetail.SubTaskNumber = p.SubTaskNumber } - // update subtask progress - where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) - err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ - {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, - }, where) - if err != nil { - basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + currentFinishedRecords := progressDetail.FinishedRecords + currentTotalRecords := progressDetail.TotalRecords + // do not update progress if progress less than 1% + if float64(currentFinishedRecords-originalFinishedRecords)*0.1/float64(currentTotalRecords) > 0.01 { + // update subtask progress + where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) + err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ + {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, + }, where) + if err != nil { + basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + } } + } func runSubtask(