Add context in Put api call for cp/mirror. (#2291)
poornas authored and deekoder committed Oct 20, 2017
1 parent 59685e2 commit a86080a
Showing 8 changed files with 50 additions and 36 deletions.
3 changes: 2 additions & 1 deletion cmd/client-fs.go
@@ -17,6 +17,7 @@
package cmd

import (
"context"
"io"
"os"
"path"
@@ -314,7 +315,7 @@ func (f *fsClient) put(reader io.Reader, size int64, metadata map[string][]strin
}

// Put - create a new file with metadata.
- func (f *fsClient) Put(reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, *probe.Error) {
+ func (f *fsClient) Put(ctx context.Context, reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, *probe.Error) {
return f.put(reader, size, nil, progress)
}

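Note on the hunk above: fsClient.Put gains the ctx parameter only to keep satisfying the shared Client interface; local filesystem writes offer no cancellation hook, so the context is dropped before calling the internal put. A minimal sketch of the same pattern, using a hypothetical localWriter type that is not part of this commit:

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// localWriter mimics the fsClient approach: the context is accepted for
// interface compatibility but ignored, because os.File writes cannot be
// cancelled through it. Hypothetical type, for illustration only.
type localWriter struct{ path string }

func (l *localWriter) Put(ctx context.Context, reader io.Reader, size int64) (int64, error) {
	_ = ctx // accepted but unused, as in fsClient.Put
	f, err := os.Create(l.path)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	return io.Copy(f, reader)
}

func main() {
	w := &localWriter{path: filepath.Join(os.TempDir(), "mc-example.txt")}
	data := []byte("hello")
	n, err := w.Put(context.Background(), bytes.NewReader(data), int64(len(data)))
	fmt.Println(n, err)
}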
19 changes: 10 additions & 9 deletions cmd/client-fs_test.go
@@ -18,6 +18,7 @@ package cmd

import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
@@ -42,7 +43,7 @@ func (s *TestSuite) TestList(c *C) {

reader := bytes.NewReader([]byte(data))
var n int64
- n, err = fsClient.Put(reader, int64(len(data)), map[string]string{
+ n, err = fsClient.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -53,7 +54,7 @@ func (s *TestSuite) TestList(c *C) {
c.Assert(err, IsNil)

reader = bytes.NewReader([]byte(data))
- n, err = fsClient.Put(reader, int64(len(data)), map[string]string{
+ n, err = fsClient.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -81,7 +82,7 @@ func (s *TestSuite) TestList(c *C) {
c.Assert(err, IsNil)

reader = bytes.NewReader([]byte(data))
- n, err = fsClient.Put(reader, int64(len(data)), map[string]string{
+ n, err = fsClient.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -141,7 +142,7 @@ func (s *TestSuite) TestList(c *C) {
c.Assert(err, IsNil)

reader = bytes.NewReader([]byte(data))
- n, err = fsClient.Put(reader, int64(len(data)), map[string]string{
+ n, err = fsClient.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -248,7 +249,7 @@ func (s *TestSuite) TestPut(c *C) {
data := "hello"
reader := bytes.NewReader([]byte(data))
var n int64
- n, err = fsClient.Put(reader, int64(len(data)), map[string]string{
+ n, err = fsClient.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -268,7 +269,7 @@ func (s *TestSuite) TestGet(c *C) {
data := "hello"
var reader io.Reader
reader = bytes.NewReader([]byte(data))
- n, err := fsClient.Put(reader, int64(len(data)), map[string]string{
+ n, err := fsClient.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -296,7 +297,7 @@ func (s *TestSuite) TestGetRange(c *C) {
data := "hello world"
var reader io.Reader
reader = bytes.NewReader([]byte(data))
- n, err := fsClient.Put(reader, int64(len(data)), map[string]string{
+ n, err := fsClient.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -327,7 +328,7 @@ func (s *TestSuite) TestStatObject(c *C) {
data := "hello"
dataLen := len(data)
reader := bytes.NewReader([]byte(data))
- n, err := fsClient.Put(reader, int64(dataLen), map[string]string{
+ n, err := fsClient.Put(context.Background(), reader, int64(dataLen), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
@@ -353,7 +354,7 @@ func (s *TestSuite) TestCopy(c *C) {
data := "hello world"
var reader io.Reader
reader = bytes.NewReader([]byte(data))
- n, err := fsClientSource.Put(reader, int64(len(data)), map[string]string{
+ n, err := fsClientSource.Put(context.Background(), reader, int64(len(data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
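The test updates above are mechanical: every fsClient.Put call site now passes context.Background() as its first argument, since these tests have nothing to cancel. A test that wanted to bound an upload could pass a cancellable context instead; a hedged sketch using the standard testing package rather than the gocheck suite, where fsClientForTest is a hypothetical helper:

package cmd

import (
	"bytes"
	"context"
	"testing"
	"time"
)

// Hypothetical test shape; fsClientForTest stands in for however the real
// suite constructs its client.
func TestPutWithDeadline(t *testing.T) {
	fsClient := fsClientForTest(t)

	// Abort the Put if it somehow takes longer than five seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	data := "hello"
	n, err := fsClient.Put(ctx, bytes.NewReader([]byte(data)), int64(len(data)), map[string]string{
		"Content-Type": "application/octet-stream",
	}, nil)
	if err != nil {
		t.Fatal(err)
	}
	if n != int64(len(data)) {
		t.Fatalf("wrote %d bytes, want %d", n, len(data))
	}
}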
5 changes: 3 additions & 2 deletions cmd/client-s3.go
@@ -17,6 +17,7 @@
package cmd

import (
"context"
"crypto/tls"
"hash/fnv"
"io"
@@ -570,7 +571,7 @@ func (c *s3Client) Copy(source string, size int64, progress io.Reader) *probe.Er
}

// Put - upload an object with custom metadata.
- func (c *s3Client) Put(reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, *probe.Error) {
+ func (c *s3Client) Put(ctx context.Context, reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, *probe.Error) {
bucket, object := c.url2BucketAndObject()
contentType, ok := metadata["Content-Type"]
if ok {
@@ -589,7 +590,7 @@ func (c *s3Client) Put(reader io.Reader, size int64, metadata map[string]string,
NumThreads: numParallelThreads,
ContentType: contentType,
}
- n, e := c.api.PutObject(bucket, object, reader, size, opts)
+ n, e := c.api.PutObjectWithContext(ctx, bucket, object, reader, size, opts)
if e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "UnexpectedEOF" || e == io.EOF {
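This is the hunk where the context becomes meaningful: PutObjectWithContext lets minio-go abort an in-flight upload once ctx is cancelled or times out. A standalone sketch of that call, assuming a minio-go release from this period that exposes PutObjectWithContext (as the diff uses); the endpoint, credentials, and bucket names are placeholders:

package main

import (
	"bytes"
	"context"
	"log"
	"time"

	minio "github.com/minio/minio-go"
)

func main() {
	// Placeholder endpoint and credentials.
	client, err := minio.New("s3.example.com", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatal(err)
	}

	// If the upload has not finished in 30 seconds, the context expires and
	// PutObjectWithContext returns an error instead of hanging.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	data := []byte("hello world")
	n, err := client.PutObjectWithContext(ctx, "my-bucket", "my-object",
		bytes.NewReader(data), int64(len(data)),
		minio.PutObjectOptions{ContentType: "application/octet-stream"})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("uploaded %d bytes", n)
}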
3 changes: 2 additions & 1 deletion cmd/client-s3_test.go
@@ -19,6 +19,7 @@ package cmd
// bucketHandler is an http.Handler that verifies bucket responses and validates incoming requests
import (
"bytes"
"context"
"io"
"net/http"
"net/http/httptest"
@@ -219,7 +220,7 @@ func (s *TestSuite) TestObjectOperations(c *C) {

var reader io.Reader
reader = bytes.NewReader(object.data)
- n, err := s3c.Put(reader, int64(len(object.data)), map[string]string{
+ n, err := s3c.Put(context.Background(), reader, int64(len(object.data)), map[string]string{
"Content-Type": "application/octet-stream",
}, nil)
c.Assert(err, IsNil)
3 changes: 2 additions & 1 deletion cmd/client.go
@@ -17,6 +17,7 @@
package cmd

import (
"context"
"io"
"os"
"time"
@@ -58,7 +59,7 @@ type Client interface {

// I/O operations with metadata.
Get() (reader io.Reader, err *probe.Error)
- Put(reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (n int64, err *probe.Error)
+ Put(ctx context.Context, reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (n int64, err *probe.Error)

// I/O operations with expiration
ShareDownload(expires time.Duration) (string, *probe.Error)
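Because Put is part of the Client interface, adding ctx here is what forces every implementation (fs, S3) and every call site in the helpers and tests to change in this commit. A trimmed-down, hypothetical version of the pattern, using plain error instead of *probe.Error for brevity:

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
)

// Client is a reduced, illustrative version of the interface in cmd/client.go,
// keeping only the Put method touched by this commit.
type Client interface {
	Put(ctx context.Context, reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, error)
}

// memClient is a toy implementation that makes the example runnable.
type memClient struct{ buf bytes.Buffer }

func (m *memClient) Put(ctx context.Context, reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, error) {
	if err := ctx.Err(); err != nil { // honor an already-cancelled context
		return 0, err
	}
	data, err := ioutil.ReadAll(reader)
	if err != nil {
		return 0, err
	}
	n, err := m.buf.Write(data)
	return int64(n), err
}

// upload threads the caller's context straight through, so cancelling ctx
// cancels whatever the concrete client is doing (for s3Client, the upload).
func upload(ctx context.Context, c Client, data string) (int64, error) {
	return c.Put(ctx, bytes.NewReader([]byte(data)), int64(len(data)), map[string]string{
		"Content-Type": "application/octet-stream",
	}, nil)
}

func main() {
	n, err := upload(context.Background(), &memClient{}, "hello")
	fmt.Println(n, err)
}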
11 changes: 6 additions & 5 deletions cmd/common-methods.go
@@ -17,6 +17,7 @@
package cmd

import (
"context"
"io"
"path/filepath"
"regexp"
@@ -100,12 +101,12 @@ func getSourceStream(alias string, urlStr string, fetchStat bool) (reader io.Rea
}

// putTargetStream writes to URL from Reader.
- func putTargetStream(alias string, urlStr string, reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, *probe.Error) {
+ func putTargetStream(ctx context.Context, alias string, urlStr string, reader io.Reader, size int64, metadata map[string]string, progress io.Reader) (int64, *probe.Error) {
targetClnt, err := newClientFromAlias(alias, urlStr)
if err != nil {
return 0, err.Trace(alias, urlStr)
}
- n, err := targetClnt.Put(reader, size, metadata, progress)
+ n, err := targetClnt.Put(ctx, reader, size, metadata, progress)
if err != nil {
return n, err.Trace(alias, urlStr)
}
@@ -122,7 +123,7 @@ func putTargetStreamWithURL(urlStr string, reader io.Reader, size int64) (int64,
metadata := map[string]string{
"Content-Type": contentType,
}
- return putTargetStream(alias, urlStrFull, reader, size, metadata, nil)
+ return putTargetStream(context.Background(), alias, urlStrFull, reader, size, metadata, nil)
}

// copySourceToTargetURL copies to targetURL from source.
@@ -141,7 +142,7 @@ func copySourceToTargetURL(alias string, urlStr string, source string, size int6
// uploadSourceToTargetURL - uploads to targetURL from source.
// optionally optimizes copy for object sizes <= 5GiB by using
// server side copy operation.
- func uploadSourceToTargetURL(urls URLs, progress io.Reader) URLs {
+ func uploadSourceToTargetURL(ctx context.Context, urls URLs, progress io.Reader) URLs {
sourceAlias := urls.SourceAlias
sourceURL := urls.SourceContent.URL
targetAlias := urls.TargetAlias
@@ -161,7 +162,7 @@ func uploadSourceToTargetURL(urls URLs, progress io.Reader) URLs {
if err != nil {
return urls.WithError(err.Trace(sourceURL.String()))
}
- _, err = putTargetStream(targetAlias, targetURL.String(), reader, length, metadata, progress)
+ _, err = putTargetStream(ctx, targetAlias, targetURL.String(), reader, length, metadata, progress)
if err != nil {
return urls.WithError(err.Trace(targetURL.String()))
}
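putTargetStream now takes the context first and hands it to Client.Put, while putTargetStreamWithURL, which has no caller-supplied context, falls back to context.Background(). That is the usual Go convention: plumb ctx through every layer that can block, and reach for Background() only at the outermost layer that genuinely has nothing to cancel. A small hedged sketch of the two layers (helper names are made up):

package main

import (
	"context"
	"fmt"
	"io"
	"strings"
)

// putStream stands in for putTargetStream: it receives a context from its
// caller and passes it down to the layer that actually does the I/O.
func putStream(ctx context.Context, dst io.Writer, r io.Reader) (int64, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	return io.Copy(dst, r)
}

// putStreamDefault stands in for putTargetStreamWithURL: no context reaches
// it, so it supplies context.Background() itself.
func putStreamDefault(dst io.Writer, r io.Reader) (int64, error) {
	return putStream(context.Background(), dst, r)
}

func main() {
	var sb strings.Builder
	n, err := putStreamDefault(&sb, strings.NewReader("hello"))
	fmt.Println(n, err, sb.String())
}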
15 changes: 10 additions & 5 deletions cmd/cp-main.go
@@ -18,6 +18,7 @@ package cmd

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
@@ -138,7 +139,7 @@ func (c copyStatMessage) String() string {
}

// doCopy - Copy a singe file from source to destination
- func doCopy(cpURLs URLs, progressReader *progressBar, accountingReader *accounter) URLs {
+ func doCopy(ctx context.Context, cpURLs URLs, progressReader *progressBar, accountingReader *accounter) URLs {
if cpURLs.Error != nil {
cpURLs.Error = cpURLs.Error.Trace()
return cpURLs
@@ -173,7 +174,7 @@ func doCopy(cpURLs URLs, progressReader *progressBar, accountingReader *accounte
// Set up progress reader.
progress = progressReader.ProgressBar
}
- return uploadSourceToTargetURL(cpURLs, progress)
+ return uploadSourceToTargetURL(ctx, cpURLs, progress)
}

// doCopyFake - Perform a fake copy to update the progress bar appropriately.
@@ -185,7 +186,7 @@ func doCopyFake(cpURLs URLs, progressReader *progressBar) URLs {
}

// doPrepareCopyURLs scans the source URL and prepares a list of objects for copying.
- func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool) {
+ func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool, cancelCopy context.CancelFunc) {
// Separate source and target. 'cp' can take only one target,
// but any number of sources.
sourceURLs := session.Header.CommandArgs[:len(session.Header.CommandArgs)-1]
@@ -240,6 +241,7 @@ func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool) {
totalBytes += cpURLs.SourceContent.Size
totalObjects++
case <-trapCh:
+ cancelCopy()
// Print in new line and adjust to top so that we don't print over the ongoing scan bar
if !globalQuiet && !globalJSON {
console.Eraseline()
@@ -256,8 +258,10 @@ func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool) {
func doCopySession(session *sessionV8) error {
trapCh := signalTrap(os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

+ ctx, cancelCopy := context.WithCancel(context.Background())
+ defer cancelCopy()
if !session.HasData() {
- doPrepareCopyURLs(session, trapCh)
+ doPrepareCopyURLs(session, trapCh, cancelCopy)
}

// Enable accounting reader by default.
@@ -295,7 +299,7 @@ func doCopySession(session *sessionV8) error {
if !ok {
return
}
- statusCh <- doCopy(cpURLs, progressReader, accntReader)
+ statusCh <- doCopy(ctx, cpURLs, progressReader, accntReader)
waitGroup.Done()
}
}()
@@ -338,6 +342,7 @@ loop:
for {
select {
case <-trapCh:
+ cancelCopy()
// Receive interrupt notification.
if !globalQuiet && !globalJSON {
console.Eraseline()
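The copy path now creates one cancellable context per session and cancels it from both interrupt paths (during URL preparation and in the main copy loop), so a Put that is already in flight is aborted rather than left to finish. A self-contained sketch of that wiring, with a fake doCopy standing in for the real upload:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// One context for the whole session, cancelled on interrupt; this mirrors
	// the ctx/cancelCopy pair added to doCopySession in this commit.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	trapCh := make(chan os.Signal, 1)
	signal.Notify(trapCh, os.Interrupt, syscall.SIGTERM)

	go func() {
		<-trapCh
		cancel() // corresponds to the cancelCopy() calls added in the diff
	}()

	// Stand-in for a single copy operation that respects cancellation.
	doCopy := func(ctx context.Context, name string) error {
		select {
		case <-time.After(2 * time.Second): // pretend upload
			fmt.Println("copied", name)
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	for _, obj := range []string{"a.txt", "b.txt", "c.txt"} {
		if err := doCopy(ctx, obj); err != nil {
			fmt.Println("aborted:", err)
			break
		}
	}
}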
27 changes: 15 additions & 12 deletions cmd/mirror-main.go
@@ -17,6 +17,7 @@
package cmd

import (
"context"
"encoding/json"
"fmt"
"os"
@@ -201,7 +202,7 @@ func (mj *mirrorJob) doRemove(sURLs URLs) URLs {
}

// doMirror - Mirror an object to multiple destination. URLs status contains a copy of sURLs and error if any.
- func (mj *mirrorJob) doMirror(sURLs URLs) URLs {
+ func (mj *mirrorJob) doMirror(ctx context.Context, cancelMirror context.CancelFunc, sURLs URLs) URLs {

if sURLs.Error != nil { // Erroneous sURLs passed.
return sURLs.WithError(sURLs.Error.Trace())
@@ -231,7 +232,7 @@ func (mj *mirrorJob) doMirror(sURLs URLs) URLs {
TotalCount: sURLs.TotalCount,
TotalSize: sURLs.TotalSize,
})
- return uploadSourceToTargetURL(sURLs, mj.status)
+ return uploadSourceToTargetURL(ctx, sURLs, mj.status)
}

// Go routine to update progress status
@@ -278,7 +279,7 @@ func (mj *mirrorJob) stopStatus() {
}

// this goroutine will watch for notifications, and add modified objects to the queue
- func (mj *mirrorJob) watchMirror() {
+ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.CancelFunc) {

for {
select {
@@ -355,7 +356,7 @@ func (mj *mirrorJob) watchMirror() {
mirrorURL.TotalSize = mj.TotalBytes
// adjust total, because we want to show progress of the item still queued to be copied.
mj.status.SetTotal(mj.status.Total() + sourceContent.Size).Update()
- mj.statusCh <- mj.doMirror(mirrorURL)
+ mj.statusCh <- mj.doMirror(ctx, cancelMirror, mirrorURL)
}
continue
}
@@ -379,7 +380,7 @@ func (mj *mirrorJob) watchMirror() {
mirrorURL.TotalSize = mj.TotalBytes
// adjust total, because we want to show progress of the itemj stiil queued to be copied.
mj.status.SetTotal(mj.status.Total() + event.Size).Update()
- mj.statusCh <- mj.doMirror(mirrorURL)
+ mj.statusCh <- mj.doMirror(ctx, cancelMirror, mirrorURL)
}
} else if event.Type == EventRemove {
mirrorURL := URLs{
@@ -422,7 +423,7 @@ func (mj *mirrorJob) watchURL(sourceClient Client) *probe.Error {
}

// Fetch urls that need to be mirrored
- func (mj *mirrorJob) startMirror() {
+ func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc) {
var totalBytes int64
var totalObjects int64

@@ -458,18 +459,19 @@ func (mj *mirrorJob) startMirror() {
sURLs.TotalSize = mj.TotalBytes

if sURLs.SourceContent != nil {
- mj.statusCh <- mj.doMirror(sURLs)
+ mj.statusCh <- mj.doMirror(ctx, cancelMirror, sURLs)
} else if sURLs.TargetContent != nil && mj.isRemove && mj.isForce {
mj.statusCh <- mj.doRemove(sURLs)
}
case <-mj.trapCh:
+ cancelMirror()
os.Exit(0)
}
}
}

// when using a struct for copying, we could save a lot of passing of variables
- func (mj *mirrorJob) mirror() {
+ func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc) {
if globalQuiet || globalJSON {
} else {
// Enable progress bar reader only during default mode
@@ -481,11 +483,11 @@ func (mj *mirrorJob) mirror() {

// Starts additional watcher thread for watching for new events.
if mj.isWatch {
- go mj.watchMirror()
+ go mj.watchMirror(ctx, cancelMirror)
}

// Start mirroring.
- mj.startMirror()
+ mj.startMirror(ctx, cancelMirror)

if mj.isWatch {
<-mj.trapCh
@@ -631,9 +633,10 @@ func runMirror(srcURL, dstURL string, ctx *cli.Context) *probe.Error {
mj.status.fatalIf(err, fmt.Sprintf("Failed to start monitoring."))
}
}

+ ctxt, cancelMirror := context.WithCancel(context.Background())
+ defer cancelMirror()
// Start mirroring job
- mj.mirror()
+ mj.mirror(ctxt, cancelMirror)

// Check for errors during mirroring or watching to return
if mj.mirrorErr != nil {
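Mirror follows the same pattern, but with two consumers of the context: the startMirror loop and the watchMirror goroutine both receive the ctx/cancelMirror pair created in runMirror, so an interrupt in either place stops outstanding uploads in both. A hedged, simplified sketch of a watcher and a mirror loop sharing one cancellation:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancelMirror := context.WithCancel(context.Background())
	defer cancelMirror()

	events := make(chan string)

	// Watcher: queues newly observed objects until the context is cancelled,
	// loosely modelled on watchMirror.
	go func() {
		defer close(events)
		for i := 0; ctx.Err() == nil; i++ {
			select {
			case <-ctx.Done():
				return
			case events <- fmt.Sprintf("object-%d", i):
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Stand-in for the interrupt handling in startMirror: cancel after a while.
	go func() {
		time.Sleep(500 * time.Millisecond)
		cancelMirror()
	}()

	// Mirror loop: processes queued objects until the watcher shuts down.
	for name := range events {
		fmt.Println("mirroring", name)
	}
	fmt.Println("mirror stopped:", ctx.Err())
}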