Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/{,internal/}okta: impr…
Browse files Browse the repository at this point in the history
…ove debug logging

Wire logging into the internal package and log rate limit handling.
  • Loading branch information
efd6 committed Aug 4, 2024
1 parent d7e3037 commit b5696ba
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add scaling up support for Netflow input. {issue}37761[37761] {pull}40122[40122]
- Update CEL mito extensions to v1.15.0. {pull}40294[40294]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]
- Improve logging in Okta Entity Analytics provider. {issue}40106[40106] {pull}40347[40347]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

"golang.org/x/time/rate"

"github.com/elastic/elastic-agent-libs/logp"
)

// ISO8601 is the time format accepted by Okta queries.
Expand Down Expand Up @@ -220,7 +222,7 @@ func (o Response) String() string {
// https://${yourOktaDomain}/reports/rate-limit.
//
// See https://developer.okta.com/docs/reference/api/users/#list-users for details.
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration) ([]User, http.Header, error) {
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
const endpoint = "/api/v1/users"

u := &url.URL{
Expand All @@ -229,7 +231,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin
Path: path.Join(endpoint, user),
RawQuery: query.Encode(),
}
return getDetails[User](ctx, cli, u, key, user == "", omit, lim, window)
return getDetails[User](ctx, cli, u, key, user == "", omit, lim, window, log)
}

// GetUserGroupDetails returns Okta group details using the users API endpoint. host is the
Expand All @@ -238,7 +240,7 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details.
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration) ([]Group, http.Header, error) {
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
const endpoint = "/api/v1/users"

if user == "" {
Expand All @@ -250,7 +252,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user
Host: host,
Path: path.Join(endpoint, user, "groups"),
}
return getDetails[Group](ctx, cli, u, key, true, OmitNone, lim, window)
return getDetails[Group](ctx, cli, u, key, true, OmitNone, lim, window, log)
}

// GetDeviceDetails returns Okta device details using the list devices API endpoint. host is the
Expand All @@ -260,7 +262,7 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details.
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration) ([]Device, http.Header, error) {
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
const endpoint = "/api/v1/devices"

u := &url.URL{
Expand All @@ -269,7 +271,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s
Path: path.Join(endpoint, device),
RawQuery: query.Encode(),
}
return getDetails[Device](ctx, cli, u, key, device == "", OmitNone, lim, window)
return getDetails[Device](ctx, cli, u, key, device == "", OmitNone, lim, window, log)
}

// GetDeviceUsers returns Okta user details for users associated with the provided device identifier
Expand All @@ -279,7 +281,7 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details.
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration) ([]User, http.Header, error) {
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
if device == "" {
// No user associated with a null device. Not an error.
return nil, nil, nil
Expand All @@ -293,7 +295,7 @@ func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device str
Path: path.Join(endpoint, device, "users"),
RawQuery: query.Encode(),
}
du, h, err := getDetails[devUser](ctx, cli, u, key, true, omit, lim, window)
du, h, err := getDetails[devUser](ctx, cli, u, key, true, omit, lim, window, log)
if err != nil {
return nil, h, err
}
Expand All @@ -318,8 +320,9 @@ type devUser struct {
// for the specific user are returned, otherwise a list of all users is returned.
//
// See GetUserDetails for details of the query and rate limit parameters.
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key string, all bool, omit Response, lim *rate.Limiter, window time.Duration) ([]E, http.Header, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key string, all bool, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
url := u.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, nil, err
}
Expand All @@ -331,6 +334,7 @@ func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key
req.Header.Set("Content-Type", contentType)
req.Header.Set("Authorization", fmt.Sprintf("SSWS %s", key))

log.Debugw("rate limit", "limit", lim.Limit(), "burst", lim.Burst(), "url", url)
err = lim.Wait(ctx)
if err != nil {
return nil, nil, err
Expand All @@ -340,7 +344,7 @@ func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key
return nil, nil, err
}
defer resp.Body.Close()
err = oktaRateLimit(resp.Header, window, lim)
err = oktaRateLimit(resp.Header, window, lim, log)
if err != nil {
io.Copy(io.Discard, resp.Body)

Check failure on line 349 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `io.Copy` is not checked (errcheck)
return nil, nil, err
Expand Down Expand Up @@ -406,10 +410,11 @@ func (e *Error) Error() string {
// oktaRateLimit implements the Okta rate limit policy translation.
//
// See https://developer.okta.com/docs/reference/rl-best-practices/ for details.
func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter) error {
func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter, log *logp.Logger) error {
limit := h.Get("X-Rate-Limit-Limit")
remaining := h.Get("X-Rate-Limit-Remaining")
reset := h.Get("X-Rate-Limit-Reset")
log.Debugw("rate limit header", "X-Rate-Limit-Limit", limit, "X-Rate-Limit-Remaining", remaining, "X-Rate-Limit-Reset", reset)
if limit == "" || remaining == "" || reset == "" {
return nil
}
Expand Down Expand Up @@ -446,10 +451,12 @@ func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter) e
next := rate.Limit(lim / window.Seconds())
limiter.SetLimitAt(waitUntil, next)
limiter.SetBurstAt(waitUntil, burst)
log.Debugw("rate limit adjust", "reset_time", waitUntil, "next_rate", next, "next_burst", burst)
return nil
}
limiter.SetLimit(rateLimit)
limiter.SetBurst(burst)
log.Debugw("rate limit adjust", "set_rate", rateLimit, "set_burst", burst)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/time/rate"

"github.com/elastic/elastic-agent-libs/logp"
)

var logResponses = flag.Bool("log_response", false, "use to log users/devices returned from the API")

func Test(t *testing.T) {
logp.TestingSetup()
logger := logp.L()

// https://developer.okta.com/docs/reference/core-okta-api/
host, ok := os.LookupEnv("OKTA_HOST")
if !ok {
Expand Down Expand Up @@ -60,7 +65,7 @@ func Test(t *testing.T) {
t.Run("me", func(t *testing.T) {
query := make(url.Values)
query.Set("limit", "200")
users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "me", query, omit, limiter, window)
users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "me", query, omit, limiter, window, logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -89,7 +94,7 @@ func Test(t *testing.T) {
t.Run("my_groups", func(t *testing.T) {
query := make(url.Values)
query.Set("limit", "200")
groups, _, err := GetUserGroupDetails(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window)
groups, _, err := GetUserGroupDetails(context.Background(), http.DefaultClient, host, key, me.ID, limiter, window, logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -119,7 +124,7 @@ func Test(t *testing.T) {

query := make(url.Values)
query.Set("limit", "200")
users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, me.Profile.Login, query, omit, limiter, window)
users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, me.Profile.Login, query, omit, limiter, window, logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -134,7 +139,7 @@ func Test(t *testing.T) {
t.Run("all", func(t *testing.T) {
query := make(url.Values)
query.Set("limit", "200")
users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window)
users, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window, logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -162,7 +167,7 @@ func Test(t *testing.T) {
query := make(url.Values)
query.Set("limit", "200")
query.Add("search", `not (status pr)`) // This cannot ever be true.
_, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window)
_, _, err := GetUserDetails(context.Background(), http.DefaultClient, host, key, "", query, omit, limiter, window, logger)
oktaErr := &Error{}
if !errors.As(err, &oktaErr) {
// Don't test the value of the error since it was
Expand All @@ -178,7 +183,7 @@ func Test(t *testing.T) {
t.Run("device", func(t *testing.T) {
query := make(url.Values)
query.Set("limit", "200")
devices, _, err := GetDeviceDetails(context.Background(), http.DefaultClient, host, key, "", query, limiter, window)
devices, _, err := GetDeviceDetails(context.Background(), http.DefaultClient, host, key, "", query, limiter, window, logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -190,7 +195,7 @@ func Test(t *testing.T) {
t.Logf("devices: %s", b)
}
for _, d := range devices {
users, _, err := GetDeviceUsers(context.Background(), http.DefaultClient, host, key, d.ID, query, OmitCredentials, limiter, window)
users, _, err := GetDeviceUsers(context.Background(), http.DefaultClient, host, key, d.ID, query, OmitCredentials, limiter, window, logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -207,24 +212,24 @@ var localTests = []struct {
name string
msg string
id string
fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *rate.Limiter, window time.Duration) (any, http.Header, error)
fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error)
mkWant func(string) (any, error)
}{
{
// Test case constructed from API-returned value with details anonymised.
name: "users",
msg: `[{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}]`,
fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *rate.Limiter, window time.Duration) (any, http.Header, error) {
return GetUserDetails(context.Background(), cli, host, key, user, query, OmitNone, lim, window)
fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
return GetUserDetails(context.Background(), cli, host, key, user, query, OmitNone, lim, window, log)
},
mkWant: mkWant[User],
},
{
// Test case from https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices
name: "devices",
msg: `[{"id":"devid","status":"CREATED","created":"2019-10-02T18:03:07.000Z","lastUpdated":"2019-10-02T18:03:07.000Z","profile":{"displayName":"Example Device name 1","platform":"WINDOWS","serialNumber":"XXDDRFCFRGF3M8MD6D","sid":"S-1-11-111","registered":true,"secureHardwarePresent":false,"diskEncryptionType":"ALL_INTERNAL_VOLUMES"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 1","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g4","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/users","hints":{"allow":["GET"]}}}},{"id":"guo4a5u7YAHhjXrMK0g5","status":"ACTIVE","created":"2023-06-21T23:24:02.000Z","lastUpdated":"2023-06-21T23:24:02.000Z","profile":{"displayName":"Example Device name 2","platform":"ANDROID","manufacturer":"Google","model":"Pixel 6","osVersion":"13:2023-05-05","registered":true,"secureHardwarePresent":true,"diskEncryptionType":"USER"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 2","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g5","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/users","hints":{"allow":["GET"]}}}}]`,
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration) (any, http.Header, error) {
return GetDeviceDetails(context.Background(), cli, host, key, device, query, lim, window)
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
return GetDeviceDetails(context.Background(), cli, host, key, device, query, lim, window, log)
},
mkWant: mkWant[Device],
},
Expand All @@ -233,8 +238,8 @@ var localTests = []struct {
name: "devices_users",
msg: `[{"created":"2023-08-07T21:48:27.000Z","managementStatus":"NOT_MANAGED","user":{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}}]`,
id: "devid",
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration) (any, http.Header, error) {
return GetDeviceUsers(context.Background(), cli, host, key, device, query, OmitNone, lim, window)
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
return GetDeviceUsers(context.Background(), cli, host, key, device, query, OmitNone, lim, window, log)
},
mkWant: mkWant[devUser],
},
Expand All @@ -254,6 +259,9 @@ func mkWant[E entity](data string) (any, error) {
}

func TestLocal(t *testing.T) {
logp.TestingSetup()
logger := logp.L()

for _, test := range localTests {
t.Run(test.name, func(t *testing.T) {
// Make a global limiter with more capacity than will be set by the mock API.
Expand Down Expand Up @@ -309,7 +317,7 @@ func TestLocal(t *testing.T) {

query := make(url.Values)
query.Set("limit", "200")
got, h, err := test.fn(context.Background(), ts.Client(), host, key, test.id, query, limiter, window)
got, h, err := test.fn(context.Background(), ts.Client(), host, key, test.id, query, limiter, window, logger)
if err != nil {
t.Fatalf("unexpected error from Get_Details: %v", err)
}
Expand Down
Loading

0 comments on commit b5696ba

Please sign in to comment.