Skip to content

Commit

Permalink
feat(net): initial dag-cbor protocol support
Browse files Browse the repository at this point in the history
also added first roundtrip benchmark
  • Loading branch information
mvdan authored and rvagg committed Jan 14, 2022
1 parent 4f4414d commit 789b34a
Show file tree
Hide file tree
Showing 9 changed files with 537 additions and 320 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/ipfs/go-peertaskqueue v0.7.1
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.14.3
github.com/ipld/go-ipld-prime v0.14.4-0.20220110161855-fc09d6b768e9
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.16.0
Expand All @@ -44,3 +44,5 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/protobuf v1.27.1
)

replace github.com/ipld/go-ipld-prime => ../../src/ipld
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSat
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.14.3 h1:cGUmxSws2IHurn00/iLMDapeXsnf9+FyAtYVy8G/JsQ=
github.com/ipld/go-ipld-prime v0.14.3/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.14.4-0.20220110161855-fc09d6b768e9 h1:fqQSvdPznhyE5jZoCwbvykHbK+QukxJcS1SFWbj9ig0=
github.com/ipld/go-ipld-prime v0.14.4-0.20220110161855-fc09d6b768e9/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
81 changes: 81 additions & 0 deletions message/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package message_test

import (
"bytes"
"math/rand"
"testing"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
)

func BenchmarkMessageEncodingRoundtrip(b *testing.B) {
root := testutil.GenerateCids(1)[0]
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
selector := ssb.Matcher().Node()
extensionName := graphsync.ExtensionName("graphsync/awesome")
extension := message.NamedExtension{
Name: extensionName,
Data: basicnode.NewBytes(testutil.RandomBytes(100)),
}
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
status := graphsync.RequestAcknowledged

builder := message.NewBuilder()
builder.AddRequest(message.NewRequest(id, root, selector, priority, extension))
builder.AddResponseCode(id, status)
builder.AddExtensionData(id, extension)
builder.AddBlock(blocks.NewBlock([]byte("W")))
builder.AddBlock(blocks.NewBlock([]byte("E")))
builder.AddBlock(blocks.NewBlock([]byte("F")))
builder.AddBlock(blocks.NewBlock([]byte("M")))

gsm, err := builder.Build()
require.NoError(b, err)

b.Run("Protobuf", func(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
buf := new(bytes.Buffer)
for pb.Next() {
buf.Reset()

err := gsm.ToNet(buf)
require.NoError(b, err)

gsm2, err := message.FromNet(buf)
require.NoError(b, err)
require.Equal(b, gsm, gsm2)
}
})
})

b.Run("DagCbor", func(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
buf := new(bytes.Buffer)
for pb.Next() {
buf.Reset()

node := bindnode.Wrap(&gsm, message.Prototype.Message.Type())
err := dagcbor.Encode(node.Representation(), buf)
require.NoError(b, err)

builder := message.Prototype.Message.Representation().NewBuilder()
err = dagcbor.Decode(builder, buf)
require.NoError(b, err)
node2 := builder.Build()
gsm2 := *bindnode.Unwrap(node2).(*message.GraphSyncMessage)
require.Equal(b, gsm, gsm2)
}
})
})
}
30 changes: 20 additions & 10 deletions message/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package message

import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
Expand All @@ -18,7 +19,7 @@ type Builder struct {
blkSize uint64
completedResponses map[graphsync.RequestID]graphsync.ResponseStatusCode
outgoingResponses map[graphsync.RequestID]metadata.Metadata
extensions map[graphsync.RequestID][]graphsync.ExtensionData
extensions map[graphsync.RequestID][]NamedExtension
requests map[graphsync.RequestID]GraphSyncRequest
}

Expand All @@ -29,13 +30,13 @@ func NewBuilder() *Builder {
outgoingBlocks: make(map[cid.Cid]blocks.Block),
completedResponses: make(map[graphsync.RequestID]graphsync.ResponseStatusCode),
outgoingResponses: make(map[graphsync.RequestID]metadata.Metadata),
extensions: make(map[graphsync.RequestID][]graphsync.ExtensionData),
extensions: make(map[graphsync.RequestID][]NamedExtension),
}
}

// AddRequest registers a new request to be added to the message.
func (b *Builder) AddRequest(request GraphSyncRequest) {
b.requests[request.ID()] = request
b.requests[request.ID] = request
}

// AddBlock adds the given block to the message.
Expand All @@ -45,7 +46,7 @@ func (b *Builder) AddBlock(block blocks.Block) {
}

// AddExtensionData adds the given extension data to to the message
func (b *Builder) AddExtensionData(requestID graphsync.RequestID, extension graphsync.ExtensionData) {
func (b *Builder) AddExtensionData(requestID graphsync.RequestID, extension NamedExtension) {
b.extensions[requestID] = append(b.extensions[requestID], extension)
// make sure this extension goes out in next response even if no links are sent
_, ok := b.outgoingResponses[requestID]
Expand Down Expand Up @@ -109,21 +110,30 @@ func (b *Builder) ScrubResponses(requestIDs []graphsync.RequestID) uint64 {

// Build assembles and encodes message data from the added requests, links, and blocks.
func (b *Builder) Build() (GraphSyncMessage, error) {
responses := make(map[graphsync.RequestID]GraphSyncResponse, len(b.outgoingResponses))
requests := make([]GraphSyncRequest, 0, len(b.requests))
for _, request := range b.requests {
requests = append(requests, request)
}
responses := make([]GraphSyncResponse, 0, len(b.outgoingResponses))
for requestID, linkMap := range b.outgoingResponses {
mdRaw, err := metadata.EncodeMetadata(linkMap)
if err != nil {
return GraphSyncMessage{}, err
}
b.extensions[requestID] = append(b.extensions[requestID], graphsync.ExtensionData{
b.extensions[requestID] = append(b.extensions[requestID], NamedExtension{
Name: graphsync.ExtensionMetadata,
Data: mdRaw,
Data: basicnode.NewBytes(mdRaw), // TODO: likely wrong
})
status, isComplete := b.completedResponses[requestID]
responses[requestID] = NewResponse(requestID, responseCode(status, isComplete), b.extensions[requestID]...)
responses = append(responses, NewResponse(requestID, responseCode(status, isComplete), b.extensions[requestID]...))
}
blocks := make([]GraphSyncBlock, 0, len(b.outgoingBlocks))
for _, block := range b.outgoingBlocks {
blocks = append(blocks, FromBlockFormat(block))
}
// TODO: sort requests, responses, and blocks? map order is randomized
return GraphSyncMessage{
b.requests, responses, b.outgoingBlocks,
requests, responses, blocks,
}, nil
}

Expand Down
73 changes: 49 additions & 24 deletions message/builder_test.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,57 @@
package message

import (
"bytes"
"io"
"math/rand"
"testing"

"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/testutil"
)

// Like the funcs in testutil above, but using blocks at the protocol level.
// We can't put them there right away, due to import cycles.
// We need to refactor these tests to be external, i.e. "package message_test".

func ContainsGraphSyncBlock(blks []GraphSyncBlock, block GraphSyncBlock) bool {
for _, blk := range blks {
if bytes.Equal(blk.Prefix, block.Prefix) && bytes.Equal(blk.Data, block.Data) {
return true
}
}
return false
}
func AssertContainsGraphSyncBlock(t testing.TB, blks []GraphSyncBlock, block GraphSyncBlock) {
t.Helper()
require.True(t, ContainsGraphSyncBlock(blks, block), "given block should be in list")
}
func RefuteContainsGraphSyncBlock(t testing.TB, blks []GraphSyncBlock, block GraphSyncBlock) {
t.Helper()
require.False(t, ContainsGraphSyncBlock(blks, block), "given block should not be in list")
}

func TestMessageBuilding(t *testing.T) {
blocks := testutil.GenerateBlocksOfSize(3, 100)
links := make([]ipld.Link, 0, len(blocks))
for _, block := range blocks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
extensionData1 := testutil.RandomBytes(100)
extensionData1 := basicnode.NewBytes(testutil.RandomBytes(100))
extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
extension1 := graphsync.ExtensionData{
extension1 := NamedExtension{
Name: extensionName1,
Data: extensionData1,
}
extensionData2 := testutil.RandomBytes(100)
extensionData2 := basicnode.NewBytes(testutil.RandomBytes(100))
extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
extension2 := graphsync.ExtensionData{
extension2 := NamedExtension{
Name: extensionName2,
Data: extensionData2,
}
Expand Down Expand Up @@ -75,12 +98,12 @@ func TestMessageBuilding(t *testing.T) {
},
checkMsg: func(t *testing.T, message GraphSyncMessage) {

responses := message.Responses()
sentBlocks := message.Blocks()
responses := message.Responses
sentBlocks := BlockFormatSlice(message.Blocks)
require.Len(t, responses, 4, "did not assemble correct number of responses")

response1 := findResponseForRequestID(t, responses, requestID1)
require.Equal(t, graphsync.RequestCompletedPartial, response1.Status(), "did not generate completed partial response")
require.Equal(t, graphsync.RequestCompletedPartial, response1.Status, "did not generate completed partial response")
assertMetadata(t, response1, metadata.Metadata{
metadata.Item{Link: links[0].(cidlink.Link).Cid, BlockPresent: true},
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: false},
Expand All @@ -89,23 +112,23 @@ func TestMessageBuilding(t *testing.T) {
assertExtension(t, response1, extension1)

response2 := findResponseForRequestID(t, responses, requestID2)
require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not generate completed full response")
require.Equal(t, graphsync.RequestCompletedFull, response2.Status, "did not generate completed full response")
assertMetadata(t, response2, metadata.Metadata{
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: true},
metadata.Item{Link: links[2].(cidlink.Link).Cid, BlockPresent: true},
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: true},
})

response3 := findResponseForRequestID(t, responses, requestID3)
require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not generate partial response")
require.Equal(t, graphsync.PartialResponse, response3.Status, "did not generate partial response")
assertMetadata(t, response3, metadata.Metadata{
metadata.Item{Link: links[0].(cidlink.Link).Cid, BlockPresent: true},
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: true},
})
assertExtension(t, response3, extension2)

response4 := findResponseForRequestID(t, responses, requestID4)
require.Equal(t, graphsync.RequestCompletedFull, response4.Status(), "did not generate completed full response")
require.Equal(t, graphsync.RequestCompletedFull, response4.Status, "did not generate completed full response")

require.Equal(t, len(blocks), len(sentBlocks), "did not send all blocks")

Expand All @@ -121,15 +144,15 @@ func TestMessageBuilding(t *testing.T) {
},
expectedSize: 0,
checkMsg: func(t *testing.T, message GraphSyncMessage) {
responses := message.Responses()
responses := message.Responses

response1 := findResponseForRequestID(t, responses, requestID1)
require.Equal(t, graphsync.PartialResponse, response1.Status(), "did not generate partial response")
require.Equal(t, graphsync.PartialResponse, response1.Status, "did not generate partial response")
assertMetadata(t, response1, nil)
assertExtension(t, response1, extension1)

response2 := findResponseForRequestID(t, responses, requestID2)
require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not generate partial response")
require.Equal(t, graphsync.PartialResponse, response2.Status, "did not generate partial response")
assertMetadata(t, response2, nil)
assertExtension(t, response2, extension2)
},
Expand Down Expand Up @@ -162,27 +185,27 @@ func TestMessageBuilding(t *testing.T) {
expectedSize: 200,
checkMsg: func(t *testing.T, message GraphSyncMessage) {

responses := message.Responses()
sentBlocks := message.Blocks()
responses := message.Responses
sentBlocks := BlockFormatSlice(message.Blocks)
require.Len(t, responses, 3, "did not assemble correct number of responses")

response2 := findResponseForRequestID(t, responses, requestID2)
require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not generate completed full response")
require.Equal(t, graphsync.RequestCompletedFull, response2.Status, "did not generate completed full response")
assertMetadata(t, response2, metadata.Metadata{
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: true},
metadata.Item{Link: links[2].(cidlink.Link).Cid, BlockPresent: true},
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: true},
})

response3 := findResponseForRequestID(t, responses, requestID3)
require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not generate partial response")
require.Equal(t, graphsync.PartialResponse, response3.Status, "did not generate partial response")
assertMetadata(t, response3, metadata.Metadata{
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: true},
})
assertExtension(t, response3, extension2)

response4 := findResponseForRequestID(t, responses, requestID4)
require.Equal(t, graphsync.RequestCompletedFull, response4.Status(), "did not generate completed full response")
require.Equal(t, graphsync.RequestCompletedFull, response4.Status, "did not generate completed full response")

require.Equal(t, len(blocks)-1, len(sentBlocks), "did not send all blocks")

Expand Down Expand Up @@ -220,12 +243,12 @@ func TestMessageBuilding(t *testing.T) {
expectedSize: 100,
checkMsg: func(t *testing.T, message GraphSyncMessage) {

responses := message.Responses()
sentBlocks := message.Blocks()
responses := message.Responses
sentBlocks := BlockFormatSlice(message.Blocks)
require.Len(t, responses, 1, "did not assemble correct number of responses")

response3 := findResponseForRequestID(t, responses, requestID3)
require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not generate partial response")
require.Equal(t, graphsync.PartialResponse, response3.Status, "did not generate partial response")
assertMetadata(t, response3, metadata.Metadata{
metadata.Item{Link: links[1].(cidlink.Link).Cid, BlockPresent: true},
})
Expand All @@ -251,23 +274,25 @@ func TestMessageBuilding(t *testing.T) {

func findResponseForRequestID(t *testing.T, responses []GraphSyncResponse, requestID graphsync.RequestID) GraphSyncResponse {
for _, response := range responses {
if response.RequestID() == requestID {
if response.ID == requestID {
return response
}
}
require.FailNow(t, "Could not find request")
return GraphSyncResponse{}
}

func assertExtension(t *testing.T, response GraphSyncResponse, extension graphsync.ExtensionData) {
func assertExtension(t *testing.T, response GraphSyncResponse, extension NamedExtension) {
returnedExtensionData, found := response.Extension(extension.Name)
require.True(t, found)
require.Equal(t, extension.Data, returnedExtensionData, "did not encode extension")
}

func assertMetadata(t *testing.T, response GraphSyncResponse, expectedMetadata metadata.Metadata) {
responseMetadataRaw, found := response.Extension(graphsync.ExtensionMetadata)
responseMetadataNode, found := response.Extension(graphsync.ExtensionMetadata)
require.True(t, found, "Metadata should be included in response")
responseMetadataRaw, err := responseMetadataNode.AsBytes()
require.NoError(t, err)
responseMetadata, err := metadata.DecodeMetadata(responseMetadataRaw)
require.NoError(t, err)
require.Equal(t, expectedMetadata, responseMetadata, "incorrect metadata included in response")
Expand Down
Loading

0 comments on commit 789b34a

Please sign in to comment.