Skip to content

Commit

Permalink
Add support for TTLs and subject delete markers to memory store
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jan 23, 2025
1 parent 2352ad9 commit fdba068
Show file tree
Hide file tree
Showing 3 changed files with 394 additions and 183 deletions.
312 changes: 168 additions & 144 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24935,43 +24935,47 @@ func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) {
}

func TestJetStreamMessageTTL(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

for i := 1; i <= 10; i++ {
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
for i := 1; i <= 10; i++ {
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 10)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 10)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 10)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 10)

time.Sleep(time.Second * 2)
time.Sleep(time.Second * 2)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)
require_Equal(t, si.State.FirstSeq, 11)
require_Equal(t, si.State.LastSeq, 10)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)
require_Equal(t, si.State.FirstSeq, 11)
require_Equal(t, si.State.LastSeq, 10)
})
}
}

func TestJetStreamMessageTTLRestart(t *testing.T) {
Expand Down Expand Up @@ -25088,31 +25092,35 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
}

func TestJetStreamMessageTTLInvalid(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set(JSMessageTTL, "500ms")
_, err := js.PublishMsg(msg)
require_Error(t, err)
msg.Header.Set(JSMessageTTL, "500ms")
_, err := js.PublishMsg(msg)
require_Error(t, err)

msg.Header.Set(JSMessageTTL, "something")
_, err = js.PublishMsg(msg)
require_Error(t, err)
msg.Header.Set(JSMessageTTL, "something")
_, err = js.PublishMsg(msg)
require_Error(t, err)
})
}
}

func TestJetStreamMessageTTLNotUpdatable(t *testing.T) {
Expand All @@ -25139,74 +25147,82 @@ func TestJetStreamMessageTTLNotUpdatable(t *testing.T) {
}

func TestJetStreamMessageTTLNeverExpire(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
MaxAge: time.Second,
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
MaxAge: time.Second,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

// The first message we publish is set to "never expire", therefore it
// won't age out with the MaxAge policy.
msg.Header.Set(JSMessageTTL, "never")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
// The first message we publish is set to "never expire", therefore it
// won't age out with the MaxAge policy.
msg.Header.Set(JSMessageTTL, "never")
_, err := js.PublishMsg(msg)
require_NoError(t, err)

// Following messages will be published as normal and will age out.
msg.Header.Del(JSMessageTTL)
for i := 1; i <= 10; i++ {
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
// Following messages will be published as normal and will age out.
msg.Header.Del(JSMessageTTL)
for i := 1; i <= 10; i++ {
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 11)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 11)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)

time.Sleep(time.Second * 2)
time.Sleep(time.Second * 2)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 1)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 1)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
})
}
}

func TestJetStreamMessageTTLDisabled(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)
})
}
}

func TestJetStreamMessageTTLWhenSourcing(t *testing.T) {
Expand Down Expand Up @@ -25338,40 +25354,44 @@ func TestJetStreamMessageTTLWhenMirroring(t *testing.T) {
}

func TestJetStreamSubjectDeleteMarkers(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
MaxAge: time.Second,
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
SubjectDeleteMarkerTTL: "1s",
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
MaxAge: time.Second,
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
SubjectDeleteMarkerTTL: "1s",
})

sub, err := js.SubscribeSync("test")
require_NoError(t, err)
sub, err := js.SubscribeSync("test")
require_NoError(t, err)

for i := 0; i < 3; i++ {
_, err = js.Publish("test", nil)
require_NoError(t, err)
}
for i := 0; i < 3; i++ {
_, err = js.Publish("test", nil)
require_NoError(t, err)
}

for i := 0; i < 3; i++ {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_NoError(t, msg.AckSync())
}
for i := 0; i < 3; i++ {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_NoError(t, msg.AckSync())
}

msg, err := sub.NextMsg(time.Second * 10)
require_NoError(t, err)
require_Equal(t, msg.Header.Get(JSAppliedLimit), "MaxAge")
require_Equal(t, msg.Header.Get(JSMessageTTL), "1s")
msg, err := sub.NextMsg(time.Second * 10)
require_NoError(t, err)
require_Equal(t, msg.Header.Get(JSAppliedLimit), "MaxAge")
require_Equal(t, msg.Header.Get(JSMessageTTL), "1s")
})
}
}

func TestJetStreamSubjectDeleteMarkersWithMirror(t *testing.T) {
Expand Down Expand Up @@ -25402,20 +25422,24 @@ func TestJetStreamSubjectDeleteMarkersWithMirror(t *testing.T) {
}

func TestJetStreamSubjectDeleteMarkersDefaultTTL(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()
nc, _ := jsClientConnect(t, s)
defer nc.Close()

sc, err := jsStreamCreate(t, nc, &StreamConfig{
Name: "Origin",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
})
require_NoError(t, err)
sc, err := jsStreamCreate(t, nc, &StreamConfig{
Name: "Origin",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
})
require_NoError(t, err)

require_Equal(t, sc.SubjectDeleteMarkerTTL, subjectDeleteMarkerDefaultTTL)
require_Equal(t, sc.SubjectDeleteMarkerTTL, subjectDeleteMarkerDefaultTTL)
})
}
}
Loading

0 comments on commit fdba068

Please sign in to comment.