Skip to content

Commit 315124b

Browse files
authored
Fix parallelly generating index failure with Mysql (#24567)
1 parent 3d1fda7 commit 315124b

File tree

4 files changed

+95
-11
lines changed

4 files changed

+95
-11
lines changed

models/db/index.go

+22-1
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,31 @@ func postgresGetNextResourceIndex(ctx context.Context, tableName string, groupID
7171
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
7272
}
7373

74+
func mysqlGetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
75+
if _, err := GetEngine(ctx).Exec(fmt.Sprintf("INSERT INTO %s (group_id, max_index) "+
76+
"VALUES (?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
77+
tableName), groupID); err != nil {
78+
return 0, err
79+
}
80+
81+
var idx int64
82+
_, err := GetEngine(ctx).SQL(fmt.Sprintf("SELECT max_index FROM %s WHERE group_id = ?", tableName), groupID).Get(&idx)
83+
if err != nil {
84+
return 0, err
85+
}
86+
if idx == 0 {
87+
return 0, errors.New("cannot get the correct index")
88+
}
89+
return idx, nil
90+
}
91+
7492
// GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created
7593
func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
76-
if setting.Database.Type.IsPostgreSQL() {
94+
switch {
95+
case setting.Database.Type.IsPostgreSQL():
7796
return postgresGetNextResourceIndex(ctx, tableName, groupID)
97+
case setting.Database.Type.IsMySQL():
98+
return mysqlGetNextResourceIndex(ctx, tableName, groupID)
7899
}
79100

80101
e := GetEngine(ctx)

models/git/commit_status.go

+28-6
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,40 @@ func postgresGetCommitStatusIndex(ctx context.Context, repoID int64, sha string)
6464
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
6565
}
6666

67+
func mysqlGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
68+
if _, err := db.GetEngine(ctx).Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
69+
"VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
70+
repoID, sha); err != nil {
71+
return 0, err
72+
}
73+
74+
var idx int64
75+
_, err := db.GetEngine(ctx).SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?",
76+
repoID, sha).Get(&idx)
77+
if err != nil {
78+
return 0, err
79+
}
80+
if idx == 0 {
81+
return 0, errors.New("cannot get the correct index")
82+
}
83+
return idx, nil
84+
}
85+
6786
// GetNextCommitStatusIndex retried 3 times to generate a resource index
6887
func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
69-
if setting.Database.Type.IsPostgreSQL() {
88+
switch {
89+
case setting.Database.Type.IsPostgreSQL():
7090
return postgresGetCommitStatusIndex(ctx, repoID, sha)
91+
case setting.Database.Type.IsMySQL():
92+
return mysqlGetCommitStatusIndex(ctx, repoID, sha)
7193
}
7294

7395
e := db.GetEngine(ctx)
7496

7597
// try to update the max_index to next value, and acquire the write-lock for the record
7698
res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
7799
if err != nil {
78-
return 0, err
100+
return 0, fmt.Errorf("update failed: %w", err)
79101
}
80102
affected, err := res.RowsAffected()
81103
if err != nil {
@@ -86,26 +108,26 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
86108
_, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha)
87109
res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
88110
if err != nil {
89-
return 0, err
111+
return 0, fmt.Errorf("update2 failed: %w", err)
90112
}
91113
affected, err = res.RowsAffected()
92114
if err != nil {
93-
return 0, err
115+
return 0, fmt.Errorf("RowsAffected failed: %w", err)
94116
}
95117
// if the update still can not update any records, the record must not exist and there must be some errors (insert error)
96118
if affected == 0 {
97119
if errIns == nil {
98120
return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated")
99121
}
100-
return 0, errIns
122+
return 0, fmt.Errorf("insert failed: %w", errIns)
101123
}
102124
}
103125

104126
// now, the new index is in database (protected by the transaction and write-lock)
105127
var newIdx int64
106128
has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx)
107129
if err != nil {
108-
return 0, err
130+
return 0, fmt.Errorf("select failed: %w", err)
109131
}
110132
if !has {
111133
return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected")

tests/integration/api_issue_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"fmt"
88
"net/http"
99
"net/url"
10+
"strconv"
11+
"sync"
1012
"testing"
1113
"time"
1214

@@ -106,6 +108,49 @@ func TestAPICreateIssue(t *testing.T) {
106108
assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues)
107109
}
108110

111+
func TestAPICreateIssueParallel(t *testing.T) {
112+
defer tests.PrepareTestEnv(t)()
113+
const body, title = "apiTestBody", "apiTestTitle"
114+
115+
repoBefore := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
116+
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repoBefore.OwnerID})
117+
118+
session := loginUser(t, owner.Name)
119+
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteIssue)
120+
urlStr := fmt.Sprintf("/api/v1/repos/%s/%s/issues?state=all&token=%s", owner.Name, repoBefore.Name, token)
121+
122+
var wg sync.WaitGroup
123+
for i := 0; i < 10; i++ {
124+
wg.Add(1)
125+
go func(parentT *testing.T, i int) {
126+
parentT.Run(fmt.Sprintf("ParallelCreateIssue_%d", i), func(t *testing.T) {
127+
newTitle := title + strconv.Itoa(i)
128+
newBody := body + strconv.Itoa(i)
129+
req := NewRequestWithJSON(t, "POST", urlStr, &api.CreateIssueOption{
130+
Body: newBody,
131+
Title: newTitle,
132+
Assignee: owner.Name,
133+
})
134+
resp := MakeRequest(t, req, http.StatusCreated)
135+
var apiIssue api.Issue
136+
DecodeJSON(t, resp, &apiIssue)
137+
assert.Equal(t, newBody, apiIssue.Body)
138+
assert.Equal(t, newTitle, apiIssue.Title)
139+
140+
unittest.AssertExistsAndLoadBean(t, &issues_model.Issue{
141+
RepoID: repoBefore.ID,
142+
AssigneeID: owner.ID,
143+
Content: newBody,
144+
Title: newTitle,
145+
})
146+
147+
wg.Done()
148+
})
149+
}(t, i)
150+
}
151+
wg.Wait()
152+
}
153+
109154
func TestAPIEditIssue(t *testing.T) {
110155
defer tests.PrepareTestEnv(t)()
111156

tests/integration/repo_commits_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"net/http"
99
"net/http/httptest"
10-
"os"
1110
"path"
1211
"sync"
1312
"testing"
@@ -135,9 +134,6 @@ func TestRepoCommitsWithStatusRunning(t *testing.T) {
135134
}
136135

137136
func TestRepoCommitsStatusParallel(t *testing.T) {
138-
if os.Getenv("CI") != "" {
139-
t.Skip("Skipping because test is flaky on CI")
140-
}
141137
defer tests.PrepareTestEnv(t)()
142138

143139
session := loginUser(t, "user2")

0 commit comments

Comments
 (0)