Skip to content

Commit

Permalink
feat: switch to aws sdk v2
Browse files Browse the repository at this point in the history
BREAKING CHANGE
  • Loading branch information
jszwec committed Dec 11, 2023
1 parent dda3af7 commit b83a316
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 153 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ name: Go

on:
push:
branches: [ main ]
branches: [ main, v1, v2 ]
pull_request:
branches: [ main ]
branches: [ main, v1, v2 ]

jobs:
localstack:
runs-on: ubuntu-latest
services:
minio:
image: localstack/localstack
image: localstack/localstack:0.14.0
ports:
- "4566:4566"
- "4571:4571"
Expand All @@ -24,13 +24,13 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.19
go-version: 1.21

- name: Wait for localstack
run: 'for i in {1..20}; do sleep 3 && curl --silent --fail http://localhost:4566/health | grep "\"s3\": \"available\"" > /dev/null && break; done'

- name: Test
run: go test -v -endpoint='localhost:4566' -cover
run: go test -v -endpoint='http://localhost:4566' -cover

minio:
runs-on: ubuntu-latest
Expand All @@ -41,11 +41,11 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.19
go-version: 1.21

- name: Test
env:
SERVER_ENDPOINT: localhost:9000
SERVER_ENDPOINT: http://localhost:9000
ACCESS_KEY: minioadmin
SECRET_KEY: minioadmin
MINIO_ACCESS_KEY: minioadmin
Expand Down
25 changes: 13 additions & 12 deletions dir.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package s3fs

import (
"context"
"errors"
"io"
"io/fs"
Expand All @@ -9,16 +10,14 @@ import (
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

var _ fs.ReadDirFile = (*dir)(nil)

type dir struct {
fileInfo
s3cl s3iface.S3API
s3cl Client
bucket string
marker *string
done bool
Expand Down Expand Up @@ -105,12 +104,14 @@ func (d *dir) readNext() error {
name += "/"
}

out, err := d.s3cl.ListObjects(&s3.ListObjectsInput{
Bucket: &d.bucket,
Delimiter: aws.String("/"),
Prefix: &name,
Marker: d.marker,
})
out, err := d.s3cl.ListObjects(
context.Background(),
&s3.ListObjectsInput{
Bucket: &d.bucket,
Delimiter: ptr("/"),
Prefix: &name,
Marker: d.marker,
})
if err != nil {
return err
}
Expand All @@ -131,7 +132,7 @@ func (d *dir) readNext() error {
}

for _, p := range out.CommonPrefixes {
if p == nil || p.Prefix == nil {
if p.Prefix == nil {
continue
}

Expand All @@ -148,7 +149,7 @@ func (d *dir) readNext() error {
}

for _, o := range out.Contents {
if o == nil || o.Key == nil {
if o.Key == nil {
continue
}

Expand Down
36 changes: 20 additions & 16 deletions file.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package s3fs

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -9,11 +10,8 @@ import (
"path"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

var (
Expand All @@ -23,7 +21,7 @@ var (
)

type file struct {
cl s3iface.S3API
cl Client
bucket string
name string

Expand All @@ -33,8 +31,8 @@ type file struct {
eTag string
}

func openFile(cl s3iface.S3API, bucket string, name string) (fs.File, error) {
out, err := cl.GetObject(&s3.GetObjectInput{
func openFile(cl Client, bucket string, name string) (fs.File, error) {
out, err := cl.GetObject(context.Background(), &s3.GetObjectInput{
Key: &name,
Bucket: &bucket,
})
Expand All @@ -56,7 +54,7 @@ func openFile(cl s3iface.S3API, bucket string, name string) (fs.File, error) {
}, nil
}

func getStatFunc(cl s3iface.S3API, bucket string, name string, s3ObjOutput s3.GetObjectOutput) func() (fs.FileInfo, error) {
func getStatFunc(cl Client, bucket string, name string, s3ObjOutput s3.GetObjectOutput) func() (fs.FileInfo, error) {
statFunc := func() (fs.FileInfo, error) {
return stat(cl, bucket, name)
}
Expand Down Expand Up @@ -127,17 +125,19 @@ func (f *file) Seek(offset int64, whence int) (int64, error) {
}

rawObject, err := f.cl.GetObject(
context.Background(),
&s3.GetObjectInput{
Bucket: aws.String(f.bucket),
Key: aws.String(f.name),
Range: aws.String(fmt.Sprintf("bytes=%d-", newOffset)),
IfMatch: aws.String(f.eTag),
Bucket: &f.bucket,
Key: &f.name,
Range: ptr(fmt.Sprintf("bytes=%d-", newOffset)),
IfMatch: &f.eTag,
})

if err != nil {
var requestFailureError awserr.RequestFailure
if errors.As(err, &requestFailureError) && requestFailureError.StatusCode() == http.StatusPreconditionFailed {
return 0, fmt.Errorf("s3fs.file.Seek: file has changed while seeking: %w", fs.ErrNotExist)
if e := new(awshttp.ResponseError); errors.As(err, &e) {
if e.HTTPStatusCode() == http.StatusPreconditionFailed {
return 0, fmt.Errorf("s3fs.file.Seek: file has changed while seeking: %w", fs.ErrNotExist)
}
}
return 0, err
}
Expand Down Expand Up @@ -167,3 +167,7 @@ func (fi fileInfo) Sys() interface{} { return nil }
type eofReader struct{}

func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }

func ptr[T any](v T) *T {
return &v
}
67 changes: 41 additions & 26 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
package s3fs

import (
"context"
"errors"
"io/fs"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

var (
Expand All @@ -30,19 +30,28 @@ type Option func(*S3FS)
// has to be handled by the caller.
func WithReadSeeker(fsys *S3FS) { fsys.readSeeker = true }

// Client wraps the s3 client methods that this package is using.
// This interface may change in the future and should not be relied on by
// packages using it.
type Client interface {
HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error)
ListObjects(ctx context.Context, params *s3.ListObjectsInput, optFns ...func(*s3.Options)) (*s3.ListObjectsOutput, error)
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
}

// S3FS is a S3 filesystem implementation.
//
// S3 has a flat structure instead of a hierarchy. S3FS simulates directories
// by using prefixes and delims ("/"). Because directories are simulated, ModTime
// is always a default Time value (IsZero returns true).
type S3FS struct {
cl s3iface.S3API
cl Client
bucket string
readSeeker bool
}

// New returns a new filesystem that works on the specified bucket.
func New(cl s3iface.S3API, bucket string, opts ...Option) *S3FS {
func New(cl Client, bucket string, opts ...Option) *S3FS {
fsys := &S3FS{
cl: cl,
bucket: bucket,
Expand Down Expand Up @@ -127,7 +136,7 @@ func (f *S3FS) ReadDir(name string) ([]fs.DirEntry, error) {
return d.ReadDir(-1)
}

func stat(s3cl s3iface.S3API, bucket, name string) (fs.FileInfo, error) {
func stat(s3cl Client, bucket, name string) (fs.FileInfo, error) {
if !fs.ValidPath(name) {
return nil, fs.ErrInvalid
}
Expand All @@ -143,10 +152,12 @@ func stat(s3cl s3iface.S3API, bucket, name string) (fs.FileInfo, error) {
}, nil
}

head, err := s3cl.HeadObject(&s3.HeadObjectInput{
Bucket: &bucket,
Key: aws.String(name),
})
head, err := s3cl.HeadObject(
context.Background(),
&s3.HeadObjectInput{
Bucket: &bucket,
Key: &name,
})
if err != nil {
if !isNotFoundErr(err) {
return nil, err
Expand All @@ -160,12 +171,14 @@ func stat(s3cl s3iface.S3API, bucket, name string) (fs.FileInfo, error) {
}, nil
}

out, err := s3cl.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: &bucket,
Delimiter: aws.String("/"),
Prefix: aws.String(name + "/"),
MaxKeys: aws.Int64(1),
})
out, err := s3cl.ListObjects(
context.Background(),
&s3.ListObjectsInput{
Bucket: &bucket,
Delimiter: ptr("/"),
Prefix: ptr(name + "/"),
MaxKeys: ptr[int32](1),
})
if err != nil {
return nil, err
}
Expand All @@ -182,7 +195,7 @@ func stat(s3cl s3iface.S3API, bucket, name string) (fs.FileInfo, error) {
return nil, fs.ErrNotExist
}

func openDir(s3cl s3iface.S3API, bucket, name string) (fs.ReadDirFile, error) {
func openDir(s3cl Client, bucket, name string) (fs.ReadDirFile, error) {
fi, err := stat(s3cl, bucket, name)
if err != nil {
return nil, err
Expand All @@ -194,16 +207,18 @@ func openDir(s3cl s3iface.S3API, bucket, name string) (fs.ReadDirFile, error) {
return nil, errNotDir
}

var notFoundCodes = map[string]struct{}{
s3.ErrCodeNoSuchKey: {},
"NotFound": {}, // localstack
}

func isNotFoundErr(err error) bool {
if aerr, ok := err.(awserr.Error); ok {
_, ok := notFoundCodes[aerr.Code()]
return ok
if e := new(types.NoSuchKey); errors.As(err, &e) {
return true
}

if e := new(http.ResponseError); errors.As(err, &e) {
// localstack workaround
if e.HTTPStatusCode() == 404 {
return true
}
}

return false
}

Expand Down
Loading

0 comments on commit b83a316

Please sign in to comment.