-
Notifications
You must be signed in to change notification settings - Fork 320
feat(dev-hub) Pyth Pro Playground #3346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
43da690
f8e99b1
beccc18
8e6d7d6
25ac620
9d2ccf6
f2bf1d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,286 @@ | ||
| import type { NextRequest } from "next/server"; | ||
| import type { MessageEvent } from "ws"; | ||
| import WebSocket from "ws"; | ||
| import { z } from "zod"; | ||
|
|
||
| import { | ||
| PLAYGROUND_MAX_STREAM_DURATION_MS, | ||
| PLAYGROUND_RATE_LIMIT_MAX_REQUESTS, | ||
| PLAYGROUND_RATE_LIMIT_WINDOW_MS, | ||
| PYTH_PRO_DEMO_TOKEN, | ||
| PYTH_PRO_WS_ENDPOINT, | ||
| } from "../../../../config/pyth-pro"; | ||
| import { checkRateLimit } from "../../../../lib/rate-limiter"; | ||
|
|
||
| // Request body schema | ||
| const StreamRequestSchema = z.object({ | ||
| accessToken: z.string().optional().default(""), | ||
| priceFeedIds: z | ||
| .array(z.number()) | ||
| .min(1, "At least one price feed ID required"), | ||
| properties: z.array(z.string()).min(1, "At least one property required"), | ||
| chains: z.array(z.string()).min(1, "At least one chain required"), | ||
| channel: z.enum([ | ||
| "real_time", | ||
| "fixed_rate@1ms", | ||
| "fixed_rate@50ms", | ||
| "fixed_rate@200ms", | ||
| "fixed_rate@1000ms", | ||
| ]), | ||
| deliveryFormat: z.enum(["json", "binary"]), | ||
| jsonBinaryEncoding: z.enum(["hex", "base64"]).optional().default("hex"), | ||
| parsed: z.boolean().optional().default(true), | ||
| }); | ||
|
|
||
| type StreamRequest = z.infer<typeof StreamRequestSchema>; | ||
|
|
||
| /** | ||
| * Get client IP address from request headers | ||
| */ | ||
| function getClientIp(request: NextRequest): string { | ||
| const forwardedFor = request.headers.get("x-forwarded-for"); | ||
| if (forwardedFor) { | ||
| return forwardedFor.split(",")[0]?.trim() ?? "unknown"; | ||
| } | ||
| const realIp = request.headers.get("x-real-ip"); | ||
| if (realIp) { | ||
| return realIp; | ||
| } | ||
| return "unknown"; | ||
| } | ||
|
|
||
| /** | ||
| * Create SSE message string | ||
| */ | ||
| function createSseMessage(event: string, data: unknown): string { | ||
| return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; | ||
| } | ||
|
|
||
| /** | ||
| * Build the Pyth Pro subscription message | ||
| */ | ||
| function buildSubscriptionMessage(config: StreamRequest): string { | ||
| return JSON.stringify({ | ||
| type: "subscribe", | ||
| subscriptionId: 1, | ||
| priceFeedIds: config.priceFeedIds, | ||
| properties: config.properties, | ||
| chains: config.chains, | ||
| deliveryFormat: config.deliveryFormat, | ||
| channel: config.channel, | ||
| jsonBinaryEncoding: config.jsonBinaryEncoding, | ||
| parsed: config.parsed, | ||
| }); | ||
| } | ||
|
|
||
| export async function POST(request: NextRequest) { | ||
| // Parse and validate request body | ||
| let config: StreamRequest; | ||
| try { | ||
| const body: unknown = await request.json(); | ||
| config = StreamRequestSchema.parse(body); | ||
| } catch (error) { | ||
| if (error instanceof z.ZodError) { | ||
| return new Response( | ||
| JSON.stringify({ error: "Invalid request", details: error.errors }), | ||
| { status: 400, headers: { "Content-Type": "application/json" } }, | ||
| ); | ||
| } | ||
| return new Response(JSON.stringify({ error: "Invalid JSON body" }), { | ||
| status: 400, | ||
| headers: { "Content-Type": "application/json" }, | ||
| }); | ||
| } | ||
|
|
||
| // Determine which token to use | ||
| const demoToken = PYTH_PRO_DEMO_TOKEN; | ||
| const usesDemoToken = !config.accessToken; | ||
|
|
||
| if (usesDemoToken && !demoToken) { | ||
| return new Response( | ||
| JSON.stringify({ error: "Demo token not configured on server" }), | ||
| { status: 500, headers: { "Content-Type": "application/json" } }, | ||
| ); | ||
| } | ||
|
|
||
| const accessToken = usesDemoToken ? demoToken : config.accessToken; | ||
|
|
||
| // Apply rate limiting for demo token users | ||
| if (usesDemoToken) { | ||
| const clientIp = getClientIp(request); | ||
| const rateLimitResult = checkRateLimit(clientIp, { | ||
| windowMs: PLAYGROUND_RATE_LIMIT_WINDOW_MS, | ||
| maxRequests: PLAYGROUND_RATE_LIMIT_MAX_REQUESTS, | ||
| }); | ||
|
|
||
| if (!rateLimitResult.allowed) { | ||
| const retryAfterSeconds = Math.ceil(rateLimitResult.resetIn / 1000); | ||
| return new Response( | ||
| JSON.stringify({ | ||
| error: "Rate limit exceeded", | ||
| message: `Too many requests. Try again in ${String(retryAfterSeconds)} seconds.`, | ||
| resetIn: rateLimitResult.resetIn, | ||
| }), | ||
| { | ||
| status: 429, | ||
| headers: { | ||
| "Content-Type": "application/json", | ||
| "Retry-After": String(retryAfterSeconds), | ||
| }, | ||
| }, | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // Create SSE stream | ||
| const encoder = new TextEncoder(); | ||
| let websocket: WebSocket | undefined; | ||
| let timeoutId: ReturnType<typeof setTimeout> | undefined; | ||
| let isClosed = false; | ||
|
|
||
| const stream = new ReadableStream({ | ||
| start(controller) { | ||
| const cleanup = () => { | ||
| if (isClosed) return; | ||
| isClosed = true; | ||
|
|
||
| if (timeoutId) { | ||
| clearTimeout(timeoutId); | ||
| } | ||
| if (websocket && websocket.readyState === WebSocket.OPEN) { | ||
| websocket.close(); | ||
| } | ||
| }; | ||
|
|
||
| const sendEvent = (event: string, data: unknown) => { | ||
| if (isClosed) return; | ||
| try { | ||
| controller.enqueue(encoder.encode(createSseMessage(event, data))); | ||
| } catch { | ||
| // Controller may be closed | ||
| cleanup(); | ||
| } | ||
| }; | ||
|
|
||
| // Set up auto-close timeout | ||
| const maxDurationSeconds = Math.round( | ||
| PLAYGROUND_MAX_STREAM_DURATION_MS / 1000, | ||
| ); | ||
| timeoutId = setTimeout(() => { | ||
| sendEvent("close", { | ||
| timestamp: new Date().toISOString(), | ||
| reason: "timeout", | ||
| message: `Stream closed after ${String(maxDurationSeconds)} seconds`, | ||
| }); | ||
| cleanup(); | ||
| controller.close(); | ||
| }, PLAYGROUND_MAX_STREAM_DURATION_MS); | ||
|
|
||
| // Connect to Pyth Pro WebSocket using Bearer token authentication | ||
| try { | ||
| const wsUrl = PYTH_PRO_WS_ENDPOINT; | ||
| const wsOptions = { | ||
| headers: { | ||
| Authorization: `Bearer ${accessToken ?? ""}`, | ||
| }, | ||
| }; | ||
| websocket = new WebSocket(wsUrl, wsOptions); | ||
|
|
||
| websocket.addEventListener("open", () => { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is actually not going to work once deployed. it works locally, because you have a single, persistent instance that is always up. However, once deployed to Vercel, the app cluster will be scaled up and down dynamically, so the server that opened this websocket instance will likely be killed, which will cause problems that manifest as Pyth API slowness and instability (definitely not a thing to promote 😓 ). Let's revisit this. We may need to have a bespoke server, written in Rust or Javascript, depending on which Pyth Client we want to use, that only handles streaming for the DevHub docsite and the upcoming Admin portal |
||
| sendEvent("connected", { | ||
| timestamp: new Date().toISOString(), | ||
| endpoint: PYTH_PRO_WS_ENDPOINT, | ||
| message: "Connected to Pyth Pro WebSocket", | ||
| }); | ||
|
|
||
| // Send subscription message | ||
| const subscriptionMessage = buildSubscriptionMessage(config); | ||
| websocket?.send(subscriptionMessage); | ||
|
|
||
| sendEvent("subscribed", { | ||
| timestamp: new Date().toISOString(), | ||
| subscription: JSON.parse(subscriptionMessage) as unknown, | ||
| }); | ||
| }); | ||
|
|
||
| websocket.addEventListener("message", (event: MessageEvent) => { | ||
| try { | ||
| // event.data can be string, Buffer, ArrayBuffer, or Buffer[] | ||
| const rawData = event.data; | ||
| const messageData = | ||
| typeof rawData === "string" | ||
| ? rawData | ||
| : (Buffer.isBuffer(rawData) | ||
| ? rawData.toString("utf8") | ||
| : JSON.stringify(rawData)); | ||
| const parsedData: unknown = JSON.parse(messageData); | ||
| sendEvent("message", { | ||
| timestamp: new Date().toISOString(), | ||
| data: parsedData, | ||
| }); | ||
| } catch { | ||
| // If not JSON, send as raw string | ||
| const rawData = event.data; | ||
| const dataStr = | ||
| typeof rawData === "string" | ||
| ? rawData | ||
| : (Buffer.isBuffer(rawData) | ||
| ? rawData.toString("utf8") | ||
| : JSON.stringify(rawData)); | ||
| sendEvent("message", { | ||
| timestamp: new Date().toISOString(), | ||
| data: dataStr, | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| websocket.addEventListener("error", () => { | ||
| sendEvent("error", { | ||
| timestamp: new Date().toISOString(), | ||
| error: "WebSocket connection error", | ||
| }); | ||
| }); | ||
|
|
||
| websocket.addEventListener("close", (event) => { | ||
| const closeEvent = event as { code?: number; reason?: string }; | ||
| if (isClosed) return; | ||
| sendEvent("close", { | ||
| timestamp: new Date().toISOString(), | ||
| reason: "websocket_closed", | ||
| code: closeEvent.code, | ||
| message: closeEvent.reason ?? "WebSocket connection closed", | ||
| }); | ||
| cleanup(); | ||
| controller.close(); | ||
| }); | ||
| } catch (error) { | ||
| sendEvent("error", { | ||
| timestamp: new Date().toISOString(), | ||
| error: error instanceof Error ? error.message : "Failed to connect", | ||
| }); | ||
| cleanup(); | ||
| controller.close(); | ||
| } | ||
| }, | ||
|
|
||
| cancel() { | ||
| // Client disconnected | ||
| if (websocket && websocket.readyState === WebSocket.OPEN) { | ||
| websocket.close(); | ||
| } | ||
| if (timeoutId) { | ||
| clearTimeout(timeoutId); | ||
| } | ||
| isClosed = true; | ||
| }, | ||
| }); | ||
|
|
||
| return new Response(stream, { | ||
| headers: { | ||
| "Content-Type": "text/event-stream", | ||
| "Cache-Control": "no-cache, no-transform", | ||
| Connection: "keep-alive", | ||
| "X-Accel-Buffering": "no", | ||
| }, | ||
| }); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| import { HomeLayout } from "fumadocs-ui/layouts/home"; | ||
| import type { ReactNode } from "react"; | ||
|
|
||
| import { baseOptions } from "../../config/layout.config"; | ||
|
|
||
| export default function PlaygroundLayout({ children }: PlaygroundLayoutProps) { | ||
| return <HomeLayout {...baseOptions}>{children}</HomeLayout>; | ||
| } | ||
|
|
||
| type PlaygroundLayoutProps = { | ||
| children: ReactNode; | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| import type { Metadata } from "next"; | ||
|
|
||
| import { PlaygroundPage } from "../../components/Pages/PlaygroundPage"; | ||
|
|
||
| export const metadata: Metadata = { | ||
| title: "Pyth Pro Playground | Pyth Network", | ||
| description: | ||
| "Interactive playground to explore Pyth Pro APIs. Configure subscription parameters, generate code in multiple languages, and test real-time price streams.", | ||
| }; | ||
|
|
||
| export default function Playground() { | ||
| return <PlaygroundPage />; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 we may need to do a deeper dive on the usage of the EventStream API here. it's a good usage here in Next (since we cannot use server-side websockets in Next), but there is an opportunity to move this API out of developer hub and into the
shared-lib, so we can standardize the API and also provide some React hooks that work easily with it, without needing to build something custom per-app (thinking out loud here, as this is 100% something that we will need in other places)