Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that Webhook tasks are not double delivered #21558

Merged
merged 19 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions models/webhook/hooktask.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask,
return newTask, db.Insert(ctx, newTask)
}

// FindUndeliveredHookTasks represents find the undelivered hook tasks
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
tasks := make([]*HookTask, 0, 10)
// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
const batchSize = 100

tasks := make([]int64, 0, batchSize)
return tasks, db.GetEngine(ctx).
Select("id").
Table(new(HookTask)).
Where("is_delivered=?", false).
And("id > ?", lowerID).
Asc("id").
Limit(batchSize).
Find(&tasks)
}

func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
zeripath marked this conversation as resolved.
Show resolved Hide resolved
zeripath marked this conversation as resolved.
Show resolved Hide resolved
count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).SetExpr("is_delivered = ?", true).Update(&HookTask{
ID: task.ID,
IsDelivered: true,
})

return count != 0, err
}

// CleanupHookTaskTable deletes rows from hook_task as needed.
func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
log.Trace("Doing: CleanupHookTaskTable")
Expand Down
73 changes: 54 additions & 19 deletions services/webhook/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/hostmatcher"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
Expand All @@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
return
}
// There was a panic whilst delivering a hook...
log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
}()

t.IsDelivered = true
Expand All @@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {

switch w.HTTPMethod {
case "":
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID)
log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
fallthrough
case http.MethodPost:
switch w.ContentType {
Expand All @@ -78,27 +79,27 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
case http.MethodGet:
u, err := url.Parse(w.URL)
if err != nil {
return err
return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
}
vals := u.Query()
vals["payload"] = []string{t.PayloadContent}
u.RawQuery = vals.Encode()
req, err = http.NewRequest("GET", u.String(), nil)
if err != nil {
return err
return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
}
case http.MethodPut:
switch w.Type {
case webhook_model.MATRIX:
req, err = getMatrixHookRequest(w, t)
if err != nil {
return err
return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
}
default:
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
}
default:
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
}

var signatureSHA1 string
Expand Down Expand Up @@ -170,18 +171,32 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
}
}()

// OK We're now ready to attempt to deliver the task - we must double check that it
// has not been delivered in the meantime
updated, err := webhook_model.MarkTaskDelivered(ctx, t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move these lines after line 194?

Copy link
Contributor Author

@zeripath zeripath Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We must do it as soon as the defer is in action.

ACTUALLY - We must do it BEFORE the defer is in action.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read the defer

zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Error("MarkTaskDelivered[%d]: %v", err)
return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
}
if !updated {
// This webhook task has already been attempted to be delivered or is in the process of being delivered
log.Trace("Webhook Task[%d] already delivered", t.ID)
return nil
}

if setting.DisableWebhooks {
return fmt.Errorf("webhook task skipped (webhooks disabled): [%d]", t.ID)
}

if !w.IsActive {
log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
return nil
}

resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
return err
return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
}
defer resp.Body.Close()

Expand All @@ -195,7 +210,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
p, err := io.ReadAll(resp.Body)
if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
return err
return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
}
t.ResponseInfo.Body = string(p)
return nil
Expand Down Expand Up @@ -257,17 +272,37 @@ func Init() error {
}
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)

tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext())
if err != nil {
log.Error("FindUndeliveredHookTasks failed: %v", err)
return err
}
go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)

return nil
}

func populateWebhookSendingQueue(ctx context.Context) {
ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
defer finished()

for _, task := range tasks {
if err := enqueueHookTask(task); err != nil {
log.Error("enqueueHookTask failed: %v", err)
lowerID := int64(0)
for {
lunny marked this conversation as resolved.
Show resolved Hide resolved
taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
if err != nil {
log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
return
}
if len(taskIDs) == 0 {
return
}
lowerID = taskIDs[len(taskIDs)-1]

for _, taskID := range taskIDs {
select {
case <-ctx.Done():
log.Warn("Shutdown before Webhook Sending queue finishing being populated")
return
default:
}
if err := enqueueHookTask(taskID); err != nil {
log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
}
}
}

return nil
}
25 changes: 16 additions & 9 deletions services/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data {
for _, taskID := range data {
task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64))
if err != nil {
log.Error("GetHookTaskByID failed: %v", err)
} else {
if err := Deliver(ctx, task); err != nil {
log.Error("webhook.Deliver failed: %v", err)
}
log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err)
continue
}

if task.IsDelivered {
// Already delivered in the meantime
log.Trace("Task[%d] has already been delivered", task.ID)
continue
}

if err := Deliver(ctx, task); err != nil {
log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err)
}
}

return nil
}

func enqueueHookTask(task *webhook_model.HookTask) error {
err := hookQueue.PushFunc(task.ID, nil)
func enqueueHookTask(taskID int64) error {
err := hookQueue.Push(taskID)
if err != nil && err != queue.ErrAlreadyInQueue {
return err
}
Expand Down Expand Up @@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook
return fmt.Errorf("CreateHookTask: %v", err)
}

return enqueueHookTask(task)
return enqueueHookTask(task.ID)
}

// PrepareWebhooks adds new webhooks to task queue for given payload.
Expand Down Expand Up @@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string)
return err
}

return enqueueHookTask(task)
return enqueueHookTask(task.ID)
}