Skip to content

Commit

Permalink
[FIXED] Excessive blk compacts with delete tombstones. (#5719)
Browse files Browse the repository at this point in the history
Make sure to account for tombstones in rbytes to avoid potentially
excessive compaction attempts.

Partially resolves:  #5702 

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored Jul 29, 2024
2 parents 12e1889 + a0a7183 commit aa37839
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
22 changes: 20 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4147,7 +4147,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// All other more thorough cleanup will happen in syncBlocks logic.
// Note that we do not have to store empty records for the deleted, so don't use to calculate.
// TODO(dlc) - This should not be inline, should kick the sync routine.
if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
if !isLastBlock && mb.shouldCompactInline() {
mb.compact()
}
}
Expand Down Expand Up @@ -4209,6 +4209,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
return true, nil
}

// shouldCompactInline reports whether this block is a good candidate for
// compaction while inline removing msgs. We require the raw size to exceed
// the configured minimum and that compacting would reclaim at least half
// of the raw bytes.
// Lock should be held.
func (mb *msgBlock) shouldCompactInline() bool {
	if mb.rbytes <= compactMinimum {
		return false
	}
	return mb.rbytes > mb.bytes*2
}

// shouldCompactSync reports whether this block should be compacted from the
// periodic sync routine. Unlike the inline variant, no minimum raw size is
// required here; a potential 2x savings alone is enough.
// Lock should be held.
func (mb *msgBlock) shouldCompactSync() bool {
	return mb.rbytes > mb.bytes*2
}

// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
Expand Down Expand Up @@ -5102,6 +5117,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
} else {
// Make sure to account for tombstones in rbytes.
mb.rbytes += rl
}

fch, werr := mb.fch, mb.werr
Expand Down Expand Up @@ -5445,7 +5463,7 @@ func (fs *fileStore) syncBlocks() {
// Check if we should compact here as well.
// Do not compact last mb.
var needsCompact bool
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.shouldCompactSync() {
needsCompact = true
markDirty = true
}
Expand Down
66 changes: 66 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7371,6 +7371,72 @@ func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) {
require_Equal(t, lbi, 3)
}

// TestFileStoreTombstoneRbytes verifies that tombstones written for deleted
// messages are accounted for in a block's rbytes.
// https://github.com/nats-io/nats-server/issues/5702
func TestFileStoreTombstoneRbytes(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 1024},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// Block can hold 24 msgs.
	// So will fill one block and half of the other.
	msg := []byte("hello")
	for i := 0; i < 34; i++ {
		// Check the store error so a failure here doesn't surface as a
		// confusing assertion failure below.
		_, _, err := fs.StoreMsg("foo.22", nil, msg)
		require_NoError(t, err)
	}
	require_True(t, fs.numMsgBlocks() > 1)
	// Now delete second half of first block which will place tombstones in second blk.
	for seq := uint64(11); seq <= 24; seq++ {
		_, err := fs.RemoveMsg(seq)
		require_NoError(t, err)
	}
	// Now check that rbytes has been properly accounted for in second block.
	fs.mu.RLock()
	blk := fs.blks[1]
	fs.mu.RUnlock()

	blk.mu.RLock()
	bytes, rbytes := blk.bytes, blk.rbytes
	blk.mu.RUnlock()
	// Tombstones occupy raw bytes but hold no live message data.
	require_True(t, rbytes > bytes)
}

// TestFileStoreMsgBlockShouldCompact verifies that neither a block already
// compacted inline nor a block whose extra rbytes come only from tombstones
// reports that it should be compacted inline again.
func TestFileStoreMsgBlockShouldCompact(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	// 127 fit into a block.
	msg := bytes.Repeat([]byte("Z"), 64*1024)
	for i := 0; i < 190; i++ {
		// Check the store error so a failure here doesn't surface as a
		// confusing assertion failure below.
		_, _, err := fs.StoreMsg("foo.22", nil, msg)
		require_NoError(t, err)
	}
	require_True(t, fs.numMsgBlocks() > 1)
	// Now delete second half of first block which will place tombstones in second blk.
	for seq := uint64(64); seq <= 127; seq++ {
		_, err := fs.RemoveMsg(seq)
		require_NoError(t, err)
	}
	fs.mu.RLock()
	fblk := fs.blks[0]
	sblk := fs.blks[1]
	fs.mu.RUnlock()

	fblk.mu.RLock()
	// Named to avoid shadowing the imported bytes package used above.
	fbytes, frbytes := fblk.bytes, fblk.rbytes
	shouldCompact := fblk.shouldCompactInline()
	fblk.mu.RUnlock()
	// Should have tripped compaction already.
	require_Equal(t, fbytes, frbytes)
	require_False(t, shouldCompact)

	sblk.mu.RLock()
	shouldCompact = sblk.shouldCompactInline()
	sblk.mu.RUnlock()
	require_False(t, shouldCompact)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit aa37839

Please sign in to comment.