diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1da5a95c52..9b03f6bee6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7691,54 +7691,46 @@ const compressThreshold = 8192 // 8k // If allowed and contents over the threshold we will compress. func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte { - shouldCompress := compressOK && len(subject)+len(reply)+len(hdr)+len(msg) > compressThreshold - - elen := 1 + 8 + 8 + len(subject) + len(reply) + len(hdr) + len(msg) + // Clip the subject, reply, header and msgs down. Operate on + // uint64 lengths to avoid overflowing. + slen := min(uint64(len(subject)), math.MaxUint16) + rlen := min(uint64(len(reply)), math.MaxUint16) + hlen := min(uint64(len(hdr)), math.MaxUint16) + mlen := min(uint64(len(msg)), math.MaxUint32) + total := slen + rlen + hlen + mlen + + shouldCompress := compressOK && total > compressThreshold + elen := int(1 + 8 + 8 + total) elen += (2 + 2 + 2 + 4) // Encoded lengths, 4bytes - // TODO(dlc) - check sizes of subject, reply and hdr, make sure uint16 ok. - buf := make([]byte, elen) + + buf := make([]byte, 1, elen) buf[0] = byte(streamMsgOp) + var le = binary.LittleEndian - wi := 1 - le.PutUint64(buf[wi:], lseq) - wi += 8 - le.PutUint64(buf[wi:], uint64(ts)) - wi += 8 - le.PutUint16(buf[wi:], uint16(len(subject))) - wi += 2 - copy(buf[wi:], subject) - wi += len(subject) - le.PutUint16(buf[wi:], uint16(len(reply))) - wi += 2 - copy(buf[wi:], reply) - wi += len(reply) - le.PutUint16(buf[wi:], uint16(len(hdr))) - wi += 2 - if len(hdr) > 0 { - copy(buf[wi:], hdr) - wi += len(hdr) - } - le.PutUint32(buf[wi:], uint32(len(msg))) - wi += 4 - if len(msg) > 0 { - copy(buf[wi:], msg) - wi += len(msg) - } + buf = le.AppendUint64(buf, lseq) + buf = le.AppendUint64(buf, uint64(ts)) + buf = le.AppendUint16(buf, uint16(slen)) + buf = append(buf, subject[:slen]...) + buf = le.AppendUint16(buf, uint16(rlen)) + buf = append(buf, reply[:rlen]...) + buf = le.AppendUint16(buf, uint16(hlen)) + buf = append(buf, hdr[:hlen]...) + buf = le.AppendUint32(buf, uint32(mlen)) + buf = append(buf, msg[:mlen]...) // Check if we should compress. if shouldCompress { nbuf := make([]byte, s2.MaxEncodedLen(elen)) nbuf[0] = byte(compressedStreamMsgOp) - ebuf := s2.Encode(nbuf[1:], buf[1:wi]) - // Only pay cost of decode the other side if we compressed. + ebuf := s2.Encode(nbuf[1:], buf[1:]) + // Only pay the cost of decode on the other side if we compressed. // S2 will allow us to try without major penalty for non-compressable data. - if len(ebuf) < wi { - nbuf = nbuf[:len(ebuf)+1] - buf, wi = nbuf, len(nbuf) + if len(ebuf) < len(buf) { + buf = nbuf[:len(ebuf)+1] } } - return buf[:wi] + return buf } // Determine if all peers in our set support the binary snapshot.