Skip to content

Commit

Permalink
fix: update for fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
utsabc committed Oct 22, 2024
1 parent 5fabdfd commit 53daed0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func (m *MarketoAPIService) ImportLeads(csvFilePath, deduplicationField string)

// If we get a token refresh error, retry once
if apiError.Category == "RefreshToken" {

fmt.Println("Token refresh required. Retrying import after fetching new token.")

m.logger.Info("Token refresh required. Retrying import after fetching new token.")
time.Sleep(5 * time.Second) // Wait for 5 seconds before retrying
return m.attemptImport(uploadURL, csvFilePath, deduplicationField, uploadTimeStat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (m *MarketoAuthService) fetchOrUpdateAccessToken() error {
return err
}

// 1657211
accessToken.FetchedAt = time.Now().Unix()
m.accessToken = accessToken
return nil
Expand All @@ -62,19 +63,26 @@ func (m *MarketoAuthService) fetchOrUpdateAccessToken() error {

func (m *MarketoAuthService) GetAccessToken() (string, error) {

if m.accessToken.AccessToken == "" {
err := m.fetchOrUpdateAccessToken()
if err != nil {
return "", err
}

// If the access token is nil or about to expire in 10 seconds, wait 10 seconds and fetch a new access token
} else if m.accessToken.FetchedAt+m.accessToken.ExpiresIn < 10 {
time.Sleep(11 * time.Second)
err := m.fetchOrUpdateAccessToken()
if err != nil {
return "", err
}
// if m.accessToken.AccessToken == "" {
// err := m.fetchOrUpdateAccessToken()
// if err != nil {
// return "", err
// }

// // If the access token is nil or about to expire in 10 seconds, wait 10 seconds and fetch a new access token
// } else if m.accessToken.FetchedAt+m.accessToken.ExpiresIn-int64(time.Second) < 10 {
// time.Sleep(11 * time.Second)
// err := m.fetchOrUpdateAccessToken()
// if err != nil {
// return "", err
// }
// }

// leeping simple logic for now
err := m.fetchOrUpdateAccessToken()
if err != nil {
return "", err
}

return m.accessToken.AccessToken, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,19 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr
}
payloadSizeStat.Gauge(float64(fileInfo.Size()))

fmt.Println("File Upload Started for Dest Type ", destType)
fmt.Println("File size: ", fileInfo.Size())

importID, apiError := b.apiService.ImportLeads(csvFilePath, b.destinationConfig.DeduplicationField)

b.logger.Debugf("[Async Destination Manager] File Upload Finished for Dest Type %v", destType)

if apiError != nil {

if apiError.Category == "RefreshToken" {

fmt.Println("Token Expired at Upload - ", apiError.Message)

return common.AsyncUploadOutput{
FailedJobIDs: append(failedJobIDs, importingJobIDs...),
FailedReason: "BRT: Error in Uploading File: Token Expired " + apiError.Message,
Expand All @@ -115,7 +121,7 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr
}

switch apiError.StatusCode {
case 500:
case 429, 500:
return common.AsyncUploadOutput{
FailedJobIDs: append(failedJobIDs, importingJobIDs...),
FailedReason: "BRT: Error in Uploading File: " + apiError.Message,
Expand All @@ -129,13 +135,6 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr
AbortCount: len(failedJobIDs) + len(importingJobIDs),
DestinationID: destinationID,
}
case 429:
return common.AsyncUploadOutput{
FailedJobIDs: append(failedJobIDs, importingJobIDs...),
FailedReason: "BRT: Error in Uploading File: " + apiError.Message,
FailedCount: len(failedJobIDs) + len(importingJobIDs),
DestinationID: destinationID,
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func calculateHashCode(data []string) string {
func sendHTTPRequest(uploadURL, csvFilePath string, accessToken string, deduplicationField string) (*http.Response, error) {
file, err := os.Open(csvFilePath)
if err != nil {
return nil, err
return nil, fmt.Errorf("error while opening the file: %v", err)
}
defer file.Close()

Expand All @@ -218,11 +218,11 @@ func sendHTTPRequest(uploadURL, csvFilePath string, accessToken string, deduplic
part, err := writer.CreateFormFile("file", filepath.Base(csvFilePath))

if err != nil {
return nil, err
return nil, fmt.Errorf("error while creating form file: %v", err)
}
_, err = io.Copy(part, file)
if err != nil {
return nil, err
return nil, fmt.Errorf("error while copying file: %v", err)
}

_ = writer.WriteField("format", "csv")
Expand All @@ -238,7 +238,7 @@ func sendHTTPRequest(uploadURL, csvFilePath string, accessToken string, deduplic

req, err := http.NewRequest("POST", uploadURL, body)
if err != nil {
return nil, err
return nil, fmt.Errorf("error while creating request: %v", err)
}
req.Header.Set("Content-Type", writer.FormDataContentType())

Expand Down

0 comments on commit 53daed0

Please sign in to comment.