diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go index 246484aea96b2..92d9e97383337 100644 --- a/models/webhook/hooktask.go +++ b/models/webhook/hooktask.go @@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, return newTask, db.Insert(ctx, newTask) } -// FindUndeliveredHookTasks represents find the undelivered hook tasks -func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) { - tasks := make([]*HookTask, 0, 10) +// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID +func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) { + const batchSize = 100 + + tasks := make([]int64, 0, batchSize) return tasks, db.GetEngine(ctx). + Select("id"). + Table(new(HookTask)). Where("is_delivered=?", false). + And("id > ?", lowerID). + Asc("id"). + Limit(batchSize). Find(&tasks) } +func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) { + count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{ + ID: task.ID, + IsDelivered: true, + }) + + return count != 0, err +} + // CleanupHookTaskTable deletes rows from hook_task as needed. func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error { log.Trace("Doing: CleanupHookTaskTable") diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 85717e09783ed..07fdf18c83472 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -23,6 +23,7 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/hostmatcher" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" @@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return } // There was a panic whilst delivering a hook... - log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) + log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) }() t.IsDelivered = true @@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { switch w.HTTPMethod { case "": - log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID) + log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL) fallthrough case http.MethodPost: switch w.ContentType { @@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { case http.MethodGet: u, err := url.Parse(w.URL) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err) } vals := u.Query() vals["payload"] = []string{t.PayloadContent} u.RawQuery = vals.Encode() req, err = http.NewRequest("GET", u.String(), nil) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err) } case http.MethodPut: switch w.Type { @@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID)) req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent)) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } var signatureSHA1 string @@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { Headers: map[string]string{}, } + // OK We're now ready to attempt to deliver the task - we must double check that it + // has not been delivered in the meantime + updated, err := webhook_model.MarkTaskDelivered(ctx, t) + if err != nil { + log.Error("MarkTaskDelivered[%d]: %v", t.ID, err) + return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) + } + if !updated { + // This webhook task has already been attempted to be delivered or is in the process of being delivered + log.Trace("Webhook Task[%d] already delivered", t.ID) + return nil + } + + // All code from this point will update the hook task defer func() { t.Delivered = time.Now().UnixNano() if t.IsSucceed { @@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { } if !w.IsActive { + log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID) return nil } resp, err := webhookHTTPClient.Do(req.WithContext(ctx)) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err) } defer resp.Body.Close() @@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { p, err := io.ReadAll(resp.Body) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err) } t.ResponseInfo.Body = string(p) return nil @@ -272,17 +288,37 @@ func Init() error { } go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) - tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext()) - if err != nil { - log.Error("FindUndeliveredHookTasks failed: %v", err) - return err - } + go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue) + + return nil +} + +func populateWebhookSendingQueue(ctx context.Context) { + ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue") + defer finished() - for _, task := range tasks { - if err := enqueueHookTask(task); err != nil { - log.Error("enqueueHookTask failed: %v", err) + lowerID := int64(0) + for { + taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID) + if err != nil { + log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err) + return + } + if len(taskIDs) == 0 { + return + } + lowerID = taskIDs[len(taskIDs)-1] + + for _, taskID := range taskIDs { + select { + case <-ctx.Done(): + log.Warn("Shutdown before Webhook Sending queue finishing being populated") + return + default: + } + if err := enqueueHookTask(taskID); err != nil { + log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err) + } } } - - return nil } diff --git a/services/webhook/deliver_test.go b/services/webhook/deliver_test.go index 83ca7d61786d3..498cf7d159f02 100644 --- a/services/webhook/deliver_test.go +++ b/services/webhook/deliver_test.go @@ -16,6 +16,7 @@ import ( "code.gitea.io/gitea/models/unittest" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/setting" + api "code.gitea.io/gitea/modules/structs" "github.com/stretchr/testify/assert" ) @@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) { err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken") assert.NoError(t, err) assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook)) + db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true) - hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush} + hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}} + + hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) + if !assert.NotNil(t, hookTask) { + return + } assert.NoError(t, Deliver(context.Background(), hookTask)) select { diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index 1780022eb4b2f..5c9139b41f1c0 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data { for _, taskID := range data { task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64)) if err != nil { - log.Error("GetHookTaskByID failed: %v", err) - } else { - if err := Deliver(ctx, task); err != nil { - log.Error("webhook.Deliver failed: %v", err) - } + log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err) + continue + } + + if task.IsDelivered { + // Already delivered in the meantime + log.Trace("Task[%d] has already been delivered", task.ID) + continue + } + + if err := Deliver(ctx, task); err != nil { + log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err) } } return nil } -func enqueueHookTask(task *webhook_model.HookTask) error { - err := hookQueue.PushFunc(task.ID, nil) +func enqueueHookTask(taskID int64) error { + err := hookQueue.Push(taskID) if err != nil && err != queue.ErrAlreadyInQueue { return err } @@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook return fmt.Errorf("CreateHookTask: %w", err) } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) } // PrepareWebhooks adds new webhooks to task queue for given payload. @@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string) return err } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) }