Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More fine grained response controls #71

Merged
merged 5 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,5 +269,12 @@ type GraphExchange interface {
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
UnpauseResponse(peer.ID, RequestID) error
// Can also send extensions with unpause
UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
PauseResponse(peer.ID, RequestID) error

// CancelResponse cancels an in progress response
CancelResponse(peer.ID, RequestID) error
Comment on lines +275 to +279

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an interface, would it perhaps make sense to proactively add ...ExtensionData here too? To reduce churn don the road...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly? Gonna not do it now but may revisit

}
14 changes: 12 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,18 @@ func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHoo
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.PauseResponse(p, requestID)
}

// CancelResponse cancels an in progress response
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.CancelResponse(p, requestID)
}

type graphSyncReceiver GraphSync
Expand Down
235 changes: 235 additions & 0 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package responsemanager

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/runtraversal"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
)

// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
peerManager PeerManager
loader ipld.Loader
queryQueue QueryQueue
messages chan responseManagerMessage
ctx context.Context
workSignal chan struct{}
ticker *time.Ticker
}

func (qe *queryExecutor) processQueriesWorker() {
const targetWork = 1
taskDataChan := make(chan responseTaskData)
var taskData responseTaskData
for {
pid, tasks, _ := qe.queryQueue.PopTasks(targetWork)
for len(tasks) == 0 {
select {
case <-qe.ctx.Done():
return
case <-qe.workSignal:
pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
case <-qe.ticker.C:
qe.queryQueue.ThawRound()
pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
}
}
for _, task := range tasks {
key := task.Topic.(responseKey)
select {
case qe.messages <- &responseDataRequest{key, taskDataChan}:
case <-qe.ctx.Done():
return
}
select {
case taskData = <-taskDataChan:
case <-qe.ctx.Done():
return
}
if taskData.empty {
log.Info("Empty task on peer request stack")
continue
}
status, err := qe.executeTask(key, taskData)
select {
case qe.messages <- &finishTaskRequest{key, status, err}:
case <-qe.ctx.Done():
}
}
qe.queryQueue.TasksDone(pid, tasks...)

}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
var err error
loader := taskData.loader
traverser := taskData.traverser
if loader == nil || traverser == nil {
loader, traverser, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request)
if err != nil {
return graphsync.RequestFailedUnknown, err
}
select {
case <-qe.ctx.Done():
return graphsync.RequestFailedUnknown, errors.New("context cancelled")
case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
}
}
return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.pauseSignal, taskData.updateSignal)
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest) (ipld.Loader, ipldutil.Traverser, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
peerResponseSender := qe.peerManager.SenderForPeer(p)
var validationErr error
err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
if result.Err != nil || !result.IsValidated {
transaction.FinishWithError(graphsync.RequestFailedUnknown)
validationErr = errors.New("request not valid")
}
return nil
})
if err != nil {
return nil, nil, err
}
if validationErr != nil {
return nil, nil, validationErr
}
if err := qe.processDoNoSendCids(request, peerResponseSender); err != nil {
return nil, nil, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Selector: request.Selector(),
Chooser: result.CustomChooser,
}.Start(ctx)
loader := result.CustomLoader
if loader == nil {
loader = qe.loader
}
return loader, traverser, nil
}

func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
peerResponseSender.IgnoreBlocks(request.ID(), links)
return nil
}

func (qe *queryExecutor) executeQuery(
p peer.ID,
request gsmsg.GraphSyncRequest,
loader ipld.Loader,
traverser ipldutil.Traverser,
pauseSignal chan struct{},
updateSignal chan struct{}) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
peerResponseSender := qe.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
var err error
_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
err = qe.checkForUpdates(p, request, pauseSignal, updateSignal, updateChan, transaction)
if err != nil {
if err == hooks.ErrPaused {
transaction.PauseRequest()
}
return nil
}
blockData := transaction.SendResponse(link, data)
if blockData.BlockSize() > 0 {
result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
if result.Err == hooks.ErrPaused {
transaction.PauseRequest()
}
err = result.Err
}
return nil
})
return err
})
if err != nil {
if err != hooks.ErrPaused {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown, err
}
return graphsync.RequestPaused, err
}
return peerResponseSender.FinishRequest(request.ID()), nil
}

func (qe *queryExecutor) checkForUpdates(
p peer.ID,
request gsmsg.GraphSyncRequest,
pauseSignal chan struct{},
updateSignal chan struct{},
updateChan chan []gsmsg.GraphSyncRequest,
peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
for {
select {
case <-pauseSignal:
return hooks.ErrPaused
case <-updateSignal:
select {
case qe.messages <- &responseUpdateRequest{responseKey{p, request.ID()}, updateChan}:
case <-qe.ctx.Done():
}
select {
case updates := <-updateChan:
for _, update := range updates {
result := qe.updateHooks.ProcessUpdateHooks(p, request, update)
for _, extension := range result.Extensions {
peerResponseSender.SendExtensionData(extension)
}
if result.Err != nil {
return result.Err
}
}
case <-qe.ctx.Done():
}
default:
return nil
}
}
}
Loading