Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warmup IDP cache #354

Merged
merged 7 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions management/server/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"math/rand"
"reflect"
"strings"
"sync"
"time"
)

const (
PublicCategory = "public"
PrivateCategory = "private"
UnknownCategory = "unknown"
PublicCategory = "public"
PrivateCategory = "private"
UnknownCategory = "unknown"
CacheExpirationMax = 7 * 24 * 3600 * time.Second // 7 days
CacheExpirationMin = 3 * 24 * 3600 * time.Second // 3 days
)

type AccountManager interface {
Expand Down Expand Up @@ -173,14 +176,46 @@ func BuildManager(
}
}

gocacheClient := gocache.New(7*24*time.Hour, 30*time.Minute)
gocacheClient := gocache.New(CacheExpirationMax, 30*time.Minute)
gocacheStore := cacheStore.NewGoCache(gocacheClient, nil)

am.cacheManager = cache.NewLoadable(am.loadFromCache, cache.New(gocacheStore))

if !isNil(am.idpManager) {
go func() {
err := am.warmupIDPCache()
if err != nil {
log.Warnf("failed warming up cache due to error: %v", err)
//todo retry?
return
}
}()
}

return am, nil

}

func (am *DefaultAccountManager) warmupIDPCache() error {
userData, err := am.idpManager.GetAllAccounts()
if err != nil {
return err
}

for accountID, users := range userData {
rand.Seed(time.Now().UnixNano())

r := rand.Intn(int(CacheExpirationMax.Milliseconds()-CacheExpirationMin.Milliseconds())) + int(CacheExpirationMin.Milliseconds())
expiration := time.Duration(r) * time.Millisecond
err = am.cacheManager.Set(am.ctx, accountID, users, &cacheStore.Options{Expiration: expiration})
if err != nil {
return err
}
}
log.Infof("warmed up IDP cache with %d entries", len(userData))
return nil
}

// AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account
func (am *DefaultAccountManager) AddSetupKey(
accountId string,
Expand Down Expand Up @@ -332,7 +367,7 @@ func mergeLocalAndQueryUser(queried idp.UserData, local User) *UserInfo {
}

func (am *DefaultAccountManager) loadFromCache(ctx context.Context, accountID interface{}) (interface{}, error) {
return am.idpManager.GetBatchedUserData(fmt.Sprintf("%v", accountID))
return am.idpManager.GetAccount(fmt.Sprintf("%v", accountID))
}

func (am *DefaultAccountManager) lookupCache(accountUsers map[string]*User, accountID string) ([]*idp.UserData, error) {
Expand Down
250 changes: 249 additions & 1 deletion management/server/idp/auth0.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package idp

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -51,6 +54,43 @@ type Auth0Credentials struct {
mux sync.Mutex
}

type UserExportJobRequest struct {
Format string `json:"format"`
Fields []map[string]string `json:"fields"`
}

type UserExportJobResponse struct {
Type string `json:"type"`
Status string `json:"status"`
ConnectionId string `json:"connection_id"`
Format string `json:"format"`
Limit int `json:"limit"`
Connection string `json:"connection"`
CreatedAt time.Time `json:"created_at"`
Id string `json:"id"`
}

type ExportJobStatusResponse struct {
Type string `json:"type"`
Status string `json:"status"`
ConnectionId string `json:"connection_id"`
Format string `json:"format"`
Limit int `json:"limit"`
Location string `json:"location"`
Connection string `json:"connection"`
CreatedAt time.Time `json:"created_at"`
Id string `json:"id"`
}

type Auth0Profile struct {
AccountId string `json:"wt_account_id"`
UserID string `json:"user_id"`
Name string `json:"name"`
Email string `json:"email"`
CreatedAt string `json:"created_at"`
LastLogin string `json:"last_login"`
}

// NewAuth0Manager creates a new instance of the Auth0Manager
func NewAuth0Manager(config Auth0ClientConfig) (*Auth0Manager, error) {

Expand Down Expand Up @@ -205,7 +245,7 @@ func requestByUserIdUrl(authIssuer, userId string) string {
}

// GetBatchedUserData requests users in batches from Auth0
func (am *Auth0Manager) GetBatchedUserData(accountId string) ([]*UserData, error) {
func (am *Auth0Manager) GetAccount(accountId string) ([]*UserData, error) {
jwtToken, err := am.credentials.Authenticate()
if err != nil {
return nil, err
Expand Down Expand Up @@ -359,3 +399,211 @@ func (am *Auth0Manager) UpdateUserAppMetadata(userId string, appMetadata AppMeta

return nil
}

func buildUserExportRequest() (string, error) {
req := &UserExportJobRequest{}
fields := make([]map[string]string, 0)

for _, field := range []string{"created_at", "last_login", "user_id", "email", "name"} {
fields = append(fields, map[string]string{"name": field})
}

fields = append(fields, map[string]string{
"name": "app_metadata.wt_account_id",
"export_as": "wt_account_id",
})

req.Format = "json"
req.Fields = fields

str, err := json.Marshal(req)
if err != nil {
return "", err
}

return string(str), nil
}

// GetAllAccounts gets all registered accounts with corresponding user data.
// It returns a list of users indexed by accountID.
func (am *Auth0Manager) GetAllAccounts() (map[string][]*UserData, error) {
jwtToken, err := am.credentials.Authenticate()
if err != nil {
return nil, err
}

reqURL := am.authIssuer + "/api/v2/jobs/users-exports"

payloadString, err := buildUserExportRequest()
if err != nil {
return nil, err
}
payload := strings.NewReader(payloadString)

exportJobReq, err := http.NewRequest("POST", reqURL, payload)
if err != nil {
return nil, err
}
exportJobReq.Header.Add("authorization", "Bearer "+jwtToken.AccessToken)
exportJobReq.Header.Add("content-type", "application/json")

jobResp, err := am.httpClient.Do(exportJobReq)
if err != nil {
log.Debugf("Couldn't get job response %v", err)
return nil, err
}

defer func() {
err = jobResp.Body.Close()
if err != nil {
log.Errorf("error while closing update user app metadata response body: %v", err)
}
}()
if jobResp.StatusCode != 200 {
return nil, fmt.Errorf("unable to update the appMetadata, statusCode %d", jobResp.StatusCode)
}

var exportJobResp UserExportJobResponse

body, err := ioutil.ReadAll(jobResp.Body)
if err != nil {
log.Debugf("Coudln't read export job response; %v", err)
return nil, err
}

err = am.helper.Unmarshal(body, &exportJobResp)
if err != nil {
log.Debugf("Coudln't unmarshal export job response; %v", err)
return nil, err
}

if exportJobResp.Id == "" {
return nil, fmt.Errorf("couldn't get an batch id status %d, %s, response body: %v", jobResp.StatusCode, jobResp.Status, exportJobResp)
}

log.Debugf("batch id status %d, %s, response body: %v", jobResp.StatusCode, jobResp.Status, exportJobResp)

done, downloadLink, err := am.checkExportJobStatus(exportJobResp.Id)
if err != nil {
log.Debugf("Failed at getting status checks from exportJob; %v", err)
return nil, err
}

if done {
return am.downloadProfileExport(downloadLink)
}

return nil, fmt.Errorf("failed extracting user profiles from auth0")
}

// checkExportJobStatus checks the status of the job created at CreateExportUsersJob.
// If the status is "completed", then return the downloadLink
func (am *Auth0Manager) checkExportJobStatus(jobId string) (bool, string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()
retry := time.NewTicker(10 * time.Second)
for {
select {
case <-ctx.Done():
log.Debugf("Export job status stopped...\n")
return false, "", ctx.Err()
case <-retry.C:
jwtToken, err := am.credentials.Authenticate()
if err != nil {
return false, "", err
}

statusUrl := am.authIssuer + "/api/v2/jobs/" + jobId
body, err := doGetReq(am.httpClient, statusUrl, jwtToken.AccessToken)
if err != nil {
return false, "", err
}

var status ExportJobStatusResponse
err = am.helper.Unmarshal(body, &status)
if err != nil {
return false, "", err
}

log.Debugf("current export job status is %v", status.Status)

if status.Status != "completed" {
continue
}

return true, status.Location, nil
}
}
}

// downloadProfileExport downloads user profiles from auth0 batch job
func (am *Auth0Manager) downloadProfileExport(location string) (map[string][]*UserData, error) {
body, err := doGetReq(am.httpClient, location, "")
if err != nil {
return nil, err
}

bodyReader := bytes.NewReader(body)

gzipReader, err := gzip.NewReader(bodyReader)
if err != nil {
return nil, err
}

decoder := json.NewDecoder(gzipReader)

res := make(map[string][]*UserData)
for decoder.More() {
profile := Auth0Profile{}
err = decoder.Decode(&profile)
if err != nil {
return nil, err
}
if profile.AccountId != "" {
if _, ok := res[profile.AccountId]; !ok {
res[profile.AccountId] = []*UserData{}
}
res[profile.AccountId] = append(res[profile.AccountId],
&UserData{
ID: profile.UserID,
Name: profile.Name,
Email: profile.Email,
})
}
}

return res, nil
}

// Boilerplate implementation for Get Requests.
func doGetReq(client ManagerHTTPClient, url, accessToken string) ([]byte, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}

if accessToken != "" {
req.Header.Add("authorization", "Bearer "+accessToken)
}

res, err := client.Do(req)
if err != nil {
return nil, err
}

defer func() {
err = res.Body.Close()
if err != nil {
log.Errorf("error while closing body for url %s: %v", url, err)
}
}()
if res.StatusCode != 200 {
return nil, fmt.Errorf("unable to get %s, statusCode %d", url, res.StatusCode)
}

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
return body, nil
}
3 changes: 2 additions & 1 deletion management/server/idp/idp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
type Manager interface {
UpdateUserAppMetadata(userId string, appMetadata AppMetadata) error
GetUserDataByID(userId string, appMetadata AppMetadata) (*UserData, error)
GetBatchedUserData(accountId string) ([]*UserData, error)
GetAccount(accountId string) ([]*UserData, error)
GetAllAccounts() (map[string][]*UserData, error)
}

// Config an idp configuration struct to be loaded from management server's config file
Expand Down