Skip to content

Commit

Permalink
Make sure to account for tombstone in rbytes to avoid potential excessive compact attempts.
Browse files Browse the repository at this point in the history

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jul 29, 2024
1 parent 12e1889 commit a0a7183
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
22 changes: 20 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4147,7 +4147,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// All other more thorough cleanup will happen in syncBlocks logic.
// Note that we do not have to store empty records for the deleted, so don't use to calculate.
// TODO(dlc) - This should not be inline, should kick the sync routine.
if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
if !isLastBlock && mb.shouldCompactInline() {
mb.compact()
}
}
Expand Down Expand Up @@ -4209,6 +4209,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
return true, nil
}

// Reports whether an inline compact should be attempted while removing msgs.
// Requires rbytes above the compact minimum and at least a 2x savings potential.
// Lock should be held.
func (mb *msgBlock) shouldCompactInline() bool {
	if mb.rbytes <= compactMinimum {
		// Too small to bother compacting inline.
		return false
	}
	return mb.bytes*2 < mb.rbytes
}

// Reports whether a compact should be attempted from the periodic sync path.
// Only requires a 2x savings potential, ignoring the 2MB compact minimum.
// Lock should be held.
func (mb *msgBlock) shouldCompactSync() bool {
	return mb.rbytes > mb.bytes*2
}

// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
Expand Down Expand Up @@ -5102,6 +5117,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
} else {
// Make sure to account for tombstones in rbytes.
mb.rbytes += rl
}

fch, werr := mb.fch, mb.werr
Expand Down Expand Up @@ -5445,7 +5463,7 @@ func (fs *fileStore) syncBlocks() {
// Check if we should compact here as well.
// Do not compact last mb.
var needsCompact bool
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.shouldCompactSync() {
needsCompact = true
markDirty = true
}
Expand Down
66 changes: 66 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7371,6 +7371,72 @@ func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) {
require_Equal(t, lbi, 3)
}

// https://github.com/nats-io/nats-server/issues/5702
func TestFileStoreTombstoneRbytes(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 1024},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// A 1024 byte block can hold 24 of these msgs,
	// so 34 fills the first block and about half of the second.
	msg := []byte("hello")
	for i := 0; i < 34; i++ {
		// Check the store error instead of silently ignoring it.
		_, _, err = fs.StoreMsg("foo.22", nil, msg)
		require_NoError(t, err)
	}
	require_True(t, fs.numMsgBlocks() > 1)
	// Delete the second half of the first block, which will place
	// tombstones in the second block.
	for seq := 11; seq <= 24; seq++ {
		_, err = fs.RemoveMsg(uint64(seq))
		require_NoError(t, err)
	}
	// Now check that rbytes has been properly accounted for in the second block.
	fs.mu.RLock()
	blk := fs.blks[1]
	fs.mu.RUnlock()

	// Named nbytes (not "bytes") to avoid shadowing the stdlib bytes package.
	blk.mu.RLock()
	nbytes, rbytes := blk.bytes, blk.rbytes
	blk.mu.RUnlock()
	require_True(t, rbytes > nbytes)
}

func TestFileStoreMsgBlockShouldCompact(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// 127 msgs of this size fit into a block.
	msg := bytes.Repeat([]byte("Z"), 64*1024)
	for i := 0; i < 190; i++ {
		// Check the store error instead of silently ignoring it.
		_, _, err = fs.StoreMsg("foo.22", nil, msg)
		require_NoError(t, err)
	}
	require_True(t, fs.numMsgBlocks() > 1)
	// Delete the second half of the first block, which will place
	// tombstones in the second block.
	for seq := 64; seq <= 127; seq++ {
		_, err = fs.RemoveMsg(uint64(seq))
		require_NoError(t, err)
	}
	fs.mu.RLock()
	fblk := fs.blks[0]
	sblk := fs.blks[1]
	fs.mu.RUnlock()

	// Named nbytes (not "bytes") to avoid shadowing the stdlib bytes
	// package used above for bytes.Repeat.
	fblk.mu.RLock()
	nbytes, rbytes := fblk.bytes, fblk.rbytes
	shouldCompact := fblk.shouldCompactInline()
	fblk.mu.RUnlock()
	// Inline removal should have tripped compaction of the first block already.
	require_Equal(t, nbytes, rbytes)
	require_False(t, shouldCompact)

	// Second block holds tombstones but should not warrant inline compaction.
	sblk.mu.RLock()
	shouldCompact = sblk.shouldCompactInline()
	sblk.mu.RUnlock()
	require_False(t, shouldCompact)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit a0a7183

Please sign in to comment.