Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.19-RC.2 #5718

Merged
merged 16 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion scripts/runTestsOnTravis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ elif [ "$1" = "srv_pkg_non_js_tests" ]; then
# by using `skip_js_tests`, MQTT tests by using `skip_mqtt_tests` and
# message tracing tests by using `skip_msgtrace_tests`.

go test -race -v -p=1 ./server/... -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast
# Also including the ldflag with the version since this includes the `TestVersionMatchesTag`.
go test -race -v -p=1 ./server/... -ldflags="-X=github.com/nats-io/nats-server/v2/server.serverVersion=$TRAVIS_TAG" -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast

elif [ "$1" = "non_srv_pkg_tests" ]; then

Expand Down
4 changes: 3 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5436,7 +5436,9 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {

if !ok {
if c.kind == ROUTER && len(c.route.accName) > 0 {
acc = c.acc
if acc = c.acc; acc == nil {
return nil, nil
}
} else {
// Match correct account and sublist.
if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil {
Expand Down
26 changes: 25 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2898,6 +2898,28 @@ func (o *consumer) isFiltered() bool {
return false
}

// matchAck reports whether this consumer would have matched, and therefore
// needed an ack for, the message at the given store sequence.
// Used by interest based retention streams when deciding if a message can be removed.
func (o *consumer) matchAck(sseq uint64) bool {
	o.mu.RLock()
	defer o.mu.RUnlock()

	// An unfiltered consumer matches every sequence.
	if !o.isFiltered() {
		return true
	}
	// Filtered: we need the stored subject to decide applicability.
	if o.mset == nil {
		return false
	}
	var sm StoreMsg
	if _, err := o.mset.store.LoadMsg(sseq, &sm); err != nil {
		// Could not load the message, treat as not ours.
		return false
	}
	return o.isFilteredMatch(sm.subj)
}

// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
Expand Down Expand Up @@ -5499,7 +5521,9 @@ func (o *consumer) checkStateForInterestStream() error {
}

for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
mset.ackMsg(o, seq)
if o.matchAck(seq) {
mset.ackMsg(o, seq)
}
}

o.mu.RLock()
Expand Down
32 changes: 29 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,11 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
if filter == _EMPTY_ {
filter = fwcs
wc = true
}

start, stop := uint32(math.MaxUint32), uint32(0)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
Expand Down Expand Up @@ -4029,7 +4034,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// All other more thorough cleanup will happen in syncBlocks logic.
// Note that we do not have to store empty records for the deleted, so don't use to calculate.
// TODO(dlc) - This should not be inline, should kick the sync routine.
if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
if !isLastBlock && mb.shouldCompactInline() {
mb.compact()
}
}
Expand Down Expand Up @@ -4091,6 +4096,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
return true, nil
}

// shouldCompactInline reports whether this block is worth compacting while
// inline removing msgs: raw bytes must exceed the minimum threshold and
// compaction must yield at least a 2x savings.
// Lock should be held.
func (mb *msgBlock) shouldCompactInline() bool {
	// Same 2x-savings test as the periodic sync path, but gated on the
	// compactMinimum size so tiny blocks are not compacted inline.
	return mb.rbytes > compactMinimum && mb.shouldCompactSync()
}

// shouldCompactSync reports whether this block is worth compacting during the
// periodic sync pass: compaction must yield at least a 2x savings.
// Unlike the inline check, this ignores the minimum block size.
// Lock should be held.
func (mb *msgBlock) shouldCompactSync() bool {
	return mb.rbytes > mb.bytes*2
}

// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
Expand Down Expand Up @@ -4984,6 +5004,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
} else {
// Make sure to account for tombstones in rbytes.
mb.rbytes += rl
}

fch, werr := mb.fch, mb.werr
Expand Down Expand Up @@ -5327,7 +5350,7 @@ func (fs *fileStore) syncBlocks() {
// Check if we should compact here as well.
// Do not compact last mb.
var needsCompact bool
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.shouldCompactSync() {
needsCompact = true
markDirty = true
}
Expand Down Expand Up @@ -6424,7 +6447,10 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Nothing found in this block. We missed, if first block (bi) check psim.
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
// We should not do this at all if we are already on the last block.
// Also if we are a wildcard do not check if large subject space.
const wcMaxSizeToCheck = 64 * 1024
if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) {
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if nbi < 0 || lbi <= bi {
Expand Down
119 changes: 119 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7248,6 +7248,94 @@ func TestFileStoreCheckSkipFirstBlockBug(t *testing.T) {
require_NoError(t, err)
}

// https://github.com/nats-io/nats-server/issues/5705
// An empty filter should behave like a match-all wildcard in
// checkSkipFirstBlock and cover every block.
func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) {
	sd := t.TempDir()
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: sd, BlockSize: 128},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	msg := []byte("hello")
	// Create 4 blocks, each block holds 2 msgs.
	for i := 0; i < 4; i++ {
		_, _, err = fs.StoreMsg("foo.22.bar", nil, msg)
		require_NoError(t, err)
		_, _, err = fs.StoreMsg("foo.22.baz", nil, msg)
		require_NoError(t, err)
	}
	require_Equal(t, fs.numMsgBlocks(), 4)

	// With an empty filter we expect the full range of blocks, first through last.
	nbi, lbi := fs.checkSkipFirstBlock(_EMPTY_, false)
	require_Equal(t, nbi, 0)
	require_Equal(t, lbi, 3)
}

// https://github.com/nats-io/nats-server/issues/5702
// Tombstones written into a later block must be accounted for in that
// block's rbytes.
func TestFileStoreTombstoneRbytes(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 1024},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// Block can hold 24 msgs.
	// So will fill one block and half of the other.
	msg := []byte("hello")
	for i := 0; i < 34; i++ {
		_, _, err = fs.StoreMsg("foo.22", nil, msg)
		require_NoError(t, err)
	}
	require_True(t, fs.numMsgBlocks() > 1)
	// Now delete second half of first block which will place tombstones in second blk.
	for seq := 11; seq <= 24; seq++ {
		_, err = fs.RemoveMsg(uint64(seq))
		require_NoError(t, err)
	}
	// Now check that rbytes has been properly accounted for in second block.
	fs.mu.RLock()
	blk := fs.blks[1]
	fs.mu.RUnlock()

	// Use nbytes here so we do not shadow the stdlib bytes package name.
	blk.mu.RLock()
	nbytes, rbytes := blk.bytes, blk.rbytes
	blk.mu.RUnlock()
	require_True(t, rbytes > nbytes)
}

// Verify the inline-compaction heuristic: the first block should already have
// been compacted by the inline removes, and neither block should report that
// it still needs inline compaction.
func TestFileStoreMsgBlockShouldCompact(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// 127 fit into a block.
	msg := bytes.Repeat([]byte("Z"), 64*1024)
	for i := 0; i < 190; i++ {
		_, _, err = fs.StoreMsg("foo.22", nil, msg)
		require_NoError(t, err)
	}
	require_True(t, fs.numMsgBlocks() > 1)
	// Now delete second half of first block which will place tombstones in second blk.
	for seq := 64; seq <= 127; seq++ {
		_, err = fs.RemoveMsg(uint64(seq))
		require_NoError(t, err)
	}
	fs.mu.RLock()
	fblk := fs.blks[0]
	sblk := fs.blks[1]
	fs.mu.RUnlock()

	// Named nbytes (not bytes) to avoid shadowing the bytes package used above.
	fblk.mu.RLock()
	nbytes, rbytes := fblk.bytes, fblk.rbytes
	shouldCompact := fblk.shouldCompactInline()
	fblk.mu.RUnlock()
	// Should have tripped compaction already.
	require_Equal(t, nbytes, rbytes)
	require_False(t, shouldCompact)

	sblk.mu.RLock()
	shouldCompact = sblk.shouldCompactInline()
	sblk.mu.RUnlock()
	require_False(t, shouldCompact)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -7506,6 +7594,37 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
}
}

// Benchmarks LoadNextMsg with a wildcard filter over a very large subject
// space, starting near the last block, to exercise the guard that avoids an
// expensive checkSkipFirstBlock psim scan in that case.
func Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock(b *testing.B) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: b.TempDir()},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
	require_NoError(b, err)
	defer fs.Stop()

	// Small on purpose.
	msg := []byte("ok")

	// Make first msg one that would match as well.
	_, _, err = fs.StoreMsg("foo.1.baz", nil, msg)
	require_NoError(b, err)
	// Add in a bunch of msgs.
	// We need to make sure we have a range of subjects that could kick in a linear scan.
	for i := 0; i < 1_000_000; i++ {
		subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
		_, _, err = fs.StoreMsg(subj, nil, msg)
		require_NoError(b, err)
	}
	// Make last msg one that would match as well.
	_, _, err = fs.StoreMsg("foo.1.baz", nil, msg)
	require_NoError(b, err)

	b.ResetTimer()

	var smv StoreMsg
	for i := 0; i < b.N; i++ {
		// Make sure not first seq.
		_, _, err := fs.LoadNextMsg("foo.*.baz", true, 999_990, &smv)
		require_NoError(b, err)
	}
}

func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
Expand Down
Loading