Skip to content

Commit

Permalink
Add pullRegion to streams to make the ingest node sticky (#2127)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Apr 15, 2024
1 parent df068d7 commit 0ed60af
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 1 deletion.
3 changes: 3 additions & 0 deletions packages/api/src/controllers/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,9 @@ export const triggerCatalystPullStart =
url.searchParams.set("lat", lat.toString());
url.searchParams.set("lon", lon.toString());
playbackUrl = url.toString();
console.log(
`triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}`
);
}

const deadline = Date.now() + 2 * PULL_START_TIMEOUT;
Expand Down
23 changes: 23 additions & 0 deletions packages/api/src/controllers/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
import serverPromise, { TestServer } from "../test-server";
import { semaphore, sleep } from "../util";
import { generateUniquePlaybackId } from "./generate-keys";
import { extractRegionFrom } from "./stream";

const uuidRegex = /[0-9a-f]+(-[0-9a-f]+){4}/;

Expand Down Expand Up @@ -712,6 +713,28 @@ describe("controllers/stream", () => {
const document = await db.stream.get(stream.id);
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
});
it("should extract region from redirected playback url", async () => {
expect(
extractRegionFrom(
"https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("sto");
expect(
extractRegionFrom(
"https://mos2-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("mos2");
expect(
extractRegionFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("fra-staging");
expect(
extractRegionFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+other-playback/index.m3u8"
)
).toBe(null);
});
});

it("should create a stream, delete it, and error when attempting additional delete or replace", async () => {
Expand Down
43 changes: 42 additions & 1 deletion packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import logger from "../logger";
import { authorizer } from "../middleware";
import { validatePost } from "../middleware";
import { geolocateMiddleware } from "../middleware";
import { fetchWithTimeout } from "../util";
import { CliArgs } from "../parse-cli";
import {
DetectionWebhookPayload,
Expand Down Expand Up @@ -235,6 +236,40 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) {
);
}

async function resolvePullRegion(
stream: NewStreamPayload,
ingest: string
): Promise<string> {
if (process.env.NODE_ENV === "test") {
return null;
}
const url = new URL(
pathJoin(ingest, `hls`, "not-used-playback", `index.m3u8`)
);
const { lat, lon } = stream.pull?.location ?? {};
if (lat && lon) {
url.searchParams.set("lat", lat.toString());
url.searchParams.set("lon", lon.toString());
}
const playbackUrl = url.toString();
// Send any playback request to catalyst-api, which effectively resolves the region using MistUtilLoad
const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" });
if (response.status < 300 || response.status >= 400) {
// not a redirect response, so we can't determine the region
return null;
}
const redirectUrl = response.headers.get("location");
return extractRegionFrom(redirectUrl);
}

// Extracts region from redirected node URL, e.g. "sto" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8"
export function extractRegionFrom(playbackUrl: string): string {
const regionRegex =
/https?:\/\/(.+)-\w+-catalyst.+not-used-playback\/index.m3u8/;
const matches = playbackUrl.match(regionRegex);
return matches ? matches[1] : null;
}

export function getHLSPlaybackUrl(ingest: string, stream: DBStream) {
return pathJoin(ingest, `hls`, stream.playbackId, `index.m3u8`);
}
Expand Down Expand Up @@ -1001,6 +1036,8 @@ app.put(
const { key = "pull.source", waitActive } = toStringValues(req.query);
const rawPayload = req.body as NewStreamPayload;

const ingest = await getIngestBase(req);

if (!rawPayload.pull) {
return res.status(400).json({
errors: [`stream pull configuration is required`],
Expand Down Expand Up @@ -1046,9 +1083,13 @@ app.put(
}
const streamExisted = streams.length === 1;

const pullRegion = await resolvePullRegion(rawPayload, ingest);

let stream: DBStream;
if (!streamExisted) {
stream = await handleCreateStream(req);
stream.pullRegion = pullRegion;
await db.stream.replace(stream);
} else {
const oldStream = streams[0];
const sleepFor = terminateDelay(oldStream);
Expand All @@ -1063,6 +1104,7 @@ app.put(
...oldStream,
...EMPTY_NEW_STREAM_PAYLOAD, // clear all fields that should be set from the payload
suspended: false,
pullRegion,
...payload,
};
await db.stream.replace(stream);
Expand All @@ -1073,7 +1115,6 @@ app.put(
}

if (!stream.isActive || streamExisted) {
const ingest = await getIngestBase(req);
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream));
}

Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/schema/db-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,8 @@ components:
example: 1587667174725
pullLockedBy:
type: string
pullRegion:
type: string
playbackId:
unique: true
mistHost:
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/store/stream-table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ const adminOnlyFields = [
"createdByTokenId",
"pullLockedAt",
"pullLockedBy",
"pullRegion",
];

const privateFields = [
Expand Down

0 comments on commit 0ed60af

Please sign in to comment.