Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add extensions on complete #76

Merged
merged 2 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ type RequestUpdatedHookActions interface {
UnpauseResponse()
}

// ResponseCompletedHookActions are actions that can be taken in response completed hook to add a
// final extension on a response
type ResponseCompletedHookActions interface {
SendExtensionData(ExtensionData)
}

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
Expand Down Expand Up @@ -272,8 +278,8 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)
// OnResponseCompletedHook provides a way to listen for when responder has finished serving a response
type OnResponseCompletedHook func(p peer.ID, request RequestData, status ResponseStatusCode, hookActions ResponseCompletedHookActions)

// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)
Expand Down Expand Up @@ -307,8 +313,8 @@ type GraphExchange interface {
// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc
// RegisterCompletedResponseHook adds a hook on the responder for completed responses
RegisterCompletedResponseHook(hook OnResponseCompletedHook) UnregisterHookFunc

// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
Expand Down
14 changes: 7 additions & 7 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type GraphSync struct {
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
completedResponseHooks *responderhooks.CompletedResponseHooks
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
Expand Down Expand Up @@ -88,9 +88,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
completedResponseHooks := responderhooks.NewCompletedResponseHooks()
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseHooks, requestorCancelledListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -103,7 +103,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
completedResponseHooks: completedResponseHooks,
requestorCancelledListeners: requestorCancelledListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
Expand Down Expand Up @@ -170,9 +170,9 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH
return gs.requestUpdatedHooks.Register(hook)
}

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return gs.completedResponseListeners.Register(listener)
// RegisterCompletedResponseHook adds a hook on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseHook(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc {
return gs.completedResponseHooks.Register(hook)
}

// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
Expand Down
25 changes: 17 additions & 8 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func TestGraphsyncRoundTrip(t *testing.T) {
// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

var receivedResponseData []byte
var receivedResponseData [][]byte
var receivedRequestData []byte

requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
receivedResponseData = append(receivedResponseData, data)
}
})

Expand All @@ -213,7 +213,8 @@ func TestGraphsyncRoundTrip(t *testing.T) {
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
hookActions.SendExtensionData(td.extensionFinal)
select {
case finalResponseStatusChan <- status:
default:
Expand All @@ -227,9 +228,11 @@ func TestGraphsyncRoundTrip(t *testing.T) {

// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")
require.Len(t, receivedResponseData, 2)
require.Equal(t, td.extensionResponseData, receivedResponseData[0], "did not receive correct extension response data")
require.Equal(t, td.extensionFinalData, receivedResponseData[1], "did not receive correct extension response data")

// verify listener
// verify completed hook
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
Expand All @@ -256,7 +259,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
responder := td.GraphSyncHost2()

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
select {
case finalResponseStatusChan <- status:
default:
Expand All @@ -278,7 +281,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
require.Equal(t, tree.MiddleMapBlock.RawData(), td.blockStore1[tree.MiddleMapNodeLnk])
require.Equal(t, tree.RootBlock.RawData(), td.blockStore1[tree.RootNodeLnk])

// verify listener
// verify completed hook
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus)
Expand Down Expand Up @@ -820,6 +823,8 @@ type gsTestData struct {
extensionResponse graphsync.ExtensionData
extensionUpdateData []byte
extensionUpdate graphsync.ExtensionData
extensionFinalData []byte
extensionFinal graphsync.ExtensionData
}

func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Expand Down Expand Up @@ -857,7 +862,11 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Name: td.extensionName,
Data: td.extensionUpdateData,
}

td.extensionFinalData = testutil.RandomBytes(100)
td.extensionFinal = graphsync.ExtensionData{
Name: td.extensionName,
Data: td.extensionFinalData,
}
return td
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,59 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
// CompletedResponseHooks is a set of hooks for completed responses
type CompletedResponseHooks struct {
pubSub *pubsub.PubSub
}

type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
cha *completeHookActions
}

func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
hook := subscriberFn.(graphsync.OnResponseCompletedHook)
hook(ie.p, ie.request, ie.status, ie.cha)
return nil
}

// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)}
// NewCompletedResponseHooks returns a new list of completed response hooks
func NewCompletedResponseHooks() *CompletedResponseHooks {
return &CompletedResponseHooks{pubSub: pubsub.New(completedResponseDispatcher)}
}

// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener))
// Register registers an hook for completed responses
func (crl *CompletedResponseHooks) Register(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(hook))
}

// ProcessCompleteHooks runs notifies all completed hooks that a response has completed
func (crl *CompletedResponseHooks) ProcessCompleteHooks(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) CompleteResult {
ha := &completeHookActions{}
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status, ha})
return ha.result()
}

// CompleteResult is the outcome of running complete response hooks
type CompleteResult struct {
Extensions []graphsync.ExtensionData
}

type completeHookActions struct {
extensions []graphsync.ExtensionData
}

func (ha *completeHookActions) result() CompleteResult {
return CompleteResult{
Extensions: ha.extensions,
}
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
func (ha *completeHookActions) SendExtensionData(ext graphsync.ExtensionData) {
ha.extensions = append(ha.extensions, ext)
}

// RequestorCancelledListeners is a set of listeners for when requestors cancel
Expand Down
57 changes: 57 additions & 0 deletions responsemanager/hooks/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,3 +385,60 @@ func TestUpdateHookProcessing(t *testing.T) {
})
}
}

func TestCompleteHookProcessing(t *testing.T) {
extensionData := testutil.RandomBytes(100)
extensionName := graphsync.ExtensionName("AppleSauce/McGee")
extension := graphsync.ExtensionData{
Name: extensionName,
Data: extensionData,
}
extensionResponseData := testutil.RandomBytes(100)
extensionResponse := graphsync.ExtensionData{
Name: extensionName,
Data: extensionResponseData,
}

root := testutil.GenerateCids(1)[0]
requestID := graphsync.RequestID(rand.Int31())
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension)
status := graphsync.RequestCompletedFull
p := testutil.GeneratePeers(1)[0]
testCases := map[string]struct {
configure func(t *testing.T, completedHooks *hooks.CompletedResponseHooks)
assert func(t *testing.T, result hooks.CompleteResult)
}{
"no hooks": {
assert: func(t *testing.T, result hooks.CompleteResult) {
require.Empty(t, result.Extensions)
},
},
"send extension data": {
configure: func(t *testing.T, completedHooks *hooks.CompletedResponseHooks) {
completedHooks.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
_, found := requestData.Extension(extensionName)
if found {
hookActions.SendExtensionData(extensionResponse)
}
})
},
assert: func(t *testing.T, result hooks.CompleteResult) {
require.Len(t, result.Extensions, 1)
require.Contains(t, result.Extensions, extensionResponse)
},
},
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
completedHooks := hooks.NewCompletedResponseHooks()
if data.configure != nil {
data.configure(t, completedHooks)
}
result := completedHooks.ProcessCompleteHooks(p, request, status)
if data.assert != nil {
data.assert(t, result)
}
})
}
}
57 changes: 34 additions & 23 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
completedListeners CompletedListeners
completedHooks CompletedHooks
cancelledListeners CancelledListeners
peerManager PeerManager
loader ipld.Loader
Expand Down Expand Up @@ -71,13 +71,6 @@ func (qe *queryExecutor) processQueriesWorker() {
continue
}
status, err := qe.executeTask(key, taskData)
_, isPaused := err.(hooks.ErrPaused)
isCancelled := err != nil && isContextErr(err)
if isCancelled {
qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
} else if !isPaused {
qe.completedListeners.NotifyCompletedListeners(key.p, taskData.request, status)
}
select {
case qe.messages <- &finishTaskRequest{key, status, err}:
case <-qe.ctx.Done():
Expand Down Expand Up @@ -207,23 +200,41 @@ func (qe *queryExecutor) executeQuery(
})
return err
})
if err != nil {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
return graphsync.RequestPaused, err
}

var status graphsync.ResponseStatusCode
_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
status = qe.closeRequest(transaction, err)
if isContextErr(err) {
peerResponseSender.FinishWithCancel(request.ID())
return graphsync.RequestCancelled, err
}
if err == errCancelledByCommand {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestCancelled)
return graphsync.RequestCancelled, err
qe.cancelledListeners.NotifyCancelledListeners(p, request)
} else if status != graphsync.RequestPaused {
result := qe.completedHooks.ProcessCompleteHooks(p, request, status)
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
}
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown, err
return nil
})
return status, err
}

func (qe *queryExecutor) closeRequest(peerResponseSender peerresponsemanager.PeerResponseTransactionSender, err error) graphsync.ResponseStatusCode {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
return graphsync.RequestPaused
}
if isContextErr(err) {
peerResponseSender.FinishWithCancel()
return graphsync.RequestCancelled
}
if err == errCancelledByCommand {
peerResponseSender.FinishWithError(graphsync.RequestCancelled)
return graphsync.RequestCancelled
}
if err != nil {
peerResponseSender.FinishWithError(graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown
}
return peerResponseSender.FinishRequest(request.ID()), nil
return peerResponseSender.FinishRequest()
}

func (qe *queryExecutor) checkForUpdates(
Expand Down Expand Up @@ -268,5 +279,5 @@ func (qe *queryExecutor) checkForUpdates(

func isContextErr(err error) bool {
// TODO: Match with errors.Is when https://github.com/ipld/go-ipld-prime/issues/58 is resolved
return strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
return err != nil && strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
}
Loading