Skip to content

Commit

Permalink
make declare async
Browse files Browse the repository at this point in the history
  • Loading branch information
DominicBurkart committed Sep 29, 2020
1 parent f06af69 commit 5b1f407
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 26 deletions.
2 changes: 1 addition & 1 deletion turbolift_internals/src/distributed_platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub type JsonResponse = String;
#[async_trait]
pub trait DistributionPlatform {
/// declare a function
fn declare(&mut self, function_name: &str, project_tar: &[u8]);
async fn declare(&mut self, function_name: &str, project_tar: &[u8]) -> DistributionResult<()>;

// dispatch params to a function
async fn dispatch(
Expand Down
40 changes: 22 additions & 18 deletions turbolift_internals/src/kubernetes.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use kube::api::{Api, Meta, ListParams, PostParams, WatchEvent};
use kube::Client;
use k8s_openapi::api::core::v1::Pod;
use cached::proc_macro::cached;
use k8s_openapi::api::core::v1::Pod;
use kube::api::{Api, PostParams};
use kube::Client;
use url::Url;

use crate::distributed_platform::{
Expand All @@ -13,25 +12,24 @@ use crate::distributed_platform::{
const K8S_NAMESPACE: &str = "turbolift";
type ImageTag = String;

pub type K8sConfig = kube::config::Config;
// pub type K8sConfig = kube::config::Config; todo

pub struct K8s {
config: K8sConfig,
pods: Vec<Pod>,
}

#[async_trait]
impl DistributionPlatform for K8s {
fn declare(&mut self, function_name: &str, project_tar: &[u8]) {
async fn declare(&mut self, function_name: &str, project_tar: &[u8]) -> DistributionResult<()> {
// connect to cluster. tries in-cluster configuration first, then falls back to kubeconfig file.
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::namespaced(client, K8S_NAMESPACE);

// generate image & host it on a local repo
let repo_url = setup_repo().expect("error initializing network repository");
let local_tag = make_image(function_name, project_tar).expect("error making image");
let tag_in_repo = add_image_to_repo(local_tag).expect("error adding image to repo");
let image_url = repo_url.join(&tag_in_repo).expect("url parse error");
let repo_url = setup_repo();
let local_tag = make_image(function_name, project_tar)?;
let tag_in_repo = add_image_to_repo(local_tag)?;
let image_url = repo_url.join(&tag_in_repo)?;

// make pod
let pod_name = function_name;
Expand All @@ -51,24 +49,30 @@ impl DistributionPlatform for K8s {
],
}
}))?;
self.pods.push(pods.create(&PostParams::default(), &pod).await?);
self.pods
.push(pods.create(&PostParams::default(), &pod).await?);
// todo do we need to monitor the pod in any way??
Ok(())
}

async fn dispatch(&mut self, function_name: &str, params: ArgsString) -> DistributionResult<JsonResponse> {
async fn dispatch(
&mut self,
_function_name: &str,
_params: ArgsString,
) -> DistributionResult<JsonResponse> {
unimplemented!()
}
}

#[cached(size=1)]
fn setup_repo() -> DistributionResult<Url> {
#[cached(size = 1)]
fn setup_repo() -> Url {
unimplemented!()
}

fn make_image(function_name: &str, project_tar: &[u8]) -> DistributionResult<ImageTag> {
fn make_image(_function_name: &str, _project_tar: &[u8]) -> DistributionResult<ImageTag> {
unimplemented!()
}

fn add_image_to_repo(local_tag: ImageTag) -> DistributionResult<ImageTag> {
fn add_image_to_repo(_local_tag: ImageTag) -> DistributionResult<ImageTag> {
unimplemented!()
}
}
9 changes: 5 additions & 4 deletions turbolift_internals/src/local_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,20 @@ impl LocalQueue {
#[async_trait]
impl DistributionPlatform for LocalQueue {
/// declare a function. Runs once.
fn declare(&mut self, function_name: &str, project_tar: &[u8]) {
async fn declare(&mut self, function_name: &str, project_tar: &[u8]) -> DistributionResult<()> {
let relative_build_dir = Path::new(".")
.join(".turbolift")
.join(".worker_build_cache");
fs::create_dir_all(&relative_build_dir).unwrap();
let build_dir = relative_build_dir.canonicalize().unwrap();
fs::create_dir_all(&relative_build_dir)?;
let build_dir = relative_build_dir.canonicalize()?;
decompress_proj_src(project_tar, &build_dir).unwrap();
let function_executable =
Path::new(CACHE_PATH.as_os_str()).join(function_name.to_string() + "_server");
make_executable(&build_dir.join(function_name), Some(&function_executable)).unwrap();
make_executable(&build_dir.join(function_name), Some(&function_executable))?;
self.fn_name_to_binary_path
.insert(function_name.to_string(), function_executable);
//std::fs::remove_dir_all(build_dir.join(function_name)).unwrap(); todo
Ok(())
}

// dispatch params to a function. Runs each time the function is called.
Expand Down
13 changes: 10 additions & 3 deletions turbolift_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS
&function_name_string,
untyped_params,
);
let declaration_error_text = format!(
"turbolift: error while declaring {}",
original_target_function_name
);

// todo extract any docs from passed function and put into fn wrapper

Expand Down Expand Up @@ -154,18 +158,21 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS
turbolift::DistributionResult<#result_type> {
use std::time::Duration;
use turbolift::DistributionPlatform;
use turbolift::DistributionResult;
use turbolift::async_std::task;
use turbolift::cached::proc_macro::cached;

// call .declare once by memoizing the call
#[cached(size=1)]
fn setup() {
async fn setup() -> DistributionResult<()> {
#distribution_platform
.lock()
.unwrap()
.declare(#original_target_function_name, #project_source_binary);
.declare(#original_target_function_name, #project_source_binary)
.await
.expect(#declaration_error_text)
}
setup();
setup().await?;

let params = #params_vec.join("/");

Expand Down

0 comments on commit 5b1f407

Please sign in to comment.