From 4cf53ffc7af9152b006b4248fc99207dba639d77 Mon Sep 17 00:00:00 2001 From: liweiyi88 Date: Wed, 11 Jan 2023 16:18:40 +1100 Subject: [PATCH 1/3] bugfixes and refactoring --- cmd/root.go | 2 +- dump/job.go | 78 +++++++++++++++++++++++------------------ dump/job_test.go | 24 +++++++++++++ storage/storage.go | 70 ++++++++++++++++++++++-------------- storage/storage_test.go | 19 +++++----- 5 files changed, 124 insertions(+), 69 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index b8c004c..3a9230a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -53,7 +53,7 @@ var rootCmd = &cobra.Command{ wg.Add(numberOfJobs) go func(resultCh chan *dump.JobResult) { for result := range resultCh { - result.Print() + fmt.Println(result.String()) wg.Done() } }(resultCh) diff --git a/dump/job.go b/dump/job.go index e448136..874a82e 100644 --- a/dump/job.go +++ b/dump/job.go @@ -51,12 +51,12 @@ type JobResult struct { Elapsed time.Duration } -func (result *JobResult) Print() { +func (result *JobResult) String() string { if result.Error != nil { - fmt.Printf("Job: %s failed, it took %s with error: %v \n", result.JobName, result.Elapsed, result.Error) - } else { - fmt.Printf("Job: %s succeeded, it took %v \n", result.JobName, result.Elapsed) + return fmt.Sprintf("Job: %s failed, it took %s with error: %v", result.JobName, result.Elapsed, result.Error) } + + return fmt.Sprintf("Job: %s succeeded, it took %v", result.JobName, result.Elapsed) } type Job struct { @@ -247,7 +247,7 @@ func (job *Job) Run() *JobResult { return &result } -func (job *Job) writeToFile(sshSession *ssh.Session, file io.Writer) error { +func (job *Job) dumpToFile(sshSession *ssh.Session, file io.Writer) error { var gzipWriter *gzip.Writer if job.Gzip { gzipWriter = gzip.NewWriter(file) @@ -306,27 +306,32 @@ func (job *Job) writeToFile(sshSession *ssh.Session, file io.Writer) error { return nil } -func (job *Job) dumpToCacheFile(sshSession *ssh.Session) (string, error) { - dumpFileName := 
storage.UploadCacheFilePath(job.Gzip) - - file, err := os.Create(dumpFileName) +func (job *Job) dumpToCacheFile(sshSession *ssh.Session) (string, func(), error) { + file, cacheDir, err := storage.CreateCacheFile(job.Gzip) if err != nil { - return "", fmt.Errorf("failed to create dump file: %w", err) + return "", nil, err } defer func() { err := file.Close() if err != nil { - log.Printf("failed to close dump cache file: %v", err) + log.Printf("failed to close cache file: %v", err) } }() - err = job.writeToFile(sshSession, file) + err = job.dumpToFile(sshSession, file) if err != nil { - return "", fmt.Errorf("failed to write dump content to file: %w,", err) + return "", nil, fmt.Errorf("failed to dump content to file: %w", err) } - return file.Name(), nil + // We have to close the file in the defer function and return the filename instead of returning the fd (os.File) + // Otherwise if we pass the fd and the storage func reuse the same fd, the file will be corrupted. + return file.Name(), func() { + err = os.RemoveAll(cacheDir) + if err != nil { + log.Println("failed to remove cache dir after dump", err) + } + }, nil } // The core function that dump db content to a file (locally or remotely). @@ -334,47 +339,49 @@ func (job *Job) dumpToCacheFile(sshSession *ssh.Session) (string, error) { // For uploading file to S3 bucket, the filename shold follow the pattern: s3:/// . // For any remote upload, we try to cache it in a local dir then upload it to the remote storage. func (job *Job) dump(sshSession *ssh.Session) error { - err := os.MkdirAll(storage.UploadCacheDir(), 0750) + cacheFileName, cleanup, err := job.dumpToCacheFile(sshSession) + defer cleanup() + if err != nil { - return fmt.Errorf("failed to create upload cache dir for remote upload. 
%w", err) + return fmt.Errorf("failed to dump content to cache file: %v", err) } - defer func() { - err = os.RemoveAll(storage.UploadCacheDir()) - if err != nil { - log.Println("failed to remove cache dir after dump", err) - } - }() - - cacheFile, err := job.dumpToCacheFile(sshSession) - - dumpFile, err := os.Open(cacheFile) + cacheFile, err := os.Open(cacheFileName) if err != nil { - return fmt.Errorf("failed to open the cached dump file %w", err) + return fmt.Errorf("failed to open cache file: %v", err) } defer func() { - err := dumpFile.Close() + err := cacheFile.Close() if err != nil { - log.Printf("failed to close dump cache file for saving to destination: %v", err) + log.Printf("failed to close cache file: %v", err) } }() - job.saveToDestinations(dumpFile) + err = job.store(cacheFile) + if err != nil { + return fmt.Errorf("failed to store dump file %v", err) + } return nil } -func (job *Job) saveToDestinations(cacheFile io.Reader) error { +// Save dump file to desired storages. +func (job *Job) store(cacheFile io.Reader) error { storages := job.getStorages() numberOfStorages := len(storages) + var err error + if numberOfStorages > 0 { // Use pipe to pass content from the cache file to different writer. 
readers, writer, closer := storageReadWriteCloser(numberOfStorages) go func() { - io.Copy(writer, cacheFile) + _, e := io.Copy(writer, cacheFile) + if e != nil { + multierror.Append(err, e) + } closer.Close() }() @@ -384,14 +391,17 @@ func (job *Job) saveToDestinations(cacheFile io.Reader) error { storage := s go func(i int) { defer wg.Done() - storage.Save(readers[i], job.Gzip, job.Unique) + e := storage.Save(readers[i], job.Gzip, job.Unique) + if e != nil { + err = multierror.Append(err, e) + } }(i) } wg.Wait() } - return nil + return err } func (job *Job) getStorages() []storage.Storage { diff --git a/dump/job_test.go b/dump/job_test.go index 8694f19..2e0d2c3 100644 --- a/dump/job_test.go +++ b/dump/job_test.go @@ -11,6 +11,7 @@ import ( "net" "os" "testing" + "time" "github.com/liweiyi88/onedump/storage/local" "golang.org/x/crypto/ssh" @@ -218,3 +219,26 @@ func TestRun(t *testing.T) { } } } + +func TestResultString(t *testing.T) { + r1 := &JobResult{ + JobName: "job1", + Elapsed: time.Second, + } + + s := r1.String() + if s != "Job: job1 succeeded, it took 1s" { + t.Errorf("unexpected string result: %s", s) + } + + r2 := &JobResult{ + Error: errors.New("test err"), + JobName: "job1", + Elapsed: time.Second, + } + + s = r2.String() + if s != "Job: job1 failed, it took 1s with error: test err" { + t.Errorf("unexpected string result: %s", s) + } +} diff --git a/storage/storage.go b/storage/storage.go index 7f1d0eb..fed8182 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -14,35 +14,13 @@ type Storage interface { Save(reader io.Reader, gzip bool, unique bool) error } -const uploadDumpCacheDir = ".onedump" +const cacheDirPrefix = ".onedump" func init() { rand.Seed(time.Now().UnixNano()) } -// For uploading dump file to remote storage, we need to firstly dump the db content to a dir locally. -// We firstly try to get current work dir, if not successful, then try to get home dir and finally try temp dir. 
-// Be aware of the size limit of a temp dir in different OS. -func UploadCacheDir() string { - dir, err := os.Getwd() - if err != nil { - log.Printf("Cannot get the current directory: %v, using $HOME directory!", err) - dir, err = os.UserHomeDir() - if err != nil { - log.Printf("Cannot get the user home directory: %v, using /tmp directory!", err) - dir = os.TempDir() - } - } - - return fmt.Sprintf("%s/%s", dir, uploadDumpCacheDir) -} - -func UploadCacheFilePath(shouldGzip bool) string { - filename := fmt.Sprintf("%s/%s", UploadCacheDir(), generateCacheFileName(8)+".sql") - return ensureFileSuffix(filename, shouldGzip) -} - -func generateCacheFileName(n int) string { +func generateRandomName(n int) string { const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" b := make([]byte, n) @@ -65,6 +43,7 @@ func ensureFileSuffix(filename string, shouldGzip bool) string { return filename + ".gz" } +// Ensure a file has unique name when necessary. func ensureUniqueness(path string, unique bool) string { if !unique { return path @@ -73,9 +52,7 @@ func ensureUniqueness(path string, unique bool) string { s := strings.Split(path, "/") filename := s[len(s)-1] - now := time.Now().UTC().Format("20060102150405") - uniqueFile := now + "-" + filename s[len(s)-1] = uniqueFile @@ -83,6 +60,47 @@ func ensureUniqueness(path string, unique bool) string { return strings.Join(s, "/") } +// For uploading dump file to remote storage, we need to firstly dump the db content to a dir locally. +// We firstly try to get current work dir, if not successful, then try to get home dir and finally try temp dir. +// Be aware of the size limit of a temp dir in different OS. 
+func cachedFileDir() string { + dir, err := os.Getwd() + if err != nil { + log.Printf("Cannot get the current directory: %v, using $HOME directory!", err) + dir, err = os.UserHomeDir() + if err != nil { + log.Printf("Cannot get the user home directory: %v, using /tmp directory!", err) + dir = os.TempDir() + } + } + + // randomise the upload cache dir, otherwise we will have race condition when have more than one dump jobs. + return fmt.Sprintf("%s/%s%s", dir, cacheDirPrefix, generateRandomName(4)) +} + +func cachedFilePath(cacheDir string, shouldGzip bool) string { + filename := fmt.Sprintf("%s/%s", cacheDir, generateRandomName(8)+".sql") + return ensureFileSuffix(filename, shouldGzip) +} + +func CreateCacheFile(gzip bool) (*os.File, string, error) { + cacheDir := cachedFileDir() + err := os.MkdirAll(cacheDir, 0750) + + if err != nil { + return nil, "", fmt.Errorf("failed to create cache dir for remote upload. %w", err) + } + + dumpFileName := cachedFilePath(cacheDir, gzip) + + file, err := os.Create(dumpFileName) + if err != nil { + return nil, "", fmt.Errorf("failed to create cache file: %w", err) + } + + return file, cacheDir, nil +} + func EnsureFileName(path string, shouldGzip, unique bool) string { p := ensureFileSuffix(path, shouldGzip) return ensureUniqueness(p, unique) diff --git a/storage/storage_test.go b/storage/storage_test.go index e53f47c..60f3894 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -9,19 +9,19 @@ import ( ) func TestUploadCacheDir(t *testing.T) { - actual := UploadCacheDir() + actual := cachedFileDir() workDir, _ := os.Getwd() - expected := fmt.Sprintf("%s/%s", workDir, uploadDumpCacheDir) + prefix := fmt.Sprintf("%s/%s", workDir, cacheDirPrefix) - if actual != expected { - t.Errorf("get unexpected cache dir: expected: %s, actual: %s", expected, actual) + if !strings.HasPrefix(actual, prefix) { + t.Errorf("get unexpected cache dir: expected: %s, actual: %s", prefix, actual) } } func TestGenerateCacheFileName(t 
*testing.T) { expectedLen := 5 - name := generateCacheFileName(expectedLen) + name := generateRandomName(expectedLen) actualLen := len([]rune(name)) if actualLen != expectedLen { @@ -30,19 +30,22 @@ func TestGenerateCacheFileName(t *testing.T) { } func TestUploadCacheFilePath(t *testing.T) { - gziped := UploadCacheFilePath(true) + + cacheDir := cachedFileDir() + + gziped := cachedFilePath(cacheDir, true) if !strings.HasSuffix(gziped, ".gz") { t.Errorf("expected filename has .gz extention, actual file name: %s", gziped) } - sql := UploadCacheFilePath(false) + sql := cachedFilePath(cacheDir, false) if !strings.HasSuffix(sql, ".sql") { t.Errorf("expected filename has .sql extention, actual file name: %s", sql) } - sql2 := UploadCacheFilePath(false) + sql2 := cachedFilePath(cacheDir, false) if sql == sql2 { t.Errorf("expected unique file name but got same filename %s", sql) From 82c527837c0f8e54e48fe7a073a4cefa65112df9 Mon Sep 17 00:00:00 2001 From: liweiyi88 Date: Wed, 11 Jan 2023 21:20:09 +1100 Subject: [PATCH 2/3] add test --- storage/storage.go | 8 ++++---- storage/storage_test.go | 24 +++++++++++++++++++----- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/storage/storage.go b/storage/storage.go index fed8182..b74e03b 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -63,7 +63,7 @@ func ensureUniqueness(path string, unique bool) string { // For uploading dump file to remote storage, we need to firstly dump the db content to a dir locally. // We firstly try to get current work dir, if not successful, then try to get home dir and finally try temp dir. // Be aware of the size limit of a temp dir in different OS. 
-func cachedFileDir() string { +func cacheFileDir() string { dir, err := os.Getwd() if err != nil { log.Printf("Cannot get the current directory: %v, using $HOME directory!", err) @@ -78,20 +78,20 @@ func cachedFileDir() string { return fmt.Sprintf("%s/%s%s", dir, cacheDirPrefix, generateRandomName(4)) } -func cachedFilePath(cacheDir string, shouldGzip bool) string { +func cacheFilePath(cacheDir string, shouldGzip bool) string { filename := fmt.Sprintf("%s/%s", cacheDir, generateRandomName(8)+".sql") return ensureFileSuffix(filename, shouldGzip) } func CreateCacheFile(gzip bool) (*os.File, string, error) { - cacheDir := cachedFileDir() + cacheDir := cacheFileDir() err := os.MkdirAll(cacheDir, 0750) if err != nil { return nil, "", fmt.Errorf("failed to create cache dir for remote upload. %w", err) } - dumpFileName := cachedFilePath(cacheDir, gzip) + dumpFileName := cacheFilePath(cacheDir, gzip) file, err := os.Create(dumpFileName) if err != nil { diff --git a/storage/storage_test.go b/storage/storage_test.go index 60f3894..78b17fe 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -9,7 +9,7 @@ import ( ) func TestUploadCacheDir(t *testing.T) { - actual := cachedFileDir() + actual := cacheFileDir() workDir, _ := os.Getwd() prefix := fmt.Sprintf("%s/%s", workDir, cacheDirPrefix) @@ -31,21 +31,21 @@ func TestGenerateCacheFileName(t *testing.T) { func TestUploadCacheFilePath(t *testing.T) { - cacheDir := cachedFileDir() + cacheDir := cacheFileDir() - gziped := cachedFilePath(cacheDir, true) + gziped := cacheFilePath(cacheDir, true) if !strings.HasSuffix(gziped, ".gz") { t.Errorf("expected filename has .gz extention, actual file name: %s", gziped) } - sql := cachedFilePath(cacheDir, false) + sql := cacheFilePath(cacheDir, false) if !strings.HasSuffix(sql, ".sql") { t.Errorf("expected filename has .sql extention, actual file name: %s", sql) } - sql2 := cachedFilePath(cacheDir, false) + sql2 := cacheFilePath(cacheDir, false) if sql == sql2 { 
t.Errorf("expected unique file name but got same filename %s", sql) @@ -93,6 +93,20 @@ func TestEnsureUniqueness(t *testing.T) { } } +func TestCreateCacheFile(t *testing.T) { + file, _, _ := CreateCacheFile(true) + defer file.Close() + + fileInfo, err := os.Stat(file.Name()) + if err != nil { + t.Errorf("failed to get cache file info %v", err) + } + + if fileInfo.Size() != 0 { + t.Errorf("expected empty file but get size: %d", fileInfo.Size()) + } +} + func TestEnsureFileName(t *testing.T) { p := EnsureFileName("/Users/jack/Desktop/hello.sql", true, false) From eb4ed223d39f5e2ae929d3036fa1caf1df789c34 Mon Sep 17 00:00:00 2001 From: liweiyi88 Date: Wed, 11 Jan 2023 21:21:39 +1100 Subject: [PATCH 3/3] add test --- storage/storage_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/storage/storage_test.go b/storage/storage_test.go index 78b17fe..15d61a4 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "log" "os" "strings" "testing" @@ -94,8 +95,16 @@ func TestEnsureUniqueness(t *testing.T) { } func TestCreateCacheFile(t *testing.T) { - file, _, _ := CreateCacheFile(true) - defer file.Close() + file, cacheDir, _ := CreateCacheFile(true) + + defer func() { + file.Close() + + err := os.RemoveAll(cacheDir) + if err != nil { + log.Println("failed to remove cache dir after dump", err) + } + }() fileInfo, err := os.Stat(file.Name()) if err != nil {