From f9642746326657ca1126285a3f554ba2e85fd855 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 23 Oct 2022 10:08:38 +0100 Subject: [PATCH 1/7] Ensure that Webhook tasks are not double delivered When re-retrieving hook tasks from the DB double check if they have not been delivered in the meantime. Further ensure that tasks are marked as delivered when they are being delivered. In addition improve the error reporting and make sure that the webhook task population script runs in a separate goroutine. Signed-off-by: Andrew Thornton --- models/webhook/hooktask.go | 22 +++++++++-- services/webhook/deliver.go | 73 +++++++++++++++++++++++++++---------- services/webhook/webhook.go | 25 ++++++++----- 3 files changed, 89 insertions(+), 31 deletions(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 2b9b63c09bf5..1cdbc839fae7 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, return newTask, db.Insert(ctx, newTask) } -// FindUndeliveredHookTasks represents find the undelivered hook tasks -func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) { - tasks := make([]*HookTask, 0, 10) +// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID +func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) { + const batchSize = 100 + + tasks := make([]int64, 0, batchSize) return tasks, db.GetEngine(ctx). + Select("id"). + Table(new(HookTask)). Where("is_delivered=?", false). + And("id > ?", lowerID). + Asc("id"). + Limit(batchSize). Find(&tasks) } +func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) { + count, err := db.GetEngine(ctx).ID(task.ID).SetExpr("is_delivered = ?", true).Update(&HookTask{ + ID: task.ID, + IsDelivered: true, + }) + + return count != 0, err +} + // CleanupHookTaskTable deletes rows from hook_task as needed. func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error { log.Trace("Doing: CleanupHookTaskTable") diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 74a69c297ca3..2eb6c0d090b3 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -23,6 +23,7 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/hostmatcher" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" @@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return } // There was a panic whilst delivering a hook... - log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) + log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) }() t.IsDelivered = true @@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { switch w.HTTPMethod { case "": - log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID) + log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL) fallthrough case http.MethodPost: switch w.ContentType { @@ -78,27 +79,27 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { case http.MethodGet: u, err := url.Parse(w.URL) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err) } vals := u.Query() vals["payload"] = []string{t.PayloadContent} u.RawQuery = vals.Encode() req, err = http.NewRequest("GET", u.String(), nil) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err) } case http.MethodPut: switch w.Type { case webhook_model.MATRIX: req, err = getMatrixHookRequest(w, t) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } var signatureSHA1 string @@ -170,18 +171,32 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { } }() + // OK We're now ready to attempt to deliver the task - we must double check that it + // has not been delivered in the meantime + marked, err := webhook_model.MarkTaskDelivered(ctx, t) + if err != nil { + log.Error("MarkTaskDelivered[%d]: %v", err) + return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) + } + if !marked { + // This webhook task has already been delivered + log.Trace("Webhook Task[%d] already delivered", t.ID) + return nil + } + if setting.DisableWebhooks { return fmt.Errorf("webhook task skipped (webhooks disabled): [%d]", t.ID) } if !w.IsActive { + log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID) return nil } resp, err := webhookHTTPClient.Do(req.WithContext(ctx)) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err) } defer resp.Body.Close() @@ -195,7 +210,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { p, err := io.ReadAll(resp.Body) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err) } t.ResponseInfo.Body = string(p) return nil @@ -257,17 +272,37 @@ func Init() error { } go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) - tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext()) - if err != nil { - log.Error("FindUndeliveredHookTasks failed: %v", err) - return err - } + go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue) + + return nil +} + +func populateWebhookSendingQueue(ctx context.Context) { + ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue") + defer finished() - for _, task := range tasks { - if err := enqueueHookTask(task); err != nil { - log.Error("enqueueHookTask failed: %v", err) + lowerID := int64(0) + for { + taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID) + if err != nil { + log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err) + return + } + if len(taskIDs) == 0 { + return + } + lowerID = taskIDs[len(taskIDs)-1] + + for _, taskID := range taskIDs { + select { + case <-ctx.Done(): + log.Warn("Shutdown before Webhook Sending queue finishing being populated") + return + default: + } + if err := enqueueHookTask(taskID); err != nil { + log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err) + } } } - - return nil } diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index e877e16edaa3..cf6f8aa4164a 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data { for _, taskID := range data { task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64)) if err != nil { - log.Error("GetHookTaskByID failed: %v", err) - } else { - if err := Deliver(ctx, task); err != nil { - log.Error("webhook.Deliver failed: %v", err) - } + log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err) + continue + } + + if task.IsDelivered { + // Already delivered in the meantime + log.Trace("Task[%d] has already been delivered", task.ID) + continue + } + + if err := Deliver(ctx, task); err != nil { + log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err) } } return nil } -func enqueueHookTask(task *webhook_model.HookTask) error { - err := hookQueue.PushFunc(task.ID, nil) +func enqueueHookTask(taskID int64) error { + err := hookQueue.Push(taskID) if err != nil && err != queue.ErrAlreadyInQueue { return err } @@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook return fmt.Errorf("CreateHookTask: %v", err) } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) } // PrepareWebhooks adds new webhooks to task queue for given payload. @@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string) return err } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) } From 790ecde67d320b543fb5e65c35eaa9b720a9cc5d Mon Sep 17 00:00:00 2001 From: zeripath Date: Sun, 23 Oct 2022 18:08:08 +0100 Subject: [PATCH 2/7] Update models/webhook/hooktask.go --- models/webhook/hooktask.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 1cdbc839fae7..bc7c49877fbc 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -249,7 +249,7 @@ func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, er } func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) { - count, err := db.GetEngine(ctx).ID(task.ID).SetExpr("is_delivered = ?", true).Update(&HookTask{ + count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).SetExpr("is_delivered = ?", true).Update(&HookTask{ ID: task.ID, IsDelivered: true, }) From 88cc381e26db0583d6baefe08ea04ee43351f49f Mon Sep 17 00:00:00 2001 From: zeripath Date: Mon, 24 Oct 2022 02:41:28 +0100 Subject: [PATCH 3/7] Update services/webhook/deliver.go Co-authored-by: delvh --- services/webhook/deliver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 2eb6c0d090b3..7a9139b7bc99 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -173,12 +173,12 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { // OK We're now ready to attempt to deliver the task - we must double check that it // has not been delivered in the meantime - marked, err := webhook_model.MarkTaskDelivered(ctx, t) + updated, err := webhook_model.MarkTaskDelivered(ctx, t) if err != nil { log.Error("MarkTaskDelivered[%d]: %v", err) return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) } - if !marked { + if !updated { // This webhook task has already been delivered log.Trace("Webhook Task[%d] already delivered", t.ID) return nil From 57f13c1144870a3587064cfb8b115b6e4006a861 Mon Sep 17 00:00:00 2001 From: zeripath Date: Mon, 24 Oct 2022 02:46:08 +0100 Subject: [PATCH 4/7] Update services/webhook/deliver.go --- services/webhook/deliver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 7a9139b7bc99..be7fd6427e67 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -179,7 +179,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) } if !updated { - // This webhook task has already been delivered + // This webhook task has already been attempted to be delivered or is in the process of being delivered log.Trace("Webhook Task[%d] already delivered", t.ID) return nil } From 6225c5d8e914555876e999371a8cdb25ef1fd118 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 24 Oct 2022 20:51:03 +0100 Subject: [PATCH 5/7] Actually we need to check before the defer is in action! Signed-off-by: Andrew Thornton --- services/webhook/deliver.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index be7fd6427e67..4e6efb177950 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -145,6 +145,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { Headers: map[string]string{}, } + // OK We're now ready to attempt to deliver the task - we must double check that it + // has not been delivered in the meantime + updated, err := webhook_model.MarkTaskDelivered(ctx, t) + if err != nil { + log.Error("MarkTaskDelivered[%d]: %v", err) + return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) + } + if !updated { + // This webhook task has already been attempted to be delivered or is in the process of being delivered + log.Trace("Webhook Task[%d] already delivered", t.ID) + return nil + } + + // All code from this point will update the hook task defer func() { t.Delivered = time.Now().UnixNano() if t.IsSucceed { @@ -171,19 +185,6 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { } }() - // OK We're now ready to attempt to deliver the task - we must double check that it - // has not been delivered in the meantime - updated, err := webhook_model.MarkTaskDelivered(ctx, t) - if err != nil { - log.Error("MarkTaskDelivered[%d]: %v", err) - return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) - } - if !updated { - // This webhook task has already been attempted to be delivered or is in the process of being delivered - log.Trace("Webhook Task[%d] already delivered", t.ID) - return nil - } - if setting.DisableWebhooks { return fmt.Errorf("webhook task skipped (webhooks disabled): [%d]", t.ID) } From 0e82d12a0b648b1f0357953de4d45c3ea63907cc Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 13 Nov 2022 12:18:05 +0000 Subject: [PATCH 6/7] remove unnecessary SetExpr Signed-off-by: Andrew Thornton --- models/webhook/hooktask.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index c11a76a05eaf..d699714611b7 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -249,7 +249,7 @@ func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, er } func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) { - count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).SetExpr("is_delivered = ?", true).Update(&HookTask{ + count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Update(&HookTask{ ID: task.ID, IsDelivered: true, }) From cc86689b000886943af6a367eda81c762577b3ed Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 16 Nov 2022 20:52:54 +0000 Subject: [PATCH 7/7] fix test Signed-off-by: Andrew Thornton --- models/webhook/hooktask.go | 2 +- services/webhook/deliver.go | 2 +- services/webhook/deliver_test.go | 10 +++++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index d699714611b7..92d9e9738333 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -249,7 +249,7 @@ func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, er } func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) { - count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Update(&HookTask{ + count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{ ID: task.ID, IsDelivered: true, }) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 9522da74e53e..07fdf18c8347 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -164,7 +164,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { // has not been delivered in the meantime updated, err := webhook_model.MarkTaskDelivered(ctx, t) if err != nil { - log.Error("MarkTaskDelivered[%d]: %v", err) + log.Error("MarkTaskDelivered[%d]: %v", t.ID, err) return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) } if !updated { diff --git a/services/webhook/deliver_test.go b/services/webhook/deliver_test.go index 83ca7d61786d..498cf7d159f0 100644 --- a/services/webhook/deliver_test.go +++ b/services/webhook/deliver_test.go @@ -16,6 +16,7 @@ import ( "code.gitea.io/gitea/models/unittest" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/setting" + api "code.gitea.io/gitea/modules/structs" "github.com/stretchr/testify/assert" ) @@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) { err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken") assert.NoError(t, err) assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook)) + db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true) - hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush} + hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}} + + hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) + if !assert.NotNil(t, hookTask) { + return + } assert.NoError(t, Deliver(context.Background(), hookTask)) select {