Skip to content

Commit

Permalink
Add a handler newtype wrapper so we don't need the annoying self: Arc…
Browse files Browse the repository at this point in the history
…<Self> everywhere
  • Loading branch information
rklaehn committed Nov 29, 2024
1 parent e8783f9 commit 1248206
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Default for GcState {
#[derive(Debug)]
pub struct Blobs<S> {
rt: LocalPoolHandle,
store: S,
pub(crate) store: S,
events: EventSender,
downloader: Downloader,
batches: tokio::sync::Mutex<BlobBatches>,
Expand Down
95 changes: 51 additions & 44 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{
io,
ops::Deref,
sync::{Arc, Mutex},
};

Expand Down Expand Up @@ -81,15 +82,33 @@ impl<D: crate::store::Store> Blobs<D> {
C: ChannelTypes<RpcService>,
{
use Request::*;
let handler = Handler(self);
match msg {
Blobs(msg) => self.handle_blobs_request(msg, chan).await,
Tags(msg) => self.handle_tags_request(msg, chan).await,
Blobs(msg) => handler.handle_blobs_request(msg, chan).await,
Tags(msg) => handler.handle_tags_request(msg, chan).await,
}
}
}

#[derive(Clone)]
struct Handler<S>(Arc<Blobs<S>>);

impl<S> Deref for Handler<S> {
type Target = Blobs<S>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<D: crate::store::Store> Handler<D> {
fn store(&self) -> &D {
&self.0.store
}

/// Handle a tags request
async fn handle_tags_request<C>(
self: Arc<Self>,
pub async fn handle_tags_request<C>(
self,
msg: proto::tags::Request,
chan: RpcChannel<proto::RpcService, C>,
) -> std::result::Result<(), RpcServerError<C>>
Expand All @@ -106,8 +125,8 @@ impl<D: crate::store::Store> Blobs<D> {
}

/// Handle a blobs request
async fn handle_blobs_request<C>(
self: Arc<Self>,
pub async fn handle_blobs_request<C>(
self,
msg: proto::blobs::Request,
chan: RpcChannel<proto::RpcService, C>,
) -> std::result::Result<(), RpcServerError<C>>
Expand Down Expand Up @@ -150,7 +169,7 @@ impl<D: crate::store::Store> Blobs<D> {
}
}

async fn blob_status(self: Arc<Self>, msg: BlobStatusRequest) -> RpcResult<BlobStatusResponse> {
async fn blob_status(self, msg: BlobStatusRequest) -> RpcResult<BlobStatusResponse> {
let blobs = self;
let entry = blobs
.store()
Expand All @@ -171,7 +190,7 @@ impl<D: crate::store::Store> Blobs<D> {
}))
}

async fn blob_list_impl(self: Arc<Self>, co: &Co<RpcResult<BlobInfo>>) -> io::Result<()> {
async fn blob_list_impl(self, co: &Co<RpcResult<BlobInfo>>) -> io::Result<()> {
use bao_tree::io::fsm::Outboard;

let blobs = self;
Expand All @@ -190,7 +209,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

async fn blob_list_incomplete_impl(
self: Arc<Self>,
self,
co: &Co<RpcResult<IncompleteBlobInfo>>,
) -> io::Result<()> {
let blobs = self;
Expand All @@ -216,7 +235,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

fn blob_list(
self: Arc<Self>,
self,
_msg: ListRequest,
) -> impl Stream<Item = RpcResult<BlobInfo>> + Send + 'static {
Gen::new(|co| async move {
Expand All @@ -227,7 +246,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

fn blob_list_incomplete(
self: Arc<Self>,
self,
_msg: ListIncompleteRequest,
) -> impl Stream<Item = RpcResult<IncompleteBlobInfo>> + Send + 'static {
Gen::new(move |co| async move {
Expand All @@ -237,26 +256,23 @@ impl<D: crate::store::Store> Blobs<D> {
})
}

async fn blob_delete_tag(self: Arc<Self>, msg: TagDeleteRequest) -> RpcResult<()> {
async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> {
self.store()
.set_tag(msg.name, None)
.await
.map_err(|e| RpcError::new(&e))?;
Ok(())
}

async fn blob_delete_blob(self: Arc<Self>, msg: DeleteRequest) -> RpcResult<()> {
async fn blob_delete_blob(self, msg: DeleteRequest) -> RpcResult<()> {
self.store()
.delete(vec![msg.hash])
.await
.map_err(|e| RpcError::new(&e))?;
Ok(())
}

fn blob_list_tags(
self: Arc<Self>,
msg: TagListRequest,
) -> impl Stream<Item = TagInfo> + Send + 'static {
fn blob_list_tags(self, msg: TagListRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
tracing::info!("blob_list_tags");
let blobs = self;
Gen::new(|co| async move {
Expand All @@ -274,7 +290,7 @@ impl<D: crate::store::Store> Blobs<D> {

/// Invoke validate on the database and stream out the result
fn blob_validate(
self: Arc<Self>,
self,
msg: ValidateRequest,
) -> impl Stream<Item = ValidateProgress> + Send + 'static {
let (tx, rx) = async_channel::bounded(1);
Expand All @@ -296,7 +312,7 @@ impl<D: crate::store::Store> Blobs<D> {

/// Invoke validate on the database and stream out the result
fn blob_consistency_check(
self: Arc<Self>,
self,
msg: ConsistencyCheckRequest,
) -> impl Stream<Item = ConsistencyCheckProgress> + Send + 'static {
let (tx, rx) = async_channel::bounded(1);
Expand All @@ -316,10 +332,7 @@ impl<D: crate::store::Store> Blobs<D> {
rx
}

fn blob_add_from_path(
self: Arc<Self>,
msg: AddPathRequest,
) -> impl Stream<Item = AddPathResponse> {
fn blob_add_from_path(self, msg: AddPathRequest) -> impl Stream<Item = AddPathResponse> {
// provide a little buffer so that we don't slow down the sender
let (tx, rx) = async_channel::bounded(32);
let tx2 = tx.clone();
Expand All @@ -332,7 +345,7 @@ impl<D: crate::store::Store> Blobs<D> {
rx.map(AddPathResponse)
}

async fn tags_set(self: Arc<Self>, msg: TagsSetRequest) -> RpcResult<()> {
async fn tags_set(self, msg: TagsSetRequest) -> RpcResult<()> {
let blobs = self;
blobs
.store()
Expand All @@ -354,7 +367,7 @@ impl<D: crate::store::Store> Blobs<D> {
Ok(())
}

async fn tags_create(self: Arc<Self>, msg: TagsCreateRequest) -> RpcResult<Tag> {
async fn tags_create(self, msg: TagsCreateRequest) -> RpcResult<Tag> {
let blobs = self;
let tag = blobs
.store()
Expand All @@ -374,10 +387,7 @@ impl<D: crate::store::Store> Blobs<D> {
Ok(tag)
}

fn blob_download(
self: Arc<Self>,
msg: BlobDownloadRequest,
) -> impl Stream<Item = DownloadResponse> {
fn blob_download(self, msg: BlobDownloadRequest) -> impl Stream<Item = DownloadResponse> {
let (sender, receiver) = async_channel::bounded(1024);
let endpoint = self.endpoint().clone();
let progress = AsyncChannelProgressSender::new(sender);
Expand All @@ -399,7 +409,7 @@ impl<D: crate::store::Store> Blobs<D> {
receiver.map(DownloadResponse)
}

fn blob_export(self: Arc<Self>, msg: ExportRequest) -> impl Stream<Item = ExportResponse> {
fn blob_export(self, msg: ExportRequest) -> impl Stream<Item = ExportResponse> {
let (tx, rx) = async_channel::bounded(1024);
let progress = AsyncChannelProgressSender::new(tx);
let rt = self.rt().clone();
Expand All @@ -425,7 +435,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

async fn blob_add_from_path0(
self: Arc<Self>,
self,
msg: AddPathRequest,
progress: async_channel::Sender<AddProgress>,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -543,18 +553,15 @@ impl<D: crate::store::Store> Blobs<D> {
Ok(())
}

async fn batch_create_temp_tag(
self: Arc<Self>,
msg: BatchCreateTempTagRequest,
) -> RpcResult<()> {
async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> {
let blobs = self;
let tag = blobs.store().temp_tag(msg.content);
blobs.batches().await.store(msg.batch, tag);
Ok(())
}

fn batch_add_stream(
self: Arc<Self>,
self,
msg: BatchAddStreamRequest,
stream: impl Stream<Item = BatchAddStreamUpdate> + Send + Unpin + 'static,
) -> impl Stream<Item = BatchAddStreamResponse> {
Expand All @@ -572,7 +579,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

fn batch_add_from_path(
self: Arc<Self>,
self,
msg: BatchAddPathRequest,
) -> impl Stream<Item = BatchAddPathResponse> {
// provide a little buffer so that we don't slow down the sender
Expand All @@ -590,7 +597,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

async fn batch_add_stream0(
self: Arc<Self>,
self,
msg: BatchAddStreamRequest,
stream: impl Stream<Item = BatchAddStreamUpdate> + Send + Unpin + 'static,
progress: async_channel::Sender<BatchAddStreamResponse>,
Expand Down Expand Up @@ -624,7 +631,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

async fn batch_add_from_path0(
self: Arc<Self>,
self,
msg: BatchAddPathRequest,
progress: async_channel::Sender<BatchAddPathProgress>,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -664,7 +671,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

fn blob_add_stream(
self: Arc<Self>,
self,
msg: AddStreamRequest,
stream: impl Stream<Item = AddStreamUpdate> + Send + Unpin + 'static,
) -> impl Stream<Item = AddStreamResponse> {
Expand All @@ -681,7 +688,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

async fn blob_add_stream0(
self: Arc<Self>,
self,
msg: AddStreamRequest,
stream: impl Stream<Item = AddStreamUpdate> + Send + Unpin + 'static,
progress: async_channel::Sender<AddProgress>,
Expand Down Expand Up @@ -735,7 +742,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

fn blob_read_at(
self: Arc<Self>,
self,
req: ReadAtRequest,
) -> impl Stream<Item = RpcResult<ReadAtResponse>> + Send + 'static {
let (tx, rx) = async_channel::bounded(RPC_BLOB_GET_CHANNEL_CAP);
Expand Down Expand Up @@ -816,7 +823,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

fn batch_create(
self: Arc<Self>,
self,
_: BatchCreateRequest,
mut updates: impl Stream<Item = BatchUpdate> + Send + Unpin + 'static,
) -> impl Stream<Item = BatchCreateResponse> {
Expand All @@ -842,7 +849,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

async fn create_collection(
self: Arc<Self>,
self,
req: CreateCollectionRequest,
) -> RpcResult<CreateCollectionResponse> {
let CreateCollectionRequest {
Expand Down

0 comments on commit 1248206

Please sign in to comment.