Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Fix race condition on childer. #124

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 90 additions & 65 deletions hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ func (ds *Shard) Node() (ipld.Node, error) {
continue
}

ch := ds.childer.child(sliceIndex)
chl := ds.childer.childOrLink(sliceIndex)

ch := chl.child
lnk := chl.link

if ch != nil {
clnk, err := ch.Link()
if err != nil {
Expand All @@ -184,7 +188,6 @@ func (ds *Shard) Node() (ipld.Node, error) {
}
} else {
// child unloaded, just copy in link with updated name
lnk := ds.childer.link(sliceIndex)
label := lnk.Name[ds.maxpadlen:]

err := out.AddRawLink(ds.linkNamePrefix(childIndex)+label, lnk)
Expand Down Expand Up @@ -415,46 +418,55 @@ type listCidsAndShards struct {
func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) error) (*listCidsAndShards, error) {
res := &listCidsAndShards{}

for idx, lnk := range ds.childer.links {
if nextShard := ds.childer.children[idx]; nextShard == nil {
lnkLinkType, err := ds.childLinkType(lnk)
if err != nil {
for i := 0; i < ds.childer.length(); i++ {
chl := ds.childer.childOrLink(i)

lnk := chl.link
nextShard := chl.child

if nextShard != nil && nextShard.val != nil {
formattedLink := &ipld.Link{
Name: nextShard.key,
Size: nextShard.val.Size,
Cid: nextShard.val.Cid,
}
if err := processLinkValues(formattedLink); err != nil {
return nil, err
}

switch lnkLinkType {
case shardValueLink:
sv, err := ds.makeShardValue(lnk)
if err != nil {
return nil, err
}
formattedLink := sv.val
formattedLink.Name = sv.key
continue
}

if err := processLinkValues(formattedLink); err != nil {
return nil, err
}
case shardLink:
res.cids = append(res.cids, lnk.Cid)
default:
return nil, fmt.Errorf("unsupported shard link type")
if nextShard != nil {
res.shards = append(res.shards, nextShard)

continue
}

lnkLinkType, err := ds.childLinkType(lnk)
if err != nil {
return nil, err
}

switch lnkLinkType {
case shardValueLink:
sv, err := ds.makeShardValue(lnk)
if err != nil {
return nil, err
}
formattedLink := sv.val
formattedLink.Name = sv.key

} else {
if nextShard.val != nil {
formattedLink := &ipld.Link{
Name: nextShard.key,
Size: nextShard.val.Size,
Cid: nextShard.val.Cid,
}
if err := processLinkValues(formattedLink); err != nil {
return nil, err
}
} else {
res.shards = append(res.shards, nextShard)
if err := processLinkValues(formattedLink); err != nil {
return nil, err
}
case shardLink:
res.cids = append(res.cids, lnk.Cid)
default:
return nil, fmt.Errorf("unsupported shard link type")
}
}

return res, nil
}

Expand Down Expand Up @@ -707,7 +719,11 @@ func (ds *Shard) swapValue(ctx context.Context, hv *hashBits, key string, value
// shard in case an implementation is broken.

// Have we loaded the child? Prefer that.
schild := child.childer.child(0)
chl := child.childer.childOrLink(0)

schild := chl.child
slnk := chl.link

if schild != nil {
if schild.isValueNode() {
ds.childer.set(schild, i)
Expand All @@ -716,7 +732,6 @@ func (ds *Shard) swapValue(ctx context.Context, hv *hashBits, key string, value
}

// Otherwise, work with the link.
slnk := child.childer.link(0)
var lnkType linkType
lnkType, err = child.childer.sd.childLinkType(slnk)
if err != nil {
Expand Down Expand Up @@ -750,9 +765,14 @@ type childer struct {
dserv ipld.DAGService
bitfield bitfield.Bitfield

// Only one of links/children will be non-nil for every child/link.
links []*ipld.Link
children []*Shard
mu sync.RWMutex
children []childOrLink
}

// childOrLink is a data structure that will contains a link OR a child.
type childOrLink struct {
link *ipld.Link
child *Shard
}

func newChilder(ds ipld.DAGService, size int) *childer {
Expand All @@ -763,11 +783,13 @@ func newChilder(ds ipld.DAGService, size int) *childer {
}

func (s *childer) makeChilder(data []byte, links []*ipld.Link) *childer {
s.children = make([]*Shard, len(links))
s.mu.Lock()
defer s.mu.Unlock()

s.bitfield.SetBytes(data)
if len(links) > 0 {
s.links = make([]*ipld.Link, len(links))
copy(s.links, links)

for _, l := range links {
s.children = append(s.children, childOrLink{link: l})
}

return s
Expand All @@ -778,12 +800,11 @@ func (s *childer) sliceIndex(childIndex int) (sliceIndex int) {
return s.bitfield.OnesBefore(childIndex)
}

func (s *childer) child(sliceIndex int) *Shard {
return s.children[sliceIndex]
}
func (s *childer) childOrLink(sliceIndex int) childOrLink {
s.mu.RLock()
defer s.mu.RUnlock()

func (s *childer) link(sliceIndex int) *ipld.Link {
return s.links[sliceIndex]
return s.children[sliceIndex]
}

func (s *childer) insert(key string, lnk *ipld.Link, idx int) error {
Expand All @@ -795,8 +816,12 @@ func (s *childer) insert(key string, lnk *ipld.Link, idx int) error {
i := s.sliceIndex(idx)
sd := &Shard{key: key, val: lnk}

s.children = append(s.children[:i], append([]*Shard{sd}, s.children[i:]...)...)
s.links = append(s.links[:i], append([]*ipld.Link{nil}, s.links[i:]...)...)
s.mu.Lock()

s.children = append(s.children[:i], append([]childOrLink{{child: sd}}, s.children[i:]...)...)

s.mu.Unlock()

// Add a `nil` placeholder in `links` so the rest of the entries keep the same
// index as `children`.
s.bitfield.SetBit(idx)
Expand All @@ -805,13 +830,17 @@ func (s *childer) insert(key string, lnk *ipld.Link, idx int) error {
}

func (s *childer) set(sd *Shard, i int) {
s.children[i] = sd
s.links[i] = nil
s.mu.Lock()
defer s.mu.Unlock()

s.children[i] = childOrLink{child: sd}
}

func (s *childer) setLink(lnk *ipld.Link, i int) {
s.children[i] = nil
s.links[i] = lnk
s.mu.Lock()
defer s.mu.Unlock()

s.children[i] = childOrLink{link: lnk}
}

func (s *childer) rm(childIndex int) error {
Expand All @@ -821,11 +850,12 @@ func (s *childer) rm(childIndex int) error {
return err
}

s.mu.Lock()

copy(s.children[i:], s.children[i+1:])
s.children = s.children[:len(s.children)-1]

copy(s.links[i:], s.links[i+1:])
s.links = s.links[:len(s.links)-1]
s.mu.Unlock()

s.bitfield.UnsetBit(childIndex)

Expand All @@ -840,18 +870,17 @@ func (s *childer) get(ctx context.Context, sliceIndex int) (*Shard, error) {
return nil, err
}

c := s.child(sliceIndex)
if c != nil {
return c, nil
c := s.childOrLink(sliceIndex)
if c.child != nil {
return c.child, nil
}

return s.loadChild(ctx, sliceIndex)
return s.loadChild(ctx, c.link, sliceIndex)
}

// loadChild reads the i'th child node of this shard from disk and returns it
// as a 'child' interface
func (s *childer) loadChild(ctx context.Context, sliceIndex int) (*Shard, error) {
lnk := s.link(sliceIndex)
func (s *childer) loadChild(ctx context.Context, lnk *ipld.Link, sliceIndex int) (*Shard, error) {
lnkLinkType, err := s.sd.childLinkType(lnk)
if err != nil {
return nil, err
Expand Down Expand Up @@ -910,9 +939,5 @@ func (s *childer) check(sliceIndex int) error {
return fmt.Errorf("invalid index passed to operate children (likely corrupt bitfield)")
}

if len(s.children) != len(s.links) {
return fmt.Errorf("inconsistent lengths between children array and Links array")
}

return nil
}
30 changes: 30 additions & 0 deletions hamt/hamt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"os"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -710,3 +711,32 @@ func TestHamtBadSize(t *testing.T) {
t.Fatal("should have failed to construct hamt with bad size")
}
}

func TestChilderModifyChildrenAndLinks(t *testing.T) {
ds := mdtest.Mock()
sh, _ := NewShard(ds, 256)

ch := newChilder(ds, 1024)
ch.makeChilder(nil, []*ipld.Link{{}})

var wg sync.WaitGroup
for i := 0; i < 100000; i++ {
wg.Add(2)
go func() {
ch.set(sh, 0)
wg.Done()
}()
go func() {
ch.setLink(&ipld.Link{}, 0)
wg.Done()
}()

chl := ch.childOrLink(0)
if chl.child == nil && chl.link == nil {
t.Fatal("both slices are nil", i)
}

}

wg.Wait()
}