Skip to content

Commit

Permalink
Add mc rm support to recursively remove site (#2265)
Browse files Browse the repository at this point in the history
This requires --dangerous flag in addition to --recursive and --force

Change namespace identification check to be more robust to user
input errors like mc rm --r --force play//
  • Loading branch information
poornas authored and harshavardhana committed Dec 2, 2017
1 parent 65c582c commit 1e7b4c9
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 103 deletions.
254 changes: 165 additions & 89 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,91 +659,101 @@ func (c *s3Client) removeIncompleteObjects(bucket string, objectsCh <-chan strin
return removeObjectErrorCh
}

// Remove - remove object or bucket.
// Remove - remove object or bucket(s).
func (c *s3Client) Remove(isIncomplete bool, contentCh <-chan *clientContent) <-chan *probe.Error {
bucket, _ := c.url2BucketAndObject()

errorCh := make(chan *probe.Error)
var bucketContent *clientContent

// Goroutine
// 1. calls removeIncompleteObjects() for incomplete uploads
// or minio-go.RemoveObjects().
// 2. executes another Goroutine to copy contentCh to objectsCh.
// 3. reads statusCh and copies to errorCh.
prevBucket := ""
// maintain cContentCh, objectsCh, statusCh for each bucket
var cContentCh chan *clientContent
var objectsCh chan string
var statusCh <-chan minio.RemoveObjectError

isRemoveBucket := false

go func() {
defer close(errorCh)

objectsCh := make(chan string)
var statusCh <-chan minio.RemoveObjectError
if isIncomplete {
statusCh = c.removeIncompleteObjects(bucket, objectsCh)
} else {
statusCh = c.api.RemoveObjects(bucket, objectsCh)
}

// doneCh to control below Goroutine.
doneCh := make(chan struct{})
defer close(doneCh)

// Goroutine reads contentCh and copies to objectsCh.
go func() {
defer close(objectsCh)

for {
// Read from contentCh or doneCh. If doneCh is read, exit the function.
var content *clientContent
var ok bool
select {
case content, ok = <-contentCh:
if !ok {
// Closed channel.
return
}
case <-doneCh:
return
for content := range contentCh {
// Convert content.URL.Path to objectName for objectsCh.
bucket, objectName := c.splitPath(content.URL.Path)
// Currently only supported hosts with virtual style
// are Amazon S3 and Google Cloud Storage.
// which also support objects with "/" as delimiter.
// Skip trimming "/" and let the server reply error
// if any.
if !c.virtualStyle {
objectName = strings.TrimSuffix(objectName, string(c.targetURL.Separator))
}
// Init cContentCh channel and objectsCh the first time.
if prevBucket == "" {
cContentCh = make(chan *clientContent)
objectsCh = make(chan string)
prevBucket = bucket
if isIncomplete {
statusCh = c.removeIncompleteObjects(bucket, objectsCh)
} else {
statusCh = c.api.RemoveObjects(bucket, objectsCh)
}
}

// Convert content.URL.Path to objectName for objectsCh.
_, objectName := c.splitPath(content.URL.Path)

// Currently only supported hosts with virtual style
// are Amazon S3 and Google Cloud Storage.
// which also support objects with "/" as delimiter.
// Skip trimming "/" and let the server reply error
// if any.
if !c.virtualStyle {
objectName = strings.TrimSuffix(objectName, string(c.targetURL.Separator))
if prevBucket != bucket {
// Close cContentCh when bucket changes. Remove bucket if
// it qualifies.
close(cContentCh)
if objectsCh != nil {
close(objectsCh)
}

// As object name is empty, we need to remove the bucket as well.
if objectName == "" {
bucketContent = content
continue
for removeStatus := range statusCh {
errorCh <- probe.NewError(removeStatus.Err)
}

// Write to objectsCh or read doneCh. If doneCh is read, exit the function.
select {
case objectsCh <- objectName:
case <-doneCh:
return
if isRemoveBucket && !isIncomplete {
if err := c.api.RemoveBucket(prevBucket); err != nil {
errorCh <- probe.NewError(err)
}
}
// re-init cContentCh and objectsCh for next bucket
isRemoveBucket = false
cContentCh = make(chan *clientContent)
objectsCh = make(chan string)
if isIncomplete {
statusCh = c.removeIncompleteObjects(bucket, objectsCh)
} else {
statusCh = c.api.RemoveObjects(bucket, objectsCh)
}
prevBucket = bucket
}
}()

// Read statusCh and write to errorCh.
for removeStatus := range statusCh {
errorCh <- probe.NewError(removeStatus.Err)
if objectName != "" {
objectsCh <- objectName
} else {
// end of bucket - close the objectsCh
isRemoveBucket = true
if objectsCh != nil {
close(objectsCh)
}
objectsCh = nil
}
}

// Remove bucket for regular objects.
if bucketContent != nil && !isIncomplete {
if err := c.api.RemoveBucket(bucket); err != nil {
// close cContentCh and objectsCh at end of contentCh
if cContentCh != nil {
close(cContentCh)
}
if objectsCh != nil {
close(objectsCh)
}
// write remove objects status to errorCh
if statusCh != nil {
for removeStatus := range statusCh {
errorCh <- probe.NewError(removeStatus.Err)
}
}
// Remove last bucket if it qualifies.
if isRemoveBucket && !isIncomplete {
if err := c.api.RemoveBucket(prevBucket); err != nil {
errorCh <- probe.NewError(err)
}
}
}()

return errorCh
}

Expand Down Expand Up @@ -1208,8 +1218,7 @@ func (c *s3Client) listIncompleteRecursiveInRoutine(contentCh chan *clientConten
}

// Convert objectMultipartInfo to clientContent
func (c *s3Client) objectMultipartInfo2ClientContent(entry minio.ObjectMultipartInfo) clientContent {
bucket, _ := c.url2BucketAndObject()
func (c *s3Client) objectMultipartInfo2ClientContent(bucket string, entry minio.ObjectMultipartInfo) clientContent {

content := clientContent{}
url := *c.targetURL
Expand Down Expand Up @@ -1251,7 +1260,7 @@ func (c *s3Client) listIncompleteRecursiveInRoutineDirOpt(contentCh chan *client
return true
}

content := c.objectMultipartInfo2ClientContent(entry)
content := c.objectMultipartInfo2ClientContent(bucket, entry)

// Handle if object.Key is a directory.
if strings.HasSuffix(entry.Key, string(c.targetURL.Separator)) {
Expand All @@ -1273,7 +1282,57 @@ func (c *s3Client) listIncompleteRecursiveInRoutineDirOpt(contentCh chan *client
}

bucket, object := c.url2BucketAndObject()
listDir(bucket, object)
var cContent *clientContent
var buckets []minio.BucketInfo
var allBuckets bool
// List all buckets if bucket and object are empty.
if bucket == "" && object == "" {
var err error
allBuckets = true
buckets, err = c.api.ListBuckets()
if err != nil {
contentCh <- &clientContent{URL: *c.targetURL, Err: probe.NewError(err)}
}
} else if object == "" {
// Get bucket stat if object is empty.
content := c.bucketStat()
cContent = &content
if content.Err != nil {
contentCh <- cContent
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
} else if strings.HasSuffix(object, string(c.targetURL.Separator)) {
// Get stat of given object is a directory.
isIncomplete := true
content, perr := c.Stat(isIncomplete, false)
cContent = content
if perr != nil {
contentCh <- &clientContent{URL: *c.targetURL, Err: perr}
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
}
for _, bucket := range buckets {
if allBuckets {
url := *c.targetURL
url.Path = c.joinPath(bucket.Name)
cContent = &clientContent{
URL: url,
Time: bucket.CreationDate,
Type: os.ModeDir,
}
}
if cContent != nil && dirOpt == DirFirst {
contentCh <- cContent
}
//Recursively push all object prefixes into contentCh to mimic directory listing
listDir(bucket.Name, object)

if cContent != nil && dirOpt == DirLast {
contentCh <- cContent
}
}
}

// Returns new path by joining path segments with URL path separator.
Expand All @@ -1295,8 +1354,7 @@ func (c *s3Client) joinPath(segments ...string) string {
}

// Convert objectInfo to clientContent
func (c *s3Client) objectInfo2ClientContent(entry minio.ObjectInfo) clientContent {
bucket, _ := c.url2BucketAndObject()
func (c *s3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInfo) clientContent {

content := clientContent{}
url := *c.targetURL
Expand Down Expand Up @@ -1337,7 +1395,6 @@ func (c *s3Client) bucketStat() clientContent {
// Recursively lists objects.
func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, dirOpt DirOpt) {
defer close(contentCh)

// Closure function reads list objects and sends to contentCh. If a directory is found, it lists
// objects of the directory content recursively.
var listDir func(bucket, object string) bool
Expand All @@ -1353,11 +1410,10 @@ func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, d
if errResponse.Code == "AccessDenied" {
continue
}

return true
}

content := c.objectInfo2ClientContent(entry)
content := c.objectInfo2ClientContent(bucket, entry)

// Handle if object.Key is a directory.
if content.Type.IsDir() {
Expand All @@ -1374,23 +1430,31 @@ func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, d
contentCh <- &content
}
}

return false
}

bucket, object := c.url2BucketAndObject()

var cContent *clientContent

// Get bucket stat if object is empty.
if object == "" {
var buckets []minio.BucketInfo
var allBuckets bool
// List all buckets if bucket and object are empty.
if bucket == "" && object == "" {
var err error
allBuckets = true
buckets, err = c.api.ListBuckets()
if err != nil {
contentCh <- &clientContent{URL: *c.targetURL, Err: probe.NewError(err)}
}
} else if object == "" {
// Get bucket stat if object is empty.
content := c.bucketStat()
cContent = &content

if content.Err != nil {
contentCh <- cContent
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
} else if strings.HasSuffix(object, string(c.targetURL.Separator)) {
// Get stat of given object is a directory.
isIncomplete := false
Expand All @@ -1401,16 +1465,28 @@ func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, d
contentCh <- &clientContent{URL: *c.targetURL, Err: perr}
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
}

if cContent != nil && dirOpt == DirFirst {
contentCh <- cContent
}

listDir(bucket, object)
for _, bucket := range buckets {
if allBuckets {
url := *c.targetURL
url.Path = c.joinPath(bucket.Name)
cContent = &clientContent{
URL: url,
Time: bucket.CreationDate,
Type: os.ModeDir,
}
}
if cContent != nil && dirOpt == DirFirst {
contentCh <- cContent
}
// Recurse thru prefixes to mimic directory listing and push into contentCh
listDir(bucket.Name, object)

if cContent != nil && dirOpt == DirLast {
contentCh <- cContent
if cContent != nil && dirOpt == DirLast {
contentCh <- cContent
}
}
}

Expand Down
Loading

0 comments on commit 1e7b4c9

Please sign in to comment.