From 57c3fe479c4ce02c299444912fa9eee514093dde Mon Sep 17 00:00:00 2001
From: d4x1 <1507509064@qq.com>
Date: Mon, 19 Aug 2024 20:14:39 +0800
Subject: [PATCH 1/2] feat(framework): add custom pipeline notification service
---
backend/server/api/api.go | 9 ++++
backend/server/services/init.go | 8 ++++
backend/server/services/pipeline.go | 48 ++++++++++++++-----
.../server/services/pipeline_notification.go | 46 ++++++++++++++++++
...ation.go => pipeline_notification_impl.go} | 34 +++++--------
5 files changed, 110 insertions(+), 35 deletions(-)
create mode 100644 backend/server/services/pipeline_notification.go
rename backend/server/services/{notification.go => pipeline_notification_impl.go} (76%)
diff --git a/backend/server/api/api.go b/backend/server/api/api.go
index 7c0266b6173..95a485713b3 100644
--- a/backend/server/api/api.go
+++ b/backend/server/api/api.go
@@ -59,6 +59,15 @@ func Init() {
basicRes = services.GetBasicRes()
}
+func InjectCustomService(pipelineNotifier services.PipelineNotificationService) errors.Error {
+ if pipelineNotifier != nil {
+ if err := services.InjectCustomService(pipelineNotifier); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
// @title DevLake Swagger API
// @version 0.1
// @description
This is the main page of devlake api
diff --git a/backend/server/services/init.go b/backend/server/services/init.go
index e75af65bf9e..df6308fad7f 100644
--- a/backend/server/services/init.go
+++ b/backend/server/services/init.go
@@ -125,6 +125,14 @@ func Init() {
registerPluginsMigrationScripts()
}
+func InjectCustomService(pipelineNotifier PipelineNotificationService) errors.Error {
+ if pipelineNotifier == nil {
+ return errors.Default.New("pipeline notifier is nil")
+ }
+ customPipelineNotificationService = pipelineNotifier
+ return nil
+}
+
var statusLock sync.Mutex
// ExecuteMigration executes all pending migration scripts and initialize services module
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 15d930205aa..f0c7f313348 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -41,7 +41,7 @@ import (
"golang.org/x/sync/semaphore"
)
-var notificationService *NotificationService
+var defaultNotificationService *DefaultPipelineNotificationService
var globalPipelineLog = logruslog.Global.Nested("pipeline service")
var pluginOptionSanitizers = map[string]func(map[string]interface{}){
"gitextractor": func(options map[string]interface{}) {
@@ -85,7 +85,7 @@ func pipelineServiceInit() {
var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
if strings.TrimSpace(notificationEndpoint) != "" {
- notificationService = NewNotificationService(notificationEndpoint, notificationSecret)
+ defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
}
// standalone mode: reset pipeline status
@@ -226,8 +226,10 @@ func GetPipeline(pipelineId uint64, shouldSanitize bool) (*models.Pipeline, erro
if err != nil {
return nil, err
}
- if err := SanitizePipeline(dbPipeline); err != nil {
- return nil, errors.Convert(err)
+ if shouldSanitize {
+ if err := SanitizePipeline(dbPipeline); err != nil {
+ return nil, errors.Convert(err)
+ }
}
return dbPipeline, nil
}
@@ -352,9 +354,26 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
}
}
+func getProjectName(pipeline *models.Pipeline) (string, errors.Error) {
+ if pipeline == nil {
+ return "", errors.Default.New("pipeline is nil")
+ }
+ blueprintId := pipeline.BlueprintId
+ dbBlueprint := &models.Blueprint{}
+ err := db.First(dbBlueprint, dal.Where("id = ?", blueprintId))
+ if err != nil {
+ if db.IsErrorNotFound(err) {
+ return "", errors.NotFound.New(fmt.Sprintf("blueprint(id: %d) not found", blueprintId))
+ }
+ return "", errors.Internal.Wrap(err, "error getting the blueprint from database")
+ }
+ return dbBlueprint.ProjectName, nil
+}
+
// NotifyExternal FIXME ...
func NotifyExternal(pipelineId uint64) errors.Error {
- if notificationService == nil {
+ notification := GetPipelineNotificationService()
+ if notification == nil {
return nil
}
// send notification to an external web endpoint
@@ -362,13 +381,18 @@ func NotifyExternal(pipelineId uint64) errors.Error {
if err != nil {
return err
}
- err = notificationService.PipelineStatusChanged(PipelineNotification{
- PipelineID: pipeline.ID,
- CreatedAt: pipeline.CreatedAt,
- UpdatedAt: pipeline.UpdatedAt,
- BeganAt: pipeline.BeganAt,
- FinishedAt: pipeline.FinishedAt,
- Status: pipeline.Status,
+ projectName, err := getProjectName(pipeline)
+ if err != nil {
+ return err
+ }
+ err = notification.PipelineStatusChanged(PipelineNotificationParam{
+ ProjectName: projectName,
+ PipelineID: pipeline.ID,
+ CreatedAt: pipeline.CreatedAt,
+ UpdatedAt: pipeline.UpdatedAt,
+ BeganAt: pipeline.BeganAt,
+ FinishedAt: pipeline.FinishedAt,
+ Status: pipeline.Status,
})
if err != nil {
globalPipelineLog.Error(err, "failed to send notification: %v", err)
diff --git a/backend/server/services/pipeline_notification.go b/backend/server/services/pipeline_notification.go
new file mode 100644
index 00000000000..f8cea3929b1
--- /dev/null
+++ b/backend/server/services/pipeline_notification.go
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+ "github.com/apache/incubator-devlake/core/errors"
+ "time"
+)
+
+type PipelineNotificationParam struct {
+ ProjectName string
+ PipelineID uint64
+ CreatedAt time.Time
+ UpdatedAt time.Time
+ BeganAt *time.Time
+ FinishedAt *time.Time
+ Status string
+}
+
+type PipelineNotificationService interface {
+ PipelineStatusChanged(params PipelineNotificationParam) errors.Error
+}
+
+var customPipelineNotificationService PipelineNotificationService
+
+func GetPipelineNotificationService() PipelineNotificationService {
+ if customPipelineNotificationService != nil {
+ return customPipelineNotificationService
+ }
+ return defaultNotificationService
+}
diff --git a/backend/server/services/notification.go b/backend/server/services/pipeline_notification_impl.go
similarity index 76%
rename from backend/server/services/notification.go
rename to backend/server/services/pipeline_notification_impl.go
index 854a78e5358..2a4a096a59e 100644
--- a/backend/server/services/notification.go
+++ b/backend/server/services/pipeline_notification_impl.go
@@ -22,46 +22,34 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
- "io"
- "net/http"
- "strings"
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/utils"
+ "io"
+ "net/http"
+ "strings"
)
-// NotificationService FIXME ...
-type NotificationService struct {
+// DefaultPipelineNotificationService FIXME ...
+type DefaultPipelineNotificationService struct {
EndPoint string
Secret string
}
-// NewNotificationService FIXME ...
-func NewNotificationService(endpoint, secret string) *NotificationService {
- return &NotificationService{
+// NewDefaultPipelineNotificationService creates a new DefaultPipelineNotificationService
+func NewDefaultPipelineNotificationService(endpoint, secret string) *DefaultPipelineNotificationService {
+ return &DefaultPipelineNotificationService{
EndPoint: endpoint,
Secret: secret,
}
}
-// PipelineNotification FIXME ...
-type PipelineNotification struct {
- PipelineID uint64
- CreatedAt time.Time
- UpdatedAt time.Time
- BeganAt *time.Time
- FinishedAt *time.Time
- Status string
-}
-
// PipelineStatusChanged FIXME ...
-func (n *NotificationService) PipelineStatusChanged(params PipelineNotification) errors.Error {
+func (n *DefaultPipelineNotificationService) PipelineStatusChanged(params PipelineNotificationParam) errors.Error {
return n.sendNotification(models.NotificationPipelineStatusChanged, params)
}
-func (n *NotificationService) sendNotification(notificationType models.NotificationType, data interface{}) errors.Error {
+func (n *DefaultPipelineNotificationService) sendNotification(notificationType models.NotificationType, data interface{}) errors.Error {
var dataJson, err = json.Marshal(data)
if err != nil {
return errors.Convert(err)
@@ -99,7 +87,7 @@ func (n *NotificationService) sendNotification(notificationType models.Notificat
return db.Update(notification)
}
-func (n *NotificationService) signature(input, nouce string) string {
+func (n *DefaultPipelineNotificationService) signature(input, nouce string) string {
sum := sha256.Sum256([]byte(input + n.Secret + nouce))
return hex.EncodeToString(sum[:])
}
From 21dd4d442e6f59146b1f3509f48b87cfa4063c90 Mon Sep 17 00:00:00 2001
From: d4x1 <1507509064@qq.com>
Date: Mon, 19 Aug 2024 21:03:15 +0800
Subject: [PATCH 2/2] fix(e2e): fix errors
---
backend/server/services/pipeline_notification.go | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/backend/server/services/pipeline_notification.go b/backend/server/services/pipeline_notification.go
index f8cea3929b1..2c25c44ce21 100644
--- a/backend/server/services/pipeline_notification.go
+++ b/backend/server/services/pipeline_notification.go
@@ -42,5 +42,8 @@ func GetPipelineNotificationService() PipelineNotificationService {
if customPipelineNotificationService != nil {
return customPipelineNotificationService
}
- return defaultNotificationService
+ if defaultNotificationService != nil {
+ return defaultNotificationService
+ }
+ return nil
}