diff --git a/go.mod b/go.mod index 3c2d72a..f63c063 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/aws/smithy-go v1.22.1 github.com/charmbracelet/bubbletea v1.1.1 github.com/fatih/color v1.18.0 - github.com/gosuri/uilive v0.0.4 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.8.0 github.com/urfave/cli/v2 v2.27.4 diff --git a/go.sum b/go.sum index 1b03d20..0ff69ac 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,6 @@ github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97 github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY= -github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/internal/app/app.go b/internal/app/app.go index 194b63d..f25e8a9 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -191,11 +191,7 @@ func (a *App) initBucketProcessor() error { ForceMode: a.ForceMode, OldVersionsOnly: a.OldVersionsOnly, } - processor, err := NewBucketProcessor(processorConfig, a.s3Wrapper) - if err != nil { - return err - } - a.bucketProcessor = processor + a.bucketProcessor = NewBucketProcessor(processorConfig, a.s3Wrapper) } return nil } diff --git a/internal/app/bucket_processor.go b/internal/app/bucket_processor.go index aae2e7e..52af2a7 100644 --- a/internal/app/bucket_processor.go +++ b/internal/app/bucket_processor.go @@ -4,6 +4,7 @@ package app import ( "context" + "github.com/go-to-k/cls3/internal/io" "github.com/go-to-k/cls3/internal/wrapper" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -37,11 +38,8 @@ type BucketProcessor struct { func NewBucketProcessor( config BucketProcessorConfig, s3Wrapper wrapper.IWrapper, -) (*BucketProcessor, error) { - state, err := NewClearingState(config.TargetBuckets, s3Wrapper, config.ForceMode) - if err != nil { - return nil, err - } +) *BucketProcessor { + state := NewClearingState(config.TargetBuckets, s3Wrapper, config.ForceMode) display := NewDisplayManager(state, config.QuietMode) @@ -50,25 +48,48 @@ func NewBucketProcessor( s3Wrapper: s3Wrapper, state: state, display: display, - }, nil + } } // Process executes the bucket processing workflow func (p *BucketProcessor) Process(ctx context.Context) error { - if err := p.display.Start(p.config.TargetBuckets); err != nil { - return err + concurrencyNumber := p.determineConcurrencyNumber() + io.Logger.Info().Msgf("Number of buckets: %v", len(p.config.TargetBuckets)) + io.Logger.Info().Msgf("Concurrency number: %v", concurrencyNumber) + + for _, bucket := range p.config.TargetBuckets { + if err := p.s3Wrapper.OutputCheckingMessage(bucket); err != nil { + return err + } } - if err := p.clearBuckets(ctx); err != nil { + p.display.Start(p.config.TargetBuckets) + + if err := p.clearBuckets(ctx, concurrencyNumber); err != nil { return err } return p.display.Finish(p.config.TargetBuckets) } +// determineConcurrencyNumber calculates the appropriate concurrency number +func (p *BucketProcessor) determineConcurrencyNumber() int { + // Series when ConcurrentMode is off. + if !p.config.ConcurrentMode { + return 1 + } + + // Cases where ConcurrencyNumber is unspecified. + if p.config.ConcurrencyNumber == UnspecifiedConcurrencyNumber { + return len(p.config.TargetBuckets) + } + + // Cases where ConcurrencyNumber is specified. + return p.config.ConcurrencyNumber +} + // clearBuckets processes all buckets with the specified concurrency -func (p *BucketProcessor) clearBuckets(ctx context.Context) error { - concurrencyNumber := p.determineConcurrencyNumber() +func (p *BucketProcessor) clearBuckets(ctx context.Context, concurrencyNumber int) error { sem := semaphore.NewWeighted(int64(concurrencyNumber)) clearEg := errgroup.Group{} @@ -87,22 +108,6 @@ func (p *BucketProcessor) clearBuckets(ctx context.Context) error { return clearEg.Wait() } -// determineConcurrencyNumber calculates the appropriate concurrency number -func (p *BucketProcessor) determineConcurrencyNumber() int { - // Series when ConcurrentMode is off. - if !p.config.ConcurrentMode { - return 1 - } - - // Cases where ConcurrencyNumber is unspecified. - if p.config.ConcurrencyNumber == UnspecifiedConcurrencyNumber { - return len(p.config.TargetBuckets) - } - - // Cases where ConcurrencyNumber is specified. - return p.config.ConcurrencyNumber -} - // clearSingleBucket processes a single bucket func (p *BucketProcessor) clearSingleBucket(ctx context.Context, bucket string) error { clearingCountCh, clearingCompletedCh := p.state.GetChannelsForBucket(bucket) diff --git a/internal/app/bucket_processor_test.go b/internal/app/bucket_processor_test.go index e86dbb6..5b6bf06 100644 --- a/internal/app/bucket_processor_test.go +++ b/internal/app/bucket_processor_test.go @@ -81,11 +81,18 @@ func TestBucketProcessor_Process(t *testing.T) { { name: "successfully process buckets", prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) { - md.EXPECT().Start([]string{"bucket1"}).Return(nil) - md.EXPECT().Finish([]string{"bucket1"}).Return(nil) - countCh := make(chan int64) - completedCh := make(chan bool) - mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh, completedCh) + m.EXPECT().OutputCheckingMessage("bucket1").Return(nil) + m.EXPECT().OutputCheckingMessage("bucket2").Return(nil) + md.EXPECT().Start([]string{"bucket1", "bucket2"}) + md.EXPECT().Finish([]string{"bucket1", "bucket2"}).Return(nil) + + countCh1 := make(chan int64) + completedCh1 := make(chan bool) + countCh2 := make(chan int64) + completedCh2 := make(chan bool) + + mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh1, completedCh1) + mc.EXPECT().GetChannelsForBucket("bucket2").Return(countCh2, completedCh2) m.EXPECT().ClearBucket( gomock.Any(), wrapper.ClearBucketInput{ @@ -93,16 +100,30 @@ func TestBucketProcessor_Process(t *testing.T) { ForceMode: false, OldVersionsOnly: false, QuietMode: false, - ClearingCountCh: countCh, + ClearingCountCh: countCh1, + }, + ).Return(nil) + m.EXPECT().ClearBucket( + gomock.Any(), + wrapper.ClearBucketInput{ + TargetBucket: "bucket2", + ForceMode: false, + OldVersionsOnly: false, + QuietMode: false, + ClearingCountCh: countCh2, }, ).Return(nil) go func() { - completed := <-completedCh + completed := <-completedCh1 + assert.True(t, completed, "value from completedCh should be true") + }() + go func() { + completed := <-completedCh2 assert.True(t, completed, "value from completedCh should be true") }() }, config: BucketProcessorConfig{ - TargetBuckets: []string{"bucket1"}, + TargetBuckets: []string{"bucket1", "bucket2"}, QuietMode: false, ConcurrentMode: false, ConcurrencyNumber: UnspecifiedConcurrencyNumber, @@ -112,9 +133,9 @@ func TestBucketProcessor_Process(t *testing.T) { wantErr: false, }, { - name: "error when display start fails", + name: "error when output checking message fails", prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) { - md.EXPECT().Start([]string{"bucket1"}).Return(fmt.Errorf("StartError")) + m.EXPECT().OutputCheckingMessage("bucket1").Return(fmt.Errorf("OutputCheckingMessageError")) }, config: BucketProcessorConfig{ TargetBuckets: []string{"bucket1"}, @@ -123,12 +144,13 @@ func TestBucketProcessor_Process(t *testing.T) { ConcurrencyNumber: UnspecifiedConcurrencyNumber, }, wantErr: true, - expectedErr: "StartError", + expectedErr: "OutputCheckingMessageError", }, { name: "error when clear bucket fails", prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) { - md.EXPECT().Start([]string{"bucket1"}).Return(nil) + m.EXPECT().OutputCheckingMessage("bucket1").Return(nil) + md.EXPECT().Start([]string{"bucket1"}) countCh := make(chan int64) completedCh := make(chan bool) mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh, completedCh) @@ -159,7 +181,8 @@ func TestBucketProcessor_Process(t *testing.T) { { name: "error when display finish fails", prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) { - md.EXPECT().Start([]string{"bucket1"}).Return(nil) + m.EXPECT().OutputCheckingMessage("bucket1").Return(nil) + md.EXPECT().Start([]string{"bucket1"}) countCh := make(chan int64) completedCh := make(chan bool) mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh, completedCh) @@ -221,11 +244,12 @@ func TestBucketProcessor_Process(t *testing.T) { func TestBucketProcessor_clearBuckets(t *testing.T) { tests := []struct { - name string - prepareMockFn func(m *wrapper.MockIWrapper, mc *MockIClearingState) - config BucketProcessorConfig - wantErr bool - expectedErr string + name string + prepareMockFn func(m *wrapper.MockIWrapper, mc *MockIClearingState) + config BucketProcessorConfig + concurrencyNumber int + wantErr bool + expectedErr string }{ { name: "successfully clear single bucket", @@ -254,7 +278,40 @@ func TestBucketProcessor_clearBuckets(t *testing.T) { ConcurrentMode: false, ConcurrencyNumber: UnspecifiedConcurrencyNumber, }, - wantErr: false, + concurrencyNumber: 1, + wantErr: false, + }, + { + name: "successfully clear multiple buckets", + prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState) { + for _, bucket := range []string{"bucket1", "bucket2"} { + countCh := make(chan int64) + completedCh := make(chan bool) + mc.EXPECT().GetChannelsForBucket(bucket).Return(countCh, completedCh) + m.EXPECT().ClearBucket( + gomock.Any(), + wrapper.ClearBucketInput{ + TargetBucket: bucket, + ForceMode: false, + OldVersionsOnly: false, + QuietMode: false, + ClearingCountCh: countCh, + }, + ).Return(nil) + go func() { + completed := <-completedCh + assert.True(t, completed, "value from completedCh should be true") + }() + } + }, + config: BucketProcessorConfig{ + TargetBuckets: []string{"bucket1", "bucket2"}, + QuietMode: false, + ConcurrentMode: true, + ConcurrencyNumber: UnspecifiedConcurrencyNumber, + }, + concurrencyNumber: 1, + wantErr: false, }, { name: "successfully clear multiple buckets concurrently", @@ -285,7 +342,40 @@ func TestBucketProcessor_clearBuckets(t *testing.T) { ConcurrentMode: true, ConcurrencyNumber: UnspecifiedConcurrencyNumber, }, - wantErr: false, + concurrencyNumber: 2, + wantErr: false, + }, + { + name: "successfully clear many buckets concurrently", + prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState) { + for _, bucket := range []string{"bucket1", "bucket2", "bucket3", "bucket4", "bucket5"} { + countCh := make(chan int64) + completedCh := make(chan bool) + mc.EXPECT().GetChannelsForBucket(bucket).Return(countCh, completedCh) + m.EXPECT().ClearBucket( + gomock.Any(), + wrapper.ClearBucketInput{ + TargetBucket: bucket, + ForceMode: false, + OldVersionsOnly: false, + QuietMode: false, + ClearingCountCh: countCh, + }, + ).Return(nil) + go func() { + completed := <-completedCh + assert.True(t, completed, "value from completedCh should be true") + }() + } + }, + config: BucketProcessorConfig{ + TargetBuckets: []string{"bucket1", "bucket2", "bucket3", "bucket4", "bucket5"}, + QuietMode: false, + ConcurrentMode: true, + ConcurrencyNumber: UnspecifiedConcurrencyNumber, + }, + concurrencyNumber: 2, + wantErr: false, }, { name: "successfully clear single bucket with quiet mode", @@ -310,7 +400,8 @@ func TestBucketProcessor_clearBuckets(t *testing.T) { ConcurrentMode: false, ConcurrencyNumber: UnspecifiedConcurrencyNumber, }, - wantErr: false, + concurrencyNumber: 1, + wantErr: false, }, { name: "error when clear bucket fails", @@ -339,8 +430,9 @@ func TestBucketProcessor_clearBuckets(t *testing.T) { ConcurrentMode: false, ConcurrencyNumber: UnspecifiedConcurrencyNumber, }, - wantErr: true, - expectedErr: "ClearBucketError", + concurrencyNumber: 1, + wantErr: true, + expectedErr: "ClearBucketError", }, } @@ -357,7 +449,7 @@ func TestBucketProcessor_clearBuckets(t *testing.T) { state: mockClearingState, } - err := processor.clearBuckets(context.Background()) + err := processor.clearBuckets(context.Background(), tt.concurrencyNumber) if (err != nil) != tt.wantErr { t.Errorf("error = %v, wantErr %v", err, tt.wantErr) return diff --git a/internal/app/clearing_state.go b/internal/app/clearing_state.go index 3ceb5fa..262b3fe 100644 --- a/internal/app/clearing_state.go +++ b/internal/app/clearing_state.go @@ -13,7 +13,7 @@ import ( ) type IClearingState interface { - StartDisplayRoutines(targetBuckets []string, writer *io.Writer) (*errgroup.Group, error) + StartDisplayRoutines(targetBuckets []string, writer *io.Writer) *errgroup.Group OutputFinalMessages(targetBuckets []string) error GetChannelsForBucket(bucket string) (chan int64, chan bool) } @@ -33,7 +33,7 @@ type ClearingState struct { } // NewClearingState initializes a new ClearingState instance -func NewClearingState(targetBuckets []string, s3Wrapper wrapper.IWrapper, forceMode bool) (*ClearingState, error) { +func NewClearingState(targetBuckets []string, s3Wrapper wrapper.IWrapper, forceMode bool) *ClearingState { state := &ClearingState{ lines: make([]string, len(targetBuckets)), countChannels: make(map[string]chan int64, len(targetBuckets)), @@ -44,25 +44,18 @@ func NewClearingState(targetBuckets []string, s3Wrapper wrapper.IWrapper, forceM } for _, bucket := range targetBuckets { - if err := s3Wrapper.OutputCheckingMessage(bucket); err != nil { - return nil, err - } state.countChannels[bucket] = make(chan int64) state.completedChannels[bucket] = make(chan bool) state.counts[bucket] = &atomic.Int64{} } - return state, nil + return state } // StartDisplayRoutines initializes and starts the display monitoring routines -func (s *ClearingState) StartDisplayRoutines(targetBuckets []string, writer *io.Writer) (*errgroup.Group, error) { +func (s *ClearingState) StartDisplayRoutines(targetBuckets []string, writer *io.Writer) *errgroup.Group { displayEg := &errgroup.Group{} - if err := s.prepareInitialDisplay(targetBuckets); err != nil { - return nil, err - } - for i, bucket := range targetBuckets { i, bucket := i, bucket displayEg.Go(func() error { @@ -70,22 +63,7 @@ func (s *ClearingState) StartDisplayRoutines(targetBuckets []string, writer *io. }) } - return displayEg, nil -} - -// prepareInitialDisplay prepares the initial display lines for each bucket -func (s *ClearingState) prepareInitialDisplay(targetBuckets []string) error { - for i, bucket := range targetBuckets { - // Necessary to first display all bucket rows together - message, err := s.s3Wrapper.GetLiveClearingMessage(bucket, 0) - if err != nil { - return err - } - s.linesMutex.Lock() - s.lines[i] = message - s.linesMutex.Unlock() - } - return nil + return displayEg } // monitorBucketProgress monitors the progress of a single bucket clearing operation diff --git a/internal/app/clearing_state_test.go b/internal/app/clearing_state_test.go index b323ead..5fb0b82 100644 --- a/internal/app/clearing_state_test.go +++ b/internal/app/clearing_state_test.go @@ -5,8 +5,8 @@ import ( "sync/atomic" "testing" + "github.com/go-to-k/cls3/internal/io" "github.com/go-to-k/cls3/internal/wrapper" - "github.com/gosuri/uilive" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" ) @@ -14,31 +14,13 @@ import ( func TestClearingState_NewClearingState(t *testing.T) { tests := []struct { name string - prepareMockFn func(m *wrapper.MockIWrapper) targetBuckets []string forceMode bool - wantErr bool - expectedErr string }{ { - name: "successfully create clearing state", - prepareMockFn: func(m *wrapper.MockIWrapper) { - m.EXPECT().OutputCheckingMessage("bucket1").Return(nil) - m.EXPECT().OutputCheckingMessage("bucket2").Return(nil) - }, + name: "create clearing state", targetBuckets: []string{"bucket1", "bucket2"}, forceMode: false, - wantErr: false, - }, - { - name: "error when output checking message fails", - prepareMockFn: func(m *wrapper.MockIWrapper) { - m.EXPECT().OutputCheckingMessage("bucket1").Return(fmt.Errorf("OutputCheckingMessageError")) - }, - targetBuckets: []string{"bucket1"}, - forceMode: false, - wantErr: true, - expectedErr: "OutputCheckingMessageError", }, } @@ -46,19 +28,10 @@ func TestClearingState_NewClearingState(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) mockWrapper := wrapper.NewMockIWrapper(ctrl) - tt.prepareMockFn(mockWrapper) - - state, err := NewClearingState(tt.targetBuckets, mockWrapper, tt.forceMode) - if (err != nil) != tt.wantErr { - t.Errorf("error = %v, wantErr %v", err, tt.wantErr) - return - } - if tt.wantErr { - assert.EqualError(t, err, tt.expectedErr) - return - } + state := NewClearingState(tt.targetBuckets, mockWrapper, tt.forceMode) assert.NotNil(t, state) + assert.Equal(t, len(tt.targetBuckets), len(state.lines)) assert.Equal(t, len(tt.targetBuckets), len(state.countChannels)) assert.Equal(t, len(tt.targetBuckets), len(state.completedChannels)) @@ -73,28 +46,26 @@ func TestClearingState_StartDisplayRoutines(t *testing.T) { name string prepareMockFn func(m *wrapper.MockIWrapper) targetBuckets []string - wantErr bool + wantEgErr bool expectedErr string }{ { name: "successfully start display routines", prepareMockFn: func(m *wrapper.MockIWrapper) { - m.EXPECT().GetLiveClearingMessage("bucket1", int64(0)).Return("Clearing bucket1", nil) - m.EXPECT().GetLiveClearingMessage("bucket2", int64(0)).Return("Clearing bucket2", nil) m.EXPECT().GetLiveClearedMessage("bucket1", int64(0), true).Return("Cleared bucket1", nil) m.EXPECT().GetLiveClearedMessage("bucket2", int64(0), true).Return("Cleared bucket2", nil) }, targetBuckets: []string{"bucket1", "bucket2"}, - wantErr: false, + wantEgErr: false, }, { - name: "error when get live clearing message fails", + name: "error when get live cleared message fails", prepareMockFn: func(m *wrapper.MockIWrapper) { - m.EXPECT().GetLiveClearingMessage("bucket1", int64(0)).Return("", fmt.Errorf("GetLiveClearingMessageError")) + m.EXPECT().GetLiveClearedMessage("bucket1", int64(0), true).Return("", fmt.Errorf("GetLiveClearedMessageError")) }, targetBuckets: []string{"bucket1"}, - wantErr: true, - expectedErr: "GetLiveClearingMessageError", + wantEgErr: true, + expectedErr: "GetLiveClearedMessageError", }, } @@ -119,16 +90,8 @@ func TestClearingState_StartDisplayRoutines(t *testing.T) { state.counts[bucket] = &atomic.Int64{} } - writer := uilive.New() - eg, err := state.StartDisplayRoutines(tt.targetBuckets, writer) - if (err != nil) != tt.wantErr { - t.Errorf("error = %v, wantErr %v", err, tt.wantErr) - return - } - if tt.wantErr { - assert.EqualError(t, err, tt.expectedErr) - return - } + writer := io.NewWriter() + eg := state.StartDisplayRoutines(tt.targetBuckets, writer) assert.NotNil(t, eg) @@ -137,8 +100,11 @@ func TestClearingState_StartDisplayRoutines(t *testing.T) { state.completedChannels[bucket] <- true close(state.completedChannels[bucket]) } - if err := eg.Wait(); err != nil { - t.Errorf("error waiting for display routines: %v", err) + + err := eg.Wait() + assert.Equal(t, tt.wantEgErr, err != nil) + if tt.wantEgErr { + assert.EqualError(t, err, tt.expectedErr) } }) } diff --git a/internal/app/display_manager.go b/internal/app/display_manager.go index 763efc5..afe9ccf 100644 --- a/internal/app/display_manager.go +++ b/internal/app/display_manager.go @@ -7,7 +7,7 @@ import ( ) type IDisplayManager interface { - Start(targetBuckets []string) error + Start(targetBuckets []string) Finish(targetBuckets []string) error } @@ -30,21 +30,14 @@ func NewDisplayManager(state IClearingState, quietMode bool) *DisplayManager { } // Start initializes and starts the display operations -func (d *DisplayManager) Start(targetBuckets []string) error { +func (d *DisplayManager) Start(targetBuckets []string) { if d.quietMode { - return nil + return } - d.writer = io.New() + d.writer = io.NewWriter() d.writer.Start() - var err error - d.displayEg, err = d.state.StartDisplayRoutines(targetBuckets, d.writer) - if err != nil { - d.writer.Stop() - return err - } - - return nil + d.displayEg = d.state.StartDisplayRoutines(targetBuckets, d.writer) } // Finish waits for display operations to complete and performs cleanup diff --git a/internal/app/display_manager_test.go b/internal/app/display_manager_test.go index 1e41856..c1510de 100644 --- a/internal/app/display_manager_test.go +++ b/internal/app/display_manager_test.go @@ -4,7 +4,7 @@ import ( "fmt" "testing" - "github.com/gosuri/uilive" + "github.com/go-to-k/cls3/internal/io" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" "golang.org/x/sync/errgroup" @@ -16,8 +16,6 @@ func TestDisplayManager_Start(t *testing.T) { prepareMockFn func(m *MockIClearingState) quietMode bool targetBuckets []string - wantErr bool - expectedErr string }{ { name: "successfully start display manager", @@ -26,11 +24,10 @@ func TestDisplayManager_Start(t *testing.T) { m.EXPECT().StartDisplayRoutines( []string{"bucket1"}, gomock.Any(), - ).Return(eg, nil) + ).Return(eg) }, quietMode: false, targetBuckets: []string{"bucket1"}, - wantErr: false, }, { name: "skip display in quiet mode", @@ -39,20 +36,6 @@ func TestDisplayManager_Start(t *testing.T) { }, quietMode: true, targetBuckets: []string{"bucket1"}, - wantErr: false, - }, - { - name: "error when start display routines fails", - prepareMockFn: func(m *MockIClearingState) { - m.EXPECT().StartDisplayRoutines( - []string{"bucket1"}, - gomock.Any(), - ).Return(nil, fmt.Errorf("StartDisplayRoutinesError")) - }, - quietMode: false, - targetBuckets: []string{"bucket1"}, - wantErr: true, - expectedErr: "StartDisplayRoutinesError", }, } @@ -65,15 +48,7 @@ func TestDisplayManager_Start(t *testing.T) { manager := NewDisplayManager(mockClearingState, tt.quietMode) - err := manager.Start(tt.targetBuckets) - if (err != nil) != tt.wantErr { - t.Errorf("error = %v, wantErr %v", err, tt.wantErr) - return - } - if tt.wantErr { - assert.EqualError(t, err, tt.expectedErr) - return - } + manager.Start(tt.targetBuckets) if !tt.quietMode { assert.NotNil(t, manager.writer) @@ -103,7 +78,7 @@ func TestDisplayManager_Finish(t *testing.T) { m.EXPECT().StartDisplayRoutines( []string{"bucket1"}, gomock.Any(), - ).Return(eg, nil) + ).Return(eg) m.EXPECT().OutputFinalMessages([]string{"bucket1"}).Return(nil) }, quietMode: false, @@ -126,7 +101,7 @@ func TestDisplayManager_Finish(t *testing.T) { m.EXPECT().StartDisplayRoutines( []string{"bucket1"}, gomock.Any(), - ).Return(eg, nil) + ).Return(eg) eg.Go(func() error { return fmt.Errorf("errgroup wait error") }) @@ -143,7 +118,7 @@ func TestDisplayManager_Finish(t *testing.T) { m.EXPECT().StartDisplayRoutines( []string{"bucket1"}, gomock.Any(), - ).Return(eg, nil) + ).Return(eg) m.EXPECT().OutputFinalMessages([]string{"bucket1"}).Return(fmt.Errorf("OutputFinalMessagesError")) }, quietMode: false, @@ -163,13 +138,9 @@ func TestDisplayManager_Finish(t *testing.T) { manager := NewDisplayManager(mockClearingState, tt.quietMode) if !tt.quietMode { - manager.writer = uilive.New() + manager.writer = io.NewWriter() manager.writer.Start() - var err error - manager.displayEg, err = mockClearingState.StartDisplayRoutines(tt.targetBuckets, manager.writer) - if err != nil { - t.Fatal(err) - } + manager.displayEg = mockClearingState.StartDisplayRoutines(tt.targetBuckets, manager.writer) } err := manager.Finish(tt.targetBuckets) diff --git a/internal/app/mock_clearing_state.go b/internal/app/mock_clearing_state.go index 557014e..a2d7f9f 100644 --- a/internal/app/mock_clearing_state.go +++ b/internal/app/mock_clearing_state.go @@ -9,7 +9,7 @@ package app import ( reflect "reflect" - uilive "github.com/gosuri/uilive" + io "github.com/go-to-k/cls3/internal/io" gomock "go.uber.org/mock/gomock" errgroup "golang.org/x/sync/errgroup" ) @@ -67,12 +67,11 @@ func (mr *MockIClearingStateMockRecorder) OutputFinalMessages(targetBuckets any) } // StartDisplayRoutines mocks base method. -func (m *MockIClearingState) StartDisplayRoutines(targetBuckets []string, writer *uilive.Writer) (*errgroup.Group, error) { +func (m *MockIClearingState) StartDisplayRoutines(targetBuckets []string, writer *io.Writer) *errgroup.Group { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StartDisplayRoutines", targetBuckets, writer) ret0, _ := ret[0].(*errgroup.Group) - ret1, _ := ret[1].(error) - return ret0, ret1 + return ret0 } // StartDisplayRoutines indicates an expected call of StartDisplayRoutines. diff --git a/internal/app/mock_display_manager.go b/internal/app/mock_display_manager.go index 0b57442..9f8a085 100644 --- a/internal/app/mock_display_manager.go +++ b/internal/app/mock_display_manager.go @@ -50,11 +50,9 @@ func (mr *MockIDisplayManagerMockRecorder) Finish(targetBuckets any) *gomock.Cal } // Start mocks base method. -func (m *MockIDisplayManager) Start(targetBuckets []string) error { +func (m *MockIDisplayManager) Start(targetBuckets []string) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Start", targetBuckets) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "Start", targetBuckets) } // Start indicates an expected call of Start. diff --git a/internal/io/writer.go b/internal/io/writer.go index f62e87a..fe5c673 100644 --- a/internal/io/writer.go +++ b/internal/io/writer.go @@ -14,7 +14,7 @@ type Writer struct { lineCount int } -func New() *Writer { +func NewWriter() *Writer { return &Writer{ out: os.Stdout, } @@ -45,7 +45,7 @@ func (w *Writer) clearLines() { func (w *Writer) Flush() error { w.mtx.Lock() defer w.mtx.Unlock() - _, err := fmt.Fprintln(w.out) + _, err := fmt.Fprint(w.out) return err } diff --git a/internal/wrapper/s3_wrapper.go b/internal/wrapper/s3_wrapper.go index d3b6624..8291851 100644 --- a/internal/wrapper/s3_wrapper.go +++ b/internal/wrapper/s3_wrapper.go @@ -60,6 +60,11 @@ func (s *S3Wrapper) ClearBucket( func (s *S3Wrapper) clearObjects(ctx context.Context, input ClearBucketInput, bucketRegion string) error { state := &objectDeletionState{} + if !input.QuietMode { + // NOTE: Send 0 to the channel to indicate that the clearing has started. + input.ClearingCountCh <- 0 + } + // NOTE: Try clearing objects up to 2 times to handle eventual consistency // There was a case where the object deletion was completed but the object was still there. // So, even if all objects are deleted, it is not guaranteed that the object is deleted.