Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retrying index issues #27554

Merged
merged 3 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions modules/indexer/issues/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,13 @@ func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMeta
func populateIssueIndexer(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true)
defer finished()
if err := PopulateIssueIndexer(ctx, true); err != nil {
ctx = contextWithKeepRetry(ctx) // keep retrying since it's a background task
if err := PopulateIssueIndexer(ctx); err != nil {
log.Error("Issue indexer population failed: %v", err)
}
}

func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error {
func PopulateIssueIndexer(ctx context.Context) error {
for page := 1; ; page++ {
select {
case <-ctx.Done():
Expand All @@ -232,20 +233,8 @@ func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error {
}

for _, repo := range repos {
for {
select {
case <-ctx.Done():
return fmt.Errorf("shutdown before completion: %w", ctx.Err())
default:
}
if err := updateRepoIndexer(ctx, repo.ID); err != nil {
if keepRetrying && ctx.Err() == nil {
log.Warn("Retry to populate issue indexer for repo %d: %v", repo.ID, err)
continue
}
return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
}
break
if err := updateRepoIndexer(ctx, repo.ID); err != nil {
return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
}
}
}
Expand All @@ -259,8 +248,8 @@ func UpdateRepoIndexer(ctx context.Context, repoID int64) {
}

// UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(issueID int64) {
if err := updateIssueIndexer(issueID); err != nil {
func UpdateIssueIndexer(ctx context.Context, issueID int64) {
if err := updateIssueIndexer(ctx, issueID); err != nil {
log.Error("Unable to push issue %d to issue indexer: %v", issueID, err)
}
}
Expand Down
46 changes: 34 additions & 12 deletions modules/indexer/issues/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ func updateRepoIndexer(ctx context.Context, repoID int64) error {
return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err)
}
for _, id := range ids {
if err := updateIssueIndexer(id); err != nil {
if err := updateIssueIndexer(ctx, id); err != nil {
return err
}
}
return nil
}

func updateIssueIndexer(issueID int64) error {
return pushIssueIndexerQueue(&IndexerMetadata{ID: issueID})
func updateIssueIndexer(ctx context.Context, issueID int64) error {
return pushIssueIndexerQueue(ctx, &IndexerMetadata{ID: issueID})
}

func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
Expand All @@ -148,26 +148,48 @@ func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
if len(ids) == 0 {
return nil
}
return pushIssueIndexerQueue(&IndexerMetadata{
return pushIssueIndexerQueue(ctx, &IndexerMetadata{
IDs: ids,
IsDelete: true,
})
}

func pushIssueIndexerQueue(data *IndexerMetadata) error {
type keepRetryKey struct{}

// contextWithKeepRetry returns a context with a key indicating that the indexer should keep retrying.
// Please note that it's for background tasks only, and it should not be used for user requests, or it may cause blocking.
func contextWithKeepRetry(ctx context.Context) context.Context {
return context.WithValue(ctx, keepRetryKey{}, true)
}

func pushIssueIndexerQueue(ctx context.Context, data *IndexerMetadata) error {
if issueIndexerQueue == nil {
// Some unit tests will trigger indexing, but the queue is not initialized.
// It's OK to ignore it, but log a warning message in case it's not a unit test.
log.Warn("Trying to push %+v to issue indexer queue, but the queue is not initialized, it's OK if it's a unit test", data)
return nil
}

err := issueIndexerQueue.Push(data)
if errors.Is(err, queue.ErrAlreadyInQueue) {
return nil
}
if errors.Is(err, context.DeadlineExceeded) {
log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err := issueIndexerQueue.Push(data)
if errors.Is(err, queue.ErrAlreadyInQueue) {
return nil
}
if errors.Is(err, context.DeadlineExceeded) { // the queue is full
log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.")
if ctx.Value(keepRetryKey{}) == nil {
return err
}
// It will be better to increase the queue size instead of retrying, but users may ignore the previous warning message.
// However, even it retries, it may still cause index loss when there's a deadline in the context.
log.Debug("Retry to push %+v to issue indexer queue", data)
continue
}
return err
}
return err
}
2 changes: 1 addition & 1 deletion services/cron/tasks_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func registerRebuildIssueIndexer() {
RunAtStart: false,
Schedule: "@annually",
}, func(ctx context.Context, _ *user_model.User, config Config) error {
return issue_indexer.PopulateIssueIndexer(ctx, false)
return issue_indexer.PopulateIssueIndexer(ctx)
})
}

Expand Down
16 changes: 8 additions & 8 deletions services/indexer/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,35 +36,35 @@ func (r *indexerNotifier) AdoptRepository(ctx context.Context, doer, u *user_mod
func (r *indexerNotifier) CreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository,
issue *issues_model.Issue, comment *issues_model.Comment, mentions []*user_model.User,
) {
issue_indexer.UpdateIssueIndexer(issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
}

func (r *indexerNotifier) NewIssue(ctx context.Context, issue *issues_model.Issue, mentions []*user_model.User) {
issue_indexer.UpdateIssueIndexer(issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
}

func (r *indexerNotifier) NewPullRequest(ctx context.Context, pr *issues_model.PullRequest, mentions []*user_model.User) {
if err := pr.LoadIssue(ctx); err != nil {
log.Error("LoadIssue: %v", err)
return
}
issue_indexer.UpdateIssueIndexer(pr.Issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, pr.Issue.ID)
}

func (r *indexerNotifier) UpdateComment(ctx context.Context, doer *user_model.User, c *issues_model.Comment, oldContent string) {
if err := c.LoadIssue(ctx); err != nil {
log.Error("LoadIssue: %v", err)
return
}
issue_indexer.UpdateIssueIndexer(c.Issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, c.Issue.ID)
}

func (r *indexerNotifier) DeleteComment(ctx context.Context, doer *user_model.User, comment *issues_model.Comment) {
if err := comment.LoadIssue(ctx); err != nil {
log.Error("LoadIssue: %v", err)
return
}
issue_indexer.UpdateIssueIndexer(comment.Issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, comment.Issue.ID)
}

func (r *indexerNotifier) DeleteRepository(ctx context.Context, doer *user_model.User, repo *repo_model.Repository) {
Expand Down Expand Up @@ -120,13 +120,13 @@ func (r *indexerNotifier) ChangeDefaultBranch(ctx context.Context, repo *repo_mo
}

func (r *indexerNotifier) IssueChangeContent(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldContent string) {
issue_indexer.UpdateIssueIndexer(issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
}

func (r *indexerNotifier) IssueChangeTitle(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldTitle string) {
issue_indexer.UpdateIssueIndexer(issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
}

func (r *indexerNotifier) IssueChangeRef(ctx context.Context, doer *user_model.User, issue *issues_model.Issue, oldRef string) {
issue_indexer.UpdateIssueIndexer(issue.ID)
issue_indexer.UpdateIssueIndexer(ctx, issue.ID)
}
3 changes: 2 additions & 1 deletion tests/integration/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package integration

import (
"context"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestViewIssuesKeyword(t *testing.T) {
RepoID: repo.ID,
Index: 1,
})
issues.UpdateIssueIndexer(issue.ID)
issues.UpdateIssueIndexer(context.Background(), issue.ID)
time.Sleep(time.Second * 1)
const keyword = "first"
req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.Link(), keyword)
Expand Down