Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop checkpoint files from OS file cache to prevent file cache growing to hundreds of GB #2281

Merged
merged 6 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/util/cmd/checkpoint-list-tries/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {

func run(*cobra.Command, []string) {

tries, err := wal.LoadCheckpoint(flagCheckpoint)
tries, err := wal.LoadCheckpoint(flagCheckpoint, &log.Logger)
if err != nil {
log.Fatal().Err(err).Msg("error while loading checkpoint")
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/complete/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (l *Ledger) ExportCheckpointAt(

l.logger.Info().Msg("creating a checkpoint for the new trie")

writer, err := wal.CreateCheckpointWriterForFile(outputDir, outputFile)
writer, err := wal.CreateCheckpointWriterForFile(outputDir, outputFile, &l.logger)
if err != nil {
return ledger.State(hash.DummyHash), fmt.Errorf("failed to create a checkpoint writer: %w", err)
}
Expand Down
69 changes: 60 additions & 9 deletions ledger/complete/wal/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"fmt"
"io"
"os"
"os/exec"
"path"
"runtime"
"sort"
"strconv"
"strings"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/mtrie"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
Expand Down Expand Up @@ -243,15 +247,11 @@ func NumberToFilename(n int) string {
}

func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error) {
return CreateCheckpointWriter(c.dir, to)
}

func CreateCheckpointWriter(dir string, fileNo int) (io.WriteCloser, error) {
return CreateCheckpointWriterForFile(dir, NumberToFilename(fileNo))
return CreateCheckpointWriterForFile(c.dir, NumberToFilename(to), &c.wal.log)
}

// CreateCheckpointWriterForFile returns a file writer that will write to a temporary file and then move it to the checkpoint folder by renaming it.
func CreateCheckpointWriterForFile(dir, filename string) (io.WriteCloser, error) {
func CreateCheckpointWriterForFile(dir, filename string, logger *zerolog.Logger) (io.WriteCloser, error) {

fullname := path.Join(dir, filename)

Expand All @@ -266,6 +266,7 @@ func CreateCheckpointWriterForFile(dir, filename string) (io.WriteCloser, error)

writer := bufio.NewWriterSize(tmpFile, defaultBufioWriteSize)
return &SyncOnCloseRenameFile{
logger: logger,
file: tmpFile,
targetName: fullname,
Writer: writer,
Expand Down Expand Up @@ -393,12 +394,12 @@ func StoreCheckpoint(writer io.Writer, tries ...*trie.MTrie) error {

func (c *Checkpointer) LoadCheckpoint(checkpoint int) ([]*trie.MTrie, error) {
filepath := path.Join(c.dir, NumberToFilename(checkpoint))
return LoadCheckpoint(filepath)
return LoadCheckpoint(filepath, &c.wal.log)
}

func (c *Checkpointer) LoadRootCheckpoint() ([]*trie.MTrie, error) {
filepath := path.Join(c.dir, bootstrap.FilenameWALRootCheckpoint)
return LoadCheckpoint(filepath)
return LoadCheckpoint(filepath, &c.wal.log)
}

func (c *Checkpointer) HasRootCheckpoint() (bool, error) {
Expand All @@ -415,13 +416,15 @@ func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error {
return os.Remove(path.Join(c.dir, NumberToFilename(checkpoint)))
}

func LoadCheckpoint(filepath string) ([]*trie.MTrie, error) {
func LoadCheckpoint(filepath string, logger *zerolog.Logger) ([]*trie.MTrie, error) {
file, err := os.Open(filepath)
if err != nil {
return nil, fmt.Errorf("cannot open checkpoint file %s: %w", filepath, err)
}
defer func() {
_ = file.Close()

_ = requestDropFromOSFileCache(filepath, logger)
}()

return readCheckpoint(file)
Expand Down Expand Up @@ -769,3 +772,51 @@ func readCheckpointV5(f *os.File) ([]*trie.MTrie, error) {

return tries, nil
}

// requestDropFromOSFileCache requests the specified file be dropped from OS file cache.
// The use case is when a new checkpoint is loaded or created, OS file cache can hold the entire
// checkpoint file in memory until requestDropFromOSFileCache() causes it to be dropped from
// the file cache. Not calling requestDropFromOSFileCache() causes two checkpoint files
// to be cached by the OS file cache for each checkpointing, eventually caching hundreds of GB.
// CAUTION: Returns nil without doing anything if GOOS != linux.
func requestDropFromOSFileCache(fileName string, logger *zerolog.Logger) error {
if runtime.GOOS != "linux" {
return nil
}

// Try using /bin/dd (Debian, Ubuntu, etc.)
cmdFileName := "/bin/dd"

// If /bin/dd isn't found, then try /usr/bin/dd (OpenSUSE Leap, etc.)
_, err := os.Stat(cmdFileName)
if os.IsNotExist(err) {
cmdFileName = "/usr/bin/dd"
_, err := os.Stat(cmdFileName)
if os.IsNotExist(err) {
return fmt.Errorf("required program dd not found in /bin/ and /usr/bin/")
}
}

// Remove some special chars from fileName just in case.
// Regex would be shorter but not as easy to read.
s := strings.ReplaceAll(fileName, " ", "")
s = strings.ReplaceAll(s, ";", "")
s = strings.ReplaceAll(s, "$", "")
s = strings.ReplaceAll(s, "|", "")
s = strings.ReplaceAll(s, ">", "")
s = strings.ReplaceAll(s, "<", "")
s = strings.ReplaceAll(s, "*", "")

_, err = os.Stat(s)
if os.IsNotExist(err) {
return fmt.Errorf("sanitized filename %s does not exist", s)
}

cmd := exec.Command(cmdFileName, "if="+s, "iflag=nocache", "count=0")

if logger != nil {
logger.Info().Msgf("run %q to drop file from OS file cache", cmd.String())
}

return cmd.Run()
}
4 changes: 2 additions & 2 deletions ledger/complete/wal/checkpointer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func Test_StoringLoadingCheckpoints(t *testing.T) {
file.Close()

t.Run("works without data modification", func(t *testing.T) {
tries, err := realWAL.LoadCheckpoint(filepath)
tries, err := realWAL.LoadCheckpoint(filepath, nil)
require.NoError(t, err)
require.Equal(t, 1, len(tries))
require.Equal(t, updatedTrie, tries[0])
Expand All @@ -551,7 +551,7 @@ func Test_StoringLoadingCheckpoints(t *testing.T) {
err = os.WriteFile(filepath, b, 0644)
require.NoError(t, err)

tries, err := realWAL.LoadCheckpoint(filepath)
tries, err := realWAL.LoadCheckpoint(filepath, nil)
require.Error(t, err)
require.Nil(t, tries)
require.Contains(t, err.Error(), "checksum")
Expand Down
6 changes: 3 additions & 3 deletions ledger/complete/wal/checkpointer_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestLoadCheckpointV1(t *testing.T) {
mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
}

tries, err := LoadCheckpoint("test_data/checkpoint.v1")
tries, err := LoadCheckpoint("test_data/checkpoint.v1", nil)
require.NoError(t, err)
require.Equal(t, len(expectedRootHash), len(tries))

Expand All @@ -37,7 +37,7 @@ func TestLoadCheckpointV3(t *testing.T) {
mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
}

tries, err := LoadCheckpoint("test_data/checkpoint.v3")
tries, err := LoadCheckpoint("test_data/checkpoint.v3", nil)
require.NoError(t, err)
require.Equal(t, len(expectedRootHash), len(tries))

Expand All @@ -56,7 +56,7 @@ func TestLoadCheckpointV4(t *testing.T) {
mustToHash("63df641430e5e0745c3d99ece6ac209467ccfdb77e362e7490a830db8e8803ae"),
}

tries, err := LoadCheckpoint("test_data/checkpoint.v4")
tries, err := LoadCheckpoint("test_data/checkpoint.v4", nil)
require.NoError(t, err)
require.Equal(t, len(expectedRootHash), len(tries))

Expand Down
8 changes: 8 additions & 0 deletions ledger/complete/wal/syncrename.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"os"

"github.com/rs/zerolog"
)

type WriterSeekerCloser interface {
Expand All @@ -19,6 +21,7 @@ type WriterSeekerCloser interface {
// to target one as the last step. This help avoid situation when writing is
// interrupted and unusable file but with target name exists.
type SyncOnCloseRenameFile struct {
logger *zerolog.Logger
file *os.File
targetName string
savedError error // savedError is the first error returned from Write. Close() renames temp file to target file only if savedError is nil.
Expand Down Expand Up @@ -60,6 +63,11 @@ func (s *SyncOnCloseRenameFile) Close() error {
return fmt.Errorf("error while renaming from %s to %s: %w", s.file.Name(), s.targetName, err)
}

err = requestDropFromOSFileCache(s.targetName, s.logger)
if err != nil {
return fmt.Errorf("error while requesting drop of %s from OS file cache : %w", s.targetName, err)
}

return nil
}

Expand Down