From 16c7b4e1f55970645326f421ba3b944a23625519 Mon Sep 17 00:00:00 2001 From: b5 Date: Wed, 16 Oct 2019 19:32:06 -0400 Subject: [PATCH] feat(remote): push & pull logs on publish & add --- api/remote_test.go | 25 ------ api/testdata/api.snapshot | Bin 200836 -> 200845 bytes lib/datasets.go | 8 ++ lib/remote.go | 19 +++- logbook/logsync/logsync.go | 153 ++++++++++++++++++-------------- logbook/logsync/logsync_test.go | 18 ++++ remote/client.go | 44 ++++++++- 7 files changed, 167 insertions(+), 100 deletions(-) diff --git a/api/remote_test.go b/api/remote_test.go index 43941f228..ec9559d61 100644 --- a/api/remote_test.go +++ b/api/remote_test.go @@ -25,29 +25,4 @@ func TestRemoteClientHandlers(t *testing.T) { {"GET", "/fetch/me/cities", nil}, } runHandlerTestCases(t, "fetch", h.NewFetchHandler("/fetch"), fetchCases, true) - - // node, teardown := newTestNode(t) - // defer teardown() - - // inst := newTestInstanceWithProfileFromNode(node) - - // // Set a seed so that the sessionID is deterministic - // rand.Seed(1234) - - // testCases := []handlerTestCase{ - // {"POST", "/", mustFile(t, "testdata/postRemoteRequest.json")}, - // } - - // cfg, _ := testConfigAndSetter() - // // testReceivers := dsync.NewTestReceivers() - - // // Reject all dag.Info's - // cfg.API.RemoteAcceptSizeMax = 0 - // rh := NewRemoteHandlers(inst) - // runHandlerTestCases(t, "remote reject", rh.ReceiveHandler, testCases, true) - - // // Accept all dag.Info's - // cfg.API.RemoteAcceptSizeMax = -1 - // rh = NewRemoteHandlers(inst) - // runHandlerTestCases(t, "remote accept", rh.ReceiveHandler, testCases, true) } diff --git a/api/testdata/api.snapshot b/api/testdata/api.snapshot index aa3ca02b2e8a215c1573d75f6455284fe2c993de..ada20bf7b89ca97d2be03d924443e000eb73728f 100755 GIT binary patch delta 94 zcmZo!$kV%!r=f*$3zPI`!J^dM{E}2Fg}i))hLCE delta 85 zcmeBO$kVcrr=f*$3zPI`zP$Vrh2;Faw9NF3E!APwl{ocdcufJX8OsmOl@cq J+kskir2y7@BJKbH diff --git a/lib/datasets.go b/lib/datasets.go index 1cb5f456c..33759b392 100644 --- a/lib/datasets.go +++ b/lib/datasets.go @@ -652,6 +652,14 @@ func (r *DatasetRequests) Add(p *AddParams, res *repo.DatasetRef) (err error) { p.RemoteAddr = r.inst.cfg.Registry.Location } + // TODO (b5) - we're early in log syncronization days. This is going to fail a bunch + // while we work to upgrade the stack. Long term we may want to consider a mechanism + // for allowing partial completion where only one of logs or dataset pulling works + // by doing both in parallel and reporting issues on both + if pullLogsErr := r.inst.RemoteClient().PullLogs(ctx, repo.ConvertToDsref(ref), p.RemoteAddr); pullLogsErr != nil { + log.Errorf("pulling logs: %s", pullLogsErr) + } + if err = actions.AddDataset(ctx, r.node, r.inst.RemoteClient(), p.RemoteAddr, &ref); err != nil { return err } diff --git a/lib/remote.go b/lib/remote.go index 87bd25ada..1d5a30869 100644 --- a/lib/remote.go +++ b/lib/remote.go @@ -60,8 +60,7 @@ func (r *RemoteMethods) Fetch(p *FetchParams, res *repo.DatasetRef) error { // TODO (b5) - need contexts yo ctx := context.TODO() - - return r.cli.Fetch(ctx, repo.ConvertToDsref(ref), addr) + return r.cli.PullLogs(ctx, repo.ConvertToDsref(ref), addr) } // PublicationParams encapsulates parmeters for dataset publication @@ -96,6 +95,14 @@ func (r *RemoteMethods) Publish(p *PublicationParams, res *repo.DatasetRef) erro // TODO (b5) - need contexts yo ctx := context.TODO() + // TODO (b5) - we're early in log syncronization days. This is going to fail a bunch + // while we work to upgrade the stack. Long term we may want to consider a mechanism + // for allowing partial completion where only one of logs or dataset pushing works + // by doing both in parallel and reporting issues on both + if pushLogsErr := r.cli.PushLogs(ctx, repo.ConvertToDsref(ref), addr); pushLogsErr != nil { + log.Errorf("pushing logs: %s", pushLogsErr) + } + if err = r.cli.PushDataset(ctx, ref, addr); err != nil { return err } @@ -128,6 +135,14 @@ func (r *RemoteMethods) Unpublish(p *PublicationParams, res *repo.DatasetRef) er // TODO (b5) - need contexts yo ctx := context.TODO() + // TODO (b5) - we're early in log syncronization days. This is going to fail a bunch + // while we work to upgrade the stack. Long term we may want to consider a mechanism + // for allowing partial completion where only one of logs or dataset pushing works + // by doing both in parallel and reporting issues on both + if removeLogsErr := r.cli.RemoveLogs(ctx, repo.ConvertToDsref(ref), addr); removeLogsErr != nil { + log.Error("removing logs: %s", removeLogsErr.Error()) + } + if err := r.cli.RemoveDataset(ctx, ref, addr); err != nil { return err } diff --git a/logbook/logsync/logsync.go b/logbook/logsync/logsync.go index 0b42bc040..9caf908b1 100644 --- a/logbook/logsync/logsync.go +++ b/logbook/logsync/logsync.go @@ -29,20 +29,6 @@ type Logsync struct { didReceive Hook } -// remove is an internal interface for methods available on foreign logbooks -// the logsync struct contains the canonical implementation of a remote -// interface. network clients wrap the remote interface with network behaviours, -// using Logsync methods to do the "real work" and echoing that back across the -// client protocol -type remote interface { - put(ctx context.Context, author log.Author, r io.Reader) error - get(ctx context.Context, author log.Author, ref dsref.Ref) (sender log.Author, data io.Reader, err error) - del(ctx context.Context, author log.Author, ref dsref.Ref) error -} - -// assert at compile-time that Logsync is a remote -var _ remote = (*Logsync)(nil) - // Options encapsulates runtime configuration for a remote type Options struct { // ReceiveCheck is called before accepting a log, returning an error from this @@ -81,63 +67,20 @@ type Hook func(ctx context.Context, author log.Author, path string) error // Author is the local author of lsync's logbook func (lsync *Logsync) Author() log.Author { - return lsync.book.Author() -} - -func (lsync *Logsync) put(ctx context.Context, author log.Author, r io.Reader) error { - if lsync == nil { - return ErrNoLogsync - } - - if lsync.receiveCheck != nil { - // TODO (b5) - need to populate path - if err := lsync.receiveCheck(ctx, author, ""); err != nil { - return err - } - } - - data, err := ioutil.ReadAll(r) - if err != nil { - return err - } - - if err := lsync.book.MergeLogBytes(ctx, author, data); err != nil { - return err - } - - if lsync.didReceive != nil { - // TODO (b5) - need to populate path - if err := lsync.didReceive(ctx, author, ""); err != nil { - return err - } - } - return nil -} - -func (lsync *Logsync) get(ctx context.Context, author log.Author, ref dsref.Ref) (log.Author, io.Reader, error) { if lsync == nil { - return nil, nil, ErrNoLogsync - } - - data, err := lsync.book.LogBytes(ref) - if err != nil { - return lsync.Author(), nil, err + return nil } - return lsync.Author(), bytes.NewReader(data), nil + return lsync.book.Author() } -func (lsync *Logsync) del(ctx context.Context, sender log.Author, ref dsref.Ref) error { +// NewPush prepares a Push from the local logsync to a remote destination +// doing a push places a local log on the remote +func (lsync *Logsync) NewPush(ref dsref.Ref, remoteAddr string) (*Push, error) { if lsync == nil { - return ErrNoLogsync + return nil, ErrNoLogsync } - return lsync.book.RemoveLog(ctx, sender, ref) -} - -// NewPush prepares a Push from the local logsync to a remote destination -// doing a push places a local log on the remote -func (lsync *Logsync) NewPush(ref dsref.Ref, remote string) (*Push, error) { - rem, err := lsync.getRemote(remote) + rem, err := lsync.remoteClient(remoteAddr) if err != nil { return nil, err } @@ -151,8 +94,12 @@ func (lsync *Logsync) NewPush(ref dsref.Ref, remote string) (*Push, error) { // NewPull creates a Pull from the local logsync to a remote destination // doing a pull fetches a log from the remote to the local logbook -func (lsync *Logsync) NewPull(ref dsref.Ref, remote string) (*Pull, error) { - rem, err := lsync.getRemote(remote) +func (lsync *Logsync) NewPull(ref dsref.Ref, remoteAddr string) (*Pull, error) { + if lsync == nil { + return nil, ErrNoLogsync + } + + rem, err := lsync.remoteClient(remoteAddr) if err != nil { return nil, err } @@ -165,8 +112,12 @@ func (lsync *Logsync) NewPull(ref dsref.Ref, remote string) (*Pull, error) { } // DoRemove asks a remote to remove a log -func (lsync *Logsync) DoRemove(ctx context.Context, ref dsref.Ref, remote string) error { - rem, err := lsync.getRemote(remote) +func (lsync *Logsync) DoRemove(ctx context.Context, ref dsref.Ref, remoteAddr string) error { + if lsync == nil { + return ErrNoLogsync + } + + rem, err := lsync.remoteClient(remoteAddr) if err != nil { return err } @@ -174,7 +125,7 @@ func (lsync *Logsync) DoRemove(ctx context.Context, ref dsref.Ref, remote string return rem.del(ctx, lsync.Author(), ref) } -func (lsync *Logsync) getRemote(remoteAddr string) (rem remote, err error) { +func (lsync *Logsync) remoteClient(remoteAddr string) (rem remote, err error) { // if a valid base58 peerID is passed, we're doing a p2p dsync if id, err := peer.IDB58Decode(remoteAddr); err == nil { if lsync.p2pHandler == nil { @@ -190,6 +141,70 @@ func (lsync *Logsync) getRemote(remoteAddr string) (rem remote, err error) { return rem, nil } +// remove is an internal interface for methods available on foreign logbooks +// the logsync struct contains the canonical implementation of a remote +// interface. network clients wrap the remote interface with network behaviours, +// using Logsync methods to do the "real work" and echoing that back across the +// client protocol +type remote interface { + put(ctx context.Context, author log.Author, r io.Reader) error + get(ctx context.Context, author log.Author, ref dsref.Ref) (sender log.Author, data io.Reader, err error) + del(ctx context.Context, author log.Author, ref dsref.Ref) error +} + +// assert at compile-time that Logsync is a remote +var _ remote = (*Logsync)(nil) + +func (lsync *Logsync) put(ctx context.Context, author log.Author, r io.Reader) error { + if lsync == nil { + return ErrNoLogsync + } + + if lsync.receiveCheck != nil { + // TODO (b5) - need to populate path + if err := lsync.receiveCheck(ctx, author, ""); err != nil { + return err + } + } + + data, err := ioutil.ReadAll(r) + if err != nil { + return err + } + + if err := lsync.book.MergeLogBytes(ctx, author, data); err != nil { + return err + } + + if lsync.didReceive != nil { + // TODO (b5) - need to populate path + if err := lsync.didReceive(ctx, author, ""); err != nil { + return err + } + } + return nil +} + +func (lsync *Logsync) get(ctx context.Context, author log.Author, ref dsref.Ref) (log.Author, io.Reader, error) { + if lsync == nil { + return nil, nil, ErrNoLogsync + } + + data, err := lsync.book.LogBytes(ref) + if err != nil { + return lsync.Author(), nil, err + } + return lsync.Author(), bytes.NewReader(data), nil +} + +func (lsync *Logsync) del(ctx context.Context, sender log.Author, ref dsref.Ref) error { + if lsync == nil { + return ErrNoLogsync + } + + return lsync.book.RemoveLog(ctx, sender, ref) +} + // Push is a request to place a log on a remote type Push struct { ref dsref.Ref diff --git a/logbook/logsync/logsync_test.go b/logbook/logsync/logsync_test.go index 817b124a8..a1705774e 100644 --- a/logbook/logsync/logsync_test.go +++ b/logbook/logsync/logsync_test.go @@ -108,6 +108,24 @@ func Example() { // johnathon has 2 references for basit/nasdaq } +func TestNilCallable(t *testing.T) { + var logsync *Logsync + + if a := logsync.Author(); a != nil { + t.Errorf("author mismatch. expected: '%v', got: '%v' ", nil, a) + } + + if _, err := logsync.NewPush(dsref.Ref{}, ""); err != ErrNoLogsync { + t.Errorf("error mismatch. expected: '%v', got: '%v' ", ErrNoLogsync, err) + } + if _, err := logsync.NewPull(dsref.Ref{}, ""); err != ErrNoLogsync { + t.Errorf("error mismatch. expected: '%v', got: '%v' ", ErrNoLogsync, err) + } + if err := logsync.DoRemove(context.Background(), dsref.Ref{}, ""); err != ErrNoLogsync { + t.Errorf("error mismatch. expected: '%v', got: '%v' ", ErrNoLogsync, err) + } +} + func makeJohnathonLogbook() *logbook.Book { pk, err := decodePk(aPk) if err != nil { diff --git a/remote/client.go b/remote/client.go index 4d18bf142..45362722c 100644 --- a/remote/client.go +++ b/remote/client.go @@ -24,7 +24,7 @@ import ( ) // ErrNoRemoteClient is returned when no client is allocated -var ErrNoRemoteClient = fmt.Errorf("not configured to make remote requests") +var ErrNoRemoteClient = fmt.Errorf("remote: no client to make remote requests") // Address extracts the address of a remote from a configuration for a given // remote name @@ -73,7 +73,11 @@ func NewClient(node *p2p.QriNode) (*Client, error) { var ls *logsync.Logsync if book := node.Repo.Logbook(); book != nil { - ls = logsync.New(book) + ls = logsync.New(book, func(logsyncConfig *logsync.Options) { + if host := node.Host(); host != nil { + logsyncConfig.Libp2pHost = host + } + }) } return &Client{ @@ -91,8 +95,8 @@ func (c *Client) CoreAPI() coreiface.CoreAPI { return c.capi } -// Fetch pulls a logbook from a remote -func (c *Client) Fetch(ctx context.Context, ref dsref.Ref, remoteAddr string) error { +// PullLogs pulls logbook data from a remote +func (c *Client) PullLogs(ctx context.Context, ref dsref.Ref, remoteAddr string) error { if c == nil { return ErrNoRemoteClient } @@ -109,6 +113,38 @@ func (c *Client) Fetch(ctx context.Context, ref dsref.Ref, remoteAddr string) er return pull.Do(ctx) } +// PushLogs pushes logbook data to a remote address +func (c *Client) PushLogs(ctx context.Context, ref dsref.Ref, remoteAddr string) error { + if c == nil { + return ErrNoRemoteClient + } + + if t := addressType(remoteAddr); t == "http" { + remoteAddr = remoteAddr + "/remote/logsync" + } + log.Debugf("pushing logs for %s from %s", ref.Alias(), remoteAddr) + push, err := c.logsync.NewPush(ref, remoteAddr) + if err != nil { + return err + } + + return push.Do(ctx) +} + +// RemoveLogs requests a remote remove logbook data from an address +func (c *Client) RemoveLogs(ctx context.Context, ref dsref.Ref, remoteAddr string) error { + if c == nil { + return ErrNoRemoteClient + } + + if t := addressType(remoteAddr); t == "http" { + remoteAddr = remoteAddr + "/remote/logsync" + } + + log.Debugf("deleting logs for %s from %s", ref.Alias(), remoteAddr) + return c.logsync.DoRemove(ctx, ref, remoteAddr) +} + // PushDataset pushes the contents of a dataset to a remote func (c *Client) PushDataset(ctx context.Context, ref repo.DatasetRef, remoteAddr string) error { if c == nil {