diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f474624269c5..184ce8aa9927 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -628,7 +628,7 @@ - name: Okta sourceDefinitionId: 1d4fdb25-64fc-4569-92da-fcdca79a8372 dockerRepository: airbyte/source-okta - dockerImageTag: 0.1.12 + dockerImageTag: 0.1.13 documentationUrl: https://docs.airbyte.io/integrations/sources/okta icon: okta.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index efa51da1adb7..808f5bab1a79 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6025,7 +6025,7 @@ - - "client_secret" oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-okta:0.1.12" +- dockerImage: "airbyte/source-okta:0.1.13" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/okta" connectionSpecification: diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py index 3745f1ce4db1..209c979c8a24 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py @@ -210,5 +210,7 @@ def test_state_with_abnormally_large_values(self, connector_config, configured_c records = filter_output(output, type_=Type.RECORD) states = filter_output(output, type_=Type.STATE) - assert not records, "The sync should produce no records when run with the state with abnormally large values" + assert ( + not records + ), f"The sync should produce no records when run with the state with abnormally large values {records[0].record.stream}" assert states, "The sync should produce at least one STATE message" diff --git a/airbyte-integrations/connectors/source-okta/Dockerfile b/airbyte-integrations/connectors/source-okta/Dockerfile index 6d93118fa396..d1fb62853e03 100644 --- a/airbyte-integrations/connectors/source-okta/Dockerfile +++ b/airbyte-integrations/connectors/source-okta/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.12 +LABEL io.airbyte.version=0.1.13 LABEL io.airbyte.name=airbyte/source-okta diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json index 18615c665427..99a265b8c8b3 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/abnormal_state.json @@ -2,5 +2,6 @@ "users": { "lastUpdated": "3021-09-08T07:04:28.000Z" }, "groups": { "lastUpdated": "3021-09-08T07:04:28.000Z" }, "group_members": { "id": "00uzzzzzzzzzzzzzzzzz" }, - "logs": { "published": "3021-09-08T07:04:28.000Z" } + "logs": { "published": "3021-09-08T07:04:28.000Z" }, + "resource_sets": { "id": "iamzzzzzzzzzzzzzzzzz" } } diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json index 14c4218507f8..286cb39956b4 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/configured_catalog.json @@ -24,18 +24,18 @@ }, { "stream": { - "name": "logs", + "name": "group_members", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite", - "cursor_field": ["published"], - "primary_key": [["uuid"]] + "cursor_field": ["id"], + "primary_key": [["id"]] }, { "stream": { - "name": "group_members", + "name": "resource_sets", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"] }, diff --git a/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json index 6322da62450f..2602e8825d85 100644 --- a/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-okta/integration_tests/invalid_config.json @@ -1,4 +1,8 @@ { - "base_url": "invalid url", - "token": "invalid token" + "domain": "myorg", + "start_date": "2022-07-22T00:00:00Z", + "credentials": { + "auth_type": "api_token", + "api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2" + } } diff --git a/airbyte-integrations/connectors/source-okta/sample_files/config.json b/airbyte-integrations/connectors/source-okta/sample_files/config.json index 454bc28225e8..2602e8825d85 100644 --- a/airbyte-integrations/connectors/source-okta/sample_files/config.json +++ b/airbyte-integrations/connectors/source-okta/sample_files/config.json @@ -1,4 +1,8 @@ { - "base_url": "https://myorg.okta.com", - "token": "xyz123foo325a.fbar" + "domain": "myorg", + "start_date": "2022-07-22T00:00:00Z", + "credentials": { + "auth_type": "api_token", + "api_token": "00uItIsFake_DoNotUseTheTokenEoxoRw_2" + } } diff --git a/airbyte-integrations/connectors/source-okta/source_okta/schemas/resource_sets.json b/airbyte-integrations/connectors/source-okta/source_okta/schemas/resource_sets.json new file mode 100644 index 000000000000..0741df0fedf9 --- /dev/null +++ b/airbyte-integrations/connectors/source-okta/source_okta/schemas/resource_sets.json @@ -0,0 +1,63 @@ +{ + "properties": { + "id": { + "type": "string" + }, + "label": { + "type": "string" + }, + "description": { + "type": "string" + }, + "_links": { + "properties": { + "assignee": { + "properties": { + "self": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "gets this Resource Set" + }, + "resources": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "gets a paginable list of resources included in this set" + }, + "bindings": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "gets a paginable list of admin Role Bindings assigned to this set" + }, + "next": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "href": { + "type": ["null", "string"] + } + }, + "description": "the link for the next page, 'after' is the query string, the cursor field is id" + } + } + } + }, + "type": ["object", "null"] + } + }, + "type": "object" +} diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index ad11d844930d..380e55d10a3a 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -45,7 +45,6 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, if "self" in links: if links["self"]["url"] == next_url: return None - return query_params return None @@ -79,17 +78,19 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: class IncrementalOktaStream(OktaStream, ABC): + min_id = "" + @property @abstractmethod def cursor_field(self) -> str: pass def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - lowest_date = str(pendulum.datetime.min) + min_cursor_value = self.min_id if self.min_id else str(pendulum.datetime.min) return { self.cursor_field: max( - latest_record.get(self.cursor_field, lowest_date), - current_stream_state.get(self.cursor_field, lowest_date), + latest_record.get(self.cursor_field, min_cursor_value), + current_stream_state.get(self.cursor_field, min_cursor_value), ) } @@ -117,8 +118,8 @@ def path(self, **kwargs) -> str: class GroupMembers(IncrementalOktaStream): cursor_field = "id" primary_key = "id" - min_user_id = "00u00000000000000000" use_cache = True + min_id = "00u00000000000000000" def stream_slices(self, **kwargs): group_stream = Groups(authenticator=self.authenticator, url_base=self.url_base, start_date=self.start_date) @@ -135,22 +136,11 @@ def request_params( stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: - # Filter param should be ignored SCIM filter expressions can't use the published - # attribute since it may conflict with the logic of the since, after, and until query params. - # Docs: https://developer.okta.com/docs/reference/api/system-log/#expression-filter params = super(IncrementalOktaStream, self).request_params(stream_state, stream_slice, next_page_token) - latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_user_id + latest_entry = stream_state.get(self.cursor_field) if stream_state else self.min_id params["after"] = latest_entry return params - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - return { - self.cursor_field: max( - latest_record.get(self.cursor_field, self.min_user_id), - current_stream_state.get(self.cursor_field, self.min_user_id), - ) - } - class GroupRoleAssignments(OktaStream): primary_key = "id" @@ -250,6 +240,45 @@ def request_params( return params +class ResourceSets(IncrementalOktaStream): + cursor_field = "id" + primary_key = "id" + min_id = "iam00000000000000000" + + def path(self, **kwargs) -> str: + return "iam/resource-sets" + + def parse_response( + self, + response: requests.Response, + **kwargs, + ) -> Iterable[Mapping]: + yield from response.json()["resource-sets"] + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + # We can't follow the default pagination that takes query from header.links + # Instead, the payload contains _links that offers the next link + body = response.json() + if "_links" in body and "next" in body["_links"] and "href" in body["_links"]["next"]: + next_url = body["_links"]["next"]["href"] + parsed_link = parse.urlparse(next_url) + return dict(parse.parse_qsl(parsed_link.query)) + + return None + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, stream_slice, next_page_token) + latest_entry = stream_state.get(self.cursor_field) + if latest_entry: + params["after"] = latest_entry + return params + + class CustomRoles(OktaStream): # https://developer.okta.com/docs/reference/api/roles/#list-roles primary_key = "id" @@ -337,4 +366,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: UserRoleAssignments(**initialization_params), GroupRoleAssignments(**initialization_params), Permissions(**initialization_params), + ResourceSets(**initialization_params), ] diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py b/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py index f3eeb4f5a13d..db6928e7e766 100644 --- a/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-okta/unit_tests/conftest.py @@ -378,6 +378,26 @@ def logs_instance(): } +@pytest.fixture() +def resource_set_instance(api_url): + """ + Resource set object instance + """ + _id = "iam5xyzmibarA6Afoo7" + return { + "id": _id, + "label": "all users", + "description": "all users", + "created": "2022-07-09T20:58:41.000Z", + "lastUpdated": "2022-07-09T20:58:41.000Z", + "_links": { + "bindings": {"href": f"{url_base}/iam/resource-sets/{_id}/bindings"}, + "self": {"href": f"{url_base}/iam/resource-sets/{_id}"}, + "resources": {"href": f"{url_base}/iam/resource-sets/{_id}/resources"}, + }, + } + + @pytest.fixture() def latest_record_instance(url_base, api_url): """ diff --git a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py index 48e90adeb6a3..ed5b2bb6948a 100644 --- a/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-okta/unit_tests/test_streams.py @@ -20,6 +20,7 @@ Logs, OktaStream, Permissions, + ResourceSets, UserRoleAssignments, Users, ) @@ -126,7 +127,6 @@ def cursor_field(self) -> str: stream = TestIncrementalOktaStream(url_base=url_base, start_date=start_date) stream._cursor_field = "lastUpdated" - current_stream_state = {"lastUpdated": "2021-04-21T21:03:55.000Z"} update_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record_instance) expected_result = {"lastUpdated": "2022-07-18T07:58:11.000Z"} @@ -360,3 +360,40 @@ def test_user_role_assignments_slice_stream( stream = UserRoleAssignments(url_base=url_base, start_date=start_date) requests_mock.get(f"{api_url}/users?limit=200", json=[users_instance]) assert list(stream.stream_slices()) == [{"user_id": "test_user_id"}] + + +class TestStreamResourceSets: + def test_resource_sets(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) + record = {"resource-sets": [resource_set_instance]} + requests_mock.get(f"{api_url}/iam/resource-sets", json=record) + inputs = {"sync_mode": SyncMode.incremental} + assert list(stream.read_records(**inputs)) == record["resource-sets"] + + def test_resource_sets_parse_response(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) + record = {"resource-sets": [resource_set_instance]} + requests_mock.get(f"{api_url}", json=record) + assert list(stream.parse_response(response=requests.get(f"{api_url}"))) == [resource_set_instance] + + def test_resource_sets_next_page_token(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) + cursor = "iam5cursorFybecursor" + response = MagicMock(requests.Response) + next_link = f"{url_base}/iam/resource-sets?after={cursor}" + response.json = MagicMock(return_value={"_links": {"next": {"href": next_link}}, "resource-sets": [resource_set_instance]}) + inputs = {"response": response} + result = stream.next_page_token(**inputs) + assert result == {"after": cursor} + + response.json = MagicMock(return_value={"resource-sets": [resource_set_instance]}) + inputs = {"response": response} + result = stream.next_page_token(**inputs) + assert result is None + + def test_resource_sets_request_params(self, requests_mock, patch_base_class, resource_set_instance, url_base, api_url, start_date): + stream = ResourceSets(url_base=url_base, start_date=start_date) + cursor = "iam5cursorFybecursor" + inputs = {"stream_slice": None, "stream_state": {"id": cursor}, "next_page_token": None} + expected_params = {"limit": 200, "after": "iam5cursorFybecursor", "filter": 'id gt "iam5cursorFybecursor"'} + assert stream.request_params(**inputs) == expected_params diff --git a/docs/integrations/sources/okta.md b/docs/integrations/sources/okta.md index 662ab25c6f73..4edeb60892b3 100644 --- a/docs/integrations/sources/okta.md +++ b/docs/integrations/sources/okta.md @@ -69,6 +69,7 @@ The Okta source connector supports the following [sync modes](https://docs.airby - [System Log](https://developer.okta.com/docs/reference/api/system-log/#get-started) - [Custom Roles](https://developer.okta.com/docs/reference/api/roles/#list-roles) - [Permissions](https://developer.okta.com/docs/reference/api/roles/#list-permissions) +- [Resource Sets](https://developer.okta.com/docs/reference/api/roles/#list-resource-sets) ## Performance considerations @@ -78,6 +79,7 @@ The connector is restricted by normal Okta [requests limitation](https://develop | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------| +| 0.1.13 | 2022-08-12 | [14700](https://github.com/airbytehq/airbyte/pull/14700) | Add resource sets | | 0.1.12 | 2022-08-05 | [15050](https://github.com/airbytehq/airbyte/pull/15050) | Add parameter `start_date` for Logs stream | | 0.1.11 | 2022-08-03 | [14739](https://github.com/airbytehq/airbyte/pull/14739) | Add permissions for custom roles | | 0.1.10 | 2022-08-01 | [15179](https://github.com/airbytehq/airbyte/pull/15179) | Fix broken schemas for all streams |