diff --git a/server/filestore.go b/server/filestore.go index 28ac847f69..d23cdf5948 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -29,7 +29,6 @@ import ( "io/ioutil" "net" "os" - "path" "path/filepath" "runtime" "sort" @@ -296,8 +295,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim fs.fip = !fcfg.AsyncFlush // Check if this is a new setup. - mdir := path.Join(fcfg.StoreDir, msgDir) - odir := path.Join(fcfg.StoreDir, consumerDir) + mdir := filepath.Join(fcfg.StoreDir, msgDir) + odir := filepath.Join(fcfg.StoreDir, consumerDir) if err := os.MkdirAll(mdir, defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create message storage directory - %v", err) } @@ -321,7 +320,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } // Write our meta data iff does not exist. - meta := path.Join(fcfg.StoreDir, JetStreamMetaFile) + meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { if err := fs.writeStreamMeta(); err != nil { return nil, err @@ -331,7 +330,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // If we expect to be encrypted check that what we are restoring is not plaintext. // This can happen on snapshot restores or conversions. if fs.prf != nil { - keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) + keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) { if err := fs.writeStreamMeta(); err != nil { return nil, err @@ -454,7 +453,7 @@ func (fs *fileStore) writeStreamMeta() error { return err } fs.aek = key - keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) + keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } @@ -463,7 +462,7 @@ func (fs *fileStore) writeStreamMeta() error { } } - meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile) + meta := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) { return err } @@ -484,7 +483,7 @@ func (fs *fileStore) writeStreamMeta() error { fs.hh.Reset() fs.hh.Write(b) checksum := hex.EncodeToString(fs.hh.Sum(nil)) - sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) + sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { return err } @@ -503,10 +502,10 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, error) { mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire} - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - mb.mfn = path.Join(mdir, fi.Name()) - mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, index)) - mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, index)) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + mb.mfn = filepath.Join(mdir, fi.Name()) + mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index)) + mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, index)) if mb.hh == nil { key := sha256.Sum256(fs.hashKeyForBlock(index)) @@ -517,7 +516,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e // Check if encryption is enabled. if fs.prf != nil { - ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index))) + ekey, err := ioutil.ReadFile(filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))) if err != nil { // We do not seem to have keys even though we should. Could be a plaintext conversion. // Create the keys and we will double check below. @@ -863,12 +862,12 @@ func (fs *fileStore) recoverMsgs() error { defer fs.mu.Unlock() // Check for any left over purged messages. - pdir := path.Join(fs.fcfg.StoreDir, purgeDir) + pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir) if _, err := os.Stat(pdir); err == nil { os.RemoveAll(pdir) } - mdir := path.Join(fs.fcfg.StoreDir, msgDir) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) fis, err := ioutil.ReadDir(mdir) if err != nil { return errNotReadable @@ -916,13 +915,13 @@ func (fs *fileStore) recoverMsgs() error { // We had a bug that would leave fss files around during a snapshot. // Clean them up here if we see them. - if fms, err := filepath.Glob(path.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 { + if fms, err := filepath.Glob(filepath.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 { for _, fn := range fms { os.Remove(fn) } } // Same bug for keyfiles but for these we just need to identify orphans. - if kms, err := filepath.Glob(path.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 { + if kms, err := filepath.Glob(filepath.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 { valid := make(map[uint64]bool) for _, mb := range fs.blks { valid[mb.index] = true @@ -930,7 +929,7 @@ func (fs *fileStore) recoverMsgs() error { for _, fn := range kms { var index uint64 shouldRemove := true - if n, err := fmt.Sscanf(path.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] { + if n, err := fmt.Sscanf(filepath.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] { shouldRemove = false } if shouldRemove { @@ -1516,8 +1515,8 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.hh = hh - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - mb.mfn = path.Join(mdir, fmt.Sprintf(blkScan, mb.index)) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index)) mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) if err != nil { mb.dirtyCloseWithRemove(true) @@ -1525,7 +1524,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.mfd = mfd - mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, mb.index)) + mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, mb.index)) ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms) if err != nil { mb.dirtyCloseWithRemove(true) @@ -1534,7 +1533,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mb.ifd = ifd // For subject based info. - mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, mb.index)) + mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, mb.index)) // Check if encryption is enabled. if fs.prf != nil { @@ -1576,8 +1575,8 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error { return err } mb.aek, mb.bek, mb.seed, mb.nonce = key, bek, seed, encrypted[:key.NonceSize()] - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - keyFile := path.Join(mdir, fmt.Sprintf(keyScan, mb.index)) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)) if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } @@ -2119,7 +2118,7 @@ func (mb *msgBlock) compact() { mb.closeFDsLocked() // We will write to a new file and mv/rename it in case of failure. - mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index)) + mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index)) defer os.Remove(mfn) if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil { return @@ -4061,8 +4060,8 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { // Move the msgs directory out of the way, will delete out of band. // FIXME(dlc) - These can error and we need to change api above to propagate? - mdir := path.Join(fs.fcfg.StoreDir, msgDir) - pdir := path.Join(fs.fcfg.StoreDir, purgeDir) + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir) // If purge directory still exists then we need to wait // in place and remove since rename would fail. if _, err := os.Stat(pdir); err == nil { @@ -4593,7 +4592,7 @@ func (fs *fileStore) Delete() error { } fs.Purge() - pdir := path.Join(fs.fcfg.StoreDir, purgeDir) + pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir) // If purge directory still exists then we need to wait // in place and remove since rename would fail. if _, err := os.Stat(pdir); err == nil { @@ -4820,13 +4819,13 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ o.mu.Unlock() // Write all the consumer files. - if writeFile(path.Join(odirPre, JetStreamMetaFile), meta) != nil { + if writeFile(filepath.Join(odirPre, JetStreamMetaFile), meta) != nil { return } - if writeFile(path.Join(odirPre, JetStreamMetaFileSum), sum) != nil { + if writeFile(filepath.Join(odirPre, JetStreamMetaFileSum), sum) != nil { return } - writeFile(path.Join(odirPre, consumerState), state) + writeFile(filepath.Join(odirPre, consumerState), state) } } @@ -4909,7 +4908,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt if cfg == nil || name == _EMPTY_ { return nil, fmt.Errorf("bad consumer config") } - odir := path.Join(fs.fcfg.StoreDir, consumerDir, name) + odir := filepath.Join(fs.fcfg.StoreDir, consumerDir, name) if err := os.MkdirAll(odir, defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create consumer directory - %v", err) } @@ -4920,7 +4919,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt prf: fs.prf, name: name, odir: odir, - ifn: path.Join(odir, consumerState), + ifn: filepath.Join(odir, consumerState), } key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name)) hh, err := highwayhash.New64(key[:]) @@ -4931,7 +4930,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt // Check for encryption. if o.prf != nil { - if ekey, err := ioutil.ReadFile(path.Join(odir, JetStreamMetaFileKey)); err == nil { + if ekey, err := ioutil.ReadFile(filepath.Join(odir, JetStreamMetaFileKey)); err == nil { // Recover key encryption key. rb, err := fs.prf([]byte(fs.cfg.Name + tsep + o.name)) if err != nil { @@ -4953,7 +4952,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt } // Write our meta data iff does not exist. - meta := path.Join(odir, JetStreamMetaFile) + meta := filepath.Join(odir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { csi.Created = time.Now().UTC() if err := o.writeConsumerMeta(); err != nil { @@ -4964,7 +4963,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt // If we expect to be encrypted check that what we are restoring is not plaintext. // This can happen on snapshot restores or conversions. if o.prf != nil { - keyFile := path.Join(odir, JetStreamMetaFileKey) + keyFile := filepath.Join(odir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) { if err := o.writeConsumerMeta(); err != nil { return nil, err @@ -5418,7 +5417,7 @@ func (o *consumerFileStore) updateConfig(cfg ConsumerConfig) error { // Write out the consumer meta data, i.e. state. // Lock should be held. func (cfs *consumerFileStore) writeConsumerMeta() error { - meta := path.Join(cfs.odir, JetStreamMetaFile) + meta := filepath.Join(cfs.odir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) { return err } @@ -5430,7 +5429,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { return err } cfs.aek = key - keyFile := path.Join(cfs.odir, JetStreamMetaFileKey) + keyFile := filepath.Join(cfs.odir, JetStreamMetaFileKey) if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } @@ -5456,7 +5455,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { cfs.hh.Reset() cfs.hh.Write(b) checksum := hex.EncodeToString(cfs.hh.Sum(nil)) - sum := path.Join(cfs.odir, JetStreamMetaFileSum) + sum := filepath.Join(cfs.odir, JetStreamMetaFileSum) if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { return err } @@ -5787,7 +5786,7 @@ type templateFileStore struct { } func newTemplateFileStore(storeDir string) *templateFileStore { - tdir := path.Join(storeDir, tmplsDir) + tdir := filepath.Join(storeDir, tmplsDir) key := sha256.Sum256([]byte("templates")) hh, err := highwayhash.New64(key[:]) if err != nil { @@ -5797,11 +5796,11 @@ func newTemplateFileStore(storeDir string) *templateFileStore { } func (ts *templateFileStore) Store(t *streamTemplate) error { - dir := path.Join(ts.dir, t.Name) + dir := filepath.Join(ts.dir, t.Name) if err := os.MkdirAll(dir, defaultDirPerms); err != nil { return fmt.Errorf("could not create templates storage directory for %q- %v", t.Name, err) } - meta := path.Join(dir, JetStreamMetaFile) + meta := filepath.Join(dir, JetStreamMetaFile) if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil { return err } @@ -5818,7 +5817,7 @@ func (ts *templateFileStore) Store(t *streamTemplate) error { ts.hh.Reset() ts.hh.Write(b) checksum := hex.EncodeToString(ts.hh.Sum(nil)) - sum := path.Join(dir, JetStreamMetaFileSum) + sum := filepath.Join(dir, JetStreamMetaFileSum) if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { return err } @@ -5826,5 +5825,5 @@ func (ts *templateFileStore) Store(t *streamTemplate) error { } func (ts *templateFileStore) Delete(t *streamTemplate) error { - return os.RemoveAll(path.Join(ts.dir, t.Name)) + return os.RemoveAll(filepath.Join(ts.dir, t.Name)) } diff --git a/server/filestore_test.go b/server/filestore_test.go index a4a16da947..1814680d93 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -27,7 +27,6 @@ import ( "math/bits" "math/rand" "os" - "path" "path/filepath" "reflect" "strings" @@ -516,6 +515,7 @@ func TestFileStoreMsgLimitBug(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fs.StoreMsg(subj, nil, msg) } @@ -742,8 +742,8 @@ func TestFileStorePurge(t *testing.T) { } // We will simulate crashing before the purge directory is cleared. - mdir := path.Join(storeDir, msgDir) - pdir := path.Join(fs.fcfg.StoreDir, "ptest") + mdir := filepath.Join(storeDir, msgDir) + pdir := filepath.Join(fs.fcfg.StoreDir, "ptest") os.Rename(mdir, pdir) os.MkdirAll(mdir, 0755) @@ -753,7 +753,7 @@ func TestFileStorePurge(t *testing.T) { // Make sure we recover same state. fs.Stop() - purgeDir := path.Join(fs.fcfg.StoreDir, purgeDir) + purgeDir := filepath.Join(fs.fcfg.StoreDir, purgeDir) os.Rename(pdir, purgeDir) fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: blkSize}, StreamConfig{Name: "zzz", Storage: FileStorage}) @@ -1253,7 +1253,7 @@ func TestFileStoreEraseMsg(t *testing.T) { // Now look on disk as well. rl := fileStoreMsgSize(subj, nil, msg) buf := make([]byte, rl) - fp, err := os.Open(path.Join(storeDir, msgDir, fmt.Sprintf(blkScan, 1))) + fp, err := os.Open(filepath.Join(storeDir, msgDir, fmt.Sprintf(blkScan, 1))) if err != nil { t.Fatalf("Error opening msg block file: %v", err) } @@ -1311,7 +1311,7 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) { // Stop and remove the index file. fs.Stop() - ifn := path.Join(storeDir, msgDir, fmt.Sprintf(indexScan, 1)) + ifn := filepath.Join(storeDir, msgDir, fmt.Sprintf(indexScan, 1)) removeFile(t, ifn) fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) @@ -1344,8 +1344,8 @@ func TestFileStoreMeta(t *testing.T) { } defer fs.Stop() - metafile := path.Join(storeDir, JetStreamMetaFile) - metasum := path.Join(storeDir, JetStreamMetaFileSum) + metafile := filepath.Join(storeDir, JetStreamMetaFile) + metasum := filepath.Join(storeDir, JetStreamMetaFileSum) // Test to make sure meta file and checksum are present. if _, err := os.Stat(metafile); os.IsNotExist(err) { @@ -1389,8 +1389,8 @@ func TestFileStoreMeta(t *testing.T) { t.Fatalf("Unexepected error: %v", err) } - ometafile := path.Join(storeDir, consumerDir, oname, JetStreamMetaFile) - ometasum := path.Join(storeDir, consumerDir, oname, JetStreamMetaFileSum) + ometafile := filepath.Join(storeDir, consumerDir, oname, JetStreamMetaFile) + ometasum := filepath.Join(storeDir, consumerDir, oname, JetStreamMetaFileSum) // Test to make sure meta file and checksum are present. if _, err := os.Stat(ometafile); os.IsNotExist(err) { @@ -1766,7 +1766,7 @@ func TestFileStoreSnapshot(t *testing.T) { if err != nil { t.Fatalf("Error getting next entry from snapshot: %v", err) } - fpath := path.Join(rstoreDir, filepath.Clean(hdr.Name)) + fpath := filepath.Join(rstoreDir, filepath.Clean(hdr.Name)) pdir := filepath.Dir(fpath) os.MkdirAll(pdir, 0755) fd, err := os.OpenFile(fpath, os.O_CREATE|os.O_RDWR, 0600) @@ -2055,13 +2055,13 @@ func TestFileStoreWriteFailures(t *testing.T) { // has a limited size. // E.g. Docker // docker run -ti --tmpfs /jswf_test:rw,size=32k --rm -v ~/Development/go/src:/go/src -w /go/src/github.com/nats-io/nats-server/ golang:1.16 /bin/bash - tdir := path.Join("/", "jswf_test") + tdir := filepath.Join("/", "jswf_test") if stat, err := os.Stat(tdir); err != nil || !stat.IsDir() { t.SkipNow() } defer removeDir(t, tdir) - storeDir := path.Join(tdir, JetStreamStoreDir) + storeDir := filepath.Join(tdir, JetStreamStoreDir) os.MkdirAll(storeDir, 0755) subj, msg := "foo", []byte("Hello Write Failures!") @@ -2772,7 +2772,7 @@ func TestFileStoreStreamDeleteDirNotEmpty(t *testing.T) { ready := make(chan bool) go func() { - g := path.Join(storeDir, "g") + g := filepath.Join(storeDir, "g") ready <- true for i := 0; i < 100; i++ { ioutil.WriteFile(g, []byte("OK"), defaultFilePerms) @@ -2859,7 +2859,7 @@ func TestFileStoreStreamIndexBug(t *testing.T) { badIdxBytes, _ := base64.StdEncoding.DecodeString("FgGBkw7D/f8/772iDPDIgbU=") dir := createDir(t, "js-bad-idx-") defer removeDir(t, dir) - fn := path.Join(dir, "1.idx") + fn := filepath.Join(dir, "1.idx") ioutil.WriteFile(fn, badIdxBytes, 0644) mb := &msgBlock{index: 1, ifn: fn} if err := mb.readIndexInfo(); err == nil || !strings.Contains(err.Error(), "short index") { @@ -3225,8 +3225,10 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) { restartFS(ttl - 100*time.Millisecond + 25*time.Millisecond) checkState(0, 11, 10) + fs.Stop() // Not for start per se but since we have all the test tooling here check that Compact() does right thing as well. fs = newFS() + defer fs.Stop() loadMsgs(100) checkFiltered("orders.*", SimpleState{Msgs: 100, First: 1, Last: 100}) checkFiltered("orders.5", SimpleState{Msgs: 10, First: 5, Last: 95}) diff --git a/server/gateway_test.go b/server/gateway_test.go index dd8a044f9f..8d7ae9e027 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -6484,6 +6484,7 @@ func testTLSGatewaysCertificateImplicitAllow(t *testing.T, pass bool) { if err := cfg.Sync(); err != nil { t.Fatal(err) } + cfg.Close() optsA := LoadConfig(cfg.Name()) optsB := LoadConfig(cfg.Name()) diff --git a/server/jetstream.go b/server/jetstream.go index 88e80029d6..6b13ac7afd 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -24,7 +24,6 @@ import ( "io/ioutil" "math" "os" - "path" "path/filepath" "strconv" "strings" @@ -970,7 +969,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { jsa := &jsAccount{js: js, account: a, limits: *limits, streams: make(map[string]*stream), sendq: sendq} jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) - jsa.storeDir = path.Join(js.config.StoreDir, a.Name) + jsa.storeDir = filepath.Join(js.config.StoreDir, a.Name) js.accounts[a.Name] = jsa js.mu.Unlock() @@ -998,9 +997,9 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Debugf(" Max Storage: %s", friendlyBytes(limits.MaxStore)) // Clean up any old snapshots that were orphaned while staging. - os.RemoveAll(path.Join(js.config.StoreDir, snapStagingDir)) + os.RemoveAll(filepath.Join(js.config.StoreDir, snapStagingDir)) - sdir := path.Join(jsa.storeDir, streamsDir) + sdir := filepath.Join(jsa.storeDir, streamsDir) if _, err := os.Stat(sdir); os.IsNotExist(err) { if err := os.MkdirAll(sdir, defaultDirPerms); err != nil { return fmt.Errorf("could not create storage streams directory - %v", err) @@ -1016,7 +1015,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // Check templates first since messsage sets will need proper ownership. // FIXME(dlc) - Make this consistent. - tdir := path.Join(jsa.storeDir, tmplsDir) + tdir := filepath.Join(jsa.storeDir, tmplsDir) if stat, err := os.Stat(tdir); err == nil && stat.IsDir() { key := sha256.Sum256([]byte("templates")) hh, err := highwayhash.New64(key[:]) @@ -1025,8 +1024,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } fis, _ := ioutil.ReadDir(tdir) for _, fi := range fis { - metafile := path.Join(tdir, fi.Name(), JetStreamMetaFile) - metasum := path.Join(tdir, fi.Name(), JetStreamMetaFileSum) + metafile := filepath.Join(tdir, fi.Name(), JetStreamMetaFile) + metasum := filepath.Join(tdir, fi.Name(), JetStreamMetaFileSum) buf, err := ioutil.ReadFile(metafile) if err != nil { s.Warnf(" Error reading StreamTemplate metafile %q: %v", metasum, err) @@ -1071,14 +1070,14 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // Now recover the streams. fis, _ := ioutil.ReadDir(sdir) for _, fi := range fis { - mdir := path.Join(sdir, fi.Name()) + mdir := filepath.Join(sdir, fi.Name()) key := sha256.Sum256([]byte(fi.Name())) hh, err := highwayhash.New64(key[:]) if err != nil { return err } - metafile := path.Join(mdir, JetStreamMetaFile) - metasum := path.Join(mdir, JetStreamMetaFileSum) + metafile := filepath.Join(mdir, JetStreamMetaFile) + metasum := filepath.Join(mdir, JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing stream metafile for %q", metafile) continue @@ -1105,7 +1104,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Check if we are encrypted. - if key, err := ioutil.ReadFile(path.Join(mdir, JetStreamMetaFileKey)); err == nil { + if key, err := ioutil.ReadFile(filepath.Join(mdir, JetStreamMetaFileKey)); err == nil { s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile") if len(key) != metaKeySize { s.Warnf(" Bad stream encryption key length of %d", len(key)) @@ -1170,7 +1169,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Noticef(" Restored %s messages for stream '%s > %s'", comma(int64(state.Msgs)), mset.accName(), mset.name()) // Now do the consumers. - odir := path.Join(sdir, fi.Name(), consumerDir) + odir := filepath.Join(sdir, fi.Name(), consumerDir) consumers = append(consumers, &ce{mset, odir}) } @@ -1180,8 +1179,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), e.mset.accName(), e.mset.name()) } for _, ofi := range ofis { - metafile := path.Join(e.odir, ofi.Name(), JetStreamMetaFile) - metasum := path.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) + metafile := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing consumer metafile %q", metafile) continue @@ -1197,7 +1196,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Check if we are encrypted. - if key, err := ioutil.ReadFile(path.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + if key, err := ioutil.ReadFile(filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") // Decode the buffer before proceeding. if buf, err = s.decryptMeta(key, buf, a.Name, e.mset.name()+tsep+ofi.Name()); err != nil { @@ -1238,7 +1237,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Make sure to cleanup any old remaining snapshots. - os.RemoveAll(path.Join(jsa.storeDir, snapsDir)) + os.RemoveAll(filepath.Join(jsa.storeDir, snapsDir)) s.Debugf("JetStream state for account %q recovered", a.Name) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 4f06e543cb..31c0de0544 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -21,7 +21,7 @@ import ( "io/ioutil" "math/rand" "os" - "path" + "path/filepath" "sort" "strconv" "strings" @@ -2740,7 +2740,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}} - snapDir := path.Join(js.config.StoreDir, snapStagingDir) + snapDir := filepath.Join(js.config.StoreDir, snapStagingDir) if _, err := os.Stat(snapDir); os.IsNotExist(err) { if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil { resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0e85c1e863..584c55d3b7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -21,7 +21,7 @@ import ( "fmt" "math" "math/rand" - "path" + "path/filepath" "reflect" "sort" "strings" @@ -501,7 +501,7 @@ func (js *jetStream) setupMetaGroup() error { // Setup our WAL for the metagroup. sysAcc := s.SystemAccount() - storeDir := path.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) + storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) fs, err := newFileStore( FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false}, @@ -1407,7 +1407,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { return errors.New("shutting down") } - storeDir := path.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) + storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) var store StreamStore if storage == FileStorage { fs, err := newFileStore( diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index af96996e3b..2f7bce7ee7 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -23,7 +23,7 @@ import ( "io/ioutil" "math/rand" "os" - "path" + "path/filepath" "reflect" "strconv" "strings" @@ -38,7 +38,7 @@ import ( func TestJetStreamClusterConfig(t *testing.T) { conf := createConfFile(t, []byte(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: "%s"} + jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: '%s'} cluster { listen: 127.0.0.1:-1 } `)) defer removeFile(t, conf) @@ -59,7 +59,7 @@ func TestJetStreamClusterConfig(t *testing.T) { conf = createConfFile(t, []byte(` listen: 127.0.0.1:-1 server_name: "TEST" - jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: "%s"} + jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: '%s'} cluster { listen: 127.0.0.1:-1 } `)) defer removeFile(t, conf) @@ -282,7 +282,7 @@ func TestJetStreamClusterMultiReplicaStreamsDefaultFileMem(t *testing.T) { const testConfig = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {store_dir: "%s"} + jetstream: {store_dir: '%s'} cluster { name: %s @@ -5469,7 +5469,7 @@ func TestJetStreamClusterSuperClusterInterestOnlyMode(t *testing.T) { template := ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} accounts { one { jetstream: enabled @@ -7668,7 +7668,7 @@ func TestJetStreamClusterCrossAccountInterop(t *testing.T) { template := ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: HUB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: HUB, store_dir: '%s'} cluster { name: %s @@ -8685,7 +8685,7 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { c.stopAll() // Remove all state by truncating for the non-leader. for _, fn := range []string{"1.blk", "1.idx", "1.fss"} { - fname := path.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn) + fname := filepath.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn) fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms) if err != nil { continue @@ -8694,9 +8694,9 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { fd.Close() } // For both make sure we have no raft snapshots. - snapDir := path.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots") + snapDir := filepath.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots") os.RemoveAll(snapDir) - snapDir = path.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots") + snapDir = filepath.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots") os.RemoveAll(snapDir) // Now restart. @@ -8775,7 +8775,7 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) { var jsClusterAccountLimitsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} cluster { name: %s @@ -8850,7 +8850,7 @@ func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) storeDir1 := createDir(t, JetStreamStoreDir) conf1 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain1, store_dir: '%s'} accounts { A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, SYS:{ users:[ {user:s1,password:s1}]}, @@ -8866,7 +8866,7 @@ func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) storeDir2 := createDir(t, JetStreamStoreDir) conf2 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain2, store_dir: '%s'} accounts { A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, SYS:{ users:[ {user:s1,password:s1}]}, @@ -8883,7 +8883,7 @@ func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) storeDir3 := createDir(t, JetStreamStoreDir) conf3 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 256MB, domain: domain3, store_dir: '%s'} accounts { A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, SYS:{ users:[ {user:s1,password:s1}]}, @@ -8961,11 +8961,10 @@ func TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain(t *testing.T) func TestJetStreamSeal(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -9572,11 +9571,10 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) { func TestJetStreamListFilter(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -9629,11 +9627,10 @@ func TestJetStreamListFilter(t *testing.T) { func TestJetStreamConsumerUpdates(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 5) defer c.shutdown() @@ -10427,11 +10424,10 @@ func TestJetStreamClusterRedeliverBackoffs(t *testing.T) { func TestJetStreamConsumerUpgrade(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -10457,11 +10453,10 @@ func TestJetStreamConsumerUpgrade(t *testing.T) { func TestJetStreamAddConsumerWithInfo(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -10960,11 +10955,10 @@ func TestJetStreamClusterMirrorOrSourceNotActiveReporting(t *testing.T) { func TestJetStreamStreamAdvisories(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() c := createJetStreamClusterExplicit(t, "JSC", 3) defer c.shutdown() @@ -11139,7 +11133,7 @@ var jsClusterAccountsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { listen: 127.0.0.1:-1 @@ -11164,7 +11158,7 @@ var jsClusterAccountsTempl = ` var jsClusterTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { listen: 127.0.0.1:-1 @@ -11183,7 +11177,7 @@ var jsClusterTempl = ` var jsClusterMaxBytesTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { listen: 127.0.0.1:-1 @@ -11225,7 +11219,7 @@ var jsSuperClusterTempl = ` var jsClusterLimitsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: "%s"} + jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s'} cluster { name: %s @@ -11247,7 +11241,7 @@ var jsClusterLimitsTempl = ` var jsMixedModeGlobalAccountTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: "%s"} + jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s'} cluster { name: %s @@ -11434,7 +11428,7 @@ func (sc *supercluster) waitOnPeerCount(n int) { var jsClusterMirrorSourceImportsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} cluster { name: %s @@ -11470,7 +11464,7 @@ var jsClusterMirrorSourceImportsTempl = ` var jsClusterImportsTempl = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} cluster { name: %s @@ -11672,7 +11666,7 @@ func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDoma var jsClusterSingleLeafNodeLikeNGSTempl = ` listen: 127.0.0.1:-1 server_name: LNJS - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { remotes [ { urls: [ %s ] } ] } ` @@ -11680,7 +11674,7 @@ var jsClusterSingleLeafNodeLikeNGSTempl = ` var jsClusterSingleLeafNodeTempl = ` listen: 127.0.0.1:-1 server_name: LNJS - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} leaf { remotes [ { urls: [ %s ], account: "JSY" } @@ -11697,7 +11691,7 @@ var jsClusterSingleLeafNodeTempl = ` var jsClusterTemplWithLeafNode = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} @@ -11716,7 +11710,7 @@ var jsClusterTemplWithLeafNodeNoJS = ` server_name: %s # Need to keep below since it fills in the store dir by default so just comment out. - # jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + # jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} @@ -11733,7 +11727,7 @@ var jsClusterTemplWithLeafNodeNoJS = ` var jsClusterTemplWithSingleLeafNode = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} @@ -11745,7 +11739,7 @@ var jsClusterTemplWithSingleLeafNodeNoJS = ` listen: 127.0.0.1:-1 server_name: %s - # jetstream: {store_dir: "%s"} + # jetstream: {store_dir: '%s'} {{leaf}} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index e715241633..078da74269 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20,12 +20,12 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "math/rand" "net/http" "net/url" "os" - "path" "path/filepath" "reflect" "runtime" @@ -46,11 +46,10 @@ import ( func TestJetStreamBasicNilConfig(t *testing.T) { s := RunRandClientPortServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if err := s.EnableJetStream(nil); err != nil { t.Fatalf("Expected no error, got %v", err) @@ -124,11 +123,10 @@ func clientConnectWithOldRequest(t *testing.T, s *Server) *nats.Conn { func TestJetStreamEnableAndDisableAccount(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Global in simple setup should be enabled already. if !s.GlobalAccount().JetStreamEnabled() { @@ -366,11 +364,10 @@ func TestJetStreamAutoTuneFSConfig(t *testing.T) { func TestJetStreamConsumerAndStreamDescriptions(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() descr := "foo asset" acc := s.GlobalAccount() @@ -418,11 +415,10 @@ func TestJetStreamConsumerAndStreamDescriptions(t *testing.T) { func TestJetStreamPubAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sname := "PUBACK" acc := s.GlobalAccount() @@ -900,11 +896,10 @@ func TestJetStreamAddStreamCanonicalNames(t *testing.T) { func TestJetStreamAddStreamBadSubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc := clientConnectToServer(t, s) @@ -935,11 +930,10 @@ func TestJetStreamAddStreamBadSubjects(t *testing.T) { func TestJetStreamMaxConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -978,11 +972,10 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() mset, err := acc.addStream(mconfig) @@ -1010,11 +1003,10 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -1044,11 +1036,10 @@ func TestJetStreamAddStreamSameConfigOK(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() mset, err := acc.addStream(mconfig) @@ -3112,11 +3103,10 @@ func TestJetStreamWorkQueueTerminateDelivery(t *testing.T) { func TestJetStreamConsumerAckAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) @@ -3163,11 +3153,10 @@ func TestJetStreamConsumerAckAck(t *testing.T) { func TestJetStreamAckNext(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "ACKNXT" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: MemoryStorage}) @@ -3259,11 +3248,10 @@ func TestJetStreamAckNext(t *testing.T) { func TestJetStreamPublishDeDupe(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "DeDupe" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage, MaxAge: time.Hour, Subjects: []string{"foo.*"}}) @@ -3423,11 +3411,10 @@ func getPubAckResponse(msg []byte) *JSPubAckResponse { func TestJetStreamPublishExpect(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "EXPECT" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage, MaxAge: time.Hour, Subjects: []string{"foo.*"}}) @@ -3519,11 +3506,10 @@ func TestJetStreamPublishExpect(t *testing.T) { func TestJetStreamPullConsumerRemoveInterest(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS-PULL" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: MemoryStorage}) @@ -3606,11 +3592,10 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) { func TestJetStreamConsumerRateLimit(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "RATELIMIT" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -3684,11 +3669,10 @@ func TestJetStreamConsumerRateLimit(t *testing.T) { func TestJetStreamEphemeralConsumerRecoveryAfterServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -3787,11 +3771,10 @@ func TestJetStreamEphemeralConsumerRecoveryAfterServerRestart(t *testing.T) { func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -3928,11 +3911,10 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) { func TestJetStreamDeleteConsumerAndServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sendSubj := "MYQ" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: sendSubj, Storage: FileStorage}) @@ -3979,11 +3961,10 @@ func TestJetStreamDeleteConsumerAndServerRestart(t *testing.T) { func TestJetStreamRedeliveryAfterServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sendSubj := "MYQ" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: sendSubj, Storage: FileStorage}) @@ -4056,11 +4037,10 @@ func TestJetStreamRedeliveryAfterServerRestart(t *testing.T) { func TestJetStreamSnapshots(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MY-STREAM" subjects := []string{"foo", "bar", "baz"} @@ -4570,11 +4550,10 @@ func TestJetStreamPubAckPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -4599,11 +4578,10 @@ func TestJetStreamPubPerfWithFullStream(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -4638,11 +4616,10 @@ func TestJetStreamSnapshotsAPIPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() cfg := StreamConfig{ Name: "snap-perf", @@ -5559,12 +5536,10 @@ func TestJetStreamRedeliverCount(t *testing.T) { // not get the message back. func TestJetStreamRedeliverAndLateAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: "LA", Storage: MemoryStorage}) if err != nil { @@ -5603,12 +5578,10 @@ func TestJetStreamRedeliverAndLateAck(t *testing.T) { // https://github.com/nats-io/nats-server/issues/1502 func TestJetStreamPendingNextTimer(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: "NT", Storage: MemoryStorage, Subjects: []string{"ORDERS.*"}}) if err != nil { @@ -6659,11 +6632,10 @@ func TestJetStreamConsumerReplayQuit(t *testing.T) { func TestJetStreamSystemLimits(t *testing.T) { s := RunRandClientPortServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if _, _, err := s.JetStreamReservedResources(); err == nil { t.Fatalf("Expected error requesting jetstream reserved resources when not enabled") @@ -6831,11 +6803,10 @@ func TestJetStreamSystemLimits(t *testing.T) { func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() gacc := s.GlobalAccount() @@ -6960,11 +6931,10 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { func TestJetStreamStreamFileTrackingAndLimits(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() gacc := s.GlobalAccount() @@ -7093,11 +7063,10 @@ func TestJetStreamSimpleFileRecovery(t *testing.T) { base := runtime.NumGoroutine() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -7210,12 +7179,10 @@ func TestJetStreamSimpleFileRecovery(t *testing.T) { func TestJetStreamPushConsumerFlowControl(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -7315,11 +7282,10 @@ func TestJetStreamPushConsumerFlowControl(t *testing.T) { func TestJetStreamFlowControlRequiresHeartbeats(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -7339,12 +7305,10 @@ func TestJetStreamFlowControlRequiresHeartbeats(t *testing.T) { func TestJetStreamPushConsumerIdleHeartbeats(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -7414,12 +7378,10 @@ func TestJetStreamPushConsumerIdleHeartbeats(t *testing.T) { func TestJetStreamPushConsumerIdleHeartbeatsWithFilterSubject(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -7478,12 +7440,10 @@ func TestJetStreamPushConsumerIdleHeartbeatsWithFilterSubject(t *testing.T) { func TestJetStreamPushConsumerIdleHeartbeatsWithNoInterest(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -7542,12 +7502,10 @@ func TestJetStreamPushConsumerIdleHeartbeatsWithNoInterest(t *testing.T) { func TestJetStreamInfoAPIWithHeaders(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc := clientConnectToServer(t, s) @@ -7574,12 +7532,10 @@ func TestJetStreamInfoAPIWithHeaders(t *testing.T) { func TestJetStreamRequestAPI(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc := clientConnectToServer(t, s) @@ -8095,12 +8051,10 @@ func TestJetStreamRequestAPI(t *testing.T) { func TestJetStreamFilteredStreamNames(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc := clientConnectToServer(t, s) @@ -8562,11 +8516,10 @@ func TestJetStreamLimitLockBug(t *testing.T) { t.Run(c.name, func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil && config.StoreDir != "" { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(c.mconfig) if err != nil { @@ -8614,11 +8567,10 @@ func TestJetStreamNextMsgNoInterest(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() cfg := &StreamConfig{Name: "foo", Storage: FileStorage} mset, err := s.GlobalAccount().addStream(cfg) @@ -8782,11 +8734,10 @@ func TestJetStreamMsgHeaders(t *testing.T) { func TestJetStreamTemplateBasics(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -8854,11 +8805,10 @@ func TestJetStreamTemplateBasics(t *testing.T) { func TestJetStreamTemplateFileStoreRecovery(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9054,11 +9004,10 @@ func clientConnectToServerWithUP(t *testing.T, opts *Options, user, pass string) func TestJetStreamCanNotEnableOnSystemAccount(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sa := s.SystemAccount() if err := sa.EnableJetStream(nil); err == nil { @@ -9087,11 +9036,10 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) { defer removeFile(t, conf) s, opts := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if !s.JetStreamEnabled() { t.Fatalf("Expected JetStream to be enabled") @@ -9334,11 +9282,10 @@ func TestJetStreamStoreDirectoryFix(t *testing.T) { func TestJetStreamPushConsumersPullError(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -9380,11 +9327,10 @@ func TestJetStreamPubPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9446,11 +9392,10 @@ func TestJetStreamPubWithAsyncResponsePerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9486,11 +9431,10 @@ func TestJetStreamPubWithSyncPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -9518,11 +9462,10 @@ func TestJetStreamConsumerPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9580,11 +9523,10 @@ func TestJetStreamConsumerAckFileStorePerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9651,11 +9593,10 @@ func TestJetStreamPubSubPerf(t *testing.T) { t.SkipNow() s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc := s.GlobalAccount() @@ -9924,11 +9865,10 @@ func TestJetStreamStoredMsgsDontDisappearAfterCacheExpiration(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStreamWithStore(sc, &FileStoreConfig{BlockSize: 128, CacheExpire: 15 * time.Millisecond}) if err != nil { @@ -10496,11 +10436,10 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { opts.Port = -1 opts.JetStream = true s := RunServer(&opts) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mset, err := s.GlobalAccount().addStream(&StreamConfig{ Name: "MY_STREAM", @@ -10631,11 +10570,10 @@ func TestJetStreamAccountImportBasics(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc, err := s.LookupAccount("JS") if err != nil { @@ -10760,11 +10698,10 @@ func TestJetStreamAccountImportAll(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() acc, err := s.LookupAccount("JS") if err != nil { @@ -10830,11 +10767,10 @@ func TestJetStreamServerReload(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() if !s.JetStreamEnabled() { t.Fatalf("Expected JetStream to be enabled") @@ -10906,11 +10842,10 @@ func TestJetStreamConfigReloadWithGlobalAccount(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11267,11 +11202,10 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) { func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11323,11 +11257,10 @@ func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { func TestJetStreamMirrorAndSourcesFilteredConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11419,11 +11352,10 @@ func TestJetStreamMirrorAndSourcesFilteredConsumers(t *testing.T) { func TestJetStreamMirrorBasics(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11579,11 +11511,10 @@ func TestJetStreamMirrorBasics(t *testing.T) { func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11603,11 +11534,10 @@ func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { func TestJetStreamSourceBasics(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11761,11 +11691,10 @@ func TestJetStreamSourceBasics(t *testing.T) { func TestJetStreamOperatorAccounts(t *testing.T) { s, _ := RunServerWithConfig("./configs/js-op.conf") - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserCredentials("./configs/one.creds")) defer nc.Close() @@ -11873,12 +11802,11 @@ func TestJetStreamDomainInPubAck(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -11907,11 +11835,10 @@ func TestJetStreamDomainInPubAck(t *testing.T) { // Issue #2213 func TestJetStreamDirectConsumersBeingReported(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -11981,11 +11908,10 @@ func TestJetStreamDirectConsumersBeingReported(t *testing.T) { // https://github.com/nats-io/nats-server/issues/2290 func TestJetStreamTemplatedErrorsBug(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12064,7 +11990,7 @@ func TestJetStreamServerEncryption(t *testing.T) { ci, _ := js.ConsumerInfo("TEST", "dlc") // Quick check to make sure everything not just plaintext still. - sdir := path.Join(config.StoreDir, "$G", "streams", "TEST") + sdir := filepath.Join(config.StoreDir, "$G", "streams", "TEST") // Make sure we can not find any plaintext strings in the target file. checkFor := func(fn string, strs ...string) { t.Helper() @@ -12087,17 +12013,17 @@ func TestJetStreamServerEncryption(t *testing.T) { // Check stream meta. checkEncrypted := func() { - checkKeyFile(path.Join(sdir, JetStreamMetaFileKey)) - checkFor(path.Join(sdir, JetStreamMetaFile), "TEST", "foo", "bar", "baz", "max_msgs", "max_bytes") + checkKeyFile(filepath.Join(sdir, JetStreamMetaFileKey)) + checkFor(filepath.Join(sdir, JetStreamMetaFile), "TEST", "foo", "bar", "baz", "max_msgs", "max_bytes") // Check a message block. - checkKeyFile(path.Join(sdir, "msgs", "1.key")) - checkFor(path.Join(sdir, "msgs", "1.blk"), "ENCRYPTED PAYLOAD!!", "foo", "bar", "baz") + checkKeyFile(filepath.Join(sdir, "msgs", "1.key")) + checkFor(filepath.Join(sdir, "msgs", "1.blk"), "ENCRYPTED PAYLOAD!!", "foo", "bar", "baz") // Check consumer meta and state. - checkKeyFile(path.Join(sdir, "obs", "dlc", JetStreamMetaFileKey)) - checkFor(path.Join(sdir, "obs", "dlc", JetStreamMetaFile), "TEST", "dlc", "foo", "bar", "baz", "max_msgs", "ack_policy") + checkKeyFile(filepath.Join(sdir, "obs", "dlc", JetStreamMetaFileKey)) + checkFor(filepath.Join(sdir, "obs", "dlc", JetStreamMetaFile), "TEST", "dlc", "foo", "bar", "baz", "max_msgs", "ack_policy") // Load and see if we can parse the consumer state. - state, err := ioutil.ReadFile(path.Join(sdir, "obs", "dlc", "o.dat")) + state, err := ioutil.ReadFile(filepath.Join(sdir, "obs", "dlc", "o.dat")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -12204,11 +12130,10 @@ func TestJetStreamServerEncryption(t *testing.T) { // User report of bug. func TestJetStreamConsumerBadNumPending(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12288,11 +12213,10 @@ func TestJetStreamDeliverLastPerSubject(t *testing.T) { for _, st := range []StorageType{FileStorage, MemoryStorage} { t.Run(st.String(), func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12428,11 +12352,10 @@ func TestJetStreamDeliverLastPerSubject(t *testing.T) { func TestJetStreamDeliverLastPerSubjectNumPending(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12477,11 +12400,10 @@ func TestJetStreamDeliverLastPerSubjectNumPending(t *testing.T) { // This I believe is only really possible in clustered mode, but we will force the issue here. func TestJetStreamConsumerCleanupWithRetentionPolicy(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12547,11 +12469,10 @@ func TestJetStreamConsumerCleanupWithRetentionPolicy(t *testing.T) { // Issue #2392 func TestJetStreamPurgeEffectsConsumerDelivery(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12599,11 +12520,10 @@ func TestJetStreamPurgeEffectsConsumerDelivery(t *testing.T) { // Issue #2403 func TestJetStreamExpireCausesDeadlock(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12668,11 +12588,10 @@ func TestJetStreamConsumerPendingBugWithKV(t *testing.T) { t.Run(c.name, func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client based API nc, js := jsClientConnect(t, s) @@ -12716,11 +12635,10 @@ func TestJetStreamConsumerPendingBugWithKV(t *testing.T) { // Issue #2420 func TestJetStreamDefaultMaxMsgsPer(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12743,11 +12661,10 @@ func TestJetStreamDefaultMaxMsgsPer(t *testing.T) { // Issue #2423 func TestJetStreamBadConsumerCreateErr(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -12779,12 +12696,11 @@ func TestJetStreamBadConsumerCreateErr(t *testing.T) { func TestJetStreamConsumerPushBound(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -12887,12 +12803,11 @@ func TestJetStreamConsumerPushBound(t *testing.T) { // Got a report of memory leaking, tracked it to internal clients for consumers. func TestJetStreamConsumerInternalClientLeak(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -12949,12 +12864,11 @@ func TestJetStreamConsumerInternalClientLeak(t *testing.T) { func TestJetStreamConsumerEventingRaceOnShutdown(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.NoReconnect()) defer nc.Close() @@ -12990,12 +12904,11 @@ func TestJetStreamConsumerEventingRaceOnShutdown(t *testing.T) { // and try to send new messages. func TestJetStreamExpireAllWhileServerDown(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13046,12 +12959,11 @@ func TestJetStreamExpireAllWhileServerDown(t *testing.T) { func TestJetStreamLongStreamNamesAndPubAck(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13075,12 +12987,11 @@ func TestJetStreamPerSubjectPending(t *testing.T) { t.Run(st.String(), func(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13133,12 +13044,11 @@ func TestJetStreamPerSubjectPending(t *testing.T) { func TestJetStreamPublishExpectNoMsg(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13182,12 +13092,11 @@ func TestJetStreamPublishExpectNoMsg(t *testing.T) { func TestJetStreamPullLargeBatchExpired(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13229,12 +13138,11 @@ func TestJetStreamPullLargeBatchExpired(t *testing.T) { func TestJetStreamNegativeDupeWindow(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13263,12 +13171,11 @@ func TestJetStreamNegativeDupeWindow(t *testing.T) { // Issue #2551 func TestJetStreamMirroredConsumerFailAfterRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13356,11 +13263,11 @@ func TestJetStreamDisabledLimitsEnforcementJWT(t *testing.T) { defer removeDir(t, storeDir1) conf := createConfFile(t, []byte(fmt.Sprintf(` listen: -1 - jetstream: {store_dir: %s} + jetstream: {store_dir: '%s'} operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, storeDir1, ojwt, dir, sysPub))) @@ -13385,7 +13292,7 @@ func TestJetStreamDisabledLimitsEnforcement(t *testing.T) { storeDir1 := createDir(t, JetStreamStoreDir) conf1 := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} accounts { one { jetstream: { @@ -13417,12 +13324,11 @@ func TestJetStreamDisabledLimitsEnforcement(t *testing.T) { func TestJetStreamConsumerNoMsgPayload(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13464,12 +13370,11 @@ func TestJetStreamConsumerNoMsgPayload(t *testing.T) { // Issue #2607 func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13534,12 +13439,11 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) { // Issue #2662 func TestJetStreamLargeExpiresAndServerRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13588,12 +13492,11 @@ func TestJetStreamLargeExpiresAndServerRestart(t *testing.T) { func TestJetStreamMessagePerSubjectKeepBug(t *testing.T) { test := func(t *testing.T, keep int64, store nats.StorageType) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13631,12 +13534,11 @@ func TestJetStreamMessagePerSubjectKeepBug(t *testing.T) { func TestJetStreamInvalidDeliverSubject(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13723,7 +13625,7 @@ func TestJetStreamRecoverBadStreamSubjects(t *testing.T) { sd := config.StoreDir s.Shutdown() - f := path.Join(sd, "$G", "streams", "TEST") + f := filepath.Join(sd, "$G", "streams", "TEST") fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ Name: "TEST", Subjects: []string{"foo", "bar", " baz "}, // baz has spaces @@ -13748,12 +13650,11 @@ func TestJetStreamRecoverBadStreamSubjects(t *testing.T) { func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sd := config.StoreDir // Client for API requests. @@ -13769,7 +13670,7 @@ func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) { s.Shutdown() - f := path.Join(sd, "$G", "streams", "M") + f := filepath.Join(sd, "$G", "streams", "M") fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ Name: "M", Subjects: []string{"foo", "bar", "baz"}, // Mirrors should not have spaces. @@ -13816,11 +13717,10 @@ func TestJetStreamCrossAccountsDeliverSubjectInterest(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("a", "pwd")) defer nc.Close() @@ -13876,11 +13776,10 @@ func TestJetStreamCrossAccountsDeliverSubjectInterest(t *testing.T) { func TestJetStreamPullConsumerRequestCleanup(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -13918,11 +13817,10 @@ func TestJetStreamPullConsumerRequestCleanup(t *testing.T) { func TestJetStreamPullConsumerRequestMaximums(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, _ := jsClientConnect(t, s) defer nc.Close() @@ -13966,11 +13864,10 @@ func TestJetStreamPullConsumerRequestMaximums(t *testing.T) { func TestJetStreamEphemeralPullConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14058,11 +13955,10 @@ func TestJetStreamPullConsumerCrossAccountExpires(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Connect to JS account and create stream, put some messages into it. nc, js := jsClientConnect(t, s, nats.UserInfo("dlc", "foo")) @@ -14263,11 +14159,10 @@ func TestJetStreamPullConsumerCrossAccountsAndLeafNodes(t *testing.T) { defer removeFile(t, conf) s, o := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() conf2 := createConfFile(t, []byte(fmt.Sprintf(` server_name: SLN @@ -14362,11 +14257,10 @@ func TestJetStreamPullConsumerCrossAccountsAndLeafNodes(t *testing.T) { // 4. Try, which never waits at all ever. func TestJetStreamPullConsumersOneShotBehavior(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -14474,11 +14368,10 @@ func TestJetStreamPullConsumersOneShotBehavior(t *testing.T) { func TestJetStreamPullConsumersMultipleRequestsExpireOutOfOrder(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -14533,11 +14426,10 @@ func TestJetStreamPullConsumersMultipleRequestsExpireOutOfOrder(t *testing.T) { func TestJetStreamConsumerUpdateSurvival(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14577,11 +14469,10 @@ func TestJetStreamConsumerUpdateSurvival(t *testing.T) { func TestJetStreamNakRedeliveryWithNoWait(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14725,11 +14616,10 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) { func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14789,11 +14679,10 @@ func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) { func TestJetStreamStreamInfoSubjectsDetailsWithDeleteAndPurge(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14879,11 +14768,10 @@ func TestJetStreamStreamInfoSubjectsDetailsWithDeleteAndPurge(t *testing.T) { func TestJetStreamStreamInfoSubjectsDetailsAfterRestart(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14940,11 +14828,10 @@ func TestJetStreamStreamInfoSubjectsDetailsAfterRestart(t *testing.T) { // Issue #2836 func TestJetStreamInterestRetentionBug(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -14989,11 +14876,10 @@ func TestJetStreamInterestRetentionBug(t *testing.T) { // exceed the outstanding FC we would become stalled. func TestJetStreamFlowControlStall(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15019,11 +14905,10 @@ func TestJetStreamFlowControlStall(t *testing.T) { func TestJetStreamConsumerPendingCountWithRedeliveries(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15086,11 +14971,10 @@ func TestJetStreamConsumerPendingCountWithRedeliveries(t *testing.T) { func TestJetStreamPullConsumerHeartBeats(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15211,11 +15095,10 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) { func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -15232,7 +15115,7 @@ func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) // Now we need a non-clean shutdown. // For this use case that means we do *not* write the fss file. sd := s.JetStreamConfig().StoreDir - fss := path.Join(sd, "$G", "streams", "T", "msgs", "1.fss") + fss := filepath.Join(sd, "$G", "streams", "T", "msgs", "1.fss") // Stop current nc.Close() @@ -15254,6 +15137,66 @@ func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) require_NoError(t, err) } +func TestJetStreamRestoreBadStream(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + var rreq JSApiStreamRestoreRequest + buf, err := os.ReadFile("../test/configs/jetstream/restore_bad_stream/backup.json") + require_NoError(t, err) + err = json.Unmarshal(buf, &rreq) + require_NoError(t, err) + + data, err := os.Open("../test/configs/jetstream/restore_bad_stream/stream.tar.s2") + require_NoError(t, err) + defer data.Close() + + var rresp JSApiStreamRestoreResponse + msg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, rreq.Config.Name), buf, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + + var chunk [1024]byte + for { + n, err := data.Read(chunk[:]) + if err == io.EOF { + break + } + require_NoError(t, err) + + msg, err = nc.Request(rresp.DeliverSubject, chunk[:n], 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + } + msg, err = nc.Request(rresp.DeliverSubject, nil, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error == nil || !strings.Contains(rresp.Error.Description, "unexpected") { + t.Fatalf("Expected error about unexpected content, got: %+v", rresp.Error) + } + + dir := filepath.Join(s.JetStreamConfig().StoreDir, globalAccountName) + f1 := filepath.Join(dir, "fail1.txt") + f2 := filepath.Join(dir, "fail2.txt") + for _, f := range []string{f1, f2} { + if _, err := os.Stat(f); err == nil { + t.Fatalf("Found file %s", f) + } + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/jwt_test.go b/server/jwt_test.go index cb81cd3e20..a391a71bd3 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3487,7 +3487,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" limit: 4 } @@ -3513,7 +3513,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" limit: 4 } @@ -3537,7 +3537,7 @@ func TestJWTAccountNATSResolverFetch(t *testing.T) { system_account: %s resolver: { type: cache - dir: %s + dir: '%s' ttl: "%dms" limit: 4 } @@ -3729,7 +3729,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } gateway: { @@ -3753,7 +3753,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } gateway: { @@ -3780,7 +3780,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } gateway: { @@ -3807,7 +3807,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' interval: "200ms" } cluster { @@ -4296,7 +4296,7 @@ func TestJWTJetStreamLimits(t *testing.T) { operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, ojwt, dir, sysPub))) @@ -4342,7 +4342,7 @@ func TestJWTJetStreamLimits(t *testing.T) { operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, port, ojwt, dir, sysPub))) @@ -4359,7 +4359,7 @@ func TestJWTJetStreamLimits(t *testing.T) { operator: %s resolver: { type: full - dir: %s + dir: '%s' } system_account: %s `, port, ojwt, dir, sysPub))) @@ -4434,7 +4434,7 @@ func TestJWTUserRevocation(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -4547,7 +4547,7 @@ func TestJWTActivationRevocation(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -4660,7 +4660,7 @@ func TestJWTAccountFetchTimeout(t *testing.T) { resolver: { %s timeout: "100ms" - dir: %s + dir: '%s' } `, ojwt, syspub, cfg, dirSrv))) defer removeFile(t, conf) @@ -4741,7 +4741,7 @@ func TestJWTAccountOps(t *testing.T) { system_account: %s resolver: { %s - dir: %s + dir: '%s' } `, opJwt, syspub, cfg, dirSrv))) disconnectErrChan := make(chan struct{}, 1) @@ -4876,7 +4876,7 @@ func TestJWTHeader(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -5180,7 +5180,7 @@ func TestJWTAccountTokenImportMisuse(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) @@ -5429,7 +5429,7 @@ func TestJWScopedSigningKeys(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, cf) @@ -5614,7 +5614,7 @@ func TestJWTStrictSigningKeys(t *testing.T) { operator = %s resolver: { type: full - dir: %s + dir: '%s' } resolver_preload = { %s : "%s" @@ -5786,7 +5786,7 @@ func TestJWTAccountProtectedImport(t *testing.T) { system_account = %s resolver: { type: full - dir: %s + dir: '%s' }`, ojwt, sysPub, dirSrv))) defer removeFile(t, cf) s, _ := RunServerWithConfig(cf) @@ -5864,7 +5864,7 @@ func TestJWTMappings(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, syspub, dirSrv))) defer removeFile(t, conf) @@ -5976,7 +5976,7 @@ func TestJWTNoSystemAccountButNatsResolver(t *testing.T) { operator: %s resolver: { type: %s - dir: %s + dir: '%s' }`, ojwt, resType, dirSrv))) defer removeFile(t, conf) opts := LoadConfig(conf) @@ -6018,7 +6018,7 @@ func TestJWTAccountConnzAccessAfterClaimUpdate(t *testing.T) { system_account: %s resolver: { type: full - dir: %s + dir: '%s' } `, ojwt, spub, dirSrv))) defer removeFile(t, conf) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index c1fcbe154e..ec508faa02 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2535,7 +2535,7 @@ func TestLeafNodeOperatorBadCfg(t *testing.T) { system_account: %s resolver: { type: cache - dir: %s + dir: '%s' } leafnodes: { %s @@ -3835,7 +3835,7 @@ func TestLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) { jetstream { max_mem_store: 256MB, max_file_store: 2GB, - store_dir: "%s", + store_dir: '%s', domain: hub } accounts { @@ -3852,7 +3852,7 @@ func TestLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) { jetstream { max_mem_store: 256MB, max_file_store: 2GB, - store_dir: "%s", + store_dir: '%s', domain: %s } accounts { @@ -3944,7 +3944,7 @@ leafnodes: { } jetstream :{ domain: "cluster" - store_dir: "%s" + store_dir: '%s' max_mem: 100Mb max_file: 100Mb } @@ -3959,13 +3959,13 @@ accounts :{ system_account = SYS jetstream: { domain: ln1 - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb } leafnodes:{ - remotes:[{ url:nats://127.0.0.1:%d, account: A, credentials: %s}, - { url:nats://127.0.0.1:%d, account: SYS, credentials: %s}] + remotes:[{ url:nats://127.0.0.1:%d, account: A, credentials: '%s'}, + { url:nats://127.0.0.1:%d, account: SYS, credentials: '%s'}] } ` @@ -4139,7 +4139,7 @@ leafnodes: { } jetstream :{ domain: "cluster" - store_dir: "%s" + store_dir: '%s' max_mem: 100Mb max_file: 100Mb } @@ -4168,7 +4168,7 @@ leafnodes: { } jetstream: { domain: "cluster" - store_dir: "%s" + store_dir: '%s' max_mem: 100Mb max_file: 100Mb } @@ -4190,7 +4190,7 @@ accounts :{ system_account = SYS jetstream: { domain: "cluster" - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb %s @@ -4218,7 +4218,7 @@ accounts :{ system_account = SYS jetstream: { domain: "cluster" - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb %s @@ -4420,7 +4420,7 @@ leafnodes: { timeout: 0.5 } } -jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb } +jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb } server_name: A cluster: { name: clust1 @@ -4444,7 +4444,7 @@ leafnodes: { timeout: 0.5 } } -jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb } +jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb } server_name: B cluster: { name: clust1 @@ -4489,7 +4489,7 @@ accounts :{ } system_account = SYS # the extension hint is to simplify this test. without it present we would need a cluster of size 2 -jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb, extension_hint: will_extend } +jetstream: { %s store_dir: '%s'; max_mem: 50Mb, max_file: 50Mb, extension_hint: will_extend } server_name: LA leafnodes:{ no_advertise: true @@ -4599,13 +4599,13 @@ accounts :{ system_account = SYS jetstream: { domain: "cluster" - store_dir: %s + store_dir: '%s' max_mem: 50Mb max_file: 50Mb } leafnodes:{ - remotes:[{url:nats://a1:a1@127.0.0.1:50555, account: A, credentials: %s }, - {url:nats://s1:s1@127.0.0.1:50555, account: SYS, credentials: %s, deny_imports: foo, deny_exports: bar}] + remotes:[{url:nats://a1:a1@127.0.0.1:50555, account: A, credentials: '%s' }, + {url:nats://s1:s1@127.0.0.1:50555, account: SYS, credentials: '%s', deny_imports: foo, deny_exports: bar}] } ` akp, err := nkeys.CreateAccount() @@ -4681,7 +4681,7 @@ accounts :{ SYS:{ users:[ {user:s1,password:s1}]}, } system_account: SYS -jetstream: { domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: LEAF leafnodes: { remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},%s] @@ -4751,7 +4751,7 @@ leafnodes: { // Enable jetstream in hub. sdHub := createDir(t, JetStreamStoreDir) defer os.RemoveAll(sdHub) - jsEnabled := fmt.Sprintf(`{ domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb }`, domain, sdHub) + jsEnabled := fmt.Sprintf(`{ domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb }`, domain, sdHub) require_NoError(t, ioutil.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub, sHubUpd1.opts.Port, "disabled", @@ -4833,11 +4833,11 @@ accounts :{ SYS:{ users:[ {user:s1,password:s1}]}, } system_account: SYS -jetstream: { domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "%s", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: LEAF leafnodes: { - remotes:[{url:nats://127.0.0.1:%d, account: A, credentials: %s}, - {url:nats://127.0.0.1:%d, account: SYS, credentials: %s}] + remotes:[{url:nats://127.0.0.1:%d, account: A, credentials: '%s'}, + {url:nats://127.0.0.1:%d, account: SYS, credentials: '%s'}] } %s ` @@ -4922,7 +4922,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: enabled, users:[ {user:b1,password:b1}]} } -jetstream : { domain: "DHUB", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream : { domain: "DHUB", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: HUB1 cluster: { name: HUB @@ -4940,7 +4940,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: enabled, users:[ {user:b1,password:b1}]} } -jetstream : { domain: "DHUB", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream : { domain: "DHUB", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: HUB2 cluster: { name: HUB @@ -4958,7 +4958,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: disabled, users:[ {user:b1,password:b1}]} } -jetstream: { domain: "DLEAF", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "DLEAF", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: LEAF1 cluster: { name: LEAF @@ -4977,7 +4977,7 @@ accounts :{ A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, B:{ jetstream: disabled, users:[ {user:b1,password:b1}]} } -jetstream: { domain: "DLEAF", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +jetstream: { domain: "DLEAF", store_dir: '%s', max_mem: 100Mb, max_file: 100Mb } server_name: LEAF2 cluster: { name: LEAF diff --git a/server/log_test.go b/server/log_test.go index 23e2033806..d41fba4b52 100644 --- a/server/log_test.go +++ b/server/log_test.go @@ -142,14 +142,14 @@ func TestReOpenLogFile(t *testing.T) { } func TestFileLoggerSizeLimitAndReopen(t *testing.T) { - s := &Server{opts: &Options{}} - defer s.SetLogger(nil, false, false) - tmpDir := createDir(t, "nats-server") defer removeDir(t, tmpDir) file := createFileAtDir(t, tmpDir, "log_") file.Close() + s := &Server{opts: &Options{}} + defer s.SetLogger(nil, false, false) + // Set a File log s.opts.LogFile = file.Name() s.opts.Logtime = true diff --git a/server/monitor_test.go b/server/monitor_test.go index 77976a5816..041638c1c2 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4022,7 +4022,7 @@ func TestMonitorJsz(t *testing.T) { jetstream: { max_mem_store: 10Mb max_file_store: 10Mb - store_dir: %s + store_dir: '%s' } cluster { name: cluster_name diff --git a/server/mqtt_test.go b/server/mqtt_test.go index e6e4abdcf0..7cabba2b63 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -40,7 +40,7 @@ var testMQTTTimeout = 4 * time.Second var jsClusterTemplWithLeafAndMQTT = ` listen: 127.0.0.1:-1 server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} {{leaf}} diff --git a/server/norace_test.go b/server/norace_test.go index 117a9ee0ea..df78606d72 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -29,7 +29,6 @@ import ( "net" "net/http" "net/url" - "path" "path/filepath" "runtime" "runtime/debug" @@ -1092,11 +1091,10 @@ func TestNoRaceAcceptLoopsDoNotLeaveOpenedConn(t *testing.T) { func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() mname := "MYS" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: FileStorage}) @@ -1125,11 +1123,10 @@ func TestNoRaceJetStreamDeleteStreamManyConsumers(t *testing.T) { // This test is to show that issue and that the fix works, meaning we no longer swap c.acc. func TestNoRaceJetStreamServiceImportAccountSwapIssue(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client based API nc, js := jsClientConnect(t, s) @@ -1203,12 +1200,10 @@ func TestNoRaceJetStreamServiceImportAccountSwapIssue(t *testing.T) { func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Create 2X limit streamsNum := 2 * JSApiNamesLimit @@ -1272,12 +1267,10 @@ func TestNoRaceJetStreamAPIStreamListPaging(t *testing.T) { func TestNoRaceJetStreamAPIConsumerListPaging(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - - // Forced cleanup of all persisted state. if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() sname := "MYSTREAM" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: sname}) @@ -1867,11 +1860,10 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { } s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -2198,11 +2190,10 @@ func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) { func TestNoRaceJetStreamSlowFilteredInititalPendingAndFirstMsg(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Create directly here to force multiple blocks, etc. a, err := s.LookupAccount("$G") @@ -2352,11 +2343,10 @@ func TestNoRaceJetStreamFileStoreBufferReuse(t *testing.T) { skip(t) s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() cfg := &StreamConfig{Name: "TEST", Subjects: []string{"foo", "bar", "baz"}, Storage: FileStorage} if _, err := s.GlobalAccount().addStreamWithStore(cfg, nil); err != nil { @@ -2433,11 +2423,10 @@ func TestNoRaceJetStreamSlowRestartWithManyExpiredMsgs(t *testing.T) { opts.Port = -1 opts.JetStream = true s := RunServer(&opts) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -2697,12 +2686,11 @@ func TestNoRaceAccountConnz(t *testing.T) { func TestNoRaceCompressedConnz(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, _ := jsClientConnect(t, s) defer nc.Close() @@ -2930,12 +2918,11 @@ func TestNoRaceJetStreamClusterExtendedStreamPurge(t *testing.T) { func TestNoRaceJetStreamFileStoreCompaction(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -3057,12 +3044,11 @@ func TestNoRaceJetStreamOrderedConsumerMissingMsg(t *testing.T) { skip(t) s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -3935,12 +3921,11 @@ func TestNoRaceJetStreamClusterStreamDropCLFS(t *testing.T) { func TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - config := s.JetStreamConfig() if config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() // Client for API requests. nc, js := jsClientConnect(t, s) @@ -4133,11 +4118,10 @@ func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) { defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("me", "pwd")) defer nc.Close() @@ -4203,11 +4187,10 @@ func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) { func TestNoRaceJetStreamSparseConsumers(t *testing.T) { s := RunBasicJetStreamServer() - defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } + defer s.Shutdown() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -4319,7 +4302,7 @@ func TestNoRaceFileStoreSubjectInfoWithSnapshotCleanup(t *testing.T) { } // We will have cleanup the main .blk and .idx sans the lmb, but we should not have any *.fss files. - fms, err := filepath.Glob(path.Join(storeDir, msgDir, fssScanAll)) + fms, err := filepath.Glob(filepath.Join(storeDir, msgDir, fssScanAll)) require_NoError(t, err) if len(fms) > 0 { @@ -4366,7 +4349,7 @@ func TestNoRaceFileStoreKeyFileCleanup(t *testing.T) { } // We will have cleanup the main .blk and .idx sans the lmb, but we should not have any *.fss files. - kms, err := filepath.Glob(path.Join(storeDir, msgDir, keyScanAll)) + kms, err := filepath.Glob(filepath.Join(storeDir, msgDir, keyScanAll)) require_NoError(t, err) if len(kms) > 1 { diff --git a/server/raft.go b/server/raft.go index df72d0daaf..19bf653312 100644 --- a/server/raft.go +++ b/server/raft.go @@ -25,7 +25,6 @@ import ( "math/rand" "net" "os" - "path" "path/filepath" "sync" "sync/atomic" @@ -409,13 +408,13 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { n.vote = vote } - if err := os.MkdirAll(path.Join(n.sd, snapshotsDir), 0750); err != nil { + if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), 0750); err != nil { return nil, fmt.Errorf("could not create snapshots directory - %v", err) } // Can't recover snapshots if memory based. if _, ok := n.wal.(*memStore); ok { - os.Remove(path.Join(n.sd, snapshotsDir, "*")) + os.Remove(filepath.Join(n.sd, snapshotsDir, "*")) } else { // See if we have any snapshots and if so load and process on startup. n.setupLastSnapshot() @@ -913,9 +912,9 @@ func (n *raft) InstallSnapshot(data []byte) error { data: data, } - snapDir := path.Join(n.sd, snapshotsDir) + snapDir := filepath.Join(n.sd, snapshotsDir) sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex) - sfile := path.Join(snapDir, sn) + sfile := filepath.Join(snapDir, sn) // Remember our latest snapshot file. n.snapfile = sfile @@ -938,7 +937,7 @@ func (n *raft) InstallSnapshot(data []byte) error { for _, fi := range psnaps { pn := fi.Name() if pn != sn { - os.Remove(path.Join(snapDir, pn)) + os.Remove(filepath.Join(snapDir, pn)) } } @@ -968,7 +967,7 @@ func termAndIndexFromSnapFile(sn string) (term, index uint64, err error) { } func (n *raft) setupLastSnapshot() { - snapDir := path.Join(n.sd, snapshotsDir) + snapDir := filepath.Join(n.sd, snapshotsDir) psnaps, err := ioutil.ReadDir(snapDir) if err != nil { return @@ -977,7 +976,7 @@ func (n *raft) setupLastSnapshot() { var lterm, lindex uint64 var latest string for _, sf := range psnaps { - sfile := path.Join(snapDir, sf.Name()) + sfile := filepath.Join(snapDir, sf.Name()) var term, index uint64 term, index, err := termAndIndexFromSnapFile(sf.Name()) if err == nil { @@ -998,7 +997,7 @@ func (n *raft) setupLastSnapshot() { // Now cleanup any old entries for _, sf := range psnaps { - sfile := path.Join(snapDir, sf.Name()) + sfile := filepath.Join(snapDir, sf.Name()) if sfile != latest { n.debug("Removing old snapshot: %q", sfile) os.Remove(sfile) @@ -1334,9 +1333,9 @@ func (n *raft) shutdown(shouldDelete bool) { // Delete our peer state and vote state and any snapshots. if shouldDelete { - os.Remove(path.Join(n.sd, peerStateFile)) - os.Remove(path.Join(n.sd, termVoteFile)) - os.RemoveAll(path.Join(n.sd, snapshotsDir)) + os.Remove(filepath.Join(n.sd, peerStateFile)) + os.Remove(filepath.Join(n.sd, termVoteFile)) + os.RemoveAll(filepath.Join(n.sd, snapshotsDir)) } n.Unlock() @@ -3027,7 +3026,7 @@ func (n *raft) writePeerState(ps *peerState) { // Writes out our peer state outside of a specific raft context. func writePeerState(sd string, ps *peerState) error { - psf := path.Join(sd, peerStateFile) + psf := filepath.Join(sd, peerStateFile) if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { return err } @@ -3038,7 +3037,7 @@ func writePeerState(sd string, ps *peerState) error { } func readPeerState(sd string) (ps *peerState, err error) { - buf, err := ioutil.ReadFile(path.Join(sd, peerStateFile)) + buf, err := ioutil.ReadFile(filepath.Join(sd, peerStateFile)) if err != nil { return nil, err } @@ -3051,7 +3050,7 @@ const termVoteLen = idLen + 8 // readTermVote will read the largest term and who we voted from to stable storage. // Lock should be held. func (n *raft) readTermVote() (term uint64, voted string, err error) { - buf, err := ioutil.ReadFile(path.Join(n.sd, termVoteFile)) + buf, err := ioutil.ReadFile(filepath.Join(n.sd, termVoteFile)) if err != nil { return 0, noVote, err } @@ -3100,8 +3099,8 @@ func (n *raft) fileWriter() { defer s.grWG.Done() n.RLock() - tvf := path.Join(n.sd, termVoteFile) - psf := path.Join(n.sd, peerStateFile) + tvf := filepath.Join(n.sd, termVoteFile) + psf := filepath.Join(n.sd, peerStateFile) n.RUnlock() for s.isRunning() { diff --git a/server/reload_test.go b/server/reload_test.go index ee8eea2b91..7be0b6d238 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -4161,6 +4161,10 @@ func TestLoggingReload(t *testing.T) { conf := createConfFile(t, []byte(commonCfg)) defer removeFile(t, conf) + defer removeFile(t, "off-pre.log") + defer removeFile(t, "on.log") + defer removeFile(t, "off-post.log") + s, opts := RunServerWithConfig(conf) defer s.Shutdown() @@ -4204,12 +4208,10 @@ func TestLoggingReload(t *testing.T) { nc.Close() } - defer removeFile(t, "off-pre.log") reload("log_file: off-pre.log") traffic(10) // generate NO trace/debug entries in off-pre.log - defer removeFile(t, "on.log") reload(` log_file: on.log debug: true @@ -4218,7 +4220,6 @@ func TestLoggingReload(t *testing.T) { traffic(10) // generate trace/debug entries in on.log - defer removeFile(t, "off-post.log") reload(` log_file: off-post.log debug: false diff --git a/server/routes_test.go b/server/routes_test.go index 5a37ea3378..13910f2743 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1465,6 +1465,7 @@ func testTLSRoutesCertificateImplicitAllow(t *testing.T, pass bool) { if err := cfg.Sync(); err != nil { t.Fatal(err) } + cfg.Close() optsA := LoadConfig(cfg.Name()) optsB := LoadConfig(cfg.Name()) diff --git a/server/server_test.go b/server/server_test.go index f1dfcd6979..9d6a19f0df 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1946,7 +1946,7 @@ func TestServerLogsConfigurationFile(t *testing.T) { conf := createConfFile(t, []byte(fmt.Sprintf(` port: -1 - logfile: "%s" + logfile: '%s' `, file.Name()))) defer removeFile(t, conf) diff --git a/server/stream.go b/server/stream.go index c5cd88a9d0..20c25f02ac 100644 --- a/server/stream.go +++ b/server/stream.go @@ -23,7 +23,6 @@ import ( "io/ioutil" "math" "os" - "path" "path/filepath" "reflect" "strconv" @@ -389,7 +388,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } jsa.streams[cfg.Name] = mset - storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name) + storeDir := filepath.Join(jsa.storeDir, streamsDir, cfg.Name) jsa.mu.Unlock() // Bind to the user account. @@ -3603,7 +3602,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return nil, err } - sd := path.Join(jsa.storeDir, snapsDir) + sd := filepath.Join(jsa.storeDir, snapsDir) if _, err := os.Stat(sd); os.IsNotExist(err) { if err := os.MkdirAll(sd, defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create snapshots directory - %v", err) @@ -3620,6 +3619,17 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error } defer os.RemoveAll(sdir) + logAndReturnError := func() error { + a.mu.RLock() + err := fmt.Errorf("unexpected content (account=%s)", a.Name) + if a.srv != nil { + a.srv.Errorf("Stream restore failed due to %v", err) + } + a.mu.RUnlock() + return err + } + sdirCheck := filepath.Clean(sdir) + string(os.PathSeparator) + tr := tar.NewReader(s2.NewReader(r)) for { hdr, err := tr.Next() @@ -3629,7 +3639,13 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error if err != nil { return nil, err } - fpath := path.Join(sdir, filepath.Clean(hdr.Name)) + if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA { + return nil, logAndReturnError() + } + fpath := filepath.Join(sdir, filepath.Clean(hdr.Name)) + if !strings.HasPrefix(fpath, sdirCheck) { + return nil, logAndReturnError() + } os.MkdirAll(filepath.Dir(fpath), defaultDirPerms) fd, err := os.OpenFile(fpath, os.O_CREATE|os.O_RDWR, 0600) if err != nil { @@ -3645,7 +3661,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error // Check metadata. // The cfg passed in will be the new identity for the stream. var fcfg FileStreamInfo - b, err := ioutil.ReadFile(path.Join(sdir, JetStreamMetaFile)) + b, err := ioutil.ReadFile(filepath.Join(sdir, JetStreamMetaFile)) if err != nil { return nil, err } @@ -3663,13 +3679,13 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return nil, NewJSStreamNameExistError() } // Move into the correct place here. - ndir := path.Join(jsa.storeDir, streamsDir, cfg.Name) + ndir := filepath.Join(jsa.storeDir, streamsDir, cfg.Name) // Remove old one if for some reason it is still here. if _, err := os.Stat(ndir); err == nil { os.RemoveAll(ndir) } // Make sure our destination streams directory exists. - if err := os.MkdirAll(path.Join(jsa.storeDir, streamsDir), defaultDirPerms); err != nil { + if err := os.MkdirAll(filepath.Join(jsa.storeDir, streamsDir), defaultDirPerms); err != nil { return nil, err } // Move into new location. @@ -3690,11 +3706,11 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error } // Now do consumers. - odir := path.Join(ndir, consumerDir) + odir := filepath.Join(ndir, consumerDir) ofis, _ := ioutil.ReadDir(odir) for _, ofi := range ofis { - metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile) - metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum) + metafile := filepath.Join(odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(odir, ofi.Name(), JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { mset.stop(true, false) return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err) diff --git a/test/configs/jetstream/restore_bad_stream/backup.json b/test/configs/jetstream/restore_bad_stream/backup.json new file mode 100644 index 0000000000..86b7391f9d --- /dev/null +++ b/test/configs/jetstream/restore_bad_stream/backup.json @@ -0,0 +1,33 @@ +{ + "config": { + "name": "TEST", + "subjects": [ + "foo" + ], + "retention": "limits", + "max_consumers": -1, + "max_msgs_per_subject": -1, + "max_msgs": -1, + "max_bytes": -1, + "max_age": 0, + "max_msg_size": -1, + "storage": "file", + "discard": "old", + "num_replicas": 1, + "duplicate_window": 120000000000, + "sealed": false, + "deny_delete": false, + "deny_purge": false, + "allow_rollup_hdrs": false + }, + "state": { + "messages": 10, + "bytes": 381, + "first_seq": 1, + "first_ts": "2022-03-07T23:59:01.710801Z", + "last_seq": 10, + "last_ts": "2022-03-07T23:59:01.712378Z", + "num_subjects": 1, + "consumer_count": 1 + } +} \ No newline at end of file diff --git a/test/configs/jetstream/restore_bad_stream/stream.tar.s2 b/test/configs/jetstream/restore_bad_stream/stream.tar.s2 new file mode 100755 index 0000000000..f83c8d76d7 Binary files /dev/null and b/test/configs/jetstream/restore_bad_stream/stream.tar.s2 differ diff --git a/test/ocsp_test.go b/test/ocsp_test.go index e9fe35fd95..694dd305b5 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -690,7 +690,7 @@ func TestOCSPReloadRotateTLSCertDisableMustStaple(t *testing.T) { originalContent := ` port: -1 - store_dir: "%s" + store_dir: '%s' tls { cert_file: "configs/certs/ocsp/server-status-request-url-01-cert.pem" @@ -769,7 +769,7 @@ func TestOCSPReloadRotateTLSCertDisableMustStaple(t *testing.T) { updatedContent := ` port: -1 - store_dir: "%s" + store_dir: '%s' tls { cert_file: "configs/certs/ocsp/server-cert.pem" @@ -1010,7 +1010,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: AB host: "127.0.0.1" @@ -1044,7 +1044,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: AB host: "127.0.0.1" @@ -1111,7 +1111,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: AB host: "127.0.0.1" @@ -1206,7 +1206,7 @@ func TestOCSPCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { port: -1 name: AB @@ -1284,7 +1284,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { host: "127.0.0.1" port: -1 @@ -1317,7 +1317,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { remotes: [ { url: "tls://127.0.0.1:%d" @@ -1378,7 +1378,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { remotes: [ { url: "tls://127.0.0.1:%d" @@ -1468,7 +1468,7 @@ func TestOCSPLeaf(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' leafnodes { host: "127.0.0.1" port: -1 @@ -1555,7 +1555,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: A host: "127.0.0.1" @@ -1589,7 +1589,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: B host: "127.0.0.1" @@ -1662,7 +1662,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: C host: "127.0.0.1" @@ -1758,7 +1758,7 @@ func TestOCSPGateway(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: A host: "127.0.0.1" @@ -2623,7 +2623,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: A @@ -2675,7 +2675,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' cluster { name: A @@ -2748,7 +2748,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: C host: "127.0.0.1" @@ -2798,7 +2798,7 @@ func TestOCSPSuperCluster(t *testing.T) { ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } - store_dir: "%s" + store_dir: '%s' gateway { name: D host: "127.0.0.1" diff --git a/test/service_latency_test.go b/test/service_latency_test.go index abb1b6710e..c2dbc6d350 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -19,7 +19,7 @@ import ( "io/ioutil" "math/rand" "net/http" - "path" + "path/filepath" "strings" "sync" "sync/atomic" @@ -1814,7 +1814,7 @@ func TestServiceLatencyMissingResults(t *testing.T) { server_name: s1 cluster { port: -1 } include %q - `, path.Base(accConf)))) + `, filepath.Base(accConf)))) defer removeFile(t, s1Conf) s1, opts1 := RunServerWithConfig(s1Conf) @@ -1828,7 +1828,7 @@ func TestServiceLatencyMissingResults(t *testing.T) { routes = [ nats-route://127.0.0.1:%d ] } include %q - `, opts1.Cluster.Port, path.Base(accConf)))) + `, opts1.Cluster.Port, filepath.Base(accConf)))) defer removeFile(t, s2Conf) s2, opts2 := RunServerWithConfig(s2Conf)