Skip to content

Commit

Permalink
Limit concurrent fetches to one per repo
Browse files Browse the repository at this point in the history
Concurrent fetches for the same repo used to cause races and errors
when updating refs.

Change: one-fetch-per-repo
  • Loading branch information
christian-schilling committed Dec 14, 2022
1 parent b93518b commit 8907f56
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
12 changes: 9 additions & 3 deletions josh-proxy/src/bin/josh-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ struct JoshProxyService {
upstream: JoshProxyUpstream,
fetch_timers: Arc<RwLock<FetchTimers>>,
heads_map: HeadsMap,
fetch_permits: Arc<tokio::sync::Semaphore>,
fetch_permits: Arc<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Semaphore>>>>,
filter_permits: Arc<tokio::sync::Semaphore>,
poll: Polls,
}
Expand Down Expand Up @@ -169,7 +169,13 @@ async fn fetch_upstream(
let span = tracing::span!(tracing::Level::TRACE, "fetch worker");
let us = upstream_repo.clone();
let ru = remote_url.clone();
let permit = service.fetch_permits.acquire().await;
let semaphore = service
.fetch_permits
.lock()?
.entry(us.clone())
.or_insert(Arc::new(tokio::sync::Semaphore::new(1)))
.clone();
let permit = semaphore.acquire().await;
let task_remote_auth = remote_auth.clone();
let fetch_result = tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();
Expand Down Expand Up @@ -1291,7 +1297,7 @@ async fn run_proxy() -> josh::JoshResult<i32> {
fetch_timers: Arc::new(RwLock::new(FetchTimers::new())),
heads_map: Arc::new(RwLock::new(std::collections::HashMap::new())),
poll: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
fetch_permits: Arc::new(tokio::sync::Semaphore::new(ARGS.concurrent_n)),
fetch_permits: Default::default(),
filter_permits: Arc::new(tokio::sync::Semaphore::new(10)),
});

Expand Down
11 changes: 3 additions & 8 deletions josh-proxy/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub struct Args {
pub gc: bool,
pub require_auth: bool,
pub no_background: bool,
pub concurrent_n: usize,
pub port: u16,
pub cache_duration: u64,
pub static_resource_proxy_target: Option<String>,
Expand Down Expand Up @@ -68,11 +67,9 @@ fn make_command() -> clap::Command {
.long("no-background")
.action(clap::ArgAction::SetTrue),
)
.arg(
clap::Arg::new("n")
.short('n')
.help("Number of concurrent upstream git fetch/push operations"),
)
.arg(clap::Arg::new("n").short('n').help(
"DEPRECATED - no effect! Number of concurrent upstream git fetch/push operations",
))
.arg(clap::Arg::new("port").long("port"))
.arg(
clap::Arg::new("cache-duration")
Expand Down Expand Up @@ -140,7 +137,6 @@ pub fn parse_args() -> josh::JoshResult<Args> {
.clone();

let poll_user = args.get_one::<String>("poll").map(String::clone);
let concurrent_n = parse_int::<usize>(&args, "n", Some(1))?;
let port = parse_int::<u16>(&args, "port", Some(8000))?;
let cache_duration = parse_int::<u64>(&args, "cache-duration", Some(0))?;
let static_resource_proxy_target = args
Expand All @@ -156,7 +152,6 @@ pub fn parse_args() -> josh::JoshResult<Args> {
gc: args.get_flag("gc"),
require_auth: args.get_flag("require-auth"),
no_background: args.get_flag("no-background"),
concurrent_n,
port,
cache_duration,
static_resource_proxy_target,
Expand Down

0 comments on commit 8907f56

Please sign in to comment.