Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove MMF Dependency #48

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions azfile/bytes_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package azfile

import (
"errors"
)

// bytesWriter adapts a caller-supplied byte slice to the io.WriterAt
// interface so the slice can be used as a random-access write target.
type bytesWriter []byte

// NewBytesWriter wraps b in a bytesWriter. The writer shares b's backing
// array, so writes performed through it are visible in the caller's slice.
func NewBytesWriter(b []byte) bytesWriter {
	return b
}

// WriteAt copies b into the underlying slice starting at off.
// It returns an error if off is outside the slice bounds, or if the slice
// does not have room for all of b (the bytes that did fit are still written
// and their count returned).
func (c bytesWriter) WriteAt(b []byte, off int64) (int, error) {
	if off >= int64(len(c)) || off < 0 {
		// Go convention: error strings are lowercase and unpunctuated.
		return 0, errors.New("offset value is out of range")
	}

	n := copy(c[int(off):], b)
	if n < len(b) {
		return n, errors.New("not enough space for all bytes")
	}

	return n, nil
}
45 changes: 14 additions & 31 deletions azfile/highlevel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"

"bytes"
"os"
"sync"

Expand All @@ -21,7 +20,7 @@ const (
fileSegmentSize = 500 * 1024 * 1024
)

// UploadToAzureFileOptions identifies options used by the UploadBufferToAzureFile and UploadFileToAzureFile functions.
// UploadToAzureFileOptions identifies options used by the UploadReaderAtToAzureFile and UploadFileToAzureFile functions.
type UploadToAzureFileOptions struct {
// RangeSize specifies the range size to use in each parallel upload; the default (and maximum size) is FileMaxUploadRangeBytes.
RangeSize int64
Expand All @@ -39,11 +38,10 @@ type UploadToAzureFileOptions struct {
Metadata Metadata
}

// UploadBufferToAzureFile uploads a buffer to an Azure file.
// UploadReaderAtToAzureFile uploads the contents of an io.ReaderAt to an Azure file.
// Note: o.RangeSize must be >= 0 and <= FileMaxUploadRangeBytes, and if not specified, method will use FileMaxUploadRangeBytes by default.
// The total size to be uploaded should be <= FileMaxSizeInBytes.
func UploadBufferToAzureFile(ctx context.Context, b []byte,
fileURL FileURL, o UploadToAzureFileOptions) error {
func UploadReaderAtToAzureFile(ctx context.Context, reader io.ReaderAt, readerSize int64, fileURL FileURL, o UploadToAzureFileOptions) error {

// 1. Validate parameters, and set defaults.
if o.RangeSize < 0 || o.RangeSize > FileMaxUploadRangeBytes {
Expand All @@ -53,20 +51,18 @@ func UploadBufferToAzureFile(ctx context.Context, b []byte,
o.RangeSize = FileMaxUploadRangeBytes
}

size := int64(len(b))

parallelism := o.Parallelism
if parallelism == 0 {
parallelism = defaultParallelCount // default parallelism
}

// 2. Try to create the Azure file.
_, err := fileURL.Create(ctx, size, o.FileHTTPHeaders, o.Metadata)
_, err := fileURL.Create(ctx, readerSize, o.FileHTTPHeaders, o.Metadata)
if err != nil {
return err
}
// If size equals to 0, upload nothing and directly return.
if size == 0 {
if readerSize == 0 {
return nil
}

Expand All @@ -75,12 +71,12 @@ func UploadBufferToAzureFile(ctx context.Context, b []byte,
progressLock := &sync.Mutex{}

return doBatchTransfer(ctx, batchTransferOptions{
transferSize: size,
transferSize: readerSize,
chunkSize: o.RangeSize,
parallelism: parallelism,
operation: func(offset int64, curRangeSize int64) error {
// Prepare to read the proper section of the buffer.
var body io.ReadSeeker = bytes.NewReader(b[offset : offset+curRangeSize])
var body io.ReadSeeker = io.NewSectionReader(reader, offset, curRangeSize)
if o.Progress != nil {
rangeProgress := int64(0)
body = pipeline.NewRequestBodyProgress(body,
Expand All @@ -97,7 +93,7 @@ func UploadBufferToAzureFile(ctx context.Context, b []byte,
_, err := fileURL.UploadRange(ctx, int64(offset), body, nil)
return err
},
operationName: "UploadBufferToAzureFile",
operationName: "UploadReaderAtToAzureFile",
})
}

Expand All @@ -109,15 +105,7 @@ func UploadFileToAzureFile(ctx context.Context, file *os.File,
if err != nil {
return err
}
m := mmf{} // Default to an empty slice; used for 0-size file
if stat.Size() != 0 {
m, err = newMMF(file, false, 0, int(stat.Size()))
if err != nil {
return err
}
defer m.unmap()
}
return UploadBufferToAzureFile(ctx, m, fileURL, o)
return UploadReaderAtToAzureFile(ctx, file, stat.Size(), fileURL, o)
}

// DownloadFromAzureFileOptions identifies options used by the DownloadAzureFileToBuffer and DownloadAzureFileToFile functions.
Expand Down Expand Up @@ -247,17 +235,12 @@ func DownloadAzureFileToFile(ctx context.Context, fileURL FileURL, file *os.File
}
}

// 4. Set mmap and call DownloadAzureFileToBuffer, in this case file size should be > 0.
m := mmf{} // Default to an empty slice; used for 0-size file
if azfileSize > 0 {
m, err = newMMF(file, true, 0, int(azfileSize))
if err != nil {
return nil, err
}
defer m.unmap()
b := make([]byte, azfileSize)
_, err = file.Read(b)
if err != nil {
return nil, err
}

return downloadAzureFileToBuffer(ctx, fileURL, azfileProperties, m, o)
return downloadAzureFileToBuffer(ctx, fileURL, azfileProperties, b, o)
}

// BatchTransferOptions identifies options used by doBatchTransfer.
Expand Down
47 changes: 47 additions & 0 deletions azfile/section_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package azfile

import (
"errors"
"io"
)

// SectionWriter restricts writes to a fixed window of Count bytes beginning
// at Offset within an underlying io.WriterAt, while tracking the current
// Position relative to the start of the window.
type SectionWriter struct {
	Count    int64
	Offset   int64
	Position int64
	WriterAt io.WriterAt
}

// NewSectionWriter returns a SectionWriter that writes to c starting at
// absolute offset off and accepts at most count bytes in total.
func NewSectionWriter(c io.WriterAt, off int64, count int64) *SectionWriter {
	return &SectionWriter{
		Count:    count,
		Offset:   off,
		WriterAt: c,
	}
}

// Write writes p into the section at the current position, advancing the
// position by the number of bytes written. Once the section is full it
// returns an error; a write that would overrun the section is truncated
// to the remaining space and also reported as an error.
func (c *SectionWriter) Write(p []byte) (int, error) {
	left := c.Count - c.Position
	if left <= 0 {
		return 0, errors.New("end of section reached")
	}

	chunk := p
	if int64(len(chunk)) > left {
		// Clamp to the space remaining in the section.
		chunk = chunk[:left]
	}

	n, err := c.WriterAt.WriteAt(chunk, c.Offset+c.Position)
	c.Position += int64(n)
	if err != nil {
		return n, err
	}

	if n < len(p) {
		return n, errors.New("not enough space for all bytes")
	}
	return n, nil
}
18 changes: 9 additions & 9 deletions azfile/url_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ func (f FileURL) AbortCopy(ctx context.Context, copyID string) (*FileAbortCopyRe
return f.fileClient.AbortCopy(ctx, copyID, nil, nil)
}

// Download downloads count bytes of data from the start offset.
// Download downloads count bytes of data from the start offset.
// The response includes all of the file’s properties. However, passing true for rangeGetContentMD5 returns the range’s MD5 in the ContentMD5
// response header/property if the range is <= 4MB; the HTTP request fails with 400 (Bad Request) if the requested range is greater than 4MB.
// Note: offset must be >=0, count must be >= 0.
// If count is CountToEnd (0), then data is read from specified offset to the end.
// Note: offset must be >= 0, count must be >= 0.
// If count is CountToEnd (0), then data is read from the specified offset to the end.
// rangeGetContentMD5 only works with partial data downloading.
// For more information, see https://docs.microsoft.com/rest/api/storageservices/get-file.
func (f FileURL) Download(ctx context.Context, offset int64, count int64, rangeGetContentMD5 bool) (*RetryableDownloadResponse, error) {
Expand Down Expand Up @@ -197,7 +197,7 @@ func (f FileURL) Resize(ctx context.Context, length int64) (*FileSetHTTPHeadersR
}

// UploadRange writes bytes to a file.
// offset indicates the offset at which to begin writing, in bytes.
// offset indicates the offset at which to begin writing, in bytes.
// For more information, see https://docs.microsoft.com/en-us/rest/api/storageservices/put-range.
func (f FileURL) UploadRange(ctx context.Context, offset int64, body io.ReadSeeker, transactionalMD5 []byte) (*FileUploadRangeResponse, error) {
if body == nil {
Expand All @@ -214,7 +214,7 @@ func (f FileURL) UploadRange(ctx context.Context, offset int64, body io.ReadSeek
}

// Update range with bytes from a specific URL.
// offset indicates the offset at which to begin writing, in bytes.
// offset indicates the offset at which to begin writing, in bytes.
func (f FileURL) UploadRangeFromURL(ctx context.Context, sourceURL url.URL, sourceOffset int64, destOffset int64,
count int64) (*FileUploadRangeFromURLResponse, error) {

Expand All @@ -223,21 +223,21 @@ func (f FileURL) UploadRangeFromURL(ctx context.Context, sourceURL url.URL, sour
}

// ClearRange clears the specified range and releases the space used in storage for that range.
// offset means the start offset of the range to clear.
// count means the count of bytes to clean; it cannot be CountToEnd (0), and must be explicitly specified.
// If the range specified is not 512-byte aligned, the operation will write zeros to
// the start or end of the range that is not 512-byte aligned and free the rest of the range inside that is 512-byte aligned.
// For more information, see https://docs.microsoft.com/en-us/rest/api/storageservices/put-range.
func (f FileURL) ClearRange(ctx context.Context, offset int64, count int64) (*FileUploadRangeResponse, error) {
	// A count of CountToEnd (0) is meaningless for a clear operation, so reject it up front.
	if count <= 0 {
		return nil, errors.New("invalid argument, count cannot be CountToEnd, and must be > 0")
	}

	return f.fileClient.UploadRange(ctx, *toRange(offset, count), FileRangeWriteClear, 0, nil, nil, nil, nil)
}

// GetRangeList returns the list of valid ranges for a file.
// Use a count with value CountToEnd (0) to indicate the left part of file start from offset.
// Use a count with value CountToEnd (0) to indicate the left part of file start from offset.
// For more information, see https://docs.microsoft.com/en-us/rest/api/storageservices/list-ranges.
func (f FileURL) GetRangeList(ctx context.Context, offset int64, count int64) (*ShareFileRangeList, error) {
return f.fileClient.GetRangeList(ctx,
Expand Down
28 changes: 0 additions & 28 deletions azfile/zc_mmf_unix.go

This file was deleted.

40 changes: 0 additions & 40 deletions azfile/zc_mmf_windows.go

This file was deleted.

8 changes: 4 additions & 4 deletions azfile/zc_policy_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory {
// Even tries go against primary; odd tries go against the secondary
// For a primary wait ((2 ^ primaryTries - 1) * delay * random(0.8, 1.2)
// If secondary gets a 404, don't fail, retry but future retries are only against the primary
// When retrying against a secondary, ignore the retry count and wait (.1 second * random(0.8, 1.2))
// When retrying against a secondary, ignore the retry count and wait (.1 second * random(0.8, 1.2))
for try := int32(1); try <= o.MaxTries; try++ {
logf("\n=====> Try=%d\n", try)

Expand All @@ -160,7 +160,7 @@ func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory {
requestCopy := request.Copy()

// For each try, seek to the beginning of the Body stream. We do this even for the 1st try because
// the stream may not be at offset 0 when we first get it and we want the same behavior for the
// the stream may not be at offset 0 when we first get it and we want the same behavior for the
// 1st try as for additional tries.
if err = requestCopy.RewindBody(); err != nil {
sanityCheckFailed(err.Error())
Expand Down Expand Up @@ -367,10 +367,10 @@ func (r *deadlineExceededReadCloser) Read(p []byte) (int, error) {
}
return n, improveDeadlineExceeded(err)
}
func (r *deadlineExceededReadCloser) Seek(offset int64, whence int) (int64, error) {
func (r *deadlineExceededReadCloser) Seek(Offset int64, whence int) (int64, error) {
// For an HTTP request, the ReadCloser MUST also implement seek
// For an HTTP response, Seek MUST not be called (or this will panic)
o, err := r.r.(io.Seeker).Seek(offset, whence)
o, err := r.r.(io.Seeker).Seek(Offset, whence)
return o, improveDeadlineExceeded(err)
}
func (r *deadlineExceededReadCloser) Close() error {
Expand Down
14 changes: 7 additions & 7 deletions azfile/zc_retry_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ type HTTPGetter func(ctx context.Context, i HTTPGetterInfo) (*http.Response, err
// HTTPGetterInfo is passed to an HTTPGetter function passing it parameters
// that should be used to make an HTTP GET request.
type HTTPGetterInfo struct {
// Offset specifies the start offset that should be used when
// Offset specifies the start offset that should be used when
// creating the HTTP GET request's Range header
Offset int64

// Count specifies the count of bytes that should be used to calculate
// the end offset when creating the HTTP GET request's Range header
// Count specifies the count of bytes that should be used to calculate
// the end offset when creating the HTTP GET request's Range header
Count int64

// ETag specifies the resource's etag that should be used when creating
Expand Down Expand Up @@ -105,7 +105,7 @@ func (s *retryReader) Read(p []byte) (n int, err error) {
for try := 0; ; try++ {
//fmt.Println(try) // Comment out for debugging.
if s.countWasBounded && s.info.Count == CountToEnd {
// User specified an original count and the remaining bytes are 0, return 0, EOF
// User specified an original count and the remaining bytes are 0, return 0, EOF
return 0, io.EOF
}

Expand Down Expand Up @@ -134,16 +134,16 @@ func (s *retryReader) Read(p []byte) (n int, err error) {

// We successfully read data or end EOF.
if err == nil || err == io.EOF {
s.info.Offset += int64(n) // Increments the start offset in case we need to make a new HTTP request in the future
s.info.Offset += int64(n) // Increments the start offset in case we need to make a new HTTP request in the future
if s.info.Count != CountToEnd {
s.info.Count -= int64(n) // Decrement the count in case we need to make a new HTTP request in the future
s.info.Count -= int64(n) // Decrement the count in case we need to make a new HTTP request in the future
}
return n, err // Return the return to the caller
}
s.Close() // Error, close stream
s.setResponse(nil) // Our stream is no longer good

// Check the retry count and error code, and decide whether to retry.
// Check the retry count and error code, and decide whether to retry.
retriesExhausted := try >= s.o.MaxRetryRequests
_, isNetError := err.(net.Error)
isUnexpectedEOF := err == io.ErrUnexpectedEOF
Expand Down
Loading