Skip to content

Commit

Permalink
Use pusher for external triggers (#1198)
Browse files Browse the repository at this point in the history
Issue: #1194 

# Todos
- [x] New Pusher, deploy to Modal, Link GCP to Modal
- [x] Move Pusher to GCP
- [ ] Simplify Pusher, manage rate limits, pooling and rejecting bad
state web-hooks.

# Deploy plan
- [ ] Deploy Pusher
- [ ] Add .env to Backend service > .env `HOSTED_PUSHER_API_URL`
- [ ] Deploy Backend
  • Loading branch information
beastoin authored Oct 29, 2024
2 parents eee33ce + ccc57a3 commit db877c2
Show file tree
Hide file tree
Showing 9 changed files with 443 additions and 17 deletions.
81 changes: 81 additions & 0 deletions .github/workflows/gcp_backend_pusher.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
name: Deploy Backend Pusher to Cloud RUN

on:
# push:
# branches: [ "main" ]
# paths:
# - 'backend/**'
workflow_dispatch:
inputs:
environment:
description: 'Select the environment to deploy to'
required: true
default: 'development'
branch:
description: 'Branch to deploy from'
required: true
default: 'main'

env:
SERVICE: pusher
REGION: us-central1

jobs:
deploy:
environment: ${{ github.event.inputs.environment }}
permissions:
contents: 'read'
id-token: 'write'

runs-on: ubuntu-latest
steps:
- name: Validate Environment Input
run: |
if [[ "${{ github.event.inputs.environment }}" != "development" && "${{ github.event.inputs.environment }}" != "prod" ]]; then
echo "Invalid environment: ${{ github.event.inputs.environment }}. Must be 'development' or 'prod'."
exit 1
fi
# To workaround "no space left on device" issue of GitHub-hosted runner
- name: Delete huge unnecessary tools folder
run: rm -rf /opt/hostedtoolcache

- name: Checkout
uses: actions/checkout@v4

- name: Google Auth
id: auth
uses: 'google-github-actions/auth@v2'
with:
credentials_json: ${{ secrets.GCP_CREDENTIALS }}

- name: Login to GCR
run: gcloud auth configure-docker

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Google Service Account
run: echo "${{ secrets.GCP_SERVICE_ACCOUNT }}" | base64 -d > ./backend/google-credentials.json

- name: Build and Push Docker image
uses: docker/build-push-action@v6
with:
context: .
file: ./backend/pusher/Dockerfile
push: true
tags: gcr.io/${{ vars.GCP_PROJECT_ID }}/${{ env.SERVICE }}:latest
cache-from: type=registry,ref=gcr.io/${{ vars.GCP_PROJECT_ID }}/${{ env.SERVICE }}:buildcache
cache-to: type=registry,ref=gcr.io/${{ vars.GCP_PROJECT_ID }}/${{ env.SERVICE }}:buildcache,mode=max

- name: Deploy to Cloud Run
id: deploy
uses: google-github-actions/deploy-cloudrun@v2
with:
service: ${{ env.SERVICE }}
region: ${{ env.REGION }}
image: gcr.io/${{ vars.GCP_PROJECT_ID }}/${{ env.SERVICE }}

# If required, use the Cloud Run url output in later steps
- name: Show Output
run: echo ${{ steps.deploy.outputs.url }}
2 changes: 2 additions & 0 deletions backend/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ GITHUB_TOKEN=
WORKFLOW_API_KEY=
HUME_API_KEY=
HUME_CALLBACK_URL=

HOSTED_PUSHER_API_URL=
20 changes: 20 additions & 0 deletions backend/pusher/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM python:3.11 AS builder

ENV PATH="/opt/venv/bin:$PATH"
RUN python -m venv /opt/venv

COPY backend/requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /tmp/requirements.txt

FROM python:3.11-slim

WORKDIR /app
ENV PATH="/opt/venv/bin:$PATH"

RUN apt-get update && apt-get -y install ffmpeg curl unzip && rm -rf /var/lib/apt/lists/*

COPY --from=builder /opt/venv /opt/venv
COPY backend/ .

EXPOSE 8080
CMD ["uvicorn", "pusher.main:app", "--host", "0.0.0.0", "--port", "8080"]
47 changes: 47 additions & 0 deletions backend/pusher/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import json
import os

import firebase_admin
from fastapi import FastAPI

from modal import Image, App, asgi_app, Secret
from routers import pusher

if os.environ.get('SERVICE_ACCOUNT_JSON'):
service_account_info = json.loads(os.environ["SERVICE_ACCOUNT_JSON"])
credentials = firebase_admin.credentials.Certificate(service_account_info)
firebase_admin.initialize_app(credentials)
else:
firebase_admin.initialize_app()

app = FastAPI()
app.include_router(pusher.router)

modal_app = App(
name='pusher',
secrets=[Secret.from_name("gcp-credentials"), Secret.from_name('envs')],
)
image = (
Image.debian_slim()
.apt_install('ffmpeg', 'git', 'unzip')
.pip_install_from_requirements('requirements.txt')
)


@modal_app.function(
image=image,
keep_warm=2,
memory=(512, 1024),
cpu=2,
allow_concurrent_inputs=10,
timeout=60 * 10,
)
@asgi_app()
def api():
return app


paths = ['_temp', '_samples', '_segments', '_speech_profiles']
for path in paths:
if not os.path.exists(path):
os.makedirs(path)
192 changes: 192 additions & 0 deletions backend/routers/pusher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import uuid
from datetime import datetime, timezone, timedelta
from enum import Enum

import opuslib
import webrtcvad
from fastapi import APIRouter
from fastapi.websockets import WebSocketDisconnect, WebSocket
from pydub import AudioSegment
from starlette.websockets import WebSocketState

import database.memories as memories_db
from database import redis_db
from database.redis_db import get_cached_user_geolocation
from models.memory import Memory, TranscriptSegment, MemoryStatus, Structured, Geolocation
from models.message_event import MemoryEvent, MessageEvent
from utils.memories.location import get_google_maps_location
from utils.memories.process_memory import process_memory
from utils.plugins import trigger_external_integrations, trigger_realtime_integrations
from utils.stt.streaming import *
from utils.webhooks import send_audio_bytes_developer_webhook, realtime_transcript_webhook, \
get_audio_bytes_webhook_seconds

router = APIRouter()

async def _websocket_util_transcript(
websocket: WebSocket, uid: str,
):
print('_websocket_util_transcript', uid)

try:
await websocket.accept()
except RuntimeError as e:
print(e)
await websocket.close(code=1011, reason="Dirty state")
return

websocket_active = True
websocket_close_code = 1000

loop = asyncio.get_event_loop()

# task
async def receive_segments():
nonlocal websocket_active
nonlocal websocket_close_code

try:
while websocket_active:
segments = await websocket.receive_json()
#print(f"pusher received segments {len(segments)}")
asyncio.run_coroutine_threadsafe(trigger_realtime_integrations(uid, segments), loop)
asyncio.run_coroutine_threadsafe(realtime_transcript_webhook(uid, segments), loop)

except WebSocketDisconnect:
print("WebSocket disconnected")
except Exception as e:
print(f'Could not process segments: error {e}')
websocket_close_code = 1011
finally:
websocket_active = False

# heart beat
async def send_heartbeat():
nonlocal websocket_active
nonlocal websocket_close_code
try:
while websocket_active:
await asyncio.sleep(20)
if websocket.client_state == WebSocketState.CONNECTED:
await websocket.send_json({"type": "ping"})
else:
break
except WebSocketDisconnect:
print("WebSocket disconnected")
except Exception as e:
print(f'Heartbeat error: {e}')
websocket_close_code = 1011
finally:
websocket_active = False

try:
receive_task = asyncio.create_task(
receive_segments()
)
heartbeat_task = asyncio.create_task(send_heartbeat())
await asyncio.gather(receive_task, heartbeat_task)

except Exception as e:
print(f"Error during WebSocket operation: {e}")
finally:
websocket_active = False
if websocket.client_state == WebSocketState.CONNECTED:
try:
await websocket.close(code=websocket_close_code)
except Exception as e:
print(f"Error closing WebSocket: {e}")


@router.websocket("/v1/trigger/transcript/listen")
async def websocket_endpoint_transcript(
websocket: WebSocket, uid: str,
):
await _websocket_util_transcript(websocket, uid)


async def _websocket_util_audio_bytes(
websocket: WebSocket, uid: str, sample_rate: int = 8000,
):
print('_websocket_util_audio_bytes', uid)

try:
await websocket.accept()
except RuntimeError as e:
print(e)
await websocket.close(code=1011, reason="Dirty state")
return

websocket_active = True
websocket_close_code = 1000

loop = asyncio.get_event_loop()

audio_bytes_webhook_delay_seconds = get_audio_bytes_webhook_seconds(uid)

# task
async def receive_audio_bytes():
nonlocal websocket_active
nonlocal websocket_close_code

audiobuffer = bytearray()

try:
while websocket_active:
data = await websocket.receive_bytes()
#print(f"pusher received audio bytes {len(data)}")
audiobuffer.extend(data)
if audio_bytes_webhook_delay_seconds and len(
audiobuffer) > sample_rate * audio_bytes_webhook_delay_seconds * 2:
asyncio.create_task(send_audio_bytes_developer_webhook(uid, sample_rate, audiobuffer.copy()))
audiobuffer = bytearray()

except WebSocketDisconnect:
print("WebSocket disconnected")
except Exception as e:
print(f'Could not process audio: error {e}')
websocket_close_code = 1011
finally:
websocket_active = False

# heart beat
async def send_heartbeat():
nonlocal websocket_active
nonlocal websocket_close_code
try:
while websocket_active:
await asyncio.sleep(20)
if websocket.client_state == WebSocketState.CONNECTED:
await websocket.send_json({"type": "ping"})
else:
break
except WebSocketDisconnect:
print("WebSocket disconnected")
except Exception as e:
print(f'Heartbeat error: {e}')
websocket_close_code = 1011
finally:
websocket_active = False

try:
receive_task = asyncio.create_task(
receive_audio_bytes()
)
heartbeat_task = asyncio.create_task(send_heartbeat())
await asyncio.gather(receive_task, heartbeat_task)

except Exception as e:
print(f"Error during WebSocket operation: {e}")
finally:
websocket_active = False
if websocket.client_state == WebSocketState.CONNECTED:
try:
await websocket.close(code=websocket_close_code)
except Exception as e:
print(f"Error closing WebSocket: {e}")


@router.websocket("/v1/trigger/audio-bytes/listen")
async def websocket_endpoint_audio_bytes(
websocket: WebSocket, uid: str, sample_rate: int = 8000,
):
await _websocket_util_audio_bytes(websocket, uid, sample_rate)
Loading

0 comments on commit db877c2

Please sign in to comment.