Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GoogleCloud] Real time improvements + Subscription as specific resource #1010

Merged
merged 11 commits into from
Sep 15, 2024
9 changes: 9 additions & 0 deletions integrations/gcp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.56 (2024-09-15)


### Improvements

- Extracted the subscription from the asset inventory and added specific fetching via GCP's Subscriber API.
- Changed realtime's default non-specific behavior to rely on the asset's data in the feed.


## 0.1.55 (2024-09-12)


Expand Down
51 changes: 51 additions & 0 deletions integrations/gcp/examples/blueprints.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"identifier": "googleCloudSubscription",
"description": "This blueprint represents a Google Cloud Subscription",
"title": "Google Cloud Subscription",
"icon": "GoogleCloud",
"schema": {
"properties": {
"pushConfig": {
"type": "object",
"title": "Push Configuration",
"description": "The push delivery configuration for this subscription"
},
"ackDeadlineSeconds": {
"type": "number",
"title": "Ack Deadline Seconds",
"description": "The maximum time after receiving a message that the subscriber should acknowledge the message"
},
"labels": {
"type": "object",
"title": "Labels",
"description": "A set of key/value label pairs to assign to this subscription"
},
"retainAckedMessages": {
"type": "boolean",
"title": "Retain Acked Messages",
"description": "Indicates whether to retain acknowledged messages"
},
"messageRetentionDuration": {
"type": "string",
"title": "Message Retention Duration",
"description": "How long to retain unacknowledged messages in the subscription's backlog"
},
"filter": {
"type": "string",
"title": "Filter",
"description": "A filter expression that determines which messages should be delivered to the subscriber"
}
},
"required": []
},
"mirrorProperties": {},
"calculationProperties": {},
"relations": {
"project": {
"target": "gcpProject",
"title": "Project",
"required": true,
"many": false
}
}
}
21 changes: 21 additions & 0 deletions integrations/gcp/examples/mappings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Example mapping: exports Pub/Sub subscriptions as `googleCloudSubscription` entities.
createMissingRelatedEntities: true
deleteDependentEntities: true
resources:
  - kind: pubsub.googleapis.com/Subscription
    selector:
      # Literal "true": no subscription is filtered out by the selector.
      query: "true"
    port:
      entity:
        mappings:
          # The full `.name` value is used as the identifier.
          identifier: .name
          # The title is the last "/"-separated segment of `.name`.
          title: '.name | split("/") | last'
          blueprint: '"googleCloudSubscription"'
          properties:
            pushConfig: .pushConfig
            ackDeadlineSeconds: .ackDeadlineSeconds
            labels: .labels
            retainAckedMessages: .retainAckedMessages
            messageRetentionDuration: .messageRetentionDuration
            filter: .filter
          relations:
            # NOTE(review): presumably `__project` is the project object the integration
            # attaches to each subscription - confirm against the integration code.
            project: .__project.name
100 changes: 81 additions & 19 deletions integrations/gcp/gcp_core/search/resource_searches.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ProjectsAsyncClient,
)
from google.pubsub_v1.services.publisher import PublisherAsyncClient
from google.pubsub_v1.services.subscriber import SubscriberAsyncClient
from loguru import logger
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM
from port_ocean.utils.cache import cache_iterator_result
Expand Down Expand Up @@ -137,6 +138,44 @@ async def list_all_topics_per_project(
logger.info(f"Successfully listed all topics within project {project_name}")


async def list_all_subscriptions_per_project(
    project: dict[str, Any], **kwargs: Any
) -> ASYNC_GENERATOR_RESYNC_TYPE:
    """
    List all Subscriptions under a certain project.

    Subscriptions are handled specifically because the asset stored in the asset
    inventory lacks some of the subscription's data. The listing is done via the
    SubscriberAsyncClient, ignoring the state in the asset inventory.

    :param project: The project to list subscriptions for. Its "name" value is sent
        as the `project` field of the `list_subscriptions` request, and the whole
        dict is attached to every subscription under EXTRA_PROJECT_FIELD.
    :param kwargs: May carry a "rate_limiter" entry, which is forwarded to
        paginated_query (None is forwarded when it is absent).
    :return: An async generator yielding each batch of parsed subscriptions
        produced by paginated_query.

    PermissionDenied and NotFound are logged and swallowed, so the generator simply
    ends for that project; any other exception propagates to the caller.
    """
    async with SubscriberAsyncClient() as async_subscriber_client:
        project_name = project["name"]
        logger.info(
            f"Searching all {AssetTypesWithSpecialHandling.SUBSCRIPTION}'s in project {project_name}"
        )
        try:
            async for subscriptions in paginated_query(
                async_subscriber_client,
                "list_subscriptions",
                {"project": project_name},
                lambda response: parse_protobuf_messages(response.subscriptions),
                kwargs.get("rate_limiter"),
            ):
                # Attach the owning project so each subscription carries it downstream.
                for subscription in subscriptions:
                    subscription[EXTRA_PROJECT_FIELD] = project
                yield subscriptions
        except PermissionDenied:
            logger.error(
                f"Service account doesn't have permissions to list subscriptions from project {project_name}"
            )
        except NotFound:
            logger.info(
                f"Couldn't perform list_subscriptions on project {project_name} since it's deleted."
            )
        else:
            # Only reached when the listing finished without raising.
            logger.info(
                f"Successfully listed all subscriptions within project {project_name}"
            )


@cache_iterator_result()
async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE:
logger.info("Searching projects")
Expand Down Expand Up @@ -214,6 +253,19 @@ async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM:
)


async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ITEM:
    """
    Fetch a single subscription with the SubscriberAsyncClient and return it parsed.

    The asset inventory state is ignored here because the subscription asset stored
    there is missing some properties.

    :param project_id: Not used by this function's body.
    :param subscription_id: Passed as the `subscription` argument of `get_subscription`.
    :return: The result of `parse_protobuf_message` applied to the fetched subscription.
    """
    async with SubscriberAsyncClient() as subscriber_client:
        fetched_subscription = await subscriber_client.get_subscription(
            subscription=subscription_id, timeout=DEFAULT_REQUEST_TIMEOUT
        )
        return parse_protobuf_message(fetched_subscription)


async def search_single_resource(
project: dict[str, Any], asset_kind: str, asset_name: str
) -> RAW_ITEM:
Expand All @@ -235,25 +287,35 @@ async def search_single_resource(


async def feed_event_to_resource(
asset_type: str, asset_name: str, project_id: str
asset_type: str, asset_name: str, project_id: str, asset_data: dict[str, Any]
) -> RAW_ITEM:
resource = None
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace("//cloudresourcemanager.googleapis.com/", "")
resource = await get_single_folder(folder_id)
case AssetTypesWithSpecialHandling.ORGANIZATION:
organization_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_organization(organization_id)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id)
case _:
project = await get_single_project(project_id)
resource = await search_single_resource(project, asset_type, asset_name)
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_subscription(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_folder(folder_id)
case AssetTypesWithSpecialHandling.ORGANIZATION:
organization_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_organization(organization_id)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id)
case _:
resource = asset_data["asset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
return resource
56 changes: 33 additions & 23 deletions integrations/gcp/gcp_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def parse_protobuf_messages(

class AssetTypesWithSpecialHandling(enum.StrEnum):
TOPIC = "pubsub.googleapis.com/Topic"
SUBSCRIPTION = "pubsub.googleapis.com/Subscription"
PROJECT = "cloudresourcemanager.googleapis.com/Project"
ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization"
FOLDER = "cloudresourcemanager.googleapis.com/Folder"
Expand Down Expand Up @@ -115,7 +116,7 @@ def get_service_account_project_id() -> str:
return gcp_project_env
else:
raise ValueError(
f"Couldn't figure out the service account's project id. You can specify it usign the GCP_PROJECT environment variable. Error: {str(e)}"
f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}"
)
except KeyError as e:
raise ValueError(
Expand All @@ -128,32 +129,34 @@ def get_service_account_project_id() -> str:
raise ValueError("Couldn't figure out the service account's project id.")


async def resolve_request_controllers(
kind: str,
async def get_quotas_for_project(
project_id: str, kind: str
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
try:
service_account_project_id = get_service_account_project_id()

if kind == AssetTypesWithSpecialHandling.TOPIC:
topic_rate_limiter = (
await pubsub_administrator_per_minute_per_project.limiter(
service_account_project_id
match kind:
case (
AssetTypesWithSpecialHandling.TOPIC
| AssetTypesWithSpecialHandling.SUBSCRIPTION
):
topic_rate_limiter = (
await pubsub_administrator_per_minute_per_project.limiter(
project_id
)
)
)
topic_semaphore = (
await pubsub_administrator_per_minute_per_project.semaphore(
service_account_project_id
topic_semaphore = (
await pubsub_administrator_per_minute_per_project.semaphore(
project_id
)
)
)
return (topic_rate_limiter, topic_semaphore)

asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
service_account_project_id
)
asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
service_account_project_id
)
return (asset_rate_limiter, asset_semaphore)
return (topic_rate_limiter, topic_semaphore)
case _:
asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
project_id
)
asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
project_id
)
return (asset_rate_limiter, asset_semaphore)
except Exception as e:
logger.warning(
f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}"
Expand All @@ -165,3 +168,10 @@ async def resolve_request_controllers(
await search_all_resources_qpm_per_project.default_semaphore()
)
return (default_rate_limiter, default_semaphore)


async def resolve_request_controllers(
    kind: str,
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
    """
    Resolve the rate limiter and semaphore for `kind` in the service account's project.

    :param kind: The resource kind, forwarded unchanged to `get_quotas_for_project`.
    :return: The (rate limiter, semaphore) pair returned by `get_quotas_for_project`.
    """
    # NOTE(review): the project-id lookup runs outside any try/except in this
    # function, so an error raised by get_service_account_project_id() propagates
    # to the caller - confirm that is intended rather than falling back to defaults.
    return await get_quotas_for_project(get_service_account_project_id(), kind)
Loading
Loading