Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mc rm to support entire namespace removal #2265

Merged
merged 1 commit into from
Dec 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 165 additions & 89 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,91 +659,101 @@ func (c *s3Client) removeIncompleteObjects(bucket string, objectsCh <-chan strin
return removeObjectErrorCh
}

// Remove - remove object or bucket.
// Remove - remove object or bucket(s).
func (c *s3Client) Remove(isIncomplete bool, contentCh <-chan *clientContent) <-chan *probe.Error {
bucket, _ := c.url2BucketAndObject()

errorCh := make(chan *probe.Error)
var bucketContent *clientContent

// Goroutine
// 1. calls removeIncompleteObjects() for incomplete uploads
// or minio-go.RemoveObjects().
// 2. executes another Goroutine to copy contentCh to objectsCh.
// 3. reads statusCh and copies to errorCh.
prevBucket := ""
// maintain cContentCh, objectsCh, statusCh for each bucket
var cContentCh chan *clientContent
var objectsCh chan string
var statusCh <-chan minio.RemoveObjectError

isRemoveBucket := false

go func() {
defer close(errorCh)

objectsCh := make(chan string)
var statusCh <-chan minio.RemoveObjectError
if isIncomplete {
statusCh = c.removeIncompleteObjects(bucket, objectsCh)
} else {
statusCh = c.api.RemoveObjects(bucket, objectsCh)
}

// doneCh to control below Goroutine.
doneCh := make(chan struct{})
defer close(doneCh)

// Goroutine reads contentCh and copies to objectsCh.
go func() {
defer close(objectsCh)

for {
// Read from contentCh or doneCh. If doneCh is read, exit the function.
var content *clientContent
var ok bool
select {
case content, ok = <-contentCh:
if !ok {
// Closed channel.
return
}
case <-doneCh:
return
for content := range contentCh {
// Convert content.URL.Path to objectName for objectsCh.
bucket, objectName := c.splitPath(content.URL.Path)
// Currently only supported hosts with virtual style
// are Amazon S3 and Google Cloud Storage.
// which also support objects with "/" as delimiter.
// Skip trimming "/" and let the server reply error
// if any.
if !c.virtualStyle {
objectName = strings.TrimSuffix(objectName, string(c.targetURL.Separator))
}
// Init cContentCh channel and objectsCh the first time.
if prevBucket == "" {
cContentCh = make(chan *clientContent)
objectsCh = make(chan string)
prevBucket = bucket
if isIncomplete {
statusCh = c.removeIncompleteObjects(bucket, objectsCh)
} else {
statusCh = c.api.RemoveObjects(bucket, objectsCh)
}
}

// Convert content.URL.Path to objectName for objectsCh.
_, objectName := c.splitPath(content.URL.Path)

// Currently only supported hosts with virtual style
// are Amazon S3 and Google Cloud Storage.
// which also support objects with "/" as delimiter.
// Skip trimming "/" and let the server reply error
// if any.
if !c.virtualStyle {
objectName = strings.TrimSuffix(objectName, string(c.targetURL.Separator))
if prevBucket != bucket {
// Close cContentCh when bucket changes. Remove bucket if
// it qualifies.
close(cContentCh)
if objectsCh != nil {
close(objectsCh)
}

// As object name is empty, we need to remove the bucket as well.
if objectName == "" {
bucketContent = content
continue
for removeStatus := range statusCh {
errorCh <- probe.NewError(removeStatus.Err)
}

// Write to objectsCh or read doneCh. If doneCh is read, exit the function.
select {
case objectsCh <- objectName:
case <-doneCh:
return
if isRemoveBucket && !isIncomplete {
if err := c.api.RemoveBucket(prevBucket); err != nil {
errorCh <- probe.NewError(err)
}
}
// re-init cContentCh and objectsCh for next bucket
isRemoveBucket = false
cContentCh = make(chan *clientContent)
objectsCh = make(chan string)
if isIncomplete {
statusCh = c.removeIncompleteObjects(bucket, objectsCh)
} else {
statusCh = c.api.RemoveObjects(bucket, objectsCh)
}
prevBucket = bucket
}
}()

// Read statusCh and write to errorCh.
for removeStatus := range statusCh {
errorCh <- probe.NewError(removeStatus.Err)
if objectName != "" {
objectsCh <- objectName
} else {
// end of bucket - close the objectsCh
isRemoveBucket = true
if objectsCh != nil {
close(objectsCh)
}
objectsCh = nil
}
}

// Remove bucket for regular objects.
if bucketContent != nil && !isIncomplete {
if err := c.api.RemoveBucket(bucket); err != nil {
// close cContentCh and objectsCh at end of contentCh
if cContentCh != nil {
close(cContentCh)
}
if objectsCh != nil {
close(objectsCh)
}
// write remove objects status to errorCh
if statusCh != nil {
for removeStatus := range statusCh {
errorCh <- probe.NewError(removeStatus.Err)
}
}
// Remove last bucket if it qualifies.
if isRemoveBucket && !isIncomplete {
if err := c.api.RemoveBucket(prevBucket); err != nil {
errorCh <- probe.NewError(err)
}
}
}()

return errorCh
}

Expand Down Expand Up @@ -1208,8 +1218,7 @@ func (c *s3Client) listIncompleteRecursiveInRoutine(contentCh chan *clientConten
}

// Convert objectMultipartInfo to clientContent
func (c *s3Client) objectMultipartInfo2ClientContent(entry minio.ObjectMultipartInfo) clientContent {
bucket, _ := c.url2BucketAndObject()
func (c *s3Client) objectMultipartInfo2ClientContent(bucket string, entry minio.ObjectMultipartInfo) clientContent {

content := clientContent{}
url := *c.targetURL
Expand Down Expand Up @@ -1251,7 +1260,7 @@ func (c *s3Client) listIncompleteRecursiveInRoutineDirOpt(contentCh chan *client
return true
}

content := c.objectMultipartInfo2ClientContent(entry)
content := c.objectMultipartInfo2ClientContent(bucket, entry)

// Handle if object.Key is a directory.
if strings.HasSuffix(entry.Key, string(c.targetURL.Separator)) {
Expand All @@ -1273,7 +1282,57 @@ func (c *s3Client) listIncompleteRecursiveInRoutineDirOpt(contentCh chan *client
}

bucket, object := c.url2BucketAndObject()
listDir(bucket, object)
var cContent *clientContent
var buckets []minio.BucketInfo
var allBuckets bool
// List all buckets if bucket and object are empty.
if bucket == "" && object == "" {
var err error
allBuckets = true
buckets, err = c.api.ListBuckets()
if err != nil {
contentCh <- &clientContent{URL: *c.targetURL, Err: probe.NewError(err)}
}
} else if object == "" {
// Get bucket stat if object is empty.
content := c.bucketStat()
cContent = &content
if content.Err != nil {
contentCh <- cContent
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
} else if strings.HasSuffix(object, string(c.targetURL.Separator)) {
// Get stat of given object is a directory.
isIncomplete := true
content, perr := c.Stat(isIncomplete, false)
cContent = content
if perr != nil {
contentCh <- &clientContent{URL: *c.targetURL, Err: perr}
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
}
for _, bucket := range buckets {
if allBuckets {
url := *c.targetURL
url.Path = c.joinPath(bucket.Name)
cContent = &clientContent{
URL: url,
Time: bucket.CreationDate,
Type: os.ModeDir,
}
}
if cContent != nil && dirOpt == DirFirst {
contentCh <- cContent
}
//Recursively push all object prefixes into contentCh to mimic directory listing
listDir(bucket.Name, object)

if cContent != nil && dirOpt == DirLast {
contentCh <- cContent
}
}
}

// Returns new path by joining path segments with URL path separator.
Expand All @@ -1295,8 +1354,7 @@ func (c *s3Client) joinPath(segments ...string) string {
}

// Convert objectInfo to clientContent
func (c *s3Client) objectInfo2ClientContent(entry minio.ObjectInfo) clientContent {
bucket, _ := c.url2BucketAndObject()
func (c *s3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInfo) clientContent {

content := clientContent{}
url := *c.targetURL
Expand Down Expand Up @@ -1337,7 +1395,6 @@ func (c *s3Client) bucketStat() clientContent {
// Recursively lists objects.
func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, dirOpt DirOpt) {
defer close(contentCh)

// Closure function reads list objects and sends to contentCh. If a directory is found, it lists
// objects of the directory content recursively.
var listDir func(bucket, object string) bool
Expand All @@ -1353,11 +1410,10 @@ func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, d
if errResponse.Code == "AccessDenied" {
continue
}

return true
}

content := c.objectInfo2ClientContent(entry)
content := c.objectInfo2ClientContent(bucket, entry)

// Handle if object.Key is a directory.
if content.Type.IsDir() {
Expand All @@ -1374,23 +1430,31 @@ func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, d
contentCh <- &content
}
}

return false
}

bucket, object := c.url2BucketAndObject()

var cContent *clientContent

// Get bucket stat if object is empty.
if object == "" {
var buckets []minio.BucketInfo
var allBuckets bool
// List all buckets if bucket and object are empty.
if bucket == "" && object == "" {
var err error
allBuckets = true
buckets, err = c.api.ListBuckets()
if err != nil {
contentCh <- &clientContent{URL: *c.targetURL, Err: probe.NewError(err)}
}
} else if object == "" {
// Get bucket stat if object is empty.
content := c.bucketStat()
cContent = &content

if content.Err != nil {
contentCh <- cContent
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
} else if strings.HasSuffix(object, string(c.targetURL.Separator)) {
// Get stat of given object is a directory.
isIncomplete := false
Expand All @@ -1401,16 +1465,28 @@ func (c *s3Client) listRecursiveInRoutineDirOpt(contentCh chan *clientContent, d
contentCh <- &clientContent{URL: *c.targetURL, Err: perr}
return
}
buckets = append(buckets, minio.BucketInfo{Name: bucket, CreationDate: content.Time})
}

if cContent != nil && dirOpt == DirFirst {
contentCh <- cContent
}

listDir(bucket, object)
for _, bucket := range buckets {
if allBuckets {
url := *c.targetURL
url.Path = c.joinPath(bucket.Name)
cContent = &clientContent{
URL: url,
Time: bucket.CreationDate,
Type: os.ModeDir,
}
}
if cContent != nil && dirOpt == DirFirst {
contentCh <- cContent
}
// Recurse thru prefixes to mimic directory listing and push into contentCh
listDir(bucket.Name, object)

if cContent != nil && dirOpt == DirLast {
contentCh <- cContent
if cContent != nil && dirOpt == DirLast {
contentCh <- cContent
}
}
}

Expand Down
Loading