Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfixes and refactoring #8

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var rootCmd = &cobra.Command{
wg.Add(numberOfJobs)
go func(resultCh chan *dump.JobResult) {
for result := range resultCh {
result.Print()
fmt.Println(result.String())
wg.Done()
}
}(resultCh)
Expand Down
78 changes: 44 additions & 34 deletions dump/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ type JobResult struct {
Elapsed time.Duration
}

func (result *JobResult) Print() {
func (result *JobResult) String() string {
if result.Error != nil {
fmt.Printf("Job: %s failed, it took %s with error: %v \n", result.JobName, result.Elapsed, result.Error)
} else {
fmt.Printf("Job: %s succeeded, it took %v \n", result.JobName, result.Elapsed)
return fmt.Sprintf("Job: %s failed, it took %s with error: %v", result.JobName, result.Elapsed, result.Error)
}

return fmt.Sprintf("Job: %s succeeded, it took %v", result.JobName, result.Elapsed)
}

type Job struct {
Expand Down Expand Up @@ -247,7 +247,7 @@ func (job *Job) Run() *JobResult {
return &result
}

func (job *Job) writeToFile(sshSession *ssh.Session, file io.Writer) error {
func (job *Job) dumpToFile(sshSession *ssh.Session, file io.Writer) error {
var gzipWriter *gzip.Writer
if job.Gzip {
gzipWriter = gzip.NewWriter(file)
Expand Down Expand Up @@ -306,75 +306,82 @@ func (job *Job) writeToFile(sshSession *ssh.Session, file io.Writer) error {
return nil
}

func (job *Job) dumpToCacheFile(sshSession *ssh.Session) (string, error) {
dumpFileName := storage.UploadCacheFilePath(job.Gzip)

file, err := os.Create(dumpFileName)
func (job *Job) dumpToCacheFile(sshSession *ssh.Session) (string, func(), error) {
file, cacheDir, err := storage.CreateCacheFile(job.Gzip)
if err != nil {
return "", fmt.Errorf("failed to create dump file: %w", err)
return "", nil, err
}

defer func() {
err := file.Close()
if err != nil {
log.Printf("failed to close dump cache file: %v", err)
log.Printf("failed to close cache file: %v", err)
}
}()

err = job.writeToFile(sshSession, file)
err = job.dumpToFile(sshSession, file)
if err != nil {
return "", fmt.Errorf("failed to write dump content to file: %w,", err)
return "", nil, fmt.Errorf("failed to dump content to file: %w,", err)
}

return file.Name(), nil
// We have to close the file in the defer function and return the filename instead of returning the fd (os.File).
// Otherwise if we pass the fd and the storage func reuses the same fd, the file will be corrupted.
return file.Name(), func() {
err = os.RemoveAll(cacheDir)
if err != nil {
log.Println("failed to remove cache dir after dump", err)
}
}, nil
}

// The core function that dump db content to a file (locally or remotely).
// It checks the filename to determine if we need to upload the file to remote storage or keep it locally.
// For uploading a file to an S3 bucket, the filename should follow the pattern: s3://<bucket_name>/<key>.
// For any remote upload, we try to cache it in a local dir then upload it to the remote storage.
func (job *Job) dump(sshSession *ssh.Session) error {
err := os.MkdirAll(storage.UploadCacheDir(), 0750)
cacheFileName, cleanup, err := job.dumpToCacheFile(sshSession)
defer cleanup()

if err != nil {
return fmt.Errorf("failed to create upload cache dir for remote upload. %w", err)
return fmt.Errorf("failed to dump content to cache file: %v", err)
}

defer func() {
err = os.RemoveAll(storage.UploadCacheDir())
if err != nil {
log.Println("failed to remove cache dir after dump", err)
}
}()

cacheFile, err := job.dumpToCacheFile(sshSession)

dumpFile, err := os.Open(cacheFile)
cacheFile, err := os.Open(cacheFileName)
if err != nil {
return fmt.Errorf("failed to open the cached dump file %w", err)
return fmt.Errorf("failed to open cache file: %v", err)
}

defer func() {
err := dumpFile.Close()
err := cacheFile.Close()
if err != nil {
log.Printf("failed to close dump cache file for saving to destination: %v", err)
log.Printf("failed to close cache file: %v", err)
}
}()

job.saveToDestinations(dumpFile)
err = job.store(cacheFile)
if err != nil {
return fmt.Errorf("failed to store dump file %v", err)
}

return nil
}

func (job *Job) saveToDestinations(cacheFile io.Reader) error {
// Save dump file to desired storages.
func (job *Job) store(cacheFile io.Reader) error {
storages := job.getStorages()
numberOfStorages := len(storages)

var err error

if numberOfStorages > 0 {
// Use pipe to pass content from the cache file to different writer.
readers, writer, closer := storageReadWriteCloser(numberOfStorages)

go func() {
io.Copy(writer, cacheFile)
_, e := io.Copy(writer, cacheFile)
if e != nil {
multierror.Append(err, e)
}
closer.Close()
}()

Expand All @@ -384,14 +391,17 @@ func (job *Job) saveToDestinations(cacheFile io.Reader) error {
storage := s
go func(i int) {
defer wg.Done()
storage.Save(readers[i], job.Gzip, job.Unique)
e := storage.Save(readers[i], job.Gzip, job.Unique)
if e != nil {
err = multierror.Append(err, e)
}
}(i)
}

wg.Wait()
}

return nil
return err
}

func (job *Job) getStorages() []storage.Storage {
Expand Down
24 changes: 24 additions & 0 deletions dump/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net"
"os"
"testing"
"time"

"github.com/liweiyi88/onedump/storage/local"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -218,3 +219,26 @@ func TestRun(t *testing.T) {
}
}
}

func TestResultString(t *testing.T) {
	// Cover both branches of JobResult.String: success and failure.
	cases := []struct {
		result   *JobResult
		expected string
	}{
		{
			result:   &JobResult{JobName: "job1", Elapsed: time.Second},
			expected: "Job: job1 succeeded, it took 1s",
		},
		{
			result:   &JobResult{Error: errors.New("test err"), JobName: "job1", Elapsed: time.Second},
			expected: "Job: job1 failed, it took 1s with error: test err",
		},
	}

	for _, tc := range cases {
		if got := tc.result.String(); got != tc.expected {
			t.Errorf("unexpected string result: %s", got)
		}
	}
}
70 changes: 44 additions & 26 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,13 @@ type Storage interface {
Save(reader io.Reader, gzip bool, unique bool) error
}

const uploadDumpCacheDir = ".onedump"
const cacheDirPrefix = ".onedump"

func init() {
rand.Seed(time.Now().UnixNano())
}

// For uploading dump file to remote storage, we need to firstly dump the db content to a dir locally.
// We firstly try to get current work dir, if not successful, then try to get home dir and finally try temp dir.
// Be aware of the size limit of a temp dir in different OS.
func UploadCacheDir() string {
dir, err := os.Getwd()
if err != nil {
log.Printf("Cannot get the current directory: %v, using $HOME directory!", err)
dir, err = os.UserHomeDir()
if err != nil {
log.Printf("Cannot get the user home directory: %v, using /tmp directory!", err)
dir = os.TempDir()
}
}

return fmt.Sprintf("%s/%s", dir, uploadDumpCacheDir)
}

func UploadCacheFilePath(shouldGzip bool) string {
filename := fmt.Sprintf("%s/%s", UploadCacheDir(), generateCacheFileName(8)+".sql")
return ensureFileSuffix(filename, shouldGzip)
}

func generateCacheFileName(n int) string {
func generateRandomName(n int) string {
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

b := make([]byte, n)
Expand All @@ -65,6 +43,7 @@ func ensureFileSuffix(filename string, shouldGzip bool) string {
return filename + ".gz"
}

// Ensure a file has unique name when necessary.
func ensureUniqueness(path string, unique bool) string {
if !unique {
return path
Expand All @@ -73,16 +52,55 @@ func ensureUniqueness(path string, unique bool) string {
s := strings.Split(path, "/")

filename := s[len(s)-1]

now := time.Now().UTC().Format("20060102150405")

uniqueFile := now + "-" + filename

s[len(s)-1] = uniqueFile

return strings.Join(s, "/")
}

// cacheFileDir picks a local directory to hold dump content before remote upload.
// Preference order: current working dir, then the user home dir, then the OS temp
// dir (mind temp-dir size limits on some systems). The returned path carries a
// random suffix so concurrent dump jobs never share (and race on) one cache dir.
func cacheFileDir() string {
	base, err := os.Getwd()
	if err != nil {
		log.Printf("Cannot get the current directory: %v, using $HOME directory!", err)

		home, homeErr := os.UserHomeDir()
		if homeErr != nil {
			log.Printf("Cannot get the user home directory: %v, using /tmp directory!", homeErr)
			home = os.TempDir()
		}

		base = home
	}

	return fmt.Sprintf("%s/%s%s", base, cacheDirPrefix, generateRandomName(4))
}

func cacheFilePath(cacheDir string, shouldGzip bool) string {
filename := fmt.Sprintf("%s/%s", cacheDir, generateRandomName(8)+".sql")
return ensureFileSuffix(filename, shouldGzip)
}

// CreateCacheFile creates a randomised cache dir and an empty dump file inside it.
// It returns the open file and the cache dir path; the caller owns both and is
// responsible for closing the file and removing the dir when done.
func CreateCacheFile(gzip bool) (*os.File, string, error) {
	cacheDir := cacheFileDir()

	if err := os.MkdirAll(cacheDir, 0750); err != nil {
		return nil, "", fmt.Errorf("failed to create cache dir for remote upload. %w", err)
	}

	dumpFileName := cacheFilePath(cacheDir, gzip)

	file, err := os.Create(dumpFileName)
	if err != nil {
		// The caller gets no dir path on error, so clean up the dir we just
		// created instead of leaking it.
		if rmErr := os.RemoveAll(cacheDir); rmErr != nil {
			log.Printf("failed to remove cache dir %s: %v", cacheDir, rmErr)
		}

		return nil, "", fmt.Errorf("failed to create cache file: %w", err)
	}

	return file, cacheDir, nil
}

func EnsureFileName(path string, shouldGzip, unique bool) string {
p := ensureFileSuffix(path, shouldGzip)
return ensureUniqueness(p, unique)
Expand Down
42 changes: 34 additions & 8 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,27 @@ package storage

import (
"fmt"
"log"
"os"
"strings"
"testing"
"time"
)

func TestUploadCacheDir(t *testing.T) {
actual := UploadCacheDir()
actual := cacheFileDir()

workDir, _ := os.Getwd()
expected := fmt.Sprintf("%s/%s", workDir, uploadDumpCacheDir)
prefix := fmt.Sprintf("%s/%s", workDir, cacheDirPrefix)

if actual != expected {
t.Errorf("get unexpected cache dir: expected: %s, actual: %s", expected, actual)
if !strings.HasPrefix(actual, prefix) {
t.Errorf("get unexpected cache dir: expected: %s, actual: %s", prefix, actual)
}
}

func TestGenerateCacheFileName(t *testing.T) {
expectedLen := 5
name := generateCacheFileName(expectedLen)
name := generateRandomName(expectedLen)

actualLen := len([]rune(name))
if actualLen != expectedLen {
Expand All @@ -30,19 +31,22 @@ func TestGenerateCacheFileName(t *testing.T) {
}

func TestUploadCacheFilePath(t *testing.T) {
gziped := UploadCacheFilePath(true)

cacheDir := cacheFileDir()

gziped := cacheFilePath(cacheDir, true)

if !strings.HasSuffix(gziped, ".gz") {
t.Errorf("expected filename has .gz extention, actual file name: %s", gziped)
}

sql := UploadCacheFilePath(false)
sql := cacheFilePath(cacheDir, false)

if !strings.HasSuffix(sql, ".sql") {
t.Errorf("expected filename has .sql extention, actual file name: %s", sql)
}

sql2 := UploadCacheFilePath(false)
sql2 := cacheFilePath(cacheDir, false)

if sql == sql2 {
t.Errorf("expected unique file name but got same filename %s", sql)
Expand Down Expand Up @@ -90,6 +94,28 @@ func TestEnsureUniqueness(t *testing.T) {
}
}

func TestCreateCacheFile(t *testing.T) {
	// Check the error instead of discarding it; a failed create would otherwise
	// leave file nil and panic below.
	file, cacheDir, err := CreateCacheFile(true)
	if err != nil {
		t.Fatalf("failed to create cache file: %v", err)
	}

	defer func() {
		file.Close()

		if err := os.RemoveAll(cacheDir); err != nil {
			t.Errorf("failed to remove cache dir after dump: %v", err)
		}
	}()

	fileInfo, err := os.Stat(file.Name())
	if err != nil {
		// Fatal: fileInfo is nil here, so Size() below would panic.
		t.Fatalf("failed to get cache file info %v", err)
	}

	if fileInfo.Size() != 0 {
		t.Errorf("expected empty file but get size: %d", fileInfo.Size())
	}
}

func TestEnsureFileName(t *testing.T) {
p := EnsureFileName("/Users/jack/Desktop/hello.sql", true, false)

Expand Down