Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Okta: add resource-sets (incremental supported) #14700

Merged
merged 13 commits into from
Aug 12, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@
- name: Okta
sourceDefinitionId: 1d4fdb25-64fc-4569-92da-fcdca79a8372
dockerRepository: airbyte/source-okta
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
documentationUrl: https://docs.airbyte.io/integrations/sources/okta
icon: okta.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6025,7 +6025,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-okta:0.1.12"
- dockerImage: "airbyte/source-okta:0.1.13"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/okta"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,7 @@ def test_state_with_abnormally_large_values(self, connector_config, configured_c
records = filter_output(output, type_=Type.RECORD)
states = filter_output(output, type_=Type.STATE)

assert not records, "The sync should produce no records when run with the state with abnormally large values"
assert (
not records
), f"The sync should produce no records when run with the state with abnormally large values {records[0].record.stream}"
assert states, "The sync should produce at least one STATE message"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-okta/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.name=airbyte/source-okta
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"users": { "lastUpdated": "3021-09-08T07:04:28.000Z" },
"groups": { "lastUpdated": "3021-09-08T07:04:28.000Z" },
"group_members": { "id": "00uzzzzzzzzzzzzzzzzz" },
"logs": { "published": "3021-09-08T07:04:28.000Z" }
"logs": { "published": "3021-09-08T07:04:28.000Z" },
"resource_sets": { "id": "iamzzzzzzzzzzzzzzzzz" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
},
{
"stream": {
"name": "logs",
"name": "group_members",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["published"],
"primary_key": [["uuid"]]
"cursor_field": ["id"],
"primary_key": [["id"]]
},
{
"stream": {
"name": "group_members",
"name": "resource_sets",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"base_url": "invalid url",
"token": "invalid token"
"domain": "myorg",
"start_date": "2022-07-22T00:00:00Z",
"credentials": {
"auth_type": "api_token",
"api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2"
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"base_url": "https://myorg.okta.com",
"token": "xyz123foo325a.fbar"
"domain": "myorg",
"start_date": "2022-07-22T00:00:00Z",
"credentials": {
"auth_type": "api_token",
"api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"properties": {
"id": {
"type": "string"
},
"label": {
"type": "string"
},
"description": {
"type": "string"
},
"_links": {
"properties": {
"assignee": {
"properties": {
"self": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "gets this Resource Set"
},
"resources": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "gets a paginable list of resources included in this set"
},
"bindings": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "gets a paginable list of admin Role Bindings assigned to this set"
},
"next": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"href": {
"type": ["null", "string"]
}
},
"description": "the link for the next page, 'after' is the query string, the cursor field is id"
}
}
}
},
"type": ["object", "null"]
}
},
"type": "object"
}
64 changes: 47 additions & 17 deletions airbyte-integrations/connectors/source-okta/source_okta/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
if "self" in links:
if links["self"]["url"] == next_url:
return None

return query_params

return None
Expand Down Expand Up @@ -79,17 +78,19 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:


class IncrementalOktaStream(OktaStream, ABC):
min_id = ""

@property
@abstractmethod
def cursor_field(self) -> str:
pass

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
lowest_date = str(pendulum.datetime.min)
min_cursor_value = self.min_id if self.min_id else str(pendulum.datetime.min)
return {
self.cursor_field: max(
latest_record.get(self.cursor_field, lowest_date),
current_stream_state.get(self.cursor_field, lowest_date),
latest_record.get(self.cursor_field, min_cursor_value),
current_stream_state.get(self.cursor_field, min_cursor_value),
)
}

Expand Down Expand Up @@ -117,8 +118,8 @@ def path(self, **kwargs) -> str:
class GroupMembers(IncrementalOktaStream):
cursor_field = "id"
primary_key = "id"
min_user_id = "00u00000000000000000"
use_cache = True
min_id = "00u00000000000000000"

def stream_slices(self, **kwargs):
group_stream = Groups(authenticator=self.authenticator, url_base=self.url_base, start_date=self.start_date)
Expand All @@ -135,22 +136,11 @@ def request_params(
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
# Filter param should be ignored SCIM filter expressions can't use the published
# attribute since it may conflict with the logic of the since, after, and until query params.
# Docs: https://developer.okta.com/docs/reference/api/system-log/#expression-filter
params = super(IncrementalOktaStream, self).request_params(stream_state, stream_slice, next_page_token)
latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_user_id
latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_id
params["after"] = latest_entry
return params

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
return {
self.cursor_field: max(
latest_record.get(self.cursor_field, self.min_user_id),
current_stream_state.get(self.cursor_field, self.min_user_id),
)
}


class GroupRoleAssignments(OktaStream):
primary_key = "id"
Expand Down Expand Up @@ -250,6 +240,45 @@ def request_params(
return params


class ResourceSets(IncrementalOktaStream):
cursor_field = "id"
primary_key = "id"
min_id = "iam00000000000000000"

def path(self, **kwargs) -> str:
return "iam/resource-sets"

def parse_response(
self,
response: requests.Response,
**kwargs,
) -> Iterable[Mapping]:
yield from response.json()["resource-sets"]

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
# We can't follow the default pagination that takes query from header.links
# Instead, the payload contains _links that offers the next link
body = response.json()
if "_links" in body and "next" in body["_links"] and "href" in body["_links"]["next"]:
next_url = body["_links"]["next"]["href"]
parsed_link = parse.urlparse(next_url)
return dict(parse.parse_qsl(parsed_link.query))

return None

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
latest_entry = stream_state.get(self.cursor_field)
if latest_entry:
params["after"] = latest_entry
return params


class CustomRoles(OktaStream):
# https://developer.okta.com/docs/reference/api/roles/#list-roles
primary_key = "id"
Expand Down Expand Up @@ -337,4 +366,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
UserRoleAssignments(**initialization_params),
GroupRoleAssignments(**initialization_params),
Permissions(**initialization_params),
ResourceSets(**initialization_params),
]
20 changes: 20 additions & 0 deletions airbyte-integrations/connectors/source-okta/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,26 @@ def logs_instance():
}


@pytest.fixture()
def resource_set_instance(api_url):
"""
Resource set object instance
"""
_id = "iam5xyzmibarA6Afoo7"
return {
"id": _id,
"label": "all users",
"description": "all users",
"created": "2022-07-09T20:58:41.000Z",
"lastUpdated": "2022-07-09T20:58:41.000Z",
"_links": {
"bindings": {"href": f"{url_base}/iam/resource-sets/{_id}/bindings"},
"self": {"href": f"{url_base}/iam/resource-sets/{_id}"},
"resources": {"href": f"{url_base}/iam/resource-sets/{_id}/resources"},
},
}


@pytest.fixture()
def latest_record_instance(url_base, api_url):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Logs,
OktaStream,
Permissions,
ResourceSets,
UserRoleAssignments,
Users,
)
Expand Down Expand Up @@ -126,7 +127,6 @@ def cursor_field(self) -> str:

stream = TestIncrementalOktaStream(url_base=url_base, start_date=start_date)
stream._cursor_field = "lastUpdated"

current_stream_state = {"lastUpdated": "2021-04-21T21:03:55.000Z"}
update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance)
expected_result = {"lastUpdated": "2022-07-18T07:58:11.000Z"}
Expand Down Expand Up @@ -360,3 +360,40 @@ def test_user_role_assignments_slice_stream(
stream = UserRoleAssignments(url_base=url_base, start_date=start_date)
requests_mock.get(f"{api_url}/users?limit=200", json=[users_instance])
assert list(stream.stream_slices()) == [{"user_id": "test_user_id"}]


class TestStreamResourceSets:
def test_resource_sets(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
record = {"resource-sets": [resource_set_instance]}
requests_mock.get(f"{api_url}/iam/resource-sets", json=record)
inputs = {"sync_mode": SyncMode.incremental}
assert list(stream.read_records(**inputs)) == record["resource-sets"]

def test_resource_sets_parse_response(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
record = {"resource-sets": [resource_set_instance]}
requests_mock.get(f"{api_url}", json=record)
assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [resource_set_instance]

def test_resource_sets_next_page_token(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
cursor = "iam5cursorFybecursor"
response = MagicMock(requests.Response)
next_link = f"{url_base}/iam/resource-sets?after={cursor}"
response.json = MagicMock(return_value={"_links": {"next": {"href": next_link}}, "resource-sets": [resource_set_instance]})
inputs = {"response": response}
result = stream.next_page_token(**inputs)
assert result == {"after": cursor}

response.json = MagicMock(return_value={"resource-sets": [resource_set_instance]})
inputs = {"response": response}
result = stream.next_page_token(**inputs)
assert result is None

def test_resource_sets_request_params(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date):
stream = ResourceSets(url_base=url_base, start_date=start_date)
cursor = "iam5cursorFybecursor"
inputs = {"stream_slice": None, "stream_state": {"id": cursor}, "next_page_token": None}
expected_params = {"limit": 200, "after": "iam5cursorFybecursor", "filter": 'id gt "iam5cursorFybecursor"'}
assert stream.request_params(**inputs) == expected_params
2 changes: 2 additions & 0 deletions docs/integrations/sources/okta.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The Okta source connector supports the following [sync modes](https://docs.airby
- [System Log](https://developer.okta.com/docs/reference/api/system-log/#get-started)
- [Custom Roles](https://developer.okta.com/docs/reference/api/roles/#list-roles)
- [Permissions](https://developer.okta.com/docs/reference/api/roles/#list-permissions)
- [Resource Sets](https://developer.okta.com/docs/reference/api/roles/#list-resource-sets)

## Performance considerations

Expand All @@ -78,6 +79,7 @@ The connector is restricted by normal Okta [requests limitation](https://develop

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------|
| 0.1.13 | 2022-08-12 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | Add resource sets |
| 0.1.12 | 2022-08-05 | [15050](https://github.com/airbytehq/airbyte/pull/15050) | Add parameter `start_date` for Logs stream |
| 0.1.11 | 2022-08-03 | [14739](https://github.com/airbytehq/airbyte/pull/14739) | Add permissions for custom roles |
| 0.1.10 | 2022-08-01 | [15179](https://github.com/airbytehq/airbyte/pull/15179) | Fix broken schemas for all streams |
Expand Down