Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parallelly generating index failure with Mysql #24567

Merged
merged 23 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
066d8a5
Add more context on log for generating parall commit status index
lunny May 6, 2023
965f2cf
Fix commit status insert
lunny May 8, 2023
6eafb7b
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny May 8, 2023
4b9f941
Use a new method for mysql generating commit status index
lunny May 16, 2023
5644fb0
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny May 16, 2023
c705ee4
Fix sql
lunny May 16, 2023
a56e2f1
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny May 16, 2023
a6ea6f9
enable commit status index parall test
lunny May 16, 2023
a49a0a8
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 3, 2023
b43dea1
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 3, 2023
9eeb883
Fix SQL
lunny Jun 3, 2023
10c5cc5
Merge branch 'lunny/fix_parall_commit_status_index' of github.com:lun…
lunny Jun 3, 2023
c0ad639
Fix resource index for mysql
lunny Jun 3, 2023
d8086d6
add test for parally creating issue
lunny Jun 3, 2023
c1181fe
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 3, 2023
2fc3fb7
revert unnecessary change
lunny Jun 4, 2023
1d00c35
Reduce usage LAST_INSERT_ID
lunny Jun 4, 2023
fa1eb06
Remove unnecessary transation check
lunny Jun 4, 2023
1079455
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 5, 2023
df83a8f
Merge branch 'main' into lunny/fix_parall_commit_status_index
lunny Jun 5, 2023
e7f0164
Fix merge bug
lunny Jun 5, 2023
9d54319
Merge branch 'lunny/fix_parall_commit_status_index' of github.com:lun…
lunny Jun 5, 2023
e08b0bb
Merge branch 'main' into lunny/fix_parall_commit_status_index
GiteaBot Jun 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion models/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,31 @@ func postgresGetNextResourceIndex(ctx context.Context, tableName string, groupID
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
}

func mysqlGetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if _, err := GetEngine(ctx).Exec(fmt.Sprintf("INSERT INTO %s (group_id, max_index) "+
"VALUES (?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
tableName), groupID); err != nil {
return 0, err
}

var idx int64
_, err := GetEngine(ctx).SQL(fmt.Sprintf("SELECT max_index FROM %s WHERE group_id = ?", tableName), groupID).Get(&idx)
if err != nil {
return 0, err
}
if idx == 0 {
return 0, errors.New("cannot get the correct index")
}
return idx, nil
}

// GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created
func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if setting.Database.Type.IsPostgreSQL() {
switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetNextResourceIndex(ctx, tableName, groupID)
case setting.Database.Type.IsMySQL():
return mysqlGetNextResourceIndex(ctx, tableName, groupID)
}

e := GetEngine(ctx)
Expand Down
34 changes: 28 additions & 6 deletions models/git/commit_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,40 @@ func postgresGetCommitStatusIndex(ctx context.Context, repoID int64, sha string)
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
}

func mysqlGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if _, err := db.GetEngine(ctx).Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
repoID, sha); err != nil {
return 0, err
}

var idx int64
_, err := db.GetEngine(ctx).SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?",
repoID, sha).Get(&idx)
if err != nil {
return 0, err
}
if idx == 0 {
return 0, errors.New("cannot get the correct index")
}
return idx, nil
}

// GetNextCommitStatusIndex retried 3 times to generate a resource index
func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if setting.Database.Type.IsPostgreSQL() {
switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetCommitStatusIndex(ctx, repoID, sha)
case setting.Database.Type.IsMySQL():
return mysqlGetCommitStatusIndex(ctx, repoID, sha)
}

e := db.GetEngine(ctx)

// try to update the max_index to next value, and acquire the write-lock for the record
res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil {
return 0, err
return 0, fmt.Errorf("update failed: %w", err)
}
affected, err := res.RowsAffected()
if err != nil {
Expand All @@ -86,26 +108,26 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
_, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha)
res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil {
return 0, err
return 0, fmt.Errorf("update2 failed: %w", err)
}
affected, err = res.RowsAffected()
if err != nil {
return 0, err
return 0, fmt.Errorf("RowsAffected failed: %w", err)
}
// if the update still can not update any records, the record must not exist and there must be some errors (insert error)
if affected == 0 {
if errIns == nil {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated")
}
return 0, errIns
return 0, fmt.Errorf("insert failed: %w", errIns)
}
}

// now, the new index is in database (protected by the transaction and write-lock)
var newIdx int64
has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx)
if err != nil {
return 0, err
return 0, fmt.Errorf("select failed: %w", err)
}
if !has {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected")
Expand Down
45 changes: 45 additions & 0 deletions tests/integration/api_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -106,6 +108,49 @@ func TestAPICreateIssue(t *testing.T) {
assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues)
}

func TestAPICreateIssueParallel(t *testing.T) {
defer tests.PrepareTestEnv(t)()
const body, title = "apiTestBody", "apiTestTitle"

repoBefore := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repoBefore.OwnerID})

session := loginUser(t, owner.Name)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteIssue)
urlStr := fmt.Sprintf("/api/v1/repos/%s/%s/issues?state=all&token=%s", owner.Name, repoBefore.Name, token)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(parentT *testing.T, i int) {
parentT.Run(fmt.Sprintf("ParallelCreateIssue_%d", i), func(t *testing.T) {
newTitle := title + strconv.Itoa(i)
newBody := body + strconv.Itoa(i)
req := NewRequestWithJSON(t, "POST", urlStr, &api.CreateIssueOption{
Body: newBody,
Title: newTitle,
Assignee: owner.Name,
})
resp := MakeRequest(t, req, http.StatusCreated)
var apiIssue api.Issue
DecodeJSON(t, resp, &apiIssue)
assert.Equal(t, newBody, apiIssue.Body)
assert.Equal(t, newTitle, apiIssue.Title)

unittest.AssertExistsAndLoadBean(t, &issues_model.Issue{
RepoID: repoBefore.ID,
AssigneeID: owner.ID,
Content: newBody,
Title: newTitle,
})

wg.Done()
})
}(t, i)
}
wg.Wait()
}

func TestAPIEditIssue(t *testing.T) {
defer tests.PrepareTestEnv(t)()

Expand Down
4 changes: 0 additions & 4 deletions tests/integration/repo_commits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"path"
"sync"
"testing"
Expand Down Expand Up @@ -135,9 +134,6 @@ func TestRepoCommitsWithStatusRunning(t *testing.T) {
}

func TestRepoCommitsStatusParallel(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("Skipping because test is flaky on CI")
}
defer tests.PrepareTestEnv(t)()

session := loginUser(t, "user2")
Expand Down