Skip to content

Commit

Permalink
feat(remote): embed log hook oplogs in hook context
Browse files Browse the repository at this point in the history
I see hooks as part of a request/response lifecycle, which makes it appropriate to embed values in a context for later fetching.

We're doing exactly that for oplogs, so hooks can get access to the logs being pushed without affecting the signature of a remote Hook, which is shared by datasets.
  • Loading branch information
b5 committed Oct 18, 2019
1 parent 70077bd commit 412852d
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 10 deletions.
7 changes: 7 additions & 0 deletions logbook/logsync/logsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ func (lsync *Logsync) put(ctx context.Context, author oplog.Author, r io.Reader)
return err
}

if lsync.pushFinalCheck != nil {
// TODO (b5) - need to populate path
if err := lsync.pushFinalCheck(ctx, author, dsref.Ref{}, lg); err != nil {
return err
}
}

if err := lsync.book.MergeLog(ctx, author, lg); err != nil {
return err
}
Expand Down
72 changes: 72 additions & 0 deletions logbook/logsync/logsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
crypto "github.com/libp2p/go-libp2p-crypto"
"github.com/qri-io/dataset"
"github.com/qri-io/qfs"
Expand Down Expand Up @@ -108,6 +109,77 @@ func Example() {
// johnathon has 2 references for basit/nasdaq
}

func TestHookCalls(t *testing.T) {
tr, cleanup := newTestRunner(t)
defer cleanup()

hooksCalled := []string{}
callCheck := func(s string) Hook {
return func(ctx context.Context, a Author, ref dsref.Ref, l *oplog.Log) error {
hooksCalled = append(hooksCalled, s)
return nil
}
}

nasdaqRef, err := writeNasdaqLogs(tr.Ctx, tr.A)
if err != nil {
t.Fatal(err)
}

lsA := New(tr.A, func(o *Options) {
o.PullPreCheck = callCheck("PullPreCheck")
o.Pulled = callCheck("Pulled")
o.PushPreCheck = callCheck("PushPreCheck")
o.PushFinalCheck = callCheck("PushFinalCheck")
o.Pushed = callCheck("Pushed")
o.RemovePreCheck = callCheck("RemovePreCheck")
o.Removed = callCheck("Removed")
})

s := httptest.NewServer(HTTPHandler(lsA))
defer s.Close()

lsB := New(tr.B)

pull, err := lsB.NewPull(nasdaqRef, s.URL)
if err != nil {
t.Fatal(err)
}
if err := pull.Do(tr.Ctx); err != nil {
t.Fatal(err)
}

worldBankRef, err := writeWorldBankLogs(tr.Ctx, tr.B)
if err != nil {
t.Fatal(err)
}
push, err := lsB.NewPush(worldBankRef, s.URL)
if err != nil {
t.Fatal(err)
}
if err := push.Do(tr.Ctx); err != nil {
t.Fatal(err)
}

if err := lsB.DoRemove(tr.Ctx, worldBankRef, s.URL); err != nil {
t.Fatal(err)
}

expectHooksCallOrder := []string{
"PullPreCheck",
"Pulled",
"PushPreCheck",
"PushFinalCheck",
"Pushed",
"RemovePreCheck",
"Removed",
}

if diff := cmp.Diff(expectHooksCallOrder, hooksCalled); diff != "" {
t.Errorf("result mismatch (-want +got):\n%s", diff)
}

}
func TestNilCallable(t *testing.T) {
var logsync *Logsync

Expand Down
2 changes: 0 additions & 2 deletions remote/client_test.go

This file was deleted.

26 changes: 26 additions & 0 deletions remote/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package remote

import (
"context"

"github.com/qri-io/qri/logbook/oplog"
)

// The ctxKey type is unexported to prevent collisions with context keys defined
// in other packages.
type ctxKey int

// oplogKey is the context key for an oplog.Log pointer. log hooks embed oplogs
// in a context for access within a lifecycle hook
const oplogKey ctxKey = 0

// newLogHookContext creates
func newLogHookContext(ctx context.Context, l *oplog.Log) context.Context {
return context.WithValue(ctx, oplogKey, l)
}

// OplogFromContext pulls an oplog value from
func OplogFromContext(ctx context.Context) (l *oplog.Log, ok bool) {
l, ok = ctx.Value(oplogKey).(*oplog.Log)
return l, ok
}
5 changes: 5 additions & 0 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Options struct {
DatasetPulled Hook

LogPushPreCheck Hook
LogPushFinalCheck Hook
LogPushed Hook
LogPullPreCheck Hook
LogPulled Hook
Expand Down Expand Up @@ -125,6 +126,7 @@ func NewRemote(node *p2p.QriNode, cfg *config.Remote, opts ...func(o *Options))
if book := node.Repo.Logbook(); book != nil {
r.logsync = logsync.New(book, func(lso *logsync.Options) {
lso.PushPreCheck = r.logHook(o.LogPushPreCheck)
lso.PushFinalCheck = r.logHook(o.LogPushFinalCheck)
lso.Pushed = r.logHook(o.LogPushed)
lso.PullPreCheck = r.logHook(o.LogPullPreCheck)
lso.Pulled = r.logHook(o.LogPulled)
Expand Down Expand Up @@ -304,6 +306,9 @@ func (r *Remote) logHook(h Hook) logsync.Hook {
}
}

// embed the log oplog pointer in our hook
ctx = newLogHookContext(ctx, l)

return h(ctx, pid, r)
}

Expand Down
26 changes: 18 additions & 8 deletions remote/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ func TestDatasetPullPushDeleteHTTP(t *testing.T) {
tr, cleanup := newTestRunner(t)
defer cleanup()

aCfg := &config.Remote{
Enabled: true,
AllowRemoves: true,
AcceptSizeMax: 10000,
}

hooksCalled := []string{}
callCheck := func(s string) Hook {
return func(ctx context.Context, pid profile.ID, ref repo.DatasetRef) error {
Expand All @@ -38,6 +32,15 @@ func TestDatasetPullPushDeleteHTTP(t *testing.T) {
}
}

requireLogCallCheck := func(t *testing.T, s string) Hook {
return func(ctx context.Context, pid profile.ID, ref repo.DatasetRef) error {
if l, ok := OplogFromContext(ctx); !ok {
t.Errorf("hook %s expected log to be in context. got: %v", s, l)
}
return callCheck(s)(ctx, pid, ref)
}
}

opts := func(o *Options) {
o.DatasetPushPreCheck = callCheck("DatasetPushPreCheck")
o.DatasetPushFinalCheck = callCheck("DatasetPushFinalCheck")
Expand All @@ -46,14 +49,20 @@ func TestDatasetPullPushDeleteHTTP(t *testing.T) {
o.DatasetRemoved = callCheck("DatasetRemoved")

o.LogPushPreCheck = callCheck("LogPushPreCheck")
// TODO (b5) - log push final check
o.LogPushed = callCheck("LogPushed")
o.LogPushFinalCheck = requireLogCallCheck(t, "LogPushFinalCheck")
o.LogPushed = requireLogCallCheck(t, "LogPushed")
o.LogPullPreCheck = callCheck("LogPullPreCheck")
o.LogPulled = callCheck("LogPulled")
o.LogRemovePreCheck = callCheck("LogRemovePreCheck")
o.LogRemoved = callCheck("LogRemoved")
}

aCfg := &config.Remote{
Enabled: true,
AllowRemoves: true,
AcceptSizeMax: 10000,
}

rem, err := NewRemote(tr.NodeA, aCfg, opts)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -109,6 +118,7 @@ func TestDatasetPullPushDeleteHTTP(t *testing.T) {
"LogPulled",
"DatasetPulled",
"LogPushPreCheck",
"LogPushFinalCheck",
"LogPushed",
"DatasetPushPreCheck",
"DatasetPushFinalCheck",
Expand Down

0 comments on commit 412852d

Please sign in to comment.