diff --git a/table/builder_test.go b/table/builder_test.go
index e73e5e88f..9650a9275 100644
--- a/table/builder_test.go
+++ b/table/builder_test.go
@@ -57,7 +57,7 @@ func TestTableIndex(t *testing.T) {
 	keysCount := 10000
 	for _, opt := range opts {
 		builder := NewTableBuilder(opt)
-		filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Int63())
+		filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32())
 		f, err := y.OpenSyncedFile(filename, true)
 		require.NoError(t, err)
 
diff --git a/table/table.go b/table/table.go
index fe6ca6e25..25227be34 100644
--- a/table/table.go
+++ b/table/table.go
@@ -18,6 +18,7 @@ package table
 
 import (
 	"crypto/aes"
+	"encoding/binary"
 	"fmt"
 	"io"
 	"math"
@@ -81,7 +82,7 @@ type TableInterface interface {
 	DoesNotHave(hash uint64) bool
 }
 
-// Table represents a loaded table file with the info we have about it
+// Table represents a loaded table file with the info we have about it.
 type Table struct {
 	sync.Mutex
 
@@ -97,10 +98,11 @@ type Table struct {
 	smallest, biggest []byte // Smallest and largest keys (with timestamps).
 	id                uint64 // file id, part of filename
 
-	bf       *z.Bloom
 	Checksum []byte
 	// Stores the total size of key-values stored in this table (including the size on vlog).
 	estimatedSize uint64
+	indexStart    int
+	indexLen      int
 
 	IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
 	opt        *Options
@@ -146,6 +148,13 @@ func (t *Table) DecrRef() error {
 		if err := os.Remove(filename); err != nil {
 			return err
 		}
+		// Delete all blocks from the cache.
+		for i := range t.blockIndex {
+			t.opt.Cache.Del(t.blockCacheKey(i))
+		}
+		// Delete bloom filter from the cache.
+		t.opt.Cache.Del(t.bfCacheKey())
+
 	}
 	return nil
 }
@@ -336,10 +345,12 @@ func (t *Table) readIndex() error {
 	// Read index size from the footer.
 	readPos -= 4
 	buf = t.readNoFail(readPos, 4)
-	indexLen := int(y.BytesToU32(buf))
+	t.indexLen = int(y.BytesToU32(buf))
+
 	// Read index.
-	readPos -= indexLen
-	data := t.readNoFail(readPos, indexLen)
+	readPos -= t.indexLen
+	t.indexStart = readPos
+	data := t.readNoFail(readPos, t.indexLen)
 
 	if err := y.VerifyChecksum(data, expectedChk); err != nil {
 		return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
@@ -358,11 +369,18 @@
 	y.Check(err)
 
 	t.estimatedSize = index.EstimatedSize
-	if t.bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil {
-		return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex",
-			t.id)
-	}
 	t.blockIndex = index.Offsets
+
+	// Avoid the cost of unmarshalling the bloom filters if the cache is absent.
+	if t.opt.Cache != nil {
+		var bf *z.Bloom
+		if bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil {
+			return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex",
+				t.id)
+		}
+
+		t.opt.Cache.Set(t.bfCacheKey(), bf, int64(len(index.BloomFilter)))
+	}
 	return nil
 }
@@ -443,10 +461,25 @@ func (t *Table) block(idx int) (*block, error) {
 	return blk, nil
 }
 
-func (t *Table) blockCacheKey(idx int) uint64 {
-	y.AssertTrue(t.ID() < math.MaxUint32)
+// bfCacheKey returns the cache key for bloom filter.
+func (t *Table) bfCacheKey() []byte {
+	y.AssertTrue(t.id < math.MaxUint32)
+	buf := make([]byte, 4)
+	binary.BigEndian.PutUint32(buf, uint32(t.id))
+
+	// Without the "bf" prefix, we will have conflict with the blockCacheKey.
+	return append([]byte("bf"), buf...)
+}
+
+func (t *Table) blockCacheKey(idx int) []byte {
+	y.AssertTrue(t.id < math.MaxUint32)
 	y.AssertTrue(uint32(idx) < math.MaxUint32)
-	return (t.ID() << 32) | uint64(idx)
+
+	buf := make([]byte, 8)
+	// Assume t.ID does not overflow uint32.
+	binary.BigEndian.PutUint32(buf[:4], uint32(t.ID()))
+	binary.BigEndian.PutUint32(buf[4:], uint32(idx))
+	return buf
 }
 
 // EstimatedSize returns the total size of key-values stored in this table (including the
@@ -470,7 +503,44 @@ func (t *Table) ID() uint64 { return t.id }
 
 // DoesNotHave returns true if (but not "only if") the table does not have the key hash.
 // It does a bloom filter lookup.
-func (t *Table) DoesNotHave(hash uint64) bool { return !t.bf.Has(hash) }
+func (t *Table) DoesNotHave(hash uint64) bool {
+	var bf *z.Bloom
+
+	// Return fast if cache is absent.
+	if t.opt.Cache == nil {
+		bf, _ := t.readBloomFilter()
+		return !bf.Has(hash)
+	}
+
+	// Check if the bloomfilter exists in the cache.
+	if b, ok := t.opt.Cache.Get(t.bfCacheKey()); b != nil && ok {
+		bf = b.(*z.Bloom)
+		return !bf.Has(hash)
+	}
+
+	bf, sz := t.readBloomFilter()
+	t.opt.Cache.Set(t.bfCacheKey(), bf, int64(sz))
+	return !bf.Has(hash)
+}
+
+// readBloomFilter reads the bloom filter from the SST and returns its length
+// along with the bloom filter.
+func (t *Table) readBloomFilter() (*z.Bloom, int) {
+	// Read bloom filter from the SST.
+	data := t.readNoFail(t.indexStart, t.indexLen)
+	index := pb.TableIndex{}
+	var err error
+	// Decrypt the table index if it is encrypted.
+	if t.shouldDecrypt() {
+		data, err = t.decrypt(data)
+		y.Check(err)
+	}
+	y.Check(proto.Unmarshal(data, &index))
+
+	bf, err := z.JSONUnmarshal(index.BloomFilter)
+	y.Check(err)
+	return bf, len(index.BloomFilter)
+}
 
 // VerifyChecksum verifies checksum for all blocks of table. This function is called by
 // OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
diff --git a/table/table_test.go b/table/table_test.go
index 7aa47d547..27a4f1d16 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -77,13 +77,9 @@ func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File {
 	defer b.Close()
 	// TODO: Add test for file garbage collection here. No files should be left after the tests here.
 
-	filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63())
+	filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Uint32())
 	f, err := y.CreateSyncedFile(filename, true)
-	if t != nil {
-		require.NoError(t, err)
-	} else {
-		y.Check(err)
-	}
+	require.NoError(t, err)
 
 	sort.Slice(keyValues, func(i, j int) bool {
		return keyValues[i][0] < keyValues[j][0]
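
Note on the new cache-key layout (not part of the patch): the standalone sketch below only illustrates the byte layouts that `bfCacheKey` and `blockCacheKey` produce in the diff above; `makeBlockKey`, `makeBfKey`, and the `main` driver are hypothetical names used for this example. Block keys are 8 bytes (big-endian table ID followed by the block index), while bloom-filter keys are the 2-byte `"bf"` prefix plus the 4-byte table ID, so the two kinds of keys can never be equal.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// makeBlockKey mirrors blockCacheKey from the patch: 4-byte big-endian
// table ID followed by the 4-byte block index (illustrative helper).
func makeBlockKey(tableID uint64, idx int) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint32(buf[:4], uint32(tableID))
	binary.BigEndian.PutUint32(buf[4:], uint32(idx))
	return buf
}

// makeBfKey mirrors bfCacheKey: a "bf" prefix plus the 4-byte table ID,
// which keeps bloom-filter entries distinct from block entries.
func makeBfKey(tableID uint64) []byte {
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(tableID))
	return append([]byte("bf"), buf...)
}

func main() {
	fmt.Printf("block key: %x (len %d)\n", makeBlockKey(42, 7), len(makeBlockKey(42, 7)))
	fmt.Printf("bf key:    %x (len %d)\n", makeBfKey(42), len(makeBfKey(42)))
}
```

Switching from the old packed `uint64` key to byte-slice keys is what lets blocks and bloom filters share the same cache: the differing lengths and the `"bf"` prefix guarantee the two key spaces cannot collide, which is exactly what the "Without the \"bf\" prefix" comment in the diff is guarding against.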