Skip to content

Make provider changes to support image cache #1700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
import { PodCleaner } from "./podCleaner";
import { TaskMonitor } from "./taskMonitor";
import { UptimeHeartbeat } from "./uptimeHeartbeat";
import { assertExhaustive } from "@trigger.dev/core";
import { CustomLabelHelper } from "./labelHelper";

const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
const NODE_NAME = process.env.NODE_NAME || "local";
Expand All @@ -37,7 +39,14 @@ const UPTIME_MAX_PENDING_ERRORS = Number(process.env.UPTIME_MAX_PENDING_ERRORS |
const POD_EPHEMERAL_STORAGE_SIZE_LIMIT = process.env.POD_EPHEMERAL_STORAGE_SIZE_LIMIT || "10Gi";
const POD_EPHEMERAL_STORAGE_SIZE_REQUEST = process.env.POD_EPHEMERAL_STORAGE_SIZE_REQUEST || "2Gi";

// Image config
const PRE_PULL_DISABLED = process.env.PRE_PULL_DISABLED === "true";
const ADDITIONAL_PULL_SECRETS = process.env.ADDITIONAL_PULL_SECRETS;
const PAUSE_IMAGE = process.env.PAUSE_IMAGE || "registry.k8s.io/pause:3.9";
const BUSYBOX_IMAGE = process.env.BUSYBOX_IMAGE || "registry.digitalocean.com/trigger/busybox";
const DEPLOYMENT_IMAGE_PREFIX = process.env.DEPLOYMENT_IMAGE_PREFIX;
const RESTORE_IMAGE_PREFIX = process.env.RESTORE_IMAGE_PREFIX;
const UTILITY_IMAGE_PREFIX = process.env.UTILITY_IMAGE_PREFIX;

const logger = new SimpleLogger(`[${NODE_NAME}]`);
logger.log(`running in ${RUNTIME_ENV} mode`);
Expand Down Expand Up @@ -65,6 +74,8 @@ class KubernetesTaskOperations implements TaskOperations {
apps: k8s.AppsV1Api;
};

#labelHelper = new CustomLabelHelper();

constructor(opts: { namespace?: string } = {}) {
if (opts.namespace) {
this.#namespace.metadata.name = opts.namespace;
Expand Down Expand Up @@ -103,7 +114,7 @@ class KubernetesTaskOperations implements TaskOperations {
containers: [
{
name: this.#getIndexContainerName(opts.shortCode),
image: opts.imageRef,
image: getImageRef("deployment", opts.imageRef),
ports: [
{
containerPort: 8000,
Expand Down Expand Up @@ -157,6 +168,7 @@ class KubernetesTaskOperations implements TaskOperations {
name: containerName,
namespace: this.#namespace.metadata.name,
labels: {
...this.#labelHelper.getAdditionalLabels("create"),
...this.#getSharedLabels(opts),
app: "task-run",
"app.kubernetes.io/part-of": "trigger-worker",
Expand All @@ -170,7 +182,7 @@ class KubernetesTaskOperations implements TaskOperations {
containers: [
{
name: containerName,
image: opts.image,
image: getImageRef("deployment", opts.image),
ports: [
{
containerPort: 8000,
Expand Down Expand Up @@ -218,6 +230,7 @@ class KubernetesTaskOperations implements TaskOperations {
name: `${this.#getRunContainerName(opts.runId)}-${opts.checkpointId.slice(-8)}`,
namespace: this.#namespace.metadata.name,
labels: {
...this.#labelHelper.getAdditionalLabels("restore"),
...this.#getSharedLabels(opts),
app: "task-run",
"app.kubernetes.io/part-of": "trigger-worker",
Expand All @@ -231,12 +244,12 @@ class KubernetesTaskOperations implements TaskOperations {
initContainers: [
{
name: "pull-base-image",
image: opts.imageRef,
image: getImageRef("deployment", opts.imageRef),
command: ["sleep", "0"],
},
{
name: "populate-taskinfo",
image: "registry.digitalocean.com/trigger/busybox",
image: getImageRef("utility", BUSYBOX_IMAGE),
imagePullPolicy: "IfNotPresent",
command: ["/bin/sh", "-c"],
args: ["printenv COORDINATOR_HOST | tee /etc/taskinfo/coordinator-host"],
Expand All @@ -252,7 +265,7 @@ class KubernetesTaskOperations implements TaskOperations {
containers: [
{
name: this.#getRunContainerName(opts.runId),
image: opts.checkpointRef,
image: getImageRef("restore", opts.checkpointRef),
ports: [
{
containerPort: 8000,
Expand Down Expand Up @@ -358,7 +371,7 @@ class KubernetesTaskOperations implements TaskOperations {
initContainers: [
{
name: "prepull",
image: opts.imageRef,
image: getImageRef("deployment", opts.imageRef),
command: ["/usr/bin/true"],
resources: {
limits: {
Expand All @@ -372,7 +385,7 @@ class KubernetesTaskOperations implements TaskOperations {
containers: [
{
name: "pause",
image: "registry.k8s.io/pause:3.9",
image: getImageRef("utility", PAUSE_IMAGE),
resources: {
limits: {
cpu: "1m",
Expand Down Expand Up @@ -403,17 +416,20 @@ class KubernetesTaskOperations implements TaskOperations {
}

get #defaultPodSpec(): Omit<k8s.V1PodSpec, "containers"> {
const pullSecrets = ["registry-trigger", "registry-trigger-failover"];

if (ADDITIONAL_PULL_SECRETS) {
pullSecrets.push(...ADDITIONAL_PULL_SECRETS.split(","));
}

const imagePullSecrets = pullSecrets.map(
(name) => ({ name }) satisfies k8s.V1LocalObjectReference
);

return {
restartPolicy: "Never",
automountServiceAccountToken: false,
imagePullSecrets: [
{
name: "registry-trigger",
},
{
name: "registry-trigger-failover",
},
],
imagePullSecrets,
nodeSelector: {
nodetype: "worker",
},
Expand Down Expand Up @@ -673,6 +689,26 @@ class KubernetesTaskOperations implements TaskOperations {
}
}

type ImageType = "deployment" | "restore" | "utility";

function getImagePrefix(type: ImageType) {
switch (type) {
case "deployment":
return DEPLOYMENT_IMAGE_PREFIX;
case "restore":
return RESTORE_IMAGE_PREFIX;
case "utility":
return UTILITY_IMAGE_PREFIX;
default:
assertExhaustive(type);
}
}

function getImageRef(type: ImageType, ref: string) {
const prefix = getImagePrefix(type);
return prefix ? `${prefix}/${ref}` : ref;
}

const provider = new ProviderShell({
tasks: new KubernetesTaskOperations({
namespace: KUBERNETES_NAMESPACE,
Expand Down
153 changes: 153 additions & 0 deletions apps/kubernetes-provider/src/labelHelper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { assertExhaustive } from "@trigger.dev/core";

const CREATE_LABEL_ENV_VAR_PREFIX = "DEPLOYMENT_LABEL_";
const RESTORE_LABEL_ENV_VAR_PREFIX = "RESTORE_LABEL_";
const LABEL_SAMPLE_RATE_POSTFIX = "_SAMPLE_RATE";

type OperationType = "create" | "restore";

type CustomLabel = {
key: string;
value: string;
sampleRate: number;
};

export class CustomLabelHelper {
// Labels and sample rates are defined in environment variables so only need to be computed once
private createLabels?: CustomLabel[];
private restoreLabels?: CustomLabel[];

private getLabelPrefix(type: OperationType) {
const prefix = type === "create" ? CREATE_LABEL_ENV_VAR_PREFIX : RESTORE_LABEL_ENV_VAR_PREFIX;
return prefix.toLowerCase();
}

private getLabelSampleRatePostfix() {
return LABEL_SAMPLE_RATE_POSTFIX.toLowerCase();
}

// Can only range from 0 to 1
private fractionFromPercent(percent: number) {
return Math.min(1, Math.max(0, percent / 100));
}

private isLabelSampleRateEnvVar(key: string) {
return key.toLowerCase().endsWith(this.getLabelSampleRatePostfix());
}

private isLabelEnvVar(type: OperationType, key: string) {
const prefix = this.getLabelPrefix(type);
return key.toLowerCase().startsWith(prefix) && !this.isLabelSampleRateEnvVar(key);
}

private getSampleRateEnvVarKey(type: OperationType, envKey: string) {
return `${envKey.toLowerCase()}${this.getLabelSampleRatePostfix()}`;
}

private getLabelNameFromEnvVarKey(type: OperationType, key: string) {
return key
.slice(this.getLabelPrefix(type).length)
.toLowerCase()
.replace(/___/g, ".")
.replace(/__/g, "/")
.replace(/_/g, "-");
}

private getCaseInsensitiveEnvValue(key: string) {
for (const [envKey, value] of Object.entries(process.env)) {
if (envKey.toLowerCase() === key.toLowerCase()) {
return value;
}
}
}

/** Returns the sample rate for a given label as fraction of 100 */
private getSampleRateFromEnvVarKey(type: OperationType, envKey: string) {
// Apply default: always sample
const DEFAULT_SAMPLE_RATE_PERCENT = 100;
const defaultSampleRateFraction = this.fractionFromPercent(DEFAULT_SAMPLE_RATE_PERCENT);

const value = this.getCaseInsensitiveEnvValue(this.getSampleRateEnvVarKey(type, envKey));

if (!value) {
return defaultSampleRateFraction;
}

const sampleRatePercent = parseFloat(value || String(DEFAULT_SAMPLE_RATE_PERCENT));

if (isNaN(sampleRatePercent)) {
return defaultSampleRateFraction;
}

const fractionalSampleRate = this.fractionFromPercent(sampleRatePercent);

return fractionalSampleRate;
}

private getCustomLabels(type: OperationType): CustomLabel[] {
switch (type) {
case "create":
if (this.createLabels) {
return this.createLabels;
}
break;
case "restore":
if (this.restoreLabels) {
return this.restoreLabels;
}
break;
default:
assertExhaustive(type);
}

const customLabels: CustomLabel[] = [];

for (const [envKey, value] of Object.entries(process.env)) {
const key = envKey.toLowerCase();

// Only process env vars that start with the expected prefix
if (!this.isLabelEnvVar(type, key)) {
continue;
}

// Skip sample rates - deal with them separately
if (this.isLabelSampleRateEnvVar(key)) {
continue;
}

const labelName = this.getLabelNameFromEnvVarKey(type, key);
const sampleRate = this.getSampleRateFromEnvVarKey(type, key);

const label = {
key: labelName,
value: value || "",
sampleRate,
} satisfies CustomLabel;

customLabels.push(label);
}

return customLabels;
}

getAdditionalLabels(type: OperationType): Record<string, string> {
const labels = this.getCustomLabels(type);

const additionalLabels: Record<string, string> = {};

for (const { key, value, sampleRate } of labels) {
// Always apply label if sample rate is 1
if (sampleRate === 1) {
additionalLabels[key] = value;
continue;
}

if (Math.random() <= sampleRate) {
additionalLabels[key] = value;
continue;
}
}

return additionalLabels;
}
}
2 changes: 2 additions & 0 deletions apps/kubernetes-provider/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"strict": true,
"skipLibCheck": true,
"paths": {
"@trigger.dev/core": ["../../packages/core/src"],
"@trigger.dev/core/*": ["../../packages/core/src/*"],
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"]
}
Expand Down