Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer - Show stale chunks in --status #849

Merged
merged 3 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions artifactory/commands/transferfiles/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,25 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
if phaseBase != nil {
timeEstMng = &phaseBase.stateManager.TimeEstimationManager
}
for {
for i := 0; ; i++ {
if ShouldStop(phaseBase, nil, errorsChannelMng) {
return
}
time.Sleep(waitTimeBetweenChunkStatusSeconds * time.Second)

// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
// Run once per 3 minutes
if i%60 == 0 {
// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
}
}

// Each uploading thread receive a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
// Each uploading thread receives a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
fillChunkDataBatch(&chunksLifeCycleManager, uploadChunkChan)
if err := chunksLifeCycleManager.StoreStaleChunks(phaseBase.stateManager); err != nil {
log.Error("Couldn't store the stale chunks:", err.Error())
}
yahavi marked this conversation as resolved.
Show resolved Hide resolved
// When totalChunks size is zero, it means that all the tokens are uploaded,
// we received 'DONE' for all of them, and we notified the source that they can be deleted from the memory.
// If during the polling some chunks data were lost due to network issues, either on the client or on the source,
Expand Down
13 changes: 13 additions & 0 deletions artifactory/commands/transferfiles/state/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ type TransferRunStatus struct {
WorkingThreads int `json:"working_threads,omitempty"`
TransferFailures uint `json:"transfer_failures,omitempty"`
TimeEstimationManager `json:"time_estimation,omitempty"`
StaleChunks []StaleChunks `json:"stale_chunks,omitempty"`
}

// This structure contains a collection of chunks that have been undergoing processing for over 30 minutes
type StaleChunks struct {
NodeID string `json:"node_id,omitempty"`
Chunks []StaleChunk `json:"stale_node_chunks,omitempty"`
}

type StaleChunk struct {
ChunkID string `json:"chunk_id,omitempty"`
Files []string `json:"files,omitempty"`
Sent int64 `json:"sent,omitempty"`
}

func (ts *TransferRunStatus) action(action ActionOnStatusFunc) error {
Expand Down
16 changes: 15 additions & 1 deletion artifactory/commands/transferfiles/state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ func (ts *TransferStateManager) GetWorkingThreads() (workingThreads int, err err
})
}

func (ts *TransferStateManager) SetStaleChunks(staleChunks []StaleChunks) error {
return ts.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.StaleChunks = staleChunks
return nil
})
}

func (ts *TransferStateManager) GetStaleChunks() (staleChunks []StaleChunks, err error) {
return staleChunks, ts.action(func(transferRunStatus *TransferRunStatus) error {
staleChunks = transferRunStatus.StaleChunks
return nil
})
}

func (ts *TransferStateManager) SaveStateAndSnapshots() error {
ts.TransferState.lastSaveTimestamp = time.Now()
if err := ts.persistTransferState(false); err != nil {
Expand Down Expand Up @@ -361,7 +375,7 @@ func GetRunningTime() (runningTime string, isRunning bool, err error) {
return
}
runningSecs := int64(time.Since(time.Unix(0, startTimestamp)).Seconds())
return secondsToLiteralTime(runningSecs, ""), true, nil
return SecondsToLiteralTime(runningSecs, ""), true, nil
}

func UpdateChunkInState(stateManager *TransferStateManager, chunk *api.ChunkStatus) (err error) {
Expand Down
3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/state/timeestimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"fmt"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/utils"

Expand Down Expand Up @@ -185,7 +186,7 @@ func (tem *TimeEstimationManager) GetEstimatedRemainingTimeString() string {
return err.Error()
}

return secondsToLiteralTime(remainingTimeSec, "About ")
return SecondsToLiteralTime(remainingTimeSec, "About ")
}

func (tem *TimeEstimationManager) isTimeEstimationAvailable() bool {
Expand Down
13 changes: 7 additions & 6 deletions artifactory/commands/transferfiles/state/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package state

import (
"fmt"
"github.com/jfrog/build-info-go/utils"
"github.com/jfrog/jfrog-cli-core/v2/utils/coreutils"
"github.com/jfrog/jfrog-client-go/utils/errorutils"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/jfrog/build-info-go/utils"
"github.com/jfrog/jfrog-cli-core/v2/utils/coreutils"
"github.com/jfrog/jfrog-client-go/utils/errorutils"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
)

const (
Expand All @@ -36,9 +37,9 @@ func ConvertTimeToEpochMilliseconds(timeToConvert time.Time) string {
return strconv.FormatInt(timeToConvert.UnixMilli(), 10)
}

// secondsToLiteralTime converts a number of seconds to an easy-to-read string.
// SecondsToLiteralTime converts a number of seconds to an easy-to-read string.
// Prefix is not taken into account if the time is less than a minute.
func secondsToLiteralTime(secondsToConvert int64, prefix string) string {
func SecondsToLiteralTime(secondsToConvert int64, prefix string) string {
daysTime := secondsToConvert / secondsInDay
daysTimeInSecs := daysTime * secondsInDay
hoursTime := (secondsToConvert - daysTimeInSecs) / secondsInHour
Expand Down
5 changes: 3 additions & 2 deletions artifactory/commands/transferfiles/state/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package state

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSecondsToLiteralTime(t *testing.T) {
Expand Down Expand Up @@ -32,7 +33,7 @@ func TestSecondsToLiteralTime(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
assert.Equal(t, testCase.expected, secondsToLiteralTime(testCase.secsToConvert, testCase.prefix))
assert.Equal(t, testCase.expected, SecondsToLiteralTime(testCase.secsToConvert, testCase.prefix))
})
}
}
Expand Down
61 changes: 41 additions & 20 deletions artifactory/commands/transferfiles/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state"
Expand All @@ -23,7 +24,7 @@ func ShowStatus() error {
return err
}
if !isRunning {
addString(&output, "🔴", "Status", "Not running", 0, coreutils.IsWindows())
addString(&output, "🔴", "Status", "Not running", 0)
log.Output(output.String())
return nil
}
Expand All @@ -32,7 +33,7 @@ func ShowStatus() error {
return err
}
if isStopping {
addString(&output, "🟡", "Status", "Stopping", 0, coreutils.IsWindows())
addString(&output, "🟡", "Status", "Stopping", 0)
log.Output(output.String())
return nil
}
Expand All @@ -54,6 +55,7 @@ func ShowStatus() error {
output.WriteString("\n")
setRepositoryStatus(stateManager, &output)
}
addStaleChunks(stateManager, &output)
log.Output(output.String())
return nil
}
Expand All @@ -68,20 +70,19 @@ func isStopping() (bool, error) {
}

func addOverallStatus(stateManager *state.TransferStateManager, output *strings.Builder, runningTime string) {
windows := coreutils.IsWindows()
addTitle(output, "Overall Transfer Status")
addString(output, coreutils.RemoveEmojisIfNonSupportedTerminal("🟢"), "Status", "Running", 3, windows)
addString(output, "🏃", "Running for", runningTime, 3, windows)
addString(output, "🗄 ", "Storage", sizeToString(stateManager.OverallTransfer.TransferredSizeBytes)+" / "+sizeToString(stateManager.OverallTransfer.TotalSizeBytes)+calcPercentageInt64(stateManager.OverallTransfer.TransferredSizeBytes, stateManager.OverallTransfer.TotalSizeBytes), 3, windows)
addString(output, "📦", "Repositories", fmt.Sprintf("%d / %d", stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits)+calcPercentageInt64(stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits), 2, windows)
addString(output, "🧵", "Working threads", strconv.Itoa(stateManager.WorkingThreads), 2, windows)
addString(output, "⚡", "Transfer speed", stateManager.GetSpeedString(), 2, windows)
addString(output, "⌛", "Estimated time remaining", stateManager.GetEstimatedRemainingTimeString(), 1, windows)
addString(output, coreutils.RemoveEmojisIfNonSupportedTerminal("🟢"), "Status", "Running", 3)
addString(output, "🏃", "Running for", runningTime, 3)
addString(output, "🗄 ", "Storage", sizeToString(stateManager.OverallTransfer.TransferredSizeBytes)+" / "+sizeToString(stateManager.OverallTransfer.TotalSizeBytes)+calcPercentageInt64(stateManager.OverallTransfer.TransferredSizeBytes, stateManager.OverallTransfer.TotalSizeBytes), 3)
addString(output, "📦", "Repositories", fmt.Sprintf("%d / %d", stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits)+calcPercentageInt64(stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits), 2)
addString(output, "🧵", "Working threads", strconv.Itoa(stateManager.WorkingThreads), 2)
addString(output, "⚡", "Transfer speed", stateManager.GetSpeedString(), 2)
addString(output, "⌛", "Estimated time remaining", stateManager.GetEstimatedRemainingTimeString(), 1)
failureTxt := strconv.FormatUint(uint64(stateManager.TransferFailures), 10)
if stateManager.TransferFailures > 0 {
failureTxt += " (" + "In Phase 3 and in subsequent executions, we'll retry transferring the failed files." + ")"
}
addString(output, "❌", "Transfer failures", failureTxt, 2, windows)
addString(output, "❌", "Transfer failures", failureTxt, 2)
}

func calcPercentageInt64(transferred, total int64) string {
Expand All @@ -92,35 +93,55 @@ func calcPercentageInt64(transferred, total int64) string {
}

func setRepositoryStatus(stateManager *state.TransferStateManager, output *strings.Builder) {
windows := coreutils.IsWindows()
addTitle(output, "Current Repository Status")
addString(output, "🏷 ", "Name", stateManager.CurrentRepoKey, 2, windows)
addString(output, "🏷 ", "Name", stateManager.CurrentRepoKey, 2)
currentRepo := stateManager.CurrentRepo
switch stateManager.CurrentRepoPhase {
case api.Phase1, api.Phase3:
if stateManager.CurrentRepoPhase == api.Phase1 {
addString(output, "🔢", "Phase", "Transferring all files in the repository (1/3)", 2, windows)
addString(output, "🔢", "Phase", "Transferring all files in the repository (1/3)", 2)
} else {
addString(output, "🔢", "Phase", "Retrying transfer failures (3/3)", 2, windows)
addString(output, "🔢", "Phase", "Retrying transfer failures (3/3)", 2)
}
addString(output, "🗄 ", "Storage", sizeToString(currentRepo.Phase1Info.TransferredSizeBytes)+" / "+sizeToString(currentRepo.Phase1Info.TotalSizeBytes)+calcPercentageInt64(currentRepo.Phase1Info.TransferredSizeBytes, currentRepo.Phase1Info.TotalSizeBytes), 2, windows)
addString(output, "📄", "Files", fmt.Sprintf("%d / %d", currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits)+calcPercentageInt64(currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits), 2, windows)
addString(output, "🗄 ", "Storage", sizeToString(currentRepo.Phase1Info.TransferredSizeBytes)+" / "+sizeToString(currentRepo.Phase1Info.TotalSizeBytes)+calcPercentageInt64(currentRepo.Phase1Info.TransferredSizeBytes, currentRepo.Phase1Info.TotalSizeBytes), 2)
addString(output, "📄", "Files", fmt.Sprintf("%d / %d", currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits)+calcPercentageInt64(currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits), 2)
case api.Phase2:
addString(output, "🔢", "Phase", "Transferring newly created and modified files (2/3)", 2, windows)
addString(output, "🔢", "Phase", "Transferring newly created and modified files (2/3)", 2)
}
}

func addStaleChunks(stateManager *state.TransferStateManager, output *strings.Builder) {
if len(stateManager.StaleChunks) == 0 {
return
}
output.WriteString("\n")
addTitle(output, "File Chunks in Transit for More than 30 Minutes")

for _, nodeStaleChunks := range stateManager.StaleChunks {
addString(output, "🏷️ ", "Node ID", nodeStaleChunks.NodeID, 1)
for _, staleChunks := range nodeStaleChunks.Chunks {
addString(output, " 🏷️ ", "Chunk ID", staleChunks.ChunkID, 1)
sent := time.Unix(staleChunks.Sent, 0)
runningSecs := int64(time.Since(sent).Seconds())
addString(output, " ⏱️ ", "Sent", sent.Format(time.DateTime)+" ("+state.SecondsToLiteralTime(runningSecs, "")+")", 1)
for _, file := range staleChunks.Files {
output.WriteString("\t\t📄 " + file + "\n")
}
}
}
}

func addTitle(output *strings.Builder, title string) {
output.WriteString(coreutils.PrintBoldTitle(title + "\n"))
}

func addString(output *strings.Builder, emoji, key, value string, tabsCount int, windows bool) {
func addString(output *strings.Builder, emoji, key, value string, tabsCount int) {
indentation := strings.Repeat("\t", tabsCount)
if indentation == "" {
indentation = " "
}
if len(emoji) > 0 {
if windows {
if coreutils.IsWindows() {
emoji = "●"
}
emoji += " "
Expand Down
42 changes: 38 additions & 4 deletions artifactory/commands/transferfiles/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transferfiles
import (
"bytes"
"testing"
"time"

"github.com/jfrog/build-info-go/utils"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
Expand Down Expand Up @@ -53,7 +54,7 @@ func TestShowStatus(t *testing.T) {
defer cleanUp()

// Create state manager and persist to file system
createStateManager(t, api.Phase1, false)
createStateManager(t, api.Phase1, false, false)

// Run show status and check output
assert.NoError(t, ShowStatus())
Expand Down Expand Up @@ -83,7 +84,7 @@ func TestShowStatusDiffPhase(t *testing.T) {
defer cleanUp()

// Create state manager and persist to file system
createStateManager(t, api.Phase2, false)
createStateManager(t, api.Phase2, false, false)

// Run show status and check output
assert.NoError(t, ShowStatus())
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestShowBuildInfoRepo(t *testing.T) {
defer cleanUp()

// Create state manager and persist to file system
createStateManager(t, api.Phase3, true)
createStateManager(t, api.Phase3, true, false)

// Run show status and check output
assert.NoError(t, ShowStatus())
Expand All @@ -138,10 +139,30 @@ func TestShowBuildInfoRepo(t *testing.T) {
assert.Contains(t, results, "Files: 500 / 10000 (5.0%)")
}

func TestShowStaleChunks(t *testing.T) {
buffer, cleanUp := initStatusTest(t)
defer cleanUp()

// Create state manager and persist to file system
createStateManager(t, api.Phase1, false, true)

// Run show status and check output
assert.NoError(t, ShowStatus())
results := buffer.String()

// Check stale chunks
assert.Contains(t, results, "File Chunks in Transit for More than 30 Minutes")
assert.Contains(t, results, "Node ID:\tnode-id-1")
assert.Contains(t, results, "Sent:\t")
assert.Contains(t, results, "(31 minutes)")
assert.Contains(t, results, "a/b/c")
assert.Contains(t, results, "d/e/f")
}

// Create state manager and persist in the file system.
// t - The testing object
// phase - Phase ID
func createStateManager(t *testing.T, phase int, buildInfoRepo bool) {
func createStateManager(t *testing.T, phase int, buildInfoRepo bool, staleChunks bool) {
stateManager, err := state.NewTransferStateManager(false)
assert.NoError(t, err)
assert.NoError(t, stateManager.TryLockTransferStateManager())
Expand All @@ -159,6 +180,19 @@ func createStateManager(t *testing.T, phase int, buildInfoRepo bool) {
stateManager.TimeEstimationManager.LastSpeedsSum = 12
stateManager.TimeEstimationManager.SpeedsAverage = 12

if staleChunks {
stateManager.StaleChunks = append(stateManager.StaleChunks, state.StaleChunks{
NodeID: staleChunksNodeIdOne,
Chunks: []state.StaleChunk{
{
ChunkID: staleChunksChunkId,
Sent: time.Now().Add(-time.Minute * 31).Unix(),
Files: []string{"a/b/c", "d/e/f"},
},
},
})
}

// Increment transferred size and files. This action also persists the run status.
assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase1(500, 5000))

Expand Down
Loading
Loading