Streaming Guide

This guide covers Server-Sent Events (SSE) streaming in the LangGraph Translation API, including event types, client implementation, and best practices.


πŸ“‘ Overview

The API uses Server-Sent Events (SSE) for real-time streaming of:

  • Translation progress
  • Glossary extraction
  • Standardization updates
  • UCCA/Gloss generation
  • Editor comments

Benefits

  • Real-time feedback: See results as they're generated
  • Progress tracking: Monitor batch processing
  • Reduced latency: Start displaying results immediately
  • Better UX: Interactive, responsive interface

πŸ”„ How SSE Works

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Client  β”‚  ── HTTP POST ──▢  β”‚   Server     β”‚
β”‚          β”‚                    β”‚              β”‚
β”‚          β”‚  ◀── SSE Stream ── β”‚  Endpoint    β”‚
β”‚          β”‚      (text/event-  β”‚              β”‚
β”‚          β”‚       stream)      β”‚              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Event Format:
  data: {"type": "batch_completed", "results": [...]}

  data: {"type": "completion", "results": [...]}

πŸ“‹ Streaming Endpoints

| Endpoint | Description |
|----------|-------------|
| POST /translate/stream | Batch translation streaming |
| POST /translate/single/stream | Single-text translation streaming |
| POST /glossary/extract/stream | Glossary extraction streaming |
| POST /standardize/apply/stream | Standardization streaming |
| POST /ucca/generate/stream | UCCA generation streaming |
| POST /gloss/generate/stream | Gloss generation streaming |
| POST /editor/comment/stream | Editor comment streaming |
| POST /dharmamitra/knn-translate-mitra | Dharmamitra proxy streaming |

πŸ“¨ Event Types

Translation Events

batch_completed

Sent after each batch of translations completes.

{
  "timestamp": "2025-01-15T10:00:02.123Z",
  "type": "batch_completed",
  "status": "batch_completed",
  "batch_results": [
    {
      "original_text": "ΰ½–ΰΎ±ΰ½„ΰΌ‹ΰ½†ΰ½΄ΰ½–ΰΌ‹ΰ½¦ΰ½Ίΰ½˜ΰ½¦",
      "translated_text": "bodhicitta",
      "metadata": {
        "batch_id": "abc-123",
        "model_used": "claude-sonnet-4-20250514",
        "text_type": "Buddhist text"
      }
    }
  ]
}

completion

Final event when all processing is complete.

{
  "timestamp": "2025-01-15T10:00:05.456Z",
  "type": "completion",
  "status": "completed",
  "results": [
    {
      "original_text": "...",
      "translated_text": "...",
      "metadata": {...}
    }
  ]
}
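
One way to consume these two event types is to accumulate batch_results as they arrive and then treat the results list on the final completion event as the authoritative full set. A minimal sketch, using the field names from the examples above:

# Sketch: reconcile incremental batches with the final completion event.
all_results: list[dict] = []

def on_translation_event(event: dict) -> None:
    if event["type"] == "batch_completed":
        all_results.extend(event["batch_results"])
    elif event["type"] == "completion":
        # Assumes completion repeats the complete result set, as in the
        # example above; replace whatever was accumulated so far.
        all_results[:] = event["results"]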

Glossary Events

glossary_batch_completed

Sent after each batch of glossary extraction completes.

{
  "timestamp": "2025-01-15T10:00:03.789Z",
  "type": "glossary_batch_completed",
  "status": "batch_complete",
  "terms": [
    {"source_term": "ΰ½–ΰΎ±ΰ½„ΰΌ‹ΰ½†ΰ½΄ΰ½–ΰΌ‹ΰ½¦ΰ½Ίΰ½˜ΰ½¦", "translated_term": "bodhicitta"},
    {"source_term": "ΰ½¦ΰΎŸΰ½Όΰ½„ΰΌ‹ΰ½”ΰΌ‹ΰ½‰ΰ½²ΰ½‘", "translated_term": "emptiness"}
  ]
}

completion (Glossary)

{
  "timestamp": "2025-01-15T10:00:06.012Z",
  "type": "completion",
  "status": "completed",
  "glossary": {
    "terms": [...]
  }
}

Standardization Events

retranslation_completed

Sent when an item is re-translated with standardized terms.

{
  "timestamp": "2025-01-15T10:00:04.567Z",
  "type": "retranslation_completed",
  "status": "item_updated",
  "index": 0,
  "updated_item": {
    "original_text": "ΰ½–ΰΎ±ΰ½„ΰΌ‹ΰ½†ΰ½΄ΰ½–ΰΌ‹ΰ½¦ΰ½Ίΰ½˜ΰ½¦ΰΌ‹ΰ½–ΰ½¦ΰΎΰΎ±ΰ½Ίΰ½‘",
    "translated_text": "generate bodhicitta",
    "glossary": [
      {"source_term": "ΰ½–ΰΎ±ΰ½„ΰΌ‹ΰ½†ΰ½΄ΰ½–ΰΌ‹ΰ½¦ΰ½Ίΰ½˜ΰ½¦", "translated_term": "bodhicitta"}
    ]
  }
}
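
Because the event carries an index into the original request order, a client that keeps the submitted items in a list can apply the update in place. A minimal sketch (assuming index refers to the item's position in the request):

# Sketch: apply a retranslation_completed event to a locally held item list.
def apply_retranslation(items: list[dict], event: dict) -> None:
    if event["type"] == "retranslation_completed":
        items[event["index"]] = event["updated_item"]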

UCCA Events

ucca_item_completed

Sent when a UCCA graph is generated for an item.

{
  "timestamp": "2025-01-15T10:00:02.345Z",
  "type": "ucca_item_completed",
  "status": "item_complete",
  "index": 0,
  "ucca_graph": {
    "scenes": [...],
    "participants": [...],
    "processes": [...]
  }
}

Gloss Events

gloss_item_completed

Sent when gloss analysis is complete for an item.

{
  "timestamp": "2025-01-15T10:00:03.456Z",
  "type": "gloss_item_completed",
  "status": "item_complete",
  "index": 0,
  "standardized_text": "བྱང་ཆུབ་ ΰ½¦ΰ½Ίΰ½˜ΰ½¦ΰΌ‹ΰ½‘ΰ½”ΰ½ ΰΌ‹",
  "note": "Segmented with standard spacing",
  "analysis": "[{\"segment\": \"...\", \"meaning\": \"...\"}]",
  "glossary": {"བྱང་ཆུབ": "enlightenment"}
}
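
Note that in this example the analysis field is a JSON-encoded string rather than a nested object, so clients need a second decoding step. A sketch, assuming the format shown above:

import json

def parse_gloss_analysis(event: dict) -> list[dict]:
    # analysis arrives as a JSON string encoding a list of
    # {"segment": ..., "meaning": ...} objects (assumed from the example).
    return json.loads(event["analysis"])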

Editor Comment Events

initialization

Sent at the start of comment generation.

{
  "type": "initialization",
  "mentions": ["@User1", "@User2"],
  "model_used": "gemini-2.5-pro"
}

comment_delta

Sent for each chunk of streaming text.

{
  "type": "comment_delta",
  "text": "The term 'bodhicitta' "
}

completion (Editor)

Final event with complete comment.

{
  "type": "completion",
  "comment_text": "@User1 The term 'bodhicitta' should be standardized [ref-commentary-1].",
  "citations_used": ["ref-commentary-1"],
  "mentions": ["@User1"]
}
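
A typical client appends each comment_delta chunk to a buffer for live display, then replaces the buffer with comment_text from the completion event. A minimal sketch:

# Sketch: build the comment from deltas, then prefer the final text.
def consume_editor_events(events) -> str:
    comment = ""
    for event in events:
        if event["type"] == "comment_delta":
            comment += event["text"]            # partial text for live display
        elif event["type"] == "completion":
            comment = event["comment_text"]     # authoritative final comment
    return comment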

Error Events

error

Sent when an error occurs.

{
  "type": "error",
  "message": "Model invocation failed: API rate limit exceeded"
}

πŸ’» Client Implementation

JavaScript (Browser)

async function streamTranslation(texts, targetLanguage) {
  const response = await fetch('/translate/stream', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      texts: texts,
      target_language: targetLanguage,
      model_name: 'claude-sonnet-4-20250514',
      batch_size: 5
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  
  let buffer = '';
  
  while (true) {
    const { done, value } = await reader.read();
    
    if (done) break;
    
    buffer += decoder.decode(value, { stream: true });
    
    // Process complete events
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Keep incomplete line in buffer
    
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        handleEvent(data);
      }
    }
  }
}

function handleEvent(event) {
  switch (event.type) {
    case 'batch_completed':
      console.log('Batch done:', event.batch_results.length, 'items');
      displayResults(event.batch_results);
      break;
      
    case 'completion':
      console.log('All done:', event.results.length, 'total');
      break;
      
    case 'error':
      console.error('Error:', event.message);
      break;
  }
}

JavaScript (EventSource Alternative)

// Note: EventSource only supports GET, so use fetch for POST endpoints
// This example shows the pattern for GET-compatible streaming

const eventSource = new EventSource('/some-get-endpoint');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  handleEvent(data);
};

eventSource.onerror = (error) => {
  console.error('SSE Error:', error);
  eventSource.close();
};

Python (Synchronous)

import requests
import json

def stream_translation(texts: list[str], target_language: str):
    response = requests.post(
        'http://localhost:8001/translate/stream',
        json={
            'texts': texts,
            'target_language': target_language,
            'model_name': 'claude-sonnet-4-20250514',
            'batch_size': 5
        },
        stream=True
    )
    
    for line in response.iter_lines():
        if line:
            line = line.decode('utf-8')
            if line.startswith('data: '):
                data = json.loads(line[6:])
                handle_event(data)

def handle_event(event: dict):
    if event['type'] == 'batch_completed':
        print(f"Batch complete: {len(event['batch_results'])} items")
        for result in event['batch_results']:
            print(f"  {result['original_text'][:30]}... β†’ {result['translated_text'][:30]}...")
    
    elif event['type'] == 'completion':
        print(f"All complete: {len(event['results'])} total translations")
    
    elif event['type'] == 'error':
        print(f"Error: {event['message']}")

# Usage
stream_translation(
    texts=["བྱང་ཆུབ་སེམས", "སྟོང་པ་ཉིད"],
    target_language="english"
)

Python (Async with httpx)

import httpx
import json

async def stream_translation_async(texts: list[str], target_language: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            'POST',
            'http://localhost:8001/translate/stream',
            json={
                'texts': texts,
                'target_language': target_language,
                'model_name': 'claude-sonnet-4-20250514',
                'batch_size': 5
            }
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    data = json.loads(line[6:])
                    await handle_event_async(data)

async def handle_event_async(event: dict):
    # Same logic as synchronous version
    pass

cURL

curl -X POST http://localhost:8001/translate/stream \
  -H "Content-Type: application/json" \
  -d '{
    "texts": ["ΰ½–ΰΎ±ΰ½„ΰΌ‹ΰ½†ΰ½΄ΰ½–ΰΌ‹ΰ½¦ΰ½Ίΰ½˜ΰ½¦", "ΰ½¦ΰΎŸΰ½Όΰ½„ΰΌ‹ΰ½”ΰΌ‹ΰ½‰ΰ½²ΰ½‘"],
    "target_language": "english",
    "model_name": "claude-sonnet-4-20250514",
    "batch_size": 5
  }' \
  --no-buffer

Output:

data: {"timestamp": "...", "type": "batch_completed", "batch_results": [...]}

data: {"timestamp": "...", "type": "completion", "results": [...]}

🎨 UI Pattern: Progressive Display

// Progressive translation display
class TranslationDisplay {
  constructor(container) {
    this.container = container;
    this.results = [];
  }
  
  async startTranslation(texts) {
    this.container.innerHTML = '<div class="loading">Starting...</div>';
    
    const response = await fetch('/translate/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        texts: texts,
        target_language: 'english',
        batch_size: 5
      })
    });
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop();
      
      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const event = JSON.parse(line.slice(6));
          this.handleEvent(event);
        }
      }
    }
  }
  
  handleEvent(event) {
    if (event.type === 'batch_completed') {
      // Add new results progressively
      for (const result of event.batch_results) {
        this.results.push(result);
        this.appendResult(result);
      }
      this.updateProgress();
    }
    
    if (event.type === 'completion') {
      this.showComplete();
    }
  }
  
  appendResult(result) {
    const div = document.createElement('div');
    div.className = 'result-item fade-in';
    div.innerHTML = `
      <div class="original">${result.original_text}</div>
      <div class="arrow">β†’</div>
      <div class="translated">${result.translated_text}</div>
    `;
    this.container.appendChild(div);
  }
  
  updateProgress() {
    // Update progress bar, count, etc.
  }
  
  showComplete() {
    // Show completion message
  }
}

⚠️ Error Handling

Connection Errors

async function streamWithRetry(request, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch('/translate/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(request)
      });
      
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      
      await processStream(response); // the SSE read/parse loop shown earlier
      return; // Success
      
    } catch (error) {
      console.error(`Attempt ${attempt} failed:`, error);
      
      if (attempt === maxRetries) {
        throw error;
      }
      
      // Wait before retry (exponential backoff)
      await new Promise(r => setTimeout(r, 1000 * attempt));
    }
  }
}

Event-Level Errors

function handleEvent(event) {
  if (event.type === 'error') {
    showError(event.message);
    // Optionally retry or show fallback
    return;
  }
  
  // Handle other events...
}

πŸ”§ Best Practices

1. Always Handle completion

Don't rely on the stream ending; wait for the explicit completion event.
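
One way to enforce this is to track whether a completion event was seen and treat a stream that ends without one as a failure. A sketch:

# Sketch: a stream that ends without a completion event is treated as failed.
def consume(events) -> None:
    completed = False
    for event in events:
        if event["type"] == "completion":
            completed = True
        # ... handle other event types ...
    if not completed:
        raise RuntimeError("stream ended without a completion event")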

2. Buffer Partial Lines

SSE data may be split across chunks:

let buffer = '';
// ...
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // Keep partial line

3. Set Appropriate Timeouts

Streaming requests should have long or no timeouts:

# Python httpx
async with httpx.AsyncClient(timeout=None) as client:
    ...

4. Show Progress

Update the UI as batches complete; don't wait for the final result.
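
For example, if the total number of input texts is known up front, progress can be derived from the items received so far (a sketch):

# Sketch: report progress as batch_completed events arrive.
def report_progress(event: dict, done: int, total: int) -> int:
    if event["type"] == "batch_completed":
        done += len(event["batch_results"])
        print(f"Progress: {done}/{total} ({100 * done // max(total, 1)}%)")
    return done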

5. Handle Reconnection

If the connection drops, consider resuming from the last successful batch, as sketched below.
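
One simple client-side approach, assuming each request is independent and results come back in request order (both assumptions, not documented guarantees), is to re-submit only the texts that have not yet come back:

# Sketch: naive resume after a dropped connection. Assumes each request is
# independent and results arrive in the same order as the input texts.
def resume_request(texts: list[str], received: list[dict], target_language: str) -> dict:
    remaining = texts[len(received):]
    return {
        "texts": remaining,
        "target_language": target_language,
        "batch_size": 5,
    }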


πŸ”— See Also

Clone this wiki locally