Skip to content

Commit

Permalink
Refactor encoding replicated stream messages to properly enforce fram…
Browse files Browse the repository at this point in the history
…ing (#6052)

This PR fixes two bugs in `encodeStreamMsgAllowCompress`:

* The boundary lengths could have overflowed when casting potentially
large values down to uint16 or uint32 and we wouldn't have noticed;
* We would then copy in the subject/reply/header/msg with the original
length regardless, not the overflowed length, meaning the decoding side
would have the wrong boundaries.

Now we will correctly calculate the bounds, avoiding overflows, and only
append as many bytes as we clipped down to.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Oct 29, 2024
2 parents 07c7eda + 0458586 commit 06e5aa2
Showing 1 changed file with 28 additions and 36 deletions.
64 changes: 28 additions & 36 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7691,54 +7691,46 @@ const compressThreshold = 8192 // 8k

// If allowed and contents over the threshold we will compress.
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte {
shouldCompress := compressOK && len(subject)+len(reply)+len(hdr)+len(msg) > compressThreshold

elen := 1 + 8 + 8 + len(subject) + len(reply) + len(hdr) + len(msg)
// Clip the subject, reply, header and msgs down. Operate on
// uint64 lengths to avoid overflowing.
slen := min(uint64(len(subject)), math.MaxUint16)
rlen := min(uint64(len(reply)), math.MaxUint16)
hlen := min(uint64(len(hdr)), math.MaxUint16)
mlen := min(uint64(len(msg)), math.MaxUint32)
total := slen + rlen + hlen + mlen

shouldCompress := compressOK && total > compressThreshold
elen := int(1 + 8 + 8 + total)
elen += (2 + 2 + 2 + 4) // Encoded lengths, 4bytes
// TODO(dlc) - check sizes of subject, reply and hdr, make sure uint16 ok.
buf := make([]byte, elen)

buf := make([]byte, 1, elen)
buf[0] = byte(streamMsgOp)

var le = binary.LittleEndian
wi := 1
le.PutUint64(buf[wi:], lseq)
wi += 8
le.PutUint64(buf[wi:], uint64(ts))
wi += 8
le.PutUint16(buf[wi:], uint16(len(subject)))
wi += 2
copy(buf[wi:], subject)
wi += len(subject)
le.PutUint16(buf[wi:], uint16(len(reply)))
wi += 2
copy(buf[wi:], reply)
wi += len(reply)
le.PutUint16(buf[wi:], uint16(len(hdr)))
wi += 2
if len(hdr) > 0 {
copy(buf[wi:], hdr)
wi += len(hdr)
}
le.PutUint32(buf[wi:], uint32(len(msg)))
wi += 4
if len(msg) > 0 {
copy(buf[wi:], msg)
wi += len(msg)
}
buf = le.AppendUint64(buf, lseq)
buf = le.AppendUint64(buf, uint64(ts))
buf = le.AppendUint16(buf, uint16(slen))
buf = append(buf, subject[:slen]...)
buf = le.AppendUint16(buf, uint16(rlen))
buf = append(buf, reply[:rlen]...)
buf = le.AppendUint16(buf, uint16(hlen))
buf = append(buf, hdr[:hlen]...)
buf = le.AppendUint32(buf, uint32(mlen))
buf = append(buf, msg[:mlen]...)

// Check if we should compress.
if shouldCompress {
nbuf := make([]byte, s2.MaxEncodedLen(elen))
nbuf[0] = byte(compressedStreamMsgOp)
ebuf := s2.Encode(nbuf[1:], buf[1:wi])
// Only pay cost of decode the other side if we compressed.
ebuf := s2.Encode(nbuf[1:], buf[1:])
// Only pay the cost of decode on the other side if we compressed.
// S2 will allow us to try without major penalty for non-compressable data.
if len(ebuf) < wi {
nbuf = nbuf[:len(ebuf)+1]
buf, wi = nbuf, len(nbuf)
if len(ebuf) < len(buf) {
buf = nbuf[:len(ebuf)+1]
}
}

return buf[:wi]
return buf
}

// Determine if all peers in our set support the binary snapshot.
Expand Down

0 comments on commit 06e5aa2

Please sign in to comment.