Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MB-64883: Adapt interface change for VectorIndex #302

Open
wants to merge 6 commits into
base: v16-trinity-couchbase
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 88 additions & 49 deletions faiss_vector_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,35 @@ func (vc *vectorIndexCache) Clear() {
vc.m.Unlock()
}

// loadDocVecIDMap indicates if a non-nil docVecIDMap should be returned.
// loadPreFilteringConstructs indicates if pre-filtering constructs (docVecIDMap
// and clusterAssignment) should be returned.
// It is true when a filtered kNN query accesses the cache since it requires the
// map. It's false otherwise.
func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte,
loadDocVecIDMap bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64,
vecIDsToExclude []int64, err error) {
index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err = vc.loadFromCache(
fieldID, loadDocVecIDMap, mem, except)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err
loadPreFilteringConstructs bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, clusterAssignment map[int64]*roaring.Bitmap,
vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]uint32, vecIDsToExclude []int64, err error) {
index, clusterAssignment, vecDocIDMap, docVecIDMap, vecIDsToExclude, err = vc.loadFromCache(
fieldID, loadPreFilteringConstructs, mem, except)
return index, clusterAssignment, vecDocIDMap, docVecIDMap, vecIDsToExclude, err
}

// function to load the vectorDocIDMap and if required, docVecIDMap from cache
// If not, it will create these and add them to the cache.
func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool,
mem []byte, except *roaring.Bitmap) (index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {
func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadPreFilteringConstructs bool,
mem []byte, except *roaring.Bitmap) (index *faiss.IndexImpl, clusterAssignment map[int64]*roaring.Bitmap,
vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]uint32,
vecIDsToExclude []int64, err error) {

vc.m.RLock()

entry, ok := vc.cache[fieldID]
if ok {
index, vecDocIDMap, docVecIDMap = entry.load()
index, clusterAssignment, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
if !loadPreFilteringConstructs || (loadPreFilteringConstructs && len(entry.docVecIDMap) > 0) {
vc.m.RUnlock()
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
return index, clusterAssignment, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

vc.m.RUnlock()
Expand All @@ -88,26 +90,26 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool,
// typically seen for the first filtered query.
docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
vc.m.Unlock()
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
return index, clusterAssignment, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

vc.m.RUnlock()
// acquiring a lock since this is modifying the cache.
vc.m.Lock()
defer vc.m.Unlock()
return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, except)
return vc.createAndCacheLOCKED(fieldID, mem, loadPreFilteringConstructs, except)
}

func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint32][]int64 {
func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint32][]uint32 {
// Handle concurrent accesses (to avoid unnecessary work) by adding a
// check within the write lock here.
if ce.docVecIDMap != nil {
return ce.docVecIDMap
}

docVecIDMap := make(map[uint32][]int64)
docVecIDMap := make(map[uint32][]uint32)
for vecID, docID := range ce.vecDocIDMap {
docVecIDMap[docID] = append(docVecIDMap[docID], vecID)
docVecIDMap[docID] = append(docVecIDMap[docID], uint32(vecID))
}

ce.docVecIDMap = docVecIDMap
Expand All @@ -116,21 +118,22 @@ func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint

// Rebuilding the cache on a miss.
func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
loadDocVecIDMap bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {
loadPreFilteringConstructs bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, centroidVecIDMap map[int64]*roaring.Bitmap,
vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]uint32,
metonymic-smokey marked this conversation as resolved.
Show resolved Hide resolved
vecIDsToExclude []int64, err error) {

// Handle concurrent accesses (to avoid unnecessary work) by adding a
// check within the write lock here.
entry := vc.cache[fieldID]
if entry != nil {
index, vecDocIDMap, docVecIDMap = entry.load()
index, centroidVecIDMap, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
if !loadPreFilteringConstructs || (loadPreFilteringConstructs && len(entry.docVecIDMap) > 0) {
return index, centroidVecIDMap, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
return index, centroidVecIDMap, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

// if the cache doesn't have the entry, construct the vector to doc id map and
Expand All @@ -140,8 +143,9 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
pos += n

vecDocIDMap = make(map[int64]uint32, numVecs)
if loadDocVecIDMap {
docVecIDMap = make(map[uint32][]int64, numVecs)
if loadPreFilteringConstructs {
docVecIDMap = make(map[uint32][]uint32, numVecs)
centroidVecIDMap = make(map[int64]*roaring.Bitmap)
}
isExceptNotEmpty := except != nil && !except.IsEmpty()
for i := 0; i < int(numVecs); i++ {
Expand All @@ -156,8 +160,8 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
continue
}
vecDocIDMap[vecID] = docIDUint32
if loadDocVecIDMap {
docVecIDMap[docIDUint32] = append(docVecIDMap[docIDUint32], vecID)
if loadPreFilteringConstructs {
docVecIDMap[docIDUint32] = append(docVecIDMap[docIDUint32], uint32(vecID))
}
}

Expand All @@ -166,16 +170,37 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,

index, err = faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}

vc.insertLOCKED(fieldID, index, vecDocIDMap, loadDocVecIDMap, docVecIDMap)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
if loadPreFilteringConstructs {
clusterAssignment, _ := index.ObtainClusterToVecIDsFromIVFIndex()
for centroidID, vecIDs := range clusterAssignment {
if _, exists := centroidVecIDMap[centroidID]; !exists {
centroidVecIDMap[centroidID] = roaring.NewBitmap()
}
vecIDsUint32 := make([]uint32, len(vecIDs))
for i, vecID := range vecIDs {
vecIDsUint32[i] = uint32(vecID)
}
centroidVecIDMap[centroidID].AddMany(vecIDsUint32)
}
}

cacheEntryStub := &cacheEntryReqs{
index: index,
vecDocIDMap: vecDocIDMap,
}
if loadPreFilteringConstructs {
cacheEntryStub.loadPreFilteringConstructs = loadPreFilteringConstructs
cacheEntryStub.docVecIDMap = docVecIDMap
cacheEntryStub.clusterAssignment = centroidVecIDMap
}
vc.insertLOCKED(fieldID, cacheEntryStub)
return index, centroidVecIDMap, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, loadDocVecIDMap bool,
docVecIDMap map[uint32][]int64) {
func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, stub *cacheEntryReqs) {
// the first time we've hit the cache, try to spawn a monitoring routine
// which will reconcile the moving averages for all the fields being hit
if len(vc.cache) == 0 {
Expand All @@ -189,8 +214,9 @@ func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
// this makes the average to be kept above the threshold value for a
// longer time and thereby the index to be resident in the cache
// for longer time.
vc.cache[fieldIDPlus1] = createCacheEntry(index, vecDocIDMap,
loadDocVecIDMap, docVecIDMap, 0.4)
stub.alpha = 0.4

vc.cache[fieldIDPlus1] = createCacheEntry(stub)
}
}

Expand Down Expand Up @@ -283,19 +309,30 @@ func (e *ewma) add(val uint64) {

// -----------------------------------------------------------------------------

func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
loadDocVecIDMap bool, docVecIDMap map[uint32][]int64, alpha float64) *cacheEntry {
// cacheEntryReqs carries the required info to create a cache entry
// (consumed by createCacheEntry).
type cacheEntryReqs struct {
// alpha is the smoothing factor handed to the entry's ewma hit tracker.
alpha float64
index *faiss.IndexImpl
// vecDocIDMap maps a vector ID to the doc ID it belongs to.
vecDocIDMap map[int64]uint32
// Used to indicate if the below fields are populated - will only be
// used for pre-filtered queries.
loadPreFilteringConstructs bool
// docVecIDMap maps a doc ID to the vector IDs it contains
// (inverse of vecDocIDMap).
docVecIDMap map[uint32][]uint32
// clusterAssignment maps a centroid ID to the set of vector IDs
// assigned to that cluster.
clusterAssignment map[int64]*roaring.Bitmap
}

func createCacheEntry(stub *cacheEntryReqs) *cacheEntry {
ce := &cacheEntry{
index: index,
vecDocIDMap: vecDocIDMap,
index: stub.index,
vecDocIDMap: stub.vecDocIDMap,
tracker: &ewma{
alpha: alpha,
alpha: stub.alpha,
sample: 1,
},
refs: 1,
}
if loadDocVecIDMap {
ce.docVecIDMap = docVecIDMap
if stub.loadPreFilteringConstructs {
ce.docVecIDMap = stub.docVecIDMap
}
return ce
}
Expand All @@ -308,9 +345,10 @@ type cacheEntry struct {
// threshold we close/cleanup only if the live refs to the cache entry is 0.
refs int64

index *faiss.IndexImpl
vecDocIDMap map[int64]uint32
docVecIDMap map[uint32][]int64
index *faiss.IndexImpl
vecDocIDMap map[int64]uint32
docVecIDMap map[uint32][]uint32
clusterAssignment map[int64]*roaring.Bitmap
}

func (ce *cacheEntry) incHit() {
Expand All @@ -325,10 +363,11 @@ func (ce *cacheEntry) decRef() {
atomic.AddInt64(&ce.refs, -1)
}

func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]uint32, map[uint32][]int64) {
func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]*roaring.Bitmap,
map[int64]uint32, map[uint32][]uint32) {
ce.incHit()
ce.addRef()
return ce.index, ce.vecDocIDMap, ce.docVecIDMap
return ce.index, ce.clusterAssignment, ce.vecDocIDMap, ce.docVecIDMap
}

func (ce *cacheEntry) close() {
Expand Down
Loading