Skip to content

Commit f8fc983

Browse files
committed
rewrite
1 parent 7abe958 commit f8fc983

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+2087
-6360
lines changed

cmd/hook.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"code.gitea.io/gitea/modules/private"
1919
repo_module "code.gitea.io/gitea/modules/repository"
2020
"code.gitea.io/gitea/modules/setting"
21-
"code.gitea.io/gitea/modules/util"
2221

2322
"github.com/urfave/cli"
2423
)
@@ -141,7 +140,7 @@ func (d *delayWriter) Close() error {
141140
if d == nil {
142141
return nil
143142
}
144-
stopped := util.StopTimer(d.timer)
143+
stopped := d.timer.Stop()
145144
if stopped || d.buf == nil {
146145
return nil
147146
}

models/unittest/testdb.go

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func MainTest(m *testing.M, testOpts *TestOptions) {
8989
}
9090

9191
if err = CreateTestEngine(opts); err != nil {
92+
_, _ = fmt.Fprintln(os.Stderr, `sqlite3 requires: import _ "github.com/mattn/go-sqlite3" or -tags sqlite,sqlite_unlock_notify`)
9293
fatalTestError("Error creating test engine: %v\n", err)
9394
}
9495

modules/indexer/code/bleve.go

-4
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,6 @@ func (b *BleveIndexer) Close() {
273273
log.Info("PID: %d Repository Indexer closed", os.Getpid())
274274
}
275275

276-
// SetAvailabilityChangeCallback does nothing
277-
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
278-
}
279-
280276
// Ping does nothing
281277
func (b *BleveIndexer) Ping() bool {
282278
return true

modules/indexer/code/elastic_search.go

+5-17
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,11 @@ var _ Indexer = &ElasticSearchIndexer{}
4242

4343
// ElasticSearchIndexer implements Indexer interface
4444
type ElasticSearchIndexer struct {
45-
client *elastic.Client
46-
indexerAliasName string
47-
available bool
48-
availabilityCallback func(bool)
49-
stopTimer chan struct{}
50-
lock sync.RWMutex
45+
client *elastic.Client
46+
indexerAliasName string
47+
available bool
48+
stopTimer chan struct{}
49+
lock sync.RWMutex
5150
}
5251

5352
type elasticLogger struct {
@@ -198,13 +197,6 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
198197
return exists, nil
199198
}
200199

201-
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
202-
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
203-
b.lock.Lock()
204-
defer b.lock.Unlock()
205-
b.availabilityCallback = callback
206-
}
207-
208200
// Ping checks if elastic is available
209201
func (b *ElasticSearchIndexer) Ping() bool {
210202
b.lock.RLock()
@@ -529,8 +521,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
529521
}
530522

531523
b.available = available
532-
if b.availabilityCallback != nil {
533-
// Call the callback from within the lock to ensure that the ordering remains correct
534-
b.availabilityCallback(b.available)
535-
}
536524
}

modules/indexer/code/indexer.go

+26-30
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ type SearchResultLanguages struct {
4444
// Indexer defines an interface to index and search code contents
4545
type Indexer interface {
4646
Ping() bool
47-
SetAvailabilityChangeCallback(callback func(bool))
4847
Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
4948
Delete(repoID int64) error
5049
Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
@@ -81,7 +80,7 @@ type IndexerData struct {
8180
RepoID int64
8281
}
8382

84-
var indexerQueue queue.UniqueQueue
83+
var indexerQueue *queue.WorkerPoolQueue[*IndexerData]
8584

8685
func index(ctx context.Context, indexer Indexer, repoID int64) error {
8786
repo, err := repo_model.GetRepositoryByID(ctx, repoID)
@@ -137,37 +136,46 @@ func Init() {
137136
// Create the Queue
138137
switch setting.Indexer.RepoType {
139138
case "bleve", "elasticsearch":
140-
handler := func(data ...queue.Data) []queue.Data {
139+
handler := func(items ...*IndexerData) []*IndexerData {
141140
idx, err := indexer.get()
142141
if idx == nil || err != nil {
143142
log.Error("Codes indexer handler: unable to get indexer!")
144-
return data
143+
return items
144+
}
145+
if !idx.Ping() {
146+
log.Error("Code indexer handler: indexer is unavailable.")
147+
return items
145148
}
146149

147-
unhandled := make([]queue.Data, 0, len(data))
148-
for _, datum := range data {
149-
indexerData, ok := datum.(*IndexerData)
150-
if !ok {
151-
log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
152-
continue
153-
}
150+
// the old logic did: if indexer.Ping() { return nil }, skip all failed items
151+
152+
for _, indexerData := range items {
154153
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
155154

155+
// FIXME: it seems there is a bug in `CatFileBatch` or `nio.Pipe`, which will cause the process to hang forever in rare cases
156+
/*
157+
sync.(*Cond).Wait(cond.go:70)
158+
github.com/djherbis/nio/v3.(*PipeReader).Read(sync.go:106)
159+
bufio.(*Reader).fill(bufio.go:106)
160+
bufio.(*Reader).ReadSlice(bufio.go:372)
161+
bufio.(*Reader).collectFragments(bufio.go:447)
162+
bufio.(*Reader).ReadString(bufio.go:494)
163+
code.gitea.io/gitea/modules/git.ReadBatchLine(batch_reader.go:149)
164+
code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).addUpdate(bleve.go:214)
165+
code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).Index(bleve.go:296)
166+
code.gitea.io/gitea/modules/indexer/code.(*wrappedIndexer).Index(wrapped.go:74)
167+
code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105)
168+
*/
156169
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
157170
if !setting.IsInTesting {
158171
log.Error("indexer index error for repo %v: %v", indexerData.RepoID, err)
159172
}
160-
if indexer.Ping() {
161-
continue
162-
}
163-
// Add back to queue
164-
unhandled = append(unhandled, datum)
165173
}
166174
}
167-
return unhandled
175+
return nil
168176
}
169177

170-
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
178+
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
171179
if indexerQueue == nil {
172180
log.Fatal("Unable to create codes indexer queue")
173181
}
@@ -224,18 +232,6 @@ func Init() {
224232

225233
indexer.set(rIndexer)
226234

227-
if queue, ok := indexerQueue.(queue.Pausable); ok {
228-
rIndexer.SetAvailabilityChangeCallback(func(available bool) {
229-
if !available {
230-
log.Info("Code index queue paused")
231-
queue.Pause()
232-
} else {
233-
log.Info("Code index queue resumed")
234-
queue.Resume()
235-
}
236-
})
237-
}
238-
239235
// Start processing the queue
240236
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
241237

modules/indexer/code/wrapped.go

-10
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,6 @@ func (w *wrappedIndexer) get() (Indexer, error) {
5656
return w.internal, nil
5757
}
5858

59-
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
60-
func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
61-
indexer, err := w.get()
62-
if err != nil {
63-
log.Error("Failed to get indexer: %v", err)
64-
return
65-
}
66-
indexer.SetAvailabilityChangeCallback(callback)
67-
}
68-
6959
// Ping checks if elastic is available
7060
func (w *wrappedIndexer) Ping() bool {
7161
indexer, err := w.get()

modules/indexer/issues/bleve.go

-4
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,6 @@ func (b *BleveIndexer) Init() (bool, error) {
187187
return false, err
188188
}
189189

190-
// SetAvailabilityChangeCallback does nothing
191-
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
192-
}
193-
194190
// Ping does nothing
195191
func (b *BleveIndexer) Ping() bool {
196192
return true

modules/indexer/issues/db.go

-4
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@ func (i *DBIndexer) Init() (bool, error) {
1818
return false, nil
1919
}
2020

21-
// SetAvailabilityChangeCallback dummy function
22-
func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
23-
}
24-
2521
// Ping checks if database is available
2622
func (i *DBIndexer) Ping() bool {
2723
return db.GetEngine(db.DefaultContext).Ping() != nil

modules/indexer/issues/elastic_search.go

+5-17
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ var _ Indexer = &ElasticSearchIndexer{}
2222

2323
// ElasticSearchIndexer implements Indexer interface
2424
type ElasticSearchIndexer struct {
25-
client *elastic.Client
26-
indexerName string
27-
available bool
28-
availabilityCallback func(bool)
29-
stopTimer chan struct{}
30-
lock sync.RWMutex
25+
client *elastic.Client
26+
indexerName string
27+
available bool
28+
stopTimer chan struct{}
29+
lock sync.RWMutex
3130
}
3231

3332
type elasticLogger struct {
@@ -138,13 +137,6 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
138137
return true, nil
139138
}
140139

141-
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
142-
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
143-
b.lock.Lock()
144-
defer b.lock.Unlock()
145-
b.availabilityCallback = callback
146-
}
147-
148140
// Ping checks if elastic is available
149141
func (b *ElasticSearchIndexer) Ping() bool {
150142
b.lock.RLock()
@@ -305,8 +297,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
305297
}
306298

307299
b.available = available
308-
if b.availabilityCallback != nil {
309-
// Call the callback from within the lock to ensure that the ordering remains correct
310-
b.availabilityCallback(b.available)
311-
}
312300
}

modules/indexer/issues/indexer.go

+17-52
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ type SearchResult struct {
4949
type Indexer interface {
5050
Init() (bool, error)
5151
Ping() bool
52-
SetAvailabilityChangeCallback(callback func(bool))
5352
Index(issue []*IndexerData) error
5453
Delete(ids ...int64) error
5554
Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
@@ -94,7 +93,7 @@ func (h *indexerHolder) get() Indexer {
9493

9594
var (
9695
// issueIndexerQueue queue of issue ids to be updated
97-
issueIndexerQueue queue.Queue
96+
issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData]
9897
holder = newIndexerHolder()
9998
)
10099

@@ -108,62 +107,43 @@ func InitIssueIndexer(syncReindex bool) {
108107
// Create the Queue
109108
switch setting.Indexer.IssueType {
110109
case "bleve", "elasticsearch", "meilisearch":
111-
handler := func(data ...queue.Data) []queue.Data {
110+
handler := func(items ...*IndexerData) []*IndexerData {
112111
indexer := holder.get()
113112
if indexer == nil {
114-
log.Error("Issue indexer handler: unable to get indexer!")
115-
return data
113+
log.Error("Issue indexer handler: unable to get indexer.")
114+
return items
115+
}
116+
if !indexer.Ping() {
117+
log.Error("Issue indexer handler: indexer is unavailable.")
118+
return items
116119
}
117120

118-
iData := make([]*IndexerData, 0, len(data))
119-
unhandled := make([]queue.Data, 0, len(data))
120-
for _, datum := range data {
121-
indexerData, ok := datum.(*IndexerData)
122-
if !ok {
123-
log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
124-
continue
125-
}
121+
// the old logic did: if indexer.Ping() { return nil }, skip all failed items
122+
123+
toIndex := make([]*IndexerData, 0, len(items))
124+
for _, indexerData := range items {
126125
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
127126
if indexerData.IsDelete {
128127
if err := indexer.Delete(indexerData.IDs...); err != nil {
129128
log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
130-
if indexer.Ping() {
131-
continue
132-
}
133-
// Add back to queue
134-
unhandled = append(unhandled, datum)
135129
}
136130
continue
137131
}
138-
iData = append(iData, indexerData)
132+
toIndex = append(toIndex, indexerData)
139133
}
140-
if len(unhandled) > 0 {
141-
for _, indexerData := range iData {
142-
unhandled = append(unhandled, indexerData)
143-
}
144-
return unhandled
145-
}
146-
if err := indexer.Index(iData); err != nil {
147-
log.Error("Error whilst indexing: %v Error: %v", iData, err)
148-
if indexer.Ping() {
149-
return nil
150-
}
151-
// Add back to queue
152-
for _, indexerData := range iData {
153-
unhandled = append(unhandled, indexerData)
154-
}
155-
return unhandled
134+
if err := indexer.Index(toIndex); err != nil {
135+
log.Error("Error whilst indexing: %v Error: %v", toIndex, err)
156136
}
157137
return nil
158138
}
159139

160-
issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
140+
issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)
161141

162142
if issueIndexerQueue == nil {
163143
log.Fatal("Unable to create issue indexer queue")
164144
}
165145
default:
166-
issueIndexerQueue = &queue.DummyQueue{}
146+
issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
167147
}
168148

169149
// Create the Indexer
@@ -240,18 +220,6 @@ func InitIssueIndexer(syncReindex bool) {
240220
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
241221
}
242222

243-
if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
244-
holder.get().SetAvailabilityChangeCallback(func(available bool) {
245-
if !available {
246-
log.Info("Issue index queue paused")
247-
queue.Pause()
248-
} else {
249-
log.Info("Issue index queue resumed")
250-
queue.Resume()
251-
}
252-
})
253-
}
254-
255223
// Start processing the queue
256224
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
257225

@@ -285,9 +253,6 @@ func InitIssueIndexer(syncReindex bool) {
285253
case <-graceful.GetManager().IsShutdown():
286254
log.Warn("Shutdown occurred before issue index initialisation was complete")
287255
case <-time.After(timeout):
288-
if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
289-
shutdownable.Terminate()
290-
}
291256
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
292257
}
293258
}()

0 commit comments

Comments
 (0)