diff --git a/scripts/cli.ts b/scripts/cli.ts index b4415cc..9dcf617 100644 --- a/scripts/cli.ts +++ b/scripts/cli.ts @@ -34,7 +34,9 @@ import * as path from 'path'; import { sql } from '@vercel/postgres'; import { syncAnthropicUsage, getAnthropicSyncState, backfillAnthropicUsage, resetAnthropicBackfillComplete } from '../src/lib/sync/anthropic'; import { syncCursorUsage, backfillCursorUsage, getCursorSyncState, getPreviousCompleteHourEnd, resetCursorBackfillComplete } from '../src/lib/sync/cursor'; +import { syncOpenAIUsage, getOpenAISyncState, backfillOpenAIUsage, resetOpenAIBackfillComplete } from '../src/lib/sync/openai'; import { syncApiKeyMappingsSmart, syncAnthropicApiKeyMappings } from '../src/lib/sync/anthropic-mappings'; +import { syncOpenAIUserMappingsSmart } from '../src/lib/sync/openai-mappings'; import { getToolIdentityMappings, setToolIdentityMapping, getUnmappedToolRecords, getKnownEmails, insertUsageRecord } from '../src/lib/queries'; import { normalizeModelName } from '../src/lib/utils'; @@ -137,18 +139,19 @@ Usage: Commands: db:migrate Run pending database migrations sync [tool] [--days N] [--skip-mappings] - Sync recent usage data (tool: anthropic|cursor, default: both) + Sync recent usage data (tool: anthropic|cursor|openai, default: all) backfill --from YYYY-MM-DD --to YYYY-MM-DD Backfill historical data for a specific tool backfill:complete - Mark backfill as complete for a tool (anthropic|cursor) + Mark backfill as complete for a tool (anthropic|cursor|openai) backfill:reset Reset backfill status for a tool (allows re-backfilling) - gaps [tool] Check for gaps in usage data (tool: anthropic|cursor, default: both) + gaps [tool] Check for gaps in usage data (tool: anthropic|cursor|openai, default: all) mappings List API key mappings mappings:sync [--full] Sync API key mappings from Anthropic (--full for all keys) mappings:fix Interactive fix for unmapped API keys anthropic:status Show Anthropic sync state cursor:status Show Cursor sync state + openai:status Show OpenAI sync state import:cursor-csv Import Cursor usage from CSV export stats Show database statistics @@ -157,9 +160,11 @@ Commands: Examples: npm run cli sync --days 30 npm run cli sync cursor --days 7 + npm run cli sync openai --days 7 npm run cli backfill cursor --from 2024-01-01 --to 2025-01-01 + npm run cli backfill openai --from 2024-01-01 --to 2025-01-01 npm run cli mappings:fix - npm run cli cursor:status + npm run cli openai:status `); } @@ -245,6 +250,34 @@ async function cmdCursorStatus() { } } +async function cmdOpenAIStatus() { + console.log('šŸ”„ OpenAI Sync Status\n'); + + const { lastSyncedDate } = await getOpenAISyncState(); + + // Yesterday is the most recent complete day we should have + const yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + const yesterdayStr = yesterday.toISOString().split('T')[0]; + + if (lastSyncedDate) { + console.log(`Last synced date: ${lastSyncedDate}`); + console.log(`Current complete day: ${yesterdayStr}`); + + if (lastSyncedDate >= yesterdayStr) { + console.log('\nāœ“ Up to date'); + } else { + const lastDate = new Date(lastSyncedDate); + const daysBehind = Math.floor((yesterday.getTime() - lastDate.getTime()) / (24 * 60 * 60 * 1000)); + console.log(`\nāš ļø ${daysBehind} day(s) behind`); + } + } else { + console.log('Never synced'); + console.log(`Current complete day: ${yesterdayStr}`); + console.log('\nRun backfill to initialize: npm run cli backfill openai --from YYYY-MM-DD --to YYYY-MM-DD'); + } +} + async function cmdMappings() { console.log('šŸ”‘ Tool Identity Mappings\n'); const mappings = await getToolIdentityMappings(); @@ -306,7 +339,7 @@ async function cmdMappingsFix() { console.log('\nDone!'); } -async function cmdSync(days: number = 7, tools: ('anthropic' | 'cursor')[] = ['anthropic', 'cursor'], skipMappings: boolean = false) { +async function cmdSync(days: number = 7, tools: ('anthropic' | 'cursor' | 'openai')[] = ['anthropic', 'cursor', 'openai'], skipMappings: boolean = false) { const endDate = new Date().toISOString().split('T')[0]; const startDate = new Date(Date.now() - days * 24 * 60 * 60 * 1000).toISOString().split('T')[0]; @@ -320,19 +353,23 @@ async function cmdSync(days: number = 7, tools: ('anthropic' | 'cursor')[] = ['a console.log('āš ļø Skipping Cursor: CURSOR_ADMIN_KEY not configured'); return false; } + if (tool === 'openai' && !process.env.OPENAI_ADMIN_KEY) { + console.log('āš ļø Skipping OpenAI: OPENAI_ADMIN_KEY not configured'); + return false; + } return true; }); if (configuredTools.length === 0) { - console.log('\nāŒ No providers configured. Set ANTHROPIC_ADMIN_KEY and/or CURSOR_ADMIN_KEY.'); + console.log('\nāŒ No providers configured. Set ANTHROPIC_ADMIN_KEY, CURSOR_ADMIN_KEY, and/or OPENAI_ADMIN_KEY.'); return; } console.log(`\nšŸ”„ Syncing usage data from ${startDate} to ${endDate}\n`); - // Sync API key mappings FIRST so usage sync has them available + // Sync API key/user mappings FIRST so usage sync has them available if (configuredTools.includes('anthropic') && !skipMappings) { - console.log('Syncing API key mappings...'); + console.log('Syncing Anthropic API key mappings...'); const mappingsResult = await syncApiKeyMappingsSmart(); console.log(` Created: ${mappingsResult.mappingsCreated}, Skipped: ${mappingsResult.mappingsSkipped}`); if (mappingsResult.errors.length > 0) { @@ -341,6 +378,16 @@ async function cmdSync(days: number = 7, tools: ('anthropic' | 'cursor')[] = ['a console.log(''); } + if (configuredTools.includes('openai') && !skipMappings) { + console.log('Syncing OpenAI user mappings...'); + const mappingsResult = await syncOpenAIUserMappingsSmart(); + console.log(` Created: ${mappingsResult.mappingsCreated}, Skipped: ${mappingsResult.mappingsSkipped}`); + if (mappingsResult.errors.length > 0) { + console.log(` Errors: ${mappingsResult.errors.slice(0, 3).join(', ')}`); + } + console.log(''); + } + if (configuredTools.includes('anthropic')) { console.log('Syncing Anthropic usage...'); const anthropicResult = await syncAnthropicUsage(startDate, endDate); @@ -360,10 +407,20 @@ async function cmdSync(days: number = 7, tools: ('anthropic' | 'cursor')[] = ['a } } + if (configuredTools.includes('openai')) { + if (configuredTools.includes('anthropic') || configuredTools.includes('cursor')) console.log(''); + console.log('Syncing OpenAI usage...'); + const openaiResult = await syncOpenAIUsage(startDate, endDate); + console.log(` Imported: ${openaiResult.recordsImported}, Skipped: ${openaiResult.recordsSkipped}`); + if (openaiResult.errors.length > 0) { + console.log(` Errors: ${openaiResult.errors.slice(0, 3).join(', ')}`); + } + } + console.log('\nāœ“ Sync complete!'); } -async function cmdBackfill(tool: 'anthropic' | 'cursor', fromDate: string, toDate: string) { +async function cmdBackfill(tool: 'anthropic' | 'cursor' | 'openai', fromDate: string, toDate: string) { // Check if provider is configured if (tool === 'anthropic' && !process.env.ANTHROPIC_ADMIN_KEY) { console.error('āŒ ANTHROPIC_ADMIN_KEY not configured'); @@ -373,6 +430,10 @@ async function cmdBackfill(tool: 'anthropic' | 'cursor', fromDate: string, toDat console.error('āŒ CURSOR_ADMIN_KEY not configured'); return; } + if (tool === 'openai' && !process.env.OPENAI_ADMIN_KEY) { + console.error('āŒ OPENAI_ADMIN_KEY not configured'); + return; + } console.log(`šŸ“„ Backfilling ${tool} from ${fromDate} to ${toDate}\n`); @@ -403,6 +464,21 @@ async function cmdBackfill(tool: 'anthropic' | 'cursor', fromDate: string, toDat if (result.errors.length > 0) { console.log(` Errors: ${result.errors.slice(0, 5).join(', ')}`); } + } else if (tool === 'openai') { + // Sync user mappings first + console.log('Syncing OpenAI user mappings first...'); + const mappingsResult = await syncOpenAIUserMappingsSmart(); + console.log(` Created: ${mappingsResult.mappingsCreated}, Skipped: ${mappingsResult.mappingsSkipped}\n`); + + // Use backfillOpenAIUsage which updates sync state + const result = await backfillOpenAIUsage(fromDate, { + onProgress: (msg: string) => console.log(msg) + }); + console.log(`\nāœ“ Backfill complete`); + console.log(` Imported: ${result.recordsImported}, Skipped: ${result.recordsSkipped}`); + if (result.errors.length > 0) { + console.log(` Errors: ${result.errors.slice(0, 5).join(', ')}`); + } } } @@ -590,6 +666,9 @@ async function main() { case 'cursor:status': await cmdCursorStatus(); break; + case 'openai:status': + await cmdOpenAIStatus(); + break; case 'mappings': await cmdMappings(); break; @@ -603,21 +682,23 @@ async function main() { const daysIdx = args.indexOf('--days'); const days = daysIdx >= 0 ? parseInt(args[daysIdx + 1]) : 7; const skipMappings = args.includes('--skip-mappings'); - // Parse tool filter: sync [anthropic|cursor] --days N + // Parse tool filter: sync [anthropic|cursor|openai] --days N const toolArg = args[1]; - let tools: ('anthropic' | 'cursor')[] = ['anthropic', 'cursor']; + let tools: ('anthropic' | 'cursor' | 'openai')[] = ['anthropic', 'cursor', 'openai']; if (toolArg === 'anthropic') { tools = ['anthropic']; } else if (toolArg === 'cursor') { tools = ['cursor']; + } else if (toolArg === 'openai') { + tools = ['openai']; } await cmdSync(days, tools, skipMappings); break; } case 'backfill': { - const tool = args[1] as 'anthropic' | 'cursor'; - if (!tool || !['anthropic', 'cursor'].includes(tool)) { - console.error('Error: Please specify tool (anthropic or cursor)'); + const tool = args[1] as 'anthropic' | 'cursor' | 'openai'; + if (!tool || !['anthropic', 'cursor', 'openai'].includes(tool)) { + console.error('Error: Please specify tool (anthropic, cursor, or openai)'); console.error('Usage: npm run cli backfill --from YYYY-MM-DD --to YYYY-MM-DD'); break; } @@ -643,9 +724,9 @@ async function main() { break; } case 'backfill:complete': { - const tool = args[1] as 'anthropic' | 'cursor'; - if (!tool || !['anthropic', 'cursor'].includes(tool)) { - console.error('Error: Please specify tool (anthropic or cursor)'); + const tool = args[1] as 'anthropic' | 'cursor' | 'openai'; + if (!tool || !['anthropic', 'cursor', 'openai'].includes(tool)) { + console.error('Error: Please specify tool (anthropic, cursor, or openai)'); console.error('Usage: npm run cli backfill:complete '); break; } @@ -661,29 +742,31 @@ async function main() { break; } case 'backfill:reset': { - const tool = args[1] as 'anthropic' | 'cursor'; - if (!tool || !['anthropic', 'cursor'].includes(tool)) { - console.error('Error: Please specify tool (anthropic or cursor)'); + const tool = args[1] as 'anthropic' | 'cursor' | 'openai'; + if (!tool || !['anthropic', 'cursor', 'openai'].includes(tool)) { + console.error('Error: Please specify tool (anthropic, cursor, or openai)'); console.error('Usage: npm run cli backfill:reset '); break; } console.log(`Resetting ${tool} backfill status...`); if (tool === 'anthropic') { await resetAnthropicBackfillComplete(); - } else { + } else if (tool === 'cursor') { await resetCursorBackfillComplete(); + } else { + await resetOpenAIBackfillComplete(); } console.log(`āœ“ ${tool} backfill status reset (can now re-backfill)`); break; } case 'gaps': { const toolArg = args[1]; - const toolsToCheck: string[] = toolArg && ['anthropic', 'cursor', 'claude_code'].includes(toolArg) + const toolsToCheck: string[] = toolArg && ['anthropic', 'cursor', 'claude_code', 'openai'].includes(toolArg) ? [toolArg === 'anthropic' ? 'claude_code' : toolArg] - : ['claude_code', 'cursor']; + : ['claude_code', 'cursor', 'openai']; for (const tool of toolsToCheck) { - const displayName = tool === 'claude_code' ? 'Claude Code (anthropic)' : 'Cursor'; + const displayName = tool === 'claude_code' ? 'Claude Code (anthropic)' : tool === 'openai' ? 'OpenAI' : 'Cursor'; console.log(`\nšŸ“Š ${displayName} Data Gap Analysis\n`); const result = await sql` diff --git a/src/app/api/cron/sync-openai/route.ts b/src/app/api/cron/sync-openai/route.ts new file mode 100644 index 0000000..63f5171 --- /dev/null +++ b/src/app/api/cron/sync-openai/route.ts @@ -0,0 +1,71 @@ +import { NextResponse } from 'next/server'; +import { wrapRouteHandlerWithSentry } from '@sentry/nextjs'; +import { runOpenAISync, getOpenAISyncState } from '@/lib/sync'; + +/** + * OpenAI Cron Sync - runs daily at 7 AM UTC (staggered from Anthropic at 6 AM) + * + * Uses state tracking to efficiently sync only new data: + * - Tracks last synced date in sync_state table + * - Syncs from (last_synced_date - 1 day) to yesterday + * - Skips if already synced yesterday's data + * + * This endpoint is safe to call more frequently than daily - + * it will simply return early if there's no new data to sync. + */ +async function handler(request: Request) { + // Verify cron secret + const authHeader = request.headers.get('authorization'); + const cronSecret = process.env.CRON_SECRET; + + if (!cronSecret || authHeader !== `Bearer ${cronSecret}`) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }); + } + + // Check if provider is configured + if (!process.env.OPENAI_ADMIN_KEY) { + return NextResponse.json({ + success: true, + service: 'openai', + skipped: true, + reason: 'OPENAI_ADMIN_KEY not configured' + }); + } + + // Get current sync state for logging + const stateBefore = await getOpenAISyncState(); + + const result = await runOpenAISync({ includeMappings: true }); + + // Check if we actually synced anything + const didSync = result.openai.syncedRange !== undefined; + + return NextResponse.json({ + success: result.openai.success, + service: 'openai', + didSync, + syncedRange: result.openai.syncedRange || null, + previousSyncState: stateBefore.lastSyncedDate, + result: { + openai: { + recordsImported: result.openai.recordsImported, + recordsSkipped: result.openai.recordsSkipped, + errors: result.openai.errors.slice(0, 5) // Limit errors in response + }, + mappings: result.mappings ? { + mappingsCreated: result.mappings.mappingsCreated, + mappingsSkipped: result.mappings.mappingsSkipped + } : null + } + }); +} + +export const GET = wrapRouteHandlerWithSentry(handler, { + method: 'GET', + parameterizedRoute: '/api/cron/sync-openai', +}); + +export const POST = wrapRouteHandlerWithSentry(handler, { + method: 'POST', + parameterizedRoute: '/api/cron/sync-openai', +}); diff --git a/src/app/api/status/route.ts b/src/app/api/status/route.ts index ce58bd6..777822a 100644 --- a/src/app/api/status/route.ts +++ b/src/app/api/status/route.ts @@ -2,6 +2,7 @@ import { NextResponse } from 'next/server'; import { wrapRouteHandlerWithSentry } from '@sentry/nextjs'; import { getAnthropicSyncState, getAnthropicBackfillState } from '@/lib/sync/anthropic'; import { getCursorSyncState, getCursorBackfillState } from '@/lib/sync/cursor'; +import { getOpenAISyncState, getOpenAIBackfillState } from '@/lib/sync/openai'; import { getUnattributedStats } from '@/lib/queries'; import { getSession } from '@/lib/auth'; @@ -42,6 +43,7 @@ async function handler() { // Check which providers are configured const anthropicConfigured = !!process.env.ANTHROPIC_ADMIN_KEY; const cursorConfigured = !!process.env.CURSOR_ADMIN_KEY; + const openaiConfigured = !!process.env.OPENAI_ADMIN_KEY; const providers: Record = {}; const crons: { path: string; schedule: string; type: string }[] = []; @@ -106,6 +108,33 @@ async function handler() { ); } + // OpenAI + if (openaiConfigured) { + const [openaiSync, openaiBackfill] = await Promise.all([ + getOpenAISyncState(), + getOpenAIBackfillState() + ]); + + providers.openai = { + id: 'openai', + name: 'OpenAI', + color: 'green', + configured: true, + forwardSync: { + lastSyncedDate: openaiSync.lastSyncedDate, + status: getForwardSyncStatus(openaiSync.lastSyncedDate, false) + }, + backfill: { + oldestDate: openaiBackfill.oldestDate, + status: getBackfillStatus(openaiBackfill.oldestDate, openaiBackfill.isComplete) + } + }; + + crons.push( + { path: '/api/cron/sync-openai', schedule: 'Daily at 7 AM UTC', type: 'forward' } + ); + } + // Get unattributed usage stats const unattributed = await getUnattributedStats(); @@ -115,7 +144,8 @@ async function handler() { unattributed, // For backwards compatibility, also include at top level anthropic: providers.anthropic || null, - cursor: providers.cursor || null + cursor: providers.cursor || null, + openai: providers.openai || null }); } diff --git a/src/lib/db.ts b/src/lib/db.ts index 2c5e70f..76cf03f 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -16,15 +16,29 @@ export { sql }; // This allows gradual migration from raw SQL to Drizzle export { sql as vercelSql } from '@vercel/postgres'; -// Cost calculation for Claude models (per million tokens) +// Cost calculation for models (per million tokens) // Cache write tokens cost 1.25x input price, cache read tokens cost 0.1x input price const MODEL_PRICING: Record = { + // Claude models 'claude-opus-4-5-20251101': { input: 15, output: 75 }, 'claude-opus-4-1-20250805': { input: 15, output: 75 }, 'claude-sonnet-4-5-20250929': { input: 3, output: 15 }, 'claude-sonnet-4-20250514': { input: 3, output: 15 }, 'claude-haiku-4-5-20251001': { input: 0.8, output: 4 }, 'claude-3-5-haiku-20241022': { input: 0.8, output: 4 }, + // OpenAI GPT models + 'gpt-4o': { input: 2.5, output: 10 }, + 'gpt-4o-mini': { input: 0.15, output: 0.6 }, + 'gpt-4-turbo': { input: 10, output: 30 }, + 'gpt-4': { input: 30, output: 60 }, + 'gpt-3.5-turbo': { input: 0.5, output: 1.5 }, + // OpenAI reasoning models + 'o1': { input: 15, output: 60 }, + 'o1-mini': { input: 1.1, output: 4.4 }, + 'o1-pro': { input: 150, output: 600 }, + 'o3-mini': { input: 1.1, output: 4.4 }, + // OpenAI Codex models + 'codex-mini': { input: 1.5, output: 6 }, }; const CACHE_WRITE_MULTIPLIER = 1.25; diff --git a/src/lib/sync/index.ts b/src/lib/sync/index.ts index 34e2365..2f38463 100644 --- a/src/lib/sync/index.ts +++ b/src/lib/sync/index.ts @@ -1,12 +1,16 @@ import { syncAnthropicUsage, syncAnthropicCron, backfillAnthropicUsage, getAnthropicSyncState, resetAnthropicBackfillComplete, SyncResult as AnthropicResult } from './anthropic'; import { syncCursorCron, syncCursorUsage, backfillCursorUsage, getCursorSyncState, resetCursorBackfillComplete, SyncResult as CursorResult } from './cursor'; +import { syncOpenAIUsage, syncOpenAICron, backfillOpenAIUsage, getOpenAISyncState, resetOpenAIBackfillComplete, SyncResult as OpenAIResult } from './openai'; import { syncAnthropicApiKeyMappings, syncApiKeyMappingsSmart, MappingResult } from './anthropic-mappings'; +import { syncOpenAIUserMappings, syncOpenAIUserMappingsSmart, MappingResult as OpenAIMappingResult } from './openai-mappings'; import { sql } from '@vercel/postgres'; export interface FullSyncResult { anthropic: AnthropicResult; cursor: CursorResult; + openai: OpenAIResult; mappings?: MappingResult; + openaiMappings?: OpenAIMappingResult; } export async function getSyncState(id: string): Promise<{ lastSyncAt: string | null; lastCursor: string | null }> { @@ -58,7 +62,27 @@ export async function runCursorSync(): Promise { } /** - * Run full sync for both services. + * Run OpenAI cron sync. + * Only syncs new data since last sync. + * Safe to call frequently - will skip if already synced. + */ +export async function runOpenAISync(options: { includeMappings?: boolean } = {}): Promise<{ openai: OpenAIResult; mappings?: OpenAIMappingResult }> { + // Sync user mappings FIRST so usage sync has them available + let mappingsResult: OpenAIMappingResult | undefined; + if (options.includeMappings) { + mappingsResult = await syncOpenAIUserMappingsSmart(); + } + + const openaiResult = await syncOpenAICron(); + + return { + openai: openaiResult, + mappings: mappingsResult + }; +} + +/** + * Run full sync for all services. * For backwards compatibility and manual syncs via CLI. */ export async function runFullSync( @@ -70,17 +94,22 @@ export async function runFullSync( const end = endDate || new Date().toISOString().split('T')[0]; const start = startDate || new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString().split('T')[0]; - // Sync API key mappings FIRST so usage sync has them available + // Sync API key/user mappings FIRST so usage sync has them available let mappingsResult: MappingResult | undefined; + let openaiMappingsResult: OpenAIMappingResult | undefined; if (options.includeMappings) { - mappingsResult = await syncApiKeyMappingsSmart(); + [mappingsResult, openaiMappingsResult] = await Promise.all([ + syncApiKeyMappingsSmart(), + syncOpenAIUserMappingsSmart() + ]); } // Run usage syncs in parallel // Note: For manual syncs, use date-based sync - const [anthropicResult, cursorResult] = await Promise.all([ + const [anthropicResult, cursorResult, openaiResult] = await Promise.all([ syncAnthropicUsage(start, end), - syncCursorUsage(start, end) + syncCursorUsage(start, end), + syncOpenAIUsage(start, end) ]); // Update sync state @@ -89,7 +118,9 @@ export async function runFullSync( return { anthropic: anthropicResult, cursor: cursorResult, - mappings: mappingsResult + openai: openaiResult, + mappings: mappingsResult, + openaiMappings: openaiMappingsResult }; } @@ -114,5 +145,12 @@ export { backfillCursorUsage, getCursorSyncState, resetCursorBackfillComplete, - syncAnthropicApiKeyMappings + syncOpenAIUsage, + syncOpenAICron, + backfillOpenAIUsage, + getOpenAISyncState, + resetOpenAIBackfillComplete, + syncAnthropicApiKeyMappings, + syncOpenAIUserMappings, + syncOpenAIUserMappingsSmart }; diff --git a/src/lib/sync/openai-mappings.ts b/src/lib/sync/openai-mappings.ts new file mode 100644 index 0000000..65df5a0 --- /dev/null +++ b/src/lib/sync/openai-mappings.ts @@ -0,0 +1,239 @@ +/** + * Auto-map OpenAI user IDs to user emails using the Admin API + * + * Uses the endpoint: + * - GET /v1/organization/users - Get all org users with emails + * + * OpenAI usage data includes user_id directly, so we just need to map user_id → email + */ + +import { setToolIdentityMapping, getToolIdentityMappings, getUnmappedToolRecords } from '../queries'; + +const TOOL = 'openai'; + +interface OpenAIUser { + object: string; + id: string; + email: string; + name: string; + role: string; + added_at: number; // Unix timestamp +} + +interface UsersResponse { + object: string; + data: OpenAIUser[]; + has_more: boolean; + first_id?: string; + last_id?: string; +} + +export interface MappingResult { + success: boolean; + mappingsCreated: number; + mappingsSkipped: number; + errors: string[]; +} + +async function fetchAllUsers(adminKey: string): Promise> { + const userMap = new Map(); // user_id → email + let afterId: string | undefined; + + do { + const params = new URLSearchParams({ limit: '100' }); + if (afterId) params.set('after', afterId); + + const response = await fetch( + `https://api.openai.com/v1/organization/users?${params}`, + { + headers: { + 'Authorization': `Bearer ${adminKey}`, + 'Content-Type': 'application/json' + } + } + ); + + if (!response.ok) { + throw new Error(`Failed to fetch users: ${response.status}`); + } + + const data: UsersResponse = await response.json(); + + for (const user of data.data) { + userMap.set(user.id, user.email); + } + + afterId = data.has_more ? data.last_id : undefined; + } while (afterId); + + return userMap; +} + +/** + * Sync all OpenAI user mappings (user_id → email) + */ +export async function syncOpenAIUserMappings(): Promise { + const adminKey = process.env.OPENAI_ADMIN_KEY; + if (!adminKey) { + return { + success: false, + mappingsCreated: 0, + mappingsSkipped: 0, + errors: ['OPENAI_ADMIN_KEY not configured'] + }; + } + + const result: MappingResult = { + success: true, + mappingsCreated: 0, + mappingsSkipped: 0, + errors: [] + }; + + try { + // Fetch all users + const userMap = await fetchAllUsers(adminKey); + + // Get existing mappings to avoid duplicates + const existingMappings = await getToolIdentityMappings(TOOL); + const existingSet = new Set(existingMappings.map(m => m.external_id)); + + // Create mappings for each user + for (const [userId, email] of userMap) { + // Skip if already mapped + if (existingSet.has(userId)) { + result.mappingsSkipped++; + continue; + } + + try { + await setToolIdentityMapping(TOOL, userId, email); + result.mappingsCreated++; + } catch (err) { + result.errors.push(`Failed to save mapping for ${userId}: ${err}`); + result.mappingsSkipped++; + } + } + + } catch (err) { + result.success = false; + result.errors.push(err instanceof Error ? err.message : 'Unknown error'); + } + + return result; +} + +/** + * Get user email by user ID (for on-the-fly lookups) + */ +export async function getEmailForUserId(userId: string): Promise { + const adminKey = process.env.OPENAI_ADMIN_KEY; + if (!adminKey) return null; + + try { + const response = await fetch( + `https://api.openai.com/v1/organization/users/${userId}`, + { + headers: { + 'Authorization': `Bearer ${adminKey}`, + 'Content-Type': 'application/json' + } + } + ); + + if (!response.ok) return null; + + const user: OpenAIUser = await response.json(); + return user.email; + + } catch { + return null; + } +} + +/** + * Smart sync that uses incremental lookups for small numbers of unmapped users, + * falling back to full sync when there are many. + */ +export async function syncOpenAIUserMappingsSmart( + options: { incrementalThreshold?: number } = {} +): Promise { + const threshold = options.incrementalThreshold ?? 20; + + // Check how many unmapped records we have for openai + const unmappedRecords = await getUnmappedToolRecords(TOOL); + + if (unmappedRecords.length === 0) { + return { + success: true, + mappingsCreated: 0, + mappingsSkipped: 0, + errors: [] + }; + } + + // If we have few unmapped users, do individual lookups (1 API call per user) + // If we have many, do full sync (1 paginated API call total) + if (unmappedRecords.length <= threshold) { + return syncOpenAIUserMappingsIncremental(unmappedRecords.map(r => r.tool_record_id)); + } + + return syncOpenAIUserMappings(); +} + +/** + * Incremental sync - looks up only specific user IDs individually. + * More efficient when only a few users need mapping. + */ +async function syncOpenAIUserMappingsIncremental(userIds: string[]): Promise { + const adminKey = process.env.OPENAI_ADMIN_KEY; + if (!adminKey) { + return { + success: false, + mappingsCreated: 0, + mappingsSkipped: 0, + errors: ['OPENAI_ADMIN_KEY not configured'] + }; + } + + const result: MappingResult = { + success: true, + mappingsCreated: 0, + mappingsSkipped: 0, + errors: [] + }; + + for (const userId of userIds) { + try { + const response = await fetch( + `https://api.openai.com/v1/organization/users/${userId}`, + { + headers: { + 'Authorization': `Bearer ${adminKey}`, + 'Content-Type': 'application/json' + } + } + ); + + if (!response.ok) { + result.mappingsSkipped++; + if (response.status !== 404) { + result.errors.push(`Failed to fetch user ${userId}: ${response.status}`); + } + continue; + } + + const user: OpenAIUser = await response.json(); + + // Save the mapping (also updates usage_records) + await setToolIdentityMapping(TOOL, userId, user.email); + result.mappingsCreated++; + + } catch (err) { + result.mappingsSkipped++; + result.errors.push(`Error processing ${userId}: ${err instanceof Error ? err.message : 'Unknown'}`); + } + } + + return result; +} diff --git a/src/lib/sync/openai.ts b/src/lib/sync/openai.ts new file mode 100644 index 0000000..3c897f1 --- /dev/null +++ b/src/lib/sync/openai.ts @@ -0,0 +1,370 @@ +import { insertUsageRecord, getToolIdentityMappings } from '../queries'; +import { calculateCost } from '../db'; +import { normalizeModelName } from '../utils'; +import { sql } from '@vercel/postgres'; + +interface OpenAIUsageResult { + input_tokens: number; + output_tokens: number; + input_cached_tokens: number; + num_model_requests: number; + project_id: string | null; + user_id: string | null; + api_key_id: string | null; + model: string | null; + batch: boolean; + service_tier: string | null; +} + +interface OpenAITimeBucket { + start_time: number; // Unix timestamp in seconds + end_time: number; + results: OpenAIUsageResult[]; +} + +interface OpenAIUsageResponse { + object: string; + data: OpenAITimeBucket[]; + has_more: boolean; + next_page?: string; +} + +export interface SyncResult { + success: boolean; + recordsImported: number; + recordsSkipped: number; + errors: string[]; + syncedRange?: { startDate: string; endDate: string }; +} + +const SYNC_STATE_ID = 'openai'; + +// Get OpenAI sync state from database +export async function getOpenAISyncState(): Promise<{ lastSyncedDate: string | null }> { + const result = await sql` + SELECT last_synced_hour_end FROM sync_state WHERE id = ${SYNC_STATE_ID} + `; + if (result.rows.length === 0 || !result.rows[0].last_synced_hour_end) { + return { lastSyncedDate: null }; + } + return { lastSyncedDate: result.rows[0].last_synced_hour_end }; +} + +// Update OpenAI sync state +async function updateOpenAISyncState(lastSyncedDate: string): Promise { + await sql` + INSERT INTO sync_state (id, last_sync_at, last_synced_hour_end) + VALUES (${SYNC_STATE_ID}, NOW(), ${lastSyncedDate}) + ON CONFLICT (id) DO UPDATE SET + last_sync_at = NOW(), + last_synced_hour_end = ${lastSyncedDate} + `; +} + +// Get backfill state - derives oldest date from actual usage data +export async function getOpenAIBackfillState(): Promise<{ oldestDate: string | null; isComplete: boolean }> { + const usageResult = await sql` + SELECT MIN(date)::text as oldest_date FROM usage_records WHERE tool = 'openai' + `; + const oldestDate = usageResult.rows[0]?.oldest_date || null; + + const stateResult = await sql` + SELECT backfill_complete FROM sync_state WHERE id = ${SYNC_STATE_ID} + `; + const isComplete = stateResult.rows[0]?.backfill_complete === true; + + return { oldestDate, isComplete }; +} + +// Mark backfill as complete +async function markOpenAIBackfillComplete(): Promise { + await sql` + INSERT INTO sync_state (id, last_sync_at, backfill_complete) + VALUES (${SYNC_STATE_ID}, NOW(), true) + ON CONFLICT (id) DO UPDATE SET + last_sync_at = NOW(), + backfill_complete = true + `; +} + +// Reset backfill complete flag +export async function resetOpenAIBackfillComplete(): Promise { + await sql` + UPDATE sync_state SET backfill_complete = false WHERE id = ${SYNC_STATE_ID} + `; +} + +/** + * Sync OpenAI usage for a specific date range. + * This is the low-level function that does the actual API fetching. + * Does NOT update sync state - use syncOpenAICron for production syncing. + */ +export async function syncOpenAIUsage( + startDate: string, + endDate: string, + options: { bucketWidth?: '1d' | '1h' | '1m' } = {} +): Promise { + const adminKey = process.env.OPENAI_ADMIN_KEY; + if (!adminKey) { + return { + success: false, + recordsImported: 0, + recordsSkipped: 0, + errors: ['OPENAI_ADMIN_KEY not configured'] + }; + } + + const result: SyncResult = { + success: true, + recordsImported: 0, + recordsSkipped: 0, + errors: [], + syncedRange: { startDate, endDate } + }; + + // Get existing mappings for openai (user_id -> email) + const mappingsArray = await getToolIdentityMappings('openai'); + const mappings = new Map( + mappingsArray.map(m => [m.external_id, m.email]) + ); + + const bucketWidth = options.bucketWidth || '1d'; + + // Convert dates to Unix timestamps (seconds) + const startTime = Math.floor(new Date(startDate).getTime() / 1000); + const endTime = Math.floor(new Date(endDate + 'T23:59:59Z').getTime() / 1000); + + let page: string | undefined; + + try { + do { + const params = new URLSearchParams({ + start_time: startTime.toString(), + end_time: endTime.toString(), + bucket_width: bucketWidth, + 'group_by[]': 'user_id', + }); + params.append('group_by[]', 'model'); + + if (page) { + params.set('page', page); + } + + const response = await fetch( + `https://api.openai.com/v1/organization/usage/completions?${params}`, + { + headers: { + 'Authorization': `Bearer ${adminKey}`, + 'Content-Type': 'application/json' + } + } + ); + + // On rate limit, immediately abort + if (response.status === 429) { + const errorText = await response.text(); + result.success = false; + result.errors.push(`OpenAI API rate limited: ${errorText}`); + return result; + } + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`OpenAI API error: ${response.status} - ${errorText}`); + } + + const data: OpenAIUsageResponse = await response.json(); + + for (const bucket of data.data) { + // Convert Unix timestamp to date string + const date = new Date(bucket.start_time * 1000).toISOString().split('T')[0]; + + for (const item of bucket.results) { + if (!item.model) continue; + + // Resolve email from user_id mapping + let email = 'unknown'; + const userId = item.user_id; + + if (userId) { + email = mappings.get(userId) || 'unknown'; + } + + const inputTokens = item.input_tokens || 0; + const outputTokens = item.output_tokens || 0; + const cacheReadTokens = item.input_cached_tokens || 0; + // OpenAI doesn't have separate cache write tokens in usage API + const cacheWriteTokens = 0; + + const cost = calculateCost(item.model, inputTokens, outputTokens, cacheWriteTokens, cacheReadTokens); + + try { + await insertUsageRecord({ + date, + email, + tool: 'openai', + model: normalizeModelName(item.model || 'unknown'), + inputTokens, + cacheWriteTokens, + cacheReadTokens, + outputTokens, + cost, + toolRecordId: userId || undefined + }); + result.recordsImported++; + } catch (err) { + result.errors.push(`Insert error: ${err instanceof Error ? err.message : 'Unknown'}`); + result.recordsSkipped++; + } + } + } + + page = data.has_more ? data.next_page : undefined; + } while (page); + + } catch (err) { + result.success = false; + result.errors.push(err instanceof Error ? err.message : 'Unknown error'); + } + + return result; +} + +/** + * Sync OpenAI usage for the cron job. + * Tracks state to avoid re-fetching data we already have. + * Syncs from (last_synced_date - 1 day) to yesterday. + */ +export async function syncOpenAICron(): Promise { + const adminKey = process.env.OPENAI_ADMIN_KEY; + if (!adminKey) { + return { + success: false, + recordsImported: 0, + recordsSkipped: 0, + errors: ['OPENAI_ADMIN_KEY not configured'] + }; + } + + // Get yesterday's date (complete day) + const yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + const yesterdayStr = yesterday.toISOString().split('T')[0]; + + // Check what we've already synced + const { lastSyncedDate } = await getOpenAISyncState(); + + // If we've already synced yesterday, nothing to do + if (lastSyncedDate && lastSyncedDate >= yesterdayStr) { + return { + success: true, + recordsImported: 0, + recordsSkipped: 0, + errors: [], + syncedRange: undefined + }; + } + + // Determine start date + let startDate: string; + if (!lastSyncedDate) { + const weekAgo = new Date(); + weekAgo.setDate(weekAgo.getDate() - 7); + startDate = weekAgo.toISOString().split('T')[0]; + } else { + const lastDate = new Date(lastSyncedDate); + lastDate.setDate(lastDate.getDate() - 1); + startDate = lastDate.toISOString().split('T')[0]; + } + + const endDate = yesterdayStr; + + const syncResult = await syncOpenAIUsage(startDate, endDate); + + if (syncResult.success) { + await updateOpenAISyncState(yesterdayStr); + } + + return syncResult; +} + +/** + * Backfill OpenAI data for a date range. + * Works backwards from the oldest date we have data for. + */ +export async function backfillOpenAIUsage( + targetDate: string, + options: { onProgress?: (msg: string) => void; stopOnEmptyDays?: number } = {} +): Promise { + const log = options.onProgress || (() => {}); + const stopOnEmptyDays = options.stopOnEmptyDays ?? 7; + + const { oldestDate: existingOldest, isComplete } = await getOpenAIBackfillState(); + + if (isComplete) { + log(`Backfill already marked complete, skipping.`); + return { + success: true, + recordsImported: 0, + recordsSkipped: 0, + errors: [], + syncedRange: { startDate: targetDate, endDate: existingOldest || targetDate }, + rateLimited: false + }; + } + + if (existingOldest && existingOldest <= targetDate) { + log(`Already have data back to ${existingOldest}, target is ${targetDate}. Done.`); + return { + success: true, + recordsImported: 0, + recordsSkipped: 0, + errors: [], + syncedRange: { startDate: targetDate, endDate: existingOldest }, + rateLimited: false + }; + } + + let endDate: string; + if (existingOldest) { + const oldestDate = new Date(existingOldest); + oldestDate.setDate(oldestDate.getDate() - 1); + endDate = oldestDate.toISOString().split('T')[0]; + } else { + const yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + endDate = yesterday.toISOString().split('T')[0]; + } + + if (endDate < targetDate) { + endDate = targetDate; + } + + log(`Fetching OpenAI usage from ${targetDate} to ${endDate}...`); + const syncResult = await syncOpenAIUsage(targetDate, endDate); + + const rateLimited = syncResult.errors.some(e => e.includes('rate limited')); + + if (rateLimited) { + log(`Rate limited! Will retry on next run.`); + } else if (syncResult.success) { + if (syncResult.recordsImported === 0) { + const startMs = new Date(targetDate).getTime(); + const endMs = new Date(endDate).getTime(); + const daysCovered = Math.ceil((endMs - startMs) / (24 * 60 * 60 * 1000)); + + log(`No records found for ${targetDate} to ${endDate} (${daysCovered} days).`); + + if (daysCovered <= stopOnEmptyDays) { + log(`Small range (${daysCovered} days) with no data. Marking backfill complete.`); + await markOpenAIBackfillComplete(); + } else { + log(`Large range - will continue backfilling on next run.`); + } + } else { + log(`Imported ${syncResult.recordsImported} records.`); + } + } + + return { ...syncResult, rateLimited }; +} diff --git a/src/lib/tools.ts b/src/lib/tools.ts index 7992943..0ab00e9 100644 --- a/src/lib/tools.ts +++ b/src/lib/tools.ts @@ -32,6 +32,12 @@ export const TOOL_CONFIGS: Record = { text: 'text-violet-400', gradient: 'from-violet-500/80 to-violet-400/60', }, + openai: { + name: 'OpenAI', + bg: 'bg-green-500', + text: 'text-green-400', + gradient: 'from-green-500/80 to-green-400/60', + }, }; const DEFAULT_CONFIG: ToolConfig = { diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 8475ca1..cb05f96 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -174,6 +174,26 @@ export function normalizeModelName(model: string): string { normalized = `sonnet-${normalized}`; } + // OpenAI model normalization + // "gpt-4o-2024-08-06" → "gpt-4o" + // "gpt-4-turbo-2024-04-09" → "gpt-4-turbo" + // "gpt-3.5-turbo-0125" → "gpt-3.5-turbo" + if (normalized.startsWith('gpt-')) { + // Strip date suffixes (YYYY-MM-DD or MMDD format) + normalized = normalized.replace(/-\d{4}-\d{2}-\d{2}$/, ''); + normalized = normalized.replace(/-\d{4}$/, ''); + } + + // "o1-2024-12-17" → "o1", "o1-mini-2024-09-12" → "o1-mini", "o3-mini-2025-01-31" → "o3-mini" + if (normalized.match(/^o[13]/)) { + normalized = normalized.replace(/-\d{4}-\d{2}-\d{2}$/, ''); + } + + // "codex-mini-latest" → "codex-mini" + if (normalized.startsWith('codex-')) { + normalized = normalized.replace(/-latest$/, ''); + } + // Reconstruct with suffix if present if (suffix) { normalized = `${normalized} (${suffix})`; @@ -209,5 +229,19 @@ export function formatModelName(model: string): string { } } + // Format OpenAI models nicely + // "gpt-4o" → "GPT-4o", "gpt-3.5-turbo" → "GPT-3.5-turbo" + if (display.startsWith('gpt-')) { + display = 'GPT-' + display.slice(4); + } + // "o1-mini" → "O1-mini", "o3-mini" → "O3-mini" + if (display.match(/^o[13]/)) { + display = display.charAt(0).toUpperCase() + display.slice(1); + } + // "codex-mini" → "Codex-mini" + if (display.startsWith('codex-')) { + display = 'Codex-' + display.slice(6); + } + return display; }