Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a command to perform signals testing to the CLI #5285

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 124 additions & 4 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import uvicorn

from dispatch import __version__, config
from dispatch.config import DISPATCH_UI_URL
from dispatch.enums import UserRoles
from dispatch.plugin.models import PluginInstance

Expand Down Expand Up @@ -636,6 +637,7 @@ def revision_database(
def dispatch_scheduler():
"""Container for all dispatch scheduler commands."""
# we need scheduled tasks to be imported
from .case.scheduled import case_close_reminder, case_triage_reminder # noqa
from .case_cost.scheduled import (
calculate_cases_response_cost, # noqa
)
Expand All @@ -656,7 +658,6 @@ def dispatch_scheduler():
)
from .term.scheduled import sync_terms # noqa
from .workflow.scheduled import sync_workflows # noqa
from .case.scheduled import case_triage_reminder, case_close_reminder # noqa


@dispatch_scheduler.command("list")
Expand Down Expand Up @@ -806,10 +807,10 @@ def consume_signals():
None
"""
from dispatch.common.utils.cli import install_plugins
from dispatch.project import service as project_service
from dispatch.plugin import service as plugin_service
from dispatch.database.core import get_organization_session, get_session
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.database.core import get_session, get_organization_session
from dispatch.plugin import service as plugin_service
from dispatch.project import service as project_service

install_plugins()

Expand Down Expand Up @@ -883,6 +884,125 @@ def process_signals():
db_session.close()


@signals_group.command("perf-test")
@click.option("--num-instances", default=1, help="Number of signal instances to send.")
@click.option("--num-workers", default=1, help="Number of threads to use.")
@click.option(
"--api-endpoint",
default=f"{DISPATCH_UI_URL}/api/v1/default/signals/instances",
required=True,
help="API endpoint to send the signal instances to.",
)
@click.option(
"--api-token",
required=True,
help="API token to use.",
)
@click.option(
"--project",
default="Test",
required=True,
help="The Dispatch project to send the instances to.",
)
def perf_test(
num_instances: int, num_workers: int, api_endpoint: str, api_token: str, project: str
) -> None:
"""Performance testing utility for creating signal instances."""

import concurrent.futures
import time
import uuid

import requests
from fastapi import status

NUM_SIGNAL_INSTANCES = num_instances
NUM_WORKERS = num_workers

session = requests.Session()
session.headers.update(
{
"Content-Type": "application/json",
"Authorization": f"Bearer {api_token}",
}
)
start_time = time.time()

def _send_signal_instance(
api_endpoint: str,
api_token: str,
session: requests.Session,
signal_instance: dict[str, str],
) -> None:
try:
r = session.post(
api_endpoint,
json=signal_instance,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_token}",
},
)
log.info(f"Response: {r.json()}")
if r.status_code == status.HTTP_401_UNAUTHORIZED:
raise PermissionError(
"Unauthorized. Please check your bearer token. You can find it in the Dev Tools under Request Headers -> Authorization."
)

r.raise_for_status()

except requests.exceptions.RequestException as e:
log.error(f"Unable to send finding. Reason: {e} Response: {r.json() if r else 'N/A'}")
else:
log.info(f"{signal_instance.get('raw', {}).get('id')} created successfully")

def send_signal_instances(
api_endpoint: str, api_token: str, signal_instances: list[dict[str, str]]
):
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = [
executor.submit(
_send_signal_instance,
api_endpoint=api_endpoint,
api_token=api_token,
session=session,
signal_instance=signal_instance,
)
for signal_instance in signal_instances
]
results = [future.result() for future in concurrent.futures.as_completed(futures)]

log.info(f"\nSent {len(results)} of {NUM_SIGNAL_INSTANCES} signal instances")

signal_instances = [
{
"project": {"name": project},
"raw": {
"id": str(uuid.uuid4()),
"name": "Test Signal",
"slug": "test-signal",
"canary": False,
"events": [
{
"original": {
"dateint": 20240930,
"distinct_lookupkey_count": 95,
},
},
],
"created_at": "2024-09-18T19:47:15Z",
"quiet_mode": False,
"external_id": "4ebbab36-c703-495f-ae47-7051bdc8b3ef",
},
},
] * NUM_SIGNAL_INSTANCES

send_signal_instances(api_endpoint, api_token, signal_instances)

elapsed_time = time.time() - start_time
click.echo(f"Elapsed time: {elapsed_time:.2f} seconds")


@dispatch_server.command("slack")
@click.argument("organization")
@click.argument("project")
Expand Down
Loading