Skip to content

Commit

Permalink
Merge pull request #408 from ipld/feat/public-insertion-index
Browse files Browse the repository at this point in the history
refactor insertion index to be publicly accessible
  • Loading branch information
willscott authored Mar 27, 2023
2 parents fff0e43 + f7dda88 commit 6c6051d
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 28 deletions.
4 changes: 2 additions & 2 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type ReadWrite struct {

f *os.File
dataWriter *internalio.OffsetWriteSeeker
idx *store.InsertionIndex
idx *index.InsertionIndex
header carv2.Header

finalized bool // also protected by ronly.mu
Expand Down Expand Up @@ -117,7 +117,7 @@ func OpenReadWriteFile(f *os.File, roots []cid.Cid, opts ...carv2.Option) (*Read
// Set the header fileld before applying options since padding options may modify header.
rwbs := &ReadWrite{
f: f,
idx: store.NewInsertionIndex(),
idx: index.NewInsertionIndex(),
header: carv2.NewHeader(0),
opts: carv2.ApplyOptions(opts...),
finalized: false,
Expand Down
29 changes: 14 additions & 15 deletions v2/internal/store/insertionindex.go → v2/index/insertionindex.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package index

import (
"bytes"
Expand All @@ -8,7 +8,6 @@ import (
"io"

"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/petar/GoLLRB/llrb"
Expand All @@ -34,7 +33,7 @@ func NewInsertionIndex() *InsertionIndex {

type recordDigest struct {
digest []byte
index.Record
Record
}

func (r recordDigest) Less(than llrb.Item) bool {
Expand All @@ -45,7 +44,7 @@ func (r recordDigest) Less(than llrb.Item) bool {
return bytes.Compare(r.digest, other.digest) < 0
}

func newRecordDigest(r index.Record) recordDigest {
func newRecordDigest(r Record) recordDigest {
d, err := multihash.Decode(r.Hash())
if err != nil {
panic(err)
Expand All @@ -60,7 +59,7 @@ func newRecordFromCid(c cid.Cid, at uint64) recordDigest {
panic(err)
}

return recordDigest{d.Digest, index.Record{Cid: c, Offset: at}}
return recordDigest{d.Digest, Record{Cid: c, Offset: at}}
}

func (ii *InsertionIndex) InsertNoReplace(key cid.Cid, n uint64) {
Expand All @@ -75,19 +74,19 @@ func (ii *InsertionIndex) Get(c cid.Cid) (uint64, error) {
return record.Offset, nil
}

func (ii *InsertionIndex) getRecord(c cid.Cid) (index.Record, error) {
func (ii *InsertionIndex) getRecord(c cid.Cid) (Record, error) {
d, err := multihash.Decode(c.Hash())
if err != nil {
return index.Record{}, err
return Record{}, err
}
entry := recordDigest{digest: d.Digest}
e := ii.items.Get(entry)
if e == nil {
return index.Record{}, index.ErrNotFound
return Record{}, ErrNotFound
}
r, ok := e.(recordDigest)
if !ok {
return index.Record{}, errUnsupported
return Record{}, errUnsupported
}

return r.Record, nil
Expand All @@ -112,7 +111,7 @@ func (ii *InsertionIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
}
ii.items.AscendGreaterOrEqual(entry, iter)
if !any {
return index.ErrNotFound
return ErrNotFound
}
return nil
}
Expand Down Expand Up @@ -142,7 +141,7 @@ func (ii *InsertionIndex) Unmarshal(r io.Reader) error {
}
d := cbor.NewDecoder(r)
for i := int64(0); i < length; i++ {
var rec index.Record
var rec Record
if err := d.Decode(&rec); err != nil {
return err
}
Expand Down Expand Up @@ -175,7 +174,7 @@ func (ii *InsertionIndex) Codec() multicodec.Code {
return insertionIndexCodec
}

func (ii *InsertionIndex) Load(rs []index.Record) error {
func (ii *InsertionIndex) Load(rs []Record) error {
for _, r := range rs {
rec := newRecordDigest(r)
if rec.digest == nil {
Expand All @@ -187,12 +186,12 @@ func (ii *InsertionIndex) Load(rs []index.Record) error {
}

// flatten returns a formatted index in the given codec for more efficient subsequent loading.
func (ii *InsertionIndex) Flatten(codec multicodec.Code) (index.Index, error) {
si, err := index.New(codec)
func (ii *InsertionIndex) Flatten(codec multicodec.Code) (Index, error) {
si, err := New(codec)
if err != nil {
return nil, err
}
rcrds := make([]index.Record, ii.items.Len())
rcrds := make([]Record, ii.items.Len())

idx := 0
iter := func(i llrb.Item) bool {
Expand Down
2 changes: 1 addition & 1 deletion v2/internal/store/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func FindCid(

// Finalize will write the index to the writer at the offset specified in the header. It should only
// be used for a CARv2 and when the CAR interface is being closed.
func Finalize(writer io.WriterAt, header carv2.Header, idx *InsertionIndex, dataSize uint64, storeIdentityCIDs bool, indexCodec multicodec.Code) error {
func Finalize(writer io.WriterAt, header carv2.Header, idx *index.InsertionIndex, dataSize uint64, storeIdentityCIDs bool, indexCodec multicodec.Code) error {
// TODO check if add index option is set and don't write the index then set index offset to zero.
header = header.WithDataSize(dataSize)
header.Characteristics.SetFullyIndexed(storeIdentityCIDs)
Expand Down
5 changes: 3 additions & 2 deletions v2/internal/store/indexcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package store
import (
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/index"
)

// ShouldPut returns true if the block should be put into the CAR according to the options provided
// and the index. It returns false if the block should not be put into the CAR, either because it
// is an identity block and StoreIdentityCIDs is false, or because it already exists and
// BlockstoreAllowDuplicatePuts is false.
func ShouldPut(
idx *InsertionIndex,
idx *index.InsertionIndex,
c cid.Cid,
maxIndexCidSize uint64,
storeIdentityCIDs bool,
Expand Down Expand Up @@ -60,7 +61,7 @@ func ShouldPut(
// rules associated with the options. Similar to ShouldPut, but for the simpler
// Has() case.
func Has(
idx *InsertionIndex,
idx *index.InsertionIndex,
c cid.Cid,
maxIndexCidSize uint64,
storeIdentityCIDs bool,
Expand Down
3 changes: 2 additions & 1 deletion v2/internal/store/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/internal/carv1"
internalio "github.com/ipld/go-car/v2/internal/io"
"github.com/multiformats/go-varint"
Expand Down Expand Up @@ -51,7 +52,7 @@ func Resume(
rw ReaderWriterAt,
dataReader io.ReaderAt,
dataWriter *internalio.OffsetWriteSeeker,
idx *InsertionIndex,
idx *index.InsertionIndex,
roots []cid.Cid,
dataOffset uint64,
v1 bool,
Expand Down
14 changes: 7 additions & 7 deletions v2/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func OpenReadable(reader io.ReaderAt, opts ...carv2.Option) (ReadableCar, error)
sc.roots = header.Roots
sc.reader = reader
rr.Seek(0, io.SeekStart)
sc.idx = store.NewInsertionIndex()
sc.idx = index.NewInsertionIndex()
if err := carv2.LoadIndex(sc.idx, rr, opts...); err != nil {
return nil, err
}
Expand All @@ -120,7 +120,7 @@ func OpenReadable(reader io.ReaderAt, opts ...carv2.Option) (ReadableCar, error)
if err != nil {
return nil, err
}
sc.idx = store.NewInsertionIndex()
sc.idx = index.NewInsertionIndex()
if err := carv2.LoadIndex(sc.idx, dr, opts...); err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func NewWritable(writer io.Writer, roots []cid.Cid, opts ...carv2.Option) (Writa
func newWritable(writer io.Writer, roots []cid.Cid, opts ...carv2.Option) (*StorageCar, error) {
sc := &StorageCar{
writer: &positionTrackingWriter{w: writer},
idx: store.NewInsertionIndex(),
idx: index.NewInsertionIndex(),
header: carv2.NewHeader(0),
opts: carv2.ApplyOptions(opts...),
roots: roots,
Expand Down Expand Up @@ -260,7 +260,7 @@ func OpenReadableWritable(rw ReaderAtWriterAt, roots []cid.Cid, opts ...carv2.Op
rw,
sc.reader,
sc.dataWriter,
sc.idx.(*store.InsertionIndex),
sc.idx.(*index.InsertionIndex),
roots,
sc.header.DataOffset,
sc.opts.WriteAsCarV1,
Expand Down Expand Up @@ -309,7 +309,7 @@ func (sc *StorageCar) Put(ctx context.Context, keyStr string, data []byte) error
return errClosed
}

idx, ok := sc.idx.(*store.InsertionIndex)
idx, ok := sc.idx.(*index.InsertionIndex)
if !ok || sc.writer == nil {
return fmt.Errorf("cannot put into a read-only CAR")
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func (sc *StorageCar) Has(ctx context.Context, keyStr string) (bool, error) {
return false, errClosed
}

if idx, ok := sc.idx.(*store.InsertionIndex); ok && sc.writer != nil {
if idx, ok := sc.idx.(*index.InsertionIndex); ok && sc.writer != nil {
// writable CAR, fast path using InsertionIndex
return store.Has(
idx,
Expand Down Expand Up @@ -460,7 +460,7 @@ func (sc *StorageCar) GetStream(ctx context.Context, keyStr string) (io.ReadClos
// payload location. This should be called on a writable StorageCar in order to
// avoid data loss.
func (sc *StorageCar) Finalize() error {
idx, ok := sc.idx.(*store.InsertionIndex)
idx, ok := sc.idx.(*index.InsertionIndex)
if !ok || sc.writer == nil {
// ignore this, it's not writable
return nil
Expand Down

0 comments on commit 6c6051d

Please sign in to comment.