Skip to content

Commit

Permalink
feat(remote): push & pull logs on publish & add
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Oct 17, 2019
1 parent 5637f90 commit 16c7b4e
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 100 deletions.
25 changes: 0 additions & 25 deletions api/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,4 @@ func TestRemoteClientHandlers(t *testing.T) {
{"GET", "/fetch/me/cities", nil},
}
runHandlerTestCases(t, "fetch", h.NewFetchHandler("/fetch"), fetchCases, true)

// node, teardown := newTestNode(t)
// defer teardown()

// inst := newTestInstanceWithProfileFromNode(node)

// // Set a seed so that the sessionID is deterministic
// rand.Seed(1234)

// testCases := []handlerTestCase{
// {"POST", "/", mustFile(t, "testdata/postRemoteRequest.json")},
// }

// cfg, _ := testConfigAndSetter()
// // testReceivers := dsync.NewTestReceivers()

// // Reject all dag.Info's
// cfg.API.RemoteAcceptSizeMax = 0
// rh := NewRemoteHandlers(inst)
// runHandlerTestCases(t, "remote reject", rh.ReceiveHandler, testCases, true)

// // Accept all dag.Info's
// cfg.API.RemoteAcceptSizeMax = -1
// rh = NewRemoteHandlers(inst)
// runHandlerTestCases(t, "remote accept", rh.ReceiveHandler, testCases, true)
}
Binary file modified api/testdata/api.snapshot
Binary file not shown.
8 changes: 8 additions & 0 deletions lib/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,14 @@ func (r *DatasetRequests) Add(p *AddParams, res *repo.DatasetRef) (err error) {
p.RemoteAddr = r.inst.cfg.Registry.Location
}

// TODO (b5) - we're early in log syncronization days. This is going to fail a bunch
// while we work to upgrade the stack. Long term we may want to consider a mechanism
// for allowing partial completion where only one of logs or dataset pulling works
// by doing both in parallel and reporting issues on both
if pullLogsErr := r.inst.RemoteClient().PullLogs(ctx, repo.ConvertToDsref(ref), p.RemoteAddr); pullLogsErr != nil {
log.Errorf("pulling logs: %s", pullLogsErr)
}

if err = actions.AddDataset(ctx, r.node, r.inst.RemoteClient(), p.RemoteAddr, &ref); err != nil {
return err
}
Expand Down
19 changes: 17 additions & 2 deletions lib/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func (r *RemoteMethods) Fetch(p *FetchParams, res *repo.DatasetRef) error {

// TODO (b5) - need contexts yo
ctx := context.TODO()

return r.cli.Fetch(ctx, repo.ConvertToDsref(ref), addr)
return r.cli.PullLogs(ctx, repo.ConvertToDsref(ref), addr)
}

// PublicationParams encapsulates parmeters for dataset publication
Expand Down Expand Up @@ -96,6 +95,14 @@ func (r *RemoteMethods) Publish(p *PublicationParams, res *repo.DatasetRef) erro
// TODO (b5) - need contexts yo
ctx := context.TODO()

// TODO (b5) - we're early in log syncronization days. This is going to fail a bunch
// while we work to upgrade the stack. Long term we may want to consider a mechanism
// for allowing partial completion where only one of logs or dataset pushing works
// by doing both in parallel and reporting issues on both
if pushLogsErr := r.cli.PushLogs(ctx, repo.ConvertToDsref(ref), addr); pushLogsErr != nil {
log.Errorf("pushing logs: %s", pushLogsErr)
}

if err = r.cli.PushDataset(ctx, ref, addr); err != nil {
return err
}
Expand Down Expand Up @@ -128,6 +135,14 @@ func (r *RemoteMethods) Unpublish(p *PublicationParams, res *repo.DatasetRef) er
// TODO (b5) - need contexts yo
ctx := context.TODO()

// TODO (b5) - we're early in log syncronization days. This is going to fail a bunch
// while we work to upgrade the stack. Long term we may want to consider a mechanism
// for allowing partial completion where only one of logs or dataset pushing works
// by doing both in parallel and reporting issues on both
if removeLogsErr := r.cli.RemoveLogs(ctx, repo.ConvertToDsref(ref), addr); removeLogsErr != nil {
log.Error("removing logs: %s", removeLogsErr.Error())
}

if err := r.cli.RemoveDataset(ctx, ref, addr); err != nil {
return err
}
Expand Down
153 changes: 84 additions & 69 deletions logbook/logsync/logsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,6 @@ type Logsync struct {
didReceive Hook
}

// remove is an internal interface for methods available on foreign logbooks
// the logsync struct contains the canonical implementation of a remote
// interface. network clients wrap the remote interface with network behaviours,
// using Logsync methods to do the "real work" and echoing that back across the
// client protocol
type remote interface {
put(ctx context.Context, author log.Author, r io.Reader) error
get(ctx context.Context, author log.Author, ref dsref.Ref) (sender log.Author, data io.Reader, err error)
del(ctx context.Context, author log.Author, ref dsref.Ref) error
}

// assert at compile-time that Logsync is a remote
var _ remote = (*Logsync)(nil)

// Options encapsulates runtime configuration for a remote
type Options struct {
// ReceiveCheck is called before accepting a log, returning an error from this
Expand Down Expand Up @@ -81,63 +67,20 @@ type Hook func(ctx context.Context, author log.Author, path string) error

// Author is the local author of lsync's logbook
func (lsync *Logsync) Author() log.Author {
return lsync.book.Author()
}

func (lsync *Logsync) put(ctx context.Context, author log.Author, r io.Reader) error {
if lsync == nil {
return ErrNoLogsync
}

if lsync.receiveCheck != nil {
// TODO (b5) - need to populate path
if err := lsync.receiveCheck(ctx, author, ""); err != nil {
return err
}
}

data, err := ioutil.ReadAll(r)
if err != nil {
return err
}

if err := lsync.book.MergeLogBytes(ctx, author, data); err != nil {
return err
}

if lsync.didReceive != nil {
// TODO (b5) - need to populate path
if err := lsync.didReceive(ctx, author, ""); err != nil {
return err
}
}
return nil
}

func (lsync *Logsync) get(ctx context.Context, author log.Author, ref dsref.Ref) (log.Author, io.Reader, error) {
if lsync == nil {
return nil, nil, ErrNoLogsync
}

data, err := lsync.book.LogBytes(ref)
if err != nil {
return lsync.Author(), nil, err
return nil
}
return lsync.Author(), bytes.NewReader(data), nil
return lsync.book.Author()
}

func (lsync *Logsync) del(ctx context.Context, sender log.Author, ref dsref.Ref) error {
// NewPush prepares a Push from the local logsync to a remote destination
// doing a push places a local log on the remote
func (lsync *Logsync) NewPush(ref dsref.Ref, remoteAddr string) (*Push, error) {
if lsync == nil {
return ErrNoLogsync
return nil, ErrNoLogsync
}

return lsync.book.RemoveLog(ctx, sender, ref)
}

// NewPush prepares a Push from the local logsync to a remote destination
// doing a push places a local log on the remote
func (lsync *Logsync) NewPush(ref dsref.Ref, remote string) (*Push, error) {
rem, err := lsync.getRemote(remote)
rem, err := lsync.remoteClient(remoteAddr)
if err != nil {
return nil, err
}
Expand All @@ -151,8 +94,12 @@ func (lsync *Logsync) NewPush(ref dsref.Ref, remote string) (*Push, error) {

// NewPull creates a Pull from the local logsync to a remote destination
// doing a pull fetches a log from the remote to the local logbook
func (lsync *Logsync) NewPull(ref dsref.Ref, remote string) (*Pull, error) {
rem, err := lsync.getRemote(remote)
func (lsync *Logsync) NewPull(ref dsref.Ref, remoteAddr string) (*Pull, error) {
if lsync == nil {
return nil, ErrNoLogsync
}

rem, err := lsync.remoteClient(remoteAddr)
if err != nil {
return nil, err
}
Expand All @@ -165,16 +112,20 @@ func (lsync *Logsync) NewPull(ref dsref.Ref, remote string) (*Pull, error) {
}

// DoRemove asks a remote to remove a log
func (lsync *Logsync) DoRemove(ctx context.Context, ref dsref.Ref, remote string) error {
rem, err := lsync.getRemote(remote)
func (lsync *Logsync) DoRemove(ctx context.Context, ref dsref.Ref, remoteAddr string) error {
if lsync == nil {
return ErrNoLogsync
}

rem, err := lsync.remoteClient(remoteAddr)
if err != nil {
return err
}

return rem.del(ctx, lsync.Author(), ref)
}

func (lsync *Logsync) getRemote(remoteAddr string) (rem remote, err error) {
func (lsync *Logsync) remoteClient(remoteAddr string) (rem remote, err error) {
// if a valid base58 peerID is passed, we're doing a p2p dsync
if id, err := peer.IDB58Decode(remoteAddr); err == nil {
if lsync.p2pHandler == nil {
Expand All @@ -190,6 +141,70 @@ func (lsync *Logsync) getRemote(remoteAddr string) (rem remote, err error) {
return rem, nil
}

// remove is an internal interface for methods available on foreign logbooks
// the logsync struct contains the canonical implementation of a remote
// interface. network clients wrap the remote interface with network behaviours,
// using Logsync methods to do the "real work" and echoing that back across the
// client protocol
type remote interface {
put(ctx context.Context, author log.Author, r io.Reader) error
get(ctx context.Context, author log.Author, ref dsref.Ref) (sender log.Author, data io.Reader, err error)
del(ctx context.Context, author log.Author, ref dsref.Ref) error
}

// assert at compile-time that Logsync is a remote
var _ remote = (*Logsync)(nil)

func (lsync *Logsync) put(ctx context.Context, author log.Author, r io.Reader) error {
if lsync == nil {
return ErrNoLogsync
}

if lsync.receiveCheck != nil {
// TODO (b5) - need to populate path
if err := lsync.receiveCheck(ctx, author, ""); err != nil {
return err
}
}

data, err := ioutil.ReadAll(r)
if err != nil {
return err
}

if err := lsync.book.MergeLogBytes(ctx, author, data); err != nil {
return err
}

if lsync.didReceive != nil {
// TODO (b5) - need to populate path
if err := lsync.didReceive(ctx, author, ""); err != nil {
return err
}
}
return nil
}

func (lsync *Logsync) get(ctx context.Context, author log.Author, ref dsref.Ref) (log.Author, io.Reader, error) {
if lsync == nil {
return nil, nil, ErrNoLogsync
}

data, err := lsync.book.LogBytes(ref)
if err != nil {
return lsync.Author(), nil, err
}
return lsync.Author(), bytes.NewReader(data), nil
}

func (lsync *Logsync) del(ctx context.Context, sender log.Author, ref dsref.Ref) error {
if lsync == nil {
return ErrNoLogsync
}

return lsync.book.RemoveLog(ctx, sender, ref)
}

// Push is a request to place a log on a remote
type Push struct {
ref dsref.Ref
Expand Down
18 changes: 18 additions & 0 deletions logbook/logsync/logsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ func Example() {
// johnathon has 2 references for basit/nasdaq
}

func TestNilCallable(t *testing.T) {
var logsync *Logsync

if a := logsync.Author(); a != nil {
t.Errorf("author mismatch. expected: '%v', got: '%v' ", nil, a)
}

if _, err := logsync.NewPush(dsref.Ref{}, ""); err != ErrNoLogsync {
t.Errorf("error mismatch. expected: '%v', got: '%v' ", ErrNoLogsync, err)
}
if _, err := logsync.NewPull(dsref.Ref{}, ""); err != ErrNoLogsync {
t.Errorf("error mismatch. expected: '%v', got: '%v' ", ErrNoLogsync, err)
}
if err := logsync.DoRemove(context.Background(), dsref.Ref{}, ""); err != ErrNoLogsync {
t.Errorf("error mismatch. expected: '%v', got: '%v' ", ErrNoLogsync, err)
}
}

func makeJohnathonLogbook() *logbook.Book {
pk, err := decodePk(aPk)
if err != nil {
Expand Down
44 changes: 40 additions & 4 deletions remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// ErrNoRemoteClient is returned when no client is allocated
var ErrNoRemoteClient = fmt.Errorf("not configured to make remote requests")
var ErrNoRemoteClient = fmt.Errorf("remote: no client to make remote requests")

// Address extracts the address of a remote from a configuration for a given
// remote name
Expand Down Expand Up @@ -73,7 +73,11 @@ func NewClient(node *p2p.QriNode) (*Client, error) {

var ls *logsync.Logsync
if book := node.Repo.Logbook(); book != nil {
ls = logsync.New(book)
ls = logsync.New(book, func(logsyncConfig *logsync.Options) {
if host := node.Host(); host != nil {
logsyncConfig.Libp2pHost = host
}
})
}

return &Client{
Expand All @@ -91,8 +95,8 @@ func (c *Client) CoreAPI() coreiface.CoreAPI {
return c.capi
}

// Fetch pulls a logbook from a remote
func (c *Client) Fetch(ctx context.Context, ref dsref.Ref, remoteAddr string) error {
// PullLogs pulls logbook data from a remote
func (c *Client) PullLogs(ctx context.Context, ref dsref.Ref, remoteAddr string) error {
if c == nil {
return ErrNoRemoteClient
}
Expand All @@ -109,6 +113,38 @@ func (c *Client) Fetch(ctx context.Context, ref dsref.Ref, remoteAddr string) er
return pull.Do(ctx)
}

// PushLogs pushes logbook data to a remote address
func (c *Client) PushLogs(ctx context.Context, ref dsref.Ref, remoteAddr string) error {
if c == nil {
return ErrNoRemoteClient
}

if t := addressType(remoteAddr); t == "http" {
remoteAddr = remoteAddr + "/remote/logsync"
}
log.Debugf("pushing logs for %s from %s", ref.Alias(), remoteAddr)
push, err := c.logsync.NewPush(ref, remoteAddr)
if err != nil {
return err
}

return push.Do(ctx)
}

// RemoveLogs requests a remote remove logbook data from an address
func (c *Client) RemoveLogs(ctx context.Context, ref dsref.Ref, remoteAddr string) error {
if c == nil {
return ErrNoRemoteClient
}

if t := addressType(remoteAddr); t == "http" {
remoteAddr = remoteAddr + "/remote/logsync"
}

log.Debugf("deleting logs for %s from %s", ref.Alias(), remoteAddr)
return c.logsync.DoRemove(ctx, ref, remoteAddr)
}

// PushDataset pushes the contents of a dataset to a remote
func (c *Client) PushDataset(ctx context.Context, ref repo.DatasetRef, remoteAddr string) error {
if c == nil {
Expand Down

0 comments on commit 16c7b4e

Please sign in to comment.