Skip to content

Commit

Permalink
Support copy in parallel (minio#2258)
Browse files Browse the repository at this point in the history
  • Loading branch information
chibiegg authored and deekoder committed Sep 29, 2017
1 parent 36d7d0b commit 5a2798d
Showing 1 changed file with 43 additions and 5 deletions.
48 changes: 43 additions & 5 deletions cmd/cp-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"syscall"

"github.com/cheggaaa/pb"
Expand All @@ -40,6 +41,11 @@ var (
Name: "recursive, r",
Usage: "Copy recursively.",
},
cli.UintFlag{
Name: "parallel, p",
Usage: "Number of copies in parallel",
Value: 1,
},
}
)

Expand All @@ -66,16 +72,19 @@ EXAMPLES:
2. Copy a folder recursively from Minio cloud storage to Amazon S3 cloud storage.
$ {{.HelpName}} --recursive play/mybucket/burningman2011/ s3/mybucket/
3. Copy multiple local folders recursively to Minio cloud storage.
3. Copy a folder recursively from Minio cloud storage to Amazon S3 cloud storage in parallel.
$ {{.HelpName}} --recursive --parallel 10 play/mybucket/burningman2011/ s3/mybucket/
4. Copy multiple local folders recursively to Minio cloud storage.
$ {{.HelpName}} --recursive backup/2014/ backup/2015/ play/archive/
4. Copy a bucket recursively from aliased Amazon S3 cloud storage to local filesystem on Windows.
5. Copy a bucket recursively from aliased Amazon S3 cloud storage to local filesystem on Windows.
$ {{.HelpName}} --recursive s3\documents\2014\ C:\Backups\2014
5. Copy an object with name containing unicode characters to Amazon S3 cloud storage.
6. Copy an object with name containing unicode characters to Amazon S3 cloud storage.
$ {{.HelpName}} 本語 s3/andoria/
6. Copy a local folder with space separated characters to Amazon S3 cloud storage.
7. Copy a local folder with space separated characters to Amazon S3 cloud storage.
$ {{.HelpName}} --recursive 'workdir/documents/May 2014/' s3/miniocloud
`,
Expand Down Expand Up @@ -257,6 +266,16 @@ func doCopySession(session *sessionV8) error {
// or not. This is useful when we resume from a session.
isCopied := isLastFactory(session.Header.LastCopied)

// Number of parallel
parallel := 1
if session.Header.CommandIntFlags["parallel"] > 1 {
parallel = session.Header.CommandIntFlags["parallel"]
}
// Queue of doCopy() operation.
var queueCh = make(chan URLs, parallel)
// The number of waiting jobs
var waitGroup = &sync.WaitGroup{}

// Enable progress bar reader only during default mode.
var progressReader *progressBar
if !globalQuiet && !globalJSON { // set up progress bar
Expand All @@ -266,6 +285,19 @@ func doCopySession(session *sessionV8) error {
// Wait on status of doCopy() operation.
var statusCh = make(chan URLs)

for i := 0; i < parallel; i++ {
go func() {
for {
cpURLs, ok := <-queueCh
if !ok {
return
}
statusCh <- doCopy(cpURLs, progressReader, accntReader)
waitGroup.Done()
}
}()
}

go func() {
// Loop through all urls.
for urlScanner.Scan() {
Expand All @@ -283,10 +315,15 @@ func doCopySession(session *sessionV8) error {
if isCopied(cpURLs.SourceContent.URL.String()) {
statusCh <- doCopyFake(cpURLs, progressReader)
} else {
statusCh <- doCopy(cpURLs, progressReader, accntReader)
waitGroup.Add(1)
queueCh <- cpURLs
}
}

// Waiting to complete jobs
waitGroup.Wait()
// Close
close(queueCh)
// URLs feeding finished
close(statusCh)

Expand Down Expand Up @@ -359,6 +396,7 @@ func mainCopy(ctx *cli.Context) error {
session := newSessionV8()
session.Header.CommandType = "cp"
session.Header.CommandBoolFlags["recursive"] = ctx.Bool("recursive")
session.Header.CommandIntFlags["parallel"] = ctx.Int("parallel")

var e error
if session.Header.RootPath, e = os.Getwd(); e != nil {
Expand Down

0 comments on commit 5a2798d

Please sign in to comment.