From 8623240009b35206ee5de52de91f3080b7326ec3 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Wed, 21 Jan 2026 18:04:01 +0530 Subject: [PATCH 1/3] feat: added watchlist managment tools --- server/secops/secops_mcp/tools/__init__.py | 1 + .../secops_mcp/tools/watchlist_management.py | 506 ++++++++++++++++++ .../secops/tests/test_secops_watchlist_mcp.py | 212 ++++++++ 3 files changed, 719 insertions(+) create mode 100644 server/secops/secops_mcp/tools/watchlist_management.py create mode 100644 server/secops/tests/test_secops_watchlist_mcp.py diff --git a/server/secops/secops_mcp/tools/__init__.py b/server/secops/secops_mcp/tools/__init__.py index 0cfc387..c17f415 100644 --- a/server/secops/secops_mcp/tools/__init__.py +++ b/server/secops/secops_mcp/tools/__init__.py @@ -26,3 +26,4 @@ from .udm_search import * from .search import * from .feed_management import * +from .watchlist_management import * diff --git a/server/secops/secops_mcp/tools/watchlist_management.py b/server/secops/secops_mcp/tools/watchlist_management.py new file mode 100644 index 0000000..886e6c4 --- /dev/null +++ b/server/secops/secops_mcp/tools/watchlist_management.py @@ -0,0 +1,506 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Security Operations MCP tools for watchlist management.""" + +import logging +from typing import Any, Dict, Optional + +from secops_mcp.server import get_chronicle_client, server + +# Configure logging +logger = logging.getLogger("secops-mcp") + + +@server.tool() +async def create_watchlist( + name: str, + display_name: str, + multiplying_factor: float, + description: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Create a new watchlist in Chronicle SIEM. + + Creates a watchlist for managing entity risk scores and monitoring + specific entities. Watchlists apply risk multipliers to entities and + can be used to track high-risk actors, privileged accounts, or other + entities requiring special attention. + + **Workflow Integration:** + - Use to create watchlists for tracking high-risk entities or VIPs. + - Essential for entity risk management and monitoring workflows. + - Enables risk score amplification for watchlisted entities. + - Supports both manual and automated entity population mechanisms. + + **Use Cases:** + - Create watchlist for monitoring privileged accounts. + - Track suspicious IP addresses with elevated risk scores. + - Monitor executive accounts for targeted attacks. + - Track threat actor infrastructure with high risk multipliers. + - Create watchlists for regulatory compliance monitoring. + + Args: + name (str): Unique name for the watchlist (used internally). + display_name (str): User-friendly display name for the watchlist. + multiplying_factor (float): Risk score multiplier for entities + on this watchlist (e.g., 1.5 increases risk by 50%). + description (str): Description of the watchlist's purpose. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing the created watchlist details, + or an error dictionary if watchlist creation fails. + + Example Usage: + # Create watchlist for privileged accounts + create_watchlist( + name="privileged_accounts", + display_name="Privileged User Accounts", + multiplying_factor=2.0, + description="High-risk accounts with elevated privileges", + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + # Create watchlist for suspicious IPs + create_watchlist( + name="suspicious_ips", + display_name="Suspicious IP Addresses", + multiplying_factor=1.5, + description="IP addresses with suspicious activity", + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + Next Steps (using MCP-enabled tools): + - Add entities to the watchlist through entity management tools. + - Monitor watchlist impact on entity risk scores. + - Create detection rules leveraging watchlist membership. + - Review entities on the watchlist regularly for accuracy. + - Adjust risk multipliers based on threat intelligence updates. + """ + try: + logger.info( + f"Creating watchlist: {name} with multiplier " + f"{multiplying_factor}" + ) + + chronicle = get_chronicle_client(project_id, customer_id, region) + + # Create the watchlist and return SDK response directly + return chronicle.create_watchlist( + name=name, + display_name=display_name, + multiplying_factor=multiplying_factor, + description=description, + ) + + except Exception as e: + logger.error( + f"Error creating watchlist {name}: {str(e)}", exc_info=True + ) + return {"error": f"Error creating watchlist {name}: {str(e)}"} + + +@server.tool() +async def update_watchlist( + watchlist_id: str, + display_name: Optional[str] = None, + description: Optional[str] = None, + multiplying_factor: Optional[float] = None, + entity_population_mechanism: Optional[Dict[str, Any]] = None, + watchlist_user_preferences: Optional[Dict[str, Any]] = None, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Update an existing watchlist in Chronicle SIEM. + + Updates watchlist properties including display name, description, + risk multiplier, entity population mechanism, and user preferences. + Only provided parameters will be updated. + + **Workflow Integration:** + - Use to modify watchlist properties as requirements change. + - Essential for adjusting risk multipliers based on threat landscape. + - Enables configuration of automated entity population mechanisms. + - Supports user preference updates like pinning watchlists. + + **Use Cases:** + - Increase risk multiplier for a watchlist during active threats. + - Update description to reflect current monitoring objectives. + - Change entity population mechanism from manual to automated. + - Pin important watchlists for quick access. + - Adjust watchlist configuration based on operational feedback. + + Args: + watchlist_id (str): Unique identifier of the watchlist to update. + display_name (Optional[str]): New display name for the watchlist. + description (Optional[str]): New description for the watchlist. + multiplying_factor (Optional[float]): New risk score multiplier. + entity_population_mechanism (Optional[Dict[str, Any]]): Entity + population configuration (e.g., {"manual": {}}). + watchlist_user_preferences (Optional[Dict[str, Any]]): User + preferences (e.g., {"pinned": True}). + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing the updated watchlist details, + or an error dictionary if update fails. + + Example Usage: + # Update risk multiplier + update_watchlist( + watchlist_id="abc-123-def", + multiplying_factor=3.0, + description="Increased multiplier due to active campaign", + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + # Pin watchlist and update display name + update_watchlist( + watchlist_id="abc-123-def", + display_name="Critical Threat Actors", + watchlist_user_preferences={"pinned": True}, + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + Next Steps (using MCP-enabled tools): + - Verify changes using `get_watchlist` to confirm updates. + - Monitor impact of risk multiplier changes on entity scores. + - Review entities on watchlist after configuration changes. + - Document reason for changes in security operations log. + - Communicate significant changes to security team members. + """ + try: + logger.info(f"Updating watchlist: {watchlist_id}") + + chronicle = get_chronicle_client(project_id, customer_id, region) + + # Update the watchlist and return SDK response directly + return chronicle.update_watchlist( + watchlist_id=watchlist_id, + display_name=display_name, + description=description, + multiplying_factor=multiplying_factor, + entity_population_mechanism=entity_population_mechanism, + watchlist_user_preferences=watchlist_user_preferences, + ) + + except Exception as e: + logger.error( + f"Error updating watchlist {watchlist_id}: {str(e)}", exc_info=True + ) + return {"error": f"Error updating watchlist {watchlist_id}: {str(e)}"} + + +@server.tool() +async def delete_watchlist( + watchlist_id: str, + force: bool = False, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Delete a watchlist from Chronicle SIEM. + + Permanently removes a watchlist from Chronicle. Use the force parameter + to handle dependencies. This operation cannot be undone. + + **Workflow Integration:** + - Use to remove obsolete or unused watchlists. + - Essential for maintaining clean watchlist configurations. + - Supports forced deletion to handle dependencies. + - Part of watchlist lifecycle management workflows. + + **Use Cases:** + - Remove watchlists that are no longer needed. + - Clean up test or temporary watchlists. + - Delete duplicate watchlists created by mistake. + - Remove watchlists after threat campaigns end. + - Consolidate multiple watchlists into single lists. + + **Warning:** + This operation permanently deletes the watchlist. Ensure entities + on the watchlist are migrated to other watchlists if still needed. + + Args: + watchlist_id (str): Unique identifier of the watchlist to delete. + force (bool): Force deletion even if dependencies exist. + Defaults to False. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary confirming watchlist deletion, + or an error dictionary if deletion fails. + + Example Usage: + # Delete watchlist + delete_watchlist( + watchlist_id="abc-123-def", + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + # Force delete watchlist with dependencies + delete_watchlist( + watchlist_id="abc-123-def", + force=True, + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + Next Steps (using MCP-enabled tools): + - Verify deletion using `list_watchlists`. + - Migrate critical entities to other watchlists if needed. + - Update detection rules that referenced the deleted watchlist. + - Document the reason for deletion in operations log. + - Review entity risk scores after watchlist removal. + """ + try: + logger.info(f"Deleting watchlist: {watchlist_id} (force={force})") + + chronicle = get_chronicle_client(project_id, customer_id, region) + + # Delete the watchlist + result = chronicle.delete_watchlist(watchlist_id, force=force) + + # If delete returns None or empty, return success confirmation + if not result: + return { + "success": True, + "watchlist_id": watchlist_id, + "message": "Watchlist deleted successfully", + } + + return result + + except Exception as e: + logger.error( + f"Error deleting watchlist {watchlist_id}: {str(e)}", exc_info=True + ) + return {"error": f"Error deleting watchlist {watchlist_id}: {str(e)}"} + + +@server.tool() +async def get_watchlist( + watchlist_id: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Get detailed information about a specific watchlist. + + Retrieves complete configuration and details for a watchlist by its ID, + including display name, description, risk multiplier, entity population + mechanism, and current entities on the watchlist. + + **Workflow Integration:** + - Use to review watchlist configuration and entity membership. + - Essential for auditing watchlist effectiveness and accuracy. + - Helps understand entity risk score calculations. + - Supports troubleshooting watchlist-related detection rules. + + **Use Cases:** + - Review entities currently on a watchlist. + - Verify watchlist configuration before making changes. + - Audit risk multipliers for compliance reporting. + - Troubleshoot entity risk score calculations. + - Document watchlist contents for security reviews. + + Args: + watchlist_id (str): Unique identifier of the watchlist to retrieve. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Complete watchlist details including configuration + and entity information. Returns error dict if retrieval fails. + + Example Usage: + # Get watchlist details + watchlist = get_watchlist( + watchlist_id="abc-123-def", + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + Next Steps (using MCP-enabled tools): + - Review entity list for accuracy and relevance. + - Update watchlist configuration using `update_watchlist`. + - Remove outdated entities from the watchlist. + - Create detection rules leveraging watchlist membership. + - Export watchlist data for reporting or analysis. + """ + try: + logger.info(f"Getting watchlist details: {watchlist_id}") + + chronicle = get_chronicle_client(project_id, customer_id, region) + + # Get watchlist details + watchlist = chronicle.get_watchlist(watchlist_id) + + if not watchlist: + return {"error": f"Watchlist with ID {watchlist_id} not found"} + + return watchlist + + except Exception as e: + error_msg = f"Error getting watchlist {watchlist_id}: {str(e)}" + logger.error(error_msg, exc_info=True) + return {"error": error_msg} + + +@server.tool() +async def list_watchlists( + page_size: Optional[int] = None, + page_token: Optional[str] = None, + as_list: bool = False, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """List all watchlists in Chronicle SIEM. + + Retrieves a list of all watchlists configured in Chronicle with their + basic details. Supports pagination for large result sets and optional + automatic pagination with the as_list parameter. + + **Workflow Integration:** + - Use to get an overview of all configured watchlists. + - Essential for watchlist inventory and management. + - Helps identify watchlists for review or consolidation. + - Supports auditing of entity risk management configuration. + + **Use Cases:** + - Inventory all watchlists for security posture review. + - Find specific watchlists by name or description. + - Audit risk multipliers across all watchlists. + - Identify unused or obsolete watchlists for cleanup. + - Generate reports on watchlist coverage and usage. + + Args: + page_size (Optional[int]): Maximum number of watchlists to return + per page. If not specified, uses Chronicle's default. + page_token (Optional[str]): Token for retrieving next page of + results. Obtained from previous response. + as_list (bool): If True, automatically fetches all pages and + returns a list of watchlists. If False, returns paginated + response with metadata. Defaults to False. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing watchlist data. If as_list + is False, includes pagination metadata. If as_list is True, + contains a direct list of all watchlists. Returns error dict + if listing fails. + + Example Usage: + # List watchlists with pagination + watchlists = list_watchlists( + page_size=50, + project_id="my-project", + customer_id="my-customer", + region="us" + ) + for watchlist in watchlists.get("watchlists", []): + print(f"Watchlist: {watchlist.get('displayName')}") + + # Get all watchlists as a list (auto-pagination) + all_watchlists = list_watchlists( + as_list=True, + project_id="my-project", + customer_id="my-customer", + region="us" + ) + for watchlist in all_watchlists.get("watchlists", []): + print(f"Watchlist: {watchlist.get('displayName')}") + + Next Steps (using MCP-enabled tools): + - Review watchlist configurations for optimization. + - Use `get_watchlist` for detailed information on specific lists. + - Identify and delete obsolete watchlists. + - Document watchlist purposes and ownership. + - Create new watchlists for emerging threat scenarios. + """ + try: + logger.info("Listing watchlists") + + chronicle = get_chronicle_client(project_id, customer_id, region) + + # Build parameters for list_watchlists call + kwargs = {} + if page_size is not None: + kwargs["page_size"] = page_size + if page_token is not None: + kwargs["page_token"] = page_token + if as_list: + kwargs["as_list"] = as_list + + # List watchlists + watchlists = chronicle.list_watchlists(**kwargs) + + # If as_list=True, watchlists is a direct list + # If as_list=False, it's a dict with pagination metadata + if as_list: + # Convert list to dict format for consistency + return { + "watchlists": watchlists, + "total_watchlists": len(watchlists), + } + else: + # Add summary statistics + watchlist_count = len(watchlists.get("watchlists", [])) + result = {**watchlists, "total_in_page": watchlist_count} + return result + + except Exception as e: + error_msg = f"Error listing watchlists: {str(e)}" + logger.error(error_msg, exc_info=True) + return {"error": error_msg} diff --git a/server/secops/tests/test_secops_watchlist_mcp.py b/server/secops/tests/test_secops_watchlist_mcp.py new file mode 100644 index 0000000..e986151 --- /dev/null +++ b/server/secops/tests/test_secops_watchlist_mcp.py @@ -0,0 +1,212 @@ +"""Integration tests for Chronicle SecOps Watchlist MCP tools. + +These tests exercise the watchlist management functionality by making actual +API calls to the Chronicle service. They require proper authentication and +configuration to run. + +To run these tests: +1. Make sure you have created a config.json file in the tests directory with + your Chronicle credentials (see conftest.py for format) +2. Authenticate with Google Cloud using ADC: + gcloud auth application-default login + OR set SECOPS_SA_PATH environment variable to use service account: + export SECOPS_SA_PATH=/path/to/service-account.json +3. Run: pytest -xvs server/secops/tests/test_secops_watchlist_mcp.py +""" + +import uuid +from typing import Dict + +import pytest + +from secops_mcp.tools.watchlist_management import ( + create_watchlist, + delete_watchlist, + get_watchlist, + list_watchlists, + update_watchlist, +) + + +class TestChronicleWatchlistMCP: + """Test class for Chronicle Watchlist MCP tools.""" + + @pytest.mark.asyncio + async def test_list_watchlists( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing all watchlists. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_watchlists( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + # Verify response structure + assert isinstance(result, dict) + + # Should either have watchlists or an error + if "error" not in result: + assert "watchlists" in result or "total_in_page" in result + + @pytest.mark.asyncio + async def test_list_watchlists_as_list( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing all watchlists with as_list parameter. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_watchlists( + as_list=True, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + # Verify response structure + assert isinstance(result, dict) + + # Should either have watchlists or an error + if "error" not in result: + assert "watchlists" in result + assert "total_watchlists" in result + + @pytest.mark.asyncio + async def test_get_watchlist( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test getting details of a watchlist. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + # First list watchlists to get an ID + watchlists_result = await list_watchlists( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + # Skip test if no watchlists or error + if "error" in watchlists_result or not watchlists_result.get( + "watchlists" + ): + pytest.skip("No watchlists available to test get_watchlist") + + # Get the first watchlist ID + watchlist_id = watchlists_result["watchlists"][0]["name"].split("/")[-1] + + # Get watchlist details + result = await get_watchlist( + watchlist_id=watchlist_id, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + # Verify response structure + assert isinstance(result, dict) + + # Should either have expected fields or an error + if "error" not in result: + assert "name" in result + assert "displayName" in result + + @pytest.mark.asyncio + async def test_create_update_delete_watchlist( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test the full lifecycle of a watchlist. + + Tests watchlist creation, updating, and deletion. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + # Create a unique name with UUID + unique_id = str(uuid.uuid4()) + name = f"test_watchlist_{unique_id[:8]}" + display_name = f"Test Watchlist MCP SDK {unique_id[:8]}" + + watchlist_id = None + + # Step 1: Create watchlist + try: + create_result = await create_watchlist( + name=name, + display_name=display_name, + multiplying_factor=1.5, + description="Test watchlist created by MCP SDK", + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + # Skip remaining tests if creation fails + if "error" in create_result: + pytest.fail( + f"Watchlist creation failed: {create_result['error']}" + ) + + # Verify creation result + assert "name" in create_result + assert create_result["displayName"] == display_name + assert ( + create_result["multiplyingFactor"] == 1.5 + or create_result["multiplyingFactor"] == "1.5" + ) + watchlist_id = create_result["name"].split("/")[-1] + + print(f"Created watchlist with ID: {watchlist_id}") + + # Step 2: Update watchlist + update_result = await update_watchlist( + watchlist_id=watchlist_id, + display_name=f"Updated Test Watchlist {unique_id[:8]}", + description="Updated test watchlist description", + multiplying_factor=2.0, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + # Verify update result + assert "name" in update_result + assert "Updated Test Watchlist" in update_result["displayName"] + assert ( + update_result["multiplyingFactor"] == 2.0 + or update_result["multiplyingFactor"] == "2.0" + ) + + print(f"Updated watchlist: {update_result['displayName']}") + + finally: + # Always attempt to clean up by deleting the watchlist + if watchlist_id: + try: + delete_result = await delete_watchlist( + watchlist_id=watchlist_id, + force=True, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + # Verify delete result + assert isinstance(delete_result, dict) + assert ( + "success" in delete_result + or "message" in delete_result + or "error" not in delete_result + ) + + print(f"Deleted watchlist with ID: {watchlist_id}") + + except Exception as e: + print(f"Warning: Failed to delete test watchlist: {e}") From 9656a95c6434e3e5dc33078fa68487e616478ab9 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:17:28 +0530 Subject: [PATCH 2/3] chore: clarify watchlist_id parameter usage in get_watchlist documentation --- .../secops/secops_mcp/tools/watchlist_management.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/server/secops/secops_mcp/tools/watchlist_management.py b/server/secops/secops_mcp/tools/watchlist_management.py index 886e6c4..5e4ee82 100644 --- a/server/secops/secops_mcp/tools/watchlist_management.py +++ b/server/secops/secops_mcp/tools/watchlist_management.py @@ -345,7 +345,9 @@ async def get_watchlist( - Document watchlist contents for security reviews. Args: - watchlist_id (str): Unique identifier of the watchlist to retrieve. + watchlist_id (str): Watchlist ID only (e.g., "abc-123-def"), not + the full resource name. Extract from the "name" field returned + by list_watchlists or create_watchlist. project_id (Optional[str]): Google Cloud project ID. Defaults to environment configuration. customer_id (Optional[str]): Chronicle customer ID. Defaults to @@ -358,9 +360,14 @@ async def get_watchlist( and entity information. Returns error dict if retrieval fails. Example Usage: - # Get watchlist details + # First, list watchlists to get the ID + watchlists = list_watchlists() + # Extract ID from the full name (last part after '/') + watchlist_id = watchlists["watchlists"][0]["name"].split("/")[-1] + + # Get watchlist details using just the ID watchlist = get_watchlist( - watchlist_id="abc-123-def", + watchlist_id=watchlist_id, # Just the ID, not full name project_id="my-project", customer_id="my-customer", region="us" From ce7d40eca059a200967a901f9ddce6dec101bcec Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Thu, 22 Jan 2026 17:31:30 +0530 Subject: [PATCH 3/3] docs: add watchlist management tools documentation and update README --- docs/servers/secops_mcp.md | 179 +++++++++++++++++++++++++++++++++++++ server/secops/README.md | 21 ++++- 2 files changed, 199 insertions(+), 1 deletion(-) diff --git a/docs/servers/secops_mcp.md b/docs/servers/secops_mcp.md index a2a5cfa..c6ba131 100644 --- a/docs/servers/secops_mcp.md +++ b/docs/servers/secops_mcp.md @@ -344,6 +344,123 @@ The service account or user credentials need the following Chronicle roles: - Zero-day exploitation ``` +### Watchlist Management + +- **`create_watchlist(name, display_name, multiplying_factor, description, project_id=None, customer_id=None, region=None)`** + - **Description:** Creates a new watchlist in Chronicle to track high-risk entities and apply risk score multipliers. + - **Parameters:** + - `name` (required): Unique identifier for the watchlist (e.g., "critical_threat_actors"). + - `display_name` (required): Human-readable name shown in the UI. + - `multiplying_factor` (required): Risk score multiplier (e.g., 2.0 doubles the risk score). + - `description` (required): Purpose and context of the watchlist. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary with created watchlist details including ID and configuration. + - **Return Example:** + ```json + { + "name": "projects/123/locations/us/instances/456/watchlists/abc-123-def", + "displayName": "Critical Threat Actors", + "description": "High-priority threat actors requiring immediate investigation", + "multiplyingFactor": 2.0, + "createTime": "2024-01-15T10:30:00Z", + "updateTime": "2024-01-15T10:30:00Z" + } + ``` + +- **`update_watchlist(watchlist_id, display_name=None, description=None, multiplying_factor=None, entity_population_mechanism=None, watchlist_user_preferences=None, project_id=None, customer_id=None, region=None)`** + - **Description:** Updates an existing watchlist's configuration, risk multiplier, or preferences. + - **Parameters:** + - `watchlist_id` (required): Watchlist ID only (e.g., "abc-123-def"). + - `display_name` (optional): New display name for the watchlist. + - `description` (optional): New description for the watchlist. + - `multiplying_factor` (optional): New risk score multiplier. + - `entity_population_mechanism` (optional): Entity population configuration. + - `watchlist_user_preferences` (optional): User preferences like pinning. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary with updated watchlist details. + - **Return Example:** + ```json + { + "name": "projects/123/locations/us/instances/456/watchlists/abc-123-def", + "displayName": "Updated Critical Threat Actors", + "description": "Increased multiplier due to active campaign", + "multiplyingFactor": 3.0, + "updateTime": "2024-01-15T14:22:00Z" + } + ``` + +- **`delete_watchlist(watchlist_id, force=False, project_id=None, customer_id=None, region=None)`** + - **Description:** Permanently deletes a watchlist from Chronicle. Use caution as this operation cannot be undone. + - **Parameters:** + - `watchlist_id` (required): Watchlist ID only (e.g., "abc-123-def"). + - `force` (optional): Force deletion even if dependencies exist (default: False). + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary confirming deletion or error details. + - **Return Example:** + ```json + { + "success": true, + "watchlist_id": "abc-123-def", + "message": "Watchlist deleted successfully" + } + ``` + +- **`get_watchlist(watchlist_id, project_id=None, customer_id=None, region=None)`** + - **Description:** Retrieves detailed information about a specific watchlist including configuration and entity membership. + - **Parameters:** + - `watchlist_id` (required): Watchlist ID only (e.g., "abc-123-def"). + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary with complete watchlist details. + - **Return Example:** + ```json + { + "name": "projects/123/locations/us/instances/456/watchlists/abc-123-def", + "displayName": "Critical Threat Actors", + "description": "High-priority threat actors", + "multiplyingFactor": 2.0, + "createTime": "2024-01-15T10:30:00Z", + "updateTime": "2024-01-15T14:22:00Z", + "entityCount": 42 + } + ``` + +- **`list_watchlists(page_size=None, page_token=None, as_list=False, project_id=None, customer_id=None, region=None)`** + - **Description:** Lists all watchlists in Chronicle with pagination support. + - **Parameters:** + - `page_size` (optional): Number of results per page for pagination. + - `page_token` (optional): Token for fetching the next page of results. + - `as_list` (optional): Return all results as a flat list (default: False). + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary with watchlists array and pagination metadata. + - **Return Example:** + ```json + { + "watchlists": [ + { + "name": "projects/123/locations/us/instances/456/watchlists/abc-123", + "displayName": "Critical Threat Actors", + "multiplyingFactor": 2.0 + }, + { + "name": "projects/123/locations/us/instances/456/watchlists/def-456", + "displayName": "Suspicious Domains", + "multiplyingFactor": 1.5 + } + ], + "total_in_page": 2 + } + ``` + ## Usage Examples ### Example 1: Natural Language Security Event Search @@ -443,6 +560,68 @@ Their TTPs include: The group continues to evolve tactics and primarily targets financial institutions, cryptocurrency exchanges, and organizations in sectors of strategic interest to North Korea. ``` +### Example 5: Watchlist Management + +``` +User: Create a watchlist for critical APT threat actors with a 2.5x risk multiplier. + +AI uses: create_watchlist( + name="apt_threat_actors_2024", + display_name="APT Threat Actors 2024", + multiplying_factor=2.5, + description="High-priority APT groups actively targeting our industry" +) + +AI: I've created the "APT Threat Actors 2024" watchlist with a 2.5x risk multiplier. +Watchlist ID: abc-123-def + +Any entities added to this watchlist will have their risk scores multiplied by 2.5, +helping prioritize alerts and investigations related to these threat actors. + +User: Show me all my watchlists. + +AI uses: list_watchlists(as_list=True) + +AI: You have 3 watchlists configured: + +1. APT Threat Actors 2024 (Risk: 2.5x) + - Created: 2024-01-15 + - For tracking high-priority APT groups + +2. Suspicious Domains (Risk: 1.5x) + - Created: 2024-01-10 + - Domains flagged by threat intelligence + +3. Legacy Monitoring (Risk: 1.2x) + - Created: 2023-12-01 + - Deprecated systems requiring extra monitoring + +User: We have an active campaign. Increase the APT watchlist multiplier to 4.0. + +AI uses: update_watchlist( + watchlist_id="abc-123-def", + multiplying_factor=4.0, + description="Increased multiplier due to active campaign against our sector" +) + +AI: Updated the APT Threat Actors 2024 watchlist: +- Risk multiplier: 2.5x → 4.0x +- Description updated to note the active campaign + +This will significantly elevate the priority of any events involving entities +on this watchlist. + +User: The campaign has ended. We can remove the APT watchlist now. + +AI uses: delete_watchlist( + watchlist_id="abc-123-def" +) + +AI: Successfully deleted the "APT Threat Actors 2024" watchlist. +Entities previously on this watchlist will now use their base risk scores +without the 4.0x multiplier. +``` + ## Additional Resources - [Chronicle API Documentation](https://cloud.google.com/chronicle/docs/reference/rest) diff --git a/server/secops/README.md b/server/secops/README.md index 21e4c92..0d0d8d7 100644 --- a/server/secops/README.md +++ b/server/secops/README.md @@ -120,6 +120,23 @@ Chronicle Security Operations suite. - **`generate_feed_secret(feed_id, project_id=None, customer_id=None, region=None)`** - Creates a new authentication secret for feeds that support authentication (e.g., HTTP feeds with basic auth). This replaces any existing secret. +### Watchlist Management Tools + +- **`create_watchlist(name, display_name, multiplying_factor, description, project_id=None, customer_id=None, region=None)`** + - Creates a new watchlist to track high-risk entities and apply risk score multipliers for prioritizing investigations. + +- **`update_watchlist(watchlist_id, display_name=None, description=None, multiplying_factor=None, entity_population_mechanism=None, watchlist_user_preferences=None, project_id=None, customer_id=None, region=None)`** + - Updates an existing watchlist's configuration, risk multiplier, or user preferences like pinning. + +- **`delete_watchlist(watchlist_id, force=False, project_id=None, customer_id=None, region=None)`** + - Permanently removes a watchlist from Chronicle. Use with caution as this operation cannot be undone. + +- **`get_watchlist(watchlist_id, project_id=None, customer_id=None, region=None)`** + - Retrieves detailed information about a specific watchlist including configuration and entity membership. + +- **`list_watchlists(page_size=None, page_token=None, as_list=False, project_id=None, customer_id=None, region=None)`** + - Lists all watchlists in Chronicle with pagination support for reviewing configured watchlists. + ### API Capabilities The MCP server provides the following capabilities: @@ -136,7 +153,8 @@ The MCP server provides the following capabilities: 10. **Data Table Management**: Create and manage structured data tables for detection rules 11. **Reference List Management**: Create and manage reference lists for detection rules 12. **Feed Management**: Create, update, enable, disable, and delete data feeds -13. **UDM Search & Export**: Direct UDM querying, field value autocomplete, and CSV export +13. **Watchlist Management**: Create, update, delete, and list watchlists for entity risk scoring +14. **UDM Search & Export**: Direct UDM querying, field value autocomplete, and CSV export ### Example @@ -151,6 +169,7 @@ These tools focus on core security operations tasks: - **Entity Analysis**: Use `lookup_entity` to investigate IPs, domains, hashes, and other indicators - **Rule Management**: Use `list_security_rules` and `search_security_rules` to manage detection rules - **Threat Intelligence**: Use `get_ioc_matches` and `get_threat_intel` for IOC analysis and AI-powered insights +- **Watchlist Management**: Use `create_watchlist`, `update_watchlist`, `list_watchlists`, `get_watchlist`, and `delete_watchlist` to manage entity watchlists with risk score multipliers for prioritizing high-risk entities - **UDM Analysis & Export**: Use `search_udm`, `export_udm_search_csv`, and `find_udm_field_values` for direct UDM querying, data export, and field discovery ### Data Ingestion & Parsing Tools