Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: onboard initial marketo bulk upload implementation #5114

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e616a39
feat: onboard initial marketo bulk upload implementation
utsabc Sep 18, 2024
fd19b4a
feat: resolve conflicts
utsabc Sep 18, 2024
f3b085c
feat: modularise code for marketo bulk upload
utsabc Oct 1, 2024
d52ad8e
Merge branch 'master' into feat.marketo-bulk-uplaod-async
utsabc Oct 1, 2024
661c0cc
feat: add deduplication field
utsabc Oct 6, 2024
aaadd63
Merge branch 'master' into feat.marketo-bulk-uplaod-async
utsabc Oct 6, 2024
0eaa617
chore: fix lint
utsabc Oct 6, 2024
ad2ee31
chore: fix lint errors
utsabc Oct 6, 2024
8ebec83
chore: fix struct
utsabc Oct 6, 2024
01c21b1
fix: fixes based on unit tests
utsabc Oct 7, 2024
f4d6117
Merge branch 'master' into feat.marketo-bulk-uplaod-async
utsabc Oct 21, 2024
7f7f71d
feat: handle csv lines using csv module
utsabc Oct 21, 2024
e409d01
Merge branch 'feat.marketo-bulk-uplaod-async' of github.com:rudderlab…
utsabc Oct 21, 2024
5fabdfd
feat: add interfaces for api service and auth service
utsabc Oct 21, 2024
53daed0
fix: update for fixes
utsabc Oct 22, 2024
a5387e0
fix: update for access token retry
utsabc Oct 22, 2024
b3e6321
fix: update for correct file size handling
utsabc Oct 22, 2024
f54b67a
chore: add logs
utsabc Oct 22, 2024
510a67c
chore: remove temp logs
utsabc Oct 23, 2024
4888971
fix: auth issues
utsabc Oct 23, 2024
597e2c9
chore: update formatting
utsabc Oct 23, 2024
5c1a0e9
chore: update formatting pt 2
utsabc Oct 23, 2024
52730de
chore: update formatting for utils
utsabc Oct 23, 2024
8efb416
fix: test case for auth token
utsabc Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion router/batchrouter/asyncdestinationmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newRegularManager(
case "BINGADS_OFFLINE_CONVERSIONS":
return bingads_offline_conversions.NewManager(conf, logger, statsFactory, destination, backendConfig)
case "MARKETO_BULK_UPLOAD":
return marketobulkupload.NewManager(conf, logger, statsFactory, destination)
return marketobulkupload.NewManager(logger, statsFactory, destination)
case "ELOQUA":
return eloqua.NewManager(logger, statsFactory, destination)
case "YANDEX_METRICA_OFFLINE_EVENTS":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package marketobulkupload

import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

// MarketoAPIServiceInterface describes the Marketo bulk-upload operations used
// by this package. It allows the concrete MarketoAPIService to be swapped out.
type MarketoAPIServiceInterface interface {
// ImportLeads uploads the CSV file at csvFilePath and returns the import ID
// reported by Marketo, or an *APIError describing the failure.
ImportLeads(csvFilePath, deduplicationField string) (string, *APIError)
// PollImportStatus fetches the status of the import identified by importId.
PollImportStatus(importId string) (*MarketoResponse, *APIError)
// GetLeadStatus downloads the CSV served at url and returns one map per data
// row, keyed by the CSV header names.
GetLeadStatus(url string) ([]map[string]string, *APIError)
}

// MarketoAPIService talks to the Marketo bulk lead import REST endpoints.
type MarketoAPIService struct {
// logger receives informational and debug messages from the service.
logger logger.Logger
// statsFactory creates the "async_upload_time" timer recorded by ImportLeads.
statsFactory stats.Stats
// httpClient performs the poll and lead-status GET requests.
httpClient *http.Client
// munchkinId is interpolated into the host name: https://<munchkinId>.mktorest.com.
munchkinId string
// authService supplies the bearer token for every request.
authService MarketoAuthServiceInterface
// maxRetries bounds the number of re-attempts ImportLeads makes after a
// "RefreshToken" failure.
maxRetries int
}

// APIError is the failure value returned by the MarketoAPIService methods.
type APIError struct {
// StatusCode is either an HTTP status code, the status produced by
// parseMarketoResponse, or 500 for failures detected locally.
StatusCode int64
// Category classifies the failure; values used in this file include
// "Retryable" and "RefreshToken".
Category string
// Message is a human-readable description of the failure.
Message string
}

// waitForTokenExpiry blocks the calling goroutine for tokenExpiresIn seconds
// plus a 5 second buffer, giving the current access token time to expire before
// the import is retried.
//
// Only the lifetime passed in is used; the time at which the token was fetched
// is not taken into account. A non-positive total makes time.Sleep return
// immediately.
func (m *MarketoAPIService) waitForTokenExpiry(tokenExpiresIn int64) {
	// Small buffer to make sure the token has fully expired.
	const expiryBuffer = 5 * time.Second

	waitDuration := time.Duration(tokenExpiresIn)*time.Second + expiryBuffer
	m.logger.Info(fmt.Sprintf("Waiting %v for token to expire before retrying", waitDuration))
	time.Sleep(waitDuration)
}

// checkForCSVLikeResponse reports whether resp declares a CSV payload in its
// Content-Type header.
//
// Only the media type is compared, case-insensitively; parameters such as
// "charset=UTF-8" and surrounding whitespace are ignored, so
// "text/csv;charset=UTF-8", "text/csv; charset=utf-8" and "text/csv" are all
// accepted. A nil response or a missing header yields false.
func (m *MarketoAPIService) checkForCSVLikeResponse(resp *http.Response) bool {
	if resp == nil {
		return false
	}

	// Drop any parameters that follow the media type.
	mediaType := strings.SplitN(resp.Header.Get("Content-Type"), ";", 2)[0]
	return strings.EqualFold(strings.TrimSpace(mediaType), "text/csv")
}

// attemptImport performs a single upload of the CSV file at csvFilePath to
// uploadURL and returns the import ID reported by Marketo.
//
// The duration of the HTTP call is recorded on uploadTimeStat whether or not
// the call succeeds. Failures detected locally (token fetch, transport, body
// read, JSON decoding, missing import ID) are reported as "Retryable" with
// status 500; a non-200 HTTP status is reported as "Retryable" with that
// status; otherwise the status, category and message come from
// parseMarketoResponse.
//
// The access token is deliberately never logged or printed.
func (m *MarketoAPIService) attemptImport(uploadURL, csvFilePath, deduplicationField string, uploadTimeStat stats.Measurement) (string, *APIError) {
	token, err := m.authService.GetAccessToken()
	if err != nil {
		return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in fetching access token"}
	}

	startTime := time.Now()
	resp, err := sendHTTPRequest(uploadURL, csvFilePath, token, deduplicationField)
	uploadTimeStat.Since(startTime)
	if err != nil {
		return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in sending request"}
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", &APIError{StatusCode: int64(resp.StatusCode), Category: "Retryable", Message: "Error in sending request"}
	}

	responseBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in reading response body"}
	}

	var marketoResponse MarketoResponse
	if err := json.Unmarshal(responseBody, &marketoResponse); err != nil {
		return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in parsing response body"}
	}

	statusCode, category, errorMessage := parseMarketoResponse(marketoResponse)
	if category == "Success" {
		// Guard the index: a successful response without a result entry would
		// otherwise panic here.
		if len(marketoResponse.Result) == 0 {
			return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "No import id in response"}
		}
		return marketoResponse.Result[0].ImportID, nil
	}

	m.logger.Debugf("[Async Destination Manager] Marketo import failed: statusCode=%v category=%v message=%v", statusCode, category, errorMessage)

	return "", &APIError{StatusCode: statusCode, Category: category, Message: errorMessage}
}

// ImportLeads uploads the CSV file at csvFilePath to the Marketo bulk lead
// import endpoint and returns the resulting import ID.
//
// After the first attempt, the upload is retried only while the failure
// category is "RefreshToken", at most maxRetries times. Before each retry the
// call blocks until the current token's lifetime (as reported by the auth
// service) has elapsed. Any other failure category is returned to the caller
// unchanged, including when maxRetries is zero or when it is the outcome of the
// last retry. When the retry budget is exhausted while the failure is still
// "RefreshToken", a "Retryable" error with status 500 is returned.
//
// Token details are intentionally not logged, since they contain the access
// token itself.
func (m *MarketoAPIService) ImportLeads(csvFilePath, deduplicationField string) (string, *APIError) {
	uploadTimeStat := m.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{
		"module":   "batch_router",
		"destType": "MARKETO_BULK_UPLOAD",
	})

	uploadURL := fmt.Sprintf("https://%s.mktorest.com/bulk/v1/leads.json", m.munchkinId)

	importID, apiError := m.attemptImport(uploadURL, csvFilePath, deduplicationField, uploadTimeStat)
	for retryCount := 0; apiError != nil; retryCount++ {
		// Only token-refresh failures are worth retrying here.
		if apiError.Category != "RefreshToken" {
			return "", apiError
		}

		if retryCount >= m.maxRetries {
			return "", &APIError{
				StatusCode: 500,
				Category:   "Retryable",
				Message:    fmt.Sprintf("Failed to import after %d retries", m.maxRetries),
			}
		}

		// Wait for the token to expire before retrying.
		tokenInfo := m.authService.GetAccessTokenInfo()
		m.waitForTokenExpiry(tokenInfo.ExpiresIn)

		m.logger.Info(fmt.Sprintf("Retrying import after token expiry (attempt %d of %d)", retryCount+1, m.maxRetries))
		importID, apiError = m.attemptImport(uploadURL, csvFilePath, deduplicationField, uploadTimeStat)
	}

	return importID, nil
}

// PollImportStatus queries Marketo for the state of the bulk import identified
// by importId.
//
// On success the decoded response is returned. Failures detected locally are
// reported as "Retryable" with status 500; a response that
// parseMarketoResponse does not classify as "Success" is reported with the
// status, category and message it produced.
func (m *MarketoAPIService) PollImportStatus(importId string) (*MarketoResponse, *APIError) {
	// retryable builds the failure returned for every locally detected problem.
	retryable := func(message string) (*MarketoResponse, *APIError) {
		return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: message}
	}

	statusURL := fmt.Sprintf("https://%s.mktorest.com/bulk/v1/leads/batch/%s.json", m.munchkinId, importId)

	accessToken, tokenErr := m.authService.GetAccessToken()
	if tokenErr != nil {
		return retryable("Error in fetching access token")
	}

	request, requestErr := http.NewRequest(http.MethodGet, statusURL, nil)
	if requestErr != nil {
		return retryable("Error in creating request")
	}
	request.Header.Add("Authorization", "Bearer "+accessToken)

	response, sendErr := m.httpClient.Do(request)
	if sendErr != nil {
		return retryable("Error in sending request")
	}
	defer response.Body.Close()

	payload, readErr := io.ReadAll(response.Body)
	if readErr != nil {
		return retryable("Error in reading response body")
	}

	var parsed MarketoResponse
	if decodeErr := json.Unmarshal(payload, &parsed); decodeErr != nil {
		return retryable("Error in parsing response body")
	}

	m.logger.Debugf("[Async Destination Manager] Marketo Poll Response: %v", parsed)

	statusCode, category, errorMessage := parseMarketoResponse(parsed)
	if category != "Success" {
		return nil, &APIError{StatusCode: statusCode, Category: category, Message: errorMessage}
	}

	return &parsed, nil
}

// GetLeadStatus downloads the lead status report served at url and converts it
// into one map per data row, keyed by the column names of the CSV header row.
//
// If the response is not marked as CSV it is decoded as a MarketoResponse and
// always reported as an *APIError built from parseMarketoResponse. Failures
// detected locally are reported as "Retryable" with status 500.
func (m *MarketoAPIService) GetLeadStatus(url string) ([]map[string]string, *APIError) {
	// retryable builds the failure returned for every locally detected problem.
	retryable := func(message string) ([]map[string]string, *APIError) {
		return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: message}
	}

	request, requestErr := http.NewRequest(http.MethodGet, url, nil)
	if requestErr != nil {
		return retryable("Error in creating request")
	}

	accessToken, tokenErr := m.authService.GetAccessToken()
	if tokenErr != nil {
		return retryable("Error in fetching access token")
	}
	request.Header.Add("Authorization", "Bearer "+accessToken)

	response, sendErr := m.httpClient.Do(request)
	if sendErr != nil {
		return retryable("Error in sending request")
	}
	defer response.Body.Close()

	payload, readErr := io.ReadAll(response.Body)
	if readErr != nil {
		return retryable("Error in reading response body")
	}

	m.logger.Debugf("[Async Destination Manager] Marketo Get Lead Status Response: %v", string(payload))

	// Anything that is not CSV is expected to be a JSON body.
	if !m.checkForCSVLikeResponse(response) {
		var parsed MarketoResponse
		if decodeErr := json.Unmarshal(payload, &parsed); decodeErr != nil {
			return retryable("Error in parsing response body")
		}

		statusCode, category, errorMessage := parseMarketoResponse(parsed)
		return nil, &APIError{StatusCode: statusCode, Category: category, Message: errorMessage}
	}

	rows, csvErr := csv.NewReader(strings.NewReader(string(payload))).ReadAll()
	if csvErr != nil {
		return retryable("Error in parsing csv response")
	}
	if len(rows) == 0 {
		return retryable("No data in csv response")
	}

	// The first row carries the column names; the rest are data rows.
	columns, dataRows := rows[0], rows[1:]

	records := make([]map[string]string, 0, len(dataRows))
	for _, fields := range dataRows {
		record := make(map[string]string)
		// Values beyond the last header column are dropped.
		for idx := 0; idx < len(fields) && idx < len(columns); idx++ {
			record[columns[idx]] = fields[idx]
		}
		records = append(records, record)
	}

	return records, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package marketobulkupload

import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)

// MarketoAccessToken holds the decoded response of the Marketo OAuth token
// endpoint together with the local time at which it was obtained.
type MarketoAccessToken struct {
// AccessToken is the bearer token sent in the Authorization header.
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
// ExpiresIn is the token lifetime; callers in this package treat it as seconds.
ExpiresIn int64 `json:"expires_in"`
// FetchedAt is the Unix time (seconds) at which the token was fetched. It is
// set locally after decoding rather than read from the "expires_in"-style
// response fields.
FetchedAt int64
Scope string `json:"scope"`
}

// MarketoAuthServiceInterface describes how the API service obtains Marketo
// access tokens.
type MarketoAuthServiceInterface interface {
// GetAccessToken returns a bearer token, or an error if none could be fetched.
GetAccessToken() (string, error)
// GetAccessTokenInfo returns the most recently stored token details.
GetAccessTokenInfo() MarketoAccessToken
}

// MarketoAuthService fetches access tokens from the Marketo identity endpoint
// using the client-credentials grant and remembers the latest one.
type MarketoAuthService struct {
// httpCLient performs the token request.
httpCLient *http.Client
// munchkinId is interpolated into the host name: https://<munchkinId>.mktorest.com.
munchkinId string
// clientId and clientSecret are sent as query parameters of the token request.
clientId string
clientSecret string
// accessToken is the token stored by the last successful fetch.
accessToken MarketoAccessToken
}

// fetchOrUpdateAccessToken requests a token from the Marketo identity endpoint
// using the client-credentials grant and stores it, stamped with the current
// Unix time, on the service.
//
// It returns an error when the request cannot be built or sent, the body
// cannot be read or decoded, or the decoded response carries no access token
// (for example an OAuth error payload). In every error case the previously
// stored token is left untouched.
//
// NOTE(review): the client id and secret are interpolated into the query
// string unescaped, and transport errors returned by the HTTP client usually
// embed the full request URL. Consider url.QueryEscape and redacting the URL
// before such errors are logged.
func (m *MarketoAuthService) fetchOrUpdateAccessToken() error {
	accessTokenURL := fmt.Sprintf("https://%s.mktorest.com/identity/oauth/token?client_id=%s&client_secret=%s&grant_type=client_credentials", m.munchkinId, m.clientId, m.clientSecret)
	req, err := http.NewRequest(http.MethodPost, accessTokenURL, nil)
	if err != nil {
		return err
	}

	resp, err := m.httpCLient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	var accessToken MarketoAccessToken
	if err := json.Unmarshal(body, &accessToken); err != nil {
		return err
	}

	// A well-formed JSON error response decodes without error but has no
	// token; storing it would hand callers an empty bearer token.
	if accessToken.AccessToken == "" {
		return fmt.Errorf("marketo access token response (status %d) did not contain an access token", resp.StatusCode)
	}

	accessToken.FetchedAt = time.Now().Unix()
	m.accessToken = accessToken
	return nil
}

// GetAccessToken fetches a token from Marketo and returns its access-token
// string. It returns an empty string and the underlying error when the fetch
// fails.
//
// Keeping the logic simple for now: a token is requested on every call. An
// earlier draft reused the stored token and only refreshed it when it was
// missing or about to expire; that logic is not active.
func (m *MarketoAuthService) GetAccessToken() (string, error) {
	if fetchErr := m.fetchOrUpdateAccessToken(); fetchErr != nil {
		return "", fetchErr
	}

	return m.accessToken.AccessToken, nil
}

// GetAccessTokenInfo returns a copy of the token details currently stored on
// the service. It does not contact Marketo.
func (m *MarketoAuthService) GetAccessTokenInfo() MarketoAccessToken {
	tokenInfo := m.accessToken
	return tokenInfo
}
Loading
Loading