Skip to content

Commit

Permalink
fix corrupted buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
lezhnev74 committed Nov 25, 2023
1 parent ff7c48a commit f7f8fe8
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 18 deletions.
16 changes: 7 additions & 9 deletions multiple/files_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,14 @@ func (f *filesList) safeWrite(fn func()) {
}

func (f *filesList) removeFiles(files []*indexFile) {
f.safeWrite(func() {
x := 0
for _, existingFile := range f.files {
if !slices.Contains(files, existingFile) {
f.files[x] = existingFile
x++
}
x := 0
for _, existingFile := range f.files {
if !slices.Contains(files, existingFile) {
f.files[x] = existingFile
x++
}
f.files = f.files[:x]
})
}
f.files = f.files[:x]
}
func (f *filesList) putFile(path string, fileSize int64) {

Expand Down
14 changes: 7 additions & 7 deletions multiple/multiple_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type DirectoryIndexMerger[T constraints.Ordered] struct {
func (m *DirectoryIndexMerger[T]) Merge() (files []*indexFile, err error) {
n := time.Now()

files, err = m.CheckMerge()
files, err = m.checkMerge()
if err != nil {
return nil, fmt.Errorf("merging failed to start: %w", err)
}
Expand All @@ -61,19 +61,17 @@ func (m *DirectoryIndexMerger[T]) Merge() (files []*indexFile, err error) {
}

mergedFilename := m.indexDirectory.selectFilename()

mergedFileLen, err := m.mergeFiles(mergedFilename, mergePaths)
if err != nil {
return nil, fmt.Errorf("merging failed: %w", err)
}

m.indexDirectory.currentList.safeWrite(func() {
m.indexDirectory.currentList.putFile(mergedFilename, mergedFileLen)
// move the merged files to another files list for removal
m.indexDirectory.currentList.removeFiles(files)
})

// move the merged files to another files list for removal
m.indexDirectory.currentList.removeFiles(files)

m.mergedList.safeWrite(func() {
for _, f := range files {
m.mergedList.putFileP(f)
Expand All @@ -91,8 +89,8 @@ func (m *DirectoryIndexMerger[T]) Merge() (files []*indexFile, err error) {
return files, nil
}

// CheckMerge marks and returns files for merging.
func (m *DirectoryIndexMerger[T]) CheckMerge() ([]*indexFile, error) {
// checkMerge marks and returns files for merging.
func (m *DirectoryIndexMerger[T]) checkMerge() ([]*indexFile, error) {

// do not merge less than minMerge files
mergeBatch := make([]*indexFile, 0, m.maxFiles)
Expand Down Expand Up @@ -197,6 +195,7 @@ func (m *DirectoryIndexMerger[T]) mergeFiles(dstFile string, srcFiles []string)
return size, err
}

// Cleanup removes files that are merged. Synchronous.
func (m *DirectoryIndexMerger[T]) Cleanup() (err error) {
m.mergedList.safeWrite(func() {
removedFiles := make([]*indexFile, 0)
Expand Down Expand Up @@ -301,6 +300,7 @@ func (d *IndexDirectory[T]) NewReader() (*IndexDirectoryReader[T], error) {
return r, nil
}

// NewMerger creates a merging service that merges between min,max files in one pass.
func (d *IndexDirectory[T]) NewMerger(min, max int) (*DirectoryIndexMerger[T], error) {
if min > max || min < 2 {
return nil, fmt.Errorf("invalid min/max")
Expand Down
6 changes: 4 additions & 2 deletions single/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/vellum"
"github.com/lezhnev74/go-iterators"

"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"
"io"
Expand Down Expand Up @@ -306,7 +305,10 @@ func (i *InvertedIndex[V]) writeAllValues(values []V) (valuesIndexOffset int64,
func (i *InvertedIndex[V]) writeTermsBitmapsAndUpdateFST(bitmaps []*roaring.Bitmap) error {

// use our existing fst to iterate through terms in the same order as they were ingested
fst, err := vellum.Load(i.fstBuf.Bytes())
fstBuf := make([]byte, i.fstBuf.Len())
copy(fstBuf, i.fstBuf.Bytes()) // copy since the buf will be reused later

fst, err := vellum.Load(fstBuf)
if err != nil {
return fmt.Errorf("fst read failed: %w", err)
}
Expand Down
35 changes: 35 additions & 0 deletions single/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,41 @@ func TestAPI(t *testing.T) {
timestamps := go_iterators.ToSlice(valuesIterator)
require.EqualValues(t, []int{4, 6, 8, 10, 12}, timestamps)
},
}, {
name: "mixed languages",
segmentSize: 1000,
prepare: func(w InvertedIndexWriter[int]) {
terms := []string{`التقديم`, `חתונה`, `бесплатно`, `zx9uyv`}
slices.Sort(terms)
for _, term := range terms {
require.NoError(t, w.Put(term, []int{1}))
}
},
assert: func(r InvertedIndexReader[int]) {
valuesIterator, err := r.ReadValues([]string{"бесплатно"}, 0, 999)
require.NoError(t, err)
timestamps := go_iterators.ToSlice(valuesIterator)
require.EqualValues(t, []int{1}, timestamps)
},
}, {
name: "big chunk",
segmentSize: 1000,
prepare: func(w InvertedIndexWriter[int]) {
terms := []string{"term1"}
for i := 0; i < 1000; i++ {
terms = append(terms, fmt.Sprintf("%d", rand.Uint64()))
}
slices.Sort(terms)
for _, term := range terms {
require.NoError(t, w.Put(term, []int{1}))
}
},
assert: func(r InvertedIndexReader[int]) {
valuesIterator, err := r.ReadValues([]string{"term1"}, 0, 999)
require.NoError(t, err)
timestamps := go_iterators.ToSlice(valuesIterator)
require.EqualValues(t, []int{1}, timestamps)
},
},
}

Expand Down

0 comments on commit f7f8fe8

Please sign in to comment.