Skip to content

Commit

Permalink
Enhance Support for Larger Datasets and Buckets in Encoding
Browse files Browse the repository at this point in the history
This commit improves encoding by enabling the handling of item and bucket counts exceeding max(uint32). Formerly, the encoding used uint32 for counts, but the filter structure already supported larger values using uint.
Until now, the filter only partially supported larger datasets: not all of the buckets were utilized — note the changes in `generateIndexTagHash`, `altIndex`, and `indexHash`.

Now, all references to bucket indices and item counts explicitly use uint64. A new encoding format accommodates larger filters.
To distinguish between legacy (up to max(uint32) items) and the new format, a prefix marker is introduced.

Decoding seamlessly supports both formats.
The encode method takes a legacy boolean parameter for gradual adoption.
  • Loading branch information
EladGabay committed Aug 16, 2023
1 parent 92f5275 commit f8cedef
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 198 deletions.
182 changes: 133 additions & 49 deletions cuckoofilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,39 @@ import (
"errors"
"fmt"
"io"
"math"

"github.com/dgryski/go-metro"
)

// maximum number of cuckoo kicks before claiming failure
const kMaxCuckooCount uint = 500
const kMaxCuckooCount uint64 = 500

type TableType uint32

const (
// TableTypeSingle normal single table
TableTypeSingle = 0
TableTypeSingle TableType = 0
// TableTypePacked packed table, use semi-sort to save 1 bit per item
TableTypePacked = 1
TableTypePacked TableType = 1
)

type table interface {
Init(tagsPerBucket, bitsPerTag, num uint, initialBucketsHint []byte) error
NumBuckets() uint
FindTagInBuckets(i1, i2 uint, tag uint32) bool
DeleteTagFromBucket(i uint, tag uint32) bool
InsertTagToBucket(i uint, tag uint32, kickOut bool, oldTag *uint32) bool
SizeInTags() uint
SizeInBytes() uint
Init(tagsPerBucket, bitsPerTag uint, num uint64, initialBucketsHint []byte) error
NumBuckets() uint64
FindTagInBuckets(i1, i2 uint64, tag uint32) bool
DeleteTagFromBucket(i uint64, tag uint32) bool
InsertTagToBucket(i uint64, tag uint32, kickOut bool, oldTag *uint32) bool
SizeInTags() uint64
SizeInBytes() uint64
Info() string
BitsPerItem() uint
Reader() (io.Reader, uint)
Decode([]byte) error
Reader(legacy bool) (io.Reader, uint64)
Decode(legacy bool, _ []byte) error
Reset()
}

func getTable(tableType uint) interface{} {
func getTable(tableType TableType) interface{} {
switch tableType {
case TableTypePacked:
return NewPackedTable()
Expand All @@ -50,30 +53,28 @@ func getTable(tableType uint) interface{} {
}

type victimCache struct {
index uint
index uint64
tag uint32
used bool
}

const filterMetadataSize = 3*bytesPerUint32 + 1

// Filter cuckoo filter type struct
type Filter struct {
victim victimCache
numItems uint
numItems uint64
table table
}

//NewFilter return a new initialized filter
// NewFilter return a new initialized filter
/*
tagsPerBucket: num of tags for each bucket, which is b in paper. tag is fingerprint, which is f in paper.
bitPerItem: num of bits for each item, which is length of tag(fingerprint)
bitPerItem: num of bits for each item, which is length of tag(fingerprint) (max is 32)
maxNumKeys: num of keys that the filter will store. this value should be close to and lower than
nextPow2(maxNumKeys/tagsPerBucket) * maxLoadFactor, because table.NumBuckets is always a power of two
*/
func NewFilter(tagsPerBucket, bitsPerItem, maxNumKeys, tableType uint) *Filter {
numBuckets := getNextPow2(uint64(maxNumKeys / tagsPerBucket))
if float64(maxNumKeys)/float64(numBuckets*tagsPerBucket) > maxLoadFactor(tagsPerBucket) {
func NewFilter(tagsPerBucket, bitsPerItem uint, maxNumKeys uint64, tableType TableType) *Filter {
numBuckets := getNextPow2(maxNumKeys / uint64(tagsPerBucket))
if float64(maxNumKeys)/float64(numBuckets*uint64(tagsPerBucket)) > maxLoadFactor(tagsPerBucket) {
numBuckets <<= 1
}
if numBuckets == 0 {
Expand All @@ -86,30 +87,46 @@ func NewFilter(tagsPerBucket, bitsPerItem, maxNumKeys, tableType uint) *Filter {
}
}

func (f *Filter) indexHash(hv uint32) uint {
func (f *Filter) indexHash(hv uint64) uint64 {
// table.NumBuckets is always a power of two, so modulo can be replaced with bitwise-and:
return uint(hv) & (f.table.NumBuckets() - 1)
return hv & (f.table.NumBuckets() - 1)
}

func (f *Filter) tagHash(hv uint32) uint32 {
return hv%((1<<f.table.BitsPerItem())-1) + 1
// tagHash derives a fingerprint (tag) from the low 32 bits of the hash value.
// Tags are shifted into the range [1, 2^bits-1] so that 0 stays reserved for
// an empty slot in the table.
func (f *Filter) tagHash(hv uint64) uint32 {
	mask := uint32(1)<<f.table.BitsPerItem() - 1
	return uint32(hv)%mask + 1
}

func (f *Filter) generateIndexTagHash(item []byte) (index uint, tag uint32) {
hash := metro.Hash64(item, 1337)
index = f.indexHash(uint32(hash >> 32))
tag = f.tagHash(uint32(hash))
// generateIndexTagHash hashes an item into its primary bucket index and its
// fingerprint (tag). Two hashing schemes exist and the choice MUST be stable
// for a given filter size, since stored fingerprints are only findable again
// with the same scheme.
func (f *Filter) generateIndexTagHash(item []byte) (index uint64, tag uint32) {
	// For backward compatibility with existing filters before adding the support of more than max uint32 items,
	// we need to use the old hash function.
	if f.table.SizeInTags()-1 <= math.MaxUint32 {
		hash := metro.Hash64(item, 1337)
		// Upper 32 bits select the bucket, lower 32 bits form the tag.
		index = f.indexHash(hash >> 32)
		tag = f.tagHash(hash)
		return
	}

	// Large filters: a 128-bit hash gives two independent 64-bit halves,
	// one for the bucket index and one for the tag.
	hash1, hash2 := metro.Hash128(item, 1337)
	index = f.indexHash(hash1)
	tag = f.tagHash(hash2)
	return
}

func (f *Filter) altIndex(index uint, tag uint32) uint {
// 0x5bd1e995 is the hash constant from MurmurHash2
return f.indexHash(uint32(index) ^ (tag * 0x5bd1e995))
// altIndex computes the partner bucket for (index, tag). The XOR-based mixing
// is an involution: applying altIndex to the result with the same tag yields
// the original index, which is what lets cuckoo kicks bounce between the two
// candidate buckets.
func (f *Filter) altIndex(index uint64, tag uint32) uint64 {
	// For backward compatibility with existing filters before adding the support of more than max uint32 items,
	// we need to use the old hash function.
	if f.table.SizeInTags()-1 <= math.MaxUint32 {
		// 0x5bd1e995 is the hash constant from MurmurHash2.
		mixed := uint32(index) ^ (tag * 0x5bd1e995)
		return f.indexHash(uint64(mixed))
	}

	// 0xc6a4a7935bd1e995 is the hash constant from MurmurHash64A.
	const murmur64A = 0xc6a4a7935bd1e995
	return f.indexHash(index ^ uint64(tag)*murmur64A)
}

// Size return num of items that filter store
func (f *Filter) Size() uint {
var c uint
func (f *Filter) Size() uint64 {
var c uint64
if f.victim.used {
c = 1
}
Expand All @@ -122,7 +139,7 @@ func (f *Filter) LoadFactor() float64 {
}

// SizeInBytes return bytes occupancy of filter's table
func (f *Filter) SizeInBytes() uint {
func (f *Filter) SizeInBytes() uint64 {
	// Delegate to the underlying table, which owns the bucket storage.
	return f.table.SizeInBytes()
}

Expand All @@ -148,12 +165,12 @@ func (f *Filter) AddUnique(item []byte) bool {
return f.Add(item)
}

func (f *Filter) addImpl(i uint, tag uint32) bool {
func (f *Filter) addImpl(i uint64, tag uint32) bool {
curIndex := i
curTag := tag
var oldTag uint32

var count uint
var count uint64
var kickOut bool
for count = 0; count < kMaxCuckooCount; count++ {
kickOut = count > 0
Expand Down Expand Up @@ -255,19 +272,57 @@ func (f *Filter) Info() string {
}

// Encode returns a byte slice representing a Cuckoo filter
func (f *Filter) Encode() ([]byte, error) {
filterReader, filterSize := f.EncodeReader()
// Encode returns a byte slice representing a Cuckoo filter.
// When legacy is true the old (max uint32 items) serialization format is
// produced; otherwise the new marker-prefixed format is used.
func (f *Filter) Encode(legacy bool) ([]byte, error) {
	reader, size := f.EncodeReader(legacy)
	out := make([]byte, size)
	_, err := io.ReadFull(reader, out)
	if err != nil {
		return nil, err
	}
	return out, nil
}

const (
// uint32(numItems), uint32(victim.index), uint32(victim.tag), byte(victim.used)
filterMetadataSizeLegacy = 3*bytesPerUint32 + 1

// uint64(numItems), uint64(victim.index), uint32(victim.tag), byte(victim.used)
filterMetadataSize = 2*bytesPerUint64 + bytesPerUint32 + 1
)

// In the legacy serialization format, there are 3 uint32s and then a byte for "victimUsed" which is a boolean 0 or 1.
// We need a way to distinguish between the legacy format and the new format, so we can use that byte as a marker
// (0/1 means the legacy format, other value means the new format).
// In the new format the first 13 bytes are not used, just markers, the actual serialization starts after the marker.
// The marker is hex of "IMNOTLEGACY!!" :).
var newFormatMarker = [filterMetadataSizeLegacy]byte{0x49, 0x4D, 0x4E, 0x4F, 0x54, 0x4C, 0x45, 0x47, 0x41, 0x43, 0x59, 0x21, 0x21}

// EncodeReader returns a reader representing a Cuckoo filter
func (f *Filter) EncodeReader() (io.Reader, uint) {
// EncodeReader returns a reader representing a Cuckoo filter together with
// the total encoded size in bytes. In the new format the stream is:
// marker bytes, then uint64(numItems), uint64(victim.index),
// uint32(victim.tag), byte(victim.used), then the table payload.
func (f *Filter) EncodeReader(legacy bool) (io.Reader, uint64) {
	if legacy {
		return f.encodeReaderLegacyMaxUint32()
	}

	// metadata is zero-initialized, so the victimUsed byte only needs to be
	// written when the victim cache is occupied.
	var metadata [filterMetadataSize]byte
	binary.LittleEndian.PutUint64(metadata[0:], f.numItems)
	binary.LittleEndian.PutUint64(metadata[bytesPerUint64:], f.victim.index)
	binary.LittleEndian.PutUint32(metadata[2*bytesPerUint64:], f.victim.tag)
	if f.victim.used {
		metadata[2*bytesPerUint64+bytesPerUint32] = 1
	}

	tableReader, tableEncodedSize := f.table.Reader(false)
	total := uint64(len(newFormatMarker)) + uint64(len(metadata)) + tableEncodedSize
	reader := io.MultiReader(bytes.NewReader(newFormatMarker[:]), bytes.NewReader(metadata[:]), tableReader)
	return reader, total
}

// encodeReaderLegacyMaxUint32 returns a reader representing a Cuckoo filter encoded in the legacy mode that supports up to max(uint32) items.
func (f *Filter) encodeReaderLegacyMaxUint32() (io.Reader, uint64) {
var metadata [filterMetadataSizeLegacy]byte

for i, n := range []uint32{uint32(f.numItems), uint32(f.victim.index), f.victim.tag} {
binary.LittleEndian.PutUint32(metadata[i*bytesPerUint32:], n)
}
Expand All @@ -277,8 +332,8 @@ func (f *Filter) EncodeReader() (io.Reader, uint) {
victimUsed = byte(1)
}
metadata[bytesPerUint32*3] = victimUsed
tableReader, tableEncodedSize := f.table.Reader()
return io.MultiReader(bytes.NewReader(metadata[:]), tableReader), uint(len(metadata)) + tableEncodedSize
tableReader, tableEncodedSize := f.table.Reader(true)
return io.MultiReader(bytes.NewReader(metadata[:]), tableReader), uint64(len(metadata)) + tableEncodedSize
}

// Decode returns a Cuckoo Filter using a copy of the provided byte slice.
Expand All @@ -293,13 +348,42 @@ func DecodeFrom(b []byte) (*Filter, error) {
if len(b) < 20 {
return nil, errors.New("unexpected bytes length")
}
numItems := uint(binary.LittleEndian.Uint32(b[0*bytesPerUint32:]))
curIndex := uint(binary.LittleEndian.Uint32(b[1*bytesPerUint32:]))
curTag := binary.LittleEndian.Uint32(b[2*1*bytesPerUint32:])
used := b[12] == byte(1)
tableType := uint(b[13])

curOffset := uint64(0)
legacy := uint(b[len(newFormatMarker)-1]) <= 1

// Skip the marker if it's the new format.
if !legacy {
curOffset += uint64(len(newFormatMarker))
}

var numItems uint64
if legacy {
numItems = uint64(binary.LittleEndian.Uint32(b[curOffset:]))
curOffset += bytesPerUint32
} else {
numItems = binary.LittleEndian.Uint64(b[curOffset:])
curOffset += bytesPerUint64
}

var curIndex uint64
if legacy {
curIndex = uint64(binary.LittleEndian.Uint32(b[curOffset:]))
curOffset += bytesPerUint32
} else {
curIndex = binary.LittleEndian.Uint64(b[curOffset:])
curOffset += bytesPerUint64
}

curTag := binary.LittleEndian.Uint32(b[curOffset:])
curOffset += bytesPerUint32

used := b[curOffset] == byte(1)

tableOffset := curOffset + 1
tableType := TableType(b[tableOffset])
table := getTable(tableType).(table)
if err := table.Decode(b[13:]); err != nil {
if err := table.Decode(legacy, b[tableOffset:]); err != nil {
return nil, err
}
return &Filter{
Expand Down
Loading

0 comments on commit f8cedef

Please sign in to comment.