Skip to content

Commit

Permalink
perf(CARStream): push blocks in one large CAR archive stream
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Nov 10, 2020
1 parent cd3095e commit 207f481
Show file tree
Hide file tree
Showing 9 changed files with 479 additions and 24 deletions.
56 changes: 48 additions & 8 deletions dsync/dsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
Expand All @@ -30,11 +31,16 @@ import (
path "github.com/ipfs/interface-go-ipfs-core/path"
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
protocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/qri-io/dag"
)

var log = golog.Logger("dsync")

func init() {
golog.SetLogLevel("dsync", "debug")
}

const (
// default to parallelism of 3. So far 4 was enough to blow up a std k8s pod running IPFS :(
defaultPushParallelism = 1
Expand All @@ -47,6 +53,16 @@ const (
maxRetries = 80
)

var (
// ErrRemoveNotSupported is the error value returned by remotes that don't
// support delete operations
ErrRemoveNotSupported = fmt.Errorf("remove is not supported")
// ErrUnknownProtocolVersion is the error for when the version of the remote
// protocol is unknown, usually because the handshake with the the remote
// hasn't happened yet
ErrUnknownProtocolVersion = fmt.Errorf("unknown protocol version")
)

// DagSyncable is a source that can be synced to & from. dsync requests automate
// calls to this interface with higher-order functions like Push and Pull
//
Expand All @@ -58,25 +74,24 @@ type DagSyncable interface {
// The remote will return a delta manifest of blocks the remote needs
// and a session id that must be sent with each block
NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)
// ProtocolVersion indicates the version of dsync the remote speaks, only
// available after a handshake is established. Calling this method before a
// handshake must return ErrUnknownProtocolVersion
ProtocolVersion() (protocol.ID, error)

// ReceiveBlock places a block on the remote
ReceiveBlock(sid, hash string, data []byte) ReceiveResponse

// GetDagInfo asks the remote for info specified by a the root identifier
// string of a DAG
GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error)
// GetBlock gets a block of data from the remote
GetBlock(ctx context.Context, hash string) (rawdata []byte, err error)

// RemoveCID asks the remote to remove a cid. Supporting deletes are optional.
// DagSyncables that don't implement DeleteCID must return
// ErrDeleteNotSupported
RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error)
}

// ErrRemoveNotSupported is the error value returned by remotes that don't
// support delete operations
var ErrRemoveNotSupported = fmt.Errorf("remove is not supported")

// Hook is a function that a dsync instance will call at specified points in the
// sync lifecycle
type Hook func(ctx context.Context, info dag.Info, meta map[string]string) error
Expand Down Expand Up @@ -263,6 +278,11 @@ func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func
return ds, nil
}

// ProtocolVersion reports the current procotol version for dsync
func (ds *Dsync) ProtocolVersion() (protocol.ID, error) {
return DsyncProtocolID, nil
}

// StartRemote makes dsync available for remote requests, starting an HTTP
// server if a listening address is specified.
// StartRemote returns immediately. Stop remote service by cancelling
Expand Down Expand Up @@ -387,13 +407,12 @@ func (ds *Dsync) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse {
return ReceiveResponse{
Hash: hash,
Status: StatusErrored,
Err: fmt.Errorf("sid '%s' not found", sid),
Err: fmt.Errorf("sid %q not found", sid),
}
}

// ReceiveBlock accepts a block from the sender, placing it in the local blockstore
res := sess.ReceiveBlock(hash, bytes.NewReader(data))
log.Debugf("received block: %s", res.Hash)

// check if transfer has completed, if so finalize it, but only once
if res.Status == StatusOk && sess.IsFinalizedOnce() {
Expand All @@ -409,6 +428,27 @@ func (ds *Dsync) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse {
return res
}

// ReceiveBlocks ingests blocks being pushed into the local store
func (ds *Dsync) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error {
sess, ok := ds.sessionPool[sid]
if !ok {
log.Debugf("couldn't find session. sid=%q", sid)
return fmt.Errorf("sid %q not found", sid)
}

if err := sess.ReceiveBlocks(ctx, r); err != nil {
log.Debugf("error receiving blocks. err=%q", err)
return err
}

if err := ds.finalizeReceive(sess); err != nil {
log.Debugf("error finalizing receive. err=%q", err)
return err
}

return nil
}

// TODO (b5): needs to be called if someone tries to sync a DAG that requires
// no blocks for an early termination, ensuring that we cache a dag.Info in
// that case as well
Expand Down
85 changes: 80 additions & 5 deletions dsync/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,28 @@ import (
"net/http"
"net/url"

"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
protocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/qri-io/dag"
)

const (
httpProtcolIDHeader = "dsync-version"
)

// HTTPClient is the request side of doing dsync over HTTP
type HTTPClient struct {
URL string
URL string
remProtocolID protocol.ID
}

// HTTPClient exists to satisfy the DaySyncable interface on the client side
// of a transfer
var _ DagSyncable = (*HTTPClient)(nil)
var (
// HTTPClient exists to satisfy the DaySyncable interface on the client side
// of a transfer
_ DagSyncable = (*HTTPClient)(nil)
_ DagStreamable = (*HTTPClient)(nil)
)

// NewReceiveSession initiates a session for pushing blocks to a remote.
// It sends a Manifest to a remote source over HTTP
Expand Down Expand Up @@ -61,12 +72,69 @@ func (rem *HTTPClient) NewReceiveSession(info *dag.Info, pinOnComplete bool, met
}

sid = res.Header.Get("sid")

protocolIDHeaderStr := res.Header.Get(httpProtcolIDHeader)
if protocolIDHeaderStr == "" {
// protocol ID header only exists in version 0.2.0 and up, when header isn't
// present assume version 0.1.1, the latest version before header was set
// 0.1.1 is wire-compatible with all lower versions of dsync
rem.remProtocolID = protocol.ID("/dsync/0.1.1")
} else {
rem.remProtocolID = protocol.ID(protocolIDHeaderStr)
}

diff = &dag.Manifest{}
err = json.NewDecoder(res.Body).Decode(diff)

return
}

// ProtocolVersion indicates the version of dsync the remote speaks, only
// available after a handshake is established
func (rem *HTTPClient) ProtocolVersion() (protocol.ID, error) {
if string(rem.remProtocolID) == "" {
return "", ErrUnknownProtocolVersion
}
return rem.remProtocolID, nil
}

// PutBlocks streams a manifest of blocks to the remote in one HTTP call
func (rem *HTTPClient) PutBlocks(ctx context.Context, sid string, ng ipld.NodeGetter, mfst *dag.Manifest, progCh chan cid.Cid) error {
r, err := NewManifestCARReader(ctx, ng, mfst, progCh)
if err != nil {
log.Debugf("err creating CARReader err=%q ", err)
return err
}

req, err := http.NewRequest("PATCH", fmt.Sprintf("%s?sid=%s", rem.URL, sid), r)
if err != nil {
log.Debugf("err creating PATCH HTTP request err=%q ", err)
return err
}

res, err := http.DefaultClient.Do(req)
if err != nil {
log.Debugf("err doing HTTP request. err=%q", err)
return err
}

if res.StatusCode != http.StatusOK {
var msg string
if data, err := ioutil.ReadAll(res.Body); err == nil {
msg = string(data)
}
log.Debugf("error response from remote. err=%q", msg)
return fmt.Errorf("remote response: %d %s", res.StatusCode, msg)
}

return nil
}

// FetchBlocks streams a manifest of requested blocks
func (rem *HTTPClient) FetchBlocks(ctx context.Context, sid string, mfst *dag.Manifest, progCh chan cid.Cid) error {
return fmt.Errorf("not implemented")
}

// ReceiveBlock asks a remote to receive a block over HTTP
func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse {
url := fmt.Sprintf("%s?sid=%s&hash=%s", rem.URL, sid, hash)
Expand Down Expand Up @@ -243,10 +311,10 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
return
}

w.Header().Set(httpProtcolIDHeader, string(DsyncProtocolID))
w.Header().Set("sid", sid)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(diff)

case "PUT":
data, err := ioutil.ReadAll(r.Body)
if err != nil {
Expand All @@ -266,6 +334,13 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
} else {
w.WriteHeader(http.StatusOK)
}
case "PATCH":
if err := ds.ReceiveBlocks(r.Context(), r.FormValue("sid"), r.Body); err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
case "GET":
mfstID := r.FormValue("manifest")
blockID := r.FormValue("block")
Expand Down
9 changes: 5 additions & 4 deletions dsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestSyncHTTP(t *testing.T) {
t.Fatal(err)
}

// yooooooooooooooooooooo
f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 350))))
// yooooooooooooooooooooo...
f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 3500000))))
path, err := a.Unixfs().Add(ctx, f)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -67,13 +67,14 @@ func TestSyncHTTP(t *testing.T) {

cli := &HTTPClient{URL: s.URL + "/dsync"}

fmt.Printf("pushing %#v\n", info.Manifest.Nodes)
push, err := NewPush(aGetter, info, cli, false)
if err != nil {
t.Fatal(err)
}

if err := push.Do(ctx); err != nil {
t.Error(err)
t.Fatal(err)
}

// b should now be able to generate a manifest
Expand All @@ -85,7 +86,7 @@ func TestSyncHTTP(t *testing.T) {
<-onCompleteCalled

if err := cli.RemoveCID(ctx, info.RootCID().String(), nil); err != nil {
t.Error(err)
t.Fatal(err)
}

<-removeCheckCalled
Expand Down
23 changes: 16 additions & 7 deletions dsync/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ import (
)

const (
// DsyncProtocolID is the dsyc p2p Protocol Identifier
DsyncProtocolID = protocol.ID("/dsync")
// DsyncServiceTag tags the type & version of the dsync service
DsyncServiceTag = "dsync/0.1.1-dev"
// DsyncProtocolID is the dsyc p2p Protocol Identifier & version tag
DsyncProtocolID = protocol.ID("/dsync/0.2.0")
// default value to give qri peer connections in connmanager, one hunnit
dsyncSupportValue = 100
)
Expand Down Expand Up @@ -69,6 +67,15 @@ func (c *p2pClient) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta m
return sid, diff, err
}

// ProtocolVersion indicates the version of dsync the remote speaks, only
// available after a handshake is established
func (c *p2pClient) ProtocolVersion() (protocol.ID, error) {
if string(c.remoteProtocolID) == "" {
return "", ErrUnknownProtocolVersion
}
return c.remoteProtocolID, nil
}

// ReceiveBlock places a block on the remote
func (c *p2pClient) ReceiveBlock(sid, cidStr string, data []byte) ReceiveResponse {
msg := p2putil.NewMessage(c.host.ID(), mtReceiveBlock, data).WithHeaders(
Expand Down Expand Up @@ -162,9 +169,10 @@ func (c *p2pClient) RemoveCID(ctx context.Context, cidStr string, meta map[strin

// p2pHandler implements dsync as a libp2p protocol handler
type p2pHandler struct {
dsync *Dsync
host host.Host
handlers map[p2putil.MsgType]p2putil.HandlerFunc
dsync *Dsync
host host.Host
handlers map[p2putil.MsgType]p2putil.HandlerFunc
remoteProtocolID protocol.ID
}

// newp2pHandler creates a p2p remote stream handler from a dsync.Remote
Expand All @@ -191,6 +199,7 @@ func (c *p2pHandler) sendMessage(ctx context.Context, msg p2putil.Message, pid p
if err != nil {
return p2putil.Message{}, fmt.Errorf("error opening stream: %s", err.Error())
}
c.remoteProtocolID = s.Protocol()
defer s.Close()

// now that we have a confirmed working connection
Expand Down
27 changes: 27 additions & 0 deletions dsync/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,33 @@ func (snd *Push) do(ctx context.Context) (err error) {
return nil
}

protoID, err := snd.remote.ProtocolVersion()
if err != nil {
return err
}

if protocolSupportsDagStreaming(protoID) {
progCh := make(chan cid.Cid)

go func() {
for id := range progCh {
// this is the only place we should modify progress after creation
idStr := id.String()
log.Debugf("sent block %s", idStr)
for i, hash := range snd.info.Manifest.Nodes {
if idStr == hash {
snd.prog[i] = 100
}
}
go snd.completionChanged()
}
}()

if str, ok := snd.remote.(DagStreamable); ok {
return str.PutBlocks(ctx, snd.sid, snd.lng, snd.diff, progCh)
}
}

// create senders
sends := make([]sender, snd.parallelism)
for i := 0; i < snd.parallelism; i++ {
Expand Down
Loading

0 comments on commit 207f481

Please sign in to comment.