Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
eyalbe4 committed Dec 15, 2023
2 parents 9b44d66 + bd68f60 commit 4c57380
Show file tree
Hide file tree
Showing 16 changed files with 368 additions and 64 deletions.
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/fileserror.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (e *errorsRetryPhase) handleErrorsFile(errFilePath string, pcWrapper *produ
// Since we're about to handle the transfer retry of the failed files,
// we should now decrement the failures counter view.
e.progressBar.changeNumberOfFailuresBy(-1 * len(failedFiles.Errors))
err = e.stateManager.ChangeTransferFailureCountBy(uint(len(failedFiles.Errors)), false)
err = e.stateManager.ChangeTransferFailureCountBy(uint64(len(failedFiles.Errors)), false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/state/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type TransferRunStatus struct {
WorkingThreads int `json:"working_threads,omitempty"`
VisitedFolders uint64 `json:"visited_folders,omitempty"`
DelayedFiles uint64 `json:"delayed_files,omitempty"`
TransferFailures uint `json:"transfer_failures,omitempty"`
TransferFailures uint64 `json:"transfer_failures,omitempty"`
TimeEstimationManager `json:"time_estimation,omitempty"`
StaleChunks []StaleChunks `json:"stale_chunks,omitempty"`
}
Expand Down
45 changes: 20 additions & 25 deletions artifactory/commands/transferfiles/state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,53 +136,56 @@ func (ts *TransferStateManager) SetRepoFullTransferCompleted() error {
// Increasing Transferred Diff files (modified files) and SizeByBytes value in suitable repository progress state
func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase1(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
err := ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase1Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase1Info.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredUnits, chunkTotalFiles)
return nil
})
if err != nil {
return err
}
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.OverallTransfer.TransferredSizeBytes += chunkTotalSizeInBytes
transferRunStatus.OverallTransfer.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredUnits, chunkTotalFiles)

if transferRunStatus.BuildInfoRepo {
transferRunStatus.OverallBiFiles.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&transferRunStatus.OverallBiFiles.TransferredUnits, chunkTotalFiles)
}
return nil
})
}

func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase2(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase2Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase2Info.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredUnits, chunkTotalFiles)
return nil
})
}

func (ts *TransferStateManager) IncTotalSizeAndFilesPhase2(filesNumber, totalSize int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase2Info.TotalSizeBytes += totalSize
state.CurrentRepo.Phase2Info.TotalUnits += filesNumber
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalSizeBytes, totalSize)
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalUnits, filesNumber)
return nil
})
}

// Set relevant information of files and storage we need to transfer in phase3
func (ts *TransferStateManager) SetTotalSizeAndFilesPhase3(filesNumber, totalSize int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase3Info.TotalSizeBytes = totalSize
state.CurrentRepo.Phase3Info.TotalUnits = filesNumber
state.CurrentRepo.Phase3Info.TransferredUnits = 0
state.CurrentRepo.Phase3Info.TransferredSizeBytes = 0
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalSizeBytes, totalSize)
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalUnits, filesNumber)
return nil
})
}

// Increase transferred storage and files in phase 3
func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase3(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase3Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase3Info.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredUnits, chunkTotalFiles)
return nil
})
}
Expand Down Expand Up @@ -288,29 +291,21 @@ func (ts *TransferStateManager) GetDiffHandlingRange() (start, end time.Time, er

func (ts *TransferStateManager) IncVisitedFolders() error {
return ts.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.VisitedFolders++
atomicallyAddUint64(&transferRunStatus.VisitedFolders, 1, true)
return nil
})
}

func (ts *TransferStateManager) ChangeDelayedFilesCountBy(count uint64, increase bool) error {
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
if increase {
transferRunStatus.DelayedFiles += count
} else {
transferRunStatus.DelayedFiles -= count
}
atomicallyAddUint64(&transferRunStatus.DelayedFiles, count, increase)
return nil
})
}

func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint, increase bool) error {
func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint64, increase bool) error {
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
if increase {
transferRunStatus.TransferFailures += count
} else {
transferRunStatus.TransferFailures -= count
}
atomicallyAddUint64(&transferRunStatus.TransferFailures, count, increase)
return nil
})
}
Expand Down
53 changes: 51 additions & 2 deletions artifactory/commands/transferfiles/state/statemanager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package state

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -181,11 +182,11 @@ func TestChangeTransferFailureCountBy(t *testing.T) {
// Increase failures count
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(2, true))
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(4, true))
assert.Equal(t, uint(6), stateManager.TransferFailures)
assert.Equal(t, uint64(6), stateManager.TransferFailures)

// Decrease failures count
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(3, false))
assert.Equal(t, uint(3), stateManager.TransferFailures)
assert.Equal(t, uint64(3), stateManager.TransferFailures)
}

func assertReposTransferredSize(t *testing.T, stateManager *TransferStateManager, expectedSize int64, repoKeys ...string) {
Expand Down Expand Up @@ -310,3 +311,51 @@ func TestGetRunningTimeString(t *testing.T) {
})
}
}

func TestStateConcurrency(t *testing.T) {
stateManager, cleanUp := InitStateTest(t)
defer cleanUp()

// Concurrently increment variables in the state
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase1(1, 1))
assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase2(1, 1))
assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase3(1, 1))
assert.NoError(t, stateManager.IncVisitedFolders())
assert.NoError(t, stateManager.ChangeDelayedFilesCountBy(1, true))
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(1, true))
}()
}
wg.Wait()

// Assert 1000 in all values
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase1Info.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase1Info.TransferredUnits))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase2Info.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase2Info.TransferredUnits))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase3Info.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase3Info.TransferredUnits))
assert.Equal(t, 1000, int(stateManager.OverallTransfer.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.VisitedFolders))
assert.Equal(t, 1000, int(stateManager.DelayedFiles))
assert.Equal(t, 1000, int(stateManager.TransferFailures))

// Concurrently decrement delayed artifacts and transfer failures
for i := 0; i < 500; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, stateManager.ChangeDelayedFilesCountBy(1, false))
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(1, false))
}()
}
wg.Wait()

// Assert 500 in delayed artifacts and transfer failures
assert.Equal(t, 500, int(stateManager.DelayedFiles))
assert.Equal(t, 500, int(stateManager.TransferFailures))
}
20 changes: 20 additions & 0 deletions artifactory/commands/transferfiles/state/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/jfrog/build-info-go/utils"
Expand Down Expand Up @@ -121,3 +122,22 @@ func GetOldTransferDirectoryStructureError() error {
}
return errorutils.CheckErrorf(oldTransferDirectoryStructureErrorFormat, transferDir)
}

// Atomically add to an int64 variable.
// addr - Pointer to int64 variable
// delta - The change to do
func atomicallyAddInt64(addr *int64, delta int64) {
atomic.AddInt64(addr, delta)
}

// Atomically add to an uint64 variable.
// addr - Pointer to uint64 variable
// delta - The change to do
// increase - True to increment, false to decrement
func atomicallyAddUint64(addr *uint64, delta uint64, increase bool) {
if increase {
atomic.AddUint64(addr, delta)
} else {
atomic.AddUint64(addr, ^(delta - 1))
}
}
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func addOverallStatus(stateManager *state.TransferStateManager, output *strings.
addString(output, "🧵", "Working threads", strconv.Itoa(stateManager.WorkingThreads), 2)
addString(output, "⚡", "Transfer speed", stateManager.GetSpeedString(), 2)
addString(output, "⌛", "Estimated time remaining", stateManager.GetEstimatedRemainingTimeString(), 1)
failureTxt := strconv.FormatUint(uint64(stateManager.TransferFailures), 10)
failureTxt := strconv.FormatUint(stateManager.TransferFailures, 10)
if stateManager.TransferFailures > 0 {
failureTxt += " (" + progressbar.RetryFailureContentNote + ")"
}
Expand Down
17 changes: 16 additions & 1 deletion artifactory/commands/transferfiles/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (tdc *TransferFilesCommand) initStateManager(allSourceLocalRepos, sourceBui
if e != nil {
return e
}
tdc.stateManager.TransferFailures = uint(numberInitialErrors)
tdc.stateManager.TransferFailures = uint64(numberInitialErrors)

numberInitialDelays, e := getDelayedFilesCount(allSourceLocalRepos)
if e != nil {
Expand Down Expand Up @@ -516,6 +516,10 @@ func (tdc *TransferFilesCommand) handleStop(srcUpService *srcUserPluginService)
// The stopSignal channel is closed
return
}
// Before interrupting the process, do a thread dump
if err := doThreadDump(); err != nil {
log.Error(err)
}
tdc.cancelFunc()
if newPhase != nil {
newPhase.StopGracefully()
Expand Down Expand Up @@ -809,3 +813,14 @@ func parseErrorsFromLogFiles(logPaths []string) (allErrors FilesErrors, err erro
func assertSupportedTransferDirStructure() error {
return state.VerifyTransferRunStatusVersion()
}

func doThreadDump() error {
log.Info("Starting thread dumping...")
threadDump, err := coreutils.NewProfiler().ThreadDump()
if err != nil {
return err
}
log.Info(threadDump)
log.Info("Thread dump ended successfully")
return nil
}
2 changes: 1 addition & 1 deletion artifactory/utils/weblogin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func DoWebLogin(serverDetails *config.ServerDetails) (token auth.CommonTokenPara
"Make sure the details you entered are correct and that Artifactory meets the version requirement."))
return
}
log.Info("After logging in via your web browser, please enter the code if prompted: "+uuidStr[len(uuidStr)-4:])
log.Info("After logging in via your web browser, please enter the code if prompted: " + uuidStr[len(uuidStr)-4:])
if err = browser.OpenURL(clientUtils.AddTrailingSlashIfNeeded(serverDetails.Url) + "ui/login?jfClientSession=" + uuidStr + "&jfClientName=JFrog-CLI&jfClientCode=1"); err != nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ require (
github.com/gocarina/gocsv v0.0.0-20230616125104-99d496ca653d
github.com/google/uuid v1.3.1
github.com/gookit/color v1.5.4
github.com/jfrog/build-info-go v1.9.16
github.com/jfrog/build-info-go v1.9.17
github.com/jfrog/gofrog v1.3.2
github.com/jfrog/jfrog-apps-config v1.0.1
github.com/jfrog/jfrog-client-go v1.35.0
github.com/jfrog/jfrog-client-go v1.35.1
github.com/magiconair/properties v1.8.7
github.com/manifoldco/promptui v0.9.0
github.com/owenrumney/go-sarif/v2 v2.3.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,14 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jedib0t/go-pretty/v6 v6.4.0 h1:YlI/2zYDrweA4MThiYMKtGRfT+2qZOO65ulej8GTcVI=
github.com/jedib0t/go-pretty/v6 v6.4.0/go.mod h1:MgmISkTWDSFu0xOqiZ0mKNntMQ2mDgOcwOkwBEkMDJI=
github.com/jfrog/build-info-go v1.9.16 h1:zMNxUXve4CZndhlbaEGwgayWPY8CRyPzSobvTKYRPcg=
github.com/jfrog/build-info-go v1.9.16/go.mod h1:/5VZXH2Ud0IK3cOFwPykjwPOaEcHhzzbjnRiou+YKpM=
github.com/jfrog/build-info-go v1.9.17 h1:sUA6V3P8i+awYlK7dkwm4l6IuLE2W964F5Pb18x95HA=
github.com/jfrog/build-info-go v1.9.17/go.mod h1:/5VZXH2Ud0IK3cOFwPykjwPOaEcHhzzbjnRiou+YKpM=
github.com/jfrog/gofrog v1.3.2 h1:TktKP+PdZdxjkYZxWWIq4DkTGSYtr9Slsy+egZpEhUY=
github.com/jfrog/gofrog v1.3.2/go.mod h1:AQo5Fq0G9nDEF6icH7MYQK0iohR4HuEAXl8jaxRuT6Q=
github.com/jfrog/jfrog-apps-config v1.0.1 h1:mtv6k7g8A8BVhlHGlSveapqf4mJfonwvXYLipdsOFMY=
github.com/jfrog/jfrog-apps-config v1.0.1/go.mod h1:8AIIr1oY9JuH5dylz2S6f8Ym2MaadPLR6noCBO4C22w=
github.com/jfrog/jfrog-client-go v1.35.0 h1:VTyrR/jFlWInRdGYswa2fwFc1Thsh6eGMnHuqhDVh7s=
github.com/jfrog/jfrog-client-go v1.35.0/go.mod h1:cG0vdKXbyfupKgSYmwA5FZPco6zSfyJi3eEYOxuqm/k=
github.com/jfrog/jfrog-client-go v1.35.1 h1:T5HXyRykSCx4cZ//9VCnvHs5rvsPCuNsTU4+CRvkVWk=
github.com/jfrog/jfrog-client-go v1.35.1/go.mod h1:XF/PXHuKILfB1sN3n903yWaWJKJX5VYofDGvO9cJ3+g=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
Expand Down
Loading

0 comments on commit 4c57380

Please sign in to comment.