Skip to content

Commit

Permalink
feat(logsync): add p2p logsync handler
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Oct 16, 2019
1 parent 4087455 commit 1b0f1c2
Show file tree
Hide file tree
Showing 5 changed files with 475 additions and 29 deletions.
14 changes: 7 additions & 7 deletions logbook/logsync/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (c *HTTPClient) put(ctx context.Context, author log.Author, r io.Reader) er
}
req = req.WithContext(ctx)

if err := addAuthorHeaders(req.Header, author); err != nil {
if err := addAuthorHTTPHeaders(req.Header, author); err != nil {
return err
}

Expand All @@ -53,7 +53,7 @@ func (c *HTTPClient) get(ctx context.Context, author log.Author, ref dsref.Ref)
}
req = req.WithContext(ctx)

if err := addAuthorHeaders(req.Header, author); err != nil {
if err := addAuthorHTTPHeaders(req.Header, author); err != nil {
return nil, nil, err
}
res, err := http.DefaultClient.Do(req)
Expand Down Expand Up @@ -83,7 +83,7 @@ func (c *HTTPClient) del(ctx context.Context, author log.Author, ref dsref.Ref)
}
req = req.WithContext(ctx)

if err := addAuthorHeaders(req.Header, author); err != nil {
if err := addAuthorHTTPHeaders(req.Header, author); err != nil {
return err
}

Expand All @@ -96,7 +96,7 @@ func (c *HTTPClient) del(ctx context.Context, author log.Author, ref dsref.Ref)
return err
}

func addAuthorHeaders(h http.Header, author log.Author) error {
func addAuthorHTTPHeaders(h http.Header, author log.Author) error {
h.Set("AuthorID", author.AuthorID())
pubByteStr, err := author.AuthorPubKey().Bytes()
if err != nil {
Expand Down Expand Up @@ -140,7 +140,7 @@ func HTTPHandler(lsync *Logsync) http.HandlerFunc {
}
r.Body.Close()

addAuthorHeaders(w.Header(), lsync.Author())
addAuthorHTTPHeaders(w.Header(), lsync.Author())
case "GET":
ref, err := repo.ParseDatasetRef(r.FormValue("ref"))
if err != nil {
Expand All @@ -156,7 +156,7 @@ func HTTPHandler(lsync *Logsync) http.HandlerFunc {
return
}

addAuthorHeaders(w.Header(), receiver)
addAuthorHTTPHeaders(w.Header(), receiver)
io.Copy(w, r)
case "DELETE":
ref, err := repo.ParseDatasetRef(r.FormValue("ref"))
Expand All @@ -172,7 +172,7 @@ func HTTPHandler(lsync *Logsync) http.HandlerFunc {
return
}

addAuthorHeaders(w.Header(), lsync.Author())
addAuthorHTTPHeaders(w.Header(), lsync.Author())
}
}
}
24 changes: 16 additions & 8 deletions logbook/logsync/logsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
// Logsync fulfills requests from clients, logsync wraps a logbook.Book, pushing
// and pulling logs from remote sources to its logbook
type Logsync struct {
book *logbook.Book
host host.Host
book *logbook.Book
p2pHandler *p2pHandler

receiveCheck Hook
didReceive Hook
Expand All @@ -47,6 +47,9 @@ type Options struct {
ReceiveCheck Hook
// DidReceive is called after a log has been merged into the logbook
DidReceive Hook

// to send & push over libp2p connections, provide a libp2p host
Libp2pHost host.Host
}

// New creates a remote from a logbook and optional configuration functions
Expand All @@ -56,12 +59,18 @@ func New(book *logbook.Book, opts ...func(*Options)) *Logsync {
opt(o)
}

return &Logsync{
logsync := &Logsync{
book: book,

receiveCheck: o.ReceiveCheck,
didReceive: o.DidReceive,
}

if o.Libp2pHost != nil {
logsync.p2pHandler = newp2pHandler(logsync, o.Libp2pHost)
}

return logsync
}

// Hook is a function called at specified points in the sync lifecycle
Expand Down Expand Up @@ -152,12 +161,11 @@ func (lsync *Logsync) DoRemove(ctx context.Context, ref dsref.Ref, remote string

func (lsync *Logsync) getRemote(remoteAddr string) (rem remote, err error) {
// if a valid base58 peerID is passed, we're doing a p2p dsync
if _, err := peer.IDB58Decode(remoteAddr); err == nil {
if lsync.host == nil {
return nil, fmt.Errorf("no p2p host provided to perform p2p dsync")
if id, err := peer.IDB58Decode(remoteAddr); err == nil {
if lsync.p2pHandler == nil {
return nil, fmt.Errorf("no p2p host provided to perform p2p logsync")
}
// rem = &p2pClient{remotePeerID: id, p2pHandler: ds.p2pHandler}
return nil, fmt.Errorf("p2p logsync not finished")
return &p2pClient{remotePeerID: id, p2pHandler: lsync.p2pHandler}, nil
} else if strings.HasPrefix(remoteAddr, "http") {
rem = &HTTPClient{URL: remoteAddr}
} else {
Expand Down
45 changes: 32 additions & 13 deletions logbook/logsync/logsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/qri-io/qri/logbook/log"
)

func ExampleNew() {
func Example() {
// first some boilerplate setup
ctx, done := context.WithCancel(context.Background())
defer done()
Expand Down Expand Up @@ -110,15 +110,25 @@ func ExampleNew() {
}

func makeJohnathonLogbook() *logbook.Book {
book, err := newTestbook("johnathon", aPk)
pk, err := decodePk(aPk)
if err != nil {
panic(err)
}

book, err := newTestbook("johnathon", pk)
if err != nil {
panic(err)
}
return book
}

func makeBasitLogbook() *logbook.Book {
book, err := newTestbook("basit", bPk)
pk, err := decodePk(bPk)
if err != nil {
panic(err)
}

book, err := newTestbook("basit", pk)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -219,8 +229,9 @@ func TestSyncHTTP(t *testing.T) {
}

type testRunner struct {
Ctx context.Context
A, B *logbook.Book
Ctx context.Context
A, B *logbook.Book
APrivKey, BPrivKey crypto.PrivKey
}

func (tr *testRunner) DefaultLogsyncs() (a, b *Logsync) {
Expand All @@ -232,34 +243,42 @@ func newTestRunner(t *testing.T) (tr *testRunner, cleanup func()) {
tr = &testRunner{
Ctx: context.Background(),
}
if tr.A, err = newTestbook("a", aPk); err != nil {

tr.APrivKey, err = decodePk(aPk)
if err != nil {
t.Fatal(err)
}
if tr.B, err = newTestbook("b", bPk); err != nil {
if tr.A, err = newTestbook("a", tr.APrivKey); err != nil {
t.Fatal(err)
}

tr.BPrivKey, err = decodePk(bPk)
if err != nil {
t.Fatal(err)
}
if tr.B, err = newTestbook("b", tr.BPrivKey); err != nil {
t.Fatal(err)
}

cleanup = func() {}
return tr, cleanup
}

func newTestbook(username, b64pk string) (*logbook.Book, error) {
func decodePk(b64pk string) (crypto.PrivKey, error) {
// logbooks are encrypted at rest, we need a private key to interact with
// them, including to create a new logbook. This is a dummy Private Key
// you should never, ever use in real life. demo only folks.
data, err := base64.StdEncoding.DecodeString(b64pk)
if err != nil {
return nil, err
}
pk, err := crypto.UnmarshalPrivateKey(data)
if err != nil {
return nil, err
}
return crypto.UnmarshalPrivateKey(data)
}

func newTestbook(username string, pk crypto.PrivKey) (*logbook.Book, error) {
// logbook relies on a qfs.Filesystem for read & write. create an in-memory
// filesystem we can play with
fs := qfs.NewMemFS()

return logbook.NewBook(pk, username, fs, "/mem/logset")
}

Expand Down
Loading

0 comments on commit 1b0f1c2

Please sign in to comment.