From 70fd0fd93e76c95fab000a2aa6447ff2697261f8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 14 Jun 2019 02:02:30 -0700 Subject: [PATCH] aggressively free memory This ensures we don't keep large buffers allocated. --- go.mod | 3 ++- go.sum | 4 ++-- message/message.go | 42 +++++++++++++++++++++++++++++------------- network/ipfs_impl.go | 6 +++--- 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 9fb78e7f..1bafbdc6 100644 --- a/go.mod +++ b/go.mod @@ -22,12 +22,13 @@ require ( github.com/ipfs/go-metrics-interface v0.0.1 github.com/ipfs/go-peertaskqueue v0.1.1 github.com/jbenet/goprocess v0.1.3 + github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-libp2p v0.1.1 github.com/libp2p/go-libp2p-core v0.0.3 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-netutil v0.1.0 github.com/libp2p/go-libp2p-testing v0.0.4 - github.com/libp2p/go-msgio v0.0.3 // indirect + github.com/libp2p/go-msgio v0.0.4 github.com/mattn/go-colorable v0.1.2 // indirect github.com/multiformats/go-multiaddr v0.0.4 github.com/opentracing/opentracing-go v1.1.0 // indirect diff --git a/go.sum b/go.sum index 6740d9e1..c80ee5bb 100644 --- a/go.sum +++ b/go.sum @@ -185,8 +185,8 @@ github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0 github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0= github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= -github.com/libp2p/go-msgio v0.0.3 h1:VsOlWispTivSsOMg70e0W77y6oiSBSRCyP6URrWvE04= -github.com/libp2p/go-msgio v0.0.3/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= +github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA= +github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-nat v0.0.3 h1:l6fKV+p0Xa354EqQOQP+d8CivdLM4kl5GxC1hSc/UeI= github.com/libp2p/go-nat v0.0.3/go.mod h1:88nUEt0k0JD45Bk93NIwDqjlhiOwOoV36GchpcVc1yI= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= diff --git a/message/message.go b/message/message.go index df44d112..a1604619 100644 --- a/message/message.go +++ b/message/message.go @@ -1,6 +1,7 @@ package message import ( + "encoding/binary" "fmt" "io" @@ -8,8 +9,9 @@ import ( wantlist "github.com/ipfs/go-bitswap/wantlist" blocks "github.com/ipfs/go-block-format" - ggio "github.com/gogo/protobuf/io" cid "github.com/ipfs/go-cid" + pool "github.com/libp2p/go-buffer-pool" + msgio "github.com/libp2p/go-msgio" "github.com/libp2p/go-libp2p-core/network" ) @@ -170,18 +172,22 @@ func (m *impl) AddBlock(b blocks.Block) { // FromNet generates a new BitswapMessage from incoming data on an io.Reader. func FromNet(r io.Reader) (BitSwapMessage, error) { - pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax) - return FromPBReader(pbr) + reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax) + return FromMsgReader(reader) } // FromPBReader generates a new Bitswap message from a gogo-protobuf reader -func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) { - pb := new(pb.Message) - if err := pbr.ReadMsg(pb); err != nil { +func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) { + msg, err := r.ReadMsg() + if err != nil { return nil, err } - - return newMessageFromProto(*pb) + var pb pb.Message + if err := pb.Unmarshal(msg); err != nil { + return nil, err + } + r.ReleaseMsg(msg) + return newMessageFromProto(pb) } func (m *impl) ToProtoV0() *pb.Message { @@ -228,15 +234,25 @@ func (m *impl) ToProtoV1() *pb.Message { } func (m *impl) ToNetV0(w io.Writer) error { - pbw := ggio.NewDelimitedWriter(w) - - return pbw.WriteMsg(m.ToProtoV0()) + return write(w, m.ToProtoV0()) } func (m *impl) ToNetV1(w io.Writer) error { - pbw := ggio.NewDelimitedWriter(w) + return write(w, m.ToProtoV1()) +} - return pbw.WriteMsg(m.ToProtoV1()) +func write(w io.Writer, m *pb.Message) error { + size := m.Size() + buf := pool.Get(size + binary.MaxVarintLen64) + defer pool.Put(buf) + n := binary.PutUvarint(buf, uint64(size)) + if written, err := m.MarshalTo(buf[n:]); err != nil { + return err + } else { + n += written + } + _, err := w.Write(buf[:n]) + return err } func (m *impl) Loggable() map[string]interface{} { diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 2cfbbcbf..52ee64c6 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -10,7 +10,6 @@ import ( bsmsg "github.com/ipfs/go-bitswap/message" "github.com/libp2p/go-libp2p-core/helpers" - ggio "github.com/gogo/protobuf/io" cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/connmgr" @@ -19,6 +18,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" peerstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + msgio "github.com/libp2p/go-msgio" ma "github.com/multiformats/go-multiaddr" ) @@ -178,9 +178,9 @@ func (bsnet *impl) handleNewStream(s network.Stream) { return } - reader := ggio.NewDelimitedReader(s, network.MessageSizeMax) + reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax) for { - received, err := bsmsg.FromPBReader(reader) + received, err := bsmsg.FromMsgReader(reader) if err != nil { if err != io.EOF { s.Reset()