Skip to content

Commit

Permalink
Merge branch 'main' into bugfix/avatar-align
Browse files Browse the repository at this point in the history
  • Loading branch information
lunny authored Nov 23, 2022
2 parents 65aab92 + 787f6c3 commit bf9f8ec
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 32 deletions.
22 changes: 19 additions & 3 deletions models/webhook/hooktask.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask,
return newTask, db.Insert(ctx, newTask)
}

// FindUndeliveredHookTasks represents find the undelivered hook tasks
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
tasks := make([]*HookTask, 0, 10)
// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
const batchSize = 100

tasks := make([]int64, 0, batchSize)
return tasks, db.GetEngine(ctx).
Select("id").
Table(new(HookTask)).
Where("is_delivered=?", false).
And("id > ?", lowerID).
Asc("id").
Limit(batchSize).
Find(&tasks)
}

func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{
ID: task.ID,
IsDelivered: true,
})

return count != 0, err
}

// CleanupHookTaskTable deletes rows from hook_task as needed.
func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
log.Trace("Doing: CleanupHookTaskTable")
Expand Down
74 changes: 55 additions & 19 deletions services/webhook/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/hostmatcher"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
Expand All @@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
return
}
// There was a panic whilst delivering a hook...
log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
}()

t.IsDelivered = true
Expand All @@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {

switch w.HTTPMethod {
case "":
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID)
log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
fallthrough
case http.MethodPost:
switch w.ContentType {
Expand All @@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
case http.MethodGet:
u, err := url.Parse(w.URL)
if err != nil {
return err
return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
}
vals := u.Query()
vals["payload"] = []string{t.PayloadContent}
u.RawQuery = vals.Encode()
req, err = http.NewRequest("GET", u.String(), nil)
if err != nil {
return err
return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
}
case http.MethodPut:
switch w.Type {
Expand All @@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID))
req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent))
if err != nil {
return err
return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
}
default:
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
}
default:
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
}

var signatureSHA1 string
Expand Down Expand Up @@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
Headers: map[string]string{},
}

// OK We're now ready to attempt to deliver the task - we must double check that it
// has not been delivered in the meantime
updated, err := webhook_model.MarkTaskDelivered(ctx, t)
if err != nil {
log.Error("MarkTaskDelivered[%d]: %v", t.ID, err)
return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
}
if !updated {
// This webhook task has already been attempted to be delivered or is in the process of being delivered
log.Trace("Webhook Task[%d] already delivered", t.ID)
return nil
}

// All code from this point will update the hook task
defer func() {
t.Delivered = time.Now().UnixNano()
if t.IsSucceed {
Expand Down Expand Up @@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
}

if !w.IsActive {
log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
return nil
}

resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
return err
return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
}
defer resp.Body.Close()

Expand All @@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
p, err := io.ReadAll(resp.Body)
if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
return err
return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
}
t.ResponseInfo.Body = string(p)
return nil
Expand Down Expand Up @@ -272,17 +288,37 @@ func Init() error {
}
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)

tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext())
if err != nil {
log.Error("FindUndeliveredHookTasks failed: %v", err)
return err
}
go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)

return nil
}

func populateWebhookSendingQueue(ctx context.Context) {
ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
defer finished()

for _, task := range tasks {
if err := enqueueHookTask(task); err != nil {
log.Error("enqueueHookTask failed: %v", err)
lowerID := int64(0)
for {
taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
if err != nil {
log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
return
}
if len(taskIDs) == 0 {
return
}
lowerID = taskIDs[len(taskIDs)-1]

for _, taskID := range taskIDs {
select {
case <-ctx.Done():
log.Warn("Shutdown before Webhook Sending queue finishing being populated")
return
default:
}
if err := enqueueHookTask(taskID); err != nil {
log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
}
}
}

return nil
}
10 changes: 9 additions & 1 deletion services/webhook/deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"code.gitea.io/gitea/models/unittest"
webhook_model "code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) {
err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken")
assert.NoError(t, err)
assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook))
db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true)

hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush}
hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}}

hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
if !assert.NotNil(t, hookTask) {
return
}

assert.NoError(t, Deliver(context.Background(), hookTask))
select {
Expand Down
25 changes: 16 additions & 9 deletions services/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data {
for _, taskID := range data {
task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64))
if err != nil {
log.Error("GetHookTaskByID failed: %v", err)
} else {
if err := Deliver(ctx, task); err != nil {
log.Error("webhook.Deliver failed: %v", err)
}
log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err)
continue
}

if task.IsDelivered {
// Already delivered in the meantime
log.Trace("Task[%d] has already been delivered", task.ID)
continue
}

if err := Deliver(ctx, task); err != nil {
log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err)
}
}

return nil
}

func enqueueHookTask(task *webhook_model.HookTask) error {
err := hookQueue.PushFunc(task.ID, nil)
func enqueueHookTask(taskID int64) error {
err := hookQueue.Push(taskID)
if err != nil && err != queue.ErrAlreadyInQueue {
return err
}
Expand Down Expand Up @@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook
return fmt.Errorf("CreateHookTask: %w", err)
}

return enqueueHookTask(task)
return enqueueHookTask(task.ID)
}

// PrepareWebhooks adds new webhooks to task queue for given payload.
Expand Down Expand Up @@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string)
return err
}

return enqueueHookTask(task)
return enqueueHookTask(task.ID)
}

0 comments on commit bf9f8ec

Please sign in to comment.