Skip to content

Commit

Permalink
feat(dsync remove): added hooks, remove, and meta params to dsync
Browse files Browse the repository at this point in the history
Merge pull request #18 from qri-io/pull_hook
  • Loading branch information
b5 authored Aug 26, 2019
2 parents f898d15 + 64f1cb1 commit bee27f6
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 94 deletions.
119 changes: 91 additions & 28 deletions dsync/dsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,20 @@ type DagSyncable interface {

// GetDagInfo asks the remote for info specified by a the root identifier
// string of a DAG
GetDagInfo(ctx context.Context, cidStr string) (info *dag.Info, err error)
GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error)
// GetBlock gets a block of data from the remote
GetBlock(ctx context.Context, hash string) (rawdata []byte, err error)

// RemoveCID asks the remote to remove a cid. Supporting deletes are optional.
// DagSyncables that don't implement DeleteCID must return
// ErrDeleteNotSupported
RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error)
}

// ErrRemoveNotSupported is the error value returned by remotes that don't
// support delete operations
var ErrRemoveNotSupported = fmt.Errorf("remove is not supported")

// Hook is a function that a dsync instance will call at specified points in the
// sync lifecycle
type Hook func(ctx context.Context, info dag.Info, meta map[string]string) error
Expand Down Expand Up @@ -110,16 +119,23 @@ type Dsync struct {
// struct for accepting p2p dsync requests
p2pHandler *p2pHandler

// requireAllBlocks forces pushes to send *all* blocks,
// skipping manifest diffing
requireAllBlocks bool
// should dsync honor remove requests?
allowRemoves bool

// preCheck is called before creating a receive session
preCheck Hook
// dagFinalCheck is called before finalizing a receive session
finalCheck Hook
// onCompleteHook is optionally called once dag sync is complete
onCompleteHook Hook

// requireAllBlocks forces pushes to send *all* blocks,
// skipping manifest diffing
requireAllBlocks bool
// getDagInfoCheck is an optional hook to call when a client asks for a dag
// info
getDagInfoCheck Hook
// removeCheck is an optional hook to call before allowing a delete
removeCheck Hook

// inbound transfers in progress, will be nil if not acting as a remote
sessionLock sync.Mutex
Expand All @@ -140,29 +156,39 @@ type Config struct {
HTTPRemoteAddress string
// to send & push over libp2p connections, provide a libp2p host
Libp2pHost host.Host

// PinAPI is required for remotes to accept
// PinAPI is required for remotes to accept pinning requests
PinAPI coreiface.PinAPI
// required check function for a remote accepting DAGs
PreCheck Hook
// optional check function for screening a receive before potentially pinning
FinalCheck Hook
// optional check function called after successful transfer
OnComplete Hook

// RequireAllBlocks will skip checking for blocks already present on the
// remote, requiring push requests to send all blocks each time
// This is a helpful override if the receiving node can't distinguish between
// local and network block access, as with the ipfs-http-api intreface
RequireAllBlocks bool
// AllowRemoves let's dsync opt into remove requests. removes are
// disabled by default
AllowRemoves bool

// required check function for a remote accepting DAGs, this hook will be
// called before a push is allowed to begin
PushPreCheck Hook
// optional check function for screening a receive before potentially pinning
PushFinalCheck Hook
// optional check function called after successful transfer
PushComplete Hook
// optional check to run on dagInfo requests before sending an info back
GetDagInfoCheck Hook
// optional check to run before executing a remove operation
// the dag.Info given to this check will only contain the root CID being
// removed
RemoveCheck Hook
}

// Validate confirms the configuration is valid
func (cfg *Config) Validate() error {
if cfg.PreCheck == nil {
if cfg.PushPreCheck == nil {
return fmt.Errorf("PreCheck is required")
}
if cfg.FinalCheck == nil {
if cfg.PushFinalCheck == nil {
return fmt.Errorf("FinalCheck is required")
}
return nil
Expand All @@ -182,8 +208,8 @@ func OptLibp2pHost(host host.Host) func(cfg *Config) {
// to get an offline-only node getter from an ipfs CoreAPI interface
func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func(cfg *Config)) (*Dsync, error) {
cfg := &Config{
PreCheck: DefaultDagPrecheck,
FinalCheck: DefaultDagFinalCheck,
PushPreCheck: DefaultDagPrecheck,
PushFinalCheck: DefaultDagFinalCheck,
}

for _, opt := range opts {
Expand All @@ -198,14 +224,18 @@ func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func
lng: localNodes,
bapi: blockStore,

preCheck: cfg.PreCheck,
finalCheck: cfg.FinalCheck,
onCompleteHook: cfg.OnComplete,

requireAllBlocks: cfg.RequireAllBlocks,
sessionPool: map[string]*session{},
sessionCancels: map[string]context.CancelFunc{},
sessionTTLDur: time.Hour * 5,
allowRemoves: cfg.AllowRemoves,

preCheck: cfg.PushPreCheck,
finalCheck: cfg.PushFinalCheck,
onCompleteHook: cfg.PushComplete,
getDagInfoCheck: cfg.GetDagInfoCheck,
removeCheck: cfg.RemoveCheck,

sessionPool: map[string]*session{},
sessionCancels: map[string]context.CancelFunc{},
sessionTTLDur: time.Hour * 5,
}

if cfg.PinAPI != nil {
Expand Down Expand Up @@ -303,12 +333,12 @@ func (ds *Dsync) NewPushInfo(info *dag.Info, remoteAddr string, pinOnComplete bo

// NewPull creates a pull. A pull fetches an entire DAG from a remote, placing
// it in the local block store
func (ds *Dsync) NewPull(cidStr, remoteAddr string) (*Pull, error) {
func (ds *Dsync) NewPull(cidStr, remoteAddr string, meta map[string]string) (*Pull, error) {
rem, err := ds.syncableRemote(remoteAddr)
if err != nil {
return nil, err
}
return NewPull(cidStr, ds.lng, ds.bapi, rem)
return NewPull(cidStr, ds.lng, ds.bapi, rem, meta)
}

// NewReceiveSession takes a manifest sent by a remote and initiates a
Expand Down Expand Up @@ -419,7 +449,7 @@ func (ds *Dsync) finalizeReceive(sess *session) error {
}

// GetDagInfo gets the manifest for a DAG rooted at id, checking any configured cache before falling back to generating a new manifest
func (ds *Dsync) GetDagInfo(ctx context.Context, hash string) (info *dag.Info, err error) {
func (ds *Dsync) GetDagInfo(ctx context.Context, hash string, meta map[string]string) (info *dag.Info, err error) {
// check cache if one is specified
if ds.infoStore != nil {
if info, err = ds.infoStore.DAGInfo(ctx, hash); err == nil {
Expand All @@ -433,7 +463,18 @@ func (ds *Dsync) GetDagInfo(ctx context.Context, hash string) (info *dag.Info, e
return nil, err
}

return dag.NewInfo(ctx, ds.lng, id)
info, err = dag.NewInfo(ctx, ds.lng, id)
if err != nil {
return nil, err
}

if ds.getDagInfoCheck != nil {
if err = ds.getDagInfoCheck(ctx, *info, meta); err != nil {
return nil, err
}
}

return info, nil
}

// GetBlock returns a single block from the store
Expand All @@ -445,3 +486,25 @@ func (ds *Dsync) GetBlock(ctx context.Context, hash string) ([]byte, error) {

return ioutil.ReadAll(rdr)
}

// RemoveCID unpins a CID if removes are enabled, does not immideately remove
// unpinned content
func (ds *Dsync) RemoveCID(ctx context.Context, cidStr string, meta map[string]string) error {
if !ds.allowRemoves {
return ErrRemoveNotSupported
}

log.Debug("removing cid", cidStr)
if ds.removeCheck != nil {
info := dag.Info{Manifest: &dag.Manifest{Nodes: []string{cidStr}}}
if err := ds.removeCheck(ctx, info, meta); err != nil {
return err
}
}

if ds.pin != nil {
return ds.pin.Rm(ctx, path.New(cidStr))
}

return nil
}
4 changes: 2 additions & 2 deletions dsync/dsync_ipfs_plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (p *DsyncPlugin) Start(capi coreiface.CoreAPI) error {

// we MUST override the PreCheck function. In this example we're making sure
// no one sends us a bad hash:
cfg.PreCheck = p.pushPreCheck
cfg.PushPreCheck = p.pushPreCheck

// in order for remotes to allow pinning, dsync must be provided a PinAPI:
cfg.PinAPI = capi.Pin()
Expand Down Expand Up @@ -320,7 +320,7 @@ func newPullHandler(dsyncHost *dsync.Dsync) http.HandlerFunc {
}
fmt.Printf("performing pull:\n\tcid: %s\n\tremote: %s\n\tpin: %t\n", p.Cid, p.Addr, p.Pin)

pull, err := dsyncHost.NewPull(p.Cid, p.Addr)
pull, err := dsyncHost.NewPull(p.Cid, p.Addr, nil)
if err != nil {
fmt.Printf("error creating pull: %s\n", err.Error())
w.Write([]byte(err.Error()))
Expand Down
2 changes: 1 addition & 1 deletion dsync/dsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func ExampleNew() {

// we MUST override the PreCheck function. In this example we're making sure
// no one sends us a bad hash:
cfg.PreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error {
cfg.PushPreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error {
if info.Manifest.Nodes[0] == "BadHash" {
return fmt.Errorf("rejected for secret reasons")
}
Expand Down
80 changes: 75 additions & 5 deletions dsync/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,19 @@ func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveRespon
}

// GetDagInfo fetches a manifest from a remote source over HTTP
func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string) (info *dag.Info, err error) {
url := fmt.Sprintf("%s?manifest=%s", rem.URL, id)
req, err := http.NewRequest("GET", url, nil)
func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string, meta map[string]string) (info *dag.Info, err error) {
u, err := url.Parse(rem.URL)
if err != nil {
return
}
q := u.Query()
q.Set("manifest", id)
for key, val := range meta {
q.Set(key, val)
}
u.RawQuery = q.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,6 +168,43 @@ func (rem *HTTPClient) GetBlock(ctx context.Context, id string) (data []byte, er
return ioutil.ReadAll(res.Body)
}

// RemoveCID asks a remote to remove a CID
func (rem *HTTPClient) RemoveCID(ctx context.Context, id string, meta map[string]string) (err error) {
u, err := url.Parse(rem.URL)
if err != nil {
return
}
q := u.Query()
q.Set("cid", id)
for key, val := range meta {
q.Set(key, val)
}
u.RawQuery = q.Encode()

req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

if res.StatusCode != http.StatusOK {
var msg string
if data, err := ioutil.ReadAll(res.Body); err == nil {
msg = string(data)
}
if msg == ErrRemoveNotSupported.Error() {
return ErrRemoveNotSupported
}
return fmt.Errorf("remote: %d %s", res.StatusCode, msg)
}

return nil
}

// HTTPRemoteHandler exposes a Dsync remote over HTTP by exposing a HTTP handler
// that interlocks with methods exposed by HTTPClient
func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
Expand All @@ -178,7 +225,6 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
return
}

log.Debug("new receive via HTTP", r.URL.String())
pinOnComplete := r.FormValue("pin") == "true"
meta := map[string]string{}
for key := range r.URL.Query() {
Expand Down Expand Up @@ -224,7 +270,15 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("either manifest or block query params are required"))
} else if mfstID != "" {
mfst, err := ds.GetDagInfo(r.Context(), mfstID)

meta := map[string]string{}
for key := range r.URL.Query() {
if key != "manifest" {
meta[key] = r.URL.Query().Get(key)
}
}

mfst, err := ds.GetDagInfo(r.Context(), mfstID, meta)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
Expand All @@ -250,6 +304,22 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(data)
}
case "DELETE":
cid := r.FormValue("cid")
meta := map[string]string{}
for key := range r.URL.Query() {
if key != "cid" {
meta[key] = r.URL.Query().Get(key)
}
}

if err := ds.RemoveCID(r.Context(), cid, meta); err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}

w.WriteHeader(http.StatusOK)
}
}
}
Loading

0 comments on commit bee27f6

Please sign in to comment.