diff --git a/cmd/util/cmd/checkpoint-list-tries/cmd.go b/cmd/util/cmd/checkpoint-list-tries/cmd.go
index bfad7c18bef..58587abe879 100644
--- a/cmd/util/cmd/checkpoint-list-tries/cmd.go
+++ b/cmd/util/cmd/checkpoint-list-tries/cmd.go
@@ -28,7 +28,7 @@ func init() {
 }
 
 func run(*cobra.Command, []string) {
-	tries, err := wal.LoadCheckpoint(flagCheckpoint)
+	tries, err := wal.LoadCheckpoint(flagCheckpoint, &log.Logger)
 	if err != nil {
 		log.Fatal().Err(err).Msg("error while loading checkpoint")
 	}
diff --git a/ledger/complete/ledger.go b/ledger/complete/ledger.go
index 57bdbe37292..9eadc2cd76b 100644
--- a/ledger/complete/ledger.go
+++ b/ledger/complete/ledger.go
@@ -355,7 +355,7 @@ func (l *Ledger) ExportCheckpointAt(
 
 	l.logger.Info().Msg("creating a checkpoint for the new trie")
 
-	writer, err := wal.CreateCheckpointWriterForFile(outputDir, outputFile)
+	writer, err := wal.CreateCheckpointWriterForFile(outputDir, outputFile, &l.logger)
 	if err != nil {
 		return ledger.State(hash.DummyHash), fmt.Errorf("failed to create a checkpoint writer: %w", err)
 	}
diff --git a/ledger/complete/wal/checkpointer.go b/ledger/complete/wal/checkpointer.go
index ad225e984fd..8765dbfeaad 100644
--- a/ledger/complete/wal/checkpointer.go
+++ b/ledger/complete/wal/checkpointer.go
@@ -7,11 +7,15 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"os/exec"
 	"path"
+	"runtime"
 	"sort"
 	"strconv"
 	"strings"
 
+	"github.com/rs/zerolog"
+
 	"github.com/onflow/flow-go/ledger"
 	"github.com/onflow/flow-go/ledger/complete/mtrie"
 	"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
@@ -243,15 +247,11 @@ func NumberToFilename(n int) string {
 }
 
 func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error) {
-	return CreateCheckpointWriter(c.dir, to)
-}
-
-func CreateCheckpointWriter(dir string, fileNo int) (io.WriteCloser, error) {
-	return CreateCheckpointWriterForFile(dir, NumberToFilename(fileNo))
+	return CreateCheckpointWriterForFile(c.dir, NumberToFilename(to), &c.wal.log)
 }
 
 // CreateCheckpointWriterForFile returns a file writer that will write to a temporary file and then move it to the checkpoint folder by renaming it.
-func CreateCheckpointWriterForFile(dir, filename string) (io.WriteCloser, error) {
+func CreateCheckpointWriterForFile(dir, filename string, logger *zerolog.Logger) (io.WriteCloser, error) {
 
 	fullname := path.Join(dir, filename)
 
@@ -266,6 +266,7 @@ func CreateCheckpointWriterForFile(dir, filename string) (io.WriteCloser, error)
 	writer := bufio.NewWriterSize(tmpFile, defaultBufioWriteSize)
 
 	return &SyncOnCloseRenameFile{
+		logger:     logger,
 		file:       tmpFile,
 		targetName: fullname,
 		Writer:     writer,
@@ -393,12 +394,12 @@ func StoreCheckpoint(writer io.Writer, tries ...*trie.MTrie) error {
 
 func (c *Checkpointer) LoadCheckpoint(checkpoint int) ([]*trie.MTrie, error) {
 	filepath := path.Join(c.dir, NumberToFilename(checkpoint))
-	return LoadCheckpoint(filepath)
+	return LoadCheckpoint(filepath, &c.wal.log)
 }
 
 func (c *Checkpointer) LoadRootCheckpoint() ([]*trie.MTrie, error) {
 	filepath := path.Join(c.dir, bootstrap.FilenameWALRootCheckpoint)
-	return LoadCheckpoint(filepath)
+	return LoadCheckpoint(filepath, &c.wal.log)
 }
 
 func (c *Checkpointer) HasRootCheckpoint() (bool, error) {
@@ -415,13 +416,15 @@ func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error {
 	return os.Remove(path.Join(c.dir, NumberToFilename(checkpoint)))
 }
 
-func LoadCheckpoint(filepath string) ([]*trie.MTrie, error) {
+func LoadCheckpoint(filepath string, logger *zerolog.Logger) ([]*trie.MTrie, error) {
 	file, err := os.Open(filepath)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open checkpoint file %s: %w", filepath, err)
 	}
 	defer func() {
 		_ = file.Close()
+
+		_ = requestDropFromOSFileCache(filepath, logger)
 	}()
 
 	return readCheckpoint(file)
@@ -769,3 +772,51 @@ func readCheckpointV5(f *os.File) ([]*trie.MTrie, error) {
 
 	return tries, nil
 }
+
+// requestDropFromOSFileCache requests the specified file be dropped from OS file cache.
+// The use case is when a new checkpoint is loaded or created, OS file cache can hold the entire
+// checkpoint file in memory until requestDropFromOSFileCache() causes it to be dropped from
+// the file cache. Not calling requestDropFromOSFileCache() causes two checkpoint files
+// to be cached by the OS file cache for each checkpointing, eventually caching hundreds of GB.
+// CAUTION: Returns nil without doing anything if GOOS != linux.
+func requestDropFromOSFileCache(fileName string, logger *zerolog.Logger) error {
+	if runtime.GOOS != "linux" {
+		return nil
+	}
+
+	// Try using /bin/dd (Debian, Ubuntu, etc.)
+	cmdFileName := "/bin/dd"
+
+	// If /bin/dd isn't found, then try /usr/bin/dd (OpenSUSE Leap, etc.)
+	_, err := os.Stat(cmdFileName)
+	if os.IsNotExist(err) {
+		cmdFileName = "/usr/bin/dd"
+		_, err := os.Stat(cmdFileName)
+		if os.IsNotExist(err) {
+			return fmt.Errorf("required program dd not found in /bin/ and /usr/bin/")
+		}
+	}
+
+	// Remove some special chars from fileName just in case.
+	// Regex would be shorter but not as easy to read.
+	s := strings.ReplaceAll(fileName, " ", "")
+	s = strings.ReplaceAll(s, ";", "")
+	s = strings.ReplaceAll(s, "$", "")
+	s = strings.ReplaceAll(s, "|", "")
+	s = strings.ReplaceAll(s, ">", "")
+	s = strings.ReplaceAll(s, "<", "")
+	s = strings.ReplaceAll(s, "*", "")
+
+	_, err = os.Stat(s)
+	if os.IsNotExist(err) {
+		return fmt.Errorf("sanitized filename %s does not exist", s)
+	}
+
+	cmd := exec.Command(cmdFileName, "if="+s, "iflag=nocache", "count=0")
+
+	if logger != nil {
+		logger.Info().Msgf("run %q to drop file from OS file cache", cmd.String())
+	}
+
+	return cmd.Run()
+}
diff --git a/ledger/complete/wal/checkpointer_test.go b/ledger/complete/wal/checkpointer_test.go
index f59ac3a5fc0..e9f84610329 100644
--- a/ledger/complete/wal/checkpointer_test.go
+++ b/ledger/complete/wal/checkpointer_test.go
@@ -534,7 +534,7 @@ func Test_StoringLoadingCheckpoints(t *testing.T) {
 		file.Close()
 
 		t.Run("works without data modification", func(t *testing.T) {
-			tries, err := realWAL.LoadCheckpoint(filepath)
+			tries, err := realWAL.LoadCheckpoint(filepath, nil)
 			require.NoError(t, err)
 			require.Equal(t, 1, len(tries))
 			require.Equal(t, updatedTrie, tries[0])
@@ -551,7 +551,7 @@ func Test_StoringLoadingCheckpoints(t *testing.T) {
 		err = os.WriteFile(filepath, b, 0644)
 		require.NoError(t, err)
 
-		tries, err := realWAL.LoadCheckpoint(filepath)
+		tries, err := realWAL.LoadCheckpoint(filepath, nil)
 		require.Error(t, err)
 		require.Nil(t, tries)
 		require.Contains(t, err.Error(), "checksum")
diff --git a/ledger/complete/wal/checkpointer_versioning_test.go b/ledger/complete/wal/checkpointer_versioning_test.go
index 7e8359ad0d1..ba3138e6c4f 100644
--- a/ledger/complete/wal/checkpointer_versioning_test.go
+++ b/ledger/complete/wal/checkpointer_versioning_test.go
@@ -18,7 +18,7 @@ func TestLoadCheckpointV1(t *testing.T) {
 		mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
 	}
 
-	tries, err := LoadCheckpoint("test_data/checkpoint.v1")
+	tries, err := LoadCheckpoint("test_data/checkpoint.v1", nil)
 	require.NoError(t, err)
 	require.Equal(t, len(expectedRootHash), len(tries))
 
@@ -37,7 +37,7 @@ func TestLoadCheckpointV3(t *testing.T) {
 		mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
 	}
 
-	tries, err := LoadCheckpoint("test_data/checkpoint.v3")
+	tries, err := LoadCheckpoint("test_data/checkpoint.v3", nil)
 	require.NoError(t, err)
 	require.Equal(t, len(expectedRootHash), len(tries))
 
@@ -56,7 +56,7 @@ func TestLoadCheckpointV4(t *testing.T) {
 		mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
 	}
 
-	tries, err := LoadCheckpoint("test_data/checkpoint.v4")
+	tries, err := LoadCheckpoint("test_data/checkpoint.v4", nil)
 	require.NoError(t, err)
 	require.Equal(t, len(expectedRootHash), len(tries))
 
diff --git a/ledger/complete/wal/syncrename.go b/ledger/complete/wal/syncrename.go
index 584efac641e..f084ff03a96 100644
--- a/ledger/complete/wal/syncrename.go
+++ b/ledger/complete/wal/syncrename.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"io"
 	"os"
+
+	"github.com/rs/zerolog"
 )
 
 type WriterSeekerCloser interface {
@@ -19,6 +21,7 @@ type WriterSeekerCloser interface {
 // to target one as the last step. This help avoid situation when writing is
 // interrupted and unusable file but with target name exists.
 type SyncOnCloseRenameFile struct {
+	logger     *zerolog.Logger
 	file       *os.File
 	targetName string
 	savedError error // savedError is the first error returned from Write. Close() renames temp file to target file only if savedError is nil.
@@ -60,6 +63,11 @@ func (s *SyncOnCloseRenameFile) Close() error {
 		return fmt.Errorf("error while renaming from %s to %s: %w", s.file.Name(), s.targetName, err)
 	}
 
+	err = requestDropFromOSFileCache(s.targetName, s.logger)
+	if err != nil {
+		return fmt.Errorf("error while requesting drop of %s from OS file cache : %w", s.targetName, err)
+	}
+
 	return nil
 }
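
Usage sketch (illustrative only, not part of the patch): with this change, callers pass a *zerolog.Logger so LoadCheckpoint and CreateCheckpointWriterForFile can log the dd command used to drop the checkpoint file from the Linux OS file cache. The checkpoint path below is a placeholder, and a nil logger is also accepted, as the updated tests show.

	package main

	import (
		"os"

		"github.com/rs/zerolog"

		"github.com/onflow/flow-go/ledger/complete/wal"
	)

	func main() {
		// Placeholder path; point this at a real checkpoint file.
		const checkpointPath = "path/to/root.checkpoint"

		logger := zerolog.New(os.Stderr).With().Timestamp().Logger()

		// LoadCheckpoint now takes the logger; after reading the file it requests
		// that the OS drop it from the file cache (on Linux, via dd iflag=nocache)
		// and logs the command it ran. Passing nil silences that log line.
		tries, err := wal.LoadCheckpoint(checkpointPath, &logger)
		if err != nil {
			logger.Fatal().Err(err).Msg("error while loading checkpoint")
		}

		logger.Info().Int("tries", len(tries)).Msg("checkpoint loaded")
	}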