Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix.oauth prefix 2 #4044

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@ func (api *APIManager) getOAuthDetail(destDetail *model.Destination, workspaceId
return oauthDetail{}, fmt.Errorf("[%v] Delete account ID key (%v) is not present for destination: %v", destDetail.Name, oauth.DeleteAccountIdKey, destDetail.DestinationID)
}
tokenStatusCode, secretToken := api.OAuth.FetchToken(&oauth.RefreshTokenParams{
AccountId: id,
WorkspaceId: workspaceId,
DestDefName: destDetail.Name,
EventNamePrefix: "fetch_token",
AccountId: id,
WorkspaceId: workspaceId,
DestDefName: destDetail.Name,
})
if tokenStatusCode != http.StatusOK {
return oauthDetail{}, fmt.Errorf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s", destDetail.Name, tokenStatusCode, secretToken.ErrorMessage)
Expand Down Expand Up @@ -228,11 +227,10 @@ func (api *APIManager) inactivateAuthStatus(destination *model.Destination, job

func (api *APIManager) refreshOAuthToken(destination *model.Destination, job model.Job, oAuthDetail oauthDetail) error {
refTokenParams := &oauth.RefreshTokenParams{
Secret: oAuthDetail.secretToken.Account.Secret,
WorkspaceId: job.WorkspaceID,
AccountId: oAuthDetail.id,
DestDefName: destination.Name,
EventNamePrefix: "refresh_token",
Secret: oAuthDetail.secretToken.Account.Secret,
WorkspaceId: job.WorkspaceID,
AccountId: oAuthDetail.id,
DestDefName: destination.Name,
}
statusCode, refreshResponse := api.OAuth.RefreshToken(refTokenParams)
if statusCode != http.StatusOK {
Expand Down
11 changes: 5 additions & 6 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,12 +621,11 @@ func (rt *Handle) handleOAuthDestResponse(params *HandleDestOAuthRespParams) (in
case oauth.REFRESH_TOKEN:
var refSecret *oauth.AuthResponse
refTokenParams := &oauth.RefreshTokenParams{
Secret: params.secret,
WorkspaceId: workspaceID,
AccountId: rudderAccountID,
DestDefName: destinationJob.Destination.DestinationDefinition.Name,
EventNamePrefix: "refresh_token",
WorkerId: params.workerID,
Secret: params.secret,
WorkspaceId: workspaceID,
AccountId: rudderAccountID,
DestDefName: destinationJob.Destination.DestinationDefinition.Name,
WorkerId: params.workerID,
}
errCatStatusCode, refSecret = rt.oauth.RefreshToken(refTokenParams)
refSec := *refSecret
Expand Down
7 changes: 3 additions & 4 deletions router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,9 @@ func (w *worker) workLoop() {
w.logger.Debugf(`[%s][FetchToken] Token Fetch Method to be called`, destination.DestinationDefinition.Name)
// Get Access Token Information to send it as part of the event
tokenStatusCode, accountSecretInfo := w.rt.oauth.FetchToken(&oauth.RefreshTokenParams{
AccountId: rudderAccountID,
WorkspaceId: jobMetadata.WorkspaceID,
DestDefName: destination.DestinationDefinition.Name,
EventNamePrefix: "fetch_token",
AccountId: rudderAccountID,
WorkspaceId: jobMetadata.WorkspaceID,
DestDefName: destination.DestinationDefinition.Name,
})
w.logger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo)
if tokenStatusCode == http.StatusOK {
Expand Down
46 changes: 29 additions & 17 deletions services/oauth/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ func (authErrHandler *OAuthErrResHandler) RefreshToken(refTokenParams *RefreshTo
destDefName: refTokenParams.DestDefName,
flowType: authErrHandler.rudderFlowType,
}
return authErrHandler.GetTokenInfo(refTokenParams, "Refresh token", authStats)
refTokenParams.EventNamePrefix = "refresh_token"
return authErrHandler.getTokenInfo(refTokenParams, "Refresh token", authStats)
}

func (authErrHandler *OAuthErrResHandler) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *AuthResponse) {
Expand All @@ -238,15 +239,16 @@ func (authErrHandler *OAuthErrResHandler) FetchToken(fetchTokenParams *RefreshTo
isTokenFetch: true,
flowType: authErrHandler.rudderFlowType,
}
return authErrHandler.GetTokenInfo(fetchTokenParams, "Fetch token", authStats)
fetchTokenParams.EventNamePrefix = "fetch_token"
return authErrHandler.getTokenInfo(fetchTokenParams, "Fetch token", authStats)
}

func (authErrHandler *OAuthErrResHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse) {
func (authErrHandler *OAuthErrResHandler) getTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse) {
startTime := time.Now()
defer func() {
authStats.statName = fmt.Sprintf("%v_total_req_latency", refTokenParams.EventNamePrefix)
authStats.statName = "total_req_latency"
authStats.isCallToCpApi = false
authStats.SendTimerStats(startTime)
authStats.SendTimerStats(startTime, tag{k: "caller", v: refTokenParams.EventNamePrefix})
}()

accountMutex := authErrHandler.getKeyMutex(authErrHandler.accountLockMap, refTokenParams.AccountId)
Expand Down Expand Up @@ -302,9 +304,9 @@ func (authErrHandler *OAuthErrResHandler) GetTokenInfo(refTokenParams *RefreshTo

errHandlerReqTimeStart := time.Now()
defer func() {
authStats.statName = fmt.Sprintf("%v_request_exec_time", refTokenParams.EventNamePrefix)
authStats.statName = "request_exec_time"
authStats.isCallToCpApi = true
authStats.SendTimerStats(errHandlerReqTimeStart)
authStats.SendTimerStats(errHandlerReqTimeStart, tag{k: "caller", v: refTokenParams.EventNamePrefix})
}()

statusCode := authErrHandler.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName)
Expand All @@ -331,7 +333,7 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams
}
var accountSecret AccountSecret
// Stat for counting number of Refresh Token endpoint calls
authStats.statName = fmt.Sprintf(`%v_request_sent`, refTokenParams.EventNamePrefix)
authStats.statName = fmt.Sprintf(`%v_request_sent`, refTokenParams.EventNamePrefix) // TODO change this too
authStats.isCallToCpApi = true
authStats.errorMessage = ""
authStats.SendCountStat()
Expand All @@ -345,7 +347,7 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams

// Empty Refresh token response
if !router_utils.IsNotEmptyString(response) {
authStats.statName = fmt.Sprintf("%s_failure", refTokenParams.EventNamePrefix)
authStats.statName = fmt.Sprintf("%s_failure", refTokenParams.EventNamePrefix) // TODO change this too
authStats.errorMessage = "Empty secret"
authStats.SendCountStat()
// Setting empty accessToken value into in-memory auth info map(cache)
Expand All @@ -369,7 +371,7 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams
authErrHandler.destAuthInfoMap[refTokenParams.AccountId].Err = errType
authErrHandler.destAuthInfoMap[refTokenParams.AccountId].ErrorMessage = refErrMsg
}
authStats.statName = fmt.Sprintf("%s_failure", refTokenParams.EventNamePrefix)
authStats.statName = fmt.Sprintf("%s_failure", refTokenParams.EventNamePrefix) // TODO change this too
authStats.errorMessage = refErrMsg
authStats.SendCountStat()
if refErrMsg == REF_TOKEN_INVALID_GRANT {
Expand All @@ -383,7 +385,7 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams
authErrHandler.destAuthInfoMap[refTokenParams.AccountId] = &AuthResponse{
Account: accountSecret,
}
authStats.statName = fmt.Sprintf("%s_success", refTokenParams.EventNamePrefix)
authStats.statName = fmt.Sprintf("%s_success", refTokenParams.EventNamePrefix) // TODO change this too
authStats.errorMessage = ""
authStats.SendCountStat()
authErrHandler.logger.Debugf("[%s request] :: (Write) %s response received(rt-worker-%d): %s\n", loggerNm, logTypeName, refTokenParams.WorkerId, response)
Expand All @@ -410,21 +412,25 @@ func (authErrHandler *OAuthErrResHandler) getRefreshTokenErrResp(response string
return errorType, message
}

func (authStats *OAuthStats) SendTimerStats(startTime time.Time) {
stats.Default.NewTaggedStat(authStats.statName, stats.TimerType, stats.Tags{
func (authStats *OAuthStats) SendTimerStats(startTime time.Time, tags ...tag) {
statsTags := stats.Tags{
"id": authStats.id,
"workspaceId": authStats.workspaceId,
"rudderCategory": authStats.rudderCategory,
"isCallToCpApi": strconv.FormatBool(authStats.isCallToCpApi),
"authErrCategory": authStats.authErrCategory,
"destType": authStats.destDefName,
"flowType": string(authStats.flowType),
}).SendTiming(time.Since(startTime))
}
for _, tag := range tags {
statsTags[tag.k] = tag.v
}
stats.Default.NewTaggedStat(authStats.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime))
}

// Send count type stats related to OAuth(Destination)
func (refStats *OAuthStats) SendCountStat() {
stats.Default.NewTaggedStat(refStats.statName, stats.CountType, stats.Tags{
func (refStats *OAuthStats) SendCountStat(tags ...tag) {
statsTags := stats.Tags{
"id": refStats.id,
"workspaceId": refStats.workspaceId,
"rudderCategory": refStats.rudderCategory,
Expand All @@ -434,7 +440,11 @@ func (refStats *OAuthStats) SendCountStat() {
"destType": refStats.destDefName,
"isTokenFetch": strconv.FormatBool(refStats.isTokenFetch),
"flowType": string(refStats.flowType),
}).Increment()
}
for _, tag := range tags {
statsTags[tag.k] = tag.v
}
stats.Default.NewTaggedStat(refStats.statName, stats.CountType, statsTags).Increment()
}

func (authErrHandler *OAuthErrResHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusCode int, respBody string) {
Expand Down Expand Up @@ -612,3 +622,5 @@ func (resHandler *OAuthErrResHandler) getKeyMutex(mutexMap map[string]*sync.RWMu
}
return mutexMap[id]
}

type tag struct{ k, v string }
Loading