Skip to content

Commit

Permalink
complete for messageing with tests
Browse files Browse the repository at this point in the history
wip

wip

without tests

test

fix

done

fix

fix

fix

fix

comment remove

refactor

change names

change var names

FIXME comments

cleared output handling

green color

errors output

change method names

each live message methods

todo comments
  • Loading branch information
go-to-k committed Jan 28, 2025
1 parent 8e68bd9 commit b8e4d41
Show file tree
Hide file tree
Showing 9 changed files with 767 additions and 105 deletions.
126 changes: 117 additions & 9 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"context"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/go-to-k/cls3/internal/io"
"github.com/go-to-k/cls3/internal/wrapper"
"github.com/go-to-k/cls3/pkg/client"
"github.com/gosuri/uilive"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -162,32 +166,139 @@ func (a *App) getAction() func(c *cli.Context) error {
a.targetBuckets = append(a.targetBuckets, outputBuckets...)
}

concurrencyNumber := a.determineConcurrencyNumber()
// TODO: Refactor to separate the display and the deletion process.
writer := uilive.New()
writer.Start()
defer writer.Stop()

clearingLines := make([]string, len(a.targetBuckets))
var clearingLinesMutex sync.Mutex
var clearingCountsMutex sync.Mutex

clearingCountChannels := make(map[string]chan int64, len(a.targetBuckets))
clearedCompletedChannels := make(map[string]chan bool, len(a.targetBuckets))
clearingCounts := make(map[string]*atomic.Int64, len(a.targetBuckets))

for _, bucket := range a.targetBuckets {
if err := s3Wrapper.OutputCheckingMessage(bucket); err != nil {
return err
}
clearingCountChannels[bucket] = make(chan int64)
clearedCompletedChannels[bucket] = make(chan bool)
clearingCounts[bucket] = &atomic.Int64{}
}

var displayEg errgroup.Group
if !a.QuietMode {
for i, bucket := range a.targetBuckets {
i, bucket := i, bucket

// Necessary to first display all bucket rows together
clearingLinesMutex.Lock()
message, err := s3Wrapper.GetLiveClearingMessage(bucket, 0)
if err != nil {
return err
}
clearingLines[i] = message
clearingLinesMutex.Unlock()

displayEg.Go(func() error {
clearingCountsMutex.Lock()
ch := clearingCountChannels[bucket]
clearedCompletedCh := clearedCompletedChannels[bucket]
counter := clearingCounts[bucket]
clearingCountsMutex.Unlock()

for count := range ch {
counter.Store(count)
clearingLinesMutex.Lock()
message, err := s3Wrapper.GetLiveClearingMessage(bucket, count)
if err != nil {
return err
}
clearingLines[i] = message
fmt.Fprintln(writer, strings.Join(clearingLines, "\n"))
clearingLinesMutex.Unlock()
}

count := counter.Load()
clearingLinesMutex.Lock()
isCompleted := <-clearedCompletedCh
message, err := s3Wrapper.GetLiveClearedMessage(bucket, count, isCompleted)
if err != nil {
return err
}
clearingLines[i] = message
fmt.Fprintln(writer, strings.Join(clearingLines, "\n"))
clearingLinesMutex.Unlock()
return nil
})
}
}

concurrencyNumber := a.determineConcurrencyNumber()
sem := semaphore.NewWeighted(int64(concurrencyNumber))
eg := errgroup.Group{}
// FIXME: handle messages
clearEg := errgroup.Group{}

for _, bucket := range a.targetBuckets {
bucket := bucket
if err := sem.Acquire(c.Context, 1); err != nil {
return err
}

eg.Go(func() error {
clearEg.Go(func() error {
defer sem.Release(1)
return s3Wrapper.ClearBucket(c.Context, wrapper.ClearBucketInput{
clearingCountsMutex.Lock()
clearingCountCh := clearingCountChannels[bucket]
clearedCompletedCh := clearedCompletedChannels[bucket]
clearingCountsMutex.Unlock()

err := s3Wrapper.ClearBucket(c.Context, wrapper.ClearBucketInput{
TargetBucket: bucket,
ForceMode: a.ForceMode,
OldVersionsOnly: a.OldVersionsOnly,
QuietMode: a.QuietMode,
ClearingCountCh: clearingCountCh,
})
if err != nil {
close(clearingCountCh)
clearedCompletedCh <- false
close(clearedCompletedCh)
return err
}

close(clearingCountCh)
clearedCompletedCh <- true
close(clearedCompletedCh)

return nil
})
}

if err := eg.Wait(); err != nil {
if err := clearEg.Wait(); err != nil {
return err
}

if err := displayEg.Wait(); err != nil {
return err
}
if !a.QuietMode {
if err := writer.Flush(); err != nil {
return err
}

for _, bucket := range a.targetBuckets {
if err := s3Wrapper.OutputClearedMessage(bucket, clearingCounts[bucket].Load()); err != nil {
return err
}
if a.ForceMode {
if err := s3Wrapper.OutputDeletedMessage(bucket); err != nil {
return err
}
}
}
}

return nil
}
}
Expand Down Expand Up @@ -284,9 +395,6 @@ func (a *App) determineConcurrencyNumber() int {
return 1
}

// No real-time deletion counts
a.QuietMode = true

// Cases where ConcurrencyNumber is unspecified.
if a.ConcurrencyNumber == UnspecifiedConcurrencyNumber {
return len(a.targetBuckets)
Expand Down
35 changes: 7 additions & 28 deletions internal/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,73 +380,52 @@ func Test_validateOptions(t *testing.T) {

func Test_determineConcurrencyNumber(t *testing.T) {
tests := []struct {
name string
app *App
expectedNumber int
expectedQuietMode bool
name string
app *App
expectedNumber int
}{
{
name: "return 1 when concurrent mode is off",
app: &App{
ConcurrentMode: false,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
targetBuckets: []string{"bucket1", "bucket2"},
QuietMode: false,
},
expectedNumber: 1,
expectedQuietMode: false,
expectedNumber: 1,
},
{
name: "return number of target buckets when concurrency number is not specified",
app: &App{
ConcurrentMode: true,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
targetBuckets: []string{"bucket1", "bucket2", "bucket3"},
QuietMode: false,
},
expectedNumber: 3,
expectedQuietMode: true,
expectedNumber: 3,
},
{
name: "return specified concurrency number when set",
app: &App{
ConcurrentMode: true,
ConcurrencyNumber: 2,
targetBuckets: []string{"bucket1", "bucket2", "bucket3"},
QuietMode: false,
},
expectedNumber: 2,
expectedQuietMode: true,
expectedNumber: 2,
},
{
name: "return 1 when concurrent mode is off regardless of concurrency number",
app: &App{
ConcurrentMode: false,
ConcurrencyNumber: 2,
targetBuckets: []string{"bucket1", "bucket2"},
QuietMode: false,
},
expectedNumber: 1,
expectedQuietMode: false,
},
{
name: "keep quiet mode true when it was already true",
app: &App{
ConcurrentMode: true,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
targetBuckets: []string{"bucket1", "bucket2"},
QuietMode: true,
},
expectedNumber: 2,
expectedQuietMode: true,
expectedNumber: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.app.determineConcurrencyNumber()
assert.Equal(t, tt.expectedNumber, result)
assert.Equal(t, tt.expectedQuietMode, tt.app.QuietMode)
})
}
}
Loading

0 comments on commit b8e4d41

Please sign in to comment.