Skip to content

Provide realtime skipColumns option via untamperable public access tokens #2201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/red-rings-marry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Provide realtime skipColumns option via untamperable public access tokens
18 changes: 11 additions & 7 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import { TaskRun } from "@trigger.dev/database";
import { z } from "zod";
import { env } from "~/env.server";
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import {
ApiAuthenticationResultSuccess,
AuthenticatedEnvironment,
getOneTimeUseToken,
} from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
Expand Down Expand Up @@ -100,11 +104,7 @@ const { action, loader } = createActionApiRoute(
return json({ error: "Task not found" }, { status: 404 });
}

const $responseHeaders = await responseHeaders(
result.run,
authentication.environment,
triggerClient
);
const $responseHeaders = await responseHeaders(result.run, authentication, triggerClient);

return json(
{
Expand Down Expand Up @@ -133,19 +133,23 @@ const { action, loader } = createActionApiRoute(

async function responseHeaders(
run: TaskRun,
environment: AuthenticatedEnvironment,
authentication: ApiAuthenticationResultSuccess,
triggerClient?: string | null
): Promise<Record<string, string>> {
const { environment, realtime } = authentication;

const claimsHeader = JSON.stringify({
sub: environment.id,
pub: true,
realtime,
});

if (triggerClient === "browser") {
const claims = {
sub: environment.id,
pub: true,
scopes: [`read:runs:${run.friendlyId}`],
realtime,
};

const jwt = await internal_generateJWT({
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/realtime.v1.batches.$batchId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const loader = createLoaderApiRoute(
request.url,
authentication.environment,
batchRun.id,
authentication.realtime,
request.headers.get("x-trigger-electric-version") ?? undefined
);
}
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/realtime.v1.runs.$runId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export const loader = createLoaderApiRoute(
request.url,
authentication.environment,
run.id,
authentication.realtime,
request.headers.get("x-trigger-electric-version") ?? undefined
);
}
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/realtime.v1.runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const loader = createLoaderApiRoute(
request.url,
authentication.environment,
searchParams,
authentication.realtime,
request.headers.get("x-trigger-electric-version") ?? undefined
);
}
Expand Down
10 changes: 10 additions & 0 deletions apps/webapp/app/services/apiAuth.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ const ClaimsSchema = z.object({
scopes: z.array(z.string()).optional(),
// One-time use token
otu: z.boolean().optional(),
realtime: z
.object({
skipColumns: z.array(z.string()).optional(),
})
.optional(),
});

type Optional<T, K extends keyof T> = Prettify<Omit<T, K> & Partial<Pick<T, K>>>;
Expand All @@ -43,6 +48,9 @@ export type ApiAuthenticationResultSuccess = {
environment: AuthenticatedEnvironment;
scopes?: string[];
oneTimeUse?: boolean;
realtime?: {
skipColumns?: string[];
};
};

export type ApiAuthenticationResultFailure = {
Expand Down Expand Up @@ -151,6 +159,7 @@ export async function authenticateApiKey(
environment: validationResults.environment,
scopes: parsedClaims.success ? parsedClaims.data.scopes : [],
oneTimeUse: parsedClaims.success ? parsedClaims.data.otu : false,
realtime: parsedClaims.success ? parsedClaims.data.realtime : undefined,
};
}
}
Expand Down Expand Up @@ -233,6 +242,7 @@ async function authenticateApiKeyWithFailure(
environment: validationResults.environment,
scopes: parsedClaims.success ? parsedClaims.data.scopes : [],
oneTimeUse: parsedClaims.success ? parsedClaims.data.otu : false,
realtime: parsedClaims.success ? parsedClaims.data.realtime : undefined,
};
}
}
Expand Down
45 changes: 36 additions & 9 deletions apps/webapp/app/services/realtimeClient.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ export type RealtimeRunsParams = {
createdAt?: string;
};

export type RealtimeRequestOptions = {
skipColumns?: string[];
};

export class RealtimeClient {
private redis: RedisClient;
private expiryTimeInSeconds: number;
Expand Down Expand Up @@ -124,15 +128,17 @@ export class RealtimeClient {
url: URL | string,
environment: RealtimeEnvironment,
runId: string,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
) {
return this.#streamRunsWhere(url, environment, `id='${runId}'`, clientVersion);
return this.#streamRunsWhere(url, environment, `id='${runId}'`, requestOptions, clientVersion);
}

async streamBatch(
url: URL | string,
environment: RealtimeEnvironment,
batchId: string,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
) {
const whereClauses: string[] = [
Expand All @@ -142,13 +148,14 @@ export class RealtimeClient {

const whereClause = whereClauses.join(" AND ");

return this.#streamRunsWhere(url, environment, whereClause, clientVersion);
return this.#streamRunsWhere(url, environment, whereClause, requestOptions, clientVersion);
}

async streamRuns(
url: URL | string,
environment: RealtimeEnvironment,
params: RealtimeRunsParams,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
) {
const whereClauses: string[] = [`"runtimeEnvironmentId"='${environment.id}'`];
Expand All @@ -165,7 +172,13 @@ export class RealtimeClient {

const whereClause = whereClauses.join(" AND ");

const response = await this.#streamRunsWhere(url, environment, whereClause, clientVersion);
const response = await this.#streamRunsWhere(
url,
environment,
whereClause,
requestOptions,
clientVersion
);

if (createdAtFilter) {
const [setCreatedAtFilterError] = await tryCatch(
Expand Down Expand Up @@ -256,12 +269,14 @@ export class RealtimeClient {
url: URL | string,
environment: RealtimeEnvironment,
whereClause: string,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
) {
const electricUrl = this.#constructRunsElectricUrl(
url,
environment,
whereClause,
requestOptions,
clientVersion
);

Expand All @@ -272,6 +287,7 @@ export class RealtimeClient {
url: URL | string,
environment: RealtimeEnvironment,
whereClause: string,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
): URL {
const $url = new URL(url.toString());
Expand All @@ -297,13 +313,10 @@ export class RealtimeClient {
electricUrl.searchParams.set("handle", electricUrl.searchParams.get("shape_id") ?? "");
}

const skipColumnsRaw = $url.searchParams.get("skipColumns");
let skipColumns = getSkipColumns($url.searchParams, requestOptions);

if (skipColumnsRaw) {
const skipColumns = skipColumnsRaw
.split(",")
.map((c) => c.trim())
.filter((c) => c !== "" && !RESERVED_COLUMNS.includes(c));
if (skipColumns.length > 0) {
skipColumns = skipColumns.filter((c) => c !== "" && !RESERVED_COLUMNS.includes(c));

electricUrl.searchParams.set(
"columns",
Expand Down Expand Up @@ -543,3 +556,17 @@ declare module "ioredis" {
): Result<number, Context>;
}
}

function getSkipColumns(searchParams: URLSearchParams, requestOptions?: RealtimeRequestOptions) {
if (requestOptions?.skipColumns) {
return requestOptions.skipColumns;
}

const skipColumnsRaw = searchParams.get("skipColumns");

if (skipColumnsRaw) {
return skipColumnsRaw.split(",").map((c) => c.trim());
}

return [];
}
6 changes: 6 additions & 0 deletions apps/webapp/test/realtimeClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => {
"http://localhost:3000?offset=-1",
environment,
run.id,
{},
"0.8.1"
);

Expand All @@ -81,6 +82,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => {
"http://localhost:3000?offset=-1",
environment,
run.id,
{},
"0.8.1"
);

Expand Down Expand Up @@ -108,6 +110,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => {
`http://localhost:3000?offset=0_0&live=true&handle=${shapeId}`,
environment,
run.id,
{},
"0.8.1"
);

Expand All @@ -117,6 +120,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => {
`http://localhost:3000?offset=0_0&live=true&handle=${shapeId}`,
environment,
run.id,
{},
"0.8.1"
);

Expand Down Expand Up @@ -217,6 +221,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => {
{
tags: ["test:tag:1234"],
},
{},
"0.8.1"
);

Expand Down Expand Up @@ -307,6 +312,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => {
"http://localhost:3000?offset=-1",
environment,
run.id,
{},
"0.8.1"
);

Expand Down
47 changes: 46 additions & 1 deletion packages/trigger-sdk/src/v3/auth.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { type ApiClientConfiguration, apiClientManager } from "@trigger.dev/core/v3";
import {
type ApiClientConfiguration,
apiClientManager,
RealtimeRunSkipColumns,
} from "@trigger.dev/core/v3";
import { generateJWT as internal_generateJWT } from "@trigger.dev/core/v3";

/**
Expand Down Expand Up @@ -106,6 +110,24 @@ export type CreatePublicTokenOptions = {
* ```
*/
expirationTime?: number | Date | string;

realtime?: {
/**
* Skip columns from the subscription.
*
* @default []
*
* @example
* ```ts
* auth.createPublicToken({
* realtime: {
* skipColumns: ["payload", "output"]
* }
* });
* ```
*/
skipColumns?: RealtimeRunSkipColumns;
};
};

/**
Expand All @@ -114,6 +136,8 @@ export type CreatePublicTokenOptions = {
* @param options - Optional parameters for creating the public token.
* @param options.scopes - An array of permission scopes to be included in the token.
* @param options.expirationTime - The expiration time for the token.
* @param options.realtime - Options for realtime subscriptions.
* @param options.realtime.skipColumns - Skip columns from the subscription.
* @returns A promise that resolves to a string representing the generated public token.
*
* @example
Expand All @@ -139,6 +163,7 @@ async function createPublicToken(options?: CreatePublicTokenOptions): Promise<st
payload: {
...claims,
scopes: options?.scopes ? flattenScopes(options.scopes) : undefined,
realtime: options?.realtime,
},
expirationTime: options?.expirationTime,
});
Expand Down Expand Up @@ -173,6 +198,24 @@ export type CreateTriggerTokenOptions = {
* @default false
*/
multipleUse?: boolean;

realtime?: {
/**
* Skip columns from the subscription.
*
* @default []
*
* @example
* ```ts
* auth.createTriggerPublicToken("my-task", {
* realtime: {
* skipColumns: ["payload", "output"]
* }
* });
* ```
*/
skipColumns?: RealtimeRunSkipColumns;
};
};

/**
Expand Down Expand Up @@ -220,6 +263,7 @@ async function createTriggerPublicToken(
payload: {
...claims,
otu: typeof options?.multipleUse === "boolean" ? !options.multipleUse : true,
realtime: options?.realtime,
scopes: flattenScopes({
trigger: {
tasks: task,
Expand Down Expand Up @@ -291,6 +335,7 @@ async function createBatchTriggerPublicToken(
payload: {
...claims,
otu: typeof options?.multipleUse === "boolean" ? !options.multipleUse : true,
realtime: options?.realtime,
scopes: flattenScopes({
batchTrigger: {
tasks: task,
Expand Down