Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Yet More Graceful #8874

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions custom/conf/app.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,19 @@ REPO_INDEXER_INCLUDE =
; A comma separated list of glob patterns to exclude from the index; ; default is empty
REPO_INDEXER_EXCLUDE =

[queue]
; General queue queue type, currently support: persistable-channel, channel, level, redis, dummy
; default to persistable-channel
TYPE = persistable-channel
; data-dir for storing persistable queues and level queues, individual queues will be named by their type
DATADIR = queues/
; Default queue length before a channel queue will block
LENGTH = 20
; Batch size to send for batched queues
BATCH_LENGTH = 20
; Connection string for redis queues this will store the redis connection string.
CONN_STR = "addrs=127.0.0.1:6379 db=0"

[admin]
; Disallow regular (non-admin) users from creating organizations.
DISABLE_REGULAR_ORG_CREATION = false
Expand Down
8 changes: 8 additions & 0 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ relation to port exhaustion.
- `MAX_FILE_SIZE`: **1048576**: Maximum size in bytes of files to be indexed.
- `STARTUP_TIMEOUT`: **30s**: If the indexer takes longer than this timeout to start - fail. (This timeout will be added to the hammer time above for child processes - as bleve will not start until the previous parent is shutdown.) Set to zero to never timeout.

## Queue (`queue`)

- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `channel`, `level`, `redis`, `dummy`
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues.
- `LENGTH`: **20**: Maximal queue size before channel queues block
- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.

## Admin (`admin`)
- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled

Expand Down
1 change: 1 addition & 0 deletions integrations/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func initIntegrationTest() {
defer db.Close()
}
routers.GlobalInit(graceful.GetManager().HammerContext())
NotifierListenerInit()
}

func prepareTestEnv(t testing.TB, skip ...int) func() {
Expand Down
9 changes: 8 additions & 1 deletion integrations/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"strconv"
"strings"
"testing"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/indexer/issues"
"code.gitea.io/gitea/modules/references"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/test"
Expand Down Expand Up @@ -87,7 +89,12 @@ func TestViewIssuesKeyword(t *testing.T) {
defer prepareTestEnv(t)()

repo := models.AssertExistsAndLoadBean(t, &models.Repository{ID: 1}).(*models.Repository)

issue := models.AssertExistsAndLoadBean(t, &models.Issue{
RepoID: repo.ID,
Index: 1,
}).(*models.Issue)
issues.UpdateIssueIndexer(issue)
time.Sleep(time.Second * 1)
const keyword = "first"
req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.RelLink(), keyword)
resp := MakeRequest(t, req, http.StatusOK)
Expand Down
121 changes: 121 additions & 0 deletions integrations/notification_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package integrations

import (
"encoding/json"
"reflect"
"sync"
"testing"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/queue"
)

var notifierListener *NotifierListener

var once = sync.Once{}

type NotifierListener struct {
lock sync.RWMutex
callbacks map[string][]*func(string, [][]byte)
notifier base.Notifier
}

func NotifierListenerInit() {
once.Do(func() {
notifierListener = &NotifierListener{
callbacks: map[string][]*func(string, [][]byte){},
}
notifierListener.notifier = base.NewQueueNotifierWithHandle("test-notifier", notifierListener.handle)
notification.RegisterNotifier(notifierListener.notifier)
})
}

// Register will register a callback with the provided notifier function
func (n *NotifierListener) Register(functionName string, callback *func(string, [][]byte)) {
n.lock.Lock()
n.callbacks[functionName] = append(n.callbacks[functionName], callback)
n.lock.Unlock()
}

// Deregister will remove the provided callback from the provided notifier function
func (n *NotifierListener) Deregister(functionName string, callback *func(string, [][]byte)) {
n.lock.Lock()
found := -1
for i, callbackPtr := range n.callbacks[functionName] {
if callbackPtr == callback {
found = i
break
}
}
if found > -1 {
n.callbacks[functionName] = append(n.callbacks[functionName][0:found], n.callbacks[functionName][found+1:]...)
}
n.lock.Unlock()
}

// RegisterChannel will return a registered channel with function name and return a function to deregister it and close the channel at the end
func (n *NotifierListener) RegisterChannel(name string, argNumber int, exemplar interface{}) (<-chan interface{}, func()) {
t := reflect.TypeOf(exemplar)
channel := make(chan interface{}, 10)
callback := func(_ string, args [][]byte) {
n := reflect.New(t).Elem()
err := json.Unmarshal(args[argNumber], n.Addr().Interface())
if err != nil {
log.Error("Wrong Argument passed to register channel: %v ", err)
}
channel <- n.Interface()
}
n.Register(name, &callback)

return channel, func() {
n.Deregister(name, &callback)
close(channel)
}
}

func (n *NotifierListener) handle(data ...queue.Data) {
n.lock.RLock()
defer n.lock.RUnlock()
for _, datum := range data {
call := datum.(*base.FunctionCall)
callbacks, ok := n.callbacks[call.Name]
if ok && len(callbacks) > 0 {
for _, callback := range callbacks {
(*callback)(call.Name, call.Args)
}
}
}
}

func TestNotifierListener(t *testing.T) {
defer prepareTestEnv(t)()

createPullNotified, deregister := notifierListener.RegisterChannel("NotifyNewPullRequest", 0, &models.PullRequest{})

bs, _ := json.Marshal(&models.PullRequest{})
notifierListener.handle(&base.FunctionCall{
Name: "NotifyNewPullRequest",
Args: [][]byte{
bs,
},
})
<-createPullNotified

notifierListener.notifier.NotifyNewPullRequest(&models.PullRequest{})
<-createPullNotified

notification.NotifyNewPullRequest(&models.PullRequest{})
<-createPullNotified

deregister()

notification.NotifyNewPullRequest(&models.PullRequest{})
// would panic if not deregistered
}
105 changes: 72 additions & 33 deletions integrations/pull_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,75 @@ func testPullCleanUp(t *testing.T, session *TestSession, user, repo, pullnum str

func TestPullMerge(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
createPullNotified, deferableCreate := notifierListener.RegisterChannel("NotifyNewPullRequest", 0, &models.PullRequest{})
defer deferableCreate()

mergePullNotified, deferableMerge := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer deferableMerge()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
testEditFile(t, session, "user1", "repo1", "master", "README.md", "Hello, World (Edited)\n")

var prInterface interface{}

resp := testPullCreate(t, session, "user1", "repo1", "master", "This is a pull title")
select {
case prInterface = <-createPullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}
pr := prInterface.(*models.PullRequest)
pr.LoadBaseRepo()
pr.LoadHeadRepo()
pr.BaseRepo.MustOwner()
pr.HeadRepo.MustOwner()

assert.EqualValues(t, "user1", pr.HeadRepo.Owner.Name)
assert.EqualValues(t, "repo1", pr.HeadRepo.Name)
assert.EqualValues(t, "user2", pr.BaseRepo.Owner.Name)
assert.EqualValues(t, "repo1", pr.BaseRepo.Name)

elem := strings.Split(test.RedirectURL(resp), "/")
assert.EqualValues(t, "pulls", elem[3])

testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleMerge)

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
select {
case prInterface = <-mergePullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}

pr = prInterface.(*models.PullRequest)
pr.LoadBaseRepo()
pr.LoadHeadRepo()
pr.BaseRepo.MustOwner()
pr.HeadRepo.MustOwner()

assert.EqualValues(t, "user1", pr.HeadRepo.Owner.Name)
assert.EqualValues(t, "repo1", pr.HeadRepo.Name)
assert.EqualValues(t, "user2", pr.BaseRepo.Owner.Name)
assert.EqualValues(t, "repo1", pr.BaseRepo.Name)

time.Sleep(100 * time.Millisecond)
select {
case prInterface = <-createPullNotified:
assert.Fail(t, "Should only have one pull create notification: %v", prInterface)
default:
}
select {
case prInterface = <-mergePullNotified:
assert.Fail(t, "Should only have one pull merge notification: %v", prInterface)
default:
}
})
}

func TestPullRebase(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer deferable()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
Expand All @@ -96,20 +140,18 @@ func TestPullRebase(t *testing.T) {
elem := strings.Split(test.RedirectURL(resp), "/")
assert.EqualValues(t, "pulls", elem[3])
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebase)

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
select {
case <-mergePullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}
})
}

func TestPullRebaseMerge(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
defer prepareTestEnv(t)()

hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer deferable()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
Expand All @@ -121,19 +163,18 @@ func TestPullRebaseMerge(t *testing.T) {
assert.EqualValues(t, "pulls", elem[3])
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebaseMerge)

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
select {
case <-mergePullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}
})
}

func TestPullSquash(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
defer prepareTestEnv(t)()

hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer deferable()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
Expand All @@ -146,15 +187,16 @@ func TestPullSquash(t *testing.T) {
assert.EqualValues(t, "pulls", elem[3])
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleSquash)

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
select {
case <-mergePullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}
})
}

func TestPullCleanUpAfterMerge(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
defer prepareTestEnv(t)()
session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
testEditFileToNewBranch(t, session, "user1", "repo1", "master", "feature/test", "README.md", "Hello, World (Edited)\n")
Expand Down Expand Up @@ -190,7 +232,6 @@ func TestPullCleanUpAfterMerge(t *testing.T) {

func TestCantMergeWorkInProgress(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
defer prepareTestEnv(t)()
session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
testEditFile(t, session, "user1", "repo1", "master", "README.md", "Hello, World (Edited)\n")
Expand All @@ -212,7 +253,6 @@ func TestCantMergeWorkInProgress(t *testing.T) {

func TestCantMergeConflict(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
defer prepareTestEnv(t)()
session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
testEditFileToNewBranch(t, session, "user1", "repo1", "master", "conflict", "README.md", "Hello, World (Edited Once)\n")
Expand Down Expand Up @@ -258,7 +298,6 @@ func TestCantMergeConflict(t *testing.T) {

func TestCantMergeUnrelated(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
defer prepareTestEnv(t)()
session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
testEditFileToNewBranch(t, session, "user1", "repo1", "master", "base", "README.md", "Hello, World (Edited Twice)\n")
Expand Down
7 changes: 7 additions & 0 deletions integrations/sqlite.ini
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,10 @@ INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTI3OTU5ODN9.O
[oauth2]
JWT_SECRET = KZb_QLUd4fYVyxetjxC4eZkrBgWM2SndOOWDNtgUUko

[queue]
TYPE=channel

[queue.test-notifier]
BATCH_LENGTH=1
LENGTH=20

2 changes: 1 addition & 1 deletion models/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Issue struct {
IsClosed bool `xorm:"INDEX"`
IsRead bool `xorm:"-"`
IsPull bool `xorm:"INDEX"` // Indicates whether is a pull request or not.
PullRequest *PullRequest `xorm:"-"`
PullRequest *PullRequest `xorm:"-" json:"-"`
NumComments int
Ref string

Expand Down
Loading