diff --git a/logbook/logsync/http.go b/logbook/logsync/http.go index 9c794b36f..cc23e1f19 100644 --- a/logbook/logsync/http.go +++ b/logbook/logsync/http.go @@ -31,7 +31,7 @@ func (c *HTTPClient) put(ctx context.Context, author log.Author, r io.Reader) er } req = req.WithContext(ctx) - if err := addAuthorHeaders(req.Header, author); err != nil { + if err := addAuthorHTTPHeaders(req.Header, author); err != nil { return err } @@ -53,7 +53,7 @@ func (c *HTTPClient) get(ctx context.Context, author log.Author, ref dsref.Ref) } req = req.WithContext(ctx) - if err := addAuthorHeaders(req.Header, author); err != nil { + if err := addAuthorHTTPHeaders(req.Header, author); err != nil { return nil, nil, err } res, err := http.DefaultClient.Do(req) @@ -83,7 +83,7 @@ func (c *HTTPClient) del(ctx context.Context, author log.Author, ref dsref.Ref) } req = req.WithContext(ctx) - if err := addAuthorHeaders(req.Header, author); err != nil { + if err := addAuthorHTTPHeaders(req.Header, author); err != nil { return err } @@ -96,7 +96,7 @@ func (c *HTTPClient) del(ctx context.Context, author log.Author, ref dsref.Ref) return err } -func addAuthorHeaders(h http.Header, author log.Author) error { +func addAuthorHTTPHeaders(h http.Header, author log.Author) error { h.Set("AuthorID", author.AuthorID()) pubByteStr, err := author.AuthorPubKey().Bytes() if err != nil { @@ -140,7 +140,7 @@ func HTTPHandler(lsync *Logsync) http.HandlerFunc { } r.Body.Close() - addAuthorHeaders(w.Header(), lsync.Author()) + addAuthorHTTPHeaders(w.Header(), lsync.Author()) case "GET": ref, err := repo.ParseDatasetRef(r.FormValue("ref")) if err != nil { @@ -156,7 +156,7 @@ func HTTPHandler(lsync *Logsync) http.HandlerFunc { return } - addAuthorHeaders(w.Header(), receiver) + addAuthorHTTPHeaders(w.Header(), receiver) io.Copy(w, r) case "DELETE": ref, err := repo.ParseDatasetRef(r.FormValue("ref")) @@ -172,7 +172,7 @@ func HTTPHandler(lsync *Logsync) http.HandlerFunc { return } - addAuthorHeaders(w.Header(), 
lsync.Author()) + addAuthorHTTPHeaders(w.Header(), lsync.Author()) } } } diff --git a/logbook/logsync/logsync.go b/logbook/logsync/logsync.go index 4a8f73de9..46d166ce7 100644 --- a/logbook/logsync/logsync.go +++ b/logbook/logsync/logsync.go @@ -19,8 +19,8 @@ import ( // Logsync fulfills requests from clients, logsync wraps a logbook.Book, pushing // and pulling logs from remote sources to its logbook type Logsync struct { - book *logbook.Book - host host.Host + book *logbook.Book + p2pHandler *p2pHandler receiveCheck Hook didReceive Hook @@ -47,6 +47,9 @@ type Options struct { ReceiveCheck Hook // DidReceive is called after a log has been merged into the logbook DidReceive Hook + + // to send & push over libp2p connections, provide a libp2p host + Libp2pHost host.Host } // New creates a remote from a logbook and optional configuration functions @@ -56,12 +59,18 @@ func New(book *logbook.Book, opts ...func(*Options)) *Logsync { opt(o) } - return &Logsync{ + logsync := &Logsync{ book: book, receiveCheck: o.ReceiveCheck, didReceive: o.DidReceive, } + + if o.Libp2pHost != nil { + logsync.p2pHandler = newp2pHandler(logsync, o.Libp2pHost) + } + + return logsync } // Hook is a function called at specified points in the sync lifecycle @@ -152,12 +161,11 @@ func (lsync *Logsync) DoRemove(ctx context.Context, ref dsref.Ref, remote string func (lsync *Logsync) getRemote(remoteAddr string) (rem remote, err error) { // if a valid base58 peerID is passed, we're doing a p2p dsync - if _, err := peer.IDB58Decode(remoteAddr); err == nil { - if lsync.host == nil { - return nil, fmt.Errorf("no p2p host provided to perform p2p dsync") + if id, err := peer.IDB58Decode(remoteAddr); err == nil { + if lsync.p2pHandler == nil { + return nil, fmt.Errorf("no p2p host provided to perform p2p logsync") } - // rem = &p2pClient{remotePeerID: id, p2pHandler: ds.p2pHandler} - return nil, fmt.Errorf("p2p logsync not finished") + return &p2pClient{remotePeerID: id, p2pHandler: lsync.p2pHandler}, 
nil } else if strings.HasPrefix(remoteAddr, "http") { rem = &HTTPClient{URL: remoteAddr} } else { diff --git a/logbook/logsync/logsync_test.go b/logbook/logsync/logsync_test.go index 350e19d8b..33cb5a003 100644 --- a/logbook/logsync/logsync_test.go +++ b/logbook/logsync/logsync_test.go @@ -17,7 +17,7 @@ import ( "github.com/qri-io/qri/logbook/log" ) -func ExampleNew() { +func Example() { // first some boilerplate setup ctx, done := context.WithCancel(context.Background()) defer done() @@ -110,7 +110,12 @@ func ExampleNew() { } func makeJohnathonLogbook() *logbook.Book { - book, err := newTestbook("johnathon", aPk) + pk, err := decodePk(aPk) + if err != nil { + panic(err) + } + + book, err := newTestbook("johnathon", pk) if err != nil { panic(err) } @@ -118,7 +123,12 @@ func makeJohnathonLogbook() *logbook.Book { } func makeBasitLogbook() *logbook.Book { - book, err := newTestbook("basit", bPk) + pk, err := decodePk(bPk) + if err != nil { + panic(err) + } + + book, err := newTestbook("basit", pk) if err != nil { panic(err) } @@ -219,8 +229,9 @@ func TestSyncHTTP(t *testing.T) { } type testRunner struct { - Ctx context.Context - A, B *logbook.Book + Ctx context.Context + A, B *logbook.Book + APrivKey, BPrivKey crypto.PrivKey } func (tr *testRunner) DefaultLogsyncs() (a, b *Logsync) { @@ -232,10 +243,20 @@ func newTestRunner(t *testing.T) (tr *testRunner, cleanup func()) { tr = &testRunner{ Ctx: context.Background(), } - if tr.A, err = newTestbook("a", aPk); err != nil { + + tr.APrivKey, err = decodePk(aPk) + if err != nil { t.Fatal(err) } - if tr.B, err = newTestbook("b", bPk); err != nil { + if tr.A, err = newTestbook("a", tr.APrivKey); err != nil { + t.Fatal(err) + } + + tr.BPrivKey, err = decodePk(bPk) + if err != nil { + t.Fatal(err) + } + if tr.B, err = newTestbook("b", tr.BPrivKey); err != nil { t.Fatal(err) } @@ -243,7 +264,7 @@ func newTestRunner(t *testing.T) (tr *testRunner, cleanup func()) { return tr, cleanup } -func newTestbook(username, b64pk string) 
(*logbook.Book, error) { +func decodePk(b64pk string) (crypto.PrivKey, error) { // logbooks are encrypted at rest, we need a private key to interact with // them, including to create a new logbook. This is a dummy Private Key // you should never, ever use in real life. demo only folks. @@ -251,15 +272,13 @@ func newTestbook(username, b64pk string) (*logbook.Book, error) { if err != nil { return nil, err } - pk, err := crypto.UnmarshalPrivateKey(data) - if err != nil { - return nil, err - } + return crypto.UnmarshalPrivateKey(data) +} +func newTestbook(username string, pk crypto.PrivKey) (*logbook.Book, error) { // logbook relies on a qfs.Filesystem for read & write. create an in-memory // filesystem we can play with fs := qfs.NewMemFS() - return logbook.NewBook(pk, username, fs, "/mem/logset") } diff --git a/logbook/logsync/p2p.go b/logbook/logsync/p2p.go index 8173ab382..8930cf45a 100644 --- a/logbook/logsync/p2p.go +++ b/logbook/logsync/p2p.go @@ -1 +1,304 @@ -package logsync \ No newline at end of file +package logsync + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + + crypto "github.com/libp2p/go-libp2p-crypto" + host "github.com/libp2p/go-libp2p-host" + net "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + protocol "github.com/libp2p/go-libp2p-protocol" + "github.com/qri-io/dag/dsync/p2putil" + "github.com/qri-io/qri/dsref" + "github.com/qri-io/qri/logbook/log" + "github.com/qri-io/qri/repo" +) + +const ( + // LogsyncProtocolID is the logsync p2p Protocol Identifier + LogsyncProtocolID = protocol.ID("/qri/logsync") + // LogsyncServiceTag tags the type & version of the logsync service + LogsyncServiceTag = "qri/logsync/0.1.1-dev" + // default value to give logsync peer connections in connmanager, one hunnit + logsyncSupportValue = 100 +) + +var ( + // mtPut identifies the "put" message type, a client pushing a log to a remote + mtPut = p2putil.MsgType("put") + // mtGet identifies the "get" message type, 
a request for a log + mtGet = p2putil.MsgType("get") + // mtDel identifies the "del" message type, a request to remove a log + mtDel = p2putil.MsgType("del") +) + +type p2pClient struct { + remotePeerID peer.ID + *p2pHandler +} + +// assert at compile time that p2pClient implements remote +var _ remote = (*p2pClient)(nil) + +func (c *p2pClient) put(ctx context.Context, author log.Author, r io.Reader) (err error) { + data, err := ioutil.ReadAll(r) + if err != nil { + return err + } + + headers := []string{"phase", "request"} + headers, err = addAuthorP2PHeaders(headers, author) + if err != nil { + return err + } + msg := p2putil.NewMessage(c.host.ID(), mtPut, data).WithHeaders(headers...) + _, err = c.sendMessage(ctx, msg, c.remotePeerID) + return err +} + +func (c *p2pClient) get(ctx context.Context, author log.Author, ref dsref.Ref) (sender log.Author, data io.Reader, err error) { + headers := []string{ + "phase", "request", + "ref", ref.String(), + } + headers, err = addAuthorP2PHeaders(headers, author) + if err != nil { + return nil, nil, err + } + + msg := p2putil.NewMessage(c.host.ID(), mtGet, nil).WithHeaders(headers...) + + res, err := c.sendMessage(ctx, msg, c.remotePeerID) + if err != nil { + return nil, nil, err + } + + sender, err = authorFromP2PHeaders(res) + return sender, bytes.NewReader(res.Body), err +} + +func (c *p2pClient) del(ctx context.Context, author log.Author, ref dsref.Ref) error { + headers := []string{ + "phase", "request", + "ref", ref.String(), + } + headers, err := addAuthorP2PHeaders(headers, author) + if err != nil { + return err + } + + msg := p2putil.NewMessage(c.host.ID(), mtDel, nil).WithHeaders(headers...) 
+ _, err = c.sendMessage(ctx, msg, c.remotePeerID) + return err +} + +func addAuthorP2PHeaders(h []string, author log.Author) ([]string, error) { + pubByteStr, err := author.AuthorPubKey().Bytes() + if err != nil { + return h, err + } + pubKey := base64.StdEncoding.EncodeToString(pubByteStr) + return append(h, "author_id", author.AuthorID(), "pub_key", pubKey), nil +} + +func authorFromP2PHeaders(msg p2putil.Message) (log.Author, error) { + data, err := base64.StdEncoding.DecodeString(msg.Header("pub_key")) + if err != nil { + return nil, err + } + + pub, err := crypto.UnmarshalPublicKey(data) + if err != nil { + return nil, fmt.Errorf("decoding public key: %s", err) + } + + return log.NewAuthor(msg.Header("author_id"), pub), nil +} + +// p2pHandler implements logsync as a libp2p protocol handler +type p2pHandler struct { + logsync *Logsync + host host.Host + handlers map[p2putil.MsgType]p2putil.HandlerFunc +} + +// newp2pHandler creates a p2p stream handler from a Logsync & a libp2p host +func newp2pHandler(logsync *Logsync, host host.Host) *p2pHandler { + c := &p2pHandler{logsync: logsync, host: host} + c.handlers = map[p2putil.MsgType]p2putil.HandlerFunc{ + mtPut: c.HandlePut, + mtGet: c.HandleGet, + mtDel: c.HandleDel, + } + + go host.SetStreamHandler(LogsyncProtocolID, c.LibP2PStreamHandler) + return c +} + +// LibP2PStreamHandler provides remote access over p2p +func (c *p2pHandler) LibP2PStreamHandler(s net.Stream) { + c.handleStream(p2putil.WrapStream(s), nil) +} + +// HandlePut handles a "put" request: it passes the log in the message body, +// along with the author read from the message headers, to the local logsync, +// then replies with a response carrying this logsync's author headers +func (c *p2pHandler) HandlePut(ws *p2putil.WrappedStream, msg p2putil.Message) (hangup bool) { + if msg.Header("phase") == "request" { + ctx := context.Background() + author, err := authorFromP2PHeaders(msg) + if err != nil { + return true + } + + if err = c.logsync.put(ctx, author, bytes.NewReader(msg.Body)); err != nil { 
+ return true + } + + headers := []string{ + "phase", "response", + } + headers, err = addAuthorP2PHeaders(headers, c.logsync.Author()) + if err != nil { + return true + } + + res := msg.WithHeaders(headers...) + if err := ws.SendMessage(res); err != nil { + return true + } + } + return true +} + +// HandleGet handles a "get" request, replying with the log data for the requested ref & the sender's author headers +func (c *p2pHandler) HandleGet(ws *p2putil.WrappedStream, msg p2putil.Message) (hangup bool) { + if msg.Header("phase") == "request" { + ctx := context.Background() + author, err := authorFromP2PHeaders(msg) + if err != nil { + return true + } + + ref, err := repo.ParseDatasetRef(msg.Header("ref")) + if err != nil { + return true + } + + sender, r, err := c.logsync.get(ctx, author, repo.ConvertToDsref(ref)) + if err != nil { + return true + } + + data, err := ioutil.ReadAll(r) + if err != nil { + return true + } + + headers := []string{ + "phase", "response", + } + headers, err = addAuthorP2PHeaders(headers, sender) + if err != nil { + return true + } + + res := msg.WithHeaders(headers...).Update(data) + if err := ws.SendMessage(res); err != nil { + return true + } + } + + return true +} + +// HandleDel handles a "del" request, asking the local logsync to remove the log named by the "ref" header +func (c *p2pHandler) HandleDel(ws *p2putil.WrappedStream, msg p2putil.Message) (hangup bool) { + if msg.Header("phase") == "request" { + ctx := context.Background() + author, err := authorFromP2PHeaders(msg) + if err != nil { + return true + } + + ref, err := repo.ParseDatasetRef(msg.Header("ref")) + if err != nil { + return true + } + + if err = c.logsync.del(ctx, author, repo.ConvertToDsref(ref)); err != nil { + return true + } + + res := msg.WithHeaders("phase", "response") + if err := ws.SendMessage(res); err != nil { + return true + } + } + return true +} + +// sendMessage opens a stream & sends a message to a peer id +func (c *p2pHandler) sendMessage(ctx context.Context, msg p2putil.Message, pid peer.ID) (p2putil.Message, error) { + s, err := 
c.host.NewStream(ctx, pid, LogsyncProtocolID) + if err != nil { + return p2putil.Message{}, fmt.Errorf("error opening stream: %s", err.Error()) + } + defer s.Close() + + // now that we have a confirmed working connection + // tag this peer as supporting the qri protocol in the connection manager + // rem.host.ConnManager().TagPeer(pid, logsyncSupportKey, logsyncSupportValue) + + ws := p2putil.WrapStream(s) + replies := make(chan p2putil.Message) + go c.handleStream(ws, replies) + if err := ws.SendMessage(msg); err != nil { + return p2putil.Message{}, err + } + + reply := <-replies + return reply, nil +} + +// handleStream is a loop which receives and handles messages +// When Message.HangUp is true, it exits. This will close the stream +// on one of the sides. The other side's receiveMessage() will error +// with EOF, thus also breaking out from the loop. +func (c *p2pHandler) handleStream(ws *p2putil.WrappedStream, replies chan p2putil.Message) { + for { + // Loop forever, receiving messages until the other end hangs up + // or something goes wrong + msg, err := ws.ReceiveMessage() + + if err != nil { + if err.Error() == "EOF" { + break + } + // log.Debugf("error receiving message: %s", err.Error()) + break + } + + if replies != nil { + go func() { replies <- msg }() + } + + handler, ok := c.handlers[msg.Type] + if !ok { + // log.Errorf("peer %s sent unrecognized message type '%s', hanging up", n.ID, msg.Type) + break + } + + // call the handler, if it returns true, we hangup + if handler(ws, msg) { + break + } + } + + ws.Close() +} diff --git a/logbook/logsync/p2p_test.go b/logbook/logsync/p2p_test.go new file mode 100644 index 000000000..5098c2beb --- /dev/null +++ b/logbook/logsync/p2p_test.go @@ -0,0 +1,116 @@ +package logsync + +import ( + "context" + "testing" + + libp2p "github.com/libp2p/go-libp2p" + crypto "github.com/libp2p/go-libp2p-crypto" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + pstore 
"github.com/libp2p/go-libp2p-peerstore" + pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" + ma "github.com/multiformats/go-multiaddr" +) + +func TestP2PLogsync(t *testing.T) { + tr, cleanup := newTestRunner(t) + defer cleanup() + + aHost := p2pHost(tr.Ctx, tr.APrivKey, t) + bHost := p2pHost(tr.Ctx, tr.BPrivKey, t) + + lsA := New(tr.A, func(o *Options) { + o.Libp2pHost = aHost + }) + + lsB := New(tr.B, func(o *Options) { + o.Libp2pHost = bHost + }) + + // connect a & b + if err := aHost.Connect(tr.Ctx, pstore.PeerInfo{ID: bHost.ID(), Addrs: bHost.Addrs()}); err != nil { + t.Fatal(err) + } + + // make some logs on A + worldBankRef, err := writeWorldBankLogs(tr.Ctx, tr.A) + if err != nil { + t.Fatal(err) + } + + // pull logs to B from A + pull, err := lsB.NewPull(worldBankRef, tr.A.Author().AuthorID()) + if err != nil { + t.Error(err) + } + if err := pull.Do(tr.Ctx); err != nil { + t.Error(err) + } + + vs, err := tr.B.Versions(worldBankRef, 0, 10) + if err != nil { + t.Errorf("expected no error fetching versions after pull. got: %s", err) + } + if len(vs) == 0 { + t.Errorf("expected some length of logs. got: %d", len(vs)) + } + + // add moar logs to A + nasdaqRef, err := writeNasdaqLogs(tr.Ctx, tr.A) + if err != nil { + t.Fatal(err) + } + + // push logs from A to B + push, err := lsA.NewPush(nasdaqRef, tr.B.Author().AuthorID()) + if err != nil { + t.Fatal(err) + } + + if err := push.Do(tr.Ctx); err != nil { + t.Fatal(err) + } + + vs, err = tr.B.Versions(nasdaqRef, 0, 10) + if err != nil { + t.Errorf("expected no error fetching versions after pull. got: %s", err) + } + if len(vs) == 0 { + t.Errorf("expected some length of logs. got: %d", len(vs)) + } + + // A request B removes nasdaq + if err := lsA.DoRemove(tr.Ctx, nasdaqRef, tr.B.Author().AuthorID()); err != nil { + t.Errorf("unexpected error doing remove request: %s", err) + } + if _, err = tr.B.Versions(nasdaqRef, 0, 10); err == nil { + t.Errorf("expected error fetching versions. 
got nil") + } +} + +// p2pHost creates a libp2p host listening on a local TCP address, using pk as its identity +func p2pHost(ctx context.Context, pk crypto.PrivKey, t *testing.T) host.Host { + pid, err := peer.IDFromPrivateKey(pk) + if err != nil { + t.Fatal(err) + } + + ps := pstoremem.NewPeerstore() + ps.AddPrivKey(pid, pk) + ps.AddPubKey(pid, pk.GetPublic()) + + addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") + + opts := []libp2p.Option{ + libp2p.Identity(pk), + libp2p.ListenAddrs(addr), + libp2p.Peerstore(ps), + } + + host, err := libp2p.New(ctx, opts...) + if err != nil { + t.Fatal(err) + } + return host +}