47 changes: 28 additions & 19 deletions internal/search/build.go
@@ -215,6 +215,19 @@
 	if !progress.IsDone {
 		return
 	}
+
+	// Use task queue for Meilisearch to avoid race conditions with async indexing
+	if msInstance, ok := instance.(interface {
+		EnqueueUpdate(parent string, objs []model.Obj)
+	}); ok {
+		// Enqueue task for async processing (diff calculation happens at consumption time)
+		msInstance.EnqueueUpdate(parent, objs)
+		return
+	}
+
+	// For other searchers (db, bleve), execute immediately with sync logic
+	ctx := context.Background()
+
Check failure on line 229 in internal/search/build.go
GitHub Actions / Build windows-arm64: no new variables on left side of :=
GitHub Actions / Build Binaries for Docker Release (Beta): no new variables on left side of :=
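Both jobs flag the new `ctx := context.Background()` line: `ctx` evidently already exists in the enclosing scope (it is not visible in this hunk), so the short declaration introduces no new variable. A minimal standalone reproduction of the error, with assumed names — this snippet intentionally does not compile:

package main

import "context"

// update mirrors the failing shape: ctx is already in scope (here, as a
// parameter), so := on the next line declares nothing new.
func update(ctx context.Context) {
	ctx := context.Background() // compile error: no new variables on left side of :=
	_ = ctx
}

func main() { update(context.Background()) }

Changing the line to a plain assignment (ctx = context.Background()) would be the usual fix.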

 	nodes, err := instance.Get(ctx, parent)
 	if err != nil {
 		log.Errorf("update search index error while get nodes: %+v", err)
@@ -241,27 +254,23 @@
 			}
 		}
 	}
+	// collect files and folders to add in batch
+	var toAddObjs []ObjWithParent
 	for i := range objs {
 		if toAdd.Contains(objs[i].GetName()) {
-			if !objs[i].IsDir() {
-				log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
-				err = Index(ctx, parent, objs[i])
-				if err != nil {
-					log.Errorf("update search index error while index new node: %+v", err)
-					return
-				}
-			} else {
-				// build index if it's a folder
-				dir := path.Join(parent, objs[i].GetName())
-				err = BuildIndex(ctx,
-					[]string{dir},
-					conf.SlicesMap[conf.IgnorePaths],
-					setting.GetInt(conf.MaxIndexDepth, 20)-strings.Count(dir, "/"), false)
-				if err != nil {
-					log.Errorf("update search index error while build index: %+v", err)
-					return
-				}
-			}
+			log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
+			toAddObjs = append(toAddObjs, ObjWithParent{
+				Parent: parent,
+				Obj:    objs[i],
+			})
 		}
 	}
+	// batch index all files and folders at once
+	if len(toAddObjs) > 0 {
+		err = BatchIndex(ctx, toAddObjs)
+		if err != nil {
+			log.Errorf("update search index error while batch index new nodes: %+v", err)
+			return
+		}
+	}
 }
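The first hunk wires Meilisearch in through Go's optional-interface pattern: build.go asserts an anonymous interface rather than naming the concrete type, so any searcher that happens to implement EnqueueUpdate opts into the async path while db and bleve keep the synchronous one. A standalone sketch of the idea; all names below are illustrative, not from the PR:

package main

import "fmt"

// Searcher is the capability every backend must provide.
type Searcher interface {
	Index(parent, name string) error
}

// asyncUpdater is the optional capability; backends may or may not implement it.
type asyncUpdater interface {
	EnqueueUpdate(parent string, names []string)
}

func update(s Searcher, parent string, names []string) {
	// Capability check: upgrade to the async path when the backend supports it.
	if au, ok := s.(asyncUpdater); ok {
		au.EnqueueUpdate(parent, names)
		return
	}
	// Fallback: index synchronously, one entry at a time.
	for _, n := range names {
		if err := s.Index(parent, n); err != nil {
			fmt.Println("index error:", err)
		}
	}
}

type stubSearcher struct{}

func (stubSearcher) Index(parent, name string) error { return nil }

func main() {
	update(stubSearcher{}, "/docs", []string{"a.txt", "b.txt"}) // stub lacks EnqueueUpdate, so this takes the sync path
}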
5 changes: 5 additions & 0 deletions internal/search/meilisearch/init.go
@@ -91,6 +91,11 @@ func init() {
 				return nil, err
 			}
 		}
+
+		// Initialize and start task queue manager
+		m.taskQueue = NewTaskQueueManager(&m)
+		m.taskQueue.Start()
+
 		return &m, nil
 	})
 }
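NewTaskQueueManager, Start, Stop, and Enqueue are defined in a new file of this PR that is not part of this excerpt. Going only by the call sites, a minimal in-package sketch of what the manager could look like — the channel-based internals, buffer size, and field names are assumptions:

package meilisearch

import "github.com/OpenListTeam/OpenList/v4/internal/model" // import path is an assumption

// updateTask carries one enqueued update, consumed by the worker goroutine.
type updateTask struct {
	parent string
	objs   []model.Obj
}

// TaskQueueManager decouples enqueueing from indexing so callers of the
// update hook never block on Meilisearch round-trips.
type TaskQueueManager struct {
	m     *Meilisearch
	tasks chan updateTask
	quit  chan struct{}
}

func NewTaskQueueManager(m *Meilisearch) *TaskQueueManager {
	return &TaskQueueManager{
		m:     m,
		tasks: make(chan updateTask, 1024), // buffer size is an assumption
		quit:  make(chan struct{}),
	}
}

// Start launches a single consumer; per the build.go comment, the diff
// against the live index is computed here, at consumption time.
func (t *TaskQueueManager) Start() {
	go func() {
		for {
			select {
			case task := <-t.tasks:
				t.process(task)
			case <-t.quit:
				return
			}
		}
	}()
}

// process is a stub here; the real worker would diff task.objs against the
// index and call batchIndexWithTaskUID / batchDeleteWithTaskUID as needed.
func (t *TaskQueueManager) process(task updateTask) {
	_ = task
}

func (t *TaskQueueManager) Enqueue(parent string, objs []model.Obj) {
	t.tasks <- updateTask{parent: parent, objs: objs}
}

func (t *TaskQueueManager) Stop() { close(t.quit) }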
128 changes: 125 additions & 3 deletions internal/search/meilisearch/search.go
@@ -33,6 +33,7 @@ type Meilisearch struct {
 	IndexUid             string
 	FilterableAttributes []string
 	SearchableAttributes []string
+	taskQueue            *TaskQueueManager
 }
 
 func (m *Meilisearch) Config() searcher.Config {
@@ -82,14 +83,17 @@ func (m *Meilisearch) Index(ctx context.Context, node model.SearchNode) error {
 }
 
 func (m *Meilisearch) BatchIndex(ctx context.Context, nodes []model.SearchNode) error {
-	documents, _ := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
+	documents, err := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
 		parentHash := hashPath(src.Parent)
 		nodePath := path.Join(src.Parent, src.Name)
 		nodePathHash := hashPath(nodePath)
 		parentPaths := utils.GetPathHierarchy(src.Parent)
-		parentPathHashes, _ := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
+		parentPathHashes, err := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
 			return hashPath(parentPath), nil
 		})
+		if err != nil {
+			return nil, err
+		}
 
 		return &searchDocument{
 			ID:         nodePathHash,
@@ -98,9 +102,12 @@ func (m *Meilisearch) BatchIndex(ctx context.Context, nodes []model.SearchNode)
 			SearchNode: src,
 		}, nil
 	})
+	if err != nil {
+		return err
+	}
 
 	// max up to 10,000 documents per batch to reduce error rate while uploading over the Internet
-	_, err := m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
+	_, err = m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
 	if err != nil {
 		return err
 	}
@@ -203,6 +210,9 @@ func (m *Meilisearch) Del(ctx context.Context, prefix string) error {
 }
 
 func (m *Meilisearch) Release(ctx context.Context) error {
+	if m.taskQueue != nil {
+		m.taskQueue.Stop()
+	}
 	return nil
 }
 
@@ -219,3 +229,115 @@ func (m *Meilisearch) getTaskStatus(ctx context.Context, taskUID int64) (meilise
 	}
 	return forTask.Status, nil
 }
+
+// EnqueueUpdate enqueues an update task to the task queue
+func (m *Meilisearch) EnqueueUpdate(parent string, objs []model.Obj) {
+	if m.taskQueue == nil {
+		return
+	}
+
+	m.taskQueue.Enqueue(parent, objs)
+}
+
+// batchIndexWithTaskUID indexes documents and returns all taskUIDs
+func (m *Meilisearch) batchIndexWithTaskUID(ctx context.Context, nodes []model.SearchNode) ([]int64, error) {
+	if len(nodes) == 0 {
+		return nil, nil
+	}
+
+	documents, err := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
+		parentHash := hashPath(src.Parent)
+		nodePath := path.Join(src.Parent, src.Name)
+		nodePathHash := hashPath(nodePath)
+		parentPaths := utils.GetPathHierarchy(src.Parent)
+		parentPathHashes, err := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
+			return hashPath(parentPath), nil
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		return &searchDocument{
+			ID:               nodePathHash,
+			ParentHash:       parentHash,
+			ParentPathHashes: parentPathHashes,
+			SearchNode:       src,
+		}, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// max up to 10,000 documents per batch to reduce error rate while uploading over the Internet
+	tasks, err := m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
+	if err != nil {
+		return nil, err
+	}
+
+	// Return all task UIDs
+	taskUIDs := make([]int64, 0, len(tasks))
+	for _, task := range tasks {
+		taskUIDs = append(taskUIDs, task.TaskUID)
+	}
+	return taskUIDs, nil
+}
+
+// batchDeleteWithTaskUID deletes documents and returns all taskUIDs
+func (m *Meilisearch) batchDeleteWithTaskUID(ctx context.Context, paths []string) ([]int64, error) {
+	if len(paths) == 0 {
+		return nil, nil
+	}
+
+	// Deduplicate paths first
+	pathSet := make(map[string]struct{})
+	uniquePaths := make([]string, 0, len(paths))
+	for _, p := range paths {
+		p = utils.FixAndCleanPath(p)
+		if _, exists := pathSet[p]; !exists {
+			pathSet[p] = struct{}{}
+			uniquePaths = append(uniquePaths, p)
+		}
+	}
+
+	const batchSize = 100 // max paths per batch to avoid filter length limits
+	var taskUIDs []int64
+
+	// Process in batches to avoid filter length limits
+	for i := 0; i < len(uniquePaths); i += batchSize {
+		end := i + batchSize
+		if end > len(uniquePaths) {
+			end = len(uniquePaths)
+		}
+		batch := uniquePaths[i:end]
+
+		// Build combined filter to delete all children in one request
+		// Format: parent_path_hashes = 'hash1' OR parent_path_hashes = 'hash2' OR ...
+		var filters []string
+		for _, p := range batch {
+			pathHash := hashPath(p)
+			filters = append(filters, fmt.Sprintf("parent_path_hashes = '%s'", pathHash))
+		}
+		if len(filters) > 0 {
+			combinedFilter := strings.Join(filters, " OR ")
+			// Delete all children for all paths in one request
+			task, err := m.Client.Index(m.IndexUid).DeleteDocumentsByFilterWithContext(ctx, combinedFilter)
+			if err != nil {
+				return nil, err
+			}
+			taskUIDs = append(taskUIDs, task.TaskUID)
+		}
+
+		// Convert paths to document IDs and batch delete
+		documentIDs := make([]string, 0, len(batch))
+		for _, p := range batch {
+			documentIDs = append(documentIDs, hashPath(p))
+		}
+		// Use batch delete API
+		task, err := m.Client.Index(m.IndexUid).DeleteDocumentsWithContext(ctx, documentIDs)
+		if err != nil {
+			return nil, err
+		}
+		taskUIDs = append(taskUIDs, task.TaskUID)
+	}
+	return taskUIDs, nil
+}
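Both helpers return Meilisearch task UIDs instead of waiting, leaving completion tracking to the queue consumer. A sketch of how a caller might poll them using the getTaskStatus helper shown above — the function name and polling interval are assumptions, not part of the PR (the meilisearch-go task status constants are real):

// waitForTasks blocks until every task reaches a terminal status.
// In-package sketch; needs fmt, time, and the meilisearch client package.
func (m *Meilisearch) waitForTasks(ctx context.Context, taskUIDs []int64) error {
	for _, uid := range taskUIDs {
		for {
			status, err := m.getTaskStatus(ctx, uid)
			if err != nil {
				return err
			}
			if status == meilisearch.TaskStatusSucceeded {
				break
			}
			if status == meilisearch.TaskStatusFailed || status == meilisearch.TaskStatusCanceled {
				return fmt.Errorf("meilisearch task %d ended as %s", uid, status)
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(200 * time.Millisecond): // assumed interval
			}
		}
	}
	return nil
}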