Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
wushiling50 authored Sep 21, 2024
2 parents 124d48c + 9db2aa0 commit 1aee920
Showing 1 changed file with 53 additions and 17 deletions.
70 changes: 53 additions & 17 deletions pkg/grpc/extension/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *S3Server) GetObject(req *s3.GetObjectInput, stream s3.ObjectStorageServ
buffsPtr := bytesPool.Get().(*[]byte)
buf := *buffsPtr
if len(buf) == 0 {
buf = make([]byte, 102400)
buf = make([]byte, 1*1024*1024)
}
defer func() {
if err := recover(); err != nil {
Expand All @@ -111,20 +111,45 @@ func (s *S3Server) GetObject(req *s3.GetObjectInput, stream s3.ObjectStorageServ
}()

for {
length, err := result.DataStream.Read(buf)
if err != nil && err != io.EOF {
log.DefaultLogger.Warnf("get file fail, err: %+v", err)
return status.Errorf(codes.Internal, "get file fail,err: %+v", err)
}
if err == nil || (err == io.EOF && length != 0) {
resp := &s3.GetObjectOutput{}
err := transferData(result, resp)
if err != nil {
return status.Errorf(codes.InvalidArgument, "transfer request data fail for GetObject,err: %+v", err)
var err error
var length int
totalBytesRead := 0
// Reduce the number of transmissions to reduce the performance impact caused by the introduction of sidecar
for totalBytesRead < len(buf) {
length, err = result.DataStream.Read(buf[totalBytesRead:])
if err != nil && err != io.EOF {
log.DefaultLogger.Warnf("oss GetObject fail, err: %+v", err)
return status.Errorf(codes.Internal, "oss GetObject fail, err: %+v", err)
}
resp.Body = buf[:length]
totalBytesRead += length
if err == io.EOF {
break
}
}

if err == nil || (err == io.EOF && totalBytesRead != 0) {
resp := &s3.GetObjectOutput{
CacheControl: result.CacheControl,
ContentDisposition: result.ContentDisposition,
ContentEncoding: result.ContentEncoding,
ContentLanguage: result.ContentLanguage,
ContentLength: result.ContentLength,
ContentRange: result.ContentRange,
ContentType: result.ContentType,
DeleteMarker: result.DeleteMarker,
Etag: result.ETag,
Expiration: result.Expiration,
Expires: result.Expires,
LastModified: result.LastModified,
VersionId: result.VersionId,
TagCount: result.TagCount,
StorageClass: result.StorageClass,
PartsCount: result.PartsCount,
Metadata: result.Metadata,
}
resp.Body = buf[:totalBytesRead]
if err = stream.Send(resp); err != nil {
return status.Errorf(codes.Internal, "send file data fail,err: %+v", err)
return status.Errorf(codes.Internal, "GetObject send data fail,err: %+v", err)
}
}
if err == io.EOF {
Expand Down Expand Up @@ -185,10 +210,21 @@ func (s *S3Server) PutObject(stream s3.ObjectStorageService_PutObjectServer) err
}
fileReader := newPutObjectStreamReader(req.Body, stream)

st := &l8s3.PutObjectInput{}
err = transferData(req, st)
if err != nil {
return status.Errorf(codes.InvalidArgument, "transfer request data fail for PutObject,err: %+v", err)
st := &l8s3.PutObjectInput{
ACL: req.Acl,
Bucket: req.Bucket,
Key: req.Key,
BucketKeyEnabled: req.BucketKeyEnabled,
CacheControl: req.CacheControl,
ContentDisposition: req.ContentDisposition,
ContentEncoding: req.ContentEncoding,
Expires: req.Expires,
ServerSideEncryption: req.ServerSideEncryption,
SignedUrl: req.SignedUrl,
Meta: req.Meta,
Tagging: req.Tagging,
StorageClass: req.StorageClass,
ContentLength: req.ContentLength,
}
st.DataStream = fileReader
var resp *l8s3.PutObjectOutput
Expand Down

0 comments on commit 1aee920

Please sign in to comment.