Skip to content

Commit

Permalink
SSH pull and push support (wip)
Browse files Browse the repository at this point in the history
commit-id:534b1577
  • Loading branch information
vlad-ivanov-name committed Nov 16, 2022
1 parent b1c76b0 commit e690c37
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 72 deletions.
64 changes: 38 additions & 26 deletions josh-proxy/src/bin/josh-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#[macro_use]
extern crate lazy_static;

use josh_proxy::{FetchError, MetaConfig, RepoConfig, RepoUpdate};
use josh_proxy::{FetchError, MetaConfig, RemoteAuth, RepoConfig, RepoUpdate};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand Down Expand Up @@ -102,12 +102,11 @@ async fn fetch_upstream(
service: Arc<JoshProxyService>,
upstream_protocol: UpstreamProtocol,
upstream_repo: String,
auth: &josh_proxy::auth::Handle,
remote_auth: &RemoteAuth,
remote_url: String,
headref: &str,
force: bool,
) -> Result<(), FetchError> {
let auth = auth.clone();
let key = remote_url.clone();

if upstream_protocol == UpstreamProtocol::Ssh {
Expand Down Expand Up @@ -173,23 +172,23 @@ async fn fetch_upstream(

let span = tracing::span!(tracing::Level::TRACE, "fetch worker");
let us = upstream_repo.clone();
let a = auth.clone();
let ru = remote_url.clone();
let permit = service.fetch_permits.acquire().await;
let task_remote_auth = remote_auth.clone();
let fetch_result = tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();
josh_proxy::fetch_refs_from_url(&br_path, &us, &ru, &refs_to_fetch, &a)
josh_proxy::fetch_refs_from_url(&br_path, &us, &ru, &refs_to_fetch, &task_remote_auth)
})
.await?;

let us = upstream_repo.clone();
let s = tracing::span!(tracing::Level::TRACE, "get_head worker");
let br_path = service.repo_path.join("mirror");
let ru = remote_url.clone();
let a = auth.clone();
let task_remote_auth = remote_auth.clone();
let hres = tokio::task::spawn_blocking(move || {
let _e = s.enter();
josh_proxy::get_head(&br_path, &ru, &a)
josh_proxy::get_head(&br_path, &ru, &task_remote_auth)
})
.await?;

Expand All @@ -199,8 +198,8 @@ async fn fetch_upstream(

std::mem::drop(permit);

match fetch_result {
Ok(_) => {
match (fetch_result, remote_auth) {
(Ok(_), RemoteAuth::Http { auth }) => {
fetch_timers.write()?.insert(key, std::time::Instant::now());

let (auth_user, _) = auth.parse().map_err(FetchError::from_josh_error)?;
Expand All @@ -209,12 +208,13 @@ async fn fetch_upstream(
service
.poll
.lock()?
.insert((upstream_repo, auth, remote_url));
.insert((upstream_repo, auth.clone(), remote_url));
}

Ok(())
}
Err(_) => fetch_result,
(Ok(_), _) => Ok(()),
(Err(e), _) => Err(e),
}
}

Expand Down Expand Up @@ -444,7 +444,7 @@ async fn query_meta_repo(
meta_repo: &str,
upstream_protocol: UpstreamProtocol,
upstream_repo: &str,
auth: &josh_proxy::auth::Handle,
remote_auth: &RemoteAuth,
) -> josh::JoshResult<josh_proxy::MetaConfig> {
let upstream = serv
.upstream
Expand All @@ -455,7 +455,7 @@ async fn query_meta_repo(
serv.clone(),
upstream_protocol,
meta_repo.to_owned(),
&auth,
&remote_auth,
remote_url.to_owned(),
&"HEAD",
false,
Expand Down Expand Up @@ -510,26 +510,35 @@ async fn query_meta_repo(

async fn make_meta_config(
serv: Arc<JoshProxyService>,
auth: Option<&josh_proxy::auth::Handle>,
remote_auth: Option<&RemoteAuth>,
upstream_protocol: UpstreamProtocol,
parsed_url: &FilteredRepoUrl,
) -> josh::JoshResult<MetaConfig> {
let meta_repo = std::env::var("JOSH_META_REPO");
let auth_token = std::env::var("JOSH_META_AUTH_TOKEN");

match (auth, meta_repo) {
match (remote_auth, meta_repo) {
(None, _) | (_, Err(_)) => Ok(MetaConfig {
config: RepoConfig {
repo: parsed_url.upstream_repo.clone(),
..Default::default()
},
..Default::default()
}),
(Some(auth), Ok(meta_repo)) => {
let auth = if let Ok(token) = auth_token {
josh_proxy::auth::add_auth(&token)?
} else {
auth.clone()
(Some(remote_auth), Ok(meta_repo)) => {
let auth = match remote_auth {
RemoteAuth::Ssh { auth_socket } => RemoteAuth::Ssh {
auth_socket: auth_socket.clone(),
},
RemoteAuth::Http { auth } => {
let auth = if let Ok(token) = auth_token {
josh_proxy::auth::add_auth(&token)?
} else {
auth.clone()
};

RemoteAuth::Http { auth }
}
};

query_meta_repo(
Expand Down Expand Up @@ -847,9 +856,10 @@ async fn call_service(
}
};

let remote_auth = RemoteAuth::Http { auth: auth.clone() };
let meta = make_meta_config(
serv.clone(),
Some(&auth),
Some(&remote_auth),
UpstreamProtocol::Http,
&parsed_url,
)
Expand Down Expand Up @@ -928,7 +938,7 @@ async fn call_service(
serv.clone(),
UpstreamProtocol::Http,
meta.config.repo.to_owned(),
&auth,
&remote_auth,
remote_url.to_owned(),
&headref,
false,
Expand Down Expand Up @@ -989,7 +999,7 @@ async fn call_service(
let repo_update = josh_proxy::RepoUpdate {
refs: HashMap::new(),
remote_url: remote_url.clone(),
auth,
remote_auth,
port: serv.port.clone(),
filter_spec: josh::filter::spec(filter),
base_ns: josh::to_ns(&meta.config.repo),
Expand Down Expand Up @@ -1236,11 +1246,12 @@ async fn run_polling(serv: Arc<JoshProxyService>) -> josh::JoshResult<()> {
let polls = serv.poll.lock()?.clone();

for (upstream_repo, auth, url) in polls {
let remote_auth = RemoteAuth::Http { auth };
let fetch_result = fetch_upstream(
serv.clone(),
UpstreamProtocol::Http,
upstream_repo.clone(),
&auth,
&remote_auth,
url.clone(),
"",
true,
Expand Down Expand Up @@ -1374,6 +1385,7 @@ async fn serve_graphql(
false,
));

let remote_auth = RemoteAuth::Http { auth };
let res = {
// First attempt to serve GraphQL query. If we can serve it
// that means all requested revisions were specified by SHA and we could find
Expand All @@ -1389,7 +1401,7 @@ async fn serve_graphql(
serv.clone(),
UpstreamProtocol::Http,
upstream_repo.to_owned(),
&auth,
&remote_auth,
remote_url.to_owned(),
&"HEAD",
false,
Expand Down Expand Up @@ -1452,7 +1464,7 @@ async fn serve_graphql(
*oid,
&reference,
&remote_url,
&auth,
&remote_auth,
&temp_ns.name(),
"META_PUSH",
false,
Expand Down
Loading

0 comments on commit e690c37

Please sign in to comment.