From f9642746326657ca1126285a3f554ba2e85fd855 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 23 Oct 2022 10:08:38 +0100 Subject: [PATCH] Ensure that Webhook tasks are not double delivered When re-retrieving hook tasks from the DB double check if they have not been delivered in the meantime. Further ensure that tasks are marked as delivered when they are being delivered. In addition improve the error reporting and make sure that the webhook task population script runs in a separate goroutine. Signed-off-by: Andrew Thornton --- models/webhook/hooktask.go | 22 +++++++++-- services/webhook/deliver.go | 73 +++++++++++++++++++++++++++---------- services/webhook/webhook.go | 25 ++++++++----- 3 files changed, 89 insertions(+), 31 deletions(-) diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 2b9b63c09bf5..1cdbc839fae7 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, return newTask, db.Insert(ctx, newTask) } -// FindUndeliveredHookTasks represents find the undelivered hook tasks -func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) { - tasks := make([]*HookTask, 0, 10) +// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID +func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) { + const batchSize = 100 + + tasks := make([]int64, 0, batchSize) return tasks, db.GetEngine(ctx). + Select("id"). + Table(new(HookTask)). Where("is_delivered=?", false). + And("id > ?", lowerID). + Asc("id"). + Limit(batchSize). Find(&tasks) } +func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) { + count, err := db.GetEngine(ctx).ID(task.ID).SetExpr("is_delivered = ?", true).Update(&HookTask{ + ID: task.ID, + IsDelivered: true, + }) + + return count != 0, err +} + // CleanupHookTaskTable deletes rows from hook_task as needed. func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error { log.Trace("Doing: CleanupHookTaskTable") diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 74a69c297ca3..2eb6c0d090b3 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -23,6 +23,7 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/hostmatcher" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" @@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return } // There was a panic whilst delivering a hook... - log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) + log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) }() t.IsDelivered = true @@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { switch w.HTTPMethod { case "": - log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID) + log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL) fallthrough case http.MethodPost: switch w.ContentType { @@ -78,27 +79,27 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { case http.MethodGet: u, err := url.Parse(w.URL) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err) } vals := u.Query() vals["payload"] = []string{t.PayloadContent} u.RawQuery = vals.Encode() req, err = http.NewRequest("GET", u.String(), nil) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err) } case http.MethodPut: switch w.Type { case webhook_model.MATRIX: req, err = getMatrixHookRequest(w, t) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } var signatureSHA1 string @@ -170,18 +171,32 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { } }() + // OK We're now ready to attempt to deliver the task - we must double check that it + // has not been delivered in the meantime + marked, err := webhook_model.MarkTaskDelivered(ctx, t) + if err != nil { + log.Error("MarkTaskDelivered[%d]: %v", err) + return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) + } + if !marked { + // This webhook task has already been delivered + log.Trace("Webhook Task[%d] already delivered", t.ID) + return nil + } + if setting.DisableWebhooks { return fmt.Errorf("webhook task skipped (webhooks disabled): [%d]", t.ID) } if !w.IsActive { + log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID) return nil } resp, err := webhookHTTPClient.Do(req.WithContext(ctx)) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err) } defer resp.Body.Close() @@ -195,7 +210,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { p, err := io.ReadAll(resp.Body) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err) } t.ResponseInfo.Body = string(p) return nil @@ -257,17 +272,37 @@ func Init() error { } go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) - tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext()) - if err != nil { - log.Error("FindUndeliveredHookTasks failed: %v", err) - return err - } + go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue) + + return nil +} + +func populateWebhookSendingQueue(ctx context.Context) { + ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue") + defer finished() - for _, task := range tasks { - if err := enqueueHookTask(task); err != nil { - log.Error("enqueueHookTask failed: %v", err) + lowerID := int64(0) + for { + taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID) + if err != nil { + log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err) + return + } + if len(taskIDs) == 0 { + return + } + lowerID = taskIDs[len(taskIDs)-1] + + for _, taskID := range taskIDs { + select { + case <-ctx.Done(): + log.Warn("Shutdown before Webhook Sending queue finishing being populated") + return + default: + } + if err := enqueueHookTask(taskID); err != nil { + log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err) + } } } - - return nil } diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index e877e16edaa3..cf6f8aa4164a 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data { for _, taskID := range data { task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64)) if err != nil { - log.Error("GetHookTaskByID failed: %v", err) - } else { - if err := Deliver(ctx, task); err != nil { - log.Error("webhook.Deliver failed: %v", err) - } + log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err) + continue + } + + if task.IsDelivered { + // Already delivered in the meantime + log.Trace("Task[%d] has already been delivered", task.ID) + continue + } + + if err := Deliver(ctx, task); err != nil { + log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err) } } return nil } -func enqueueHookTask(task *webhook_model.HookTask) error { - err := hookQueue.PushFunc(task.ID, nil) +func enqueueHookTask(taskID int64) error { + err := hookQueue.Push(taskID) if err != nil && err != queue.ErrAlreadyInQueue { return err } @@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook return fmt.Errorf("CreateHookTask: %v", err) } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) } // PrepareWebhooks adds new webhooks to task queue for given payload. @@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string) return err } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) }