
Remove timeout and refactor sizeBelowThreshold #102

Merged
Changes from 5 commits
10 changes: 6 additions & 4 deletions hamt/hamt.go
@@ -43,8 +43,6 @@ func (ds *Shard) isValueNode() bool {

// A Shard represents the HAMT. It should be initialized with NewShard().
type Shard struct {
cid cid.Cid

childer *childer

tableSize int
@@ -123,7 +121,6 @@ func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*Shard, error) {

ds.childer.makeChilder(fsn.Data(), pbnd.Links())

ds.cid = pbnd.Cid()
ds.hashFunc = fsn.HashType()
ds.builder = pbnd.CidBuilder()

@@ -355,7 +352,12 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
defer cancel()
getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
cset := cid.NewSet()
err := dag.Walk(ctx, getLinks, ds.cid, cset.Visit, dag.Concurrent())
rootNode, err := ds.Node()
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
return
}
err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
}
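With the cached cid field removed, EnumLinksAsync derives the root from ds.Node() on each call, and any time limit on fetching shards now comes from the caller's context rather than an internal timeout. A minimal caller-side sketch of that pattern follows; the helper name and the 30-second deadline are illustrative assumptions, not part of this PR.

package hamtexample

import (
	"context"
	"fmt"
	"time"

	"github.com/ipfs/go-unixfs/hamt"
)

// listShard is a hypothetical helper: with the internal timeout gone, the
// caller bounds shard fetching with its own context deadline, and any
// resulting error (including context.DeadlineExceeded) surfaces through
// each LinkResult's Err field.
func listShard(shard *hamt.Shard) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	for linkResult := range shard.EnumLinksAsync(ctx) {
		if linkResult.Err != nil {
			// A deadline hit while fetching shards shows up here; how to
			// handle it is now the caller's decision.
			return linkResult.Err
		}
		fmt.Println(linkResult.Link.Name)
	}
	return nil
}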
146 changes: 56 additions & 90 deletions io/directory.go
@@ -3,13 +3,10 @@ package io
import (
"context"
"fmt"
"os"
"time"

mdag "github.com/ipfs/go-merkledag"

format "github.com/ipfs/go-unixfs"
"github.com/ipfs/go-unixfs/hamt"
"os"

"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
@@ -26,11 +23,6 @@ var log = logging.Logger("unixfs")
// ProtoNode doesn't use the Data field so this estimate is pretty accurate).
var HAMTShardingSize = 0

// Time in seconds allowed to fetch the shards to compute the size before
// returning an error.
// FIXME: Adjust to sane value.
var EvaluateHAMTTransitionTimeout = time.Duration(1)

// DefaultShardWidth is the default value used for hamt sharding width.
var DefaultShardWidth = 256

@@ -438,15 +430,42 @@ func (d *HAMTDirectory) removeFromSizeChange(name string, linkCid cid.Cid) {
d.sizeChange -= estimatedLinkSize(name, linkCid)
}

// Evaluate directory size and check if it's below HAMTShardingSize threshold
// (to trigger a transition to a BasicDirectory). It returns two `bool`s:
// * whether it's below (true) or equal/above (false)
// * whether the passed timeout to compute the size has been exceeded
// FIXME: Will be extended later to the `AddEntry` case.
func (d *HAMTDirectory) needsToSwitchToBasicDir(ctx context.Context, nameToRemove string) (switchToBasic bool, err error) {
if HAMTShardingSize == 0 { // Option disabled.
return false, nil
}

entryToRemove, err := d.shard.Find(ctx, nameToRemove)
if err == os.ErrNotExist {
// Nothing to remove, no point in evaluating a switch.
return false, nil
} else if err != nil {
return false, err
}
sizeToRemove := estimatedLinkSize(nameToRemove, entryToRemove.Cid)

if d.sizeChange-sizeToRemove >= 0 {
// We won't have reduced the HAMT net size.
return false, nil
}

// We have reduced the directory size, check if it went below the
// HAMTShardingSize threshold to trigger a switch.
belowThreshold, err := d.sizeBelowThreshold(ctx, -sizeToRemove)
if err != nil {
return false, err
}
return belowThreshold, nil
}

// Evaluate directory size and a future sizeChange and check if it will be below
// HAMTShardingSize threshold (to trigger a transition to a BasicDirectory).
// Instead of enumerating the entire tree we eagerly call EnumLinksAsync
// until we either reach a value above the threshold (in that case no need)
// to keep counting or the timeout runs out in which case the `below` return
// value is not to be trusted as we didn't have time to count enough shards.
func (d *HAMTDirectory) sizeBelowThreshold(timeout time.Duration) (below bool, timeoutExceeded bool) {
// until we either reach a value above the threshold (in that case no need
// to keep counting) or an error occurs (like the context being canceled
// if we take too much time fetching the necessary shards).
func (d *HAMTDirectory) sizeBelowThreshold(ctx context.Context, sizeChange int) (below bool, err error) {
if HAMTShardingSize == 0 {
panic("asked to compute HAMT size with HAMTShardingSize option off (0)")
}
@@ -455,57 +474,25 @@ func (d *HAMTDirectory) sizeBelowThreshold(timeout time.Duration) (below bool, timeoutExceeded bool) {
// end early if we already know we're above the threshold or run out of time.
partialSize := 0

ctx, cancel := context.WithTimeout(context.Background(), time.Second*timeout)
// We stop the enumeration once we have enough information and exit this function.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for linkResult := range d.EnumLinksAsync(ctx) {
if linkResult.Err != nil {
continue
// The timeout exceeded errors will be coming through here but I'm
// not sure if we can just compare against a generic DeadlineExceeded
// error here to return early and avoid iterating the entire loop.
// (We might confuse a specific DeadlineExceeded of an internal function
// with our context here.)
// Since *our* DeadlineExceeded will quickly propagate to any other
// pending fetches it seems that iterating the entire loop won't add
// much more cost anyway.
// FIXME: Check the above reasoning.
}
if linkResult.Link == nil {
panic("empty link result (both values nil)")
// FIXME: Is this *ever* possible?
return false, linkResult.Err
}
partialSize += estimatedLinkSize(linkResult.Link.Name, linkResult.Link.Cid)

if partialSize >= HAMTShardingSize {
partialSize += estimatedLinkSize(linkResult.Link.Name, linkResult.Link.Cid)
if partialSize+sizeChange >= HAMTShardingSize {
// We have already fetched enough shards to assert we are
// above the threshold, so no need to keep fetching.
return false, false
return false, nil
}
}
// At this point either we enumerated all shards or run out of time.
// Figure out which.

if ctx.Err() == context.Canceled {
panic("the context was canceled but we're still evaluating a possible switch")
}
if partialSize >= HAMTShardingSize {
panic("we reach the threshold but we're still evaluating a possible switch")
}

if ctx.Err() == context.DeadlineExceeded {
return false, true
}

// If we reach this then:
// * We are below the threshold (we didn't return inside the EnumLinksAsync
// loop).
// * The context wasn't cancelled so we iterated *all* shards
// and are sure that we have the full size.
// FIXME: Can we actually verify the last claim here to be sure?
// (Iterating all the shards in the HAMT as a plumbing function maybe.
// If they're in memory it shouldn't be that expensive, we won't be
// switching that often, probably.)
return true, false
// We enumerated *all* links in all shards and didn't reach the threshold.
return true, nil
}

// UpgradeableDirectory wraps a Directory interface and provides extra logic
@@ -573,42 +560,21 @@ func (d *UpgradeableDirectory) getDagService() ipld.DAGService {
// sure we make good on the value). Finding the right margin can be tricky
// and very dependent on the use case so it might not be worth it.
func (d *UpgradeableDirectory) RemoveChild(ctx context.Context, name string) error {
if err := d.Directory.RemoveChild(ctx, name); err != nil {
return err
}

hamtDir, ok := d.Directory.(*HAMTDirectory)
if !ok { // BasicDirectory
return nil
}

if HAMTShardingSize == 0 || // Option disabled.
hamtDir.sizeChange >= 0 { // We haven't reduced the HAMT net size.
return nil
}

// We have reduced the directory size, check if it didn't go under
// the HAMTShardingSize threshold.
belowThreshold, timeoutExceeded := hamtDir.sizeBelowThreshold(EvaluateHAMTTransitionTimeout)

if timeoutExceeded {
// We run out of time before confirming if we're indeed below the
// threshold. When in doubt error to not return inconsistent structures.
// FIXME: We could allow this to return without error and enforce this
// timeout on a GetNode() call when we need to actually commit to a
// structure/CID. (The downside is that GetNode() doesn't have a
// context argument and we would have to break the API.)
return fmt.Errorf("not enought time to fetch shards")
// FIXME: Abstract in new error for testing.
}

if belowThreshold { // Switch.
basicDir, err := hamtDir.switchToBasic(ctx)
if ok {
switchToBasic, err := hamtDir.needsToSwitchToBasicDir(ctx, name)
if err != nil {
return err
}
d.Directory = basicDir

if switchToBasic {
basicDir, err := hamtDir.switchToBasic(ctx)
if err != nil {
return err
}
d.Directory = basicDir
}
}

return nil
return d.Directory.RemoveChild(ctx, name)
}
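Taken together, the io/directory.go changes move the downgrade decision ahead of the removal: UpgradeableDirectory.RemoveChild now asks needsToSwitchToBasicDir first, swaps in a BasicDirectory if the estimated size falls below HAMTShardingSize, and only then delegates the actual removal. A rough test-style sketch of exercising that path follows; the threshold value, entry names, and the mdtest mock DAG service are assumptions for illustration, not code from this PR.

package io_test

import (
	"context"
	"testing"

	mdtest "github.com/ipfs/go-merkledag/test"
	ft "github.com/ipfs/go-unixfs"
	uio "github.com/ipfs/go-unixfs/io"
)

// Hypothetical sketch: with the timeout removed, RemoveChild either completes
// the size evaluation or returns the underlying fetch/context error; there is
// no separate "not enough time to fetch shards" failure mode left to handle.
func TestRemoveChildBelowThreshold(t *testing.T) {
	ctx := context.Background()
	dserv := mdtest.Mock()

	uio.HAMTShardingSize = 100 // illustrative low threshold so a handful of entries crosses it
	defer func() { uio.HAMTShardingSize = 0 }()

	dir := uio.NewDirectory(dserv)
	for _, name := range []string{"a", "b", "c", "d"} {
		if err := dir.AddChild(ctx, name, ft.EmptyFileNode()); err != nil {
			t.Fatal(err)
		}
	}

	// Removing an entry may now downgrade the underlying HAMTDirectory back
	// to a BasicDirectory once the estimated size drops below the threshold;
	// any error reported here comes from the DAG fetches or the caller's context.
	if err := dir.RemoveChild(ctx, "a"); err != nil {
		t.Fatal(err)
	}
}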