Merge branch 'nats-io:main' into main
deem0n authored Jan 21, 2025
2 parents dfbc9ee + abd6197 commit 89ad5b0
Showing 9 changed files with 353 additions and 28 deletions.
86 changes: 81 additions & 5 deletions server/filestore.go
@@ -174,6 +174,7 @@ type fileStore struct {
tombs []uint64
ld *LostStreamData
scb StorageUpdateHandler
sdmcb SubjectDeleteMarkerUpdateHandler
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
@@ -2131,7 +2132,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
break
}
// Can we remove whole block here?
if mb.last.ts <= minAge {
// TODO(nat): We can't do this with LimitsTTL as we have no way to know
// if we're throwing away real messages or other tombstones without
// loading them, so in this case we'll fall through to the "slow path".
// There might be a better way of handling this though.
if !mb.fs.cfg.SubjectDeleteMarkers && mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
deleteEmptyBlock(mb)
@@ -2198,7 +2203,17 @@ func (fs *fileStore) expireMsgsOnRecover() {
// Update fss
// Make sure we have fss loaded.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
if fs.removePerSubject(sm.subj) && fs.cfg.SubjectDeleteMarkers {
// Need to release the mb lock here in case we need to write a new
// tombstone into the same mb in subjectDeleteMarkerIfNeeded. However
// at this point fs.mu is held, so nothing else should happen here.
// No need to process the callbacks from subjectDeleteMarkerIfNeeded
// here as none will have been registered yet (we haven't yet returned
// from newFileStore*).
mb.mu.Unlock()
fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
mb.mu.Lock()
}
}
// Make sure we have a proper next first sequence.
if needNextFirst {
@@ -3612,6 +3627,13 @@ func (fs *fileStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
}
}

// RegisterSubjectDeleteMarkerUpdates registers a callback that is invoked whenever a new subject delete marker (tombstone) is written.
func (fs *fileStore) RegisterSubjectDeleteMarkerUpdates(cb SubjectDeleteMarkerUpdateHandler) {
fs.mu.Lock()
fs.sdmcb = cb
fs.mu.Unlock()
}
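
A minimal sketch of registering one of these handlers, assuming SubjectDeleteMarkerUpdateHandler has the shape func(seq uint64, subj string) implied by the tcb(seq, sm.subj) call in subjectDeleteMarkerIfNeeded below (the handler body is illustrative only, not part of this change):

fs.RegisterSubjectDeleteMarkerUpdates(func(seq uint64, subj string) {
    // Illustrative only: react to a freshly written delete marker,
    // e.g. hand it to the stream layer or log it for debugging.
    fmt.Printf("delete marker written: seq=%d subj=%q\n", seq, subj)
})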

// Helper to get hash key for specific message block.
// Lock should be held
func (fs *fileStore) hashKeyForBlock(index uint32) []byte {
@@ -4325,10 +4347,10 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
}

// Convenience function to remove per subject tracking at the filestore level.
// Lock should be held.
func (fs *fileStore) removePerSubject(subj string) {
// Lock should be held. Returns true if we deleted the last message on the subject.
func (fs *fileStore) removePerSubject(subj string) bool {
if len(subj) == 0 || fs.psim == nil {
return
return false
}
// We do not update sense of fblk here but will do so when we resolve during lookup.
bsubj := stringToBytes(subj)
@@ -4339,9 +4361,11 @@ func (fs *fileStore) removePerSubject(subj string) {
} else if info.total == 0 {
if _, ok = fs.psim.Delete(bsubj); ok {
fs.tsl -= len(subj)
return true
}
}
}
return false
}

// Remove a message, optionally rewriting the mb file.
@@ -5295,6 +5319,52 @@ func (fs *fileStore) cancelAgeChk() {
}
}

// Lock must be held so that nothing else can interleave and write a
// new message on this subject before we get the chance to write the
// delete marker. If the delete marker is written successfully then
// this function returns a callback func to call scb and sdmcb after
// the lock has been released.
func (fs *fileStore) subjectDeleteMarkerIfNeeded(sm *StoreMsg, reason string) func() {
// If the deleted message was itself a delete marker then
// don't write out more of them or we'll churn endlessly.
if len(getHeader(JSAppliedLimit, sm.hdr)) != 0 {
return nil
}
if !fs.cfg.SubjectDeleteMarkers {
return nil
}
if _, ok := fs.psim.Find(stringToBytes(sm.subj)); ok {
// There are still messages left with this subject,
// so it wasn't the last message deleted.
return nil
}
// Build the subject delete marker. If no TTL is specified then
// we'll default to 15 minutes; by that time every possible condition
// should have cleared (e.g. ordered consumer timeout, client timeouts,
// route/gateway interruptions, even device/client restarts etc).
var ttl int64 = 60 * 15
if fs.cfg.SubjectDeleteMarkerTTL != _EMPTY_ {
ttl, _ = parseMessageTTL(fs.cfg.SubjectDeleteMarkerTTL)
}
var _hdr [128]byte
hdr := fmt.Appendf(_hdr[:0], "NATS/1.0\r\n%s: %s\r\n%s: %s\r\n\r\n", JSAppliedLimit, reason, JSMessageTTL, time.Duration(ttl)*time.Second)
seq, ts := fs.state.LastSeq+1, time.Now().UnixNano()
// Store it in the stream and then prepare the callbacks
// to return to the caller.
if err := fs.storeRawMsg(sm.subj, hdr, nil, seq, ts, ttl); err != nil {
return nil
}
cb, tcb := fs.scb, fs.sdmcb
return func() {
if cb != nil {
cb(1, int64(fileStoreMsgSize(sm.subj, hdr, nil)), seq, sm.subj)
}
if tcb != nil {
tcb(seq, sm.subj)
}
}
}
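
For reference, a sketch of the wire shape of the marker assembled above, for reason JSAppliedLimitMaxAge with the default TTL. The literal header names and values are assumptions (this diff only uses the constants); JSMessageTTL rendering as "Nats-TTL" is confirmed by the test changes below, and "15m0s" follows from time.Duration(900)*time.Second as checked by TestFileStoreSubjectDeleteMarkersDefaultTTL:

// Assumed literals: JSAppliedLimit ~ "Nats-Applied-Limit", JSAppliedLimitMaxAge ~ "MaxAge".
hdr := fmt.Appendf(nil, "NATS/1.0\r\n%s: %s\r\n%s: %s\r\n\r\n",
    JSAppliedLimit, JSAppliedLimitMaxAge, JSMessageTTL, time.Duration(900)*time.Second)
// => "NATS/1.0\r\nNats-Applied-Limit: MaxAge\r\nNats-TTL: 15m0s\r\n\r\n" (assumed literals)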

// Will expire msgs that are too old.
func (fs *fileStore) expireMsgs() {
// We need to delete one by one here and can not optimize for the time being.
@@ -5316,9 +5386,15 @@ func (fs *fileStore) expireMsgs() {
continue
}
}
// Remove the message and then, if LimitsTTL is enabled, try to work out
// whether it was the last message on that particular subject that we just deleted.
fs.mu.Lock()
fs.removeMsgViaLimits(sm.seq)
cbs := fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
fs.mu.Unlock()
if cbs != nil {
cbs()
}
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}
123 changes: 123 additions & 0 deletions server/filestore_test.go
@@ -8562,3 +8562,126 @@ func TestFileStoreDontSpamCompactWhenMostlyTombstones(t *testing.T) {
fmb.bytes /= 2
require_True(t, fmb.shouldCompactInline())
}

func TestFileStoreSubjectDeleteMarkers(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
},
)
require_NoError(t, err)
defer fs.Stop()

// Store three messages that will expire because of MaxAge.
var seq uint64
for i := 0; i < 3; i++ {
seq, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)
}

// The last message should be gone after MaxAge has passed.
time.Sleep(time.Second * 2)
sm, err := fs.LoadMsg(seq, nil)
require_Error(t, err)
require_Equal(t, sm, nil)

// We should have replaced it with a tombstone.
sm, err = fs.LoadMsg(seq+1, nil)
require_NoError(t, err)
require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")

time.Sleep(time.Second * 2)

// The tombstone itself only has a TTL of 1 second, so it should
// also be gone by now. No more tombstones should have been
// published.
var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, sm.seq+1)
require_Equal(t, ss.LastSeq, sm.seq)
require_Equal(t, ss.Msgs, 0)
}

func TestFileStoreSubjectDeleteMarkersDefaultTTL(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true,
},
)
require_NoError(t, err)
defer fs.Stop()

// Store three messages that will expire because of MaxAge.
var seq uint64
for i := 0; i < 3; i++ {
seq, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)
}

// The last message should be gone after MaxAge has passed.
time.Sleep(time.Second * 2)
sm, err := fs.LoadMsg(seq, nil)
require_Error(t, err)
require_Equal(t, sm, nil)

// We should have replaced it with a tombstone.
sm, err = fs.LoadMsg(seq+1, nil)
require_NoError(t, err)
require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "15m0s")
}

func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) {
storeDir := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
},
)
require_NoError(t, err)
defer fs.Stop()

// Store three messages that will expire because of MaxAge.
var seq uint64
for i := 0; i < 3; i++ {
seq, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)
}

// Stop the store so that the expiry happens while it's technically
// offline. Then wait for at least MaxAge and then restart, which should
// hit the expireMsgsOnRecover path instead.
require_NoError(t, fs.Stop())
time.Sleep(time.Second * 2)
fs, err = newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
},
)
require_NoError(t, err)
defer fs.Stop()

// The last message should be gone after MaxAge has passed.
sm, err := fs.LoadMsg(seq, nil)
require_Error(t, err)
require_Equal(t, sm, nil)

// We should have replaced it with a tombstone.
sm, err = fs.LoadMsg(seq+1, nil)
require_NoError(t, err)
require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
}
6 changes: 3 additions & 3 deletions server/jetstream_cluster_4_test.go
@@ -5369,7 +5369,7 @@ func TestJetStreamClusterMessageTTLWhenSourcing(t *testing.T) {
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "1s")
hdr.Add(JSMessageTTL, "1s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
@@ -5436,7 +5436,7 @@ func TestJetStreamClusterMessageTTLWhenMirroring(t *testing.T) {
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "1s")
hdr.Add(JSMessageTTL, "1s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
@@ -5486,7 +5486,7 @@ func TestJetStreamClusterMessageTTLDisabled(t *testing.T) {
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "1s")
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)

9 changes: 7 additions & 2 deletions server/jetstream_helpers_test.go
@@ -1265,7 +1265,7 @@ func jsClientConnectURL(t testing.TB, url string, opts ...nats.Option) (*nats.Co
}

// jsStreamCreate is for sending a stream create request with fields that nats.go does not know about yet.
func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfig {
func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConfig, error) {
t.Helper()

j, err := json.Marshal(cfg)
@@ -1276,8 +1276,13 @@ func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfi

var resp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(msg.Data, &resp))

if resp.Error != nil {
return nil, resp.Error
}

require_NotNil(t, resp.StreamInfo)
return &resp.Config
return &resp.Config, nil
}
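
Call sites now have to handle the error return instead of relying on require_NotNil alone; a minimal sketch (the stream fields below are placeholders, not taken from this diff):

cfg, err := jsStreamCreate(t, nc, &StreamConfig{
    Name:     "TEST",           // placeholder
    Subjects: []string{"test"}, // placeholder
    Storage:  FileStorage,
})
require_NoError(t, err)
_ = cfg // updated StreamConfig as returned by the API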

// jsStreamUpdate is for sending a stream update for fields that nats.go does not know about yet.