Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequencial issue index numbering with pessimistic locking mechanism #9931

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
333aec5
Passes PSQL, SQLite and MSSQL
guillep2k Jan 20, 2020
70a2a8d
Move to upsert strategy; all tests work
guillep2k Jan 21, 2020
677e49f
Merge branch 'master' into bylock-indexes
guillep2k Jan 21, 2020
d834fb1
Use a LockedResource to numerate issues and prs
guillep2k Jan 21, 2020
58ac901
Fix tests and reserved keyword
guillep2k Jan 21, 2020
aa3797c
Fix unit tests
guillep2k Jan 21, 2020
95db48a
Fix export comments
guillep2k Jan 22, 2020
d8ad174
A little refactoring and better function naming
guillep2k Jan 22, 2020
6bec3d5
Merge branch 'master' into bylock-indexes
guillep2k Jan 22, 2020
e747a2c
Support LockType == "" and LockKey == 0
guillep2k Jan 22, 2020
afeb6f0
Merge branch 'master' into bylock-indexes
guillep2k Jan 22, 2020
c503f0d
Prepare for merge
guillep2k Jan 28, 2020
9b7ec1d
Merge branch 'master' into bylock-indexes
guillep2k Jan 28, 2020
1792664
Go simple
guillep2k Jan 28, 2020
ce6c24f
Improve test legibility
guillep2k Jan 30, 2020
15ffbb4
Fix typo
guillep2k Jan 30, 2020
ea9c875
Remove dead code
guillep2k Jan 30, 2020
9cb79c9
Merge branch 'master' into bylock-indexes
guillep2k Jan 30, 2020
d185a4f
Prepare for merge
guillep2k Feb 1, 2020
f46eaf5
Merge branch 'master' into bylock-indexes
guillep2k Feb 1, 2020
621c9d6
Prepare to merge
guillep2k Feb 12, 2020
17fa5e1
Merge branch 'master' into bylock-indexes
guillep2k Feb 12, 2020
b30094b
Merge branch 'master' into bylock-indexes
guillep2k Feb 15, 2020
299d313
Merge branch 'master' into bylock-indexes
guillep2k Feb 16, 2020
cea7c4f
Merge branch 'master' into bylock-indexes
guillep2k Feb 20, 2020
2311de3
Merge branch 'master' into bylock-indexes
guillep2k Feb 29, 2020
7e280a4
Merge branch 'master' into bylock-indexes
guillep2k May 2, 2020
15e407b
Code review suggestions by @lunny
guillep2k May 2, 2020
dd85873
Ignore SQLite3 integration when _txlock=immediate
guillep2k May 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions integrations/locked_resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package integrations

import (
"fmt"
"testing"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"

"github.com/stretchr/testify/assert"
)

const (
// The tests will fail if the waiter function takes less than
// blockerDelay minus tolerance to complete.
// Note: these values might require tuning in order to avoid
// false negatives.
waiterDelay = 100 * time.Millisecond
blockerDelay = 200 * time.Millisecond
tolerance = 50 * time.Millisecond // Should be <= (blockerDelay-waiterDelay)/2
)

type waitResult struct {
Waited time.Duration
Err error
}

func TestLockedResource(t *testing.T) {
defer prepareTestEnv(t)()

// We need to check whether two goroutines block each other
// Sadly, there's no way to ensure the second goroutine is
// waiting other than using a time delay. The longer the delay,
// the more certain we are the second goroutine is waiting.

// This check **must** fail as we're not blocking anything
assert.Error(t, blockTest("no block", func(ctx models.DBContext) error {
return nil
}))

models.AssertNotExistsBean(t, &models.LockedResource{LockType: "test-1", LockKey: 1})

// Test with creation (i.e. new lock type)
assert.NoError(t, blockTest("block-new", func(ctx models.DBContext) error {
_, err := models.GetLockedResourceCtx(ctx, "block-test-1", 1)
return err
}))

// Test without creation (i.e. lock type already exists)
assert.NoError(t, blockTest("block-existing", func(ctx models.DBContext) error {
_, err := models.GetLockedResourceCtx(ctx, "block-test-1", 1)
return err
}))

// Test with temporary record
assert.NoError(t, blockTest("block-temp", func(ctx models.DBContext) error {
return models.TemporarilyLockResourceKeyCtx(ctx, "temp-1", 1)
}))
}

func blockTest(name string, f func(ctx models.DBContext) error) error {
cb := make(chan waitResult)
cw := make(chan waitResult)
ref := time.Now()

go func() {
cb <- blockTestFunc(name, true, ref, f)
}()
go func() {
cw <- blockTestFunc(name, false, ref, f)
}()

resb := <-cb
resw := <-cw
if resb.Err != nil {
return resb.Err
}
if resw.Err != nil {
return resw.Err
}

if resw.Waited < blockerDelay-tolerance {
return fmt.Errorf("Waiter not blocked on %s; wait: %d ms, expected > %d ms",
name, resw.Waited.Milliseconds(), (blockerDelay - tolerance).Milliseconds())
}

return nil
}

func blockTestFunc(name string, blocker bool, ref time.Time, f func(ctx models.DBContext) error) (wr waitResult) {
if blocker {
name = fmt.Sprintf("blocker [%s]", name)
} else {
name = fmt.Sprintf("waiter [%s]", name)
}
err := models.WithTx(func(ctx models.DBContext) error {
log.Trace("Entering %s @%d", name, time.Since(ref).Milliseconds())
if !blocker {
log.Trace("Waiting on %s @%d", name, time.Since(ref).Milliseconds())
time.Sleep(waiterDelay)
log.Trace("Wait finished on %s @%d", name, time.Since(ref).Milliseconds())
}
if err := f(ctx); err != nil {
return err
}
if blocker {
log.Trace("Waiting on %s @%d", name, time.Since(ref).Milliseconds())
time.Sleep(blockerDelay)
log.Trace("Wait finished on %s @%d", name, time.Since(ref).Milliseconds())
} else {
wr.Waited = time.Since(ref)
}
log.Trace("Finishing %s @%d", name, time.Since(ref).Milliseconds())
return nil
})
if err != nil {
wr.Err = fmt.Errorf("error in %s: %v", name, err)
}
return
}
15 changes: 0 additions & 15 deletions models/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,21 +1171,6 @@ func (err ErrIssueLabelTemplateLoad) Error() string {
return fmt.Sprintf("Failed to load label template file '%s': %v", err.TemplateFile, err.OriginalError)
}

// ErrNewIssueInsert is used when the INSERT statement in newIssue fails
type ErrNewIssueInsert struct {
OriginalError error
}

// IsErrNewIssueInsert checks if an error is a ErrNewIssueInsert.
func IsErrNewIssueInsert(err error) bool {
_, ok := err.(ErrNewIssueInsert)
return ok
}

func (err ErrNewIssueInsert) Error() string {
return err.OriginalError.Error()
}

// ErrIssueWasClosed is used when close a closed issue
type ErrIssueWasClosed struct {
ID int64
Expand Down
47 changes: 18 additions & 29 deletions models/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ var (

const issueTasksRegexpStr = `(^\s*[-*]\s\[[\sx]\]\s.)|(\n\s*[-*]\s\[[\sx]\]\s.)`
const issueTasksDoneRegexpStr = `(^\s*[-*]\s\[[x]\]\s.)|(\n\s*[-*]\s\[[x]\]\s.)`
const issueMaxDupIndexAttempts = 3

// IssueLockedEnumerator is the name of the locked_resource used to
// numerate issues in a repository.
const IssueLockedEnumerator = "repository-index"

func init() {
issueTasksPat = regexp.MustCompile(issueTasksRegexpStr)
Expand Down Expand Up @@ -838,19 +841,23 @@ func newIssue(e *xorm.Session, doer *User, opts NewIssueOptions) (err error) {
}

// Milestone validation should happen before insert actual object.
if _, err := e.SetExpr("`index`", "coalesce(MAX(`index`),0)+1").
Where("repo_id=?", opts.Issue.RepoID).
Insert(opts.Issue); err != nil {
return ErrNewIssueInsert{err}
}

inserted, err := getIssueByID(e, opts.Issue.ID)
// Obtain the next issue number for this repository, which will be locked
// and reserved for the remaining of the transaction. Should the transaction
// be rolled back, the previous value will be restored.
idxresource, err := GetLockedResource(e, IssueLockedEnumerator, opts.Issue.RepoID)
if err != nil {
return err
return fmt.Errorf("GetLockedResource(%s)", IssueLockedEnumerator)
}
idxresource.Counter++
if err := idxresource.UpdateValue(); err != nil {
return fmt.Errorf("locked.UpdateValue(%s)", IssueLockedEnumerator)
}
opts.Issue.Index = idxresource.Counter

// Patch Index with the value calculated by the database
opts.Issue.Index = inserted.Index
if _, err = e.Insert(opts.Issue); err != nil {
return err
}

if opts.Issue.MilestoneID > 0 {
if _, err = e.Exec("UPDATE `milestone` SET num_issues=num_issues+1 WHERE id=?", opts.Issue.MilestoneID); err != nil {
Expand Down Expand Up @@ -928,24 +935,6 @@ func newIssue(e *xorm.Session, doer *User, opts NewIssueOptions) (err error) {

// NewIssue creates new issue with labels for repository.
func NewIssue(repo *Repository, issue *Issue, labelIDs []int64, uuids []string) (err error) {
// Retry several times in case INSERT fails due to duplicate key for (repo_id, index); see #7887
i := 0
for {
if err = newIssueAttempt(repo, issue, labelIDs, uuids); err == nil {
return nil
}
if !IsErrNewIssueInsert(err) {
return err
}
if i++; i == issueMaxDupIndexAttempts {
break
}
log.Error("NewIssue: error attempting to insert the new issue; will retry. Original error: %v", err)
}
return fmt.Errorf("NewIssue: too many errors attempting to insert the new issue. Last error was: %v", err)
}

func newIssueAttempt(repo *Repository, issue *Issue, labelIDs []int64, uuids []string) (err error) {
sess := x.NewSession()
defer sess.Close()
if err = sess.Begin(); err != nil {
Expand All @@ -958,7 +947,7 @@ func newIssueAttempt(repo *Repository, issue *Issue, labelIDs []int64, uuids []s
LabelIDs: labelIDs,
Attachments: uuids,
}); err != nil {
if IsErrUserDoesNotHaveAccessToRepo(err) || IsErrNewIssueInsert(err) {
if IsErrUserDoesNotHaveAccessToRepo(err) {
return err
}
return fmt.Errorf("newIssue: %v", err)
Expand Down
18 changes: 11 additions & 7 deletions models/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestIssue_SearchIssueIDsByKeyword(t *testing.T) {
assert.EqualValues(t, []int64{1}, ids)
}

func testInsertIssue(t *testing.T, title, content string) {
func testInsertIssue(t *testing.T, title, content string, idx int64) int64 {
repo := AssertExistsAndLoadBean(t, &Repository{ID: 1}).(*Repository)
user := AssertExistsAndLoadBean(t, &User{ID: 2}).(*User)

Expand All @@ -313,18 +313,22 @@ func testInsertIssue(t *testing.T, title, content string) {
assert.True(t, has)
assert.EqualValues(t, issue.Title, newIssue.Title)
assert.EqualValues(t, issue.Content, newIssue.Content)
// there are 5 issues and max index is 5 on repository 1, so this one should 6
assert.EqualValues(t, 6, newIssue.Index)
assert.EqualValues(t, idx, newIssue.Index)

_, err = x.ID(issue.ID).Delete(new(Issue))
assert.NoError(t, err)
return issue.ID
}

func TestIssue_InsertIssue(t *testing.T) {
assert.NoError(t, PrepareTestDatabase())

testInsertIssue(t, "my issue1", "special issue's comments?")
testInsertIssue(t, `my issue2, this is my son's love \n \r \ `, "special issue's '' comments?")
// there are 5 issues and max index is 5 on repository 1, so this one should be 6
created := testInsertIssue(t, "my issue1", "special issue's comments?", 6)

_, err := x.ID(created).Delete(new(Issue))
assert.NoError(t, err)

// deleting an issue should not let a new issue reuse its index number; this one should be 7
_ = testInsertIssue(t, `my issue2, this is my son's love \n \r \ `, "special issue's '' comments?", 7)
}

func TestIssue_ResolveMentions(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions models/issue_xref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,12 @@ func testCreateIssue(t *testing.T, repo, doer int64, title, content string, ispu
sess := x.NewSession()
defer sess.Close()
assert.NoError(t, sess.Begin())
_, err := sess.SetExpr("`index`", "coalesce(MAX(`index`),0)+1").Where("repo_id=?", repo).Insert(i)
idxresource, err := GetLockedResource(sess, IssueLockedEnumerator, repo)
assert.NoError(t, err)
i, err = getIssueByID(sess, i.ID)
idxresource.Counter++
assert.NoError(t, idxresource.UpdateValue())
i.Index = idxresource.Counter
_, err = sess.Insert(i)
assert.NoError(t, err)
assert.NoError(t, i.addCrossReferences(sess, d, false))
assert.NoError(t, sess.Commit())
Expand Down
Loading