Skip to content

Commit

Permalink
feat(p2p local streams): add local streams to QriNode for local stdio…
Browse files Browse the repository at this point in the history
… interaction
  • Loading branch information
b5 committed Sep 17, 2018
1 parent e26be1b commit 53ff3fd
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
2 changes: 1 addition & 1 deletion actions/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// ExecTransform executes a designated transformation
func ExecTransform(node *p2p.QriNode, ds *dataset.Dataset, infile cafs.File, secrets map[string]string) (file cafs.File, err error) {
filepath := ds.Transform.ScriptPath
rr, err := skytf.ExecFile(ds, filepath, infile, func(o *skytf.ExecOpts) {
rr, err := skytf.ExecFile(ds, filepath, infile, skytf.AddQriNodeOpt(node), func(o *skytf.ExecOpts) {
if secrets != nil {
// convert to map[string]interface{}, which the lower-level skytf supports
// until we're sure map[string]string is going to work in the majority of use cases
Expand Down
1 change: 1 addition & 0 deletions cmd/qri.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (o *QriOptions) init() (err error) {
if err != nil {
return
}
o.node.LocalStreams = p2p.IOStreams(o.IOStreams)
}
o.initialized.Do(initBody)
return err
Expand Down
25 changes: 21 additions & 4 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"fmt"
"io"
"sync"

"github.com/qri-io/cafs/ipfs"
Expand Down Expand Up @@ -48,17 +49,33 @@ type QriNode struct {
// ipfs node provided by repo
Repo repo.Repo

// handlers maps this nodes registered handlers. This works in a way similary to a router
// in traditional client/server models, but messages are flying around all over the place
// instead of a request/response pattern
// handlers maps this nodes registered handlers. This works in a way
// similary to a router in traditional client/server models, but messages
// are flying around all over the place instead of a
// request/response pattern
handlers map[MsgType]HandlerFunc

// msgState keeps a "scratch pad" of message IDS & timeouts
msgState *sync.Map
// msgChan provides a channel of received messages for others to tune into
msgChan chan Message
// receivers is a list of anyone who wants to be notifed on new message arrival
// receivers is a list of anyone who wants to be notifed on new
// message arrival
receivers []chan Message

// node keeps a set of IOStreams for "node local" io, often to the
// command line, to give feedback to the user. These may be piped to
// local http handlers/websockets/stdio, but these streams are meant for
// local feedback as opposed to p2p connections
LocalStreams IOStreams
}

// IOStreams provides the standard names for iostreams. This is useful for embedding and for unit testing.
// Inconsistent and different names make it hard to read and review code
type IOStreams struct {
In io.Reader
Out io.Writer
ErrOut io.Writer
}

// Assert that conversions needed by the tests are valid.
Expand Down

0 comments on commit 53ff3fd

Please sign in to comment.