Skip to content

Commit

Permalink
Merge pull request #2917 from nats-io/file_path
Browse files Browse the repository at this point in the history
Ensure file path is correct during stream restore
  • Loading branch information
kozlovic authored Mar 9, 2022
2 parents 0cb0f6d + b412869 commit 818c2c7
Show file tree
Hide file tree
Showing 23 changed files with 467 additions and 496 deletions.
87 changes: 43 additions & 44 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"io/ioutil"
"net"
"os"
"path"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -296,8 +295,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs.fip = !fcfg.AsyncFlush

// Check if this is a new setup.
mdir := path.Join(fcfg.StoreDir, msgDir)
odir := path.Join(fcfg.StoreDir, consumerDir)
mdir := filepath.Join(fcfg.StoreDir, msgDir)
odir := filepath.Join(fcfg.StoreDir, consumerDir)
if err := os.MkdirAll(mdir, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create message storage directory - %v", err)
}
Expand All @@ -321,7 +320,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}

// Write our meta data iff does not exist.
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
if err := fs.writeStreamMeta(); err != nil {
return nil, err
Expand All @@ -331,7 +330,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if fs.prf != nil {
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
if err := fs.writeStreamMeta(); err != nil {
return nil, err
Expand Down Expand Up @@ -454,7 +453,7 @@ func (fs *fileStore) writeStreamMeta() error {
return err
}
fs.aek = key
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -463,7 +462,7 @@ func (fs *fileStore) writeStreamMeta() error {
}
}

meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
meta := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -484,7 +483,7 @@ func (fs *fileStore) writeStreamMeta() error {
fs.hh.Reset()
fs.hh.Write(b)
checksum := hex.EncodeToString(fs.hh.Sum(nil))
sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
Expand All @@ -503,10 +502,10 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize
func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, error) {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire}

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = path.Join(mdir, fi.Name())
mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, index))
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fi.Name())
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index))
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, index))

if mb.hh == nil {
key := sha256.Sum256(fs.hashKeyForBlock(index))
Expand All @@ -517,7 +516,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e

// Check if encryption is enabled.
if fs.prf != nil {
ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
ekey, err := ioutil.ReadFile(filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
if err != nil {
// We do not seem to have keys even though we should. Could be a plaintext conversion.
// Create the keys and we will double check below.
Expand Down Expand Up @@ -863,12 +862,12 @@ func (fs *fileStore) recoverMsgs() error {
defer fs.mu.Unlock()

// Check for any left over purged messages.
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
if _, err := os.Stat(pdir); err == nil {
os.RemoveAll(pdir)
}

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
fis, err := ioutil.ReadDir(mdir)
if err != nil {
return errNotReadable
Expand Down Expand Up @@ -916,21 +915,21 @@ func (fs *fileStore) recoverMsgs() error {

// We had a bug that would leave fss files around during a snapshot.
// Clean them up here if we see them.
if fms, err := filepath.Glob(path.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
if fms, err := filepath.Glob(filepath.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
for _, fn := range fms {
os.Remove(fn)
}
}
// Same bug for keyfiles but for these we just need to identify orphans.
if kms, err := filepath.Glob(path.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
if kms, err := filepath.Glob(filepath.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
valid := make(map[uint64]bool)
for _, mb := range fs.blks {
valid[mb.index] = true
}
for _, fn := range kms {
var index uint64
shouldRemove := true
if n, err := fmt.Sscanf(path.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
if n, err := fmt.Sscanf(filepath.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
shouldRemove = false
}
if shouldRemove {
Expand Down Expand Up @@ -1516,16 +1515,16 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
mb.hh = hh

mdir := path.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = path.Join(mdir, fmt.Sprintf(blkScan, mb.index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index))
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file [%q]: %v", mb.mfn, err)
}
mb.mfd = mfd

mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, mb.index))
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, mb.index))
ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
mb.dirtyCloseWithRemove(true)
Expand All @@ -1534,7 +1533,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mb.ifd = ifd

// For subject based info.
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, mb.index))
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, mb.index))

// Check if encryption is enabled.
if fs.prf != nil {
Expand Down Expand Up @@ -1576,8 +1575,8 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
return err
}
mb.aek, mb.bek, mb.seed, mb.nonce = key, bek, seed, encrypted[:key.NonceSize()]
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
keyFile := path.Join(mdir, fmt.Sprintf(keyScan, mb.index))
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand Down Expand Up @@ -2119,7 +2118,7 @@ func (mb *msgBlock) compact() {
mb.closeFDsLocked()

// We will write to a new file and mv/rename it in case of failure.
mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
defer os.Remove(mfn)
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
return
Expand Down Expand Up @@ -4061,8 +4060,8 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {

// Move the msgs directory out of the way, will delete out of band.
// FIXME(dlc) - These can error and we need to change api above to propagate?
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
// in place and remove since rename would fail.
if _, err := os.Stat(pdir); err == nil {
Expand Down Expand Up @@ -4593,7 +4592,7 @@ func (fs *fileStore) Delete() error {
}
fs.Purge()

pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
// in place and remove since rename would fail.
if _, err := os.Stat(pdir); err == nil {
Expand Down Expand Up @@ -4820,13 +4819,13 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
o.mu.Unlock()

// Write all the consumer files.
if writeFile(path.Join(odirPre, JetStreamMetaFile), meta) != nil {
if writeFile(filepath.Join(odirPre, JetStreamMetaFile), meta) != nil {
return
}
if writeFile(path.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
if writeFile(filepath.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
return
}
writeFile(path.Join(odirPre, consumerState), state)
writeFile(filepath.Join(odirPre, consumerState), state)
}
}

Expand Down Expand Up @@ -4909,7 +4908,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if cfg == nil || name == _EMPTY_ {
return nil, fmt.Errorf("bad consumer config")
}
odir := path.Join(fs.fcfg.StoreDir, consumerDir, name)
odir := filepath.Join(fs.fcfg.StoreDir, consumerDir, name)
if err := os.MkdirAll(odir, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create consumer directory - %v", err)
}
Expand All @@ -4920,7 +4919,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
prf: fs.prf,
name: name,
odir: odir,
ifn: path.Join(odir, consumerState),
ifn: filepath.Join(odir, consumerState),
}
key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name))
hh, err := highwayhash.New64(key[:])
Expand All @@ -4931,7 +4930,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt

// Check for encryption.
if o.prf != nil {
if ekey, err := ioutil.ReadFile(path.Join(odir, JetStreamMetaFileKey)); err == nil {
if ekey, err := ioutil.ReadFile(filepath.Join(odir, JetStreamMetaFileKey)); err == nil {
// Recover key encryption key.
rb, err := fs.prf([]byte(fs.cfg.Name + tsep + o.name))
if err != nil {
Expand All @@ -4953,7 +4952,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
}

// Write our meta data iff does not exist.
meta := path.Join(odir, JetStreamMetaFile)
meta := filepath.Join(odir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
csi.Created = time.Now().UTC()
if err := o.writeConsumerMeta(); err != nil {
Expand All @@ -4964,7 +4963,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if o.prf != nil {
keyFile := path.Join(odir, JetStreamMetaFileKey)
keyFile := filepath.Join(odir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
if err := o.writeConsumerMeta(); err != nil {
return nil, err
Expand Down Expand Up @@ -5418,7 +5417,7 @@ func (o *consumerFileStore) updateConfig(cfg ConsumerConfig) error {
// Write out the consumer meta data, i.e. state.
// Lock should be held.
func (cfs *consumerFileStore) writeConsumerMeta() error {
meta := path.Join(cfs.odir, JetStreamMetaFile)
meta := filepath.Join(cfs.odir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -5430,7 +5429,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
return err
}
cfs.aek = key
keyFile := path.Join(cfs.odir, JetStreamMetaFileKey)
keyFile := filepath.Join(cfs.odir, JetStreamMetaFileKey)
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -5456,7 +5455,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
cfs.hh.Reset()
cfs.hh.Write(b)
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
sum := path.Join(cfs.odir, JetStreamMetaFileSum)
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
Expand Down Expand Up @@ -5787,7 +5786,7 @@ type templateFileStore struct {
}

func newTemplateFileStore(storeDir string) *templateFileStore {
tdir := path.Join(storeDir, tmplsDir)
tdir := filepath.Join(storeDir, tmplsDir)
key := sha256.Sum256([]byte("templates"))
hh, err := highwayhash.New64(key[:])
if err != nil {
Expand All @@ -5797,11 +5796,11 @@ func newTemplateFileStore(storeDir string) *templateFileStore {
}

func (ts *templateFileStore) Store(t *streamTemplate) error {
dir := path.Join(ts.dir, t.Name)
dir := filepath.Join(ts.dir, t.Name)
if err := os.MkdirAll(dir, defaultDirPerms); err != nil {
return fmt.Errorf("could not create templates storage directory for %q- %v", t.Name, err)
}
meta := path.Join(dir, JetStreamMetaFile)
meta := filepath.Join(dir, JetStreamMetaFile)
if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil {
return err
}
Expand All @@ -5818,13 +5817,13 @@ func (ts *templateFileStore) Store(t *streamTemplate) error {
ts.hh.Reset()
ts.hh.Write(b)
checksum := hex.EncodeToString(ts.hh.Sum(nil))
sum := path.Join(dir, JetStreamMetaFileSum)
sum := filepath.Join(dir, JetStreamMetaFileSum)
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
}
return nil
}

func (ts *templateFileStore) Delete(t *streamTemplate) error {
return os.RemoveAll(path.Join(ts.dir, t.Name))
return os.RemoveAll(filepath.Join(ts.dir, t.Name))
}
Loading

0 comments on commit 818c2c7

Please sign in to comment.