Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(destination): oauth stats prefix #4033

Merged
merged 12 commits into from
Nov 3, 2023
16 changes: 7 additions & 9 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@ func (api *APIManager) getOAuthDetail(destDetail *model.Destination, workspaceId
return oauthDetail{}, fmt.Errorf("[%v] Delete account ID key (%v) is not present for destination: %v", destDetail.Name, oauth.DeleteAccountIdKey, destDetail.DestinationID)
}
tokenStatusCode, secretToken := api.OAuth.FetchToken(&oauth.RefreshTokenParams{
AccountId: id,
WorkspaceId: workspaceId,
DestDefName: destDetail.Name,
EventNamePrefix: "fetch_token",
AccountId: id,
WorkspaceId: workspaceId,
DestDefName: destDetail.Name,
})
if tokenStatusCode != http.StatusOK {
return oauthDetail{}, fmt.Errorf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s", destDetail.Name, tokenStatusCode, secretToken.ErrorMessage)
Expand Down Expand Up @@ -228,11 +227,10 @@ func (api *APIManager) inactivateAuthStatus(destination *model.Destination, job

func (api *APIManager) refreshOAuthToken(destination *model.Destination, job model.Job, oAuthDetail oauthDetail) error {
refTokenParams := &oauth.RefreshTokenParams{
Secret: oAuthDetail.secretToken.Account.Secret,
WorkspaceId: job.WorkspaceID,
AccountId: oAuthDetail.id,
DestDefName: destination.Name,
EventNamePrefix: "refresh_token",
Secret: oAuthDetail.secretToken.Account.Secret,
WorkspaceId: job.WorkspaceID,
AccountId: oAuthDetail.id,
DestDefName: destination.Name,
}
statusCode, refreshResponse := api.OAuth.RefreshToken(refTokenParams)
if statusCode != http.StatusOK {
Expand Down
11 changes: 5 additions & 6 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,12 +621,11 @@
case oauth.REFRESH_TOKEN:
var refSecret *oauth.AuthResponse
refTokenParams := &oauth.RefreshTokenParams{
Secret: params.secret,
WorkspaceId: workspaceID,
AccountId: rudderAccountID,
DestDefName: destinationJob.Destination.DestinationDefinition.Name,
EventNamePrefix: "refresh_token",
WorkerId: params.workerID,
Secret: params.secret,
WorkspaceId: workspaceID,
AccountId: rudderAccountID,
DestDefName: destinationJob.Destination.DestinationDefinition.Name,
WorkerId: params.workerID,

Check warning on line 628 in router/handle.go

View check run for this annotation

Codecov / codecov/patch

router/handle.go#L624-L628

Added lines #L624 - L628 were not covered by tests
}
errCatStatusCode, refSecret = rt.oauth.RefreshToken(refTokenParams)
refSec := *refSecret
Expand Down
7 changes: 3 additions & 4 deletions router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,9 @@
w.logger.Debugf(`[%s][FetchToken] Token Fetch Method to be called`, destination.DestinationDefinition.Name)
// Get Access Token Information to send it as part of the event
tokenStatusCode, accountSecretInfo := w.rt.oauth.FetchToken(&oauth.RefreshTokenParams{
AccountId: rudderAccountID,
WorkspaceId: jobMetadata.WorkspaceID,
DestDefName: destination.DestinationDefinition.Name,
EventNamePrefix: "fetch_token",
AccountId: rudderAccountID,
WorkspaceId: jobMetadata.WorkspaceID,
DestDefName: destination.DestinationDefinition.Name,

Check warning on line 211 in router/worker.go

View check run for this annotation

Codecov / codecov/patch

router/worker.go#L209-L211

Added lines #L209 - L211 were not covered by tests
})
w.logger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo)
if tokenStatusCode == http.StatusOK {
Expand Down
71 changes: 36 additions & 35 deletions services/oauth/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
destDefName string
isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor
flowType RudderFlow
action string // refresh_token, fetch_token, auth_status_toggle
}

type DisableDestinationResponse struct {
Expand All @@ -83,12 +84,11 @@
}

type RefreshTokenParams struct {
AccountId string
WorkspaceId string
DestDefName string
EventNamePrefix string
WorkerId int
Secret json.RawMessage
AccountId string
WorkspaceId string
DestDefName string
WorkerId int
Secret json.RawMessage
}

// OAuthErrResHandler is the handle for this class
Expand Down Expand Up @@ -221,10 +221,15 @@
errorMessage: "",
destDefName: refTokenParams.DestDefName,
flowType: authErrHandler.rudderFlowType,
action: "refresh_token",
}
return authErrHandler.GetTokenInfo(refTokenParams, "Refresh token", authStats)
}

func getOAuthActionStatName(stat string) string {
return fmt.Sprintf("oauth_action_%v", stat)
}

func (authErrHandler *OAuthErrResHandler) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *AuthResponse) {
authStats := &OAuthStats{
id: fetchTokenParams.AccountId,
Expand All @@ -237,14 +242,15 @@
destDefName: fetchTokenParams.DestDefName,
isTokenFetch: true,
flowType: authErrHandler.rudderFlowType,
action: "fetch_token",
}
return authErrHandler.GetTokenInfo(fetchTokenParams, "Fetch token", authStats)
}

func (authErrHandler *OAuthErrResHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse) {
startTime := time.Now()
defer func() {
authStats.statName = fmt.Sprintf("%v_total_req_latency", refTokenParams.EventNamePrefix)
authStats.statName = getOAuthActionStatName("total_latency")
authStats.isCallToCpApi = false
authStats.SendTimerStats(startTime)
}()
Expand Down Expand Up @@ -300,13 +306,6 @@

authErrHandler.logger.Debugf("[%s] [%v request] Lock Acquired by rt-worker-%d\n", loggerNm, logTypeName, refTokenParams.WorkerId)

errHandlerReqTimeStart := time.Now()
defer func() {
authStats.statName = fmt.Sprintf("%v_request_exec_time", refTokenParams.EventNamePrefix)
authStats.isCallToCpApi = true
authStats.SendTimerStats(errHandlerReqTimeStart)
}()

statusCode := authErrHandler.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName)
return statusCode, authErrHandler.destAuthInfoMap[refTokenParams.AccountId]
}
Expand All @@ -327,25 +326,25 @@
ContentType: "application/json; charset=utf-8",
Body: string(res),
destName: refTokenParams.DestDefName,
RequestType: logTypeName,
RequestType: authStats.action,
}
var accountSecret AccountSecret
// Stat for counting number of Refresh Token endpoint calls
authStats.statName = fmt.Sprintf(`%v_request_sent`, refTokenParams.EventNamePrefix)
authStats.statName = getOAuthActionStatName(`request_sent`)
authStats.isCallToCpApi = true
authStats.errorMessage = ""
authStats.SendCountStat()

cpiCallStartTime := time.Now()
statusCode, response := authErrHandler.cpApiCall(refreshCpReq)
authStats.statName = fmt.Sprintf(`%v_request_latency`, refTokenParams.EventNamePrefix)
authStats.statName = getOAuthActionStatName(`request_latency`)
authStats.SendTimerStats(cpiCallStartTime)

authErrHandler.logger.Debugf("[%s] Got the response from Control-Plane: rt-worker-%d with statusCode: %d\n", loggerNm, refTokenParams.WorkerId, statusCode)

// Empty Refresh token response
if !router_utils.IsNotEmptyString(response) {
authStats.statName = fmt.Sprintf("%s_failure", refTokenParams.EventNamePrefix)
authStats.statName = getOAuthActionStatName("failure")

Check warning on line 347 in services/oauth/oauth.go

View check run for this annotation

Codecov / codecov/patch

services/oauth/oauth.go#L347

Added line #L347 was not covered by tests
authStats.errorMessage = "Empty secret"
authStats.SendCountStat()
// Setting empty accessToken value into in-memory auth info map(cache)
Expand All @@ -369,7 +368,7 @@
authErrHandler.destAuthInfoMap[refTokenParams.AccountId].Err = errType
authErrHandler.destAuthInfoMap[refTokenParams.AccountId].ErrorMessage = refErrMsg
}
authStats.statName = fmt.Sprintf("%s_failure", refTokenParams.EventNamePrefix)
authStats.statName = getOAuthActionStatName("failure")
authStats.errorMessage = refErrMsg
authStats.SendCountStat()
if refErrMsg == REF_TOKEN_INVALID_GRANT {
Expand All @@ -383,7 +382,7 @@
authErrHandler.destAuthInfoMap[refTokenParams.AccountId] = &AuthResponse{
Account: accountSecret,
}
authStats.statName = fmt.Sprintf("%s_success", refTokenParams.EventNamePrefix)
authStats.statName = getOAuthActionStatName("success")
authStats.errorMessage = ""
authStats.SendCountStat()
authErrHandler.logger.Debugf("[%s request] :: (Write) %s response received(rt-worker-%d): %s\n", loggerNm, logTypeName, refTokenParams.WorkerId, response)
Expand Down Expand Up @@ -411,20 +410,22 @@
}

func (authStats *OAuthStats) SendTimerStats(startTime time.Time) {
stats.Default.NewTaggedStat(authStats.statName, stats.TimerType, stats.Tags{
statsTags := stats.Tags{
"id": authStats.id,
"workspaceId": authStats.workspaceId,
"rudderCategory": authStats.rudderCategory,
"isCallToCpApi": strconv.FormatBool(authStats.isCallToCpApi),
"authErrCategory": authStats.authErrCategory,
"destType": authStats.destDefName,
"flowType": string(authStats.flowType),
}).SendTiming(time.Since(startTime))
"action": authStats.action,
}
stats.Default.NewTaggedStat(authStats.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime))
}

// Send count type stats related to OAuth(Destination)
func (refStats *OAuthStats) SendCountStat() {
stats.Default.NewTaggedStat(refStats.statName, stats.CountType, stats.Tags{
statsTags := stats.Tags{
"id": refStats.id,
"workspaceId": refStats.workspaceId,
"rudderCategory": refStats.rudderCategory,
Expand All @@ -434,17 +435,16 @@
"destType": refStats.destDefName,
"isTokenFetch": strconv.FormatBool(refStats.isTokenFetch),
"flowType": string(refStats.flowType),
}).Increment()
"action": refStats.action,
}
stats.Default.NewTaggedStat(refStats.statName, stats.CountType, statsTags).Increment()
}

func (authErrHandler *OAuthErrResHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusCode int, respBody string) {
authErrHandlerTimeStart := time.Now()
destinationId := params.Destination.ID
authStatusToggleMutex := authErrHandler.getKeyMutex(authErrHandler.destLockMap, destinationId)

getStatName := func(statName string) string {
return fmt.Sprintf("auth_status_%v_%v", statName, params.AuthStatus)
}
action := fmt.Sprintf("auth_status_%v", params.AuthStatus)

authStatusToggleStats := &OAuthStats{
id: destinationId,
Expand All @@ -456,9 +456,10 @@
errorMessage: "",
destDefName: params.Destination.DestinationDefinition.Name,
flowType: authErrHandler.rudderFlowType,
action: action,
}
defer func() {
authStatusToggleStats.statName = getStatName("total_req_latency")
authStatusToggleStats.statName = getOAuthActionStatName("total_latency")
authStatusToggleStats.isCallToCpApi = false
authStatusToggleStats.SendTimerStats(authErrHandlerTimeStart)
}()
Expand Down Expand Up @@ -493,19 +494,19 @@
authStatusInactiveCpReq := &ControlPlaneRequestT{
Url: authStatusToggleUrl,
Method: http.MethodPut,
Body: `{"authStatus": "inactive"}`,
Body: fmt.Sprintf(`{"authStatus": "%v"}`, params.AuthStatus),
ContentType: "application/json",
destName: params.Destination.DestinationDefinition.Name,
RequestType: "Auth Status inactive",
RequestType: action,
}

authStatusToggleStats.statName = getStatName("request_sent")
authStatusToggleStats.statName = getOAuthActionStatName("request_sent")
authStatusToggleStats.isCallToCpApi = true
authStatusToggleStats.SendCountStat()

cpiCallStartTime := time.Now()
statusCode, respBody = authErrHandler.cpApiCall(authStatusInactiveCpReq)
authStatusToggleStats.statName = getStatName("request_latency")
authStatusToggleStats.statName = getOAuthActionStatName("request_latency")
defer authStatusToggleStats.SendTimerStats(cpiCallStartTime)
authErrHandler.logger.Errorf(`Response from CP(stCd: %v) for auth status inactive req: %v`, statusCode, respBody)

Expand All @@ -518,14 +519,14 @@
} else {
msg = fmt.Sprintf("Could not update authStatus to inactive for destination: %v", authStatusToggleRes.Message)
}
authStatusToggleStats.statName = getStatName("failure")
authStatusToggleStats.statName = getOAuthActionStatName("failure")
authStatusToggleStats.errorMessage = msg
authStatusToggleStats.SendCountStat()
return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error()
}

authErrHandler.logger.Errorf("[%s request] :: (Write) auth status inactive Response received : %s\n", loggerNm, respBody)
authStatusToggleStats.statName = getStatName("success")
authStatusToggleStats.statName = getOAuthActionStatName("success")
authStatusToggleStats.errorMessage = ""
authStatusToggleStats.SendCountStat()

Expand Down
Loading