
Commit 316d4ca

ImoutoHeaven and jyxjjj authored
feat(search): Add task queue for Meilisearch to prevent race conditions (#1423)
* Add task queue for Meilisearch to prevent race conditions
  - Implement TaskQueueManager for async index operations
  - Queue update tasks and process them in batches every 30 seconds
  - Check pending task status before executing new operations
  - Optimize batch indexing and deletion logic
  - Fix type assertion bug in buildSearchDocumentFromResults

* fix(search): re-enqueue skipped tasks to prevent task loss

  When tasks are skipped due to pending dependencies, they are now
  re-enqueued if not already in queue. This prevents task loss while
  avoiding overwriting newer snapshots for the same parent.

* fix(copilot-comment): Invoke Stop() & err of SliceConvert

---------

Co-authored-by: ImoutoHeaven <noreply@imoutoheaven.org>
Co-authored-by: jyxjjj <773933146@qq.com>
1 parent 60a489e commit 316d4ca

File tree

5 files changed: +431 -31 lines changed
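
Note: only three of the five changed files are reproduced below; the TaskQueueManager itself lives in one of the files not shown. For orientation, here is a minimal sketch of the shape its call sites imply, assuming the 30-second batch cycle and last-snapshot-wins re-enqueue behavior described in the commit message. Only the names TaskQueueManager, NewTaskQueueManager, Start, Stop, and Enqueue come from the diff; everything else is an assumption, not the committed code.

```go
// Hypothetical sketch, not the committed queue.go.
package meilisearch

import (
	"sync"
	"time"

	"github.com/OpenListTeam/OpenList/v4/internal/model" // import path assumed
)

// TaskQueueManager batches index updates so concurrent Update calls cannot
// race against Meilisearch's asynchronous task processing.
type TaskQueueManager struct {
	m      *Meilisearch
	mu     sync.Mutex
	tasks  map[string][]model.Obj // latest snapshot per parent path
	stopCh chan struct{}
	done   chan struct{}
}

func NewTaskQueueManager(m *Meilisearch) *TaskQueueManager {
	return &TaskQueueManager{
		m:      m,
		tasks:  make(map[string][]model.Obj),
		stopCh: make(chan struct{}),
		done:   make(chan struct{}),
	}
}

// Enqueue stores the newest object listing for a parent; a later snapshot for
// the same parent replaces the earlier one, so re-enqueueing a skipped task
// never overwrites newer data.
func (q *TaskQueueManager) Enqueue(parent string, objs []model.Obj) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.tasks[parent] = objs
}

// Start launches a consumer goroutine that drains the queue every 30 seconds,
// matching the batch interval named in the commit message.
func (q *TaskQueueManager) Start() {
	go func() {
		defer close(q.done)
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				q.drain()
			case <-q.stopCh:
				return
			}
		}
	}()
}

// Stop terminates the consumer; Release in search.go calls this.
func (q *TaskQueueManager) Stop() {
	close(q.stopCh)
	<-q.done
}

func (q *TaskQueueManager) drain() {
	q.mu.Lock()
	batch := q.tasks
	q.tasks = make(map[string][]model.Obj)
	q.mu.Unlock()
	for parent, objs := range batch {
		// The real implementation diffs each snapshot against the index here
		// (checking pending Meilisearch tasks first) and then calls
		// m.batchIndexWithTaskUID / m.batchDeleteWithTaskUID as needed.
		_, _ = parent, objs
	}
}
```

Coalescing by parent path is what lets re-enqueued tasks avoid clobbering newer snapshots, per the second bullet of the commit message.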

internal/search/build.go

Lines changed: 28 additions & 19 deletions
```diff
@@ -215,6 +215,19 @@ func Update(ctx context.Context, parent string, objs []model.Obj) {
 	if !progress.IsDone {
 		return
 	}
+
+	// Use task queue for Meilisearch to avoid race conditions with async indexing
+	if msInstance, ok := instance.(interface {
+		EnqueueUpdate(parent string, objs []model.Obj)
+	}); ok {
+		// Enqueue task for async processing (diff calculation happens at consumption time)
+		msInstance.EnqueueUpdate(parent, objs)
+		return
+	}
+
+	// For other searchers (db, bleve), execute immediately with sync logic
+	ctx := context.Background()
+
 	nodes, err := instance.Get(ctx, parent)
 	if err != nil {
 		log.Errorf("update search index error while get nodes: %+v", err)
@@ -241,27 +254,23 @@ func Update(ctx context.Context, parent string, objs []model.Obj) {
 			}
 		}
 	}
+	// collect files and folders to add in batch
+	var toAddObjs []ObjWithParent
 	for i := range objs {
 		if toAdd.Contains(objs[i].GetName()) {
-			if !objs[i].IsDir() {
-				log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
-				err = Index(ctx, parent, objs[i])
-				if err != nil {
-					log.Errorf("update search index error while index new node: %+v", err)
-					return
-				}
-			} else {
-				// build index if it's a folder
-				dir := path.Join(parent, objs[i].GetName())
-				err = BuildIndex(ctx,
-					[]string{dir},
-					conf.SlicesMap[conf.IgnorePaths],
-					setting.GetInt(conf.MaxIndexDepth, 20)-strings.Count(dir, "/"), false)
-				if err != nil {
-					log.Errorf("update search index error while build index: %+v", err)
-					return
-				}
-			}
+			log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
+			toAddObjs = append(toAddObjs, ObjWithParent{
+				Parent: parent,
+				Obj:    objs[i],
+			})
+		}
+	}
+	// batch index all files and folders at once
+	if len(toAddObjs) > 0 {
+		err = BatchIndex(ctx, toAddObjs)
+		if err != nil {
+			log.Errorf("update search index error while batch index new nodes: %+v", err)
+			return
 		}
 	}
 }
```

internal/search/meilisearch/init.go

Lines changed: 5 additions & 0 deletions
```diff
@@ -91,6 +91,11 @@ func init() {
 				return nil, err
 			}
 		}
+
+		// Initialize and start task queue manager
+		m.taskQueue = NewTaskQueueManager(&m)
+		m.taskQueue.Start()
+
 		return &m, nil
 	})
 }
```

internal/search/meilisearch/search.go

Lines changed: 125 additions & 3 deletions
```diff
@@ -33,6 +33,7 @@ type Meilisearch struct {
 	IndexUid             string
 	FilterableAttributes []string
 	SearchableAttributes []string
+	taskQueue            *TaskQueueManager
 }
 
 func (m *Meilisearch) Config() searcher.Config {
@@ -82,14 +83,17 @@ func (m *Meilisearch) Index(ctx context.Context, node model.SearchNode) error {
 }
 
 func (m *Meilisearch) BatchIndex(ctx context.Context, nodes []model.SearchNode) error {
-	documents, _ := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
+	documents, err := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
 		parentHash := hashPath(src.Parent)
 		nodePath := path.Join(src.Parent, src.Name)
 		nodePathHash := hashPath(nodePath)
 		parentPaths := utils.GetPathHierarchy(src.Parent)
-		parentPathHashes, _ := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
+		parentPathHashes, err := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
 			return hashPath(parentPath), nil
 		})
+		if err != nil {
+			return nil, err
+		}
 
 		return &searchDocument{
 			ID:               nodePathHash,
@@ -98,9 +102,12 @@ func (m *Meilisearch) BatchIndex(ctx context.Context, nodes []model.SearchNode)
 			SearchNode: src,
 		}, nil
 	})
+	if err != nil {
+		return err
+	}
 
 	// max up to 10,000 documents per batch to reduce error rate while uploading over the Internet
-	_, err := m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
+	_, err = m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
 	if err != nil {
 		return err
 	}
@@ -203,6 +210,9 @@ func (m *Meilisearch) Del(ctx context.Context, prefix string) error {
 }
 
 func (m *Meilisearch) Release(ctx context.Context) error {
+	if m.taskQueue != nil {
+		m.taskQueue.Stop()
+	}
 	return nil
 }
 
@@ -219,3 +229,115 @@ func (m *Meilisearch) getTaskStatus(ctx context.Context, taskUID int64) (meilise
 	}
 	return forTask.Status, nil
 }
+
+// EnqueueUpdate enqueues an update task to the task queue
+func (m *Meilisearch) EnqueueUpdate(parent string, objs []model.Obj) {
+	if m.taskQueue == nil {
+		return
+	}
+
+	m.taskQueue.Enqueue(parent, objs)
+}
+
+// batchIndexWithTaskUID indexes documents and returns all taskUIDs
+func (m *Meilisearch) batchIndexWithTaskUID(ctx context.Context, nodes []model.SearchNode) ([]int64, error) {
+	if len(nodes) == 0 {
+		return nil, nil
+	}
+
+	documents, err := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
+		parentHash := hashPath(src.Parent)
+		nodePath := path.Join(src.Parent, src.Name)
+		nodePathHash := hashPath(nodePath)
+		parentPaths := utils.GetPathHierarchy(src.Parent)
+		parentPathHashes, err := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
+			return hashPath(parentPath), nil
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		return &searchDocument{
+			ID:               nodePathHash,
+			ParentHash:       parentHash,
+			ParentPathHashes: parentPathHashes,
+			SearchNode:       src,
+		}, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// max up to 10,000 documents per batch to reduce error rate while uploading over the Internet
+	tasks, err := m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
+	if err != nil {
+		return nil, err
+	}
+
+	// Return all task UIDs
+	taskUIDs := make([]int64, 0, len(tasks))
+	for _, task := range tasks {
+		taskUIDs = append(taskUIDs, task.TaskUID)
+	}
+	return taskUIDs, nil
+}
+
+// batchDeleteWithTaskUID deletes documents and returns all taskUIDs
+func (m *Meilisearch) batchDeleteWithTaskUID(ctx context.Context, paths []string) ([]int64, error) {
+	if len(paths) == 0 {
+		return nil, nil
+	}
+
+	// Deduplicate paths first
+	pathSet := make(map[string]struct{})
+	uniquePaths := make([]string, 0, len(paths))
+	for _, p := range paths {
+		p = utils.FixAndCleanPath(p)
+		if _, exists := pathSet[p]; !exists {
+			pathSet[p] = struct{}{}
+			uniquePaths = append(uniquePaths, p)
+		}
+	}
+
+	const batchSize = 100 // max paths per batch to avoid filter length limits
+	var taskUIDs []int64
+
+	// Process in batches to avoid filter length limits
+	for i := 0; i < len(uniquePaths); i += batchSize {
+		end := i + batchSize
+		if end > len(uniquePaths) {
+			end = len(uniquePaths)
+		}
+		batch := uniquePaths[i:end]
+
+		// Build combined filter to delete all children in one request
+		// Format: parent_path_hashes = 'hash1' OR parent_path_hashes = 'hash2' OR ...
+		var filters []string
+		for _, p := range batch {
+			pathHash := hashPath(p)
+			filters = append(filters, fmt.Sprintf("parent_path_hashes = '%s'", pathHash))
+		}
+		if len(filters) > 0 {
+			combinedFilter := strings.Join(filters, " OR ")
+			// Delete all children for all paths in one request
+			task, err := m.Client.Index(m.IndexUid).DeleteDocumentsByFilterWithContext(ctx, combinedFilter)
+			if err != nil {
+				return nil, err
+			}
+			taskUIDs = append(taskUIDs, task.TaskUID)
+		}
+
+		// Convert paths to document IDs and batch delete
+		documentIDs := make([]string, 0, len(batch))
+		for _, p := range batch {
+			documentIDs = append(documentIDs, hashPath(p))
+		}
+		// Use batch delete API
+		task, err := m.Client.Index(m.IndexUid).DeleteDocumentsWithContext(ctx, documentIDs)
+		if err != nil {
+			return nil, err
+		}
+		taskUIDs = append(taskUIDs, task.TaskUID)
+	}
+	return taskUIDs, nil
+}
```
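
For each batch of up to 100 paths, batchDeleteWithTaskUID submits two Meilisearch tasks: a DeleteDocumentsByFilter call whose combined filter removes every descendant via parent_path_hashes, then a DeleteDocuments call for the paths' own document IDs. The sketch below shows how that combined filter string is assembled; the real hashPath helper is not part of this diff, so the hash here is a stand-in:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// hashPath is a stand-in for the package's unexported helper of the same
// name; the actual algorithm is not shown in this commit.
func hashPath(p string) string {
	sum := sha256.Sum256([]byte(p))
	return hex.EncodeToString(sum[:8])
}

func main() {
	batch := []string{"/movies", "/music/jazz"} // one batch of at most 100 paths
	filters := make([]string, 0, len(batch))
	for _, p := range batch {
		filters = append(filters, fmt.Sprintf("parent_path_hashes = '%s'", hashPath(p)))
	}
	// A single DeleteDocumentsByFilter request with this filter removes all
	// descendants of every path in the batch at once.
	fmt.Println(strings.Join(filters, " OR "))
}
```

Capping each filter at 100 paths keeps the OR-joined expression well under Meilisearch's filter length limits while still collapsing many deletions into one task per batch.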
