Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge optimizations - support for concurrency #10

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 124 additions & 64 deletions bitmap_opt.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,125 @@
package sroar

import (
"sync"
)

const minContainersForConcurrency = 16

// AndToSuperset calculates intersection of current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize container buffer provided.
// and utilize container buffers provided.
// Number of passed buffers indicates concurrency level
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBufs ...[]uint16) {
conc := len(containerBufs)
assert(conc > 0)

dstNumKeys := dst.keys.numKeys()
if src == nil {
concurrentlyOnRange(conc, dstNumKeys, func(_, from, to int) {
zeroOutSelectedContainers(dst, from, to)
})
return
}

srcNumKeys := src.keys.numKeys()
concurrentlyOnRange(conc, dstNumKeys, func(i, from, to int) {
andSelectedContainers(dst, src, from, to, 0, srcNumKeys, containerBufs[i])
})
}

// OrToSuperset calculates union of current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize containers buffers provided.
// Number of passed buffers indicates concurrency level
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) OrToSuperset(src *Bitmap, containerBufs ...[]uint16) {
conc := len(containerBufs)
assert(conc > 0)

if src == nil {
return
}

srcNumKeys := src.keys.numKeys()
concurrentlyOnRange(conc, srcNumKeys, func(i, from, to int) {
orSelectedContainers(dst, src, from, to, containerBufs[i])
})
}

// AndNotToSuperset calculates difference between current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize containers buffers provided.
// Number of passed buffers indicates concurrency level
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBuf []uint16) {
func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBufs ...[]uint16) {
conc := len(containerBufs)
assert(conc > 0)

if src == nil {
for ai, an := 0, dst.keys.numKeys(); ai < an; ai++ {
off := dst.keys.val(ai)
zeroOutContainer(dst.getContainer(off))
return
}

dstNumKeys := dst.keys.numKeys()
srcNumKeys := src.keys.numKeys()
concurrentlyOnRange(conc, dstNumKeys, func(i, from, to int) {
andNotSelectedContainers(dst, src, from, to, 0, srcNumKeys, containerBufs[i])
})
}

func (ra *Bitmap) ConvertToBitmapContainers() {
for ai, an := 0, ra.keys.numKeys(); ai < an; ai++ {
ak := ra.keys.key(ai)
off := ra.keys.val(ai)
ac := ra.getContainer(off)

if ac[indexType] == typeArray {
c := array(ac).toBitmapContainer(nil)
offset := ra.newContainer(uint16(len(c)))
copy(ra.data[offset:], c)
ra.setKey(ak, offset)
}
}
}

func concurrentlyOnRange(conc, max int, callback func(i, from, to int)) {
if conc == 1 || max < conc*minContainersForConcurrency {
callback(0, 0, max)
return
}

a, b := dst, src
ai, an := 0, a.keys.numKeys()
bi, bn := 0, b.keys.numKeys()
delta := max / conc

wg := new(sync.WaitGroup)
wg.Add(conc - 1)
for i := 0; i < conc-1; i++ {
go func(i int) {
callback(i, delta*i, delta*(i+1))
wg.Done()
}(i)
}
callback(conc-1, delta*(conc-1), max)
wg.Wait()
}

func zeroOutSelectedContainers(a *Bitmap, ai, an int) {
for ; ai < an; ai++ {
off := a.keys.val(ai)
zeroOutContainer(a.getContainer(off))
}
}

func andSelectedContainers(a, b *Bitmap, ai, an, bi, bn int, containerBuf []uint16) {
for ai < an && bi < bn {
ak := a.keys.key(ai)
bk := b.keys.key(bi)
Expand Down Expand Up @@ -49,64 +150,38 @@ func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBuf []uint16) {
}
}

// OrToSuperset calculates union of current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize container buffer provided.
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) OrToSuperset(src *Bitmap, containerBuf []uint16) {
if src == nil {
return
}

srcIdx, numKeys := 0, src.keys.numKeys()
for ; srcIdx < numKeys; srcIdx++ {
srcCont := src.getContainer(src.keys.val(srcIdx))
if getCardinality(srcCont) == 0 {
func orSelectedContainers(a, b *Bitmap, bi, bn int, containerBuf []uint16) {
for ; bi < bn; bi++ {
off := b.keys.val(bi)
bc := b.getContainer(off)
if getCardinality(bc) == 0 {
continue
}

key := src.keys.key(srcIdx)

dstIdx := dst.keys.search(key)
if dstIdx >= dst.keys.numKeys() || dst.keys.key(dstIdx) != key {
bk := b.keys.key(bi)
ai := a.keys.search(bk)
if ai >= a.keys.numKeys() || a.keys.key(ai) != bk {
// Container does not exist in dst.
panic("Current bitmap should have all containers of incoming bitmap")
} else {
// Container exists in dst as well. Do an inline containerOr.
offset := dst.keys.val(dstIdx)
dstCont := dst.getContainer(offset)
containerOrToSuperset(dstCont, srcCont, containerBuf)
off = a.keys.val(ai)
ac := a.getContainer(off)
containerOrToSuperset(ac, bc, containerBuf)
}
}
}

// AndNotToSuperset calculates difference between current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize container buffer provided.
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBuf []uint16) {
if src == nil {
return
}

a, b := dst, src
ai, an := 0, a.keys.numKeys()
bi, bn := 0, b.keys.numKeys()

func andNotSelectedContainers(a, b *Bitmap, ai, an, bi, bn int, containerBuf []uint16) {
for ai < an && bi < bn {
ak := a.keys.key(ai)
bk := b.keys.key(bi)
if ak == bk {
off := a.keys.val(ai)
ac := a.getContainer(off)
off = b.keys.val(bi)
off := b.keys.val(bi)
bc := b.getContainer(off)

if getCardinality(bc) != 0 {
off = a.keys.val(ai)
ac := a.getContainer(off)
containerAndNotToSuperset(ac, bc, containerBuf)
}
ai++
Expand All @@ -118,18 +193,3 @@ func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBuf []uint16) {
}
}
}

func (ra *Bitmap) ConvertToBitmapContainers() {
for ai, an := 0, ra.keys.numKeys(); ai < an; ai++ {
ak := ra.keys.key(ai)
off := ra.keys.val(ai)
ac := ra.getContainer(off)

if ac[indexType] == typeArray {
c := array(ac).toBitmapContainer(nil)
offset := ra.newContainer(uint16(len(c)))
copy(ra.data[offset:], c)
ra.setKey(ak, offset)
}
}
}
Loading
Loading