Skip to content

Commit

Permalink
cloud: Added support to all ExternalStorage ReadFile methods, to rais…
Browse files Browse the repository at this point in the history
…e a sentinel ErrFileDoesNotExist error

The `ReadFile` interface method is responsible for returning a `Reader`
which can be used to stream data from an external storage source.
There are also instances where we use this method to solely check for
the existence (or absence) of a particular file/object, by attempting
to open a stream. eg: checking for BackupManifest/BackupManifestCheckpoint
before exporting data. Previously, we would treat any error returned
by the storage vendor API as a signal for the file/object not existing.

This change adds logic to catch the native "file does not exist"
errors for each storage provider, and throw a sentinel error to users
of the `ReadFile` method. This allows for more careful error handling.

Relevant unit tests have also been added.

Release note: None
  • Loading branch information
adityamaru committed Jun 5, 2020
1 parent c817d37 commit cd7fae2
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 29 deletions.
11 changes: 10 additions & 1 deletion pkg/blobs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ package blobs

import (
"context"
"os"

"github.com/cockroachdb/cockroach/pkg/blobs/blobspb"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// Service implements the gRPC BlobService which exchanges bulk files between different nodes.
Expand Down Expand Up @@ -89,5 +92,11 @@ func (s *Service) Delete(

// Stat implements the gRPC service.
func (s *Service) Stat(ctx context.Context, req *blobspb.StatRequest) (*blobspb.BlobStat, error) {
return s.localStorage.Stat(req.Filename)
resp, err := s.localStorage.Stat(req.Filename)
if os.IsNotExist(err) {
// gRPC hides the underlying golang ErrNotExist error, so we send back an
// equivalent gRPC error which can be handled gracefully on the client side.
return nil, status.Error(codes.NotFound, err.Error())
}
return resp, err
}
32 changes: 19 additions & 13 deletions pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -103,8 +104,11 @@ func readBackupManifestFromStore(
func containsManifest(ctx context.Context, exportStore cloud.ExternalStorage) (bool, error) {
r, err := exportStore.ReadFile(ctx, BackupManifestName)
if err != nil {
//nolint:returnerrcheck
return false, nil /* TODO(dt): only silence non-exists errors */
if errors.Is(err, cloud.ErrFileDoesNotExist) {
//nolint:returnerrcheck
return false, nil
}
return false, err
}
r.Close()
return true, nil
Expand Down Expand Up @@ -661,28 +665,30 @@ func VerifyUsableExportTarget(
readable string,
encryption *roachpb.FileEncryptionOptions,
) error {
if r, err := exportStore.ReadFile(ctx, BackupManifestName); err == nil {
// TODO(dt): If we audit exactly what not-exists error each ExternalStorage
// returns (and then wrap/tag them), we could narrow this check.
r, err := exportStore.ReadFile(ctx, BackupManifestName)
if err == nil {
r.Close()
return pgerror.Newf(pgcode.FileAlreadyExists,
"%s already contains a %s file",
readable, BackupManifestName)
}
if r, err := exportStore.ReadFile(ctx, BackupManifestName); err == nil {
// TODO(dt): If we audit exactly what not-exists error each ExternalStorage
// returns (and then wrap/tag them), we could narrow this check.
r.Close()
return pgerror.Newf(pgcode.FileAlreadyExists,
"%s already contains a %s file",
readable, BackupManifestName)

if !errors.Is(err, cloud.ErrFileDoesNotExist) {
return errors.Wrapf(err, fmt.Sprintf("%s returned an unexpected error when checking for the existence of %s file", readable, BackupManifestName))
}
if r, err := exportStore.ReadFile(ctx, BackupManifestCheckpointName); err == nil {

r, err = exportStore.ReadFile(ctx, BackupManifestCheckpointName)
if err == nil {
r.Close()
return pgerror.Newf(pgcode.FileAlreadyExists,
"%s already contains a %s file (is another operation already in progress?)",
readable, BackupManifestCheckpointName)
}

if !errors.Is(err, cloud.ErrFileDoesNotExist) {
return errors.Wrapf(err, fmt.Sprintf("%s returned an unexpected error when checking for the existence of %s file", readable, BackupManifestCheckpointName))
}

if err := writeBackupManifest(
ctx, settings, exportStore, BackupManifestCheckpointName, encryption, &BackupManifest{},
); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/cloud/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ func (s *azureStorage) ReadFile(ctx context.Context, basename string) (io.ReadCl
blob := s.getBlob(basename)
get, err := blob.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false)
if err != nil {
if errors.HasType(err, (azblob.StorageError)(nil)) {
azerr := err.(azblob.StorageError)
switch azerr.ServiceCode() {
// TODO(adityamaru): Investigate whether both these conditions are required.
case azblob.ServiceCodeBlobNotFound, azblob.ServiceCodeResourceNotFound:
return nil, errors.Wrap(ErrFileDoesNotExist, "azure blob does not exist")
}
}
return nil, errors.Wrap(err, "failed to create azure reader")
}
reader := get.Body(azblob.RetryReaderOptions{MaxRetryRequests: 3})
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ var redactedQueryParams = map[string]struct{}{
// ErrListingUnsupported is a marker for indicating listing is unsupported.
var ErrListingUnsupported = errors.New("listing is not supported")

// ErrFileDoesNotExist is a sentinel error for indicating that a specified
// bucket/object/key/file (depending on storage terminology) does not exist.
// This error is raised by the ReadFile method.
var ErrFileDoesNotExist = errors.New("external_storage: file doesn't exist")

// ExternalStorageFactory describes a factory function for ExternalStorage.
type ExternalStorageFactory func(ctx context.Context, dest roachpb.ExternalStorage) (ExternalStorage, error)

Expand All @@ -113,6 +118,8 @@ type ExternalStorage interface {
Conf() roachpb.ExternalStorage

// ReadFile should return a Reader for requested name.
// ErrFileDoesNotExist is raised if `basename` cannot be located in storage.
// This can be leveraged for an existence check.
ReadFile(ctx context.Context, basename string) (io.ReadCloser, error)

// WriteFile should write the content to requested name.
Expand Down
47 changes: 37 additions & 10 deletions pkg/storage/cloud/external_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/bank"
"github.com/cockroachdb/errors"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -125,9 +126,8 @@ func testExportStoreWithExternalIOConfig(
if !bytes.Equal(res, payload) {
t.Fatalf("got %v expected %v", res, payload)
}
if err := s.Delete(ctx, name); err != nil {
t.Fatal(err)
}

require.NoError(t, s.Delete(ctx, name))
}
})

Expand Down Expand Up @@ -160,9 +160,7 @@ func testExportStoreWithExternalIOConfig(
if !bytes.Equal(content, testingContent) {
t.Fatalf("wrong content")
}
if err := s.Delete(ctx, testingFilename); err != nil {
t.Fatal(err)
}
require.NoError(t, s.Delete(ctx, testingFilename))
})
if skipSingleFile {
return
Expand All @@ -188,9 +186,7 @@ func testExportStoreWithExternalIOConfig(
if !bytes.Equal(content, []byte("aaa")) {
t.Fatalf("wrong content")
}
if err := s.Delete(ctx, testingFilename); err != nil {
t.Fatal(err)
}
require.NoError(t, s.Delete(ctx, testingFilename))
})
t.Run("write-single-file-by-uri", func(t *testing.T) {
const testingFilename = "B"
Expand All @@ -214,9 +210,40 @@ func testExportStoreWithExternalIOConfig(
if !bytes.Equal(content, []byte("bbb")) {
t.Fatalf("wrong content")
}
if err := s.Delete(ctx, testingFilename); err != nil {

require.NoError(t, s.Delete(ctx, testingFilename))
})
// This test ensures that the ReadFile method of the ExternalStorage interface
// raises a sentinel error indicating that a requested bucket/key/file/object
// (based on the storage system) could not be found.
t.Run("file-does-not-exist", func(t *testing.T) {
const testingFilename = "A"
if err := s.WriteFile(ctx, testingFilename, bytes.NewReader([]byte("aaa"))); err != nil {
t.Fatal(err)
}
singleFile := storeFromURI(ctx, t, storeURI, clientFactory)
defer singleFile.Close()

// Read a valid file.
res, err := singleFile.ReadFile(ctx, testingFilename)
if err != nil {
t.Fatal(err)
}
defer res.Close()
content, err := ioutil.ReadAll(res)
if err != nil {
t.Fatal(err)
}
// Verify the result contains what we wrote.
if !bytes.Equal(content, []byte("aaa")) {
t.Fatalf("wrong content")
}

// Attempt to read a file which does not exist.
_, err = singleFile.ReadFile(ctx, "file_does_not_exist")
require.Error(t, err)
require.True(t, errors.Is(err, ErrFileDoesNotExist), "Expected a file does not exist error but returned %s")
require.NoError(t, s.Delete(ctx, testingFilename))
})
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/cloud/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ func (g *gcsStorage) ReadFile(ctx context.Context, basename string) (io.ReadClos
object: path.Join(g.prefix, basename),
}
if err := reader.openStream(); err != nil {
// The Google SDK has a specialized ErrBucketDoesNotExist error, but
// the code path from this method first triggers an ErrObjectNotExist in
// both scenarios - when a Bucket does not exist or an Object does not
// exist.
if errors.Is(err, gcs.ErrObjectNotExist) {
return nil, ErrFileDoesNotExist
}
return nil, err
}
return reader, nil
Expand Down
41 changes: 40 additions & 1 deletion pkg/storage/cloud/gcs_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -76,7 +77,7 @@ func (c *antagonisticConn) Read(b []byte) (int, error) {
func TestAntagonisticRead(t *testing.T) {
if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" {
// This test requires valid GS credential file.
return
t.Skip("GOOGLE_APPLICATION_CREDENTIALS env var must be set")
}

rnd, _ := randutil.NewPseudoRand()
Expand Down Expand Up @@ -110,3 +111,41 @@ func TestAntagonisticRead(t *testing.T) {
_, err = ioutil.ReadAll(stream)
require.NoError(t, err)
}

// TestFileDoesNotExist ensures that the ReadFile method of google cloud storage
// returns a sentinel error when the `Bucket` or `Object` being read do not
// exist.
func TestFileDoesNotExist(t *testing.T) {
if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" {
// This test requires valid GS credential file.
t.Skip("GOOGLE_APPLICATION_CREDENTIALS env var must be set")
}

{
// Invalid gsFile.
gsFile := "gs://cockroach-fixtures/tpch-csv/sf-1/invalid_region.tbl?AUTH=implicit"
conf, err := ExternalStorageConfFromURI(gsFile)
require.NoError(t, err)

s, err := MakeExternalStorage(
context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil)
require.NoError(t, err)
_, err = s.ReadFile(context.Background(), "")
require.Error(t, err, "")
require.True(t, errors.Is(err, ErrFileDoesNotExist))
}

{
// Invalid gsBucket.
gsFile := "gs://cockroach-fixtures-invalid/tpch-csv/sf-1/region.tbl?AUTH=implicit"
conf, err := ExternalStorageConfFromURI(gsFile)
require.NoError(t, err)

s, err := MakeExternalStorage(
context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil)
require.NoError(t, err)
_, err = s.ReadFile(context.Background(), "")
require.Error(t, err, "")
require.True(t, errors.Is(err, ErrFileDoesNotExist))
}
}
8 changes: 6 additions & 2 deletions pkg/storage/cloud/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,15 @@ func (h *httpStorage) req(

switch resp.StatusCode {
case 200, 201, 204, 206:
// Pass.
// Pass.
default:
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
return nil, errors.Errorf("error response from server: %s %q", resp.Status, body)
err := errors.Errorf("error response from server: %s %q", resp.Status, body)
if resp.StatusCode == 404 {
err = errors.Wrap(ErrFileDoesNotExist, "")
}
return nil, err
}
return resp, nil
}
2 changes: 1 addition & 1 deletion pkg/storage/cloud/http_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestPutHttp(t *testing.T) {
srv, files, cleanup := makeServer()
defer cleanup()
testExportStore(t, srv.String(), false)
if expected, actual := 13, files(); expected != actual {
if expected, actual := 14, files(); expected != actual {
t.Fatalf("expected %d files to be written to single http store, got %d", expected, actual)
}
})
Expand Down
17 changes: 16 additions & 1 deletion pkg/storage/cloud/nodelocal_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import (
"context"
"fmt"
"io"
"os"
"path"
"strings"

"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type localFileStorage struct {
Expand Down Expand Up @@ -78,7 +81,19 @@ func (l *localFileStorage) WriteFile(
}

func (l *localFileStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) {
return l.blobClient.ReadFile(ctx, joinRelativePath(l.base, basename))
var err error
var reader io.ReadCloser
if reader, err = l.blobClient.ReadFile(ctx, joinRelativePath(l.base, basename)); err != nil {
// The format of the error returned by the above ReadFile call differs based
// on whether we are reading from a local or remote nodelocal store.
// The local store returns a golang native ErrNotFound, whereas the remote
// store returns a gRPC native NotFound error.
if os.IsNotExist(err) || status.Code(err) == codes.NotFound {
return nil, ErrFileDoesNotExist
}
return nil, err
}
return reader, nil
}

func (l *localFileStorage) ListFiles(ctx context.Context, patternSuffix string) ([]string, error) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/cloud/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
Expand Down Expand Up @@ -171,6 +172,14 @@ func (s *s3Storage) ReadFile(ctx context.Context, basename string) (io.ReadClose
Key: aws.String(path.Join(s.prefix, basename)),
})
if err != nil {
if errors.HasType(err, (awserr.Error)(nil)) {
aerr := err.(awserr.Error)
switch aerr.Code() {
// Relevant 404 errors reported by AWS.
case s3.ErrCodeNoSuchBucket, s3.ErrCodeNoSuchKey:
return nil, errors.Wrap(ErrFileDoesNotExist, "s3 object does not exist")
}
}
return nil, errors.Wrap(err, "failed to get s3 object")
}
return out.Body, nil
Expand Down
Loading

0 comments on commit cd7fae2

Please sign in to comment.