Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor encoding replicated stream messages to properly enforce framing #6052

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 28 additions & 36 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
@@ -7691,54 +7691,46 @@ const compressThreshold = 8192 // 8k

// If allowed and contents over the threshold we will compress.
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte {
shouldCompress := compressOK && len(subject)+len(reply)+len(hdr)+len(msg) > compressThreshold

elen := 1 + 8 + 8 + len(subject) + len(reply) + len(hdr) + len(msg)
// Clip the subject, reply, header and msgs down. Operate on
// uint64 lengths to avoid overflowing.
slen := min(uint64(len(subject)), math.MaxUint16)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
rlen := min(uint64(len(reply)), math.MaxUint16)
hlen := min(uint64(len(hdr)), math.MaxUint16)
mlen := min(uint64(len(msg)), math.MaxUint32)
total := slen + rlen + hlen + mlen

shouldCompress := compressOK && total > compressThreshold
elen := int(1 + 8 + 8 + total)
elen += (2 + 2 + 2 + 4) // Encoded lengths, 4bytes
// TODO(dlc) - check sizes of subject, reply and hdr, make sure uint16 ok.
buf := make([]byte, elen)

buf := make([]byte, 1, elen)
buf[0] = byte(streamMsgOp)

var le = binary.LittleEndian
wi := 1
le.PutUint64(buf[wi:], lseq)
wi += 8
le.PutUint64(buf[wi:], uint64(ts))
wi += 8
le.PutUint16(buf[wi:], uint16(len(subject)))
wi += 2
copy(buf[wi:], subject)
wi += len(subject)
le.PutUint16(buf[wi:], uint16(len(reply)))
wi += 2
copy(buf[wi:], reply)
wi += len(reply)
le.PutUint16(buf[wi:], uint16(len(hdr)))
wi += 2
if len(hdr) > 0 {
copy(buf[wi:], hdr)
wi += len(hdr)
}
le.PutUint32(buf[wi:], uint32(len(msg)))
wi += 4
if len(msg) > 0 {
copy(buf[wi:], msg)
wi += len(msg)
}
buf = le.AppendUint64(buf, lseq)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
buf = le.AppendUint64(buf, uint64(ts))
buf = le.AppendUint16(buf, uint16(slen))
buf = append(buf, subject[:slen]...)
buf = le.AppendUint16(buf, uint16(rlen))
buf = append(buf, reply[:rlen]...)
buf = le.AppendUint16(buf, uint16(hlen))
buf = append(buf, hdr[:hlen]...)
buf = le.AppendUint32(buf, uint32(mlen))
buf = append(buf, msg[:mlen]...)

// Check if we should compress.
if shouldCompress {
nbuf := make([]byte, s2.MaxEncodedLen(elen))
nbuf[0] = byte(compressedStreamMsgOp)
ebuf := s2.Encode(nbuf[1:], buf[1:wi])
// Only pay cost of decode the other side if we compressed.
ebuf := s2.Encode(nbuf[1:], buf[1:])
// Only pay the cost of decode on the other side if we compressed.
// S2 will allow us to try without major penalty for non-compressable data.
if len(ebuf) < wi {
nbuf = nbuf[:len(ebuf)+1]
buf, wi = nbuf, len(nbuf)
if len(ebuf) < len(buf) {
buf = nbuf[:len(ebuf)+1]
}
}

return buf[:wi]
return buf
}

// Determine if all peers in our set support the binary snapshot.