Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to deploy kubernetes programs as configmaps rather than env vars #654

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 119 additions & 28 deletions crates/arroyo-controller/src/schedulers/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ use crate::schedulers::{Scheduler, SchedulerError, StartPipelineReq};
use anyhow::bail;
use arroyo_rpc::grpc::{api, HeartbeatNodeReq, RegisterNodeReq, WorkerFinishedReq};
use arroyo_types::{
string_config, u32_config, WorkerId, ADMIN_PORT_ENV, ARROYO_PROGRAM_ENV, CONTROLLER_ADDR_ENV,
GRPC_PORT_ENV, JOB_ID_ENV, K8S_NAMESPACE_ENV, K8S_WORKER_ANNOTATIONS_ENV,
K8S_WORKER_CONFIG_MAP_ENV, K8S_WORKER_IMAGE_ENV, K8S_WORKER_IMAGE_PULL_POLICY_ENV,
K8S_WORKER_LABELS_ENV, K8S_WORKER_NAME_ENV, K8S_WORKER_RESOURCES_ENV,
K8S_WORKER_SERVICE_ACCOUNT_NAME_ENV, K8S_WORKER_SLOTS_ENV, K8S_WORKER_VOLUMES_ENV,
K8S_WORKER_VOLUME_MOUNTS_ENV, NODE_ID_ENV, RUN_ID_ENV, TASK_SLOTS_ENV,
string_config, u32_config, WorkerId, ADMIN_PORT_ENV, ARROYO_PROGRAM_ENV,
ARROYO_PROGRAM_FILE_ENV, CONTROLLER_ADDR_ENV, GRPC_PORT_ENV, JOB_ID_ENV, K8S_NAMESPACE_ENV,
K8S_WORKER_ANNOTATIONS_ENV, K8S_WORKER_CONFIG_MAP_ENV, K8S_WORKER_IMAGE_ENV,
K8S_WORKER_IMAGE_PULL_POLICY_ENV, K8S_WORKER_LABELS_ENV, K8S_WORKER_NAME_ENV,
K8S_WORKER_RESOURCES_ENV, K8S_WORKER_SERVICE_ACCOUNT_NAME_ENV, K8S_WORKER_SLOTS_ENV,
K8S_WORKER_VOLUMES_ENV, K8S_WORKER_VOLUME_MOUNTS_ENV, NODE_ID_ENV, RUN_ID_ENV, TASK_SLOTS_ENV,
};
use async_trait::async_trait;
use base64::{engine::general_purpose, Engine as _};
use k8s_openapi::api::apps::v1::ReplicaSet;
use k8s_openapi::api::core::v1::{Pod, ResourceRequirements, Volume, VolumeMount};
use k8s_openapi::api::core::v1::{ConfigMap, Pod, ResourceRequirements, Volume, VolumeMount};
use kube::api::{DeleteParams, ListParams};
use kube::{Api, Client};
use prost::Message;
Expand All @@ -22,11 +22,13 @@ use std::collections::BTreeMap;
use std::env;
use std::time::Duration;
use tonic::Status;
use tracing::warn;

const CLUSTER_LABEL: &str = "cluster";
const JOB_ID_LABEL: &str = "job_id";
const RUN_ID_LABEL: &str = "run_id";
const JOB_NAME_LABEL: &str = "job_name";
const PROGRAM_FILE_NAME: &str = "program.arroyo";

pub struct KubernetesScheduler {
client: Option<Client>,
Expand All @@ -42,6 +44,7 @@ pub struct KubernetesScheduler {
volumes: Vec<Volume>,
volume_mounts: Vec<VolumeMount>,
config_map: Option<String>,
deploy_program_file: bool,
}

fn yaml_config<T: DeserializeOwned>(var: &str, default: T) -> T {
Expand Down Expand Up @@ -89,12 +92,11 @@ impl KubernetesScheduler {
volumes: yaml_config(K8S_WORKER_VOLUMES_ENV, vec![]),
volume_mounts: yaml_config(K8S_WORKER_VOLUME_MOUNTS_ENV, vec![]),
config_map: env::var(K8S_WORKER_CONFIG_MAP_ENV).ok(),
deploy_program_file: env::var("ARROYO_K8S_PROGRAM_FILES").is_ok(),
}
}

fn make_replicaset(&self, req: StartPipelineReq) -> ReplicaSet {
let replicas = (req.slots as f32 / self.slots_per_pod as f32).ceil() as usize;

fn labels(&self, req: &StartPipelineReq) -> Value {
let mut labels = json!({
CLUSTER_LABEL: self.name,
JOB_ID_LABEL: req.job_id,
Expand All @@ -108,6 +110,13 @@ impl KubernetesScheduler {
.insert(k.clone(), Value::String(v.clone()));
}

labels
}

fn make_replicaset(&self, req: StartPipelineReq, config_map: &str) -> ReplicaSet {
let replicas = (req.slots as f32 / self.slots_per_pod as f32).ceil() as usize;
let labels = self.labels(&req);

let mut annotations = json!({});
for (k, v) in &self.annotations {
annotations
Expand Down Expand Up @@ -138,11 +147,6 @@ impl KubernetesScheduler {
{
"name": ADMIN_PORT_ENV, "value": "6901",
},
{
"name": ARROYO_PROGRAM_ENV,
"value": general_purpose::STANDARD_NO_PAD
.encode(api::ArrowProgram::from(req.program).encode_to_vec()),
},
{
"name": "WASM_BIN",
"value": req.wasm_path
Expand All @@ -163,6 +167,41 @@ impl KubernetesScheduler {
}));
}

let mut volumes = self.volumes.clone();

let mut volume_mounts = self.volume_mounts.clone();

if self.deploy_program_file {
env.as_array_mut().unwrap().push(json!({
"name": ARROYO_PROGRAM_FILE_ENV, "value": format!("/program/{PROGRAM_FILE_NAME}"),
}));

volumes.push(
serde_json::from_value(json!({
"name": "program-conf",
"configMap": {
"name": config_map,
}
}))
.unwrap(),
);

volume_mounts.push(
serde_json::from_value(json!({
"name": "program-conf",
"mountPath": "/program",
"readOnly": true
}))
.unwrap(),
);
} else {
env.as_array_mut().unwrap().push(json!({
"name": ARROYO_PROGRAM_ENV,
"value": general_purpose::STANDARD_NO_PAD
.encode(api::ArrowProgram::from(req.program).encode_to_vec()),
}));
}

serde_json::from_value(json!({
"apiVersion": "apps/v1",
"kind": "ReplicaSet",
Expand All @@ -186,7 +225,7 @@ impl KubernetesScheduler {
"annotations": annotations,
},
"spec": {
"volumes": self.volumes,
"volumes": volumes,
"containers": [
{
"name": "worker",
Expand All @@ -205,7 +244,7 @@ impl KubernetesScheduler {
}
],
"env": env,
"volumeMounts": self.volume_mounts,
"volumeMounts": volume_mounts,
"envFrom": self.config_map.as_ref().map(|name| {
json!([
{"configMapRef": {
Expand All @@ -221,16 +260,51 @@ impl KubernetesScheduler {
}
})).unwrap()
}

fn make_configmap(&self, req: &StartPipelineReq) -> ConfigMap {
let program = general_purpose::STANDARD_NO_PAD
.encode(api::ArrowProgram::from(req.program.clone()).encode_to_vec());
let labels = self.labels(req);

let name = format!(
"arroyo-pipeline-{}-{}-cm",
req.job_id.replace("_", "-").to_lowercase(),
req.run_id
);

serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": name,
"labels": labels,
},
"data": {
PROGRAM_FILE_NAME: program,
}
}))
.unwrap()
}
}

#[async_trait]
impl Scheduler for KubernetesScheduler {
async fn start_workers(&self, req: StartPipelineReq) -> Result<(), SchedulerError> {
let api: Api<ReplicaSet> = Api::default_namespaced(self.client.as_ref().unwrap().clone());

let rs = self.make_replicaset(req);

api.create(&Default::default(), &rs)
let rs_api: Api<ReplicaSet> =
Api::default_namespaced(self.client.as_ref().unwrap().clone());
let cm_api: Api<ConfigMap> = Api::default_namespaced(self.client.as_ref().unwrap().clone());

let configmap = self.make_configmap(&req);
let rs = self.make_replicaset(req, &configmap.metadata.name.as_ref().unwrap());

if self.deploy_program_file {
cm_api
.create(&Default::default(), &configmap)
.await
.map_err(|e| SchedulerError::Other(e.to_string()))?;
}
rs_api
.create(&Default::default(), &rs)
.await
.map_err(|e| SchedulerError::Other(e.to_string()))?;

Expand All @@ -256,8 +330,6 @@ impl Scheduler for KubernetesScheduler {
run_id: Option<i64>,
force: bool,
) -> anyhow::Result<()> {
let api: Api<ReplicaSet> = Api::default_namespaced(self.client.as_ref().unwrap().clone());

let mut labels = format!("{}={}", JOB_ID_LABEL, job_id);
if let Some(run_id) = run_id {
labels.push_str(&format!(",{}={}", RUN_ID_LABEL, run_id));
Expand All @@ -269,7 +341,9 @@ impl Scheduler for KubernetesScheduler {
DeleteParams::default()
};

let result = api
let rs_api: Api<ReplicaSet> =
Api::default_namespaced(self.client.as_ref().unwrap().clone());
let result = rs_api
.delete_collection(&delete_params, &ListParams::default().labels(&labels))
.await?;

Expand All @@ -279,6 +353,22 @@ impl Scheduler for KubernetesScheduler {
}
}

if self.deploy_program_file {
let cm_api: Api<ConfigMap> =
Api::default_namespaced(self.client.as_ref().unwrap().clone());
let result = cm_api
.delete_collection(&delete_params, &ListParams::default().labels(&labels))
.await?;
if let Some(status) = result.right() {
if status.is_failure() {
warn!(
"Failed to clean config map for job {}: {:?}",
job_id, status
);
}
}
}

// wait for workers to stop
for i in 0..20 {
tokio::time::sleep(Duration::from_millis(i * 10)).await;
Expand Down Expand Up @@ -339,8 +429,9 @@ mod test {
env_vars: Default::default(),
};

KubernetesScheduler::new(None)
// test that we don't panic when creating the replicaset
.make_replicaset(req);
let sched = KubernetesScheduler::new(None);
// test that we don't panic when creating the replicaset
let cm = sched.make_configmap(&req);
sched.make_replicaset(req, &cm.metadata.name.unwrap());
}
}
2 changes: 2 additions & 0 deletions k8s/arroyo/templates/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ spec:
value: "{{ .Values.prometheus.queryRate }}"
- name: SCHEDULER
value: "kubernetes"
- name: ARROYO_K8S_PROGRAM_FILES
value: "true"
- name: K8S_NAMESPACE
valueFrom:
fieldRef:
Expand Down
1 change: 1 addition & 0 deletions k8s/arroyo/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rules:
resources:
- services
- pods
- configmaps
verbs:
- create
- get
Expand Down
Loading