Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- Upload gate: handle GitHub API rate limits and optional authenticated lookup token (thanks @superlowburn, #246).
- HTTP: remove `allowH2` from Undici agent to prevent `fetch failed` on Node.js 22+ (#245).
- Tests: add root `undici` dev dependency for Node E2E imports (thanks @tanujbhaud, #255).
- Downloads: add download rate limiting + per-IP/day dedupe + scheduled dedupe pruning; preserve moderation gating and deterministic zips (thanks @regenrek, #43).
- VirusTotal: fix scan sync race conditions and retry behavior in scan/backfill paths.
- Metadata: tolerate trailing commas in JSON metadata.
- Auth: allow soft-deleted users to re-authenticate on fresh login, while keeping banned users blocked (thanks @tanujbhaud, #177).
Expand Down
2 changes: 2 additions & 0 deletions convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import type * as lib_githubAccount from "../lib/githubAccount.js";
import type * as lib_githubBackup from "../lib/githubBackup.js";
import type * as lib_githubImport from "../lib/githubImport.js";
import type * as lib_githubSoulBackup from "../lib/githubSoulBackup.js";
import type * as lib_httpRateLimit from "../lib/httpRateLimit.js";
import type * as lib_leaderboards from "../lib/leaderboards.js";
import type * as lib_moderation from "../lib/moderation.js";
import type * as lib_public from "../lib/public.js";
Expand Down Expand Up @@ -99,6 +100,7 @@ declare const fullApi: ApiFromModules<{
"lib/githubBackup": typeof lib_githubBackup;
"lib/githubImport": typeof lib_githubImport;
"lib/githubSoulBackup": typeof lib_githubSoulBackup;
"lib/httpRateLimit": typeof lib_httpRateLimit;
"lib/leaderboards": typeof lib_leaderboards;
"lib/moderation": typeof lib_moderation;
"lib/public": typeof lib_public;
Expand Down
7 changes: 7 additions & 0 deletions convex/crons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,11 @@ crons.interval('vt-cache-backfill', { minutes: 30 }, internal.vt.backfillActiveS
// Daily re-scan of all active skills at 3am UTC
crons.daily('vt-daily-rescan', { hourUTC: 3, minuteUTC: 0 }, internal.vt.rescanActiveSkills, {})

// Every 24 hours, invoke the internal mutation that prunes old download-dedupe rows.
crons.interval(
'download-dedupe-prune',
{ hours: 24 },
internal.downloads.pruneDownloadDedupesInternal,
{},
)

export default crons
12 changes: 12 additions & 0 deletions convex/downloads.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { describe, expect, it } from 'vitest'
import { __test } from './downloads'

describe('downloads helpers', () => {
  it('calculates day start boundaries', () => {
    const day = 86_400_000

    // [timestamp, expected day start] pairs straddling the first day boundary.
    const boundaryCases: Array<[number, number]> = [
      [0, 0],
      [day - 1, 0],
      [day, day],
      [day + 1, day],
    ]

    for (const [timestamp, expectedStart] of boundaryCases) {
      expect(__test.getDayStart(timestamp)).toBe(expectedStart)
    }
  })
})
96 changes: 90 additions & 6 deletions convex/downloads.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { v } from 'convex/values'
import { api } from './_generated/api'
import { httpAction, mutation } from './_generated/server'
import { api, internal } from './_generated/api'
import { httpAction, internalMutation, mutation } from './_generated/server'
import { applyRateLimit, getClientIp } from './lib/httpRateLimit'
import { buildDeterministicZip } from './lib/skillZip'
import { hashToken } from './lib/tokens'
import { insertStatEvent } from './skillStatEvents'

// One day expressed in milliseconds (24h * 60m * 60s * 1000ms).
const DAY_MS = 24 * 60 * 60 * 1000
// Number of days of dedupe history kept before rows become eligible for pruning.
const DEDUPE_RETENTION_DAYS = 14
// Maximum number of rows read (and deleted) per prune pass.
const PRUNE_BATCH_SIZE = 200
// Upper bound on prune passes performed in a single prune run.
const PRUNE_MAX_BATCHES = 50

export const downloadZip = httpAction(async (ctx, request) => {
const url = new URL(request.url)
const slug = url.searchParams.get('slug')?.trim().toLowerCase()
Expand All @@ -14,12 +21,15 @@ export const downloadZip = httpAction(async (ctx, request) => {
return new Response('Missing slug', { status: 400 })
}

const rate = await applyRateLimit(ctx, request, 'download')
if (!rate.ok) return rate.response

const skillResult = await ctx.runQuery(api.skills.getBySlug, { slug })
if (!skillResult?.skill) {
return new Response('Skill not found', { status: 404 })
}

// Block downloads based on moderation status
// Block downloads based on moderation status.
const mod = skillResult.moderationInfo
if (mod?.isMalwareBlocked) {
return new Response(
Expand Down Expand Up @@ -77,15 +87,26 @@ export const downloadZip = httpAction(async (ctx, request) => {
})
const zipBlob = new Blob([zipArray], { type: 'application/zip' })

await ctx.runMutation(api.downloads.increment, { skillId: skill._id })
const ip = getClientIp(request) ?? 'unknown'
const ipHash = await hashToken(ip)
const dayStart = getDayStart(Date.now())
try {
await ctx.runMutation(internal.downloads.recordDownloadInternal, {
skillId: skill._id,
ipHash,
dayStart,
})
} catch {
// Best-effort metric path; do not fail downloads.
}

return new Response(zipBlob, {
status: 200,
headers: {
headers: mergeHeaders(rate.headers, {
'Content-Type': 'application/zip',
'Content-Disposition': `attachment; filename="${slug}-${version.version}.zip"`,
'Cache-Control': 'private, max-age=60',
},
}),
})
})

Expand All @@ -101,3 +122,66 @@ export const increment = mutation({
})
},
})

/**
 * Records a download at most once per (skill, IP hash, day bucket).
 *
 * Looks for a matching `downloadDedupes` row through the `by_skill_ip_day`
 * index. If one is found the mutation returns without writing anything;
 * otherwise it inserts a dedupe row and emits a `download` stat event
 * for the skill.
 */
export const recordDownloadInternal = internalMutation({
  args: {
    skillId: v.id('skills'),
    ipHash: v.string(),
    dayStart: v.number(),
  },
  handler: async (ctx, { skillId, ipHash, dayStart }) => {
    const alreadyCounted = await ctx.db
      .query('downloadDedupes')
      .withIndex('by_skill_ip_day', (q) =>
        q.eq('skillId', skillId).eq('ipHash', ipHash).eq('dayStart', dayStart),
      )
      .unique()

    // This skill/IP/day combination has been recorded before: nothing to do.
    if (alreadyCounted) return

    await ctx.db.insert('downloadDedupes', {
      skillId,
      ipHash,
      dayStart,
      createdAt: Date.now(),
    })

    await insertStatEvent(ctx, { skillId, kind: 'download' })
  },
})

/**
 * Deletes `downloadDedupes` rows whose `dayStart` is older than the retention
 * cutoff (`DEDUPE_RETENTION_DAYS` days before now).
 *
 * Rows are removed in pages of `PRUNE_BATCH_SIZE`, for at most
 * `PRUNE_MAX_BATCHES` pages per invocation. The run stops early as soon as a
 * page comes back empty or shorter than a full batch.
 *
 * NOTE(review): one invocation can delete up to
 * PRUNE_BATCH_SIZE * PRUNE_MAX_BATCHES rows inside a single mutation — confirm
 * this stays within the platform's per-transaction limits.
 */
export const pruneDownloadDedupesInternal = internalMutation({
  args: {},
  handler: async (ctx) => {
    const cutoff = Date.now() - DEDUPE_RETENTION_DAYS * DAY_MS

    let passesLeft = PRUNE_MAX_BATCHES
    while (passesLeft > 0) {
      passesLeft -= 1

      const expired = await ctx.db
        .query('downloadDedupes')
        .withIndex('by_day', (q) => q.lt('dayStart', cutoff))
        .take(PRUNE_BATCH_SIZE)

      // Nothing older than the cutoff remains.
      if (expired.length === 0) return

      for (const { _id } of expired) {
        await ctx.db.delete(_id)
      }

      // A partial page means the backlog is exhausted.
      if (expired.length < PRUNE_BATCH_SIZE) return
    }
  },
})

/**
 * Rounds a millisecond timestamp down to the nearest multiple of `DAY_MS`.
 *
 * For an epoch-millisecond input this is the 00:00 UTC instant of the day
 * containing `timestamp`.
 *
 * @param timestamp - Time in milliseconds.
 * @returns The largest multiple of `DAY_MS` that is `<= timestamp`.
 */
export function getDayStart(timestamp: number) {
  const dayIndex = Math.floor(timestamp / DAY_MS)
  return dayIndex * DAY_MS
}

// Bundle of internal helpers exported so unit tests can reach them via `__test`.
export const __test = {
getDayStart,
}

/**
 * Shallow-merges two header collections into a new plain object.
 *
 * Both arguments are treated as plain string records; when a key appears in
 * both, the value from `extra` wins. Only own enumerable properties are
 * copied, so inputs are expected to be object literals.
 */
function mergeHeaders(base: HeadersInit, extra: HeadersInit) {
  const baseRecord = base as Record<string, string>
  const extraRecord = extra as Record<string, string>
  return { ...baseRecord, ...extraRecord }
}
23 changes: 18 additions & 5 deletions convex/httpApiV1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,14 +877,16 @@ function rateHeaders(result: RateLimitResult): HeadersInit {
}

function getClientIp(request: Request) {
const header =
request.headers.get('cf-connecting-ip') ??
const cfHeader = request.headers.get('cf-connecting-ip')
if (cfHeader) return splitFirstIp(cfHeader)

if (!shouldTrustForwardedIps()) return null

const forwarded =
request.headers.get('x-real-ip') ??
request.headers.get('x-forwarded-for') ??
request.headers.get('fly-client-ip')
if (!header) return null
if (header.includes(',')) return header.split(',')[0]?.trim() || null
return header.trim()
return splitFirstIp(forwarded)
}

function parseBearerToken(request: Request) {
Expand Down Expand Up @@ -926,6 +928,17 @@ function mergeHeaders(base: HeadersInit, extra?: HeadersInit) {
return { ...(base as Record<string, string>), ...(extra as Record<string, string>) }
}

/**
 * Extracts the first entry from a possibly comma-separated IP header value.
 *
 * @param header - Raw header value, or `null` when the header is absent.
 * @returns The trimmed first entry, or `null` when the header is missing,
 *   empty, or its first entry is blank.
 */
function splitFirstIp(header: string | null) {
  if (!header) return null
  // Without a comma, split() yields the whole value as the only element.
  const [firstEntry] = header.split(',')
  return firstEntry?.trim() || null
}

/**
 * Reports whether forwarded-IP headers are opted in via the
 * `TRUST_FORWARDED_IPS` environment variable.
 *
 * @returns `true` only when the variable equals `"true"` (case-insensitive).
 */
function shouldTrustForwardedIps() {
  const flag = process.env.TRUST_FORWARDED_IPS ?? ''
  return String(flag).toLowerCase() === 'true'
}

function getPathSegments(request: Request, prefix: string) {
const pathname = new URL(request.url).pathname
if (!pathname.startsWith(prefix)) return []
Expand Down
35 changes: 35 additions & 0 deletions convex/lib/httpRateLimit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* @vitest-environment node */
import { describe, expect, it } from 'vitest'
import { getClientIp } from './httpRateLimit'

describe('getClientIp', () => {
  /**
   * Runs `run` with `TRUST_FORWARDED_IPS` set to `value`, then puts the
   * variable back to whatever it held before — even when an expectation
   * throws — so an override never leaks into other tests.
   */
  function withTrustForwardedIps(value: string, run: () => void) {
    const previous = process.env.TRUST_FORWARDED_IPS
    process.env.TRUST_FORWARDED_IPS = value
    try {
      run()
    } finally {
      // Assigning `undefined` would store the string "undefined"; delete instead.
      if (previous === undefined) delete process.env.TRUST_FORWARDED_IPS
      else process.env.TRUST_FORWARDED_IPS = previous
    }
  }

  it('returns null when cf-connecting-ip missing', () => {
    const request = new Request('https://example.com', {
      headers: {
        'x-forwarded-for': '203.0.113.9',
      },
    })
    withTrustForwardedIps('', () => {
      expect(getClientIp(request)).toBeNull()
    })
  })

  it('returns first ip from cf-connecting-ip', () => {
    const request = new Request('https://example.com', {
      headers: {
        'cf-connecting-ip': '203.0.113.1, 198.51.100.2',
      },
    })
    expect(getClientIp(request)).toBe('203.0.113.1')
  })

  it('uses forwarded headers when opt-in enabled', () => {
    const request = new Request('https://example.com', {
      headers: {
        'x-forwarded-for': '203.0.113.9, 198.51.100.2',
      },
    })
    withTrustForwardedIps('true', () => {
      expect(getClientIp(request)).toBe('203.0.113.9')
    })
  })
})
137 changes: 137 additions & 0 deletions convex/lib/httpRateLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { internal } from '../_generated/api'
import type { ActionCtx } from '../_generated/server'
import { hashToken } from './tokens'

// Length of one rate-limit window in milliseconds (60 seconds).
const RATE_LIMIT_WINDOW_MS = 60 * 1000

/**
 * Request budgets per rate-limit window, grouped by request kind.
 * `ip` is the budget applied to the client-IP bucket and `key` the budget
 * applied to the bearer-token bucket.
 */
export const RATE_LIMITS = {
  read: {
    ip: 120,
    key: 600,
  },
  write: {
    ip: 30,
    key: 120,
  },
  download: {
    ip: 20,
    key: 120,
  },
} as const

// Outcome of a rate-limit check for a single bucket key.
type RateLimitResult = {
// Whether the request may proceed; `false` leads to a 429 response.
allowed: boolean
// Value emitted as the `X-RateLimit-Remaining` header.
remaining: number
// Value emitted as the `X-RateLimit-Limit` header.
limit: number
// Window reset time in milliseconds (divided by 1000 for `X-RateLimit-Reset`);
// presumably an epoch timestamp — TODO confirm against the rateLimits module.
resetAt: number
}

/**
 * Enforces the rate limits configured for `kind` on an incoming HTTP request.
 *
 * The per-IP budget is always checked; requests with no resolvable client IP
 * share the `ip:unknown` bucket. When the request carries a bearer token, the
 * per-key budget for the hashed token is checked as well. The headers returned
 * describe whichever of the two results is more restrictive.
 *
 * @param ctx - Action context used to run the rate-limit query and mutation.
 * @param request - Incoming request whose IP and bearer token are inspected.
 * @param kind - Which entry of `RATE_LIMITS` to enforce.
 * @returns `{ ok: true, headers }` with rate-limit headers for the eventual
 *   response, or `{ ok: false, response }` carrying a ready-made 429 response.
 */
export async function applyRateLimit(
  ctx: ActionCtx,
  request: Request,
  kind: keyof typeof RATE_LIMITS,
): Promise<{ ok: true; headers: HeadersInit } | { ok: false; response: Response }> {
  const limits = RATE_LIMITS[kind]

  const clientIp = getClientIp(request) ?? 'unknown'
  const ipResult = await checkRateLimit(ctx, `ip:${clientIp}`, limits.ip)

  let keyResult: RateLimitResult | null = null
  const bearer = parseBearerToken(request)
  if (bearer) {
    const tokenHash = await hashToken(bearer)
    keyResult = await checkRateLimit(ctx, `key:${tokenHash}`, limits.key)
  }

  const headers = rateHeaders(pickMostRestrictive(ipResult, keyResult))

  // Both buckets (when present) must allow the request.
  const permitted = ipResult.allowed && (keyResult === null || keyResult.allowed)
  if (permitted) {
    return { ok: true, headers }
  }

  const deniedHeaders = mergeHeaders(
    {
      'Content-Type': 'text/plain; charset=utf-8',
      'Cache-Control': 'no-store',
    },
    headers,
  )
  return {
    ok: false,
    response: new Response('Rate limit exceeded', { status: 429, headers: deniedHeaders }),
  }
}

/**
 * Resolves the client IP for a request from its headers.
 *
 * `cf-connecting-ip` is always honoured when present. The forwarded headers
 * (`x-real-ip`, `x-forwarded-for`, `fly-client-ip`, in that order) are only
 * consulted when forwarded IPs are explicitly trusted via
 * `shouldTrustForwardedIps()`.
 *
 * @returns The first IP found in the chosen header, or `null` when no usable
 *   header is available.
 */
export function getClientIp(request: Request) {
  const { headers } = request

  const cfIp = headers.get('cf-connecting-ip')
  if (cfIp) return splitFirstIp(cfIp)

  if (!shouldTrustForwardedIps()) return null

  // The first header that is present wins, even if its value turns out blank.
  const forwardedHeaderNames = ['x-real-ip', 'x-forwarded-for', 'fly-client-ip']
  for (const name of forwardedHeaderNames) {
    const value = headers.get(name)
    if (value != null) return splitFirstIp(value)
  }

  return splitFirstIp(null)
}

/**
 * Checks, and if permitted consumes, one unit of the rate-limit budget for `key`.
 *
 * A read-only status query runs first so that requests which are already over
 * the limit never trigger a write. Only when that query reports the request as
 * allowed is the consuming mutation executed.
 *
 * @param ctx - Action context used to run the query and mutation.
 * @param key - Bucket identifier (e.g. `ip:…` or `key:…`).
 * @param limit - Request budget for one window.
 * @returns The denied status as reported by the query, or the mutation's
 *   `allowed`/`remaining` combined with the query's `limit`/`resetAt`.
 */
async function checkRateLimit(
  ctx: ActionCtx,
  key: string,
  limit: number,
): Promise<RateLimitResult> {
  const bucketArgs = { key, limit, windowMs: RATE_LIMIT_WINDOW_MS }

  // Phase 1: read-only probe, so denied requests cause no write conflicts.
  const status = (await ctx.runQuery(
    internal.rateLimits.getRateLimitStatusInternal,
    bucketArgs,
  )) as RateLimitResult
  if (!status.allowed) return status

  // Phase 2: still allowed, so consume from the bucket with a mutation.
  const consumed = (await ctx.runMutation(
    internal.rateLimits.consumeRateLimitInternal,
    bucketArgs,
  )) as { allowed: boolean; remaining: number }

  const { allowed, remaining } = consumed
  return { allowed, remaining, limit: status.limit, resetAt: status.resetAt }
}

/**
 * Chooses which of two rate-limit results should be reported to the client.
 *
 * Precedence: `primary` when there is no `secondary` or `primary` is denied;
 * otherwise `secondary` when it is denied; otherwise whichever has fewer
 * requests remaining (ties go to `primary`).
 */
function pickMostRestrictive(primary: RateLimitResult, secondary: RateLimitResult | null) {
  if (!secondary || !primary.allowed) return primary
  if (!secondary.allowed) return secondary
  if (secondary.remaining < primary.remaining) return secondary
  return primary
}

/**
 * Builds the rate-limit response headers for a check result.
 *
 * - `X-RateLimit-Limit` / `X-RateLimit-Remaining` mirror the result.
 * - `X-RateLimit-Reset` is `resetAt` converted from milliseconds to seconds.
 * - `Retry-After` is added only for denied results and carries the number of
 *   seconds the client should wait. HTTP defines `Retry-After` as a delay in
 *   seconds (or an HTTP-date), so the absolute reset timestamp must not be
 *   reused here. The delay is clamped to at least 1 second.
 *
 * Assumes `resetAt` is an epoch timestamp in milliseconds — confirm against
 * the rateLimits module.
 *
 * @param result - Outcome of the rate-limit check to describe.
 * @returns A plain header record.
 */
function rateHeaders(result: RateLimitResult): HeadersInit {
  const resetSeconds = Math.ceil(result.resetAt / 1000)
  const headers: Record<string, string> = {
    'X-RateLimit-Limit': String(result.limit),
    'X-RateLimit-Remaining': String(result.remaining),
    'X-RateLimit-Reset': String(resetSeconds),
  }

  if (!result.allowed) {
    // Relative delay until the window resets; never advertise less than 1s.
    const retryAfterSeconds = Math.max(1, Math.ceil((result.resetAt - Date.now()) / 1000))
    headers['Retry-After'] = String(retryAfterSeconds)
  }

  return headers
}

/**
 * Extracts the bearer token from a request's `Authorization` header.
 *
 * The scheme is matched case-insensitively and must be followed by a space.
 *
 * @returns The trimmed token, or `null` when the header is missing, uses a
 *   different scheme, or carries an empty token.
 */
export function parseBearerToken(request: Request) {
  const { headers } = request
  const rawHeader = headers.get('authorization') ?? headers.get('Authorization')
  if (!rawHeader) return null

  const value = rawHeader.trim()
  const hasBearerScheme = value.toLowerCase().startsWith('bearer ')
  if (!hasBearerScheme) return null

  // Drop the 7-character "bearer " prefix and any padding around the token.
  const credentials = value.slice(7).trim()
  return credentials ? credentials : null
}

/**
 * Returns the first entry of a possibly comma-separated IP header value.
 *
 * @param header - Raw header value, or `null` when the header is absent.
 * @returns The trimmed first entry, or `null` when the header is missing,
 *   empty, or its first entry is blank.
 */
function splitFirstIp(header: string | null) {
  if (!header) return null
  // Without a comma, split() yields the whole value as the only element.
  const [leading] = header.split(',')
  return leading?.trim() || null
}

/**
 * Shallow-merges header collections into a new plain object.
 *
 * Both arguments are treated as plain string records; keys in `extra`
 * override those in `base`. Omitting `extra` yields a copy of `base`.
 */
function mergeHeaders(base: HeadersInit, extra?: HeadersInit) {
  const baseRecord = base as Record<string, string>
  const extraRecord = extra as Record<string, string>
  return { ...baseRecord, ...extraRecord }
}

/**
 * Reports whether forwarded-IP headers are opted in through the
 * `TRUST_FORWARDED_IPS` environment variable.
 *
 * @returns `true` only when the variable equals `"true"` (case-insensitive).
 */
function shouldTrustForwardedIps() {
  const rawFlag = process.env.TRUST_FORWARDED_IPS ?? ''
  const normalized = String(rawFlag).toLowerCase()
  return normalized === 'true'
}
Loading