Skip to content

Commit

Permalink
feat: add subtask logs details api (#7205)
Browse files Browse the repository at this point in the history
* fix: add subtask logs details api and update some subtasks name

* fix: gitextractor name

* fix: subtask migration add fix panic

* feat: add completed rate

* fix: rename some plugins subtasks name

* feat: add total status field

* fix: update task status logic

* feat: get lastest subtask plugins
  • Loading branch information
abeizn authored Mar 26, 2024
1 parent 7b59c68 commit cb4c0d7
Show file tree
Hide file tree
Showing 138 changed files with 526 additions and 172 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package migrationscripts

import (
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
)

var _ plugin.MigrationScript = (*addSubtaskField)(nil)

type subtask20240322 struct {
FinishedRecords int `json:"finishedRecords"`
Sequence int `json:"sequence"`
IsCollector bool `json:"isCollector"`
IsFailed bool `json:"isFailed"`
Message string `json:"message"`
}

func (subtask20240322) TableName() string {
return "_devlake_subtasks"
}

type addSubtaskField struct{}

func (*addSubtaskField) Up(basicRes context.BasicRes) errors.Error {
return basicRes.GetDal().AutoMigrate(subtask20240322{})
}

func (*addSubtaskField) Version() uint64 {
return 20240322111247
}

func (*addSubtaskField) Name() string {
return "add some fields to _devlake_subtasks table"
}
1 change: 1 addition & 0 deletions backend/core/models/migrationscripts/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,6 @@ func All() []plugin.MigrationScript {
new(addOriginalEnvironmentToCicdDeploymentsAndCicdDeploymentCommits),
new(addSubtabknameToDeployment),
new(addStore),
new(addSubtaskField),
}
}
72 changes: 60 additions & 12 deletions backend/core/models/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ const (
var PendingTaskStatus = []string{TASK_CREATED, TASK_RERUN, TASK_RUNNING}

type TaskProgressDetail struct {
TotalSubTasks int `json:"totalSubTasks"`
FinishedSubTasks int `json:"finishedSubTasks"`
TotalRecords int `json:"totalRecords"`
FinishedRecords int `json:"finishedRecords"`
SubTaskName string `json:"subTaskName"`
SubTaskNumber int `json:"subTaskNumber"`
TotalSubTasks int `json:"totalSubTasks"`
FinishedSubTasks int `json:"finishedSubTasks"`
TotalRecords int `json:"totalRecords"`
FinishedRecords int `json:"finishedRecords"`
SubTaskName string `json:"subTaskName"`
SubTaskNumber int `json:"subTaskNumber"`
CollectSubtaskNumber int `json:"collectSubtaskNumber"`
OtherSubtaskNumber int `json:"otherSubtaskNumber"`
}

type NewTask struct {
Expand Down Expand Up @@ -79,14 +81,60 @@ func (Task) TableName() string {

type Subtask struct {
common.Model
TaskID uint64 `json:"task_id" gorm:"index"`
Name string `json:"name" gorm:"index"`
Number int `json:"number"`
BeganAt *time.Time `json:"beganAt"`
FinishedAt *time.Time `json:"finishedAt" gorm:"index"`
SpentSeconds int64 `json:"spentSeconds"`
TaskID uint64 `json:"task_id" gorm:"index"`
Name string `json:"name" gorm:"index"`
Number int `json:"number"`
BeganAt *time.Time `json:"beganAt"`
FinishedAt *time.Time `json:"finishedAt" gorm:"index"`
SpentSeconds int64 `json:"spentSeconds"`
FinishedRecords int `json:"finishedRecords"`
Sequence int `json:"sequence"`
IsCollector bool `json:"isCollector"`
IsFailed bool `json:"isFailed"`
Message string `json:"message"`
}

func (Subtask) TableName() string {
return "_devlake_subtasks"
}

type SubtaskDetails struct {
ID uint64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
TaskID uint64 `json:"task_id"`
Name string `json:"name"`
Number int `json:"number"`
BeganAt *time.Time `json:"began_at"`
FinishedAt *time.Time `json:"finished_at"`
SpentSeconds int64 `json:"spent_seconds"`
FinishedRecords int `json:"finished_records"`
Sequence int `json:"sequence"`
IsCollector bool `json:"is_collector"`
IsFailed bool `json:"is_failed"`
Message string `json:"message"`
}

type SubtasksInfo struct {
ID uint64 `json:"id"`
PipelineID uint64 `json:"pipelineId"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
BeganAt *time.Time `json:"beganAt"`
FinishedAt *time.Time `json:"finishedAt"`
Plugin string `json:"plugin"`
Options any `json:"options"`
Status string `json:"status"`
FailedSubTask string `json:"failedSubTask"`
Message string `json:"message"`
ErrorName string `json:"errorName"`
SpentSeconds int `json:"spentSeconds"`
SubtaskDetails []*SubtaskDetails `json:"subtaskDetails"`
}

type SubTasksOuput struct {
SubtasksInfo []SubtasksInfo `json:"subtasks"`
CompletionRate float64 `json:"completionRate"`
Status string `json:"status"`
Count int64 `json:"count"`
}
12 changes: 7 additions & 5 deletions backend/core/plugin/plugin_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ const (
)

type RunningProgress struct {
Type ProgressType
Current int
Total int
SubTaskName string
SubTaskNumber int
Type ProgressType
Current int
Total int
SubTaskName string
SubTaskNumber int
CollectSubtaskNumber int
OtherSubtaskNumber int
} // nolint

// ExecContext This interface define all resources that needed for task/subtask execution
Expand Down
67 changes: 65 additions & 2 deletions backend/core/runner/run_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,44 @@ func RunPluginSubTasks(
taskCtx.SetSyncPolicy(syncPolicy)
taskCtx.SetData(taskData)

// record subtasks sequence to DB
collectSubtaskNumber := 0
otherSubtaskNumber := 0
isCollector := false
subtask := []models.Subtask{}
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
// sth went wrong
return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
}
if subtaskCtx == nil {
// subtask was disabled
continue
}
if strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") || strings.Contains(strings.ToLower(subtaskMeta.Name), "clone git repo") {
collectSubtaskNumber++
isCollector = true
} else {
otherSubtaskNumber++
isCollector = false
}
s := models.Subtask{
Name: subtaskCtx.GetName(),
TaskID: task.ID,
IsCollector: isCollector,
}
if isCollector {
s.Sequence = collectSubtaskNumber
} else {
s.Sequence = otherSubtaskNumber
}
subtask = append(subtask, s)
}
if err := basicRes.GetDal().CreateOrUpdate(subtask); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask list to DB")
}

// execute subtasks in order
taskCtx.SetProgress(0, steps)
subtaskNumber := 0
Expand All @@ -259,7 +297,6 @@ func RunPluginSubTasks(
// subtask was disabled
continue
}

// run subtask
logger.Info("executing subtask %s", subtaskMeta.Name)
subtaskNumber++
Expand All @@ -274,6 +311,13 @@ func RunPluginSubTasks(
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
logger.Error(err, "")
where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName())
if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "is_failed", Value: 1},
{ColumnName: "message", Value: err.Error()},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName())
}
return err
}
taskCtx.IncProgress(1)
Expand All @@ -286,6 +330,7 @@ func RunPluginSubTasks(
func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDetail *models.TaskProgressDetail, p *plugin.RunningProgress) {
task := &models.Task{}
task.ID = taskId
subtask := &models.Subtask{}
switch p.Type {
case plugin.TaskSetProgress:
progressDetail.TotalSubTasks = p.Total
Expand All @@ -307,6 +352,14 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta
progressDetail.SubTaskName = p.SubTaskName
progressDetail.SubTaskNumber = p.SubTaskNumber
}
// update subtask progress
where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName)
err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "finished_records", Value: progressDetail.FinishedRecords},
}, where)
if err != nil {
basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress")
}
}

func runSubtask(
Expand All @@ -323,17 +376,27 @@ func runSubtask(
Number: subtaskNumber,
BeganAt: &beginAt,
}
recordSubtask(basicRes, subtask)
// defer to record subtask status
defer func() {
finishedAt := time.Now()
subtask.FinishedAt = &finishedAt
subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix()

recordSubtask(basicRes, subtask)
}()
return entryPoint(ctx)
}

func recordSubtask(basicRes context.BasicRes, subtask *models.Subtask) {
if err := basicRes.GetDal().Create(subtask); err != nil {
where := dal.Where("task_id = ? and name = ?", subtask.TaskID, subtask.Name)
if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "began_at", Value: subtask.BeganAt},
{ColumnName: "finished_at", Value: subtask.FinishedAt},
{ColumnName: "spent_seconds", Value: subtask.SpentSeconds},
{ColumnName: "finished_records", Value: subtask.FinishedRecords},
{ColumnName: "number", Value: subtask.Number},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %d status to DB: %v", subtask.ID)
}
}
Expand Down
2 changes: 1 addition & 1 deletion backend/helpers/pluginhelper/api/api_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
logger := collector.args.Ctx.GetLogger()
logger.Debug("fetchAsync <<< enqueueing for %s %v", apiUrl, apiQuery)
responseHandler := func(res *http.Response) errors.Error {
defer logger.Debug("fetchAsync >>> done for %s %v %v", apiUrl, apiQuery, collector.args.RequestBody)
defer logger.Debug("fetchAsync >>> done for %s %v", apiUrl, apiQuery)
logger := collector.args.Ctx.GetLogger()
// read body to buffer
body, err := io.ReadAll(res.Body)
Expand Down
2 changes: 1 addition & 1 deletion backend/plugins/bitbucket/tasks/account_convertor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
const RAW_ACCOUNT_TABLE = "bitbucket_api_accounts"

var ConvertAccountsMeta = plugin.SubTaskMeta{
Name: "convertAccounts",
Name: "Convert Users",
EntryPoint: ConvertAccounts,
EnabledByDefault: true,
Required: false,
Expand Down
2 changes: 1 addition & 1 deletion backend/plugins/bitbucket/tasks/commit_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
const RAW_COMMIT_TABLE = "bitbucket_api_commits"

var CollectApiCommitsMeta = plugin.SubTaskMeta{
Name: "collectApiCommits",
Name: "Collect Commits",
EntryPoint: CollectApiCommits,
EnabledByDefault: false,
Required: false,
Expand Down
5 changes: 3 additions & 2 deletions backend/plugins/bitbucket/tasks/commit_convertor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ limitations under the License.
package tasks

import (
"reflect"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/domainlayer/code"
"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
plugin "github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/bitbucket/models"
"reflect"
)

var ConvertCommitsMeta = plugin.SubTaskMeta{
Name: "convertCommits",
Name: "Convert Commits",
EntryPoint: ConvertCommits,
EnabledByDefault: false,
Required: false,
Expand Down
5 changes: 3 additions & 2 deletions backend/plugins/bitbucket/tasks/commit_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package tasks

import (
"encoding/json"
"time"

"github.com/apache/incubator-devlake/core/errors"
plugin "github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/bitbucket/models"
"time"
)

var ExtractApiCommitsMeta = plugin.SubTaskMeta{
Name: "extractApiCommits",
Name: "Extract Commits",
EntryPoint: ExtractApiCommits,
EnabledByDefault: false,
Required: false,
Expand Down
2 changes: 1 addition & 1 deletion backend/plugins/bitbucket/tasks/deployment_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
const RAW_DEPLOYMENT_TABLE = "bitbucket_api_deployments"

var CollectApiDeploymentsMeta = plugin.SubTaskMeta{
Name: "collectApiDeployments",
Name: "Collect Deployments",
EntryPoint: CollectApiDeployments,
EnabledByDefault: true,
Description: "Collect deployment data from bitbucket api",
Expand Down
9 changes: 5 additions & 4 deletions backend/plugins/bitbucket/tasks/deployment_convertor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ limitations under the License.
package tasks

import (
"reflect"
"strings"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/domainlayer"
Expand All @@ -26,13 +30,10 @@ import (
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/bitbucket/models"
"reflect"
"strings"
"time"
)

var ConvertiDeploymentMeta = plugin.SubTaskMeta{
Name: "convertDeployments",
Name: "Convert Deployments",
EntryPoint: ConvertDeployments,
EnabledByDefault: true,
Description: "Convert tool layer table bitbucket_deployment into domain layer tables",
Expand Down
Loading

0 comments on commit cb4c0d7

Please sign in to comment.