From 44574a092e663bbdb1be1ab1f724097d71937500 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Tue, 10 Sep 2024 15:15:34 +0300 Subject: [PATCH 1/9] Removed fetching of resource after event, relying on asset data from feed --- integrations/gcp/main.py | 41 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 1ef79c56e2..98c8dfe0dc 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -6,19 +6,17 @@ from fastapi import Request, Response from loguru import logger from port_ocean.context.ocean import ocean -from port_ocean.core.models import Entity from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from gcp_core.errors import ( AssetHasNoProjectAncestorError, GotFeedCreatedSuccessfullyMessageError, - ResourceNotFoundError, ) from gcp_core.feed_event import get_project_name_from_ancestors, parse_asset_data from gcp_core.overrides import GCPCloudResourceSelector from gcp_core.search.iterators import iterate_per_available_project from gcp_core.search.resource_searches import ( - feed_event_to_resource, + get_single_project, list_all_topics_per_project, search_all_folders, search_all_organizations, @@ -26,6 +24,7 @@ search_all_resources, ) from gcp_core.utils import ( + EXTRA_PROJECT_FIELD, AssetTypesWithSpecialHandling, get_current_resource_config, get_credentials_json, @@ -164,32 +163,28 @@ async def feed_events_callback(request: Request) -> Response: asset_type=asset_type, asset_name=asset_name, asset_project=asset_project ): logger.info("Got Real-Time event") - resource = await feed_event_to_resource( - asset_type=asset_type, project_id=asset_project, asset_name=asset_name - ) if asset_data.get("deleted") is True: - logger.info("Registering a deleted resource") - await ocean.unregister_raw(asset_type, [resource]) + logger.info( + f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port" + ) + asset_resource_data = asset_data["priorAsset"]["resource"]["data"] + asset_resource_data[EXTRA_PROJECT_FIELD] = await get_single_project( + asset_project + ) + await ocean.unregister_raw(asset_type, [asset_resource_data]) else: - logger.info("Registering a change in the data") - await ocean.register_raw(asset_type, [resource]) + asset_resource_data = asset_data["asset"]["resource"]["data"] + asset_resource_data[EXTRA_PROJECT_FIELD] = await get_single_project( + asset_project + ) + logger.info( + f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port" + ) + await ocean.register_raw(asset_type, [asset_resource_data]) except AssetHasNoProjectAncestorError: logger.exception( f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors and not supported yet." ) - except ResourceNotFoundError: - logger.warning( - f"Didn't find any {asset_type} resource named: {asset_name}. Deleting ocean entity." - ) - await ocean.unregister( - [ - Entity( - blueprint=asset_type, - identifier=asset_name, - ) - ] - ) - return Response(status_code=http.HTTPStatus.NOT_FOUND) except GotFeedCreatedSuccessfullyMessageError: logger.info("Assets Feed created successfully") except Exception: From 9cb5f397041d75e933a78ee3a9080fba0c6e4f27 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Tue, 10 Sep 2024 18:45:43 +0300 Subject: [PATCH 2/9] Added subscription specfic handling + when real time is delete will now use prior asset data --- .../gcp/gcp_core/search/resource_searches.py | 100 ++++++++++++++---- integrations/gcp/gcp_core/utils.py | 56 ++++++---- integrations/gcp/main.py | 56 ++++++---- 3 files changed, 148 insertions(+), 64 deletions(-) diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 7302fb4c0f..00877e4202 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -11,6 +11,7 @@ ProjectsAsyncClient, ) from google.pubsub_v1.services.publisher import PublisherAsyncClient +from google.pubsub_v1.services.subscriber import SubscriberAsyncClient from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM from port_ocean.utils.cache import cache_iterator_result @@ -137,6 +138,44 @@ async def list_all_topics_per_project( logger.info(f"Successfully listed all topics within project {project_name}") +async def list_all_subscriptions_per_project( + project: dict[str, Any], **kwargs: Any +) -> ASYNC_GENERATOR_RESYNC_TYPE: + """ + This lists all Topics under a certain project. + The Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory. + The listing is being done via the PublisherAsyncClient, ignoring state in assets inventory + """ + async with SubscriberAsyncClient() as async_subscriber_client: + project_name = project["name"] + logger.info( + f"Searching all {AssetTypesWithSpecialHandling.SUBSCRIPTION}'s in project {project_name}" + ) + try: + async for subscriptions in paginated_query( + async_subscriber_client, + "list_subscriptions", + {"project": project_name}, + lambda response: parse_protobuf_messages(response.subscriptions), + kwargs.get("rate_limiter"), + ): + for subscription in subscriptions: + subscription[EXTRA_PROJECT_FIELD] = project + yield subscriptions + except PermissionDenied: + logger.error( + f"Service account doesn't have permissions to list subscriptions from project {project_name}" + ) + except NotFound: + logger.info( + f"Couldn't perform list_subscriptions on project {project_name} since it's deleted." + ) + else: + logger.info( + f"Successfully listed all subscriptions within project {project_name}" + ) + + @cache_iterator_result() async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE: logger.info("Searching projects") @@ -214,6 +253,19 @@ async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM: ) +async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ITEM: + """ + Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing. + Here the SubscriberAsyncClient is used, ignoring state in assets inventory + """ + async with SubscriberAsyncClient() as async_subscriber_client: + return parse_protobuf_message( + await async_subscriber_client.get_subscription( + subscription=subscription_id, timeout=DEFAULT_REQUEST_TIMEOUT + ) + ) + + async def search_single_resource( project: dict[str, Any], asset_kind: str, asset_name: str ) -> RAW_ITEM: @@ -235,25 +287,35 @@ async def search_single_resource( async def feed_event_to_resource( - asset_type: str, asset_name: str, project_id: str + asset_type: str, asset_name: str, project_id: str, asset_data: dict[str, Any] ) -> RAW_ITEM: resource = None - match asset_type: - case AssetTypesWithSpecialHandling.TOPIC: - topic_name = asset_name.replace("//pubsub.googleapis.com/", "") - resource = await get_single_topic(project_id, topic_name) - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) - case AssetTypesWithSpecialHandling.FOLDER: - folder_id = asset_name.replace("//cloudresourcemanager.googleapis.com/", "") - resource = await get_single_folder(folder_id) - case AssetTypesWithSpecialHandling.ORGANIZATION: - organization_id = asset_name.replace( - "//cloudresourcemanager.googleapis.com/", "" - ) - resource = await get_single_organization(organization_id) - case AssetTypesWithSpecialHandling.PROJECT: - resource = await get_single_project(project_id) - case _: - project = await get_single_project(project_id) - resource = await search_single_resource(project, asset_type, asset_name) + if asset_data.get("deleted") is True: + resource = asset_data["priorAsset"]["resource"]["data"] + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + else: + match asset_type: + case AssetTypesWithSpecialHandling.TOPIC: + topic_name = asset_name.replace("//pubsub.googleapis.com/", "") + resource = await get_single_topic(project_id, topic_name) + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + case AssetTypesWithSpecialHandling.SUBSCRIPTION: + topic_name = asset_name.replace("//pubsub.googleapis.com/", "") + resource = await get_single_subscription(project_id, topic_name) + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + case AssetTypesWithSpecialHandling.FOLDER: + folder_id = asset_name.replace( + "//cloudresourcemanager.googleapis.com/", "" + ) + resource = await get_single_folder(folder_id) + case AssetTypesWithSpecialHandling.ORGANIZATION: + organization_id = asset_name.replace( + "//cloudresourcemanager.googleapis.com/", "" + ) + resource = await get_single_organization(organization_id) + case AssetTypesWithSpecialHandling.PROJECT: + resource = await get_single_project(project_id) + case _: + resource = asset_data["asset"]["resource"]["data"] + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) return resource diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index a6537b7208..b91ff75890 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -60,6 +60,7 @@ def parse_protobuf_messages( class AssetTypesWithSpecialHandling(enum.StrEnum): TOPIC = "pubsub.googleapis.com/Topic" + SUBSCRIPTION = "pubsub.googleapis.com/Subscription" PROJECT = "cloudresourcemanager.googleapis.com/Project" ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization" FOLDER = "cloudresourcemanager.googleapis.com/Folder" @@ -115,7 +116,7 @@ def get_service_account_project_id() -> str: return gcp_project_env else: raise ValueError( - f"Couldn't figure out the service account's project id. You can specify it usign the GCP_PROJECT environment variable. Error: {str(e)}" + f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}" ) except KeyError as e: raise ValueError( @@ -128,32 +129,34 @@ def get_service_account_project_id() -> str: raise ValueError("Couldn't figure out the service account's project id.") -async def resolve_request_controllers( - kind: str, +async def get_quotas_for_project( + project_id: str, kind: str ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: try: - service_account_project_id = get_service_account_project_id() - - if kind == AssetTypesWithSpecialHandling.TOPIC: - topic_rate_limiter = ( - await pubsub_administrator_per_minute_per_project.limiter( - service_account_project_id + match kind: + case ( + AssetTypesWithSpecialHandling.TOPIC + | AssetTypesWithSpecialHandling.SUBSCRIPTION + ): + topic_rate_limiter = ( + await pubsub_administrator_per_minute_per_project.limiter( + project_id + ) ) - ) - topic_semaphore = ( - await pubsub_administrator_per_minute_per_project.semaphore( - service_account_project_id + topic_semaphore = ( + await pubsub_administrator_per_minute_per_project.semaphore( + project_id + ) ) - ) - return (topic_rate_limiter, topic_semaphore) - - asset_rate_limiter = await search_all_resources_qpm_per_project.limiter( - service_account_project_id - ) - asset_semaphore = await search_all_resources_qpm_per_project.semaphore( - service_account_project_id - ) - return (asset_rate_limiter, asset_semaphore) + return (topic_rate_limiter, topic_semaphore) + case _: + asset_rate_limiter = await search_all_resources_qpm_per_project.limiter( + project_id + ) + asset_semaphore = await search_all_resources_qpm_per_project.semaphore( + project_id + ) + return (asset_rate_limiter, asset_semaphore) except Exception as e: logger.warning( f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}" @@ -165,3 +168,10 @@ async def resolve_request_controllers( await search_all_resources_qpm_per_project.default_semaphore() ) return (default_rate_limiter, default_semaphore) + + +async def resolve_request_controllers( + kind: str, +) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: + service_account_project_id = get_service_account_project_id() + return await get_quotas_for_project(service_account_project_id, kind) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 98c8dfe0dc..f7c14e99b8 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -16,7 +16,9 @@ from gcp_core.overrides import GCPCloudResourceSelector from gcp_core.search.iterators import iterate_per_available_project from gcp_core.search.resource_searches import ( + feed_event_to_resource, get_single_project, + list_all_subscriptions_per_project, list_all_topics_per_project, search_all_folders, search_all_organizations, @@ -43,6 +45,13 @@ async def _resolve_resync_method_for_resource( asset_type=kind, rate_limiter=topic_rate_limiter, ) + case AssetTypesWithSpecialHandling.SUBSCRIPTION: + subscription_rate_limiter, _ = await resolve_request_controllers(kind) + return iterate_per_available_project( + list_all_subscriptions_per_project, + asset_type=kind, + rate_limiter=subscription_rate_limiter, + ) case AssetTypesWithSpecialHandling.FOLDER: return search_all_folders() case AssetTypesWithSpecialHandling.ORGANIZATION: @@ -107,6 +116,17 @@ async def resync_topics(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: yield batch +@ocean.on_resync(kind=AssetTypesWithSpecialHandling.SUBSCRIPTION) +async def resync_subscriptions(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: + topic_rate_limiter, _ = await resolve_request_controllers(kind) + async for batch in iterate_per_available_project( + list_all_subscriptions_per_project, + asset_type=kind, + topic_rate_limiter=topic_rate_limiter, + ): + yield batch + + @ocean.on_resync() async def resync_resources(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: if kind in iter(AssetTypesWithSpecialHandling): @@ -153,34 +173,26 @@ async def feed_events_callback(request: Request) -> Response: """ request_json = await request.json() try: + logger.info("Got Real-Time event") asset_data = await parse_asset_data(request_json["message"]["data"]) asset_type = asset_data["asset"]["assetType"] asset_name = asset_data["asset"]["name"] asset_project = get_project_name_from_ancestors( asset_data["asset"]["ancestors"] ) - with logger.contextualize( - asset_type=asset_type, asset_name=asset_name, asset_project=asset_project - ): - logger.info("Got Real-Time event") - if asset_data.get("deleted") is True: - logger.info( - f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port" - ) - asset_resource_data = asset_data["priorAsset"]["resource"]["data"] - asset_resource_data[EXTRA_PROJECT_FIELD] = await get_single_project( - asset_project - ) - await ocean.unregister_raw(asset_type, [asset_resource_data]) - else: - asset_resource_data = asset_data["asset"]["resource"]["data"] - asset_resource_data[EXTRA_PROJECT_FIELD] = await get_single_project( - asset_project - ) - logger.info( - f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port" - ) - await ocean.register_raw(asset_type, [asset_resource_data]) + asset_resource_data = await feed_event_to_resource( + asset_type, asset_name, asset_project, asset_data + ) + if asset_data.get("deleted") is True: + logger.info( + f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port" + ) + await ocean.unregister_raw(asset_type, [asset_resource_data]) + else: + logger.info( + f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port" + ) + await ocean.register_raw(asset_type, [asset_resource_data]) except AssetHasNoProjectAncestorError: logger.exception( f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors and not supported yet." From 9c43af2e0958709df8b8c743a706a961951701d7 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Wed, 11 Sep 2024 09:47:34 +0300 Subject: [PATCH 3/9] bumped version --- integrations/gcp/CHANGELOG.md | 9 +++++++++ integrations/gcp/pyproject.toml | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index 26bbdb7ce4..5f5b25d3d4 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.55 (2024-09-11) + + +### Improvements + +- Extracted the subscription from the asset inventory and added specific fetching via the GCP's SubscriberAPI. +- Changed realtime's default non-specific behavior to rely on the asset's data in the feed. + + ## 0.1.54 (2024-09-06) diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index faea8512bd..882b1ef4bb 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.54" +version = "0.1.55" description = "A GCP ocean integration" authors = ["Matan Geva "] From 714e9571cf840f77b8347192db4efc6a337875c0 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Wed, 11 Sep 2024 12:48:55 +0300 Subject: [PATCH 4/9] Added tests for list subscriptions and get subscription --- integrations/gcp/tests/gcp_core/__init__.py | 0 .../gcp/tests/gcp_core/search/__init__.py | 0 .../gcp_core/search/test_resource_searches.py | 76 +++++++++++++++++++ integrations/gcp/tests/test_sample.py | 2 - 4 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 integrations/gcp/tests/gcp_core/__init__.py create mode 100644 integrations/gcp/tests/gcp_core/search/__init__.py create mode 100644 integrations/gcp/tests/gcp_core/search/test_resource_searches.py delete mode 100644 integrations/gcp/tests/test_sample.py diff --git a/integrations/gcp/tests/gcp_core/__init__.py b/integrations/gcp/tests/gcp_core/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/gcp/tests/gcp_core/search/__init__.py b/integrations/gcp/tests/gcp_core/search/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py new file mode 100644 index 0000000000..4a5385aca3 --- /dev/null +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -0,0 +1,76 @@ +from typing import Any +from unittest.mock import patch + +from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE +from google.pubsub_v1.types import pubsub + + +async def mock_subscription_pages( + *args: Any, **kwargs: Any +) -> ASYNC_GENERATOR_RESYNC_TYPE: + yield [{"name": "subscription_1"}, {"name": "subscription_2"}] # First page + yield [{"name": "subscription_3"}, {"name": "subscription_4"}] # Second page + + +@patch( + "port_ocean.context.ocean.PortOceanContext.integration_config", + return_value={"search_all_resources_per_minute_quota": 100}, +) +@patch("gcp_core.search.paginated_query.paginated_query", new=mock_subscription_pages) +async def test_list_all_subscriptions_per_project(integration_config: Any) -> None: + # Arrange + from gcp_core.search.resource_searches import list_all_subscriptions_per_project + + expected_subscriptions = [ + {"__project": {"name": "project_name"}, "name": "subscription_1"}, + {"__project": {"name": "project_name"}, "name": "subscription_2"}, + {"__project": {"name": "project_name"}, "name": "subscription_3"}, + {"__project": {"name": "project_name"}, "name": "subscription_4"}, + ] + mock_project = {"name": "project_name"} + + # Act + actual_subscriptions = [] + async for file in list_all_subscriptions_per_project(mock_project): + actual_subscriptions.extend(file) + + # Assert + assert len(actual_subscriptions) == 4 + assert actual_subscriptions == expected_subscriptions + + +@patch( + "port_ocean.context.ocean.PortOceanContext.integration_config", + return_value={"search_all_resources_per_minute_quota": 100}, +) +@patch( + "google.pubsub_v1.services.subscriber.SubscriberAsyncClient.get_subscription", + return_value=pubsub.Subscription({"name": "subscription_name"}), +) +async def test_get_single_subscription( + integration_config: Any, subscription_mock: Any +) -> None: + # Arrange + from gcp_core.search.resource_searches import get_single_subscription + + expected_subscription = { + "ack_deadline_seconds": 0, + "detached": False, + "enable_exactly_once_delivery": False, + "enable_message_ordering": False, + "filter": "", + "labels": {}, + "name": "subscription_name", + "retain_acked_messages": False, + "state": 0, + "topic": "", + } + mock_project = "project_name" + + # Act + actual_subscription = await get_single_subscription( + mock_project, "subscription_name" + ) + + # Assert + assert actual_subscription == expected_subscription diff --git a/integrations/gcp/tests/test_sample.py b/integrations/gcp/tests/test_sample.py deleted file mode 100644 index dc80e299c8..0000000000 --- a/integrations/gcp/tests/test_sample.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_example() -> None: - assert 1 == 1 From 5d043dacba2994f90fad97ce927eb09123b5a615 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Wed, 11 Sep 2024 13:26:40 +0300 Subject: [PATCH 5/9] Added test for event to resource --- .../gcp_core/search/test_resource_searches.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py index 4a5385aca3..c07cbcf68b 100644 --- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -3,6 +3,7 @@ from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from google.pubsub_v1.types import pubsub +from google.cloud.resourcemanager_v3.types import Project async def mock_subscription_pages( @@ -74,3 +75,62 @@ async def test_get_single_subscription( # Assert assert actual_subscription == expected_subscription + + +@patch( + "port_ocean.context.ocean.PortOceanContext.integration_config", + return_value={"search_all_resources_per_minute_quota": 100}, +) +@patch( + "google.pubsub_v1.services.publisher.PublisherAsyncClient.get_topic", + return_value=pubsub.Topic({"name": "topic_name"}), +) +@patch( + "google.cloud.resourcemanager_v3.ProjectsAsyncClient.get_project", + return_value=Project({"name": "project_name"}), +) +async def test_feed_to_resource( + integration_config: Any, get_topic_mock: Any, get_project_mock: Any +) -> None: + # Arrange + from gcp_core.search.resource_searches import feed_event_to_resource + + mock_asset_name = "projects/project_name/topics/topic_name" + mock_asset_type = "pubsub.googleapis.com/Topic" + mock_asset_project_name = "project_name" + mock_asset_data = { + "asset": { + "name": mock_asset_name, + "asset_type": mock_asset_type, + }, + "event": "google.cloud.audit.log.v1.written", + "project": "project_name", + } + + expected_resource = { + "__project": { + "display_name": "", + "etag": "", + "labels": {}, + "name": "project_name", + "parent": "", + "project_id": "", + "state": 0, + }, + "kms_key_name": "", + "labels": {}, + "name": "topic_name", + "satisfies_pzs": False, + "state": 0, + } + + # Act + actual_resource = await feed_event_to_resource( + asset_type=mock_asset_type, + asset_name=mock_asset_name, + project_id=mock_asset_project_name, + asset_data=mock_asset_data, + ) + + # Assert + assert actual_resource == expected_resource From 30416e0f7eb192961f06f21690ab704836466e59 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Wed, 11 Sep 2024 13:55:56 +0300 Subject: [PATCH 6/9] Added example mappings --- integrations/gcp/examples/blueprints.json | 51 +++++++++++++++++++++++ integrations/gcp/examples/mappings.yaml | 21 ++++++++++ 2 files changed, 72 insertions(+) create mode 100644 integrations/gcp/examples/blueprints.json create mode 100644 integrations/gcp/examples/mappings.yaml diff --git a/integrations/gcp/examples/blueprints.json b/integrations/gcp/examples/blueprints.json new file mode 100644 index 0000000000..8461b4c349 --- /dev/null +++ b/integrations/gcp/examples/blueprints.json @@ -0,0 +1,51 @@ +{ + "identifier": "googleCloudSubscription", + "description": "This blueprint represents a Google Cloud Subscription", + "title": "Google Cloud Subscription", + "icon": "GoogleCloud", + "schema": { + "properties": { + "pushConfig": { + "type": "object", + "title": "Push Configuration", + "description": "The ingestion configuration for this subscription" + }, + "ackDeadlineSeconds": { + "type": "number", + "title": "Ack Deadline Seconds", + "description": "The maximum time after receiving a message that the subscriber should acknowledge the message" + }, + "labels": { + "type": "object", + "title": "Labels", + "description": "A set of key/value label pairs to assign to this subscription" + }, + "retainAckedMessages": { + "type": "boolean", + "title": "Retain Acked Messages", + "description": "Indicates whether to retain acknowledged messages" + }, + "messageRetentionDuration": { + "type": "boolean", + "title": "Message Retention Duration", + "description": "How long to retain unacknowledged messages in the subscription's backlog" + }, + "filter": { + "type": "string", + "title": "Filter", + "description": "A filter expression that determines which messages should be delivered to the subscriber" + } + }, + "required": [] + }, + "mirrorProperties": {}, + "calculationProperties": {}, + "relations": { + "project": { + "target": "gcpProject", + "title": "Project", + "required": true, + "many": false + } + } +} diff --git a/integrations/gcp/examples/mappings.yaml b/integrations/gcp/examples/mappings.yaml new file mode 100644 index 0000000000..c1d4ea396f --- /dev/null +++ b/integrations/gcp/examples/mappings.yaml @@ -0,0 +1,21 @@ +createMissingRelatedEntities: true +deleteDependentEntities: true +resources: + - kind: pubsub.googleapis.com/Subscription + selector: + query: "true" + port: + entity: + mappings: + identifier: .name + title: '.name | split("/") | last' + blueprint: '"googleCloudSubscription"' + properties: + pushConfig: .pushConfig + ackDeadlineSeconds: .ackDeadlineSeconds + labels: .labels + retainAckedMessages: .retainAckedMessages + messageRetentionDuration: .messageRetentionDuration + filter: .filter + relations: + project: .__project.name From 224ae81b8a24978670724d9e81de2f7a5f069404 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Thu, 12 Sep 2024 16:15:39 +0300 Subject: [PATCH 7/9] Fixed log --- integrations/gcp/main.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index f7c14e99b8..f427f4952a 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -173,13 +173,15 @@ async def feed_events_callback(request: Request) -> Response: """ request_json = await request.json() try: - logger.info("Got Real-Time event") asset_data = await parse_asset_data(request_json["message"]["data"]) asset_type = asset_data["asset"]["assetType"] asset_name = asset_data["asset"]["name"] asset_project = get_project_name_from_ancestors( asset_data["asset"]["ancestors"] ) + logger.info( + f"Got Real-Time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}" + ) asset_resource_data = await feed_event_to_resource( asset_type, asset_name, asset_project, asset_data ) From 64455cca6ae2d6dc34353c6df1ed1f3840b65c2a Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Thu, 12 Sep 2024 16:20:32 +0300 Subject: [PATCH 8/9] removed unused imports --- integrations/gcp/main.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index f427f4952a..93da63f57e 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -17,7 +17,6 @@ from gcp_core.search.iterators import iterate_per_available_project from gcp_core.search.resource_searches import ( feed_event_to_resource, - get_single_project, list_all_subscriptions_per_project, list_all_topics_per_project, search_all_folders, @@ -26,7 +25,6 @@ search_all_resources, ) from gcp_core.utils import ( - EXTRA_PROJECT_FIELD, AssetTypesWithSpecialHandling, get_current_resource_config, get_credentials_json, From fb318ae18a7081115c16f90b267cf589501381d7 Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Sun, 15 Sep 2024 15:02:39 +0300 Subject: [PATCH 9/9] Patched google clients --- .../gcp_core/search/test_resource_searches.py | 56 +++++++++++++------ 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py index c07cbcf68b..04a90bb29e 100644 --- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -1,5 +1,5 @@ from typing import Any -from unittest.mock import patch +from unittest.mock import AsyncMock, patch from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from google.pubsub_v1.types import pubsub @@ -18,7 +18,8 @@ async def mock_subscription_pages( return_value={"search_all_resources_per_minute_quota": 100}, ) @patch("gcp_core.search.paginated_query.paginated_query", new=mock_subscription_pages) -async def test_list_all_subscriptions_per_project(integration_config: Any) -> None: +@patch("google.pubsub_v1.services.subscriber.SubscriberAsyncClient", new=AsyncMock) +async def test_list_all_subscriptions_per_project(integration_config_mock: Any) -> None: # Arrange from gcp_core.search.resource_searches import list_all_subscriptions_per_project @@ -44,14 +45,20 @@ async def test_list_all_subscriptions_per_project(integration_config: Any) -> No "port_ocean.context.ocean.PortOceanContext.integration_config", return_value={"search_all_resources_per_minute_quota": 100}, ) -@patch( - "google.pubsub_v1.services.subscriber.SubscriberAsyncClient.get_subscription", - return_value=pubsub.Subscription({"name": "subscription_name"}), -) async def test_get_single_subscription( - integration_config: Any, subscription_mock: Any + integration_config: Any, monkeypatch: Any ) -> None: # Arrange + subscriber_async_client_mock = AsyncMock + monkeypatch.setattr( + "google.pubsub_v1.services.subscriber.SubscriberAsyncClient", + subscriber_async_client_mock, + ) + subscriber_async_client_mock.get_subscription = AsyncMock() + subscriber_async_client_mock.get_subscription.return_value = pubsub.Subscription( + {"name": "subscription_name"} + ) + from gcp_core.search.resource_searches import get_single_subscription expected_subscription = { @@ -81,18 +88,31 @@ async def test_get_single_subscription( "port_ocean.context.ocean.PortOceanContext.integration_config", return_value={"search_all_resources_per_minute_quota": 100}, ) -@patch( - "google.pubsub_v1.services.publisher.PublisherAsyncClient.get_topic", - return_value=pubsub.Topic({"name": "topic_name"}), -) -@patch( - "google.cloud.resourcemanager_v3.ProjectsAsyncClient.get_project", - return_value=Project({"name": "project_name"}), -) -async def test_feed_to_resource( - integration_config: Any, get_topic_mock: Any, get_project_mock: Any -) -> None: +async def test_feed_to_resource(integration_config: Any, monkeypatch: Any) -> None: # Arrange + + ## Mock project client + projects_async_client_mock = AsyncMock + monkeypatch.setattr( + "google.cloud.resourcemanager_v3.ProjectsAsyncClient", + projects_async_client_mock, + ) + projects_async_client_mock.get_project = AsyncMock() + projects_async_client_mock.get_project.return_value = Project( + {"name": "project_name"} + ) + + ## Mock publisher client + publisher_async_client_mock = AsyncMock + monkeypatch.setattr( + "google.pubsub_v1.services.publisher.PublisherAsyncClient", + publisher_async_client_mock, + ) + publisher_async_client_mock.get_topic = AsyncMock() + publisher_async_client_mock.get_topic.return_value = pubsub.Topic( + {"name": "topic_name"} + ) + from gcp_core.search.resource_searches import feed_event_to_resource mock_asset_name = "projects/project_name/topics/topic_name"