diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 2115ed08f96..f12976b44f8 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -137,10 +137,6 @@ "ImportPath": "github.com/jbenet/go-base58", "Rev": "6237cf65f3a6f7111cd8a42be3590df99a66bc7d" }, - { - "ImportPath": "github.com/jbenet/go-ctxgroup", - "Rev": "c14598396fa31465dc558b176c7976606f95a49d" - }, { "ImportPath": "github.com/jbenet/go-datastore", "Rev": "245a981af3750d7710db13dca731ba8461aa1095" @@ -201,7 +197,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "67fe91f1081e806f1bb51051c972ac782ea46d85" + "Rev": "a6650d0b69f2aa0fe7c9685baf0b4d7ecc8766bf" }, { "ImportPath": "github.com/kardianos/osext", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Godeps.json deleted file mode 100644 index acfaccebdfe..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Godeps.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "ImportPath": "github.com/jbenet/go-ctxgroup", - "GoVersion": "go1.4.2", - "Packages": [ - "./..." - ], - "Deps": [ - { - "ImportPath": "golang.org/x/net/context", - "Rev": "b6fdb7d8a4ccefede406f8fe0f017fb58265054c" - } - ] -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Readme b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Readme deleted file mode 100644 index 4cdaa53d56d..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Readme +++ /dev/null @@ -1,5 +0,0 @@ -This directory tree is generated automatically by godep. - -Please do not edit. - -See https://github.com/tools/godep for more information. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Makefile b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Makefile deleted file mode 100644 index 6b988cf64bd..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -all: - # no-op - -GODEP=$(which godep) - -godep: ${GODEP} - -${GODEP}: - echo ${GODEP} - go get github.com/tools/godep - -# saves/vendors third-party dependencies to Godeps/_workspace -# -r flag rewrites import paths to use the vendored path -# ./... performs operation on all packages in tree -vendor: godep - godep save -r ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/README.md b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/README.md deleted file mode 100644 index ee5f3af55f9..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/README.md +++ /dev/null @@ -1,35 +0,0 @@ -# ContextGroup - - -- Godoc: https://godoc.org/github.com/jbenet/go-ctxgroup - -ContextGroup is an interface for services able to be opened and closed. -It has a parent Context, and Children. But ContextGroup is not a proper -"tree" like the Context tree. It is more like a Context-WaitGroup hybrid. -It models a main object with a few children objects -- and, unlike the -context -- concerns itself with the parent-child closing semantics: - -- Can define an optional TeardownFunc (func() error) to be run at Closetime. -- Children call Children().Add(1) to be waited upon -- Children can select on <-Closing() to know when they should shut down. -- Close() will wait until all children call Children().Done() -- <-Closed() signals when the service is completely closed. - -ContextGroup can be embedded into the main object itself. In that case, -the teardownFunc (if a member function) has to be set after the struct -is intialized: - -```Go -type service struct { - ContextGroup - net.Conn -} -func (s *service) close() error { - return s.Conn.Close() -} -func newService(ctx context.Context, c net.Conn) *service { - s := &service{c} - s.ContextGroup = NewContextGroup(ctx, s.close) - return s -} -``` diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup.go b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup.go deleted file mode 100644 index 7482e720ad8..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup.go +++ /dev/null @@ -1,257 +0,0 @@ -// package ctxgroup provides the ContextGroup, a hybrid between the -// context.Context and sync.WaitGroup, which models process trees. -package ctxgroup - -import ( - "io" - "sync" - - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" -) - -// TeardownFunc is a function used to cleanup state at the end of the -// lifecycle of a process. -type TeardownFunc func() error - -// ChildFunc is a function to register as a child. It will be automatically -// tracked. -type ChildFunc func(parent ContextGroup) - -var nilTeardownFunc = func() error { return nil } - -// ContextGroup is an interface for services able to be opened and closed. -// It has a parent Context, and Children. But ContextGroup is not a proper -// "tree" like the Context tree. It is more like a Context-WaitGroup hybrid. -// It models a main object with a few children objects -- and, unlike the -// context -- concerns itself with the parent-child closing semantics: -// -// - Can define an optional TeardownFunc (func() error) to be run at Close time. -// - Children call Children().Add(1) to be waited upon -// - Children can select on <-Closing() to know when they should shut down. -// - Close() will wait until all children call Children().Done() -// - <-Closed() signals when the service is completely closed. -// -// ContextGroup can be embedded into the main object itself. In that case, -// the teardownFunc (if a member function) has to be set after the struct -// is intialized: -// -// type service struct { -// ContextGroup -// net.Conn -// } -// -// func (s *service) close() error { -// return s.Conn.Close() -// } -// -// func newService(ctx context.Context, c net.Conn) *service { -// s := &service{c} -// s.ContextGroup = NewContextGroup(ctx, s.close) -// return s -// } -// -type ContextGroup interface { - - // Context is the context of this ContextGroup. It is "sort of" a parent. - Context() context.Context - - // SetTeardown assigns the teardown function. - // It is called exactly _once_ when the ContextGroup is Closed. - SetTeardown(tf TeardownFunc) - - // Children is a sync.Waitgroup for all children goroutines that should - // shut down completely before this service is said to be "closed". - // Follows the semantics of WaitGroup: - // - // Children().Add(1) // add one more dependent child - // Children().Done() // child signals it is done - // - // WARNING: this is deprecated and will go away soon. - Children() *sync.WaitGroup - - // AddChild gives ownership of a child io.Closer. The child will be closed - // when the context group is closed. - AddChild(io.Closer) - - // AddChildFunc registers a dependent ChildFund. The child will receive - // its parent ContextGroup, and can wait on its signals. Child references - // tracked automatically. It equivalent to the following: - // - // go func(parent, child ContextGroup) { - // - // <-parent.Closing() // wait until parent is closing - // child.Close() // signal child to close - // parent.Children().Done() // child signals it is done - // }(a, b) - // - AddChildFunc(c ChildFunc) - - // Close is a method to call when you wish to stop this ContextGroup - Close() error - - // Closing is a signal to wait upon, like Context.Done(). - // It fires when the object should be closing (but hasn't yet fully closed). - // The primary use case is for child goroutines who need to know when - // they should shut down. (equivalent to Context().Done()) - Closing() <-chan struct{} - - // Closed is a method to wait upon, like Context.Done(). - // It fires when the entire object is fully closed. - // The primary use case is for external listeners who need to know when - // this object is completly done, and all its children closed. - Closed() <-chan struct{} -} - -// contextGroup is a Closer with a cancellable context -type contextGroup struct { - ctx context.Context - cancel context.CancelFunc - - // called to run the teardown logic. - teardownFunc TeardownFunc - - // closed is released once the close function is done. - closed chan struct{} - - // wait group for child goroutines - children sync.WaitGroup - - // sync primitive to ensure the close logic is only called once. - closeOnce sync.Once - - // error to return to clients of Close(). - closeErr error -} - -// newContextGroup constructs and returns a ContextGroup. It will call -// cf TeardownFunc before its Done() Wait signals fire. -func newContextGroup(ctx context.Context, cf TeardownFunc) ContextGroup { - ctx, cancel := context.WithCancel(ctx) - c := &contextGroup{ - ctx: ctx, - cancel: cancel, - closed: make(chan struct{}), - } - c.SetTeardown(cf) - - c.Children().Add(1) // initialize with 1. calling Close will decrement it. - go c.closeOnContextDone() - return c -} - -// SetTeardown assigns the teardown function. -func (c *contextGroup) SetTeardown(cf TeardownFunc) { - if cf == nil { - cf = nilTeardownFunc - } - c.teardownFunc = cf -} - -func (c *contextGroup) Context() context.Context { - return c.ctx -} - -func (c *contextGroup) Children() *sync.WaitGroup { - return &c.children -} - -func (c *contextGroup) AddChild(child io.Closer) { - c.children.Add(1) - go func(parent ContextGroup, child io.Closer) { - <-parent.Closing() // wait until parent is closing - child.Close() // signal child to close - parent.Children().Done() // child signals it is done - }(c, child) -} - -func (c *contextGroup) AddChildFunc(child ChildFunc) { - c.children.Add(1) - go func(parent ContextGroup, child ChildFunc) { - child(parent) - parent.Children().Done() // child signals it is done - }(c, child) -} - -// Close is the external close function. it's a wrapper around internalClose -// that waits on Closed() -func (c *contextGroup) Close() error { - c.internalClose() - <-c.Closed() // wait until we're totally done. - return c.closeErr -} - -func (c *contextGroup) Closing() <-chan struct{} { - return c.Context().Done() -} - -func (c *contextGroup) Closed() <-chan struct{} { - return c.closed -} - -func (c *contextGroup) internalClose() { - go c.closeOnce.Do(c.closeLogic) -} - -// the _actual_ close process. -func (c *contextGroup) closeLogic() { - // this function should only be called once (hence the sync.Once). - // and it will panic at the bottom (on close(c.closed)) otherwise. - - c.cancel() // signal that we're shutting down (Closing) - c.closeErr = c.teardownFunc() // actually run the close logic - c.children.Wait() // wait till all children are done. - close(c.closed) // signal that we're shut down (Closed) -} - -// if parent context is shut down before we call Close explicitly, -// we need to go through the Close motions anyway. Hence all the sync -// stuff all over the place... -func (c *contextGroup) closeOnContextDone() { - <-c.Context().Done() // wait until parent (context) is done. - c.internalClose() - c.Children().Done() -} - -// WithTeardown constructs and returns a ContextGroup with -// cf TeardownFunc (and context.Background) -func WithTeardown(cf TeardownFunc) ContextGroup { - if cf == nil { - panic("nil TeardownFunc") - } - return newContextGroup(context.Background(), cf) -} - -// WithContext constructs and returns a ContextGroup with given context -func WithContext(ctx context.Context) ContextGroup { - if ctx == nil { - panic("nil Context") - } - return newContextGroup(ctx, nil) -} - -// WithContextAndTeardown constructs and returns a ContextGroup with -// cf TeardownFunc (and context.Background) -func WithContextAndTeardown(ctx context.Context, cf TeardownFunc) ContextGroup { - if ctx == nil { - panic("nil Context") - } - if cf == nil { - panic("nil TeardownFunc") - } - return newContextGroup(ctx, cf) -} - -// WithParent constructs and returns a ContextGroup with given parent -func WithParent(p ContextGroup) ContextGroup { - if p == nil { - panic("nil ContextGroup") - } - c := newContextGroup(p.Context(), nil) - p.AddChild(c) - return c -} - -// WithBackground returns a ContextGroup with context.Background() -func WithBackground() ContextGroup { - return newContextGroup(context.Background(), nil) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup_test.go b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup_test.go deleted file mode 100644 index 11b55b8870f..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup_test.go +++ /dev/null @@ -1,187 +0,0 @@ -package ctxgroup - -import ( - "testing" - "time" - - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" -) - -type tree struct { - ContextGroup - c []tree -} - -func setupCGHierarchy(ctx context.Context) tree { - t := func(n ContextGroup, ts ...tree) tree { - return tree{n, ts} - } - - if ctx == nil { - ctx = context.Background() - } - a := WithContext(ctx) - b1 := WithParent(a) - b2 := WithParent(a) - c1 := WithParent(b1) - c2 := WithParent(b1) - c3 := WithParent(b2) - c4 := WithParent(b2) - - return t(a, t(b1, t(c1), t(c2)), t(b2, t(c3), t(c4))) -} - -func TestClosingClosed(t *testing.T) { - - a := WithBackground() - Q := make(chan string) - - go func() { - <-a.Closing() - Q <- "closing" - }() - - go func() { - <-a.Closed() - Q <- "closed" - }() - - go func() { - a.Close() - Q <- "closed" - }() - - if q := <-Q; q != "closing" { - t.Error("order incorrect. closing not first") - } - if q := <-Q; q != "closed" { - t.Error("order incorrect. closing not first") - } - if q := <-Q; q != "closed" { - t.Error("order incorrect. closing not first") - } -} - -func TestChildFunc(t *testing.T) { - a := WithBackground() - - wait1 := make(chan struct{}) - wait2 := make(chan struct{}) - wait3 := make(chan struct{}) - wait4 := make(chan struct{}) - go func() { - a.Close() - wait4 <- struct{}{} - }() - - a.AddChildFunc(func(parent ContextGroup) { - wait1 <- struct{}{} - <-wait2 - wait3 <- struct{}{} - }) - - <-wait1 - select { - case <-wait3: - t.Error("should not be closed yet") - case <-wait4: - t.Error("should not be closed yet") - case <-a.Closed(): - t.Error("should not be closed yet") - default: - } - - wait2 <- struct{}{} - - select { - case <-wait3: - case <-time.After(time.Second): - t.Error("should be closed now") - } - - select { - case <-wait4: - case <-time.After(time.Second): - t.Error("should be closed now") - } -} - -func TestTeardownCalledOnce(t *testing.T) { - a := setupCGHierarchy(nil) - - onlyOnce := func() func() error { - count := 0 - return func() error { - count++ - if count > 1 { - t.Error("called", count, "times") - } - return nil - } - } - - a.SetTeardown(onlyOnce()) - a.c[0].SetTeardown(onlyOnce()) - a.c[0].c[0].SetTeardown(onlyOnce()) - a.c[0].c[1].SetTeardown(onlyOnce()) - a.c[1].SetTeardown(onlyOnce()) - a.c[1].c[0].SetTeardown(onlyOnce()) - a.c[1].c[1].SetTeardown(onlyOnce()) - - a.c[0].c[0].Close() - a.c[0].c[0].Close() - a.c[0].c[0].Close() - a.c[0].c[0].Close() - a.c[0].Close() - a.c[0].Close() - a.c[0].Close() - a.c[0].Close() - a.Close() - a.Close() - a.Close() - a.Close() - a.c[1].Close() - a.c[1].Close() - a.c[1].Close() - a.c[1].Close() -} - -func TestOnClosed(t *testing.T) { - - ctx, cancel := context.WithCancel(context.Background()) - a := setupCGHierarchy(ctx) - Q := make(chan string, 10) - - onClosed := func(s string, c ContextGroup) { - <-c.Closed() - Q <- s - } - - go onClosed("0", a.c[0]) - go onClosed("10", a.c[1].c[0]) - go onClosed("", a) - go onClosed("00", a.c[0].c[0]) - go onClosed("1", a.c[1]) - go onClosed("01", a.c[0].c[1]) - go onClosed("11", a.c[1].c[1]) - - test := func(ss ...string) { - s1 := <-Q - for _, s2 := range ss { - if s1 == s2 { - return - } - } - t.Error("context not in group", s1, ss) - } - - cancel() - - test("00", "01", "10", "11") - test("00", "01", "10", "11") - test("00", "01", "10", "11") - test("00", "01", "10", "11") - test("0", "1") - test("0", "1") - test("") -} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go index 2cdf2e52279..cb4aa3ef8f8 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go @@ -10,19 +10,23 @@ import ( // // func ProcessWithContext(ctx context.Context) goprocess.Process { // p := goprocess.WithParent(goprocess.Background()) -// go func() { -// <-ctx.Done() -// p.Close() -// }() +// CloseAfterContext(p, ctx) // return p // } // func WithContext(ctx context.Context) goprocess.Process { + p := goprocess.WithParent(goprocess.Background()) + CloseAfterContext(p, ctx) + return p +} + +// WithContextAndTeardown is a helper function to set teardown at initiation +// of WithContext +func WithContextAndTeardown(ctx context.Context, tf goprocess.TeardownFunc) goprocess.Process { if ctx == nil { panic("nil Context") } - - p := goprocess.WithParent(goprocess.Background()) + p := goprocess.WithTeardown(tf) go func() { <-ctx.Done() p.Close() @@ -39,6 +43,30 @@ func WaitForContext(ctx context.Context, p goprocess.Process) { p.WaitFor(WithContext(ctx)) } +// CloseAfterContext schedules the process to close after the given +// context is done. It is the equivalent of: +// +// func CloseAfterContext(p goprocess.Process, ctx context.Context) { +// go func() { +// <-ctx.Done() +// p.Close() +// }() +// } +// +func CloseAfterContext(p goprocess.Process, ctx context.Context) { + if p == nil { + panic("nil Process") + } + if ctx == nil { + panic("nil Context") + } + + go func() { + <-ctx.Done() + p.Close() + }() +} + // WithProcessClosing returns a context.Context derived from ctx that // is cancelled as p is Closing (after: <-p.Closing()). It is simply: // diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go index 17cb37799a1..f51ad512d7b 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go @@ -114,7 +114,7 @@ type Process interface { // // It is useful to construct simple asynchronous workers, children of p. Go(f ProcessFunc) Process - + // SetTeardown sets the process's teardown to tf. SetTeardown(tf TeardownFunc) diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index de246a8fa90..e2a7801833c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -119,8 +119,16 @@ func (p *process) SetTeardown(tf TeardownFunc) { if tf == nil { tf = nilTeardownFunc } + p.Lock() - p.teardown = tf + if p.teardown == nil { + select { + case <-p.Closed(): + p.teardown = tf + p.closeErr = tf() + default: + } + } p.Unlock() } diff --git a/core/core.go b/core/core.go index 424372c0e01..6ef28150dc0 100644 --- a/core/core.go +++ b/core/core.go @@ -17,9 +17,10 @@ import ( "time" b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" mamask "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" diag "github.com/ipfs/go-ipfs/diagnostics" @@ -105,7 +106,8 @@ type IpfsNode struct { IpnsFs *ipnsfs.Filesystem - ctxgroup.ContextGroup + proc goprocess.Process + ctx context.Context mode mode } @@ -120,23 +122,22 @@ type Mounts struct { type ConfigOption func(ctx context.Context) (*IpfsNode, error) -func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) { - ctxg := ctxgroup.WithContext(parent) - ctx := ctxg.Context() +func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) { + node, err := option(ctx) + if err != nil { + return nil, err + } + + node.proc = goprocessctx.WithContextAndTeardown(ctx, node.teardown) + node.ctx = ctx + success := false // flip to true after all sub-system inits succeed defer func() { if !success { - ctxg.Close() + node.proc.Close() } }() - node, err := option(ctx) - if err != nil { - return nil, err - } - node.ContextGroup = ctxg - ctxg.SetTeardown(node.teardown) - // Need to make sure it's perfectly clear 1) which variables are expected // to be initialized at this point, and 2) which variables will be // initialized after this point. @@ -345,6 +346,21 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost return nil } +// Process returns the Process object +func (n *IpfsNode) Process() goprocess.Process { + return n.proc +} + +// Close calls Close() on the Process object +func (n *IpfsNode) Close() error { + return n.proc.Close() +} + +// Context returns the IpfsNode context +func (n *IpfsNode) Context() context.Context { + return n.ctx +} + // teardown closes owned children. If any errors occur, this function returns // the first error. func (n *IpfsNode) teardown() error { @@ -371,7 +387,7 @@ func (n *IpfsNode) teardown() error { } if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - closers = append(closers, dht) + closers = append(closers, dht.Process()) } if n.PeerHost != nil { diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 95a159fa2e0..042f056ad24 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -12,6 +12,7 @@ import ( ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" core "github.com/ipfs/go-ipfs/core" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" ) @@ -78,20 +79,17 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error var serverError error serverExited := make(chan struct{}) - node.Children().Add(1) - defer node.Children().Done() - - go func() { + node.Process().Go(func(p goprocess.Process) { serverError = http.Serve(lis, handler) close(serverExited) - }() + }) // wait for server to exit. select { case <-serverExited: // if node being closed before server exits, close server - case <-node.Closing(): + case <-node.Process().Closing(): log.Infof("server at %s terminating...", addr) lis.Close() diff --git a/core/coreunix/cat.go b/core/coreunix/cat.go index cc495582f27..5bbd0f56ed6 100644 --- a/core/coreunix/cat.go +++ b/core/coreunix/cat.go @@ -10,9 +10,9 @@ import ( func Cat(n *core.IpfsNode, pstr string) (io.Reader, error) { p := path.FromString(pstr) - dagNode, err := n.Resolver.ResolvePath(n.ContextGroup.Context(), p) + dagNode, err := n.Resolver.ResolvePath(n.Context(), p) if err != nil { return nil, err } - return uio.NewDagReader(n.ContextGroup.Context(), dagNode, n.DAG) + return uio.NewDagReader(n.Context(), dagNode, n.DAG) } diff --git a/core/mock/mock.go b/core/mock/mock.go index d3be2c1da45..14f90f56c8e 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -1,7 +1,6 @@ package coremock import ( - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" @@ -16,6 +15,7 @@ import ( path "github.com/ipfs/go-ipfs/path" pin "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/repo" + config "github.com/ipfs/go-ipfs/repo/config" offrt "github.com/ipfs/go-ipfs/routing/offline" ds2 "github.com/ipfs/go-ipfs/util/datastore2" testutil "github.com/ipfs/go-ipfs/util/testutil" @@ -28,33 +28,38 @@ import ( // NewMockNode constructs an IpfsNode for use in tests. func NewMockNode() (*core.IpfsNode, error) { ctx := context.TODO() - nd := new(core.IpfsNode) // Generate Identity ident, err := testutil.RandIdentity() if err != nil { return nil, err } - p := ident.ID() - nd.Identity = p + + c := config.Config{ + Identity: config.Identity{ + PeerID: p.String(), + }, + } + + nd, err := core.Offline(&repo.Mock{ + C: c, + D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())), + })(ctx) + if err != nil { + return nil, err + } + nd.PrivateKey = ident.PrivateKey() nd.Peerstore = peer.NewPeerstore() nd.Peerstore.AddPrivKey(p, ident.PrivateKey()) nd.Peerstore.AddPubKey(p, ident.PublicKey()) - nd.ContextGroup = ctxgroup.WithContext(ctx) nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline if err != nil { return nil, err } - // Temp Datastore - nd.Repo = &repo.Mock{ - // TODO C: conf, - D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())), - } - // Routing nd.Routing = offrt.NewOfflineRouter(nd.Repo.Datastore(), nd.PrivateKey) diff --git a/fuse/ipns/mount_unix.go b/fuse/ipns/mount_unix.go index 343653d0a21..9b4133341ce 100644 --- a/fuse/ipns/mount_unix.go +++ b/fuse/ipns/mount_unix.go @@ -18,5 +18,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) { return nil, err } - return mount.NewMount(ipfs, fsys, ipnsmp, allow_other) + return mount.NewMount(ipfs.Process(), fsys, ipnsmp, allow_other) } diff --git a/fuse/mount/fuse.go b/fuse/mount/fuse.go index 6aed12957c3..c00dcebc879 100644 --- a/fuse/mount/fuse.go +++ b/fuse/mount/fuse.go @@ -8,7 +8,7 @@ import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs" - "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" ) // mount implements go-ipfs/fuse/mount @@ -18,12 +18,12 @@ type mount struct { fuseConn *fuse.Conn // closeErr error - cg ctxgroup.ContextGroup + proc goprocess.Process } // Mount mounts a fuse fs.FS at a given location, and returns a Mount instance. // parent is a ContextGroup to bind the mount's ContextGroup to. -func NewMount(p ctxgroup.ContextGroup, fsys fs.FS, mountpoint string, allow_other bool) (Mount, error) { +func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allow_other bool) (Mount, error) { var conn *fuse.Conn var err error @@ -41,9 +41,9 @@ func NewMount(p ctxgroup.ContextGroup, fsys fs.FS, mountpoint string, allow_othe mpoint: mountpoint, fuseConn: conn, filesys: fsys, - cg: ctxgroup.WithParent(p), // link it to parent. + proc: goprocess.WithParent(p), // link it to parent. } - m.cg.SetTeardown(m.unmount) + m.proc.SetTeardown(m.unmount) // launch the mounting process. if err := m.mount(); err != nil { @@ -116,8 +116,8 @@ func (m *mount) unmount() error { return nil } -func (m *mount) CtxGroup() ctxgroup.ContextGroup { - return m.cg +func (m *mount) Process() goprocess.Process { + return m.proc } func (m *mount) MountPoint() string { @@ -125,6 +125,6 @@ func (m *mount) MountPoint() string { } func (m *mount) Unmount() error { - // call ContextCloser Close(), which calls unmount() exactly once. - return m.cg.Close() + // call Process Close(), which calls unmount() exactly once. + return m.proc.Close() } diff --git a/fuse/mount/mount.go b/fuse/mount/mount.go index 0e4e1dda82b..0adc05f5551 100644 --- a/fuse/mount/mount.go +++ b/fuse/mount/mount.go @@ -7,7 +7,7 @@ import ( "runtime" "time" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" + goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" u "github.com/ipfs/go-ipfs/util" ) @@ -24,9 +24,9 @@ type Mount interface { // Unmounts the mount Unmount() error - // CtxGroup returns the mount's CtxGroup to be able to link it + // Process returns the mount's Process to be able to link it // to other processes. Unmount upon closing. - CtxGroup() ctxgroup.ContextGroup + Process() goprocess.Process } // ForceUnmount attempts to forcibly unmount a given mount. diff --git a/fuse/readonly/mount_unix.go b/fuse/readonly/mount_unix.go index 60d14ffe8e2..dc5a56e4eb3 100644 --- a/fuse/readonly/mount_unix.go +++ b/fuse/readonly/mount_unix.go @@ -13,5 +13,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) { cfg := ipfs.Repo.Config() allow_other := cfg.Mounts.FuseAllowOther fsys := NewFileSystem(ipfs) - return mount.NewMount(ipfs, fsys, mountpoint, allow_other) + return mount.NewMount(ipfs.Process(), fsys, mountpoint, allow_other) } diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 71b89d76703..1cd6aa2c32d 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -5,11 +5,12 @@ import ( "io" "net" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ic "github.com/ipfs/go-ipfs/p2p/crypto" @@ -31,7 +32,7 @@ type listener struct { wrapper ConnWrapper - cg ctxgroup.ContextGroup + proc goprocess.Process } func (l *listener) teardown() error { @@ -41,7 +42,7 @@ func (l *listener) teardown() error { func (l *listener) Close() error { log.Debugf("listener closing: %s %s", l.local, l.Multiaddr()) - return l.cg.Close() + return l.proc.Close() } func (l *listener) String() string { @@ -157,9 +158,8 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey Listener: ml, local: local, privk: sk, - cg: ctxgroup.WithContext(ctx), } - l.cg.SetTeardown(l.teardown) + l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown) log.Debugf("Conn Listener on %s", l.Multiaddr()) log.Event(ctx, "swarmListen", l) diff --git a/p2p/net/interface.go b/p2p/net/interface.go index 752293f91a0..14a94d76993 100644 --- a/p2p/net/interface.go +++ b/p2p/net/interface.go @@ -6,8 +6,8 @@ import ( conn "github.com/ipfs/go-ipfs/p2p/net/conn" peer "github.com/ipfs/go-ipfs/p2p/peer" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -80,8 +80,8 @@ type Network interface { // use the known local interfaces. InterfaceListenAddresses() ([]ma.Multiaddr, error) - // CtxGroup returns the network's contextGroup - CtxGroup() ctxgroup.ContextGroup + // Process returns the network's Process + Process() goprocess.Process } // Dialer represents a service that can dial out to peers diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index d3e6f675d12..f7cabf463e3 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -13,8 +13,9 @@ import ( p2putil "github.com/ipfs/go-ipfs/p2p/test/util" testutil "github.com/ipfs/go-ipfs/util/testutil" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -31,7 +32,8 @@ type mocknet struct { linkDefaults LinkOptions - cg ctxgroup.ContextGroup // for Context closing + proc goprocess.Process // for Context closing + ctx context.Context sync.RWMutex } @@ -40,7 +42,8 @@ func New(ctx context.Context) Mocknet { nets: map[peer.ID]*peernet{}, hosts: map[peer.ID]*bhost.BasicHost{}, links: map[peer.ID]map[peer.ID]map[*link]struct{}{}, - cg: ctxgroup.WithContext(ctx), + proc: goprocessctx.WithContext(ctx), + ctx: ctx, } } @@ -61,7 +64,7 @@ func (mn *mocknet) GenPeer() (host.Host, error) { } func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) { - n, err := newPeernet(mn.cg.Context(), mn, k, a) + n, err := newPeernet(mn.ctx, mn, k, a) if err != nil { return nil, err } @@ -69,7 +72,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) { h := bhost.New(n) log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a) - mn.cg.AddChild(n.cg) + mn.proc.AddChild(n.proc) mn.Lock() mn.nets[n.peer] = n @@ -297,11 +300,11 @@ func (mn *mocknet) ConnectAll() error { } func (mn *mocknet) ConnectPeers(a, b peer.ID) (inet.Conn, error) { - return mn.Net(a).DialPeer(mn.cg.Context(), b) + return mn.Net(a).DialPeer(mn.ctx, b) } func (mn *mocknet) ConnectNets(a, b inet.Network) (inet.Conn, error) { - return a.DialPeer(mn.cg.Context(), b.LocalPeer()) + return a.DialPeer(mn.ctx, b.LocalPeer()) } func (mn *mocknet) DisconnectPeers(p1, p2 peer.ID) error { diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index b3b65dc8658..373953f876d 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -9,8 +9,9 @@ import ( inet "github.com/ipfs/go-ipfs/p2p/net" peer "github.com/ipfs/go-ipfs/p2p/peer" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -34,7 +35,7 @@ type peernet struct { notifmu sync.RWMutex notifs map[inet.Notifiee]struct{} - cg ctxgroup.ContextGroup + proc goprocess.Process sync.RWMutex } @@ -57,7 +58,6 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, mocknet: m, peer: p, ps: ps, - cg: ctxgroup.WithContext(ctx), connsByPeer: map[peer.ID]map[*conn]struct{}{}, connsByLink: map[*link]map[*conn]struct{}{}, @@ -65,7 +65,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, notifs: make(map[inet.Notifiee]struct{}), } - n.cg.SetTeardown(n.teardown) + n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown) return n, nil } @@ -93,7 +93,7 @@ func (pn *peernet) allConns() []*conn { // Close calls the ContextCloser func func (pn *peernet) Close() error { - return pn.cg.Close() + return pn.proc.Close() } func (pn *peernet) Peerstore() peer.Peerstore { @@ -223,9 +223,9 @@ func (pn *peernet) removeConn(c *conn) { delete(cs, c) } -// CtxGroup returns the network's ContextGroup -func (pn *peernet) CtxGroup() ctxgroup.ContextGroup { - return pn.cg +// Process returns the network's Process +func (pn *peernet) Process() goprocess.Process { + return pn.proc } // LocalPeer the network's LocalPeer diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 5c070d4e2a2..bf3d0395f7f 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -14,11 +14,12 @@ import ( peer "github.com/ipfs/go-ipfs/p2p/peer" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" prom "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" mafilter "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" @@ -63,8 +64,9 @@ type Swarm struct { // filters for addresses that shouldnt be dialed Filters *filter.Filters - cg ctxgroup.ContextGroup - bwc metrics.Reporter + proc goprocess.Process + ctx context.Context + bwc metrics.Reporter } // NewSwarm constructs a Swarm, with a Chan. @@ -80,7 +82,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, swarm: ps.NewSwarm(PSTransport), local: local, peers: peers, - cg: ctxgroup.WithContext(ctx), + ctx: ctx, dialT: DialTimeout, notifs: make(map[inet.Notifiee]ps.Notifiee), bwc: bwc, @@ -88,7 +90,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, } // configure Swarm - s.cg.SetTeardown(s.teardown) + s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown) s.SetConnHandler(nil) // make sure to setup our own conn handler. // setup swarm metrics @@ -111,8 +113,6 @@ func (s *Swarm) AddAddrFilter(f string) error { s.Filters.AddDialFilter(m) return nil } - -// CtxGroup returns the Context Group of the swarm func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) { if len(listenAddrs) > 0 { filtered := addrutil.FilterUsableAddrs(listenAddrs) @@ -124,7 +124,6 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) { return listenAddrs, nil } -// CtxGroup returns the Context Group of the swarm func (s *Swarm) Listen(addrs ...ma.Multiaddr) error { addrs, err := filterAddrs(addrs) if err != nil { @@ -134,14 +133,19 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error { return s.listen(addrs) } -// CtxGroup returns the Context Group of the swarm -func (s *Swarm) CtxGroup() ctxgroup.ContextGroup { - return s.cg +// Process returns the Process of the swarm +func (s *Swarm) Process() goprocess.Process { + return s.proc +} + +// Context returns the context of the swarm +func (s *Swarm) Context() context.Context { + return s.ctx } // Close stops the Swarm. func (s *Swarm) Close() error { - return s.cg.Close() + return s.proc.Close() } // StreamSwarm returns the underlying peerstream.Swarm diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index c969ca57018..a68d2880590 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -17,6 +17,7 @@ import ( ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + processctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -416,8 +417,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote } }) - <-ctx.Done() - limiter.Close() + processctx.CloseAfterContext(limiter, ctx) }() // wair fot the results. diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 4a8f4dd4d9e..d1bcb075212 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -64,7 +64,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } log.Debugf("Swarm Listening at %s", maddr) - list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk) + list, err := conn.Listen(s.Context(), maddr, s.local, sk) if err != nil { return err } @@ -112,7 +112,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { return } } - }(s.cg.Context(), sl) + }(s.Context(), sl) return nil } diff --git a/p2p/net/swarm/swarm_net.go b/p2p/net/swarm/swarm_net.go index 7b6e72906c9..da9b52251e2 100644 --- a/p2p/net/swarm/swarm_net.go +++ b/p2p/net/swarm/swarm_net.go @@ -8,8 +8,8 @@ import ( metrics "github.com/ipfs/go-ipfs/metrics" inet "github.com/ipfs/go-ipfs/p2p/net" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -43,9 +43,9 @@ func (n *Network) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) { return inet.Conn(sc), nil } -// CtxGroup returns the network's ContextGroup -func (n *Network) CtxGroup() ctxgroup.ContextGroup { - return n.cg +// Process returns the network's Process +func (n *Network) Process() goprocess.Process { + return n.proc } // Swarm returns the network's peerstream.Swarm @@ -100,7 +100,7 @@ func (n *Network) close() error { // Close calls the ContextCloser func func (n *Network) Close() error { - return n.Swarm().cg.Close() + return n.Swarm().proc.Close() } // Listen tells the network to start listening on given multiaddrs. diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b1b13985bf7..9dc74daeb81 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -23,8 +23,9 @@ import ( u "github.com/ipfs/go-ipfs/util" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -57,7 +58,8 @@ type IpfsDHT struct { Validator record.Validator // record validator funcs - ctxgroup.ContextGroup + ctx context.Context + proc goprocess.Process } // NewDHT creates a new DHT object with the given peer as the 'local' host @@ -71,15 +73,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip // register for network notifs. dht.host.Network().Notify((*netNotifiee)(dht)) - dht.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, func() error { + dht.proc = goprocess.WithTeardown(func() error { // remove ourselves from network notifs. dht.host.Network().StopNotify((*netNotifiee)(dht)) return nil }) + dht.ctx = ctx + h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) - dht.providers = NewProviderManager(dht.Context(), dht.self) - dht.AddChild(dht.providers) + dht.providers = NewProviderManager(dht.ctx, dht.self) + dht.proc.AddChild(dht.providers.proc) + goprocessctx.CloseAfterContext(dht.proc, ctx) dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore) dht.birth = time.Now() @@ -88,8 +93,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip dht.Validator["pk"] = record.PublicKeyValidator if doPinging { - dht.Children().Add(1) - go dht.PingRoutine(time.Second * 10) + dht.proc.Go(func(p goprocess.Process) { + dht.PingRoutine(time.Second * 10) + }) } return dht } @@ -348,8 +354,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error // PingRoutine periodically pings nearest neighbors. func (dht *IpfsDHT) PingRoutine(t time.Duration) { - defer dht.Children().Done() - tick := time.Tick(t) for { select { @@ -365,8 +369,23 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { } cancel() } - case <-dht.Closing(): + case <-dht.proc.Closing(): return } } } + +// Context return dht's context +func (dht *IpfsDHT) Context() context.Context { + return dht.ctx +} + +// Process return dht's process +func (dht *IpfsDHT) Process() goprocess.Process { + return dht.proc +} + +// Close calls Process Close +func (dht *IpfsDHT) Close() error { + return dht.proc.Close() +} diff --git a/routing/dht/notif.go b/routing/dht/notif.go index 70144481a32..cfe411c38ee 100644 --- a/routing/dht/notif.go +++ b/routing/dht/notif.go @@ -16,7 +16,7 @@ func (nn *netNotifiee) DHT() *IpfsDHT { func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { dht := nn.DHT() select { - case <-dht.Closing(): + case <-dht.Process().Closing(): return default: } @@ -26,7 +26,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { dht := nn.DHT() select { - case <-dht.Closing(): + case <-dht.Process().Closing(): return default: } diff --git a/routing/dht/providers.go b/routing/dht/providers.go index 74c79b8e973..17455b33682 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -3,7 +3,8 @@ package dht import ( "time" - ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" key "github.com/ipfs/go-ipfs/blocks/key" peer "github.com/ipfs/go-ipfs/p2p/peer" @@ -21,7 +22,7 @@ type ProviderManager struct { newprovs chan *addProv getprovs chan *getProv period time.Duration - ctxgroup.ContextGroup + proc goprocess.Process } type providerSet struct { @@ -46,17 +47,13 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager { pm.providers = make(map[key.Key]*providerSet) pm.getlocal = make(chan chan []key.Key) pm.local = make(map[key.Key]struct{}) - pm.ContextGroup = ctxgroup.WithContext(ctx) - - pm.Children().Add(1) - go pm.run() + pm.proc = goprocessctx.WithContext(ctx) + pm.proc.Go(func(p goprocess.Process) { pm.run() }) return pm } func (pm *ProviderManager) run() { - defer pm.Children().Done() - tick := time.NewTicker(time.Hour) for { select { @@ -100,7 +97,7 @@ func (pm *ProviderManager) run() { provs.providers = filtered } - case <-pm.Closing(): + case <-pm.proc.Closing(): return } } diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go index ecf937962f6..7e2e47d9334 100644 --- a/routing/dht/providers_test.go +++ b/routing/dht/providers_test.go @@ -19,5 +19,5 @@ func TestProviderManager(t *testing.T) { if len(resp) != 1 { t.Fatal("Could not retrieve provider.") } - p.Close() + p.proc.Close() } diff --git a/routing/dht/query.go b/routing/dht/query.go index c69437f4997..a6c8a14b3e4 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -127,10 +127,7 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes // now, if the context finishes, close the proc. // we have to do it here because the logic before is setup, which // should run without closing the proc. - go func() { - <-ctx.Done() - r.proc.Close() - }() + ctxproc.CloseAfterContext(r.proc, ctx) select { case <-r.peersRemaining.Done(): diff --git a/util/ctxcloser/closer.go b/util/ctxcloser/closer.go deleted file mode 100644 index 09d33dcac49..00000000000 --- a/util/ctxcloser/closer.go +++ /dev/null @@ -1,183 +0,0 @@ -package ctxcloser - -import ( - "sync" - - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" -) - -// CloseFunc is a function used to close a ContextCloser -type CloseFunc func() error - -var nilCloseFunc = func() error { return nil } - -// ContextCloser is an interface for services able to be opened and closed. -// It has a parent Context, and Children. But ContextCloser is not a proper -// "tree" like the Context tree. It is more like a Context-WaitGroup hybrid. -// It models a main object with a few children objects -- and, unlike the -// context -- concerns itself with the parent-child closing semantics: -// -// - Can define a CloseFunc (func() error) to be run at Close time. -// - Children call Children().Add(1) to be waited upon -// - Children can select on <-Closing() to know when they should shut down. -// - Close() will wait until all children call Children().Done() -// - <-Closed() signals when the service is completely closed. -// -// ContextCloser can be embedded into the main object itself. In that case, -// the closeFunc (if a member function) has to be set after the struct -// is intialized: -// -// type service struct { -// ContextCloser -// net.Conn -// } -// -// func (s *service) close() error { -// return s.Conn.Close() -// } -// -// func newService(ctx context.Context, c net.Conn) *service { -// s := &service{c} -// s.ContextCloser = NewContextCloser(ctx, s.close) -// return s -// } -// -type ContextCloser interface { - - // Context is the context of this ContextCloser. It is "sort of" a parent. - Context() context.Context - - // Children is a sync.Waitgroup for all children goroutines that should - // shut down completely before this service is said to be "closed". - // Follows the semantics of WaitGroup: - // - // Children().Add(1) // add one more dependent child - // Children().Done() // child signals it is done - // - Children() *sync.WaitGroup - - // AddCloserChild registers a dependent ContextCloser child. The child will - // be closed when this parent is closed, and waited upon to finish. It is - // the functional equivalent of the following: - // - // go func(parent, child ContextCloser) { - // parent.Children().Add(1) // add one more dependent child - // <-parent.Closing() // wait until parent is closing - // child.Close() // signal child to close - // parent.Children().Done() // child signals it is done - // }(a, b) - // - AddCloserChild(c ContextCloser) - - // Close is a method to call when you wish to stop this ContextCloser - Close() error - - // Closing is a signal to wait upon, like Context.Done(). - // It fires when the object should be closing (but hasn't yet fully closed). - // The primary use case is for child goroutines who need to know when - // they should shut down. (equivalent to Context().Done()) - Closing() <-chan struct{} - - // Closed is a method to wait upon, like Context.Done(). - // It fires when the entire object is fully closed. - // The primary use case is for external listeners who need to know when - // this object is completly done, and all its children closed. - Closed() <-chan struct{} -} - -// contextCloser is an OpenCloser with a cancellable context -type contextCloser struct { - ctx context.Context - cancel context.CancelFunc - - // called to run the close logic. - closeFunc CloseFunc - - // closed is released once the close function is done. - closed chan struct{} - - // wait group for child goroutines - children sync.WaitGroup - - // sync primitive to ensure the close logic is only called once. - closeOnce sync.Once - - // error to return to clients of Close(). - closeErr error -} - -// NewContextCloser constructs and returns a ContextCloser. It will call -// cf CloseFunc before its Done() Wait signals fire. -func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser { - if cf == nil { - cf = nilCloseFunc - } - ctx, cancel := context.WithCancel(ctx) - c := &contextCloser{ - ctx: ctx, - cancel: cancel, - closeFunc: cf, - closed: make(chan struct{}), - } - - c.Children().Add(1) // we're a child goroutine, to be waited upon. - go c.closeOnContextDone() - return c -} - -func (c *contextCloser) Context() context.Context { - return c.ctx -} - -func (c *contextCloser) Children() *sync.WaitGroup { - return &c.children -} - -func (c *contextCloser) AddCloserChild(child ContextCloser) { - c.children.Add(1) - go func(parent, child ContextCloser) { - <-parent.Closing() // wait until parent is closing - child.Close() // signal child to close - parent.Children().Done() // child signals it is done - }(c, child) -} - -// Close is the external close function. it's a wrapper around internalClose -// that waits on Closed() -func (c *contextCloser) Close() error { - c.internalClose() - <-c.Closed() // wait until we're totally done. - return c.closeErr -} - -func (c *contextCloser) Closing() <-chan struct{} { - return c.Context().Done() -} - -func (c *contextCloser) Closed() <-chan struct{} { - return c.closed -} - -func (c *contextCloser) internalClose() { - go c.closeOnce.Do(c.closeLogic) -} - -// the _actual_ close process. -func (c *contextCloser) closeLogic() { - // this function should only be called once (hence the sync.Once). - // and it will panic at the bottom (on close(c.closed)) otherwise. - - c.cancel() // signal that we're shutting down (Closing) - c.closeErr = c.closeFunc() // actually run the close logic - c.children.Wait() // wait till all children are done. - close(c.closed) // signal that we're shut down (Closed) -} - -// if parent context is shut down before we call Close explicitly, -// we need to go through the Close motions anyway. Hence all the sync -// stuff all over the place... -func (c *contextCloser) closeOnContextDone() { - <-c.Context().Done() // wait until parent (context) is done. - c.internalClose() - c.Children().Done() -}