Skip to content

Commit

Permalink
Iterate on merger performance: parallelize term-value reads and preload bitmaps
Browse files Browse the repository at this point in the history
  • Loading branch information
lezhnev74 committed Nov 26, 2023
1 parent 3cdb72f commit de449e5
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 67 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/rivo/uniseg v0.4.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
85 changes: 55 additions & 30 deletions multiple/multiple_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/lezhnev74/inverted_index/single"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"log"
"math/rand"
"os"
Expand Down Expand Up @@ -128,32 +129,37 @@ func (m *DirectoryIndexMerger[T]) checkMerge() ([]*indexFile, error) {
}

func (m *DirectoryIndexMerger[T]) mergeFiles(dstFile string, srcFiles []string) (int64, error) {
// copy all terms to memory and release file pointers.
allTerms := make([]string, 0)

// below code uses one file descriptor at a time:
// Open all indexes in advance. Schedule clean-up.
fileIndexes := make(map[string]single.InvertedIndexReader[T])
defer func() {
for _, ii := range fileIndexes {
ii.Close()
}
}()
for _, f := range srcFiles {
r, err := single.OpenInvertedIndex(f, m.indexDirectory.unserializeValues)
if err != nil {
return 0, fmt.Errorf("open file %s: %w", f, err)
}
fileIndexes[f] = r
}

// copy all terms to memory and release file pointers.
allTerms := make([]string, 0)

it, err := r.ReadTerms()
// below code uses one file descriptor at a time:
for _, f := range srcFiles {
it, err := fileIndexes[f].ReadTerms()
if err != nil {
return 0, fmt.Errorf("read terms %s: %w", f, err)
}

allTerms = append(allTerms, go_iterators.ToSlice(it)...)

err = r.Close()
if err != nil {
return 0, fmt.Errorf("read terms %s: %w", f, err)
}
}

// sort all the terms
slices.Sort(allTerms)
allTerms = uniqueOnly(allTerms)
allTerms = sortUnique(allTerms)

// prepare the new index file:
dstIndex, err := single.NewInvertedIndexUnit(dstFile, m.indexDirectory.segmentSize, m.indexDirectory.serializeValues, m.indexDirectory.unserializeValues)
Expand All @@ -162,28 +168,41 @@ func (m *DirectoryIndexMerger[T]) mergeFiles(dstFile string, srcFiles []string)
}

// for each term, read all the values and push to the new index:
for _, term := range allTerms {
// get all values for this term from all indexes
termValues := make([]T, 0)
for _, srcFile := range srcFiles {
r, err := single.OpenInvertedIndex(srcFile, m.indexDirectory.unserializeValues)
if err != nil {
return 0, fmt.Errorf("open file %s: %w", srcFile, err)
// for faster processing, we start N workers here to handle all the terms
termValuesMap := make(map[string][]T, len(allTerms))
termValuesMapLock := sync.Mutex{}

var wg errgroup.Group
for _, r := range fileIndexes {
r := r
wg.Go(func() error {
fileTermValues := make(map[string][]T)
for _, term := range allTerms {
vIt, err := r.ReadAllValues([]string{term})
if err != nil {
return fmt.Errorf("read term values: %w", err)
}

fileTermValues[term] = go_iterators.ToSlice(vIt)
}

vIt, err := r.ReadAllValues([]string{term})
if err != nil {
return 0, fmt.Errorf("read term values: %w", err)
termValuesMapLock.Lock()
for term, values := range fileTermValues {
termValuesMap[term] = append(termValuesMap[term], values...)
}
termValuesMapLock.Unlock()

termValues = append(termValues, go_iterators.ToSlice(vIt)...)
return nil
})
}

err = r.Close()
if err != nil {
return 0, fmt.Errorf("read term values: %w", err)
}
}
err = wg.Wait()
if err != nil {
return 0, err
}

for _, term := range allTerms {
termValues := termValuesMap[term]
err = dstIndex.Put(term, termValues)
if err != nil {
return 0, fmt.Errorf("put to new index: %w", err)
Expand Down Expand Up @@ -334,7 +353,8 @@ func (d *IndexDirectory[T]) discover() error {
}

d.currentList.safeWrite(func() {
d.currentList.putFile(e.Name(), inf.Size())
filePath := path.Join(d.directoryPath, e.Name())
d.currentList.putFile(filePath, inf.Size())
})
}

Expand Down Expand Up @@ -466,10 +486,15 @@ func checkPermissions(path string) error {
return nil
}

func uniqueOnly[T comparable](values []T) []T {
// sortUnique sorts and removes duplicates from the slice.
func sortUnique[T constraints.Ordered](values []T) []T {

slices.Sort(values)

// since the slice is sorted, we can only compare with the next element
x := 0
for i := 0; i < len(values); i++ {
if !slices.Contains(values[i+1:], values[i]) {
if i == len(values)-1 || values[i] != values[i+1] {
values[x] = values[i]
x++
}
Expand Down
27 changes: 26 additions & 1 deletion multiple/multiple_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import (
"time"
)

// TestUniqueOnly checks that sortUnique returns the input sorted with
// duplicates removed, including the empty and single-element edge cases.
func TestUniqueOnly(t *testing.T) {
	s := []string{"B", "C", "A", "C", "C"}
	require.Equal(t, []string{"A", "B", "C"}, sortUnique(s))

	// edge cases: empty input, single element, already unique
	require.Empty(t, sortUnique([]string{}))
	require.Equal(t, []string{"A"}, sortUnique([]string{"A"}))
	require.Equal(t, []string{"A", "B"}, sortUnique([]string{"B", "A"}))
}

func TestReadWrite(t *testing.T) {
type test struct {
name string
Expand Down Expand Up @@ -188,6 +193,26 @@ func TestItDiscoversFiles(t *testing.T) {
require.Len(t, dirIndex2.currentList.files, 2)
}

//
// func TestMergeExternal(t *testing.T) {
// dirPath := "/home/dmitry/Code/go/src/heaplog2/local/ii2/3801863248"
// dirIndex, err := NewIndexDirectory[uint64](
// dirPath,
// 1000,
// single.CompressUint64,
// single.DecompressUint64,
// )
// require.NoError(t, err)
//
// m, err := dirIndex.NewMerger(2, 2)
// require.NoError(t, err)
//
// files, err := m.Merge()
// require.NoError(t, err)
//
// log.Printf("%v", files)
// }

func TestCheckMerge(t *testing.T) {

prepared := []map[string][]uint32{
Expand Down Expand Up @@ -390,7 +415,7 @@ func TestStressConcurrency(t *testing.T) {
expectedTerms = append(expectedTerms, maps.Keys(payload)...)
}
slices.Sort(expectedTerms)
expectedTerms = uniqueOnly(expectedTerms)
expectedTerms = sortUnique(expectedTerms)

for i := 0; i < N; i++ {
wg.Add(1)
Expand Down
97 changes: 61 additions & 36 deletions single/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type InvertedIndex[V cmp.Ordered] struct {
fst *vellum.FST
fstBuf *bytes.Buffer
fstOffset, indexOffset int64
// bitmaps cache stores all bitmaps indexed by file offsets
bitmaps map[int64]*roaring.Bitmap
}

type InvertedIndexWriter[V constraints.Ordered] interface {
Expand Down Expand Up @@ -145,7 +147,7 @@ func (i *InvertedIndex[V]) ReadValues(terms []string, minVal V, maxVal V) (go_it
// Read all terms bitmaps and join into one.
// Use it to select segments for reading.

bs, err := i.readBitmaps(terms)
bs, err := i.readTermsBitmaps(terms)
if err != nil {
return nil, fmt.Errorf("bitmaps: %w", err)
}
Expand Down Expand Up @@ -549,8 +551,8 @@ func (i *InvertedIndex[V]) readFooter() (
return
}

func (i *InvertedIndex[V]) readBitmaps(terms []string) ([]*roaring.Bitmap, error) {

// readTermsBitmaps returns all bitmaps associated with the given terms
func (i *InvertedIndex[V]) readTermsBitmaps(terms []string) ([]*roaring.Bitmap, error) {
slices.Sort(terms)

fstIt, err := i.fst.Iterator([]byte(terms[0]), nil)
Expand All @@ -561,13 +563,11 @@ func (i *InvertedIndex[V]) readBitmaps(terms []string) ([]*roaring.Bitmap, error
}

var (
offset, nextOffset uint64
bitmapLen int
existingTerm []byte
offset uint64
existingTerm []byte
)

bitmaps := make([]*roaring.Bitmap, 0, len(terms))
buf := make([]byte, 4096)

for _, term := range terms {

Expand All @@ -587,37 +587,9 @@ func (i *InvertedIndex[V]) readBitmaps(terms []string) ([]*roaring.Bitmap, error
err = fstIt.Next()
if err != nil && !errors.Is(err, vellum.ErrIteratorDone) {
return nil, fmt.Errorf("fst read: %w", err)
} else if errors.Is(err, vellum.ErrIteratorDone) {
nextOffset = uint64(i.fstOffset) // the last bitmap abuts FST
} else {
_, nextOffset = fstIt.Current()
}

// read the bitmap
_, err = i.file.Seek(int64(offset), io.SeekStart)
if err != nil {
return nil, fmt.Errorf("seek bitmap: %w", err)
}

bitmapLen = int(nextOffset - offset)
if cap(buf) >= bitmapLen {
buf = buf[:bitmapLen]
} else {
buf = make([]byte, bitmapLen)
}

// _, err = i.file.Read(buf)
// if err != nil {
// return nil, fmt.Errorf("read bitmap: %w", err)
// }

b := roaring.New()
_, err = b.ReadFrom(i.file)

if err != nil {
return nil, fmt.Errorf("parse bitmap: %w", err)
}
bitmaps = append(bitmaps, b)
bitmaps = append(bitmaps, i.bitmaps[int64(offset)])
}

return bitmaps, nil
Expand Down Expand Up @@ -789,6 +761,52 @@ func (i *InvertedIndex[V]) makeSegmentsFetchFunc(
return makeFetchFunc(), nil
}

// readBitmaps populates the internal cache (i.bitmaps) with every term bitmap
// stored in the file, keyed by the bitmap's file offset.
//
// The bitmaps region starts at the smallest term's bitmap offset (looked up via
// the FST) and ends where the FST itself begins (i.fstOffset). Reading the whole
// region with a single seek+read avoids one seek and read per term at query time.
// The method is idempotent: once the cache is populated, calls are no-ops.
func (i *InvertedIndex[V]) readBitmaps() error {

	if i.bitmaps != nil { // already cached
		return nil
	}
	i.bitmaps = make(map[int64]*roaring.Bitmap)

	// The min term's bitmap is the first one in the file; errors here were
	// previously discarded, which could hide a corrupt/empty FST.
	mTerm, err := i.fst.GetMinKey()
	if err != nil {
		return fmt.Errorf("fst min key: %w", err)
	}
	fileBitmapsOffset, exists, err := i.fst.Get(mTerm)
	if err != nil {
		return fmt.Errorf("fst get %q: %w", mTerm, err)
	}
	if !exists {
		return fmt.Errorf("fst get %q: term not found", mTerm)
	}
	bitmapsLen := i.fstOffset - int64(fileBitmapsOffset)

	// Load all bitmaps bytes into memory in one read.
	_, err = i.file.Seek(int64(fileBitmapsOffset), io.SeekStart)
	if err != nil {
		return fmt.Errorf("seek bitmaps: %w", err)
	}

	bitmapsMem := make([]byte, bitmapsLen)
	// io.ReadFull guards against short reads: a bare file.Read may legally
	// return fewer bytes than requested without an error.
	_, err = io.ReadFull(i.file, bitmapsMem)
	if err != nil {
		return fmt.Errorf("read bitmaps: %w", err)
	}

	// now extract bitmaps for each term in FST
	it, err := i.fst.Iterator(nil, nil)
	for err == nil {
		_, termBitmapOffset := it.Current()
		termBitmapBufferOffset := termBitmapOffset - fileBitmapsOffset // adapt to buffer boundaries

		termBitmap := roaring.New()
		_, rErr := termBitmap.ReadFrom(bytes.NewBuffer(bitmapsMem[termBitmapBufferOffset:]))
		if rErr != nil {
			return fmt.Errorf("parse bitmap: %w", rErr)
		}
		i.bitmaps[int64(termBitmapOffset)] = termBitmap

		err = it.Next()
	}
	// ErrIteratorDone signals normal exhaustion; anything else is a real
	// failure that was previously swallowed by the bare "for err == nil" loop.
	if err != nil && !errors.Is(err, vellum.ErrIteratorDone) {
		return fmt.Errorf("fst iterate: %w", err)
	}

	return nil
}

func NewInvertedIndexUnit[V constraints.Ordered](
filename string,
segmentSize uint32,
Expand Down Expand Up @@ -848,5 +866,12 @@ func OpenInvertedIndex[V constraints.Ordered](
i.minVal = minValue
i.maxVal = maxValue

// to reduce seeks and reads, it is a good idea to load all bitmaps into memory once,
// then use that cache for querying bitmaps.
err = i.readBitmaps()
if err != nil {
return nil, err
}

return i, nil
}
23 changes: 23 additions & 0 deletions single/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,26 @@ func TestHugeFile(t *testing.T) {
// show summary
PrintSummary(filename, os.Stdout)
}

func TestBitmapsCachePreload(t *testing.T) {
dirPath, err := os.MkdirTemp("", "")
require.NoError(t, err)
filename := filepath.Join(dirPath, "index")

// 1. Make a new index (open in writer mode), put values and close.
indexWriter, err := NewInvertedIndexUnit[int](filename, 1, CompressGob[int], DecompressGob[int])
require.NoError(t, err)
require.NoError(t, indexWriter.Put("term1", []int{10, 20})) // <-- two segments will be written (len=1)
err = indexWriter.Close()
require.NoError(t, err)

// 2. Open the index in a reader-mode
indexReader, err := OpenInvertedIndex[int](filename, DecompressGob[int])
require.NoError(t, err)
it, err := indexReader.ReadValues([]string{"term1"}, 0, 100)
require.NoError(t, err)
require.NoError(t, it.Close())

_, err = indexReader.(*InvertedIndex[int]).file.Seek(0, io.SeekStart)
require.ErrorIs(t, err, os.ErrClosed)
}

0 comments on commit de449e5

Please sign in to comment.