Skip to content

Commit

Permalink
Merge pull request #162 from lawrencegripper/jc/sub-dirs
Browse files Browse the repository at this point in the history
Support sub directories and allow bigger blob uploads
  • Loading branch information
jjcollinge authored Sep 3, 2018
2 parents 66f3129 + 96b7190 commit 9072a4a
Show file tree
Hide file tree
Showing 18 changed files with 283 additions and 118 deletions.
14 changes: 13 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@
[[constraint]]
branch = "master"
name = "github.com/jjcollinge/logrus-appinsights"

[[constraint]]
name = "github.com/azure/azure-storage-blob-go"
version = "0.1.4"
25 changes: 14 additions & 11 deletions internal/app/handler/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,29 +125,32 @@ func (c *Committer) doCommit() error {
}

//CommitBlob commits the blob directory to an external blob provider
func (c *Committer) commitBlob(blobsPath string) (map[string]string, error) {
if _, err := os.Stat(blobsPath); os.IsNotExist(err) {
logger.Debug(c.context, fmt.Sprintf("blob output directory '%s' does not exists '%+v'", blobsPath, err))
func (c *Committer) commitBlob(blobsDir string) (map[string]string, error) {
if _, err := os.Stat(blobsDir); os.IsNotExist(err) {
logger.Debug(c.context, fmt.Sprintf("blob output directory '%s' does not exists '%+v'", blobsDir, err))
return nil, nil
}

// TODO: Search recursively to support sub folders.
files, err := ioutil.ReadDir(blobsPath)
files := make([]string, 0)
err := filepath.Walk(blobsDir, func(path string, f os.FileInfo, err error) error {
if f.IsDir() {
return nil
}
files = append(files, path)
return err
})
if err != nil {
return nil, err
}
var fileNames []string
for _, file := range files {
fileNames = append(fileNames, filepath.FromSlash(path.Join(blobsPath, file.Name())))
}
blobURIs, err := c.dataPlane.PutBlobs(fileNames)

blobURIs, err := c.dataPlane.PutBlobs(files)
if err != nil {
return nil, fmt.Errorf("failed to commit blob: %+v", err)
}

logger.Info(c.context, "committed blob data")
logger.DebugWithFields(c.context, "blob file names", map[string]interface{}{
"files": fileNames,
"files": files,
})
return blobURIs, nil
}
Expand Down
42 changes: 29 additions & 13 deletions internal/app/handler/committer/tests/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func TestMain(m *testing.M) {
persistentEventsDir = filepath.FromSlash(fmt.Sprintf("%s/events", testdata))
eventTypes = append(eventTypes, "test_events")

environment = module.GetModuleEnvironment(testdata)
environment.Build()

// Create mock dataplane...

// Metadata store
Expand All @@ -53,7 +56,7 @@ func TestMain(m *testing.M) {
blob, err := filesystem.NewBlobStorage(&filesystem.Config{
InputDir: persistentInBlobDir,
OutputDir: persistentOutBlobDir,
})
}, environment)
if err != nil {
panic(fmt.Sprintf("failed to create file system storage with error '%+v'", err))
}
Expand All @@ -75,9 +78,6 @@ func TestMain(m *testing.M) {
ParentEventID: "parentid",
}

environment = module.GetModuleEnvironment(testdata)
environment.Build()

// Create committer
log.SetOutput(os.Stdout)
c = committer.NewCommitter(testdata, nil)
Expand Down Expand Up @@ -108,24 +108,40 @@ func TestCommitBlob(t *testing.T) {
"file4.txt",
},
},
{
files: []string{
"subdir/file1.txt",
"subdir/subsubdir/file2.txt",
"file3.txt",
},
},
}
for _, test := range testCases {
for _, file := range test.files {
path := filepath.FromSlash(path.Join(environment.OutputBlobDirPath, file))
f, err := os.Create(path)
dirPath := filepath.Dir(file)
dirPathInEnv := path.Join(environment.OutputBlobDirPath, dirPath)
_ = os.MkdirAll(dirPathInEnv, os.ModePerm)
outputFilePath := filepath.Join(environment.OutputBlobDirPath, file)
f, err := os.Create(outputFilePath)
f.Close()
if err != nil {
t.Fatalf("error creating test file '%s'", file)
t.Fatalf("error creating test file '%s'", outputFilePath)
continue
}
}
if err := c.Commit(context, dataPlane, eventTypes); err != nil {
t.Fatal(err)
}
files, err := ioutil.ReadDir(persistentOutBlobDir)
files := make([]string, 0)
err := filepath.Walk(persistentOutBlobDir, func(path string, f os.FileInfo, err error) error {
if f.IsDir() {
return nil
}
files = append(files, path)
return err
})
if err != nil {
t.Fatalf("error reading blob directory '%+v'", err)
continue
t.Fatal(err)
}
outLen := len(files)
blobLen := len(test.files)
Expand All @@ -134,7 +150,7 @@ func TestCommitBlob(t *testing.T) {
continue
}
if outLen != blobLen {
t.Fatal("expected the blob directory to be the same size as the output directory but wasn't")
t.Fatalf("expected the blob directory to contain %d items but it actually contained %d", outLen, blobLen)
continue
}

Expand All @@ -161,7 +177,7 @@ func TestCommitInsights(t *testing.T) {
t.Errorf("error encoding insights: '%+v'", err)
continue
}
if err := ioutil.WriteFile(environment.OutputMetaFilePath, b, 0777); err != nil {
if err := ioutil.WriteFile(environment.OutputMetaFilePath, b, os.ModePerm); err != nil {
t.Errorf("error writing insight file: '%+v'", err)
continue
}
Expand Down Expand Up @@ -315,7 +331,7 @@ func TestCommitEvents(t *testing.T) {
continue
}
outputEventFilePath := filepath.FromSlash(path.Join(environment.OutputEventsDirPath, fmt.Sprintf("event%d.json", i)))
if err := ioutil.WriteFile(outputEventFilePath, b, 0777); err != nil {
if err := ioutil.WriteFile(outputEventFilePath, b, os.ModePerm); err != nil {
t.Errorf("error writing event file: '%+v'", err)
continue
}
Expand Down
130 changes: 82 additions & 48 deletions internal/app/handler/dataplane/blobstorage/azure/azure.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
package azure

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/storage"
"github.com/azure/azure-storage-blob-go/2016-05-31/azblob"
"github.com/lawrencegripper/ion/internal/app/handler/dataplane/documentstorage"
"github.com/lawrencegripper/ion/internal/app/handler/helpers"
"github.com/lawrencegripper/ion/internal/app/handler/module"
log "github.com/sirupsen/logrus"
)

// cSpell:ignore nolint, golint, sasuris, sasuri

const (
// ContainerAlreadyExistsErr returned when creating a container that already exists
ContainerAlreadyExistsErr = "ContainerAlreadyExists"
)

//Config to setup a BlobStorage blob provider
type Config struct {
Enabled bool `description:"Enable Azure Blob storage provider"`
Expand All @@ -27,73 +38,106 @@ type Config struct {
//BlobStorage is responsible for handling the connections to Azure Blob Storage
// nolint: golint
type BlobStorage struct {
blobClient storage.BlobStorageClient
containerName string
outputBlobPrefix string
inputBlobPrefix string
eventMeta *documentstorage.EventMeta
accountKey string
accountName string
env *module.Environment
}

//NewBlobStorage creates a new Azure Blob Storage object
func NewBlobStorage(config *Config, inputBlobPrefix, outputBlobPrefix string, eventMeta *documentstorage.EventMeta) (*BlobStorage, error) {
blobClient, err := storage.NewBasicClient(config.BlobAccountName, config.BlobAccountKey)
if err != nil {
return nil, fmt.Errorf("error creating storage blobClient: %+v", err)
}
blob := blobClient.GetBlobService()
func NewBlobStorage(config *Config, inputBlobPrefix, outputBlobPrefix string, eventMeta *documentstorage.EventMeta, env *module.Environment) (*BlobStorage, error) {
asb := &BlobStorage{
blobClient: blob,
containerName: config.ContainerName,
outputBlobPrefix: outputBlobPrefix,
inputBlobPrefix: inputBlobPrefix,
eventMeta: eventMeta,
accountName: config.BlobAccountName,
accountKey: config.BlobAccountKey,
env: env,
}
return asb, nil
}

//PutBlobs puts a file into Azure Blob Storage
func (a *BlobStorage) PutBlobs(filePaths []string) (map[string]string, error) {
container, err := a.createContainerIfNotExist()
if err != nil {
return nil, err
}
blobSASURIs := make(map[string]string)

readStorageOptions := storage.BlobSASOptions{
BlobServiceSASPermissions: storage.BlobServiceSASPermissions{
Read: true,
},
SASOptions: storage.SASOptions{
Start: time.Now().Add(time.Duration(-1) * time.Hour),
Expiry: time.Now().Add(time.Duration(24) * time.Hour),
c := azblob.NewSharedKeyCredential(a.accountName, a.accountKey)
p := azblob.NewPipeline(c, azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: 3,
},
})
URL, _ := url.Parse(
fmt.Sprintf("https://%s.blob.core.windows.net/%s", a.accountName, a.containerName))
containerURL := azblob.NewContainerURL(*URL, p)
ctx := context.Background()
_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
if err != nil {
if serr, ok := err.(azblob.StorageError); !ok {
return nil, err
} else { // nolint: golint
if serr.ServiceCode() != ContainerAlreadyExistsErr {
return nil, err
}
}
}

blobSASURIs := make(map[string]string)

for _, filePath := range filePaths {
_, nakedFilePath := path.Split(filePath)
blobPath := helpers.JoinBlobPath(a.outputBlobPrefix, nakedFilePath)
filePathOutOfEnv := strings.Replace(filePath, a.env.OutputBlobDirPath, "", 1)
if filePathOutOfEnv[0] == '/' {
filePathOutOfEnv = filePathOutOfEnv[1:]
}
blobPath := helpers.JoinBlobPath(a.outputBlobPrefix, filePathOutOfEnv)
file, err := os.Open(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read data from file '%s', error: '%+v'", filePath, err)
}
defer file.Close() // nolint: errcheck
blobRef := container.GetBlobReference(blobPath)
_, err = blobRef.DeleteIfExists(&storage.DeleteBlobOptions{})
if err != nil {
return nil, err
}
err = blobRef.CreateBlockBlobFromReader(file, &storage.PutBlobOptions{})

stat, err := file.Stat()
if err != nil {
return nil, err
}

uri, err := blobRef.GetSASURI(readStorageOptions)
b := stat.Size()
kb := float64(b) / 1024
mb := float64(kb / 1024)
var timeout time.Duration
if mb > 5 {
timeout = time.Duration(mb) * 60 * time.Second
}
p = azblob.NewPipeline(c, azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: 3,
TryTimeout: timeout,
},
})
blobURL := containerURL.WithPipeline(p).NewBlockBlobURL(blobPath)
parallelism := uint16(runtime.NumCPU())
_, err = azblob.UploadFileToBlockBlob(ctx, file, blobURL, azblob.UploadToBlockBlobOptions{
BlockSize: 1 * 1024 * 1024,
Parallelism: parallelism})
if err != nil {
return nil, err
}

blobSASURIs[nakedFilePath] = uri
sasQueryParams := azblob.BlobSASSignatureValues{
Protocol: azblob.SASProtocolHTTPS,
StartTime: time.Now().UTC().Add(-1 * time.Hour),
ExpiryTime: time.Now().UTC().Add(24 * time.Hour),
Permissions: azblob.BlobSASPermissions{Read: true}.String(),
ContainerName: a.containerName,
BlobName: blobPath,
}.NewSASQueryParameters(c)

queryParams := sasQueryParams.Encode()
blobSASURIs[filePathOutOfEnv] = fmt.Sprintf("%s?%s", blobURL, queryParams)
}
return blobSASURIs, nil
}
Expand Down Expand Up @@ -124,10 +168,13 @@ func (a *BlobStorage) GetBlobs(outputDir string, filePaths []string) error {
if err != nil {
return fmt.Errorf("failed to read blob '%s' with error '%+v'", fileSASURL, err)
}
outputFilePath := path.Join(outputDir, filePath)
err = ioutil.WriteFile(outputFilePath, bytes, 0777)
dirPath := filepath.Dir(filePath)
dirPathInEnv := path.Join(outputDir, dirPath)
_ = os.MkdirAll(dirPathInEnv, os.ModePerm)
filePathInEnv := filepath.Join(outputDir, filePath)
err = ioutil.WriteFile(filePathInEnv, bytes, os.ModePerm)
if err != nil {
return fmt.Errorf("failed to write file '%s' with error '%+v'", outputFilePath, err)
return fmt.Errorf("failed to write file '%s' with error '%+v'", filePathInEnv, err)
}
}
return nil
Expand All @@ -136,16 +183,3 @@ func (a *BlobStorage) GetBlobs(outputDir string, filePaths []string) error {
//Close cleans up any external resources
func (a *BlobStorage) Close() {
}

//createContainerIfNotExist creates the container if it doesn't exist
func (a *BlobStorage) createContainerIfNotExist() (*storage.Container, error) {
	// Idempotent: CreateIfNotExists is a no-op when the container exists.
	ref := a.blobClient.GetContainerReference(a.containerName)
	opts := &storage.CreateContainerOptions{
		Access: storage.ContainerAccessTypePrivate,
	}
	if _, err := ref.CreateIfNotExists(opts); err != nil {
		return nil, fmt.Errorf("error thrown creating container %s: %+v", a.containerName, err)
	}
	return ref, nil
}
Loading

0 comments on commit 9072a4a

Please sign in to comment.