Skip to content

Commit

Permalink
Test MaxMsgs and MaxMsgsPer in combination
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jun 19, 2023
1 parent f5b0610 commit e879a9f
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4321,6 +4321,55 @@ func TestFileStoreMaxMsgsPerSubject(t *testing.T) {
})
}

// Testing the case in https://github.com/nats-io/nats-server/issues/4247
func TestFileStoreMaxMsgsAndMaxMsgsPerSubject(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 128
fcfg.CacheExpire = time.Second

fs, err := newFileStore(
fcfg,
StreamConfig{
Name: "zzz",
Subjects: []string{"kv.>"},
Storage: FileStorage,
Discard: DiscardNew, MaxMsgs: 100, // Total stream policy
DiscardNewPer: true, MaxMsgsPer: 1, // Per-subject policy
},
)
require_NoError(t, err)
defer fs.Stop()

for i := 1; i <= 101; i++ {
subj := fmt.Sprintf("kv.%d", i)
_, _, err := fs.StoreMsg(subj, nil, []byte("value"))
if i == 101 {
// The 101th iteration should fail because MaxMsgs is set to
// 100 and the policy is DiscardNew.
require_Error(t, err)
} else {
require_NoError(t, err)
}
}

for i := 1; i <= 100; i++ {
subj := fmt.Sprintf("kv.%d", i)
_, _, err := fs.StoreMsg(subj, nil, []byte("value"))
// All of these iterations should fail because MaxMsgsPer is set
// to 1 and DiscardNewPer is set to true, forcing us to reject
// cases where there is already a message on this subject.
require_Error(t, err)
}

if state := fs.State(); state.Msgs != 100 || state.FirstSeq != 1 || state.LastSeq != 100 || len(state.Deleted) != 0 {
// There should be 100 messages exactly, as the 101st subject
// should have been rejected in the first loop, and any duplicates
// on the other subjects should have been rejected in the second loop.
t.Fatalf("Bad state: %+v", state)
}
})
}

func TestFileStoreSubjectStateCacheExpiration(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 32
Expand Down

0 comments on commit e879a9f

Please sign in to comment.