diff --git a/management/server/account.go b/management/server/account.go index 6805f17dbd7..4b94713d6c7 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -13,6 +13,7 @@ import ( log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "math/rand" "reflect" "strings" "sync" @@ -20,9 +21,11 @@ import ( ) const ( - PublicCategory = "public" - PrivateCategory = "private" - UnknownCategory = "unknown" + PublicCategory = "public" + PrivateCategory = "private" + UnknownCategory = "unknown" + CacheExpirationMax = 7 * 24 * 3600 * time.Second // 7 days + CacheExpirationMin = 3 * 24 * 3600 * time.Second // 3 days ) type AccountManager interface { @@ -173,14 +176,46 @@ func BuildManager( } } - gocacheClient := gocache.New(7*24*time.Hour, 30*time.Minute) + gocacheClient := gocache.New(CacheExpirationMax, 30*time.Minute) gocacheStore := cacheStore.NewGoCache(gocacheClient, nil) am.cacheManager = cache.NewLoadable(am.loadFromCache, cache.New(gocacheStore)) + + if !isNil(am.idpManager) { + go func() { + err := am.warmupIDPCache() + if err != nil { + log.Warnf("failed warming up cache due to error: %v", err) + //todo retry? + return + } + }() + } + return am, nil } +func (am *DefaultAccountManager) warmupIDPCache() error { + userData, err := am.idpManager.GetAllAccounts() + if err != nil { + return err + } + + for accountID, users := range userData { + rand.Seed(time.Now().UnixNano()) + + r := rand.Intn(int(CacheExpirationMax.Milliseconds()-CacheExpirationMin.Milliseconds())) + int(CacheExpirationMin.Milliseconds()) + expiration := time.Duration(r) * time.Millisecond + err = am.cacheManager.Set(am.ctx, accountID, users, &cacheStore.Options{Expiration: expiration}) + if err != nil { + return err + } + } + log.Infof("warmed up IDP cache with %d entries", len(userData)) + return nil +} + // AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account func (am *DefaultAccountManager) AddSetupKey( accountId string, @@ -332,7 +367,7 @@ func mergeLocalAndQueryUser(queried idp.UserData, local User) *UserInfo { } func (am *DefaultAccountManager) loadFromCache(ctx context.Context, accountID interface{}) (interface{}, error) { - return am.idpManager.GetBatchedUserData(fmt.Sprintf("%v", accountID)) + return am.idpManager.GetAccount(fmt.Sprintf("%v", accountID)) } func (am *DefaultAccountManager) lookupCache(accountUsers map[string]*User, accountID string) ([]*idp.UserData, error) { diff --git a/management/server/idp/auth0.go b/management/server/idp/auth0.go index a10d9220f25..bd73343fc7c 100644 --- a/management/server/idp/auth0.go +++ b/management/server/idp/auth0.go @@ -1,6 +1,9 @@ package idp import ( + "bytes" + "compress/gzip" + "context" "encoding/json" "fmt" "io" @@ -51,6 +54,47 @@ type Auth0Credentials struct { mux sync.Mutex } +// userExportJobRequest is a user export request struct +type userExportJobRequest struct { + Format string `json:"format"` + Fields []map[string]string `json:"fields"` +} + +// userExportJobResponse is a user export response struct +type userExportJobResponse struct { + Type string `json:"type"` + Status string `json:"status"` + ConnectionID string `json:"connection_id"` + Format string `json:"format"` + Limit int `json:"limit"` + Connection string `json:"connection"` + CreatedAt time.Time `json:"created_at"` + ID string `json:"id"` +} + +// userExportJobStatusResponse is a user export status response struct +type userExportJobStatusResponse struct { + Type string `json:"type"` + Status string `json:"status"` + ConnectionID string `json:"connection_id"` + Format string `json:"format"` + Limit int `json:"limit"` + Location string `json:"location"` + Connection string `json:"connection"` + CreatedAt time.Time `json:"created_at"` + ID string `json:"id"` +} + +// auth0Profile represents an Auth0 user profile response +type auth0Profile struct { + AccountID string `json:"wt_account_id"` + UserID string `json:"user_id"` + Name string `json:"name"` + Email string `json:"email"` + CreatedAt string `json:"created_at"` + LastLogin string `json:"last_login"` +} + // NewAuth0Manager creates a new instance of the Auth0Manager func NewAuth0Manager(config Auth0ClientConfig) (*Auth0Manager, error) { @@ -186,7 +230,7 @@ func (c *Auth0Credentials) Authenticate() (JWTToken, error) { return c.jwtToken, nil } -func batchRequestUsersUrl(authIssuer, accountId string, page int) (string, url.Values, error) { +func batchRequestUsersURL(authIssuer, accountID string, page int) (string, url.Values, error) { u, err := url.Parse(authIssuer + "/api/v2/users") if err != nil { return "", nil, err @@ -194,18 +238,18 @@ func batchRequestUsersUrl(authIssuer, accountId string, page int) (string, url.V q := u.Query() q.Set("page", strconv.Itoa(page)) q.Set("search_engine", "v3") - q.Set("q", "app_metadata.wt_account_id:"+accountId) + q.Set("q", "app_metadata.wt_account_id:"+accountID) u.RawQuery = q.Encode() return u.String(), q, nil } -func requestByUserIdUrl(authIssuer, userId string) string { - return authIssuer + "/api/v2/users/" + userId +func requestByUserIDURL(authIssuer, userID string) string { + return authIssuer + "/api/v2/users/" + userID } -// GetBatchedUserData requests users in batches from Auth0 -func (am *Auth0Manager) GetBatchedUserData(accountId string) ([]*UserData, error) { +// GetAccount returns all the users for a given profile. Calls Auth0 API. +func (am *Auth0Manager) GetAccount(accountID string) ([]*UserData, error) { jwtToken, err := am.credentials.Authenticate() if err != nil { return nil, err @@ -216,7 +260,7 @@ func (am *Auth0Manager) GetBatchedUserData(accountId string) ([]*UserData, error // https://auth0.com/docs/manage-users/user-search/retrieve-users-with-get-users-endpoint#limitations // auth0 limitation of 1000 users via this endpoint for page := 0; page < 20; page++ { - reqURL, query, err := batchRequestUsersUrl(am.authIssuer, accountId, page) + reqURL, query, err := batchRequestUsersURL(am.authIssuer, accountID, page) if err != nil { return nil, err } @@ -269,13 +313,13 @@ func (am *Auth0Manager) GetBatchedUserData(accountId string) ([]*UserData, error } // GetUserDataByID requests user data from auth0 via ID -func (am *Auth0Manager) GetUserDataByID(userId string, appMetadata AppMetadata) (*UserData, error) { +func (am *Auth0Manager) GetUserDataByID(userID string, appMetadata AppMetadata) (*UserData, error) { jwtToken, err := am.credentials.Authenticate() if err != nil { return nil, err } - reqURL := requestByUserIdUrl(am.authIssuer, userId) + reqURL := requestByUserIDURL(am.authIssuer, userID) req, err := http.NewRequest(http.MethodGet, reqURL, nil) if err != nil { return nil, err @@ -314,14 +358,14 @@ func (am *Auth0Manager) GetUserDataByID(userId string, appMetadata AppMetadata) } // UpdateUserAppMetadata updates user app metadata based on userId and metadata map -func (am *Auth0Manager) UpdateUserAppMetadata(userId string, appMetadata AppMetadata) error { +func (am *Auth0Manager) UpdateUserAppMetadata(userID string, appMetadata AppMetadata) error { jwtToken, err := am.credentials.Authenticate() if err != nil { return err } - reqURL := am.authIssuer + "/api/v2/users/" + userId + reqURL := am.authIssuer + "/api/v2/users/" + userID data, err := am.helper.Marshal(appMetadata) if err != nil { @@ -339,7 +383,7 @@ func (am *Auth0Manager) UpdateUserAppMetadata(userId string, appMetadata AppMeta req.Header.Add("authorization", "Bearer "+jwtToken.AccessToken) req.Header.Add("content-type", "application/json") - log.Debugf("updating metadata for user %s", userId) + log.Debugf("updating metadata for user %s", userID) res, err := am.httpClient.Do(req) if err != nil { @@ -359,3 +403,211 @@ func (am *Auth0Manager) UpdateUserAppMetadata(userId string, appMetadata AppMeta return nil } + +func buildUserExportRequest() (string, error) { + req := &userExportJobRequest{} + fields := make([]map[string]string, 0) + + for _, field := range []string{"created_at", "last_login", "user_id", "email", "name"} { + fields = append(fields, map[string]string{"name": field}) + } + + fields = append(fields, map[string]string{ + "name": "app_metadata.wt_account_id", + "export_as": "wt_account_id", + }) + + req.Format = "json" + req.Fields = fields + + str, err := json.Marshal(req) + if err != nil { + return "", err + } + + return string(str), nil +} + +// GetAllAccounts gets all registered accounts with corresponding user data. +// It returns a list of users indexed by accountID. +func (am *Auth0Manager) GetAllAccounts() (map[string][]*UserData, error) { + jwtToken, err := am.credentials.Authenticate() + if err != nil { + return nil, err + } + + reqURL := am.authIssuer + "/api/v2/jobs/users-exports" + + payloadString, err := buildUserExportRequest() + if err != nil { + return nil, err + } + payload := strings.NewReader(payloadString) + + exportJobReq, err := http.NewRequest("POST", reqURL, payload) + if err != nil { + return nil, err + } + exportJobReq.Header.Add("authorization", "Bearer "+jwtToken.AccessToken) + exportJobReq.Header.Add("content-type", "application/json") + + jobResp, err := am.httpClient.Do(exportJobReq) + if err != nil { + log.Debugf("Couldn't get job response %v", err) + return nil, err + } + + defer func() { + err = jobResp.Body.Close() + if err != nil { + log.Errorf("error while closing update user app metadata response body: %v", err) + } + }() + if jobResp.StatusCode != 200 { + return nil, fmt.Errorf("unable to update the appMetadata, statusCode %d", jobResp.StatusCode) + } + + var exportJobResp userExportJobResponse + + body, err := ioutil.ReadAll(jobResp.Body) + if err != nil { + log.Debugf("Coudln't read export job response; %v", err) + return nil, err + } + + err = am.helper.Unmarshal(body, &exportJobResp) + if err != nil { + log.Debugf("Coudln't unmarshal export job response; %v", err) + return nil, err + } + + if exportJobResp.ID == "" { + return nil, fmt.Errorf("couldn't get an batch id status %d, %s, response body: %v", jobResp.StatusCode, jobResp.Status, exportJobResp) + } + + log.Debugf("batch id status %d, %s, response body: %v", jobResp.StatusCode, jobResp.Status, exportJobResp) + + done, downloadLink, err := am.checkExportJobStatus(exportJobResp.ID) + if err != nil { + log.Debugf("Failed at getting status checks from exportJob; %v", err) + return nil, err + } + + if done { + return am.downloadProfileExport(downloadLink) + } + + return nil, fmt.Errorf("failed extracting user profiles from auth0") +} + +// checkExportJobStatus checks the status of the job created at CreateExportUsersJob. +// If the status is "completed", then return the downloadLink +func (am *Auth0Manager) checkExportJobStatus(jobID string) (bool, string, error) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + retry := time.NewTicker(10 * time.Second) + for { + select { + case <-ctx.Done(): + log.Debugf("Export job status stopped...\n") + return false, "", ctx.Err() + case <-retry.C: + jwtToken, err := am.credentials.Authenticate() + if err != nil { + return false, "", err + } + + statusURL := am.authIssuer + "/api/v2/jobs/" + jobID + body, err := doGetReq(am.httpClient, statusURL, jwtToken.AccessToken) + if err != nil { + return false, "", err + } + + var status userExportJobStatusResponse + err = am.helper.Unmarshal(body, &status) + if err != nil { + return false, "", err + } + + log.Debugf("current export job status is %v", status.Status) + + if status.Status != "completed" { + continue + } + + return true, status.Location, nil + } + } +} + +// downloadProfileExport downloads user profiles from auth0 batch job +func (am *Auth0Manager) downloadProfileExport(location string) (map[string][]*UserData, error) { + body, err := doGetReq(am.httpClient, location, "") + if err != nil { + return nil, err + } + + bodyReader := bytes.NewReader(body) + + gzipReader, err := gzip.NewReader(bodyReader) + if err != nil { + return nil, err + } + + decoder := json.NewDecoder(gzipReader) + + res := make(map[string][]*UserData) + for decoder.More() { + profile := auth0Profile{} + err = decoder.Decode(&profile) + if err != nil { + return nil, err + } + if profile.AccountID != "" { + if _, ok := res[profile.AccountID]; !ok { + res[profile.AccountID] = []*UserData{} + } + res[profile.AccountID] = append(res[profile.AccountID], + &UserData{ + ID: profile.UserID, + Name: profile.Name, + Email: profile.Email, + }) + } + } + + return res, nil +} + +// Boilerplate implementation for Get Requests. +func doGetReq(client ManagerHTTPClient, url, accessToken string) ([]byte, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + if accessToken != "" { + req.Header.Add("authorization", "Bearer "+accessToken) + } + + res, err := client.Do(req) + if err != nil { + return nil, err + } + + defer func() { + err = res.Body.Close() + if err != nil { + log.Errorf("error while closing body for url %s: %v", url, err) + } + }() + if res.StatusCode != 200 { + return nil, fmt.Errorf("unable to get %s, statusCode %d", url, res.StatusCode) + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + return body, nil +} diff --git a/management/server/idp/idp.go b/management/server/idp/idp.go index 6991b893a08..f6bd942241d 100644 --- a/management/server/idp/idp.go +++ b/management/server/idp/idp.go @@ -11,7 +11,8 @@ import ( type Manager interface { UpdateUserAppMetadata(userId string, appMetadata AppMetadata) error GetUserDataByID(userId string, appMetadata AppMetadata) (*UserData, error) - GetBatchedUserData(accountId string) ([]*UserData, error) + GetAccount(accountId string) ([]*UserData, error) + GetAllAccounts() (map[string][]*UserData, error) } // Config an idp configuration struct to be loaded from management server's config file