Skip to content

Commit

Permalink
Create devcontainer if actor is live, and optimized amp-resources
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Oct 12, 2023
1 parent 79a9639 commit 0a1d7e1
Show file tree
Hide file tree
Showing 23 changed files with 466 additions and 270 deletions.
2 changes: 1 addition & 1 deletion apiserver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ COPY --from=builder \
EXPOSE 8170

# What the container should run when it is started
CMD ["/usr/local/bin/amp-apiserver"]
ENTRYPOINT ["/usr/local/bin/amp-apiserver"]
2 changes: 1 addition & 1 deletion controllers/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@ COPY --from=builder \
/usr/local/bin/

# What the container should run when it is started
CMD ["/usr/local/bin/amp-controllers"]
ENTRYPOINT ["/usr/local/bin/amp-controllers"]
11 changes: 11 additions & 0 deletions controllers/src/actor_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ async fn init(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<
}

async fn build(actor: &Actor, ctx: &Arc<Context>, recorder: &Recorder) -> Result<Action> {
// Return if the actor is live
if actor.spec.live {
tracing::info!("The actor is live mode, Running");
let condition = ActorState::running(true, "AutoRun", None);
actor::patch_status(&ctx.k8s, actor, condition)
.await
.map_err(Error::ResourceError)?;

return Ok(Action::await_change());
}

// Return if the image already exists
let credentials = ctx.credentials.read().await;
let config = DockerConfig::from(&credentials.registries);
Expand Down
57 changes: 30 additions & 27 deletions resources/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::error::{Error, Result};

use amp_common::resource::{Actor, ActorSpec, ActorState, Playbook};
use k8s_metrics::v1beta1::PodMetrics;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
use kube::api::{ListParams, Patch, PatchParams, PostParams};
use kube::{Api, Client, Resource, ResourceExt};
use serde_json::json;
use tracing::debug;

use super::error::{Error, Result};
use tracing::{debug, error, info};

pub async fn exists(client: &Client, playbook: &Playbook, name: &str) -> Result<bool> {
let namespace = playbook.spec.namespace.clone();
let api: Api<Actor> = Api::namespaced(client.clone(), namespace.as_str());

Ok(api.get_opt(name).await.map_err(Error::KubeError)?.is_some())
}

Expand All @@ -37,14 +38,14 @@ pub async fn create(client: &Client, playbook: &Playbook, spec: &ActorSpec) -> R
resource
.owner_references_mut()
.push(playbook.controller_owner_ref(&()).unwrap());
tracing::debug!("The Actor resource:\n {:?}\n", resource);
debug!("The Actor resource:\n {:?}\n", resource);

let actor = api
.create(&PostParams::default(), &resource)
.await
.map_err(Error::KubeError)?;

tracing::info!("Created Actor: {}", actor.name_any());
info!("Created Actor: {}", actor.name_any());

// Patch this actor as initial Pending status
patch_status(client, &actor, ActorState::pending()).await?;
Expand All @@ -57,27 +58,27 @@ pub async fn update(client: &Client, playbook: &Playbook, spec: &ActorSpec) -> R

let name = spec.name.clone();
let mut actor = api.get(&name).await.map_err(Error::KubeError)?;
tracing::debug!("The Actor {} already exists: {:?}", &spec.name, actor);

if &actor.spec != spec {
let mut resource = Actor::new(&name, spec.clone());
resource
.owner_references_mut()
.push(playbook.controller_owner_ref(&()).unwrap());
tracing::debug!("The updating Actor resource:\n {:?}\n", resource);

actor = api
.patch(
&name,
&PatchParams::apply("amp-controllers").force(),
&Patch::Apply(&resource),
)
.await
.map_err(Error::KubeError)?;

tracing::info!("Updated Actor: {}", actor.name_any());
debug!("The Actor {} already exists: {:?}", &spec.name, actor);

if &actor.spec == spec {
debug!("The Actor {} is already up-to-date", &spec.name);
return Ok(actor);
}

let mut resource = Actor::new(&name, spec.clone());
resource
.owner_references_mut()
.push(playbook.controller_owner_ref(&()).unwrap());
debug!("The updating Actor resource:\n {:?}\n", resource);

let params = &PatchParams::apply("amp-controllers").force();
actor = api
.patch(&name, params, &Patch::Apply(&resource))
.await
.map_err(Error::KubeError)?;

info!("Updated Actor: {}", actor.name_any());

Ok(actor)
}

Expand All @@ -98,7 +99,7 @@ pub async fn patch_status(client: &Client, actor: &Actor, condition: Condition)
.await
.map_err(Error::KubeError)?;

tracing::info!("Patched status {:?} for Actor {}", actor.status, actor.name_any());
info!("Patched status {:?} for Actor {}", actor.status, actor.name_any());

Ok(())
}
Expand All @@ -121,11 +122,11 @@ pub async fn metrics(client: &Client, namespace: &str, name: &str) -> Result<Pod
// check if the error is NotFound
if let kube::Error::Api(error_response) = &err {
if error_response.code == 404 {
tracing::error!("No metrics found for Actor {}", name);
error!("No metrics found for Actor {}", name);
return Err(Error::MetricsNotAvailable);
}
}
tracing::error!("Failed to get metrics for Actor {}: {}", name, err);
error!("Failed to get metrics for Actor {}: {}", name, err);
Err(Error::KubeError(err))
}
}
Expand All @@ -134,11 +135,13 @@ pub async fn metrics(client: &Client, namespace: &str, name: &str) -> Result<Pod
pub async fn get(client: &Client, namespace: &str, name: &str) -> Result<Actor> {
let api: Api<Actor> = Api::namespaced(client.clone(), namespace);
let actor = api.get(name).await.map_err(Error::KubeError)?;

Ok(actor)
}

pub async fn list(client: &Client, namespace: &str) -> Result<Vec<Actor>> {
let api: Api<Actor> = Api::namespaced(client.clone(), namespace);
let actors = api.list(&ListParams::default()).await.map_err(Error::KubeError)?;

Ok(actors.items)
}
133 changes: 48 additions & 85 deletions resources/src/builder/mod.rs → resources/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod buildpacks;
pub mod git_sync;
pub mod kaniko;

use std::collections::BTreeMap;

use amp_common::resource::Actor;
use amp_common::schema::BuildMethod;
use k8s_openapi::api::batch::v1::{Job, JobSpec};
use k8s_openapi::api::core::v1::{KeyToPath, PodTemplateSpec, SecretVolumeSource, Volume, VolumeMount};
use k8s_openapi::api::core::v1::PodTemplateSpec;
use kube::api::{Patch, PatchParams, PostParams};
use kube::core::ObjectMeta;
use kube::{Api, Client, Resource, ResourceExt};
use tracing::{debug, info};

use crate::containers::{buildpacks, kaniko};
use crate::error::{Error, Result};
use crate::{hash, LAST_APPLIED_HASH_KEY};

const DEFAULT_KANIKO_IMAGE: &str = "gcr.io/kaniko-project/executor:v1.15.0";
const DEFAULT_GIT_SYNC_IMAGE: &str = "registry.k8s.io/git-sync/git-sync:v4.0.0";
const WORKSPACE_DIR: &str = "/workspace/app";

pub async fn exists(client: &Client, actor: &Actor) -> Result<bool> {
let namespace = actor
.namespace()
Expand All @@ -50,14 +44,14 @@ pub async fn create(client: &Client, actor: &Actor) -> Result<Job> {
let api: Api<Job> = Api::namespaced(client.clone(), namespace.as_str());

let resource = new(actor)?;
tracing::debug!("The Job resource:\n {:?}\n", resource);
debug!("The Job resource:\n {:?}\n", resource);

let job = api
.create(&PostParams::default(), &resource)
.await
.map_err(Error::KubeError)?;

tracing::info!("Created Job: {}", job.name_any());
info!("Created Job: {}", job.name_any());
Ok(job)
}

Expand All @@ -69,82 +63,88 @@ pub async fn update(client: &Client, actor: &Actor) -> Result<Job> {
let name = actor.spec.name();

let mut job = api.get(&name).await.map_err(Error::KubeError)?;
tracing::debug!("The Job {} already exists: {:?}", &name, job);
debug!("The Job {} already exists: {:?}", &name, job);

let expected_hash = hash(&actor.spec)?;
let found_hash: String = job
.annotations()
.get(LAST_APPLIED_HASH_KEY)
.map_or("".into(), |v| v.into());

if found_hash != expected_hash {
let resource = new(actor)?;
tracing::debug!("The updating Job resource:\n {:?}\n", resource);
if found_hash == expected_hash {
debug!("The Job {} is already up-to-date", &name);
return Ok(job);
}

job = api
.patch(
&name,
&PatchParams::apply("amp-controllers").force(),
&Patch::Apply(&resource),
)
.await
.map_err(Error::KubeError)?;
let resource = new(actor)?;
debug!("The updating Job resource:\n {:?}\n", resource);

tracing::info!("Updated Job: {}", job.name_any());
}
let params = &PatchParams::apply("amp-controllers").force();
job = api
.patch(&name, params, &Patch::Apply(&resource))
.await
.map_err(Error::KubeError)?;

info!("Updated Job: {}", job.name_any());
Ok(job)
}

/// Create a Job for build images
fn new(actor: &Actor) -> Result<Job> {
let name = actor.spec.name();

// Build the metadata for the job
let owner_reference = actor.controller_owner_ref(&()).unwrap();
let annotations = BTreeMap::from([(LAST_APPLIED_HASH_KEY.into(), hash(&actor.spec)?)]);
let labels = BTreeMap::from([
("app.kubernetes.io/name".into(), name.clone()),
("app.kubernetes.io/managed-by".into(), "Amphitheatre".into()),
]);
let metadata = ObjectMeta {
name: Some(name),
owner_references: Some(vec![owner_reference]),
labels: Some(labels.clone()),
annotations: Some(annotations),
..Default::default()
};

// Prefer to use Kaniko to build images with Dockerfile,
// else, build the image with Cloud Native Buildpacks
let build = actor.spec.character.build.clone().unwrap_or_default();
let pod = match build.method() {
BuildMethod::Dockerfile => {
tracing::debug!("Found dockerfile, build it with kaniko");
debug!("Found dockerfile, build it with kaniko");
kaniko::pod(&actor.spec)
}
BuildMethod::Buildpacks => {
tracing::debug!("Build the image with Cloud Native Buildpacks");
debug!("Build the image with Cloud Native Buildpacks");
buildpacks::pod(&actor.spec)
}
};

Ok(Job {
metadata: ObjectMeta {
name: Some(name),
owner_references: Some(vec![owner_reference]),
labels: Some(labels.clone()),
annotations: Some(annotations),
..Default::default()
// Build the spec for the job
let spec = JobSpec {
backoff_limit: Some(0),
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(labels),
..Default::default()
}),
spec: Some(pod),
},
spec: Some(JobSpec {
backoff_limit: Some(0),
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(labels),
..Default::default()
}),
spec: Some(pod),
},
..Default::default()
}),
..Default::default()
};

// Build and return the job resource
Ok(Job {
metadata,
spec: Some(spec),
..Default::default()
})
}

pub async fn completed(client: &Client, actor: &Actor) -> Result<bool> {
tracing::debug!("Check If the build Job has not completed");
debug!("Check If the build Job has not completed");

let namespace = actor
.namespace()
Expand All @@ -153,47 +153,10 @@ pub async fn completed(client: &Client, actor: &Actor) -> Result<bool> {
let name = actor.spec.name();

if let Ok(Some(job)) = api.get_opt(&name).await {
tracing::debug!("Found Job {}", &name);
debug!("Found Job {}", &name);
Ok(job.status.map_or(false, |s| s.succeeded >= Some(1)))
} else {
tracing::debug!("Not found Job {}", &name);
debug!("Not found Job {}", &name);
Ok(false)
}
}

/// volume for /workspace based on k8s emptyDir
#[inline]
pub fn workspace_volume() -> Volume {
Volume {
name: "workspace".to_string(),
empty_dir: Some(Default::default()),
..Default::default()
}
}

/// volume mount for /workspace
#[inline]
pub fn workspace_mount() -> VolumeMount {
VolumeMount {
name: "workspace".to_string(),
mount_path: "/workspace".to_string(),
..Default::default()
}
}

#[inline]
pub fn docker_config_volume() -> Volume {
Volume {
name: "docker-config".to_string(),
secret: Some(SecretVolumeSource {
secret_name: Some("amp-registry-credentials".into()),
items: Some(vec![KeyToPath {
key: ".dockerconfigjson".into(),
path: "config.json".into(),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
}
}
1 change: 1 addition & 0 deletions resources/src/character.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ use super::error::{Error, Result};
pub async fn get(client: &Client, name: &str) -> Result<Character> {
let api: Api<Character> = Api::all(client.clone());
let resources = api.get(name).await.map_err(Error::KubeError)?;

Ok(resources)
}
Loading

0 comments on commit 0a1d7e1

Please sign in to comment.