From f4f3e4de162473a5774f8511ffe2ae887b1ba41f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Tue, 14 Mar 2023 12:57:44 +0100 Subject: [PATCH] ReadWrite: faster Has() by using the in-memory index instead of reading on disk Before: // BenchmarkHas-8 190216 6368 ns/op 744 B/op 16 allocs/op After // BenchmarkHas-8 1419169 845.6 ns/op 320 B/op 6 allocs/op ``` func BenchmarkHas(b *testing.B) { ctx := context.TODO() path := filepath.Join(b.TempDir(), "bench-large-v2.car") generateRandomCarV2File(b, path, 200<<20) // 10 MiB defer os.Remove(path) subject, err := blockstore.OpenReadWrite(path, nil) c, err := subject.AllKeysChan(ctx) require.NoError(b, err) var allCids []cid.Cid for c2 := range c { allCids = append(allCids, c2) } b.ReportAllocs() b.ResetTimer() var idx int for i := 0; i < b.N; i++ { _, _ = subject.Has(ctx, allCids[idx]) // require.NoError(b, err) // require.True(b, has) idx = (idx + 1) % len(allCids) } } ``` --- v2/blockstore/readwrite.go | 24 +++++++++++++++++++++- v2/index/index.go | 6 +++--- v2/internal/store/insertionindex.go | 31 ++++++++++++++++++++++++++--- v2/internal/store/put.go | 8 ++++++-- 4 files changed, 60 insertions(+), 9 deletions(-) diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go index e605414f..5ba24f9c 100644 --- a/v2/blockstore/readwrite.go +++ b/v2/blockstore/readwrite.go @@ -344,7 +344,29 @@ func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { } func (b *ReadWrite) Has(ctx context.Context, key cid.Cid) (bool, error) { - return b.ronly.Has(ctx, key) + if !b.opts.StoreIdentityCIDs { + // If we don't store identity CIDs then we can return them straight away as if they are here, + // otherwise we need to check for their existence. + // Note, we do this without locking, since there is no shared information to lock for in order to perform the check. + if _, ok, err := store.IsIdentity(key); err != nil { + return false, err + } else if ok { + return true, nil + } + } + + if ctx.Err() != nil { + return false, ctx.Err() + } + + b.ronly.mu.Lock() + defer b.ronly.mu.Unlock() + + if b.ronly.closed { + return false, errClosed + } + + return b.idx.HasMultihash(key.Hash()) } func (b *ReadWrite) Get(ctx context.Context, key cid.Cid) (blocks.Block, error) { diff --git a/v2/index/index.go b/v2/index/index.go index 911eaa7a..1634dda5 100644 --- a/v2/index/index.go +++ b/v2/index/index.go @@ -13,7 +13,7 @@ import ( "github.com/multiformats/go-varint" ) -// CarIndexNone is a sentinal value used as a multicodec code for the index indicating no index. +// CarIndexNone is a sentinel value used as a multicodec code for the index indicating no index. const CarIndexNone = 0x300000 type ( @@ -46,7 +46,7 @@ type ( // Unmarshal decodes the index from its serial form. // Note, this function will copy the entire index into memory. // - // Do not unmarshal index from untrusted CARv2 files. Instead the index should be + // Do not unmarshal index from untrusted CARv2 files. Instead, the index should be // regenerated from the CARv2 data payload. Unmarshal(r io.Reader) error @@ -84,7 +84,7 @@ type ( // and the ForEach function returns the error to the user. // // An index may contain multiple offsets corresponding to the same multihash, e.g. via duplicate blocks. - // In such cases, the given function may be called multiple times with the same multhihash but different offset. + // In such cases, the given function may be called multiple times with the same multihash but different offset. // // The order of calls to the given function is deterministic, but entirely index-specific. ForEach(func(multihash.Multihash, uint64) error) error diff --git a/v2/internal/store/insertionindex.go b/v2/internal/store/insertionindex.go index ffdf96fa..bc25fffb 100644 --- a/v2/internal/store/insertionindex.go +++ b/v2/internal/store/insertionindex.go @@ -212,10 +212,10 @@ func (ii *InsertionIndex) Flatten(codec multicodec.Code) (index.Index, error) { // but it's separate as it allows us to compare Record.Cid directly, // whereas GetAll just provides Record.Offset. -func (ii *InsertionIndex) HasExactCID(c cid.Cid) bool { +func (ii *InsertionIndex) HasExactCID(c cid.Cid) (bool, error) { d, err := multihash.Decode(c.Hash()) if err != nil { - panic(err) + return false, err } entry := recordDigest{digest: d.Digest} @@ -235,5 +235,30 @@ func (ii *InsertionIndex) HasExactCID(c cid.Cid) bool { return true } ii.items.AscendGreaterOrEqual(entry, iter) - return found + return found, nil +} + +func (ii *InsertionIndex) HasMultihash(mh multihash.Multihash) (bool, error) { + d, err := multihash.Decode(mh) + if err != nil { + return false, err + } + entry := recordDigest{digest: d.Digest} + + found := false + iter := func(i llrb.Item) bool { + existing := i.(recordDigest) + if !bytes.Equal(existing.digest, entry.digest) { + // We've already looked at all entries with matching digests. + return false + } + if bytes.Equal(existing.Record.Cid.Hash(), mh) { + found = true + return false + } + // Continue looking in ascending order. + return true + } + ii.items.AscendGreaterOrEqual(entry, iter) + return found, nil } diff --git a/v2/internal/store/put.go b/v2/internal/store/put.go index f82d0c1e..2a74dd0d 100644 --- a/v2/internal/store/put.go +++ b/v2/internal/store/put.go @@ -38,8 +38,12 @@ func ShouldPut( } if !blockstoreAllowDuplicatePuts { - if blockstoreUseWholeCIDs && idx.HasExactCID(c) { - return false, nil // deduplicated by CID + if blockstoreUseWholeCIDs { + has, err := idx.HasExactCID(c) + if err != nil { + return false, err + } + return !has, nil // deduplicated by CID } if !blockstoreUseWholeCIDs { _, err := idx.Get(c)