Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parallelly generating index failure with Mysql #24567

Merged
merged 23 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
066d8a5
Add more context on log for generating parall commit status index
lunny May 6, 2023
965f2cf
Fix commit status insert
lunny May 8, 2023
6eafb7b
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny May 8, 2023
4b9f941
Use a new method for mysql generating commit status index
lunny May 16, 2023
5644fb0
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny May 16, 2023
c705ee4
Fix sql
lunny May 16, 2023
a56e2f1
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny May 16, 2023
a6ea6f9
enable commit status index parall test
lunny May 16, 2023
a49a0a8
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 3, 2023
b43dea1
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 3, 2023
9eeb883
Fix SQL
lunny Jun 3, 2023
10c5cc5
Merge branch 'lunny/fix_parall_commit_status_index' of github.com:lun…
lunny Jun 3, 2023
c0ad639
Fix resource index for mysql
lunny Jun 3, 2023
d8086d6
add test for parally creating issue
lunny Jun 3, 2023
c1181fe
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 3, 2023
2fc3fb7
revert unnecessary change
lunny Jun 4, 2023
1d00c35
Reduce usage LAST_INSERT_ID
lunny Jun 4, 2023
fa1eb06
Remove unnecessary transation check
lunny Jun 4, 2023
1079455
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 5, 2023
df83a8f
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 5, 2023
e7f0164
Fix merge bug
lunny Jun 5, 2023
9d54319
Merge branch 'lunny/fix_parall_commit_status_index' of github.com:lun…
lunny Jun 5, 2023
e08b0bb
Merge branch 'main' into lunny/fix_parall_commit_status_index
GiteaBot Jun 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion models/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,35 @@ func postgresGetNextResourceIndex(ctx context.Context, tableName string, groupID
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
}

func mysqlGetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
var idx int64
// start a transaction to lock the mysql thread so that LAST_INSERT_ID return the correct value
return idx, WithTx(ctx, func(ctx context.Context) error {
if _, err := GetEngine(ctx).Exec(fmt.Sprintf("INSERT INTO %s (group_id, max_index) "+
"VALUES (?,LAST_INSERT_ID(1)) ON DUPLICATE KEY UPDATE max_index = LAST_INSERT_ID(max_index+1)",
// if LAST_INSERT_ID(expr) is used, next time when use LAST_INSERT_ID(), it will be given the expr value
tableName), groupID); err != nil {
return err
}

_, err := GetEngine(ctx).SQL("SELECT LAST_INSERT_ID()").Get(&idx)
lunny marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if idx == 0 {
return errors.New("cannot get the correct index")
}
return nil
})
}

// GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created
func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if setting.Database.Type.IsPostgreSQL() {
switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetNextResourceIndex(ctx, tableName, groupID)
case setting.Database.Type.IsMySQL():
return mysqlGetNextResourceIndex(ctx, tableName, groupID)
}

e := GetEngine(ctx)
Expand Down
56 changes: 49 additions & 7 deletions models/git/commit_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,48 +64,90 @@ func postgresGetCommitStatusIndex(ctx context.Context, repoID int64, sha string)
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
}

func mysqlGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
var idx int64
// start a transaction to lock the mysql thread so that LAST_INSERT_ID return the correct value
return idx, db.WithTx(ctx, func(ctx context.Context) error {
if _, err := db.GetEngine(ctx).Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,LAST_INSERT_ID(1)) ON DUPLICATE KEY UPDATE max_index = LAST_INSERT_ID(max_index+1)",
// if LAST_INSERT_ID(expr) is used, next time when use LAST_INSERT_ID(), it will be given the expr value
repoID, sha); err != nil {
return err
}

_, err := db.GetEngine(ctx).SQL("SELECT LAST_INSERT_ID()").Get(&idx)
if err != nil {
return err
}
if idx == 0 {
return errors.New("cannot get the correct index")
}
return nil
})
}

// GetNextCommitStatusIndex retried 3 times to generate a resource index
func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if setting.Database.Type.IsPostgreSQL() {
switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetCommitStatusIndex(ctx, repoID, sha)
case setting.Database.Type.IsMySQL():
return mysqlGetCommitStatusIndex(ctx, repoID, sha)
}

e := db.GetEngine(ctx)

// try to update the max_index to next value, and acquire the write-lock for the record
res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil {
return 0, err
return 0, fmt.Errorf("update failed: %w", err)
}
affected, err := res.RowsAffected()
if err != nil {
return 0, err
}
if affected == 0 {
// this slow path is only for the first time of creating a resource index
_, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha)
var errIns error
for i := 0; i < 5; i++ {
lunny marked this conversation as resolved.
Show resolved Hide resolved
// in case mysql deadlock occurs, retry for a few times
// Error 1213 (40001): Deadlock found when trying to get lock; try restarting transaction
// To reproduce:
// * txn1: truncate commit_status_index; begin; update commit_status_index set max_index=max_index+1 where sha='xxx' and repo_id=1;
// * txn2: begin; update commit_status_index set max_index=max_index+1 where sha='xxx' and repo_id=1;
// * txn1: insert into `commit_status_index` (repo_id, sha, max_index) values (1, 'xxx', 0);
// * txn2: insert into `commit_status_index` (repo_id, sha, max_index) values (1, 'xxx', 0); -- then deadlock
// * then txn1 can commit, txn2 can retry and commit
_, errIns = e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha)
if errIns != nil && strings.Contains(errIns.Error(), "Deadlock") {
continue
} else {
break
}
}

res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil {
return 0, err
return 0, fmt.Errorf("update2 failed: %w", err)
}
affected, err = res.RowsAffected()
if err != nil {
return 0, err
return 0, fmt.Errorf("RowsAffected failed: %w", err)
}
// if the update still can not update any records, the record must not exist and there must be some errors (insert error)
if affected == 0 {
if errIns == nil {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated")
}
return 0, errIns
return 0, fmt.Errorf("insert failed: %w", errIns)
}
}

// now, the new index is in database (protected by the transaction and write-lock)
var newIdx int64
has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx)
if err != nil {
return 0, err
return 0, fmt.Errorf("select failed: %w", err)
}
if !has {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected")
Expand Down
45 changes: 45 additions & 0 deletions tests/integration/api_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -106,6 +108,49 @@ func TestAPICreateIssue(t *testing.T) {
assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues)
}

func TestAPICreateIssueParallel(t *testing.T) {
defer tests.PrepareTestEnv(t)()
const body, title = "apiTestBody", "apiTestTitle"

repoBefore := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repoBefore.OwnerID})

session := loginUser(t, owner.Name)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeRepo)
urlStr := fmt.Sprintf("/api/v1/repos/%s/%s/issues?state=all&token=%s", owner.Name, repoBefore.Name, token)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(parentT *testing.T, i int) {
parentT.Run(fmt.Sprintf("ParallelCreateIssue_%d", i), func(t *testing.T) {
newTitle := title + strconv.Itoa(i)
newBody := body + strconv.Itoa(i)
req := NewRequestWithJSON(t, "POST", urlStr, &api.CreateIssueOption{
Body: newBody,
Title: newTitle,
Assignee: owner.Name,
})
resp := MakeRequest(t, req, http.StatusCreated)
var apiIssue api.Issue
DecodeJSON(t, resp, &apiIssue)
assert.Equal(t, newBody, apiIssue.Body)
assert.Equal(t, newTitle, apiIssue.Title)

unittest.AssertExistsAndLoadBean(t, &issues_model.Issue{
RepoID: repoBefore.ID,
AssigneeID: owner.ID,
Content: newBody,
Title: newTitle,
})

wg.Done()
})
}(t, i)
}
wg.Wait()
}

func TestAPIEditIssue(t *testing.T) {
defer tests.PrepareTestEnv(t)()

Expand Down
4 changes: 0 additions & 4 deletions tests/integration/repo_commits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"path"
"sync"
"testing"
Expand Down Expand Up @@ -135,9 +134,6 @@ func TestRepoCommitsWithStatusRunning(t *testing.T) {
}

func TestRepoCommitsStatusParallel(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("Skipping because test is flaky on CI")
}
defer tests.PrepareTestEnv(t)()

session := loginUser(t, "user2")
Expand Down