Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: access_denied error handling for OAuth destinations #3853

Merged
merged 26 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f199f6b
fix: access_denied error handling for OAuth destinations
Sep 11, 2023
bec2e34
fix: mock oauth service
Sep 12, 2023
12c46ff
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Sep 12, 2023
84261c8
chore: rename ref_token_invalid_grant constant
Sep 12, 2023
07fa015
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Sep 12, 2023
e49076a
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Sep 14, 2023
9b1cb61
chore: update the method for authStatus toggle to PUT
Sep 14, 2023
fe203c1
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Sep 15, 2023
3bf7054
chore: include contract changes
Sep 15, 2023
520b123
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Sep 26, 2023
301e33b
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Sep 29, 2023
f720182
fix: add AUTH_STATUS_INACTIVE handling in regulation-worker
Sep 29, 2023
096f232
chore: refactoring some changes, adding logic for handling invalid_gr…
Oct 8, 2023
9a571e1
fix: address comments
Oct 8, 2023
a1f1261
fix: send badrequest when required parameters are not sent in tests
Oct 8, 2023
d3755ba
fix: change response error message for authStatusInactive req(both fa…
Oct 8, 2023
c3d1772
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Oct 8, 2023
404a323
fix: add multiple go-routines tests for authStatus/toggle
Oct 9, 2023
8783ea5
fix: formatting
Oct 9, 2023
7cefefe
fix: rename variables, send right error message post inactivation of …
Oct 9, 2023
053a1db
fix: comment correction
Oct 9, 2023
17e633e
fix: remove unused argument
Oct 9, 2023
068d20f
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Oct 9, 2023
716db0b
fix: updated wrong url status-code to 404
Oct 9, 2023
8dd4a5d
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Oct 9, 2023
c834a57
Merge remote-tracking branch 'origin' into feat.oauth-access-denied-h…
Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions mocks/services/oauth/mock_oauth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,8 @@ func (rt *Handle) handleOAuthDestResponse(params *HandleDestOAuthRespParams) (in
return trRespStatusCode, trRespBody
}
switch destErrOutput.AuthErrorCategory {
case oauth.DISABLE_DEST:
return rt.execDisableDestination(&destinationJob.Destination, workspaceID, trRespBody, rudderAccountID)
case oauth.AUTH_STATUS_INACTIVE:
return rt.updateAuthStatusToInactive(&destinationJob.Destination, workspaceID, trRespBody, rudderAccountID)
case oauth.REFRESH_TOKEN:
var refSecret *oauth.AuthResponse
refTokenParams := &oauth.RefreshTokenParams{
Expand All @@ -630,20 +630,20 @@ func (rt *Handle) handleOAuthDestResponse(params *HandleDestOAuthRespParams) (in
}
errCatStatusCode, refSecret = rt.oauth.RefreshToken(refTokenParams)
refSec := *refSecret
if routerutils.IsNotEmptyString(refSec.Err) && refSec.Err == oauth.INVALID_REFRESH_TOKEN_GRANT {
if routerutils.IsNotEmptyString(refSec.Err) && refSec.Err == oauth.REF_TOKEN_INVALID_GRANT {
// In-case the refresh token has been revoked, this error comes in
// Even trying to refresh the token also doesn't work here. Hence, this would be more ideal to Abort Events
// As well as to disable destination as well.
// Alert the user in this error as well, to check if the refresh token also has been revoked & fix it
disableStCd, _ := rt.execDisableDestination(&destinationJob.Destination, workspaceID, trRespBody, rudderAccountID)
stats.Default.NewTaggedStat(oauth.INVALID_REFRESH_TOKEN_GRANT, stats.CountType, stats.Tags{
disableStCd, _ := rt.updateAuthStatusToInactive(&destinationJob.Destination, workspaceID, trRespBody, rudderAccountID)
stats.Default.NewTaggedStat(oauth.REF_TOKEN_INVALID_GRANT, stats.CountType, stats.Tags{
"destinationId": destinationJob.Destination.ID,
"workspaceId": refTokenParams.WorkspaceId,
"accountId": refTokenParams.AccountId,
"destType": refTokenParams.DestDefName,
"flowType": string(oauth.RudderFlow_Delivery),
}).Increment()
rt.logger.Errorf(`[OAuth request] Aborting the event as %v`, oauth.INVALID_REFRESH_TOKEN_GRANT)
rt.logger.Errorf(`[OAuth request] Aborting the event as %v`, oauth.REF_TOKEN_INVALID_GRANT)
return disableStCd, refSec.Err
}
// Error while refreshing the token or Has an error while refreshing or sending empty access token
Expand All @@ -658,24 +658,24 @@ func (rt *Handle) handleOAuthDestResponse(params *HandleDestOAuthRespParams) (in
return trRespStatusCode, trRespBody
}

func (rt *Handle) execDisableDestination(destination *backendconfig.DestinationT, workspaceID, destResBody, rudderAccountId string) (int, string) {
disableDestStatTags := stats.Tags{
func (rt *Handle) updateAuthStatusToInactive(destination *backendconfig.DestinationT, workspaceID, destResBody, rudderAccountId string) (int, string) {
inactiveAuthStatusStatTags := stats.Tags{
"id": destination.ID,
"destType": destination.DestinationDefinition.Name,
"workspaceId": workspaceID,
"success": "true",
"flowType": string(oauth.RudderFlow_Delivery),
}
errCatStatusCode, errCatResponse := rt.oauth.DisableDestination(destination, workspaceID, rudderAccountId)
errCatStatusCode, errCatResponse := rt.oauth.UpdateAuthStatusToInactive(destination, workspaceID, rudderAccountId)
if errCatStatusCode != http.StatusOK {
// Error while disabling a destination
// High-Priority notification to rudderstack needs to be sent
disableDestStatTags["success"] = "false"
stats.Default.NewTaggedStat("disable_destination_category_count", stats.CountType, disableDestStatTags).Increment()
inactiveAuthStatusStatTags["success"] = "false"
stats.Default.NewTaggedStat("auth_status_inactive_category_count", stats.CountType, inactiveAuthStatusStatTags).Increment()
return http.StatusBadRequest, errCatResponse
}
// High-Priority notification to workspace(& rudderstack) needs to be sent
stats.Default.NewTaggedStat("disable_destination_category_count", stats.CountType, disableDestStatTags).Increment()
stats.Default.NewTaggedStat("auth_status_inactive_category_count", stats.CountType, inactiveAuthStatusStatTags).Increment()
// Abort the jobs as the destination is disabled
return http.StatusBadRequest, destResBody
}
155 changes: 85 additions & 70 deletions services/oauth/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ type DisableDestinationResponse struct {
DestinationId string `json:"id"`
}

type AuthStatusInactiveResponse struct {
Success string `json:"success"`
Error string `json:"error"`
StatusCode string `json:"statusCode"`
}

type RefreshTokenParams struct {
AccountId string
WorkspaceId string
Expand All @@ -78,21 +84,21 @@ type RefreshTokenParams struct {

// OAuthErrResHandler is the handle for this class
type OAuthErrResHandler struct {
tr *http.Transport
client *http.Client
logger logger.Logger
destLockMap map[string]*sync.RWMutex // This mutex map is used for disable destination locking
accountLockMap map[string]*sync.RWMutex // This mutex map is used for refresh token locking
lockMapWMutex *sync.RWMutex // This mutex is used to prevent concurrent writes in lockMap(s) mentioned in the struct
destAuthInfoMap map[string]*AuthResponse
refreshActiveMap map[string]bool // Used to check if a refresh request for an account is already InProgress
disableDestActiveMap map[string]bool // Used to check if a disable destination request for a destination is already InProgress
tokenProvider tokenProvider
rudderFlowType RudderFlow
tr *http.Transport
client *http.Client
logger logger.Logger
destLockMap map[string]*sync.RWMutex // This mutex map is used for disable destination locking
accountLockMap map[string]*sync.RWMutex // This mutex map is used for refresh token locking
lockMapWMutex *sync.RWMutex // This mutex is used to prevent concurrent writes in lockMap(s) mentioned in the struct
destAuthInfoMap map[string]*AuthResponse
refreshActiveMap map[string]bool // Used to check if a refresh request for an account is already InProgress
authStatusUpdateActiveMap map[string]bool // Used to check if a authStatusInactive request for a destination is already InProgress
tokenProvider tokenProvider
rudderFlowType RudderFlow
}

type Authorizer interface {
DisableDestination(destination *backendconfig.DestinationT, workspaceId, rudderAccountId string) (statusCode int, resBody string)
UpdateAuthStatusToInactive(destination *backendconfig.DestinationT, workspaceId, rudderAccountId string) (statusCode int, resBody string)
RefreshToken(refTokenParams *RefreshTokenParams) (int, *AuthResponse)
FetchToken(fetchTokenParams *RefreshTokenParams) (int, *AuthResponse)
}
Expand All @@ -113,9 +119,12 @@ var (
)

const (
DISABLE_DEST = "DISABLE_DESTINATION"
REFRESH_TOKEN = "REFRESH_TOKEN"
INVALID_REFRESH_TOKEN_GRANT = "refresh_token_invalid_grant"
REFRESH_TOKEN = "REFRESH_TOKEN"
// Identifier to be sent from destination(during transformation/delivery)
AUTH_STATUS_INACTIVE = "AUTH_STATUS_INACTIVE"

// Identifier for invalid_grant or access_denied errors(during refreshing the token)
REF_TOKEN_INVALID_GRANT = "ref_token_invalid_grant"
)

// This struct only exists for marshalling and sending payload to control-plane
Expand Down Expand Up @@ -155,13 +164,13 @@ func NewOAuthErrorHandler(provider tokenProvider, options ...func(*OAuthErrResHa
tr: &http.Transport{},
client: &http.Client{Timeout: config.GetDuration("HttpClient.oauth.timeout", 30, time.Second)},
// This timeout is kind of modifiable & it seemed like 10 mins for this is too much!
destLockMap: make(map[string]*sync.RWMutex),
accountLockMap: make(map[string]*sync.RWMutex),
lockMapWMutex: &sync.RWMutex{},
destAuthInfoMap: make(map[string]*AuthResponse),
refreshActiveMap: make(map[string]bool),
disableDestActiveMap: make(map[string]bool),
rudderFlowType: RudderFlow_Delivery,
destLockMap: make(map[string]*sync.RWMutex),
accountLockMap: make(map[string]*sync.RWMutex),
lockMapWMutex: &sync.RWMutex{},
destAuthInfoMap: make(map[string]*AuthResponse),
refreshActiveMap: make(map[string]bool),
authStatusUpdateActiveMap: make(map[string]bool),
rudderFlowType: RudderFlow_Delivery,
}
for _, opt := range options {
opt(oAuthErrResHandler)
Expand Down Expand Up @@ -350,7 +359,7 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams
authStats.statName = fmt.Sprintf("%s_failure", refTokenParams.EventNamePrefix)
authStats.errorMessage = refErrMsg
authStats.SendCountStat()
if refErrMsg == INVALID_REFRESH_TOKEN_GRANT {
if refErrMsg == REF_TOKEN_INVALID_GRANT {
// Should abort the event as refresh is not going to work
// until we have new refresh token for the account
return http.StatusBadRequest
Expand All @@ -372,9 +381,9 @@ func getRefreshTokenErrResp(response string, accountSecret *AccountSecret) (mess
if err := json.Unmarshal([]byte(response), &accountSecret); err != nil {
// Some problem with AccountSecret unmarshalling
message = fmt.Sprintf("Unmarshal of response unsuccessful: %v", response)
} else if gjson.Get(response, "body.code").String() == INVALID_REFRESH_TOKEN_GRANT {
} else if gjson.Get(response, "body.code").String() == REF_TOKEN_INVALID_GRANT {
// User (or) AccessToken (or) RefreshToken has been revoked
message = INVALID_REFRESH_TOKEN_GRANT
message = REF_TOKEN_INVALID_GRANT
}
return message
}
Expand Down Expand Up @@ -406,83 +415,89 @@ func (refStats *OAuthStats) SendCountStat() {
}).Increment()
}

func (authErrHandler *OAuthErrResHandler) DisableDestination(destination *backendconfig.DestinationT, workspaceId, rudderAccountId string) (statusCode int, respBody string) {
func (authErrHandler *OAuthErrResHandler) UpdateAuthStatusToInactive(destination *backendconfig.DestinationT, workspaceId, rudderAccountId string) (statusCode int, respBody string) {
authErrHandlerTimeStart := time.Now()
destinationId := destination.ID
disableDestMutex := authErrHandler.getKeyMutex(authErrHandler.destLockMap, destinationId)
authStatusInactiveMutex := authErrHandler.getKeyMutex(authErrHandler.destLockMap, destinationId)

disableDestStats := &OAuthStats{
getStatName := func(statName string) string {
return fmt.Sprintf("auth_status_inactive_%v", statName)
}

authStatusInactiveStats := &OAuthStats{
id: destinationId,
workspaceId: workspaceId,
rudderCategory: "destination",
statName: "",
isCallToCpApi: false,
authErrCategory: DISABLE_DEST,
authErrCategory: AUTH_STATUS_INACTIVE,
errorMessage: "",
destDefName: destination.DestinationDefinition.Name,
flowType: authErrHandler.rudderFlowType,
}
defer func() {
disableDestStats.statName = "disable_destination_total_req_latency"
disableDestStats.isCallToCpApi = false
disableDestStats.SendTimerStats(authErrHandlerTimeStart)
authStatusInactiveStats.statName = getStatName("total_req_latency")
authStatusInactiveStats.isCallToCpApi = false
authStatusInactiveStats.SendTimerStats(authErrHandlerTimeStart)
}()

disableDestMutex.Lock()
isDisableDestActive, isDisableDestReqPresent := authErrHandler.disableDestActiveMap[destinationId]
disableActiveReq := strconv.FormatBool(isDisableDestReqPresent && isDisableDestActive)
if isDisableDestReqPresent && isDisableDestActive {
disableDestMutex.Unlock()
authErrHandler.logger.Debugf("[%s request] :: Disable Destination Active : %s\n", loggerNm, disableActiveReq)
return http.StatusOK, fmt.Sprintf(`{response: {isDisabled: %v, activeRequest: %v}`, false, disableActiveReq)
authStatusInactiveMutex.Lock()
isAuthStatusUpdateActive, isAuthStatusUpdateReqPresent := authErrHandler.authStatusUpdateActiveMap[destinationId]
authStatusUpdateActiveReq := strconv.FormatBool(isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive)
if isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive {
fracasula marked this conversation as resolved.
Show resolved Hide resolved
authStatusInactiveMutex.Unlock()
authErrHandler.logger.Debugf("[%s request] :: AuthStatusInactive request Active : %s\n", loggerNm, authStatusUpdateActiveReq)
return http.StatusOK, fmt.Sprintf(`{response: {authStatusInactive: %v, activeRequest: %v}`, false, authStatusUpdateActiveReq)
}

authErrHandler.disableDestActiveMap[destinationId] = true
disableDestMutex.Unlock()
authErrHandler.authStatusUpdateActiveMap[destinationId] = true
authStatusInactiveMutex.Unlock()

defer func() {
disableDestMutex.Lock()
authErrHandler.disableDestActiveMap[destinationId] = false
authErrHandler.logger.Debugf("[%s request] :: Disable request is inactive!", loggerNm)
disableDestMutex.Unlock()
authStatusInactiveMutex.Lock()
authErrHandler.authStatusUpdateActiveMap[destinationId] = false
authErrHandler.logger.Debugf("[%s request] :: AuthStatusInactive request is inactive!", loggerNm)
authStatusInactiveMutex.Unlock()
}()

disableURL := fmt.Sprintf("%s/workspaces/%s/destinations/%s/disable", configBEURL, workspaceId, destinationId)
disableCpReq := &ControlPlaneRequestT{
Url: disableURL,
Method: http.MethodDelete,
authStatusInactiveUrl := fmt.Sprintf("%s/workspaces/%s/destinations/%s/authStatus/toggle", configBEURL, workspaceId, destinationId)
authStatusInactiveCpReq := &ControlPlaneRequestT{
Url: authStatusInactiveUrl,
Method: http.MethodPost,
Body: `{"authStatus": "inactive"}`,
ContentType: "application/json",
destName: destination.DestinationDefinition.Name,
RequestType: "Disable destination",
RequestType: "Auth Status inactive",
}

disableDestStats.statName = "disable_destination_request_sent"
disableDestStats.isCallToCpApi = true
disableDestStats.SendCountStat()
authStatusInactiveStats.statName = getStatName("request_sent")
authStatusInactiveStats.isCallToCpApi = true
authStatusInactiveStats.SendCountStat()

cpiCallStartTime := time.Now()
statusCode, respBody = authErrHandler.cpApiCall(disableCpReq)
disableDestStats.statName = `disable_destination_request_latency`
defer disableDestStats.SendTimerStats(cpiCallStartTime)
authErrHandler.logger.Debugf(`Response from CP(stCd: %v) for disable dest req: %v`, statusCode, respBody)
statusCode, respBody = authErrHandler.cpApiCall(authStatusInactiveCpReq)
authStatusInactiveStats.statName = getStatName("request_latency")
defer authStatusInactiveStats.SendTimerStats(cpiCallStartTime)
authErrHandler.logger.Errorf(`Response from CP(stCd: %v) for auth status inactive req: %v`, statusCode, respBody)

var disableDestRes *DisableDestinationResponse
if disableErr := json.Unmarshal([]byte(respBody), &disableDestRes); disableErr != nil || !router_utils.IsNotEmptyString(disableDestRes.DestinationId) {
var authStatusInactiveRes *AuthStatusInactiveResponse
if unmarshalErr := json.Unmarshal([]byte(respBody), &authStatusInactiveRes); unmarshalErr != nil || !router_utils.IsNotEmptyString(authStatusInactiveRes.Error) {
var msg string
if disableErr != nil {
msg = disableErr.Error()
if unmarshalErr != nil {
msg = unmarshalErr.Error()
} else {
msg = "Could not disable the destination"
msg = "Could not update authStatus to inactive for destination the destination"
}
disableDestStats.statName = "disable_destination_failure"
disableDestStats.errorMessage = msg
disableDestStats.SendCountStat()
authStatusInactiveStats.statName = getStatName("failure")
authStatusInactiveStats.errorMessage = msg
authStatusInactiveStats.SendCountStat()
return http.StatusBadRequest, msg
}

authErrHandler.logger.Debugf("[%s request] :: (Write) Disable Response received : %s\n", loggerNm, respBody)
disableDestStats.statName = "disable_destination_success"
disableDestStats.errorMessage = ""
disableDestStats.SendCountStat()
authErrHandler.logger.Errorf("[%s request] :: (Write) auth status inactive Response received : %s\n", loggerNm, respBody)
authStatusInactiveStats.statName = getStatName("success")
authStatusInactiveStats.errorMessage = ""
authStatusInactiveStats.SendCountStat()

// After a successfully disabling the destination, need to remove existing accessToken(from in-memory cache)
// This is being done to obtain new token after re-enabling disabled destination
Expand All @@ -491,7 +506,7 @@ func (authErrHandler *OAuthErrResHandler) DisableDestination(destination *backen
defer accountMutex.Unlock()
delete(authErrHandler.destAuthInfoMap, rudderAccountId)

return statusCode, fmt.Sprintf(`{response: {isDisabled: %v, activeRequest: %v}`, !disableDestRes.Enabled, false)
return statusCode, fmt.Sprintf(`{response: {"authStatus": "inactive", "activeRequest": %v}`, false)
}

func processResponse(resp *http.Response) (statusCode int, respBody string) {
Expand Down
Loading