Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions pkg/blobs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,37 @@ type Service struct {
localStorage *LocalStorage
}

// drpcService is a DRPC wrapper around the Service.
type drpcService Service

var _ blobspb.BlobServer = &Service{}
var _ blobspb.DRPCBlobServer = (*drpcService)(nil)

// NewBlobService instantiates a blob service server.
func NewBlobService(externalIODir string) (*Service, error) {
localStorage, err := NewLocalStorage(externalIODir)
return &Service{localStorage: localStorage}, err
}

// GetStream implements the gRPC service.
// AsDRPCServer returns the DRPC server implementation for the Blob service.
func (s *Service) AsDRPCServer() blobspb.DRPCBlobServer {
return (*drpcService)(s)
}

// GetStream implements the DRPC service
func (s *drpcService) GetStream(
req *blobspb.GetRequest, stream blobspb.DRPCBlob_GetStreamStream,
) error {
return (*Service)(s).getStream(req, stream)
}

// GetStream implements the gRPC service
func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStreamServer) error {
return s.getStream(req, stream)
}

// getStream is the shared implementation for GetStream for both gRPC and DRPC.
func (s *Service) getStream(req *blobspb.GetRequest, stream blobspb.RPCBlob_GetStreamStream) error {
content, _, err := s.localStorage.ReadFile(req.Filename, req.Offset)
if err != nil {
return err
Expand All @@ -56,8 +77,18 @@ func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStre
return streamContent(stream.Context(), stream, content)
}

// PutStream implements the gRPC service.
// PutStream implements the DRPC service
func (s *drpcService) PutStream(stream blobspb.DRPCBlob_PutStreamStream) error {
return (*Service)(s).putStream(stream)
}

// PutStream implements the gRPC service
func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error {
return s.putStream(stream)
}

// putStream is the shared implementation for PutStream for both gRPC and DRPC.
func (s *Service) putStream(stream blobspb.RPCBlob_PutStreamStream) error {
filename, ok := grpcutil.FastFirstValueFromIncomingContext(stream.Context(), "filename")
if !ok {
return errors.New("could not fetch metadata or no filename in metadata")
Expand Down Expand Up @@ -86,22 +117,43 @@ func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error {
return err
}

// List implements the gRPC service.
// List implements the DRPC service
func (s *drpcService) List(
ctx context.Context, req *blobspb.GlobRequest,
) (*blobspb.GlobResponse, error) {
return (*Service)(s).List(ctx, req)
}

// List implements the gRPC service
func (s *Service) List(
ctx context.Context, req *blobspb.GlobRequest,
) (*blobspb.GlobResponse, error) {
matches, err := s.localStorage.List(req.Pattern)
return &blobspb.GlobResponse{Files: matches}, err
}

// Delete implements the gRPC service.
// Delete implements the DRPC service
func (s *drpcService) Delete(
ctx context.Context, req *blobspb.DeleteRequest,
) (*blobspb.DeleteResponse, error) {
return (*Service)(s).Delete(ctx, req)
}

// Delete implements the gRPC service
func (s *Service) Delete(
ctx context.Context, req *blobspb.DeleteRequest,
) (*blobspb.DeleteResponse, error) {
return &blobspb.DeleteResponse{}, s.localStorage.Delete(req.Filename)
}

// Stat implements the gRPC service.
// Stat implements the DRPC service
func (s *drpcService) Stat(
ctx context.Context, req *blobspb.StatRequest,
) (*blobspb.BlobStat, error) {
return (*Service)(s).Stat(ctx, req)
}

// Stat implements the gRPC service
func (s *Service) Stat(ctx context.Context, req *blobspb.StatRequest) (*blobspb.BlobStat, error) {
resp, err := s.localStorage.Stat(req.Filename)
if oserror.IsNotExist(err) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/blobs/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient) ioctx.ReadCloser

// newPutStreamReader creates an io.ReadCloser that uses gRPC's streaming API
// to read chunks of data.
func newPutStreamReader(client blobspb.Blob_PutStreamServer) ioctx.ReadCloserCtx {
func newPutStreamReader(client blobspb.RPCBlob_PutStreamStream) ioctx.ReadCloserCtx {
return &blobStreamReader{stream: client}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
return nil, errors.Wrap(err, "creating blob service")
}
blobspb.RegisterBlobServer(grpcServer.Server, blobService)
if err := blobspb.DRPCRegisterBlob(drpcServer, blobService.AsDRPCServer()); err != nil {
return nil, err
}

replicationReporter := reports.NewReporter(
db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher,
Expand Down