From 22ff6354eb7441d370bd2e8c9db81619e65c4a30 Mon Sep 17 00:00:00 2001 From: Shubham Dhama Date: Fri, 30 May 2025 12:59:43 +0530 Subject: [PATCH] blobs: register `Blob` service with DRPC server Enable the `Blob` service on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None --- pkg/blobs/service.go | 62 ++++++++++++++++++++++++++++++++++++++++---- pkg/blobs/stream.go | 2 +- pkg/server/server.go | 3 +++ 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/pkg/blobs/service.go b/pkg/blobs/service.go index a5652aacf7dd..a1a39ec5493d 100644 --- a/pkg/blobs/service.go +++ b/pkg/blobs/service.go @@ -38,7 +38,11 @@ type Service struct { localStorage *LocalStorage } +// drpcService is a DRPC wrapper around the Service. +type drpcService Service + var _ blobspb.BlobServer = &Service{} +var _ blobspb.DRPCBlobServer = (*drpcService)(nil) // NewBlobService instantiates a blob service server. func NewBlobService(externalIODir string) (*Service, error) { @@ -46,8 +50,25 @@ func NewBlobService(externalIODir string) (*Service, error) { return &Service{localStorage: localStorage}, err } -// GetStream implements the gRPC service. +// AsDRPCServer returns the DRPC server implementation for the Blob service. +func (s *Service) AsDRPCServer() blobspb.DRPCBlobServer { + return (*drpcService)(s) +} + +// GetStream implements the DRPC service +func (s *drpcService) GetStream( + req *blobspb.GetRequest, stream blobspb.DRPCBlob_GetStreamStream, +) error { + return (*Service)(s).getStream(req, stream) +} + +// GetStream implements the gRPC service func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStreamServer) error { + return s.getStream(req, stream) +} + +// getStream is the shared implementation for GetStream for both gRPC and DRPC. +func (s *Service) getStream(req *blobspb.GetRequest, stream blobspb.RPCBlob_GetStreamStream) error { content, _, err := s.localStorage.ReadFile(req.Filename, req.Offset) if err != nil { return err @@ -56,8 +77,18 @@ func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStre return streamContent(stream.Context(), stream, content) } -// PutStream implements the gRPC service. +// PutStream implements the DRPC service +func (s *drpcService) PutStream(stream blobspb.DRPCBlob_PutStreamStream) error { + return (*Service)(s).putStream(stream) +} + +// PutStream implements the gRPC service func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error { + return s.putStream(stream) +} + +// putStream is the shared implementation for PutStream for both gRPC and DRPC. +func (s *Service) putStream(stream blobspb.RPCBlob_PutStreamStream) error { filename, ok := grpcutil.FastFirstValueFromIncomingContext(stream.Context(), "filename") if !ok { return errors.New("could not fetch metadata or no filename in metadata") @@ -86,7 +117,14 @@ func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error { return err } -// List implements the gRPC service. +// List implements the DRPC service +func (s *drpcService) List( + ctx context.Context, req *blobspb.GlobRequest, +) (*blobspb.GlobResponse, error) { + return (*Service)(s).List(ctx, req) +} + +// List implements the gRPC service func (s *Service) List( ctx context.Context, req *blobspb.GlobRequest, ) (*blobspb.GlobResponse, error) { @@ -94,14 +132,28 @@ func (s *Service) List( return &blobspb.GlobResponse{Files: matches}, err } -// Delete implements the gRPC service. +// Delete implements the DRPC service +func (s *drpcService) Delete( + ctx context.Context, req *blobspb.DeleteRequest, +) (*blobspb.DeleteResponse, error) { + return (*Service)(s).Delete(ctx, req) +} + +// Delete implements the gRPC service func (s *Service) Delete( ctx context.Context, req *blobspb.DeleteRequest, ) (*blobspb.DeleteResponse, error) { return &blobspb.DeleteResponse{}, s.localStorage.Delete(req.Filename) } -// Stat implements the gRPC service. +// Stat implements the DRPC service +func (s *drpcService) Stat( + ctx context.Context, req *blobspb.StatRequest, +) (*blobspb.BlobStat, error) { + return (*Service)(s).Stat(ctx, req) +} + +// Stat implements the gRPC service func (s *Service) Stat(ctx context.Context, req *blobspb.StatRequest) (*blobspb.BlobStat, error) { resp, err := s.localStorage.Stat(req.Filename) if oserror.IsNotExist(err) { diff --git a/pkg/blobs/stream.go b/pkg/blobs/stream.go index 59ce5d601f80..f6a2cc8f8e73 100644 --- a/pkg/blobs/stream.go +++ b/pkg/blobs/stream.go @@ -60,7 +60,7 @@ func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient) ioctx.ReadCloser // newPutStreamReader creates an io.ReadCloser that uses gRPC's streaming API // to read chunks of data. -func newPutStreamReader(client blobspb.Blob_PutStreamServer) ioctx.ReadCloserCtx { +func newPutStreamReader(client blobspb.RPCBlob_PutStreamStream) ioctx.ReadCloserCtx { return &blobStreamReader{stream: client} } diff --git a/pkg/server/server.go b/pkg/server/server.go index 8f4701172d74..1ee94504ad7d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1017,6 +1017,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf return nil, errors.Wrap(err, "creating blob service") } blobspb.RegisterBlobServer(grpcServer.Server, blobService) + if err := blobspb.DRPCRegisterBlob(drpcServer, blobService.AsDRPCServer()); err != nil { + return nil, err + } replicationReporter := reports.NewReporter( db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher,