-
Notifications
You must be signed in to change notification settings - Fork 519
network: proposal payload compression #4589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
algorandskiy
merged 16 commits into
algorand:master
from
algorandskiy:pavel/pp-compression
Oct 14, 2022
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
1696294
WIP: proposal payload compression
algorandskiy 425ee2f
WIP: test
algorandskiy 25a2157
Fixes and basic test
algorandskiy be82b01
Add unit tests
algorandskiy 2f3b220
Fix tests
algorandskiy bea1deb
Use reader to prevent zip bombing
algorandskiy b6916ec
Fix CR
algorandskiy a4d5442
refactoring and unit tests
algorandskiy b4f8360
Fix linter
algorandskiy 916de2d
add zstdProposalDecompressor
algorandskiy 4b1b4ca
CR feedback: fast comp + bound
algorandskiy f8fb0fb
CR fixes: prio outside canCompress
algorandskiy a625572
CR: move PP compresion feature to the HTTP headers instead of the pro…
algorandskiy aaaa19c
Add metrics
algorandskiy 0c37d7d
Rename method
algorandskiy 3f7d72f
Start decompression with 3x buffer
algorandskiy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,157 @@ | ||
| // Copyright (C) 2019-2022 Algorand, Inc. | ||
| // This file is part of go-algorand | ||
| // | ||
| // go-algorand is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU Affero General Public License as | ||
| // published by the Free Software Foundation, either version 3 of the | ||
| // License, or (at your option) any later version. | ||
| // | ||
| // go-algorand is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU Affero General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU Affero General Public License | ||
| // along with go-algorand. If not, see <https://www.gnu.org/licenses/>. | ||
|
|
||
| package network | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "fmt" | ||
| "io" | ||
|
|
||
| "github.com/DataDog/zstd" | ||
|
|
||
| "github.com/algorand/go-algorand/logging" | ||
| "github.com/algorand/go-algorand/protocol" | ||
| ) | ||
|
|
||
| var zstdCompressionMagic = [4]byte{0x28, 0xb5, 0x2f, 0xfd} | ||
|
|
||
| const zstdCompressionLevel = zstd.BestSpeed | ||
|
|
||
| // checkCanCompress checks if there is an proposal payload message and peers supporting compression | ||
| func checkCanCompress(request broadcastRequest, peers []*wsPeer) bool { | ||
| canCompress := false | ||
| hasPP := false | ||
| for _, tag := range request.tags { | ||
| if tag == protocol.ProposalPayloadTag { | ||
| hasPP = true | ||
| break | ||
| } | ||
| } | ||
| // if have proposal payload check if there are any peers supporting compression | ||
| if hasPP { | ||
| for _, peer := range peers { | ||
| if peer.pfProposalCompressionSupported() { | ||
| canCompress = true | ||
| break | ||
| } | ||
| } | ||
| } | ||
| return canCompress | ||
| } | ||
|
|
||
| // zstdCompressMsg returns a concatenation of a tag and compressed data | ||
| func zstdCompressMsg(tbytes []byte, d []byte) ([]byte, string) { | ||
| bound := zstd.CompressBound(len(d)) | ||
| if bound < len(d) { | ||
| // although CompressBound allocated more than the src size, this is an implementation detail. | ||
| // increase the buffer size to always have enough space for the raw data if compression fails. | ||
| bound = len(d) | ||
| } | ||
| mbytesComp := make([]byte, len(tbytes)+bound) | ||
| copy(mbytesComp, tbytes) | ||
| comp, err := zstd.CompressLevel(mbytesComp[len(tbytes):], d, zstdCompressionLevel) | ||
| if err != nil { | ||
| // fallback and reuse non-compressed original data | ||
| logMsg := fmt.Sprintf("failed to compress into buffer of len %d: %v", len(d), err) | ||
| copied := copy(mbytesComp[len(tbytes):], d) | ||
| return mbytesComp[:len(tbytes)+copied], logMsg | ||
algorandskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| mbytesComp = mbytesComp[:len(tbytes)+len(comp)] | ||
| return mbytesComp, "" | ||
| } | ||
|
|
||
| // MaxDecompressedMessageSize defines a maximum decompressed data size | ||
| // to prevent zip bombs | ||
| const MaxDecompressedMessageSize = 20 * 1024 * 1024 // some large enough value | ||
algorandskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // wsPeerMsgDataConverter performs optional incoming messages conversion. | ||
| // At the moment it only supports zstd decompression for payload proposal | ||
| type wsPeerMsgDataConverter struct { | ||
| log logging.Logger | ||
| origin string | ||
|
|
||
| // actual converter(s) | ||
| ppdec zstdProposalDecompressor | ||
| } | ||
|
|
||
| type zstdProposalDecompressor struct { | ||
| active bool | ||
| } | ||
|
|
||
| func (dec zstdProposalDecompressor) enabled() bool { | ||
| return dec.active | ||
| } | ||
|
|
||
| func (dec zstdProposalDecompressor) accept(data []byte) bool { | ||
| return len(data) > 4 && bytes.Equal(data[:4], zstdCompressionMagic[:]) | ||
| } | ||
algorandskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) { | ||
| r := zstd.NewReader(bytes.NewReader(data)) | ||
| defer r.Close() | ||
| b := make([]byte, 0, 3*len(data)) | ||
| for { | ||
| if len(b) == cap(b) { | ||
| // grow capacity, retain length | ||
| b = append(b, 0)[:len(b)] | ||
algorandskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| n, err := r.Read(b[len(b):cap(b)]) | ||
| b = b[:len(b)+n] | ||
| if err != nil { | ||
| if err == io.EOF { | ||
| return b, nil | ||
| } | ||
| return nil, err | ||
| } | ||
| if len(b) > MaxDecompressedMessageSize { | ||
| return nil, fmt.Errorf("proposal data is too large: %d", len(b)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, error) { | ||
| if tag == protocol.ProposalPayloadTag { | ||
| if c.ppdec.enabled() { | ||
| // sender might support compressed payload but fail to compress for whatever reason, | ||
| // in this case it sends non-compressed payload - the receiver decompress only if it is compressed. | ||
| if c.ppdec.accept(data) { | ||
| res, err := c.ppdec.convert(data) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("peer %s: %w", c.origin, err) | ||
| } | ||
| return res, nil | ||
| } | ||
| c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin) | ||
| } | ||
| } | ||
| return data, nil | ||
| } | ||
|
|
||
| func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter { | ||
| c := wsPeerMsgDataConverter{ | ||
| log: wp.net.log, | ||
| origin: wp.originAddress, | ||
| } | ||
|
|
||
| if wp.pfProposalCompressionSupported() { | ||
| c.ppdec = zstdProposalDecompressor{ | ||
| active: true, | ||
| } | ||
| } | ||
|
|
||
| return &c | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| // Copyright (C) 2019-2022 Algorand, Inc. | ||
| // This file is part of go-algorand | ||
| // | ||
| // go-algorand is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU Affero General Public License as | ||
| // published by the Free Software Foundation, either version 3 of the | ||
| // License, or (at your option) any later version. | ||
| // | ||
| // go-algorand is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU Affero General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU Affero General Public License | ||
| // along with go-algorand. If not, see <https://www.gnu.org/licenses/>. | ||
|
|
||
| package network | ||
|
|
||
| import ( | ||
| "strings" | ||
| "testing" | ||
|
|
||
| "github.com/DataDog/zstd" | ||
| "github.com/algorand/go-algorand/logging" | ||
| "github.com/algorand/go-algorand/protocol" | ||
| "github.com/algorand/go-algorand/test/partitiontest" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestZstdDecompress(t *testing.T) { | ||
| partitiontest.PartitionTest(t) | ||
|
|
||
| // happy case - small message | ||
| msg := []byte(strings.Repeat("1", 2048)) | ||
| compressed, err := zstd.Compress(nil, msg) | ||
| require.NoError(t, err) | ||
| d := zstdProposalDecompressor{} | ||
| decompressed, err := d.convert(compressed) | ||
| require.NoError(t, err) | ||
| require.Equal(t, msg, decompressed) | ||
|
|
||
| // error case - large message | ||
| msg = []byte(strings.Repeat("1", MaxDecompressedMessageSize+10)) | ||
| compressed, err = zstd.Compress(nil, msg) | ||
| require.NoError(t, err) | ||
| decompressed, err = d.convert(compressed) | ||
| require.Error(t, err) | ||
| require.Nil(t, decompressed) | ||
| } | ||
|
|
||
| func TestCheckCanCompress(t *testing.T) { | ||
| partitiontest.PartitionTest(t) | ||
|
|
||
| req := broadcastRequest{} | ||
| peers := []*wsPeer{} | ||
| r := checkCanCompress(req, peers) | ||
| require.False(t, r) | ||
|
|
||
| req.tags = []protocol.Tag{protocol.AgreementVoteTag} | ||
| r = checkCanCompress(req, peers) | ||
| require.False(t, r) | ||
|
|
||
| req.tags = []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag} | ||
| r = checkCanCompress(req, peers) | ||
| require.False(t, r) | ||
|
|
||
| peer1 := wsPeer{ | ||
| features: 0, | ||
| } | ||
| peers = []*wsPeer{&peer1} | ||
| r = checkCanCompress(req, peers) | ||
| require.False(t, r) | ||
|
|
||
| peer2 := wsPeer{ | ||
| features: pfCompressedProposal, | ||
| } | ||
| peers = []*wsPeer{&peer1, &peer2} | ||
| r = checkCanCompress(req, peers) | ||
| require.True(t, r) | ||
| } | ||
|
|
||
| func TestZstdCompressMsg(t *testing.T) { | ||
| partitiontest.PartitionTest(t) | ||
|
|
||
| ppt := len(protocol.ProposalPayloadTag) | ||
| data := []byte("data") | ||
| comp, msg := zstdCompressMsg([]byte(protocol.ProposalPayloadTag), data) | ||
| require.Empty(t, msg) | ||
| require.Equal(t, []byte(protocol.ProposalPayloadTag), comp[:ppt]) | ||
| require.Equal(t, zstdCompressionMagic[:], comp[ppt:ppt+len(zstdCompressionMagic)]) | ||
| d := zstdProposalDecompressor{} | ||
| decompressed, err := d.convert(comp[ppt:]) | ||
| require.NoError(t, err) | ||
| require.Equal(t, data, decompressed) | ||
| } | ||
|
|
||
| type converterTestLogger struct { | ||
| logging.Logger | ||
| WarnfCallback func(string, ...interface{}) | ||
| warnMsgCount int | ||
| } | ||
|
|
||
| func (cl *converterTestLogger) Warnf(s string, args ...interface{}) { | ||
| cl.warnMsgCount++ | ||
| } | ||
|
|
||
| func TestWsPeerMsgDataConverterConvert(t *testing.T) { | ||
| partitiontest.PartitionTest(t) | ||
|
|
||
| c := wsPeerMsgDataConverter{} | ||
| c.ppdec = zstdProposalDecompressor{active: false} | ||
| tag := protocol.AgreementVoteTag | ||
| data := []byte("data") | ||
|
|
||
| r, err := c.convert(tag, data) | ||
| require.NoError(t, err) | ||
| require.Equal(t, data, r) | ||
|
|
||
| tag = protocol.ProposalPayloadTag | ||
| r, err = c.convert(tag, data) | ||
| require.NoError(t, err) | ||
| require.Equal(t, data, r) | ||
|
|
||
| l := converterTestLogger{} | ||
| c.log = &l | ||
| c.ppdec = zstdProposalDecompressor{active: true} | ||
| r, err = c.convert(tag, data) | ||
| require.NoError(t, err) | ||
| require.Equal(t, data, r) | ||
| require.Equal(t, 1, l.warnMsgCount) | ||
|
|
||
| l = converterTestLogger{} | ||
| c.log = &l | ||
|
|
||
| comp, err := zstd.Compress(nil, data) | ||
| require.NoError(t, err) | ||
|
|
||
| r, err = c.convert(tag, comp) | ||
| require.NoError(t, err) | ||
| require.Equal(t, data, r) | ||
| require.Equal(t, 0, l.warnMsgCount) | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.