diff --git a/artifactory/commands/transferfiles/manager.go b/artifactory/commands/transferfiles/manager.go index 0a85fd124..2b825f88b 100644 --- a/artifactory/commands/transferfiles/manager.go +++ b/artifactory/commands/transferfiles/manager.go @@ -259,19 +259,25 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa if phaseBase != nil { timeEstMng = &phaseBase.stateManager.TimeEstimationManager } - for { + for i := 0; ; i++ { if ShouldStop(phaseBase, nil, errorsChannelMng) { return } time.Sleep(waitTimeBetweenChunkStatusSeconds * time.Second) - // 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance. - if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil { - log.Error("Couldn't set the current number of working threads:", err.Error()) + // Run once per 3 minutes + if i%60 == 0 { + // 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance. + if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil { + log.Error("Couldn't set the current number of working threads:", err.Error()) + } } - // Each uploading thread receive a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status. + // Each uploading thread receives a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status. fillChunkDataBatch(&chunksLifeCycleManager, uploadChunkChan) + if err := chunksLifeCycleManager.StoreStaleChunks(phaseBase.stateManager); err != nil { + log.Error("Couldn't store the stale chunks:", err.Error()) + } // When totalChunks size is zero, it means that all the tokens are uploaded, // we received 'DONE' for all of them, and we notified the source that they can be deleted from the memory. // If during the polling some chunks data were lost due to network issues, either on the client or on the source, diff --git a/artifactory/commands/transferfiles/state/runstatus.go b/artifactory/commands/transferfiles/state/runstatus.go index df1db470a..1b9ffa9d6 100644 --- a/artifactory/commands/transferfiles/state/runstatus.go +++ b/artifactory/commands/transferfiles/state/runstatus.go @@ -38,6 +38,19 @@ type TransferRunStatus struct { WorkingThreads int `json:"working_threads,omitempty"` TransferFailures uint `json:"transfer_failures,omitempty"` TimeEstimationManager `json:"time_estimation,omitempty"` + StaleChunks []StaleChunks `json:"stale_chunks,omitempty"` +} + +// This structure contains a collection of chunks that have been undergoing processing for over 30 minutes +type StaleChunks struct { + NodeID string `json:"node_id,omitempty"` + Chunks []StaleChunk `json:"stale_node_chunks,omitempty"` +} + +type StaleChunk struct { + ChunkID string `json:"chunk_id,omitempty"` + Files []string `json:"files,omitempty"` + Sent int64 `json:"sent,omitempty"` } func (ts *TransferRunStatus) action(action ActionOnStatusFunc) error { diff --git a/artifactory/commands/transferfiles/state/statemanager.go b/artifactory/commands/transferfiles/state/statemanager.go index 389631f96..5425832e1 100644 --- a/artifactory/commands/transferfiles/state/statemanager.go +++ b/artifactory/commands/transferfiles/state/statemanager.go @@ -302,6 +302,20 @@ func (ts *TransferStateManager) GetWorkingThreads() (workingThreads int, err err }) } +func (ts *TransferStateManager) SetStaleChunks(staleChunks []StaleChunks) error { + return ts.action(func(transferRunStatus *TransferRunStatus) error { + transferRunStatus.StaleChunks = staleChunks + return nil + }) +} + +func (ts *TransferStateManager) GetStaleChunks() (staleChunks []StaleChunks, err error) { + return staleChunks, ts.action(func(transferRunStatus *TransferRunStatus) error { + staleChunks = transferRunStatus.StaleChunks + return nil + }) +} + func (ts *TransferStateManager) SaveStateAndSnapshots() error { ts.TransferState.lastSaveTimestamp = time.Now() if err := ts.persistTransferState(false); err != nil { @@ -361,7 +375,7 @@ func GetRunningTime() (runningTime string, isRunning bool, err error) { return } runningSecs := int64(time.Since(time.Unix(0, startTimestamp)).Seconds()) - return secondsToLiteralTime(runningSecs, ""), true, nil + return SecondsToLiteralTime(runningSecs, ""), true, nil } func UpdateChunkInState(stateManager *TransferStateManager, chunk *api.ChunkStatus) (err error) { diff --git a/artifactory/commands/transferfiles/state/timeestimation.go b/artifactory/commands/transferfiles/state/timeestimation.go index 5fdf82089..9ca2e63ec 100644 --- a/artifactory/commands/transferfiles/state/timeestimation.go +++ b/artifactory/commands/transferfiles/state/timeestimation.go @@ -2,6 +2,7 @@ package state import ( "fmt" + "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api" "github.com/jfrog/jfrog-cli-core/v2/artifactory/utils" @@ -185,7 +186,7 @@ func (tem *TimeEstimationManager) GetEstimatedRemainingTimeString() string { return err.Error() } - return secondsToLiteralTime(remainingTimeSec, "About ") + return SecondsToLiteralTime(remainingTimeSec, "About ") } func (tem *TimeEstimationManager) isTimeEstimationAvailable() bool { diff --git a/artifactory/commands/transferfiles/state/utils.go b/artifactory/commands/transferfiles/state/utils.go index 789c6cf46..6f87a5711 100644 --- a/artifactory/commands/transferfiles/state/utils.go +++ b/artifactory/commands/transferfiles/state/utils.go @@ -2,14 +2,15 @@ package state import ( "fmt" - "github.com/jfrog/build-info-go/utils" - "github.com/jfrog/jfrog-cli-core/v2/utils/coreutils" - "github.com/jfrog/jfrog-client-go/utils/errorutils" - "github.com/jfrog/jfrog-client-go/utils/io/fileutils" "path/filepath" "strconv" "strings" "time" + + "github.com/jfrog/build-info-go/utils" + "github.com/jfrog/jfrog-cli-core/v2/utils/coreutils" + "github.com/jfrog/jfrog-client-go/utils/errorutils" + "github.com/jfrog/jfrog-client-go/utils/io/fileutils" ) const ( @@ -36,9 +37,9 @@ func ConvertTimeToEpochMilliseconds(timeToConvert time.Time) string { return strconv.FormatInt(timeToConvert.UnixMilli(), 10) } -// secondsToLiteralTime converts a number of seconds to an easy-to-read string. +// SecondsToLiteralTime converts a number of seconds to an easy-to-read string. // Prefix is not taken into account if the time is less than a minute. -func secondsToLiteralTime(secondsToConvert int64, prefix string) string { +func SecondsToLiteralTime(secondsToConvert int64, prefix string) string { daysTime := secondsToConvert / secondsInDay daysTimeInSecs := daysTime * secondsInDay hoursTime := (secondsToConvert - daysTimeInSecs) / secondsInHour diff --git a/artifactory/commands/transferfiles/state/utils_test.go b/artifactory/commands/transferfiles/state/utils_test.go index d8375f196..89fb980de 100644 --- a/artifactory/commands/transferfiles/state/utils_test.go +++ b/artifactory/commands/transferfiles/state/utils_test.go @@ -1,8 +1,9 @@ package state import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestSecondsToLiteralTime(t *testing.T) { @@ -32,7 +33,7 @@ func TestSecondsToLiteralTime(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - assert.Equal(t, testCase.expected, secondsToLiteralTime(testCase.secsToConvert, testCase.prefix)) + assert.Equal(t, testCase.expected, SecondsToLiteralTime(testCase.secsToConvert, testCase.prefix)) }) } } diff --git a/artifactory/commands/transferfiles/status.go b/artifactory/commands/transferfiles/status.go index 4917993c1..db95454cd 100644 --- a/artifactory/commands/transferfiles/status.go +++ b/artifactory/commands/transferfiles/status.go @@ -5,6 +5,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api" "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state" @@ -23,7 +24,7 @@ func ShowStatus() error { return err } if !isRunning { - addString(&output, "๐Ÿ”ด", "Status", "Not running", 0, coreutils.IsWindows()) + addString(&output, "๐Ÿ”ด", "Status", "Not running", 0) log.Output(output.String()) return nil } @@ -32,7 +33,7 @@ func ShowStatus() error { return err } if isStopping { - addString(&output, "๐ŸŸก", "Status", "Stopping", 0, coreutils.IsWindows()) + addString(&output, "๐ŸŸก", "Status", "Stopping", 0) log.Output(output.String()) return nil } @@ -54,6 +55,7 @@ func ShowStatus() error { output.WriteString("\n") setRepositoryStatus(stateManager, &output) } + addStaleChunks(stateManager, &output) log.Output(output.String()) return nil } @@ -68,20 +70,19 @@ func isStopping() (bool, error) { } func addOverallStatus(stateManager *state.TransferStateManager, output *strings.Builder, runningTime string) { - windows := coreutils.IsWindows() addTitle(output, "Overall Transfer Status") - addString(output, coreutils.RemoveEmojisIfNonSupportedTerminal("๐ŸŸข"), "Status", "Running", 3, windows) - addString(output, "๐Ÿƒ", "Running for", runningTime, 3, windows) - addString(output, "๐Ÿ—„ ", "Storage", sizeToString(stateManager.OverallTransfer.TransferredSizeBytes)+" / "+sizeToString(stateManager.OverallTransfer.TotalSizeBytes)+calcPercentageInt64(stateManager.OverallTransfer.TransferredSizeBytes, stateManager.OverallTransfer.TotalSizeBytes), 3, windows) - addString(output, "๐Ÿ“ฆ", "Repositories", fmt.Sprintf("%d / %d", stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits)+calcPercentageInt64(stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits), 2, windows) - addString(output, "๐Ÿงต", "Working threads", strconv.Itoa(stateManager.WorkingThreads), 2, windows) - addString(output, "โšก", "Transfer speed", stateManager.GetSpeedString(), 2, windows) - addString(output, "โŒ›", "Estimated time remaining", stateManager.GetEstimatedRemainingTimeString(), 1, windows) + addString(output, coreutils.RemoveEmojisIfNonSupportedTerminal("๐ŸŸข"), "Status", "Running", 3) + addString(output, "๐Ÿƒ", "Running for", runningTime, 3) + addString(output, "๐Ÿ—„ ", "Storage", sizeToString(stateManager.OverallTransfer.TransferredSizeBytes)+" / "+sizeToString(stateManager.OverallTransfer.TotalSizeBytes)+calcPercentageInt64(stateManager.OverallTransfer.TransferredSizeBytes, stateManager.OverallTransfer.TotalSizeBytes), 3) + addString(output, "๐Ÿ“ฆ", "Repositories", fmt.Sprintf("%d / %d", stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits)+calcPercentageInt64(stateManager.TotalRepositories.TransferredUnits, stateManager.TotalRepositories.TotalUnits), 2) + addString(output, "๐Ÿงต", "Working threads", strconv.Itoa(stateManager.WorkingThreads), 2) + addString(output, "โšก", "Transfer speed", stateManager.GetSpeedString(), 2) + addString(output, "โŒ›", "Estimated time remaining", stateManager.GetEstimatedRemainingTimeString(), 1) failureTxt := strconv.FormatUint(uint64(stateManager.TransferFailures), 10) if stateManager.TransferFailures > 0 { failureTxt += " (" + "In Phase 3 and in subsequent executions, we'll retry transferring the failed files." + ")" } - addString(output, "โŒ", "Transfer failures", failureTxt, 2, windows) + addString(output, "โŒ", "Transfer failures", failureTxt, 2) } func calcPercentageInt64(transferred, total int64) string { @@ -92,21 +93,41 @@ func calcPercentageInt64(transferred, total int64) string { } func setRepositoryStatus(stateManager *state.TransferStateManager, output *strings.Builder) { - windows := coreutils.IsWindows() addTitle(output, "Current Repository Status") - addString(output, "๐Ÿท ", "Name", stateManager.CurrentRepoKey, 2, windows) + addString(output, "๐Ÿท ", "Name", stateManager.CurrentRepoKey, 2) currentRepo := stateManager.CurrentRepo switch stateManager.CurrentRepoPhase { case api.Phase1, api.Phase3: if stateManager.CurrentRepoPhase == api.Phase1 { - addString(output, "๐Ÿ”ข", "Phase", "Transferring all files in the repository (1/3)", 2, windows) + addString(output, "๐Ÿ”ข", "Phase", "Transferring all files in the repository (1/3)", 2) } else { - addString(output, "๐Ÿ”ข", "Phase", "Retrying transfer failures (3/3)", 2, windows) + addString(output, "๐Ÿ”ข", "Phase", "Retrying transfer failures (3/3)", 2) } - addString(output, "๐Ÿ—„ ", "Storage", sizeToString(currentRepo.Phase1Info.TransferredSizeBytes)+" / "+sizeToString(currentRepo.Phase1Info.TotalSizeBytes)+calcPercentageInt64(currentRepo.Phase1Info.TransferredSizeBytes, currentRepo.Phase1Info.TotalSizeBytes), 2, windows) - addString(output, "๐Ÿ“„", "Files", fmt.Sprintf("%d / %d", currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits)+calcPercentageInt64(currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits), 2, windows) + addString(output, "๐Ÿ—„ ", "Storage", sizeToString(currentRepo.Phase1Info.TransferredSizeBytes)+" / "+sizeToString(currentRepo.Phase1Info.TotalSizeBytes)+calcPercentageInt64(currentRepo.Phase1Info.TransferredSizeBytes, currentRepo.Phase1Info.TotalSizeBytes), 2) + addString(output, "๐Ÿ“„", "Files", fmt.Sprintf("%d / %d", currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits)+calcPercentageInt64(currentRepo.Phase1Info.TransferredUnits, currentRepo.Phase1Info.TotalUnits), 2) case api.Phase2: - addString(output, "๐Ÿ”ข", "Phase", "Transferring newly created and modified files (2/3)", 2, windows) + addString(output, "๐Ÿ”ข", "Phase", "Transferring newly created and modified files (2/3)", 2) + } +} + +func addStaleChunks(stateManager *state.TransferStateManager, output *strings.Builder) { + if len(stateManager.StaleChunks) == 0 { + return + } + output.WriteString("\n") + addTitle(output, "File Chunks in Transit for More than 30 Minutes") + + for _, nodeStaleChunks := range stateManager.StaleChunks { + addString(output, "๐Ÿท๏ธ ", "Node ID", nodeStaleChunks.NodeID, 1) + for _, staleChunks := range nodeStaleChunks.Chunks { + addString(output, " ๐Ÿท๏ธ ", "Chunk ID", staleChunks.ChunkID, 1) + sent := time.Unix(staleChunks.Sent, 0) + runningSecs := int64(time.Since(sent).Seconds()) + addString(output, " โฑ๏ธ ", "Sent", sent.Format(time.DateTime)+" ("+state.SecondsToLiteralTime(runningSecs, "")+")", 1) + for _, file := range staleChunks.Files { + output.WriteString("\t\t๐Ÿ“„ " + file + "\n") + } + } } } @@ -114,13 +135,13 @@ func addTitle(output *strings.Builder, title string) { output.WriteString(coreutils.PrintBoldTitle(title + "\n")) } -func addString(output *strings.Builder, emoji, key, value string, tabsCount int, windows bool) { +func addString(output *strings.Builder, emoji, key, value string, tabsCount int) { indentation := strings.Repeat("\t", tabsCount) if indentation == "" { indentation = " " } if len(emoji) > 0 { - if windows { + if coreutils.IsWindows() { emoji = "โ—" } emoji += " " diff --git a/artifactory/commands/transferfiles/status_test.go b/artifactory/commands/transferfiles/status_test.go index 8cb2f0983..f1c7226ab 100644 --- a/artifactory/commands/transferfiles/status_test.go +++ b/artifactory/commands/transferfiles/status_test.go @@ -3,6 +3,7 @@ package transferfiles import ( "bytes" "testing" + "time" "github.com/jfrog/build-info-go/utils" "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api" @@ -53,7 +54,7 @@ func TestShowStatus(t *testing.T) { defer cleanUp() // Create state manager and persist to file system - createStateManager(t, api.Phase1, false) + createStateManager(t, api.Phase1, false, false) // Run show status and check output assert.NoError(t, ShowStatus()) @@ -83,7 +84,7 @@ func TestShowStatusDiffPhase(t *testing.T) { defer cleanUp() // Create state manager and persist to file system - createStateManager(t, api.Phase2, false) + createStateManager(t, api.Phase2, false, false) // Run show status and check output assert.NoError(t, ShowStatus()) @@ -113,7 +114,7 @@ func TestShowBuildInfoRepo(t *testing.T) { defer cleanUp() // Create state manager and persist to file system - createStateManager(t, api.Phase3, true) + createStateManager(t, api.Phase3, true, false) // Run show status and check output assert.NoError(t, ShowStatus()) @@ -138,10 +139,30 @@ func TestShowBuildInfoRepo(t *testing.T) { assert.Contains(t, results, "Files: 500 / 10000 (5.0%)") } +func TestShowStaleChunks(t *testing.T) { + buffer, cleanUp := initStatusTest(t) + defer cleanUp() + + // Create state manager and persist to file system + createStateManager(t, api.Phase1, false, true) + + // Run show status and check output + assert.NoError(t, ShowStatus()) + results := buffer.String() + + // Check stale chunks + assert.Contains(t, results, "File Chunks in Transit for More than 30 Minutes") + assert.Contains(t, results, "Node ID:\tnode-id-1") + assert.Contains(t, results, "Sent:\t") + assert.Contains(t, results, "(31 minutes)") + assert.Contains(t, results, "a/b/c") + assert.Contains(t, results, "d/e/f") +} + // Create state manager and persist in the file system. // t - The testing object // phase - Phase ID -func createStateManager(t *testing.T, phase int, buildInfoRepo bool) { +func createStateManager(t *testing.T, phase int, buildInfoRepo bool, staleChunks bool) { stateManager, err := state.NewTransferStateManager(false) assert.NoError(t, err) assert.NoError(t, stateManager.TryLockTransferStateManager()) @@ -159,6 +180,19 @@ func createStateManager(t *testing.T, phase int, buildInfoRepo bool) { stateManager.TimeEstimationManager.LastSpeedsSum = 12 stateManager.TimeEstimationManager.SpeedsAverage = 12 + if staleChunks { + stateManager.StaleChunks = append(stateManager.StaleChunks, state.StaleChunks{ + NodeID: staleChunksNodeIdOne, + Chunks: []state.StaleChunk{ + { + ChunkID: staleChunksChunkId, + Sent: time.Now().Add(-time.Minute * 31).Unix(), + Files: []string{"a/b/c", "d/e/f"}, + }, + }, + }) + } + // Increment transferred size and files. This action also persists the run status. assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase1(500, 5000)) diff --git a/artifactory/commands/transferfiles/utils.go b/artifactory/commands/transferfiles/utils.go index 1c9f3caef..668bebc1d 100644 --- a/artifactory/commands/transferfiles/utils.go +++ b/artifactory/commands/transferfiles/utils.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "strconv" "strings" @@ -92,6 +93,37 @@ func (clcm *ChunksLifeCycleManager) GetInProgressTokensSliceByNodeId(nodeId node return inProgressTokens } +// Save in the TransferRunStatus the chunks that have been in transit for more than 30 minutes. +// This allows them to be displayed using the '--status' option. +// stateManager - Transfer state manager +func (clcm *ChunksLifeCycleManager) StoreStaleChunks(stateManager *state.TransferStateManager) error { + var staleChunks []state.StaleChunks + for nodeId, chunkIdToData := range clcm.nodeToChunksMap { + staleNodeChunks := state.StaleChunks{NodeID: string(nodeId)} + for chunkId, uploadedChunkData := range chunkIdToData { + if time.Since(uploadedChunkData.TimeSent).Hours() < 0.5 { + continue + } + staleNodeChunk := state.StaleChunk{ + ChunkID: string(chunkId), + Sent: uploadedChunkData.TimeSent.Unix(), + } + for _, file := range uploadedChunkData.ChunkFiles { + var sizeStr string + if file.Size > 0 { + sizeStr = " (" + utils.ConvertIntToStorageSizeString(file.Size) + ")" + } + staleNodeChunk.Files = append(staleNodeChunk.Files, path.Join(file.Repo, file.Path, file.Name)+sizeStr) + } + staleNodeChunks.Chunks = append(staleNodeChunks.Chunks, staleNodeChunk) + } + if len(staleNodeChunks.Chunks) > 0 { + staleChunks = append(staleChunks, staleNodeChunks) + } + } + return stateManager.SetStaleChunks(staleChunks) +} + type InterruptionErr struct{} func (m *InterruptionErr) Error() string { diff --git a/artifactory/commands/transferfiles/utils_test.go b/artifactory/commands/transferfiles/utils_test.go index da958e678..0f38b2100 100644 --- a/artifactory/commands/transferfiles/utils_test.go +++ b/artifactory/commands/transferfiles/utils_test.go @@ -11,7 +11,10 @@ import ( "strconv" "strings" "testing" + "time" + "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api" + "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state" "github.com/jfrog/jfrog-cli-core/v2/utils/config" "github.com/jfrog/jfrog-cli-core/v2/utils/tests" "github.com/jfrog/jfrog-client-go/artifactory/services" @@ -41,8 +44,16 @@ const runningNodesResponse = ` } ` +const ( + staleChunksNodeIdOne = "node-id-1" + staleChunksNodeIdTwo = "node-id-2" + staleChunksChunkId = "chunk-id" + staleChunksPath = "path-in-repo" + staleChunksName = "file-name" +) + func TestGetRunningNodes(t *testing.T) { - testServer, serverDetails, _ := createMockServer(t, func(w http.ResponseWriter, r *http.Request) { + testServer, serverDetails, _ := createMockServer(t, func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) _, err := w.Write([]byte(runningNodesResponse)) assert.NoError(t, err) @@ -57,7 +68,7 @@ func TestGetRunningNodes(t *testing.T) { func TestStopTransferOnArtifactoryNodes(t *testing.T) { stoppedNodeOne, stoppedNodeTwo := false, false requestNumber := 0 - testServer, _, srcUpService := createMockServer(t, func(w http.ResponseWriter, r *http.Request) { + testServer, _, srcUpService := createMockServer(t, func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) var nodeId string if requestNumber == 0 { @@ -245,6 +256,109 @@ func TestInterruptIfRequested(t *testing.T) { assert.Equal(t, os.Interrupt, actualSignal) } +func TestStoreStaleChunksEmpty(t *testing.T) { + // Init state manager + stateManager, cleanUp := state.InitStateTest(t) + defer cleanUp() + + // Store empty stale chunks + chunksLifeCycleManager := ChunksLifeCycleManager{ + nodeToChunksMap: make(map[nodeId]map[api.ChunkId]UploadedChunkData), + } + assert.NoError(t, chunksLifeCycleManager.StoreStaleChunks(stateManager)) + + // Make sure no chunks + staleChunks, err := stateManager.GetStaleChunks() + assert.NoError(t, err) + assert.Empty(t, staleChunks) +} + +func TestStoreStaleChunksNoStale(t *testing.T) { + // Init state manager + stateManager, cleanUp := state.InitStateTest(t) + defer cleanUp() + + // Store chunk that is not stale + chunksLifeCycleManager := ChunksLifeCycleManager{ + nodeToChunksMap: map[nodeId]map[api.ChunkId]UploadedChunkData{ + staleChunksNodeIdOne: { + staleChunksChunkId: { + TimeSent: time.Now().Add(-time.Minute), + ChunkFiles: []api.FileRepresentation{{Repo: repo1Key, Path: staleChunksPath, Name: staleChunksName}}, + }, + }, + }, + } + assert.NoError(t, chunksLifeCycleManager.StoreStaleChunks(stateManager)) + + // Make sure no chunks + staleChunks, err := stateManager.GetStaleChunks() + assert.NoError(t, err) + assert.Empty(t, staleChunks) +} + +func TestStoreStaleChunksStale(t *testing.T) { + // Init state manager + stateManager, cleanUp := state.InitStateTest(t) + defer cleanUp() + + // Store stale chunk + sent := time.Now().Add(-time.Hour) + chunksLifeCycleManager := ChunksLifeCycleManager{ + nodeToChunksMap: map[nodeId]map[api.ChunkId]UploadedChunkData{ + staleChunksNodeIdOne: { + staleChunksChunkId: { + TimeSent: sent, + ChunkFiles: []api.FileRepresentation{{Repo: repo1Key, Path: staleChunksPath, Name: staleChunksName, Size: 100}}, + }, + }, + }, + } + assert.NoError(t, chunksLifeCycleManager.StoreStaleChunks(stateManager)) + + // Make sure the stale chunk was stored in the state + staleChunks, err := stateManager.GetStaleChunks() + assert.NoError(t, err) + assert.Len(t, staleChunks, 1) + assert.Equal(t, staleChunksNodeIdOne, staleChunks[0].NodeID) + assert.Len(t, staleChunks[0].Chunks, 1) + assert.Equal(t, staleChunksChunkId, staleChunks[0].Chunks[0].ChunkID) + assert.Equal(t, sent.Unix(), staleChunks[0].Chunks[0].Sent) + assert.Len(t, staleChunks[0].Chunks[0].Files, 1) + assert.Equal(t, fmt.Sprintf("%s/%s/%s (0.1KB)", repo1Key, staleChunksPath, staleChunksName), staleChunks[0].Chunks[0].Files[0]) +} + +func TestStoreStaleChunksTwoNodes(t *testing.T) { + // Init state manager + stateManager, cleanUp := state.InitStateTest(t) + defer cleanUp() + + // Store 1 stale chunk and 1 non-stale chunk + chunksLifeCycleManager := ChunksLifeCycleManager{ + nodeToChunksMap: map[nodeId]map[api.ChunkId]UploadedChunkData{ + staleChunksNodeIdOne: { + staleChunksChunkId: { + TimeSent: time.Now().Add(-time.Hour), // Older than 0.5 hours + ChunkFiles: []api.FileRepresentation{{Repo: repo1Key, Path: staleChunksPath, Name: staleChunksName, Size: 1024}}, + }, + }, + staleChunksNodeIdTwo: { + staleChunksChunkId: { + TimeSent: time.Now(), // Less than 0.5 hours + ChunkFiles: []api.FileRepresentation{{Repo: repo2Key, Path: staleChunksPath, Name: staleChunksName, Size: 0}}, + }, + }, + }, + } + assert.NoError(t, chunksLifeCycleManager.StoreStaleChunks(stateManager)) + + // Make sure only the stale chunk was stored in the state + staleChunks, err := stateManager.GetStaleChunks() + assert.NoError(t, err) + assert.Len(t, staleChunks, 1) + assert.Equal(t, staleChunksNodeIdOne, staleChunks[0].NodeID) +} + // Create mock server to test transfer config commands // t - The testing object // testHandler - The HTTP handler of the test