diff --git a/gcs.go b/gcs.go
new file mode 100644
index 00000000..e9f325b4
--- /dev/null
+++ b/gcs.go
@@ -0,0 +1,86 @@
+// Copyright © 2018 Mikael Rapp, github.com/zatte
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package afero
+
+import (
+	"log"
+	"os"
+	"time"
+
+	"cloud.google.com/go/storage"
+	"github.com/zatte/afero/gcs"
+
+	"golang.org/x/net/context" // NOTE(review): stdlib "context" is an alias since Go 1.7; consider switching
+)
+
+// NewGcsFs wraps a gcs.GcsFs over an existing storage client so it satisfies the afero Fs interface.
+func NewGcsFs(ctx context.Context, cl *storage.Client, bucket string, folderSep string) Fs {
+	return &GcsFsWrapper{
+		gcs.NewGcsFs(ctx, cl, bucket, folderSep),
+	}
+}
+
+// NewGcsFsFromDefaultCredentials creates a GCS client assuming that
+// $GOOGLE_APPLICATION_CREDENTIALS is set and points to a service account.
+func NewGcsFsFromDefaultCredentials(ctx context.Context, bucket string, folderSep string) Fs {
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		log.Fatalf("Failed to create client: %v", err) // NOTE(review): Fatalf exits the whole process; an error-returning constructor would be friendlier for library use
+	}
+
+	return &GcsFsWrapper{
+		gcs.NewGcsFs(ctx, client, bucket, folderSep),
+	}
+}
+
+// GcsFsWrapper only wraps gcs.GcsFs and converts some return types to afero interfaces.
+type GcsFsWrapper struct {
+	GcsFs *gcs.GcsFs
+}
+
+func (fs *GcsFsWrapper) Name() string {
+	return fs.GcsFs.Name()
+}
+func (fs *GcsFsWrapper) Create(name string) (File, error) {
+	return fs.GcsFs.Create(name)
+}
+func (fs *GcsFsWrapper) Mkdir(name string, perm os.FileMode) error {
+	return fs.GcsFs.Mkdir(name, perm)
+}
+func (fs *GcsFsWrapper) MkdirAll(path string, perm os.FileMode) error {
+	return fs.GcsFs.MkdirAll(path, perm)
+}
+func (fs *GcsFsWrapper) Open(name string) (File, error) {
+	return fs.GcsFs.Open(name)
+}
+func (fs *GcsFsWrapper) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
+	return fs.GcsFs.OpenFile(name, flag, perm)
+}
+func (fs *GcsFsWrapper) Remove(name string) error {
+	return fs.GcsFs.Remove(name)
+}
+func (fs *GcsFsWrapper) RemoveAll(path string) error {
+	return fs.GcsFs.RemoveAll(path)
+}
+func (fs *GcsFsWrapper) Rename(oldname, newname string) error {
+	return fs.GcsFs.Rename(oldname, newname)
+}
+func (fs *GcsFsWrapper) Stat(name string) (os.FileInfo, error) {
+	return fs.GcsFs.Stat(name)
+}
+func (fs *GcsFsWrapper) Chmod(name string, mode os.FileMode) error {
+	return fs.GcsFs.Chmod(name, mode)
+}
+func (fs *GcsFsWrapper) Chtimes(name string, atime time.Time, mtime time.Time) error {
+	return fs.GcsFs.Chtimes(name, atime, mtime)
+}
diff --git a/gcs/errors.go b/gcs/errors.go
new file mode 100644
index 00000000..f6045544
--- /dev/null
+++ b/gcs/errors.go
@@ -0,0 +1,13 @@
+package gcs
+
+import (
+	"errors"
+	"os"
+)
+
+var (
+	ErrFileClosed   = errors.New("file is closed")  // Go convention: error strings are lowercase, no punctuation
+	ErrOutOfRange   = errors.New("out of range")
+	ErrTooLarge     = errors.New("too large")
+	ErrFileNotFound = os.ErrNotExist
+)
diff --git a/gcs/gcsFile.go b/gcs/gcsFile.go
new file mode 100644
index 00000000..9f3a1ed4
--- /dev/null
+++ b/gcs/gcsFile.go
@@ -0,0 +1,250 @@
+// Copyright © 2018 Mikael Rapp, github.com/zatte
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gcs
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"sort"
+
+	"cloud.google.com/go/storage"
+
+	"google.golang.org/api/iterator"
+)
+
+// GcsFile is the afero file handle adapted for GCS (doc previously misnamed it "GcsFs")
+type GcsFile struct {
+	openFlags int
+	fileMode  os.FileMode
+	fhoffset  int64 // file-handle-specific offset; the shared resource keeps its own
+	closed    bool
+	ReadDirIt *storage.ObjectIterator
+	resource  *gcsFileResource
+}
+
+// NewGcsFile creates a fresh handle together with a fresh underlying resource.
+func NewGcsFile(
+	ctx context.Context,
+	fs *GcsFs,
+	obj *storage.ObjectHandle,
+	openFlags int,
+	fileMode os.FileMode,
+	name string,
+) *GcsFile {
+	return &GcsFile{
+		openFlags: openFlags,
+		fileMode:  fileMode,
+		fhoffset:  0,
+		closed:    false,
+		ReadDirIt: nil,
+		resource: &gcsFileResource{
+			ctx: ctx,
+			fs:  fs,
+
+			obj:  obj,
+			name: name,
+
+			currentGcsSize: 0,
+
+			offset: 0,
+			reader: nil,
+			writer: nil,
+		},
+	}
+}
+
+// NewGcsFileFromOldFH creates a new handle sharing an already-open resource,
+// so multiple handles see the same underlying reader/writer state.
+func NewGcsFileFromOldFH(
+	openFlags int,
+	fileMode os.FileMode,
+	oldFile *gcsFileResource,
+) *GcsFile {
+	return &GcsFile{
+		openFlags: openFlags,
+		fileMode:  fileMode,
+		fhoffset:  0,
+		closed:    false,
+		ReadDirIt: nil,
+
+		resource: oldFile,
+	}
+}
+
+func (o *GcsFile) Close() error {
+	// There shouldn't be a case where both are open at the same time
+	// but the check is omitted at this time.
+	o.closed = true
+	return o.resource.Close()
+}
+
+func (o *GcsFile) Seek(newOffset int64, whence int) (int64, error) {
+	if o.closed {
+		return 0, ErrFileClosed
+	}
+
+	// Since this is an expensive operation; let's make sure we need it
+	if (whence == io.SeekStart && newOffset == o.fhoffset) || (whence == io.SeekCurrent && newOffset == 0) {
+		return o.fhoffset, nil
+	}
+	log.Printf("WARNING; Seek behavior triggered, highly inefficient. Offset before seek is at %d\n", o.fhoffset)
+
+	// Force the reader/writers to be reopened (at correct offset)
+	o.Sync() // NOTE(review): Sync error is silently dropped here — TODO confirm intent
+	stat, err := o.Stat()
+	if err != nil {
+		return 0, err // was `return 0, nil`, which silently swallowed the Stat failure
+	}
+
+	switch whence {
+	case io.SeekStart:
+		o.fhoffset = newOffset
+	case io.SeekCurrent:
+		o.fhoffset += newOffset
+	case io.SeekEnd:
+		o.fhoffset = stat.Size() + newOffset
+	}
+	return o.fhoffset, nil
+}
+
+func (o *GcsFile) Read(p []byte) (n int, err error) {
+	return o.ReadAt(p, o.fhoffset)
+}
+
+// ReadAt reads at off. NOTE(review): unlike os.File, this advances the handle
+// offset (Read depends on that), so it is not safe for concurrent positioned reads.
+func (o *GcsFile) ReadAt(p []byte, off int64) (n int, err error) {
+	if o.closed {
+		return 0, ErrFileClosed
+	}
+
+	read, err := o.resource.ReadAt(p, off)
+	o.fhoffset += int64(read)
+	return read, err
+}
+
+func (o *GcsFile) Write(p []byte) (n int, err error) {
+	return o.WriteAt(p, o.fhoffset)
+}
+
+// WriteAt writes at off; like ReadAt it advances the handle offset (Write depends on that).
+func (o *GcsFile) WriteAt(b []byte, off int64) (n int, err error) {
+	if o.closed {
+		return 0, ErrFileClosed
+	}
+
+	if o.openFlags == os.O_RDONLY {
+		return 0, fmt.Errorf("file is opened as read-only")
+	}
+
+	written, err := o.resource.WriteAt(b, off)
+	o.fhoffset += int64(written)
+	return written, err
+}
+
+func (o *GcsFile) Name() string {
+	return o.resource.name
+}
+
+// readdir lists up to count children of this "folder"; count <= 0 means all.
+func (o *GcsFile) readdir(count int) ([]*fileInfo, error) {
+	o.Sync()
+	// the trailing separator makes the prefix query list this folder's children
+	path := o.resource.fs.ensureTrailingSeparator(normSeparators(o.Name(), o.resource.fs.separator))
+	if o.ReadDirIt == nil {
+		//log.Printf("Querying path : %s\n", path)
+		o.ReadDirIt = o.resource.fs.bucket.Objects(
+			o.resource.ctx, &storage.Query{Delimiter: o.resource.fs.separator, Prefix: path, Versions: false})
+	}
+	var res []*fileInfo
+	for {
+		object, err := o.ReadDirIt.Next()
+		if err == iterator.Done {
+			if len(res) > 0 || count <= 0 {
+				return res, nil
+			}
+			return res, io.EOF
+		}
+		if err != nil {
+			return res, err
+		}
+
+		tmp := fileInfo{object, o.resource.fs}
+
+		// Since we create "virtual folders" which are empty objects they can sometimes be returned twice
+		// when we do a query (as the query will also return GCS version of "virtual folders" but they only
+		// have a .Prefix, and not .Name)
+		if object.Name == "" {
+			continue
+		}
+
+		res = append(res, &tmp)
+		if count > 0 && len(res) >= count {
+			break
+		}
+	}
+	return res, nil
+}
+
+func (o *GcsFile) Readdir(count int) ([]os.FileInfo, error) {
+	fi, err := o.readdir(count)
+	if len(fi) > 0 {
+		sort.Sort(ByName(fi))
+	}
+
+	var res []os.FileInfo
+	for _, f := range fi {
+		res = append(res, f)
+	}
+	return res, err
+}
+
+func (o *GcsFile) Readdirnames(n int) ([]string, error) {
+	fi, err := o.Readdir(n)
+	if err != nil && err != io.EOF {
+		return nil, err
+	}
+	names := make([]string, len(fi))
+
+	for i, f := range fi {
+		names[i] = f.Name()
+	}
+	return names, err
+}
+
+func (o *GcsFile) Stat() (os.FileInfo, error) {
+	o.Sync()
+	objAttrs, err := o.resource.obj.Attrs(o.resource.ctx)
+	if err != nil {
+		if err == storage.ErrObjectNotExist { // sentinel comparison instead of fragile err.Error() match
+			return nil, os.ErrNotExist //works with os.IsNotExist check
+		}
+		return nil, err
+	}
+	return &fileInfo{objAttrs, o.resource.fs}, nil
+}
+
+// Sync commits any open reader/writer on the shared resource.
+func (o *GcsFile) Sync() error {
+	return o.resource.maybeCloseIo()
+}
+
+func (o *GcsFile) Truncate(wantedSize int64) error {
+	if o.closed {
+		return ErrFileClosed
+	}
+	if o.openFlags == os.O_RDONLY {
+		return fmt.Errorf("file is opened as read-only")
+	}
+	return o.resource.Truncate(wantedSize)
+}
+
+func (o *GcsFile) WriteString(s string) (ret int, err error) {
+	return o.Write([]byte(s))
+}
diff --git a/gcs/gcsFileInfo.go b/gcs/gcsFileInfo.go
new file mode 100644
index 00000000..251b4497
--- /dev/null
+++ b/gcs/gcsFileInfo.go
@@ -0,0 +1,68 @@
+// Copyright © 2018 Mikael Rapp, github.com/zatte
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gcs
+
+import (
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/storage"
+)
+
+// fileInfo adapts storage.ObjectAttrs to os.FileInfo.
+type fileInfo struct {
+	objAtt *storage.ObjectAttrs
+	fs     *GcsFs
+}
+
+func (fi *fileInfo) name() string {
+	if fi.objAtt.Name != "" {
+		return fi.objAtt.Name
+	}
+	// In case of GCS virtual folders; they will only have a prefix
+	return fi.objAtt.Prefix
+}
+
+func (fi *fileInfo) Name() string {
+	return filepath.Base(fi.name())
+}
+
+func (fi *fileInfo) Size() int64 {
+	return fi.objAtt.Size
+}
+func (fi *fileInfo) Mode() os.FileMode {
+	if fi.IsDir() {
+		return 0755
+	}
+	return 0664
+}
+
+func (fi *fileInfo) ModTime() time.Time {
+	return fi.objAtt.Updated
+}
+
+func (fi *fileInfo) IsDir() bool {
+	return fi.objAtt.Metadata["virtual_folder"] == "y" || strings.HasSuffix(fi.Name(), fi.fs.separator) // NOTE(review): Name() is a Base(), so the suffix branch may never fire — verify
+}
+
+func (fi *fileInfo) Sys() interface{} {
+	return nil
+}
+
+// ByName sorts fileInfos lexicographically by base name.
+type ByName []*fileInfo
+
+func (a ByName) Len() int           { return len(a) }
+func (a ByName) Swap(i, j int)      { a[i].objAtt, a[j].objAtt = a[j].objAtt, a[i].objAtt }
+func (a ByName) Less(i, j int) bool { return a[i].Name() < a[j].Name() }
diff --git a/gcs/gcsFileResource.go b/gcs/gcsFileResource.go
new file mode 100644
index 00000000..5f5a74bc
--- /dev/null
+++ b/gcs/gcsFileResource.go
@@ -0,0 +1,214 @@
+// Copyright © 2018 Mikael Rapp, github.com/zatte
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gcs
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+
+	"cloud.google.com/go/storage"
+)
+
+// gcsFileResource represents a singleton version of each GCS object;
+// Google cloud storage allows users to open multiple writers(!) to the same
+// underlying resource, once the write is closed the written stream is committed. We are doing
+// some magic where we read and write to the same file which requires synchronization
+// of the underlying resource.
+type gcsFileResource struct {
+	ctx context.Context
+	fs  *GcsFs
+
+	obj  *storage.ObjectHandle
+	name string
+
+	currentGcsSize int64
+	offset         int64
+	reader         io.ReadCloser
+	writer         io.WriteCloser
+
+	closed bool
+}
+
+func (o *gcsFileResource) Close() error {
+	o.closed = true
+	delete(o.fs.rawGcsObjects, o.name)
+	return o.maybeCloseIo()
+}
+
+// maybeCloseIo commits any open reader and writer back to GCS.
+func (o *gcsFileResource) maybeCloseIo() error {
+	o.maybeCloseReader() // reader close error carries no commit semantics, deliberately dropped
+	return o.maybeCloseWriter()
+}
+
+func (o *gcsFileResource) maybeCloseReader() {
+	if o.reader == nil {
+		return
+	}
+	o.reader.Close()
+	o.reader = nil
+}
+
+func (o *gcsFileResource) maybeCloseWriter() error {
+	if o.writer == nil {
+		return nil
+	}
+
+	// In cases of partial writes (e.g. to the middle of a file stream), we need to
+	// append any remaining data from the original file before we close the reader (and
+	// commit the results.)
+	// For small writes it can be more efficient
+	// to keep the original reader but that is for another iteration
+	if o.currentGcsSize > o.offset {
+		currentFile, err := o.obj.NewRangeReader(o.ctx, o.offset, -1)
+		if err != nil {
+			return fmt.Errorf(
+				"couldn't simulate a partial write; the closing (and thus"+
+					" the whole file write) is NOT committed to GCS. %v", err)
+		}
+		if currentFile.Remain() > 0 { // err is always nil here; the old `err == nil &&` guard was dead
+			if _, err := io.Copy(o.writer, currentFile); err != nil { return err } // a lost tail would corrupt the object
+		}
+	}
+
+	err := o.writer.Close() // Close commits the upload; its error must not be swallowed
+	o.writer = nil
+	return err
+}
+
+func (o *gcsFileResource) ReadAt(p []byte, off int64) (n int, err error) {
+	if len(p) == 0 { // len, not cap: a zero-length read is a no-op regardless of capacity
+		return 0, nil
+	}
+
+	// Assume that if the reader is open; it is at the correct fhoffset
+	// a good performance assumption that we must ensure holds
+	if off == o.offset && o.reader != nil {
+		read, err := o.reader.Read(p)
+		o.offset += int64(read)
+		return read, err
+	}
+
+	// If any writers have written anything; commit it first so we can read it back.
+	o.maybeCloseIo()
+
+	// Then read at the correct offset.
+	r, err := o.obj.NewRangeReader(o.ctx, off, -1)
+	if err != nil {
+		return 0, err
+	}
+	o.reader = r
+	o.offset = off
+
+	read, err := o.reader.Read(p)
+	o.offset += int64(read)
+	return read, err
+}
+
+func (o *gcsFileResource) WriteAt(b []byte, off int64) (n int, err error) {
+	// If the writer is opened and at the correct offset we're good!
+	if off == o.offset && o.writer != nil {
+		written, err := o.writer.Write(b)
+		o.offset += int64(written)
+		return written, err
+	}
+
+	// Ensure readers must be re-opened and that if a writer is active at another
+	// offset it is first committed before we do a "seek" below
+	o.maybeCloseIo()
+
+	w := o.obj.NewWriter(o.ctx)
+	// TRIGGER WARNING: This can seem like a hack but it works thanks
+	// to GCS strong consistency. We will open and write to the same file; First when the
+	// writer is closed will the content get committed to GCS.
+	// The general idea is this:
+	// Objectv1[:offset] -> Objectv2
+	// newData1 -> Objectv2
+	// Objectv1[offset+len(newData1):] -> Objectv2
+	// Objectv2.Close
+	//
+	// It will however require a download and upload of the original file but it
+	// can't be avoided if we should support seek-write-operations on GCS.
+	objAttrs, err := o.obj.Attrs(o.ctx)
+	if err != nil {
+		if off > 0 {
+			return 0, err // WriteAt to a non existing file
+		}
+		o.currentGcsSize = 0
+	} else {
+		o.currentGcsSize = objAttrs.Size // Size is already int64; conversion was redundant
+	}
+
+	if off > o.currentGcsSize {
+		return 0, ErrOutOfRange
+	}
+
+	if off > 0 {
+		r, err := o.obj.NewReader(o.ctx)
+		if err != nil {
+			return 0, err
+		}
+		if _, err := io.CopyN(w, r, off); err != nil {
+			return 0, err
+		}
+		r.Close()
+	}
+
+	o.writer = w
+	o.offset = off
+
+	written, err := o.writer.Write(b)
+	o.offset += int64(written)
+	return written, err
+}
+
+// min returns the smaller of two ints.
+func min(x, y int) int {
+	if x < y {
+		return x
+	}
+	return y
+}
+
+// Truncate rewrites the object to exactly wantedSize bytes, padding with spaces when growing.
+func (o *gcsFileResource) Truncate(wantedSize int64) error {
+	if wantedSize < 0 {
+		return ErrOutOfRange
+	}
+
+	o.maybeCloseIo()
+
+	r, err := o.obj.NewRangeReader(o.ctx, 0, wantedSize)
+	if err != nil {
+		return err
+	}
+
+	w := o.obj.NewWriter(o.ctx)
+	written, err := io.Copy(w, r)
+	if err != nil {
+		return err
+	}
+
+	const maxWriteSize = 10000 // upper bound per padding chunk; was a shouty local MAX_WRITE_SIZE
+	for written < wantedSize {
+		// Bulk up padding writes. NOTE(review): pads with spaces, not NUL bytes as os.File.Truncate would — confirm intent
+		paddingBytes := bytes.Repeat([]byte(" "), min(maxWriteSize, int(wantedSize-written)))
+		n, err := w.Write(paddingBytes) // was `if w, err := ...` which shadowed the writer w with an int
+		if err != nil {
+			return err
+		}
+		written += int64(n)
+	}
+	r.Close()
+	return w.Close()
+}
diff --git a/gcs/gcsFs.go b/gcs/gcsFs.go
new file mode 100644
index 00000000..7e16e3bc
--- /dev/null
+++ b/gcs/gcsFs.go
@@ -0,0 +1,205 @@
+// Copyright © 2018 Mikael Rapp, github.com/zatte
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gcs
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/storage"
+	"google.golang.org/api/iterator"
+)
+
+// GcsFs is a Fs implementation that uses functions provided by google cloud storage
+type GcsFs struct {
+	ctx           context.Context
+	client        *storage.Client
+	bucket        *storage.BucketHandle
+	separator     string
+	rawGcsObjects map[string]*GcsFile // cache of open handles so re-opens share one resource
+
+	autoRemoveEmptyFolders bool //trigger for creating "virtual folders" (not required by GCSs)
+}
+
+// NewGcsFs builds a GcsFs over an existing client, scoped to one bucket.
+func NewGcsFs(ctx context.Context, cl *storage.Client, bucket string, folderSep string) *GcsFs {
+	return &GcsFs{
+		ctx:           ctx,
+		client:        cl,
+		bucket:        cl.Bucket(bucket),
+		separator:     folderSep,
+		rawGcsObjects: make(map[string]*GcsFile),
+
+		autoRemoveEmptyFolders: true,
+	}
+}
+
+// normSeparators will normalize all "\\" and "/" to the provided separator
+func normSeparators(s string, to string) string {
+	return strings.Replace(strings.Replace(s, "\\", to, -1), "/", to, -1)
+}
+
+func (fs *GcsFs) ensureTrailingSeparator(s string) string {
+	if len(s) > 0 && !strings.HasSuffix(s, fs.separator) {
+		return s + fs.separator
+	}
+	return s
+}
+
+func (fs *GcsFs) getObj(name string) *storage.ObjectHandle {
+	return fs.bucket.Object(normSeparators(name, fs.separator)) // normalize paths for all OSes
+}
+
+func (fs *GcsFs) Name() string { return "GcsFs" }
+
+// Create makes (or empties) the named object and returns an open handle to it.
+func (fs *GcsFs) Create(name string) (*GcsFile, error) {
+	if !fs.autoRemoveEmptyFolders {
+		dir := filepath.Dir(name) // was filepath.Base, which yields the file name, not its parent folder
+		if stat, err := fs.Stat(dir); err != nil || !stat.IsDir() {
+			fs.MkdirAll(dir, 0)
+		}
+	}
+
+	obj := fs.getObj(name)
+	w := obj.NewWriter(fs.ctx)
+	if err := w.Close(); err != nil {
+		return nil, err
+	}
+	file := NewGcsFile(fs.ctx, fs, obj, os.O_RDWR, 0, name)
+	fs.rawGcsObjects[name] = file
+	return file, nil
+}
+
+// Mkdir creates an empty "virtual folder" object tagged via metadata.
+func (fs *GcsFs) Mkdir(name string, perm os.FileMode) error {
+	name = normSeparators(name, fs.separator)
+	obj := fs.getObj(name)
+	w := obj.NewWriter(fs.ctx)
+	if err := w.Close(); err != nil {
+		return err
+	}
+	meta := make(map[string]string)
+	meta["virtual_folder"] = "y"
+	_, err := obj.Update(fs.ctx, storage.ObjectAttrsToUpdate{Metadata: meta})
+	//fmt.Printf("Created virtual folder: %v\n", name)
+
+	return err
+}
+
+func (fs *GcsFs) MkdirAll(path string, perm os.FileMode) error {
+	root := ""
+	folders := strings.Split(normSeparators(path, fs.separator), fs.separator)
+	for _, f := range folders {
+		// Don't force a delimiter prefix
+		if root != "" {
+			root = root + fs.separator + f
+		} else {
+			root = f
+		}
+
+		if err := fs.Mkdir(root, perm); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (fs *GcsFs) Open(name string) (*GcsFile, error) {
+	return fs.OpenFile(name, os.O_RDONLY, 0)
+}
+
+func (fs *GcsFs) OpenFile(name string, flag int, perm os.FileMode) (*GcsFile, error) {
+	var file *GcsFile
+	obj, found := fs.rawGcsObjects[name]
+	if found {
+		file = NewGcsFileFromOldFH(flag, perm, obj.resource)
+	} else {
+		file = NewGcsFile(fs.ctx, fs, fs.getObj(name), flag, perm, name)
+	}
+
+	if flag&os.O_TRUNC != 0 {
+		file.resource.obj.Delete(fs.ctx) // best-effort: the object may not exist yet
+		return fs.Create(name)
+	}
+
+	if flag&os.O_APPEND != 0 {
+		_, err := file.Seek(0, 2) // whence 2 == io.SeekEnd ("io" not imported in this file)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if flag&os.O_CREATE != 0 {
+		if _, err := file.WriteString(""); err != nil { return nil, err } // creation failure was silently dropped
+	}
+	return file, nil
+}
+
+func (fs *GcsFs) Remove(name string) error {
+	obj := fs.getObj(name)
+	if _, err := fs.Stat(name); err != nil {
+		return err
+	}
+	delete(fs.rawGcsObjects, name)
+	return obj.Delete(fs.ctx)
+}
+
+func (fs *GcsFs) RemoveAll(path string) error {
+	it := fs.bucket.Objects(fs.ctx, &storage.Query{Delimiter: fs.separator, Prefix: path, Versions: false})
+	for {
+		objAttrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		if err := fs.Remove(objAttrs.Name); err != nil { return err } // deletion failure was silently dropped
+	}
+	return nil
+}
+
+// Rename copies src to dst then deletes src; GCS has no atomic rename.
+func (fs *GcsFs) Rename(oldname, newname string) error {
+	src := fs.bucket.Object(oldname)
+	dst := fs.bucket.Object(newname)
+
+	if _, err := dst.CopierFrom(src).Run(fs.ctx); err != nil {
+		return err
+	}
+	delete(fs.rawGcsObjects, oldname)
+	return src.Delete(fs.ctx)
+}
+
+func (fs *GcsFs) Stat(name string) (os.FileInfo, error) {
+	obj := fs.getObj(name)
+	objAttrs, err := obj.Attrs(fs.ctx)
+	if err != nil {
+		if err == storage.ErrObjectNotExist { // sentinel comparison instead of fragile err.Error() match
+			return nil, os.ErrNotExist //works with os.IsNotExist check
+		}
+		return nil, err
+	}
+	return &fileInfo{objAttrs, fs}, nil
+}
+
+// Chmod is not supported: GCS objects carry no POSIX permission bits.
+func (fs *GcsFs) Chmod(name string, mode os.FileMode) error {
+	panic("CHMOD not implemented in GCS")
+}
+
+// Chtimes is not supported: GCS object timestamps are set implicitly and read-only.
+func (fs *GcsFs) Chtimes(name string, atime time.Time, mtime time.Time) error {
+	panic("Chtimes not implemented. Create, Delete, Updated times are read only fields in GCS and set implicitly")
+}