Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
eyalbe4 committed Jun 27, 2023
2 parents 7141b4c + 114cca0 commit ec17933
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,12 @@ func (w *SplitContentWriter) closeCurrentFile() error {
return err
}
if w.writer.GetFilePath() != "" {
fullPath := filepath.Join(w.dirPath, fmt.Sprintf("%s-%d.json", w.filePrefix, w.fileIndex))
fullPath, err := getUniqueErrorOrDelayFilePath(w.dirPath, func() string {
return w.filePrefix
})
if err != nil {
return err
}
log.Debug(fmt.Sprintf("Saving split content JSON file to: %s.", fullPath))
if err := fileutils.MoveFile(w.writer.GetFilePath(), fullPath); err != nil {
return fmt.Errorf("saving file failed! failed moving %s to %s: %w", w.writer.GetFilePath(), fullPath, err)
Expand Down
32 changes: 16 additions & 16 deletions artifactory/commands/transferfiles/errorshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/jfrog/jfrog-client-go/utils/log"
"os"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -42,9 +41,7 @@ type TransferErrorsMng struct {
type errorWriter struct {
writer *content.ContentWriter
errorCount int
// In case we have multiple errors files - we index them
fileIndex int
filePath string
filePath string
}

type errorWriterMng struct {
Expand Down Expand Up @@ -116,7 +113,7 @@ func (mng *TransferErrorsMng) start() (err error) {
if err != nil {
return err
}
writerRetry, retryFilePath, err := mng.newContentWriter(retryablePath, 0)
writerRetry, retryFilePath, err := mng.newUniqueContentWriter(retryablePath)
if err != nil {
return err
}
Expand All @@ -126,14 +123,14 @@ func (mng *TransferErrorsMng) start() (err error) {
err = e
}
}()
writerMng.retryable = errorWriter{writer: writerRetry, fileIndex: 0, filePath: retryFilePath}
writerMng.retryable = errorWriter{writer: writerRetry, filePath: retryFilePath}
// Init the content writer which is responsible for writing 'skipped errors' into files.
// In the next run we won't retry and upload those files.
skippedPath, err := getJfrogTransferRepoSkippedDir(mng.repoKey)
if err != nil {
return err
}
writerSkip, skipFilePath, err := mng.newContentWriter(skippedPath, 0)
writerSkip, skipFilePath, err := mng.newUniqueContentWriter(skippedPath)
if err != nil {
return err
}
Expand All @@ -143,7 +140,7 @@ func (mng *TransferErrorsMng) start() (err error) {
err = e
}
}()
writerMng.skipped = errorWriter{writer: writerSkip, fileIndex: 0, filePath: skipFilePath}
writerMng.skipped = errorWriter{writer: writerSkip, filePath: skipFilePath}
mng.errorWriterMng = writerMng

// Read errors from channel and write them to files.
Expand All @@ -156,17 +153,22 @@ func (mng *TransferErrorsMng) start() (err error) {
return
}

func (mng *TransferErrorsMng) newContentWriter(dirPath string, index int) (*content.ContentWriter, string, error) {
func (mng *TransferErrorsMng) newUniqueContentWriter(dirPath string) (*content.ContentWriter, string, error) {
writer, err := content.NewContentWriter("errors", true, false)
if err != nil {
return nil, "", err
}
errorsFilePath := filepath.Join(dirPath, getErrorsFileName(mng.repoKey, mng.phaseId, mng.phaseStartTime, index))
errorsFilePath, err := getUniqueErrorOrDelayFilePath(dirPath, func() string {
return getErrorsFileNamePrefix(mng.repoKey, mng.phaseId, mng.phaseStartTime)
})
if err != nil {
return nil, "", err
}
return writer, errorsFilePath, nil
}

func getErrorsFileName(repoKey string, phaseId int, phaseStartTime string, index int) string {
return fmt.Sprintf("%s-%d-%s-%d.json", repoKey, phaseId, phaseStartTime, index)
func getErrorsFileNamePrefix(repoKey string, phaseId int, phaseStartTime string) string {
return fmt.Sprintf("%s-%d-%s", repoKey, phaseId, phaseStartTime)
}

func (mng *TransferErrorsMng) writeErrorContent(e ExtendedFileUploadStatusResponse) error {
Expand Down Expand Up @@ -197,12 +199,11 @@ func (mng *TransferErrorsMng) writeSkippedErrorContent(e ExtendedFileUploadStatu
return err
}
// Initialize variables for new errors file
mng.errorWriterMng.skipped.fileIndex++
dirPath, err := getJfrogTransferRepoSkippedDir(mng.repoKey)
if err != nil {
return err
}
mng.errorWriterMng.skipped.writer, mng.errorWriterMng.skipped.filePath, err = mng.newContentWriter(dirPath, mng.errorWriterMng.skipped.fileIndex)
mng.errorWriterMng.skipped.writer, mng.errorWriterMng.skipped.filePath, err = mng.newUniqueContentWriter(dirPath)
if err != nil {
return err
}
Expand All @@ -222,12 +223,11 @@ func (mng *TransferErrorsMng) writeRetryableErrorContent(e ExtendedFileUploadSta
return err
}
// Initialize variables for new errors file
mng.errorWriterMng.retryable.fileIndex++
dirPath, err := getJfrogTransferRepoRetryableDir(mng.repoKey)
if err != nil {
return err
}
mng.errorWriterMng.retryable.writer, mng.errorWriterMng.retryable.filePath, err = mng.newContentWriter(dirPath, mng.errorWriterMng.retryable.fileIndex)
mng.errorWriterMng.retryable.writer, mng.errorWriterMng.retryable.filePath, err = mng.newUniqueContentWriter(dirPath)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/errorshandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,6 @@ func writeEmptyErrorsFile(t *testing.T, repoKey string, retryable bool, phase, c
assert.NoError(t, err)
assert.NoError(t, fileutils.CreateDirIfNotExist(errorsDirPath))

fileName := getErrorsFileName(repoKey, phase, state.ConvertTimeToEpochMilliseconds(time.Now()), counter)
fileName := fmt.Sprintf("%s-%d.json", getErrorsFileNamePrefix(repoKey, phase, state.ConvertTimeToEpochMilliseconds(time.Now())), counter)
assert.NoError(t, os.WriteFile(filepath.Join(errorsDirPath, fileName), nil, 0644))
}
16 changes: 11 additions & 5 deletions artifactory/commands/transferfiles/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,25 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
if phaseBase != nil {
timeEstMng = &phaseBase.stateManager.TimeEstimationManager
}
for {
for i := 0; ; i++ {
if ShouldStop(phaseBase, nil, errorsChannelMng) {
return
}
time.Sleep(waitTimeBetweenChunkStatusSeconds * time.Second)

// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
// Run once per 3 minutes
if i%60 == 0 {
// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
}
}

// Each uploading thread receive a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
// Each uploading thread receives a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
fillChunkDataBatch(&chunksLifeCycleManager, uploadChunkChan)
if err := chunksLifeCycleManager.StoreStaleChunks(phaseBase.stateManager); err != nil {
log.Error("Couldn't store the stale chunks:", err.Error())
}
// When totalChunks size is zero, it means that all the tokens are uploaded,
// we received 'DONE' for all of them, and we notified the source that they can be deleted from the memory.
// If during the polling some chunks data were lost due to network issues, either on the client or on the source,
Expand Down
13 changes: 13 additions & 0 deletions artifactory/commands/transferfiles/state/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ type TransferRunStatus struct {
WorkingThreads int `json:"working_threads,omitempty"`
TransferFailures uint `json:"transfer_failures,omitempty"`
TimeEstimationManager `json:"time_estimation,omitempty"`
StaleChunks []StaleChunks `json:"stale_chunks,omitempty"`
}

// This structure contains a collection of chunks that have been undergoing processing for over 30 minutes
type StaleChunks struct {
NodeID string `json:"node_id,omitempty"`
Chunks []StaleChunk `json:"stale_node_chunks,omitempty"`
}

type StaleChunk struct {
ChunkID string `json:"chunk_id,omitempty"`
Files []string `json:"files,omitempty"`
Sent int64 `json:"sent,omitempty"`
}

func (ts *TransferRunStatus) action(action ActionOnStatusFunc) error {
Expand Down
16 changes: 15 additions & 1 deletion artifactory/commands/transferfiles/state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ func (ts *TransferStateManager) GetWorkingThreads() (workingThreads int, err err
})
}

func (ts *TransferStateManager) SetStaleChunks(staleChunks []StaleChunks) error {
return ts.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.StaleChunks = staleChunks
return nil
})
}

func (ts *TransferStateManager) GetStaleChunks() (staleChunks []StaleChunks, err error) {
return staleChunks, ts.action(func(transferRunStatus *TransferRunStatus) error {
staleChunks = transferRunStatus.StaleChunks
return nil
})
}

func (ts *TransferStateManager) SaveStateAndSnapshots() error {
ts.TransferState.lastSaveTimestamp = time.Now()
if err := ts.persistTransferState(false); err != nil {
Expand Down Expand Up @@ -361,7 +375,7 @@ func GetRunningTime() (runningTime string, isRunning bool, err error) {
return
}
runningSecs := int64(time.Since(time.Unix(0, startTimestamp)).Seconds())
return secondsToLiteralTime(runningSecs, ""), true, nil
return SecondsToLiteralTime(runningSecs, ""), true, nil
}

func UpdateChunkInState(stateManager *TransferStateManager, chunk *api.ChunkStatus) (err error) {
Expand Down
3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/state/timeestimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"fmt"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/utils"

Expand Down Expand Up @@ -185,7 +186,7 @@ func (tem *TimeEstimationManager) GetEstimatedRemainingTimeString() string {
return err.Error()
}

return secondsToLiteralTime(remainingTimeSec, "About ")
return SecondsToLiteralTime(remainingTimeSec, "About ")
}

func (tem *TimeEstimationManager) isTimeEstimationAvailable() bool {
Expand Down
13 changes: 7 additions & 6 deletions artifactory/commands/transferfiles/state/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package state

import (
"fmt"
"github.com/jfrog/build-info-go/utils"
"github.com/jfrog/jfrog-cli-core/v2/utils/coreutils"
"github.com/jfrog/jfrog-client-go/utils/errorutils"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/jfrog/build-info-go/utils"
"github.com/jfrog/jfrog-cli-core/v2/utils/coreutils"
"github.com/jfrog/jfrog-client-go/utils/errorutils"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
)

const (
Expand All @@ -36,9 +37,9 @@ func ConvertTimeToEpochMilliseconds(timeToConvert time.Time) string {
return strconv.FormatInt(timeToConvert.UnixMilli(), 10)
}

// secondsToLiteralTime converts a number of seconds to an easy-to-read string.
// SecondsToLiteralTime converts a number of seconds to an easy-to-read string.
// Prefix is not taken into account if the time is less than a minute.
func secondsToLiteralTime(secondsToConvert int64, prefix string) string {
func SecondsToLiteralTime(secondsToConvert int64, prefix string) string {
daysTime := secondsToConvert / secondsInDay
daysTimeInSecs := daysTime * secondsInDay
hoursTime := (secondsToConvert - daysTimeInSecs) / secondsInHour
Expand Down
5 changes: 3 additions & 2 deletions artifactory/commands/transferfiles/state/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package state

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSecondsToLiteralTime(t *testing.T) {
Expand Down Expand Up @@ -32,7 +33,7 @@ func TestSecondsToLiteralTime(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
assert.Equal(t, testCase.expected, secondsToLiteralTime(testCase.secsToConvert, testCase.prefix))
assert.Equal(t, testCase.expected, SecondsToLiteralTime(testCase.secsToConvert, testCase.prefix))
})
}
}
Expand Down
Loading

0 comments on commit ec17933

Please sign in to comment.