diff --git a/apps/sim/app/api/chat/[identifier]/otp/route.ts b/apps/sim/app/api/chat/[identifier]/otp/route.ts index 3cb30844d3..8ba425972d 100644 --- a/apps/sim/app/api/chat/[identifier]/otp/route.ts +++ b/apps/sim/app/api/chat/[identifier]/otp/route.ts @@ -6,7 +6,7 @@ import { z } from 'zod' import { renderOTPEmail } from '@/components/emails/render-email' import { sendEmail } from '@/lib/email/mailer' import { createLogger } from '@/lib/logs/console/logger' -import { getRedisClient } from '@/lib/redis' +import { getRedisClient, markMessageAsProcessed, releaseLock } from '@/lib/redis' import { generateRequestId } from '@/lib/utils' import { addCorsHeaders, setChatAuthCookie } from '@/app/api/chat/utils' import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils' @@ -21,52 +21,83 @@ function generateOTP() { // We use 15 minutes (900 seconds) expiry for OTPs const OTP_EXPIRY = 15 * 60 -async function storeOTP(email: string, chatId: string, otp: string): Promise { +// Store OTP in Redis +async function storeOTP(email: string, chatId: string, otp: string): Promise { const key = `otp:${email}:${chatId}` const redis = getRedisClient() - if (!redis) { - logger.warn('Redis not available, OTP functionality requires Redis') - return false - } - - try { + if (redis) { + // Use Redis if available await redis.set(key, otp, 'EX', OTP_EXPIRY) - return true - } catch (error) { - logger.error('Error storing OTP in Redis:', error) - return false + } else { + // Use the existing function as fallback to mark that an OTP exists + await markMessageAsProcessed(key, OTP_EXPIRY) + + // For the fallback case, we need to handle storing the OTP value separately + // since markMessageAsProcessed only stores "1" + const valueKey = `${key}:value` + try { + // Access the in-memory cache directly - hacky but works for fallback + const inMemoryCache = (global as any).inMemoryCache + if (inMemoryCache) { + const fullKey = `processed:${valueKey}` + const expiry = OTP_EXPIRY ? Date.now() + OTP_EXPIRY * 1000 : null + inMemoryCache.set(fullKey, { value: otp, expiry }) + } + } catch (error) { + logger.error('Error storing OTP in fallback cache:', error) + } } } +// Get OTP from Redis async function getOTP(email: string, chatId: string): Promise { const key = `otp:${email}:${chatId}` const redis = getRedisClient() - if (!redis) { - return null + if (redis) { + // Use Redis if available + return await redis.get(key) } + // Use the existing function as fallback - check if it exists + const exists = await new Promise((resolve) => { + try { + // Check the in-memory cache directly - hacky but works for fallback + const inMemoryCache = (global as any).inMemoryCache + const fullKey = `processed:${key}` + const cacheEntry = inMemoryCache?.get(fullKey) + resolve(!!cacheEntry) + } catch { + resolve(false) + } + }) + + if (!exists) return null + // Try to get the value key + const valueKey = `${key}:value` try { - return await redis.get(key) - } catch (error) { - logger.error('Error getting OTP from Redis:', error) + const inMemoryCache = (global as any).inMemoryCache + const fullKey = `processed:${valueKey}` + const cacheEntry = inMemoryCache?.get(fullKey) + return cacheEntry?.value || null + } catch { return null } } +// Delete OTP from Redis async function deleteOTP(email: string, chatId: string): Promise { const key = `otp:${email}:${chatId}` const redis = getRedisClient() - if (!redis) { - return - } - - try { + if (redis) { + // Use Redis if available await redis.del(key) - } catch (error) { - logger.error('Error deleting OTP from Redis:', error) + } else { + // Use the existing function as fallback + await releaseLock(`processed:${key}`) + await releaseLock(`processed:${key}:value`) } } @@ -146,17 +177,7 @@ export async function POST( const otp = generateOTP() - const stored = await storeOTP(email, deployment.id, otp) - if (!stored) { - logger.error(`[${requestId}] Failed to store OTP - Redis unavailable`) - return addCorsHeaders( - createErrorResponse( - 'Email verification temporarily unavailable, please try again later', - 503 - ), - request - ) - } + await storeOTP(email, deployment.id, otp) const emailHtml = await renderOTPEmail( otp, diff --git a/apps/sim/lib/auth.ts b/apps/sim/lib/auth.ts index f0486c6f1d..5dd170ad91 100644 --- a/apps/sim/lib/auth.ts +++ b/apps/sim/lib/auth.ts @@ -44,7 +44,6 @@ import { quickValidateEmail } from '@/lib/email/validation' import { env, isTruthy } from '@/lib/env' import { isBillingEnabled, isEmailVerificationEnabled } from '@/lib/environment' import { createLogger } from '@/lib/logs/console/logger' -import { getRedisClient } from '@/lib/redis' import { SSO_TRUSTED_PROVIDERS } from './sso/consts' const logger = createLogger('Auth') @@ -60,40 +59,6 @@ if (validStripeKey) { }) } -// Configure Redis secondary storage for session data (optional) -const redis = getRedisClient() -const redisSecondaryStorage = redis - ? { - get: async (key: string) => { - try { - const value = await redis.get(key) - return value - } catch (error) { - logger.error('Redis get error in secondaryStorage', { key, error }) - return null - } - }, - set: async (key: string, value: string, ttl?: number) => { - try { - if (ttl) { - await redis.set(key, value, 'EX', ttl) - } else { - await redis.set(key, value) - } - } catch (error) { - logger.error('Redis set error in secondaryStorage', { key, ttl, error }) - } - }, - delete: async (key: string) => { - try { - await redis.del(key) - } catch (error) { - logger.error('Redis delete error in secondaryStorage', { key, error }) - } - }, - } - : undefined - export const auth = betterAuth({ baseURL: getBaseURL(), trustedOrigins: [ @@ -104,8 +69,6 @@ export const auth = betterAuth({ provider: 'pg', schema, }), - // Conditionally add secondaryStorage only if Redis is available - ...(redisSecondaryStorage ? { secondaryStorage: redisSecondaryStorage } : {}), session: { cookieCache: { enabled: true, diff --git a/apps/sim/lib/redis.ts b/apps/sim/lib/redis.ts index 8b7a8e029e..4944329a48 100644 --- a/apps/sim/lib/redis.ts +++ b/apps/sim/lib/redis.ts @@ -4,19 +4,25 @@ import { createLogger } from '@/lib/logs/console/logger' const logger = createLogger('Redis') +// Only use Redis if explicitly configured const redisUrl = env.REDIS_URL + +// Global Redis client for connection pooling let globalRedisClient: Redis | null = null -const MESSAGE_ID_PREFIX = 'processed:' -const MESSAGE_ID_EXPIRY = 60 * 60 * 24 * 7 +// Fallback in-memory cache for when Redis is not available +const inMemoryCache = new Map() +const MAX_CACHE_SIZE = 1000 /** * Get a Redis client instance * Uses connection pooling to avoid creating a new connection for each request */ export function getRedisClient(): Redis | null { + // For server-side only if (typeof window !== 'undefined') return null + // Return null immediately if no Redis URL is configured if (!redisUrl) { return null } @@ -24,19 +30,25 @@ export function getRedisClient(): Redis | null { if (globalRedisClient) return globalRedisClient try { + // Create a new Redis client with optimized settings for serverless globalRedisClient = new Redis(redisUrl, { + // Keep alive is critical for serverless to reuse connections keepAlive: 1000, + // Faster connection timeout for serverless connectTimeout: 5000, + // Disable reconnection attempts in serverless maxRetriesPerRequest: 3, + // Retry strategy with exponential backoff retryStrategy: (times) => { if (times > 5) { - logger.warn('Redis connection failed after 5 attempts') - return null + logger.warn('Redis connection failed after 5 attempts, using fallback') + return null // Stop retrying } - return Math.min(times * 200, 2000) + return Math.min(times * 200, 2000) // Exponential backoff }, }) + // Handle connection events globalRedisClient.on('error', (err: any) => { logger.error('Redis connection error:', { err }) if (err.code === 'ECONNREFUSED' || err.code === 'ETIMEDOUT') { @@ -53,90 +65,144 @@ export function getRedisClient(): Redis | null { } } +// Message ID cache functions +const MESSAGE_ID_PREFIX = 'processed:' // Generic prefix +const MESSAGE_ID_EXPIRY = 60 * 60 * 24 * 7 // 7 days in seconds + /** - * Check if a key exists in Redis - * @param key The key to check - * @returns True if the key exists, false otherwise + * Check if a key exists in Redis or fallback cache. + * @param key The key to check (e.g., messageId, lockKey). + * @returns True if the key exists and hasn't expired, false otherwise. */ export async function hasProcessedMessage(key: string): Promise { - const redis = getRedisClient() - if (!redis) { - return false - } - try { - const fullKey = `${MESSAGE_ID_PREFIX}${key}` - const result = await redis.exists(fullKey) - return result === 1 + const redis = getRedisClient() + const fullKey = `${MESSAGE_ID_PREFIX}${key}` // Use generic prefix + + if (redis) { + // Use Redis if available + const result = await redis.exists(fullKey) + return result === 1 + } + // Fallback to in-memory cache + const cacheEntry = inMemoryCache.get(fullKey) + if (!cacheEntry) return false + + // Check if the entry has expired + if (cacheEntry.expiry && cacheEntry.expiry < Date.now()) { + inMemoryCache.delete(fullKey) + return false + } + + return true } catch (error) { logger.error(`Error checking key ${key}:`, { error }) - return false + // Fallback to in-memory cache on error + const fullKey = `${MESSAGE_ID_PREFIX}${key}` + const cacheEntry = inMemoryCache.get(fullKey) + return !!cacheEntry && (!cacheEntry.expiry || cacheEntry.expiry > Date.now()) } } /** - * Mark a key as processed in Redis - * @param key The key to mark - * @param expirySeconds Optional expiry time in seconds (defaults to 7 days) + * Mark a key as processed/present in Redis or fallback cache. + * @param key The key to mark (e.g., messageId, lockKey). + * @param expirySeconds Optional expiry time in seconds (defaults to 7 days). */ export async function markMessageAsProcessed( key: string, expirySeconds: number = MESSAGE_ID_EXPIRY ): Promise { - const redis = getRedisClient() - if (!redis) { - logger.warn(`Cannot mark message as processed - Redis unavailable: ${key}`) - return - } - try { - const fullKey = `${MESSAGE_ID_PREFIX}${key}` - await redis.set(fullKey, '1', 'EX', expirySeconds) + const redis = getRedisClient() + const fullKey = `${MESSAGE_ID_PREFIX}${key}` // Use generic prefix + + if (redis) { + // Use Redis if available - use pipelining for efficiency + await redis.set(fullKey, '1', 'EX', expirySeconds) + } else { + // Fallback to in-memory cache + const expiry = expirySeconds ? Date.now() + expirySeconds * 1000 : null + inMemoryCache.set(fullKey, { value: '1', expiry }) + + // Clean up old message IDs if cache gets too large + if (inMemoryCache.size > MAX_CACHE_SIZE) { + const now = Date.now() + + // First try to remove expired entries + for (const [cacheKey, entry] of inMemoryCache.entries()) { + if (entry.expiry && entry.expiry < now) { + inMemoryCache.delete(cacheKey) + } + } + + // If still too large, remove oldest entries (FIFO based on insertion order) + if (inMemoryCache.size > MAX_CACHE_SIZE) { + const keysToDelete = Array.from(inMemoryCache.keys()).slice( + 0, + inMemoryCache.size - MAX_CACHE_SIZE + ) + + for (const keyToDelete of keysToDelete) { + inMemoryCache.delete(keyToDelete) + } + } + } + } } catch (error) { logger.error(`Error marking key ${key} as processed:`, { error }) + // Fallback to in-memory cache on error + const fullKey = `${MESSAGE_ID_PREFIX}${key}` + const expiry = expirySeconds ? Date.now() + expirySeconds * 1000 : null + inMemoryCache.set(fullKey, { value: '1', expiry }) } } /** - * Attempt to acquire a distributed lock using Redis SET NX command - * @param lockKey The key to use for the lock - * @param value The value to set (e.g., a unique identifier for the process holding the lock) - * @param expirySeconds The lock's time-to-live in seconds - * @returns True if the lock was acquired successfully, false otherwise + * Attempts to acquire a lock using Redis SET NX command. + * @param lockKey The key to use for the lock. + * @param value The value to set (e.g., a unique identifier for the process holding the lock). + * @param expirySeconds The lock's time-to-live in seconds. + * @returns True if the lock was acquired successfully, false otherwise. */ export async function acquireLock( lockKey: string, value: string, expirySeconds: number ): Promise { - const redis = getRedisClient() - if (!redis) { - logger.warn('Redis client not available, cannot acquire lock.') - return false - } - try { + const redis = getRedisClient() + if (!redis) { + logger.warn('Redis client not available, cannot acquire lock.') + // Fallback behavior: maybe allow processing but log a warning? + // Or treat as lock acquired if no Redis? Depends on desired behavior. + return true // Or false, depending on safety requirements + } + + // Use SET key value EX expirySeconds NX + // Returns "OK" if successful, null if key already exists (lock held) const result = await redis.set(lockKey, value, 'EX', expirySeconds, 'NX') + return result === 'OK' } catch (error) { logger.error(`Error acquiring lock for key ${lockKey}:`, { error }) + // Treat errors as failure to acquire lock for safety return false } } /** - * Retrieve the value of a key from Redis - * @param key The key to retrieve - * @returns The value of the key, or null if the key doesn't exist or an error occurs + * Retrieves the value of a key from Redis. + * @param key The key to retrieve. + * @returns The value of the key, or null if the key doesn't exist or an error occurs. */ export async function getLockValue(key: string): Promise { - const redis = getRedisClient() - if (!redis) { - logger.warn('Redis client not available, cannot get lock value.') - return null - } - try { + const redis = getRedisClient() + if (!redis) { + logger.warn('Redis client not available, cannot get lock value.') + return null // Cannot determine lock value + } return await redis.get(key) } catch (error) { logger.error(`Error getting value for key ${key}:`, { error }) @@ -145,18 +211,20 @@ export async function getLockValue(key: string): Promise { } /** - * Release a lock by deleting the key - * @param lockKey The key of the lock to release + * Releases a lock by deleting the key. + * Ideally, use Lua script for safe release (check value before deleting), + * but simple DEL is often sufficient if lock expiry is handled well. + * @param lockKey The key of the lock to release. */ export async function releaseLock(lockKey: string): Promise { - const redis = getRedisClient() - if (!redis) { - logger.warn('Redis client not available, cannot release lock.') - return - } - try { - await redis.del(lockKey) + const redis = getRedisClient() + if (redis) { + await redis.del(lockKey) + } else { + logger.warn('Redis client not available, cannot release lock.') + // No fallback needed for releasing if using in-memory cache for locking wasn't implemented + } } catch (error) { logger.error(`Error releasing lock for key ${lockKey}:`, { error }) }