Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow quick & shallow copies of Git repos to select which repos to index #1115

Merged
merged 11 commits into from
Nov 21, 2023
4 changes: 1 addition & 3 deletions .github/workflows/server-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run: git lfs ls-files --long | cut -d ' ' -f1 | sort > .lfs-assets-id

- name: LFS Cache
uses: actions/cache@v3
uses: takost/cache@2001ca4114dbc44888c30c0ec3550fecf724b8d9
with:
path: .git/lfs/objects
key: ${{ runner.os }}-lfs-${{ hashFiles('.lfs-assets-id') }}
Expand All @@ -44,8 +44,6 @@ jobs:
- name: Load other environment changes
run: direnv export gha >> "$GITHUB_ENV"

- uses: Swatinem/rust-cache@v2

- name: Rustfmt
run: cargo --locked fmt -p bleep -- --check

Expand Down
4 changes: 1 addition & 3 deletions .github/workflows/tauri-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run: git lfs ls-files --long | cut -d ' ' -f1 | sort > .lfs-assets-id

- name: LFS Cache
uses: actions/cache@v3
uses: takost/cache@2001ca4114dbc44888c30c0ec3550fecf724b8d9
with:
path: .git/lfs/objects
key: ${{ runner.os }}-lfs-${{ hashFiles('.lfs-assets-id') }}
Expand All @@ -44,8 +44,6 @@ jobs:
- name: Load other environment changes
run: direnv export gha >> "$GITHUB_ENV"

- uses: Swatinem/rust-cache@v2

- name: Rustfmt
run: cargo --locked fmt -p bloop -- --check

Expand Down
64 changes: 40 additions & 24 deletions server/bleep/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
use std::{future::Future, pin::Pin, sync::Arc, thread};

mod sync;
pub(crate) use sync::SyncHandle;
pub(crate) use sync::{SyncConfig, SyncHandle};

mod control;
pub(crate) use control::SyncPipes;
Expand Down Expand Up @@ -65,7 +65,6 @@ pub struct BackgroundExecutor {
sender: flume::Sender<Task>,
}

pub struct BoundSyncQueue(pub(crate) Application, pub(crate) SyncQueue);
impl BackgroundExecutor {
fn start(config: Arc<Configuration>) -> Self {
let (sender, receiver) = flume::unbounded();
Expand Down Expand Up @@ -194,8 +193,8 @@ impl SyncQueue {
instance
}

pub fn bind(&self, app: Application) -> BoundSyncQueue {
BoundSyncQueue(app, self.clone())
pub fn broadcast(&self) -> tokio::sync::broadcast::Sender<Progress> {
self.progress.clone()
}

pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<Progress> {
Expand Down Expand Up @@ -240,23 +239,35 @@ pub(crate) enum QueueState {
Queued,
}

pub struct BoundSyncQueue(pub(crate) Application);
impl BoundSyncQueue {
/// Enqueue repo for syncing
pub(crate) async fn enqueue(self, config: SyncConfig) {
self.0
.sync_queue
.queue
.push(config.into_handle().await)
.await;
}

/// Enqueue repos for syncing with the current configuration.
///
/// Skips any repositories in the list which are already queued or being synced.
/// Returns the number of new repositories queued for syncing.
pub(crate) async fn enqueue_sync(self, repositories: Vec<RepoRef>) -> usize {
let mut num_queued = 0;
pub(crate) async fn enqueue_all(self, repositories: Vec<RepoRef>) -> usize {
let Self(app) = &self;
let jobs = &app.sync_queue;

let mut num_queued = 0;
for reporef in repositories {
if self.1.queue.contains(&reporef).await || self.1.active.contains(&reporef) {
if jobs.queue.contains(&reporef).await || jobs.active.contains(&reporef) {
continue;
}

info!(%reporef, "queueing for sync");
let handle =
SyncHandle::new(self.0.clone(), reporef, self.1.progress.clone(), None).await;
self.1.queue.push(handle).await;
jobs.queue
.push(SyncConfig::new(app, reporef).into_handle().await)
.await;
num_queued += 1;
}

Expand All @@ -267,15 +278,21 @@ impl BoundSyncQueue {
///
/// Returns the new status.
pub(crate) async fn block_until_synced(self, reporef: RepoRef) -> anyhow::Result<SyncStatus> {
let handle = SyncHandle::new(self.0.clone(), reporef, self.1.progress.clone(), None).await;
let Self(app) = &self;
let jobs = &app.sync_queue;

let handle = SyncConfig::new(app, reporef).into_handle().await;
let finished = handle.notify_done();
self.1.queue.push(handle).await;

jobs.queue.push(handle).await;
Ok(finished.recv_async().await?)
}

pub(crate) async fn remove(self, reporef: RepoRef) -> Option<()> {
let active = self
.1
let Self(app) = &self;
let jobs = &app.sync_queue;

let active = jobs
.active
.update_async(&reporef, |_, v| {
v.pipes.remove();
Expand All @@ -285,23 +302,22 @@ impl BoundSyncQueue {

if active.is_none() {
// Re-queue to the front, so clean any currently queued refs
self.1.queue.remove(reporef.clone()).await;

self.0
.repo_pool
jobs.queue.remove(reporef.clone()).await;
app.repo_pool
.update_async(&reporef, |_k, v| v.mark_removed())
.await?;

let handle =
SyncHandle::new(self.0.clone(), reporef, self.1.progress.clone(), None).await;
self.1.queue.push_front(handle).await;
jobs.queue
.push_front(SyncConfig::new(app, reporef).into_handle().await)
.await;
}

Some(())
}

pub(crate) async fn cancel(&self, reporef: RepoRef) {
self.1
self.0
.sync_queue
.active
.update_async(&reporef, |_, v| {
v.set_status(|_| SyncStatus::Cancelling);
Expand All @@ -311,12 +327,12 @@ impl BoundSyncQueue {
}

pub(crate) async fn startup_scan(self) -> anyhow::Result<()> {
let Self(Application { repo_pool, .. }, _) = &self;
let Self(Application { ref repo_pool, .. }) = self;

let mut repos = vec![];
repo_pool.scan_async(|k, _| repos.push(k.clone())).await;

self.enqueue_sync(repos).await;
self.enqueue_all(repos).await;

Ok(())
}
Expand Down
90 changes: 78 additions & 12 deletions server/bleep/src/background/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use crate::{
cache::FileCache,
indexes,
remotes::RemoteError,
repo::{Backend, FilterUpdate, RepoError, RepoMetadata, RepoRef, Repository, SyncStatus},
repo::{
iterator::FileFilterRule, Backend, FileFilterConfig, FilterUpdate, RepoError, RepoMetadata,
RepoRef, Repository, SyncStatus,
},
Application,
};

use std::{path::PathBuf, sync::Arc};
use std::{borrow::Borrow, num::NonZeroU32, path::PathBuf, sync::Arc};

use super::control::SyncPipes;

Expand All @@ -20,6 +23,8 @@ pub struct SyncHandle {
pub(crate) pipes: SyncPipes,
pub(crate) file_cache: FileCache,
pub(crate) app: Application,
pub(crate) shallow_config: gix::remote::fetch::Shallow,
shallow: bool,
exited: flume::Sender<SyncStatus>,
exit_signal: flume::Receiver<SyncStatus>,
}
Expand Down Expand Up @@ -93,19 +98,71 @@ impl Drop for SyncHandle {
}
}

pub struct SyncConfig {
app: Application,
reporef: RepoRef,
filter_updates: Option<FilterUpdate>,
shallow: bool,
}

impl SyncConfig {
pub fn new(app: impl Borrow<Application>, reporef: RepoRef) -> SyncConfig {
SyncConfig {
app: app.borrow().clone(),
reporef,
filter_updates: None,
shallow: false,
}
}

pub fn filter_updates(mut self, update: Option<FilterUpdate>) -> Self {
self.filter_updates = update;
self
}

pub fn shallow(mut self, shallow: bool) -> Self {
self.shallow = shallow;
self
}

pub async fn into_handle(self) -> Arc<SyncHandle> {
SyncHandle::new(self).await
}
}

impl SyncHandle {
pub(crate) async fn new(
app: Application,
reporef: RepoRef,
status: super::ProgressStream,
filter_updates: Option<FilterUpdate>,
) -> Arc<Self> {
async fn new(config: SyncConfig) -> Arc<Self> {
let SyncConfig {
app,
reporef,
filter_updates,
shallow,
..
} = config;
let status = app.sync_queue.broadcast();

let mut shallow_config = if shallow {
gix::remote::fetch::Shallow::DepthAtRemote(NonZeroU32::new(1).unwrap())
} else {
gix::remote::fetch::Shallow::DepthAtRemote(NonZeroU32::new(1000).unwrap())
};

// Going through an extra hoop here to ensure the outward
// facing interface communicates intent.
//
// How filter updates work specifically should not have to
// trickle down to all callers.
let filter_updates = filter_updates.unwrap_or_default();
let filter_updates = if shallow {
FilterUpdate {
file_filter: Some(FileFilterConfig {
rules: vec![FileFilterRule::ExcludeRegex(".*".into())],
}),
..Default::default()
}
} else {
filter_updates.unwrap_or_default()
};

let (exited, exit_signal) = flume::bounded(1);
let pipes = SyncPipes::new(reporef.clone(), filter_updates.clone(), status);
let current = app
Expand All @@ -126,6 +183,7 @@ impl SyncHandle {
Repository {
disk_path,
remote,
shallow,
sync_status: SyncStatus::Queued,
pub_sync_status: SyncStatus::Queued,
last_index_unix_secs: 0,
Expand All @@ -138,10 +196,18 @@ impl SyncHandle {
}
});

// if we're not upgrading from shallow to full checkout
// this seems to be a speed optimization for git operations
if !shallow && !current.get().shallow {
shallow_config = gix::remote::fetch::Shallow::NoChange;
}

let sh = Self {
app: app.clone(),
reporef: reporef.clone(),
file_cache: FileCache::new(app.sql.clone(), app.semantic.clone()),
shallow_config,
shallow,
pipes,
filter_updates,
exited,
Expand Down Expand Up @@ -196,7 +262,7 @@ impl SyncHandle {
.await
.unwrap();

let tutorial_questions = if repository.last_index_unix_secs == 0 {
let tutorial_questions = if !repository.shallow {
let db = self.app.sql.clone();
let llm_gateway = self.app.user().await.llm_gateway(&self.app).await;
let repo_pool = self.app.repo_pool.clone();
Expand All @@ -215,7 +281,7 @@ impl SyncHandle {
Ok(Either::Right(state)) => {
info!("commit complete; indexing done");
self.app.repo_pool.update(&self.reporef, |_k, repo| {
repo.sync_done_with(&self.filter_updates, state)
repo.sync_done_with(self.shallow, &self.filter_updates, state)
});

if let Some(tutorial_questions) = tutorial_questions {
Expand All @@ -225,7 +291,7 @@ impl SyncHandle {
}

// technically `sync_done_with` does this, but we want to send notifications
self.set_status(|_| SyncStatus::Done)
self.set_status(|repo| repo.sync_status.clone())
}
Err(SyncError::Cancelled) => self.set_status(|_| SyncStatus::Cancelled),
Err(err) => self.set_status(|_| SyncStatus::Error {
Expand Down
2 changes: 0 additions & 2 deletions server/bleep/src/ee.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! Modules for Bloop's Enterprise Edition.
//! Please see `LICENSE` for details.

#[cfg(feature = "ee-pro")]
pub(crate) mod background;
#[cfg(feature = "ee-cloud")]
pub(crate) mod embedder;
#[cfg(feature = "ee-pro")]
Expand Down
27 changes: 0 additions & 27 deletions server/bleep/src/ee/background.rs

This file was deleted.

Loading