diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 4690ba069a0..2f5094d7351 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -107,9 +107,11 @@ This reference uses `.` to denote the nesting of values. * `blockstore.local.path` `(string: "~/lakefs/data")` - When using the local Block Adapter, which directory to store files in * `blockstore.gs.credentials_file` `(string : )` - If specified will be used as a file path of the JSON file that contains your Google service account key * `blockstore.gs.credentials_json` `(string : )` - If specified will be used as JSON string that contains your Google service account key (when credentials_file is not set) +* `blockstore.gs.pre_signed_expiry` `(time duration : "15m")` - Expiry of pre-signed URL. * `blockstore.azure.storage_account` `(string : )` - If specified, will be used as the Azure storage account * `blockstore.azure.storage_access_key` `(string : )` - If specified, will be used as the Azure storage access key * `blockstore.azure.auth_method` `(one of ["msi", "access-key"]: "access-key" )` - Authentication method to use (msi is used for Azure AD authentication). +* `blockstore.azure.pre_signed_expiry` `(time duration : "15m")` - Expiry of pre-signed URL. * `blockstore.s3.region` `(string : "us-east-1")` - Default region for lakeFS to use when interacting with S3. * `blockstore.s3.profile` `(string : )` - If specified, will be used as a [named credentials profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) * `blockstore.s3.credentials_file` `(string : )` - If specified, will be used as a [credentials file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html) @@ -124,6 +126,7 @@ This reference uses `.` to denote the nesting of values. * `blockstore.s3.skip_verify_certificate_test_only` `(boolean : false)` - Skip certificate verification while connecting to the storage endpoint. Should be used only for testing. * `blockstore.s3.server_side_encryption` `(string : )` - Server side encryption format used (Example on AWS using SSE-KMS while passing "aws:kms") * `blockstore.s3.server_side_encryption_kms_key_id` `(string : )` - Server side encryption KMS key ID +* `blockstore.s3.pre_signed_expiry` `(time duration : "15m")` - Expiry of pre-signed URL. * `graveler.reposiory_cache.size` `(int : 1000)` - How many items to store in the repository cache. * `graveler.reposiory_cache.ttl` `(time duration : "5s")` - How long to store an item in the repository cache. * `graveler.reposiory_cache.jitter` `(time duration : "2s")` - A random amount of time between 0 and this value is added to each item's TTL. diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index f2786f246e2..7132c9b0eb7 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -55,7 +55,7 @@ const ( ) // DefaultPreSignExpiryDuration is the amount of time pre-signed requests are valid for. -const DefaultPreSignExpiryDuration = time.Minute * 15 +const DefaultPreSignExpiryDuration = 15 * time.Minute // ObjectPointer is a unique identifier of an object in the object // store: the store is a 1:1 mapping between pointers and objects. diff --git a/pkg/block/azure/adapter.go b/pkg/block/azure/adapter.go index cce9155ee4a..e06229c3ae7 100644 --- a/pkg/block/azure/adapter.go +++ b/pkg/block/azure/adapter.go @@ -32,16 +32,19 @@ const ( ) type Adapter struct { - preSignedURLDurationGenerator func() time.Time - clientCache *ClientContainerCache + clientCache *ClientContainerCache + preSignedExpiry time.Duration } func NewAdapter(params params.Azure) *Adapter { + preSignedExpiry := params.PreSignedExpiry + if preSignedExpiry == 0 { + preSignedExpiry = block.DefaultPreSignExpiryDuration + } + return &Adapter{ - clientCache: NewCache(params), - preSignedURLDurationGenerator: func() time.Time { - return time.Now().UTC().Add(block.DefaultPreSignExpiryDuration) - }, + clientCache: NewCache(params), + preSignedExpiry: preSignedExpiry, } } @@ -223,7 +226,7 @@ func (a *Adapter) getPreSignedURL(obj block.ObjectPointer, permissions sas.BlobP } blobURL := containerClient.NewBlobClient(qualifiedKey.BlobURL) - u, err := blobURL.GetSASURL(permissions, time.Time{}, a.preSignedURLDurationGenerator()) + u, err := blobURL.GetSASURL(permissions, time.Time{}, a.newPreSignedTime()) if err != nil { return "", err } @@ -396,7 +399,7 @@ func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.Obje sasKey, err := sourceClient.GetSASURL(sas.BlobPermissions{ Read: true, - }, time.Time{}, a.preSignedURLDurationGenerator()) + }, time.Time{}, a.newPreSignedTime()) if err != nil { return err } @@ -522,3 +525,7 @@ func (a *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo { func (a *Adapter) RuntimeStats() map[string]string { return nil } + +func (a *Adapter) newPreSignedTime() time.Time { + return time.Now().UTC().Add(a.preSignedExpiry) +} diff --git a/pkg/block/factory/build.go b/pkg/block/factory/build.go index 342b8b1c57c..30cacb3a49e 100644 --- a/pkg/block/factory/build.go +++ b/pkg/block/factory/build.go @@ -108,6 +108,7 @@ func buildS3Adapter(statsCollector stats.Collector, params params.S3) (*s3a.Adap s3a.WithStreamingChunkTimeout(params.StreamingChunkTimeout), s3a.WithStatsCollector(statsCollector), s3a.WithDiscoverBucketRegion(params.DiscoverBucketRegion), + s3a.WithPreSignedExpiry(params.PreSignedExpiry), } if params.ServerSideEncryption != "" { opts = append(opts, s3a.WithServerSideEncryption(params.ServerSideEncryption)) @@ -139,7 +140,7 @@ func buildGSAdapter(ctx context.Context, params params.GS) (*gs.Adapter, error) if err != nil { return nil, err } - adapter := gs.NewAdapter(client) + adapter := gs.NewAdapter(client, gs.WithPreSignedExpiry(params.PreSignedExpiry)) logging.Default().WithField("type", "gs").Info("initialized blockstore adapter") return adapter, nil } diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index b075d626bb4..5e31147d517 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -33,16 +33,24 @@ var ( ) type Adapter struct { - client *storage.Client - presignDurationGenerator func() time.Time + client *storage.Client + preSignedExpiry time.Duration } -func NewAdapter(client *storage.Client, opts ...func(a *Adapter)) *Adapter { +func WithPreSignedExpiry(v time.Duration) func(a *Adapter) { + return func(a *Adapter) { + if v == 0 { + a.preSignedExpiry = block.DefaultPreSignExpiryDuration + } else { + a.preSignedExpiry = v + } + } +} + +func NewAdapter(client *storage.Client, opts ...func(adapter *Adapter)) *Adapter { a := &Adapter{ - client: client, - presignDurationGenerator: func() time.Time { - return time.Now().Add(block.DefaultPreSignExpiryDuration) - }, + client: client, + preSignedExpiry: block.DefaultPreSignExpiryDuration, } for _, opt := range opts { opt(a) @@ -54,6 +62,10 @@ func (a *Adapter) log(ctx context.Context) logging.Logger { return logging.FromContext(ctx) } +func (a *Adapter) newPreSignedTime() time.Time { + return time.Now().UTC().Add(a.preSignedExpiry) +} + func resolveNamespace(obj block.ObjectPointer) (block.QualifiedKey, error) { qualifiedKey, err := block.ResolveNamespace(obj.StorageNamespace, obj.Identifier, obj.IdentifierType) if err != nil { @@ -130,7 +142,7 @@ func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, opts := &storage.SignedURLOptions{ Scheme: storage.SigningSchemeV4, Method: method, - Expires: a.presignDurationGenerator(), + Expires: a.newPreSignedTime(), } k, err := a.client.Bucket(qualifiedKey.StorageNamespace).SignedURL(qualifiedKey.Key, opts) if err != nil { @@ -261,7 +273,7 @@ func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.Obje sourceObjectHandle := a.client.Bucket(qualifiedSourceKey.StorageNamespace).Object(qualifiedSourceKey.Key) _, err = destinationObjectHandle.CopierFrom(sourceObjectHandle).Run(ctx) if err != nil { - return fmt.Errorf("Copy: %w", err) + return fmt.Errorf("copy: %w", err) } return nil } @@ -375,7 +387,7 @@ func (a *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinatio w := o.NewWriter(ctx) _, err = io.Copy(w, reader) if err != nil { - return nil, fmt.Errorf("Copy: %w", err) + return nil, fmt.Errorf("copy: %w", err) } err = w.Close() if err != nil { diff --git a/pkg/block/params/block.go b/pkg/block/params/block.go index 5f178f376fd..1fc0e6b537b 100644 --- a/pkg/block/params/block.go +++ b/pkg/block/params/block.go @@ -29,11 +29,13 @@ type S3 struct { SkipVerifyCertificateTestOnly bool ServerSideEncryption string ServerSideEncryptionKmsKeyID string + PreSignedExpiry time.Duration } type GS struct { CredentialsFile string CredentialsJSON string + PreSignedExpiry time.Duration } type Azure struct { @@ -41,4 +43,5 @@ type Azure struct { StorageAccessKey string AuthMethod string TryTimeout time.Duration + PreSignedExpiry time.Duration } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index e3557cb64ce..f15ecca4a5b 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -66,6 +66,7 @@ type Adapter struct { respServerLock sync.Mutex ServerSideEncryption string ServerSideEncryptionKmsKeyID string + preSignedExpiry time.Duration } func WithStreamingChunkSize(sz int) func(a *Adapter) { @@ -92,6 +93,12 @@ func WithDiscoverBucketRegion(b bool) func(a *Adapter) { } } +func WithPreSignedExpiry(v time.Duration) func(a *Adapter) { + return func(a *Adapter) { + a.preSignedExpiry = v + } +} + func WithServerSideEncryption(s string) func(a *Adapter) { return func(a *Adapter) { a.ServerSideEncryption = s @@ -112,6 +119,7 @@ func NewAdapter(awsSession *session.Session, opts ...AdapterOption) *Adapter { httpClient: awsSession.Config.HTTPClient, streamingChunkSize: DefaultStreamingChunkSize, streamingChunkTimeout: DefaultStreamingChunkTimeout, + preSignedExpiry: block.DefaultPreSignExpiryDuration, } for _, opt := range opts { opt(a) @@ -307,7 +315,7 @@ func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer, _ int64) (io } func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, mode block.PreSignMode) (string, error) { - log := a.log(ctx).WithField("operation", "GetPresignedURL") + log := a.log(ctx).WithField("operation", "GetPreSignedURL") qualifiedKey, err := resolveNamespace(obj) if err != nil { log.WithField("namespace", obj.StorageNamespace). @@ -323,14 +331,14 @@ func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, Key: aws.String(qualifiedKey.Key), } req, _ := client.PutObjectRequest(putObjectInput) - preSignedURL, err = req.Presign(block.DefaultPreSignExpiryDuration) + preSignedURL, err = req.Presign(a.preSignedExpiry) } else { getObjectInput := &s3.GetObjectInput{ Bucket: aws.String(qualifiedKey.StorageNamespace), Key: aws.String(qualifiedKey.Key), } req, _ := client.GetObjectRequest(getObjectInput) - preSignedURL, err = req.Presign(block.DefaultPreSignExpiryDuration) + preSignedURL, err = req.Presign(a.preSignedExpiry) } if err != nil { log.WithField("namespace", obj.StorageNamespace). diff --git a/pkg/config/config.go b/pkg/config/config.go index 4ce42c848a0..5ba9498ef70 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -194,18 +194,21 @@ type Config struct { SkipVerifyCertificateTestOnly bool `mapstructure:"skip_verify_certificate_test_only"` ServerSideEncryption string `mapstructure:"server_side_encryption"` ServerSideEncryptionKmsKeyID string `mapstructure:"server_side_encryption_kms_key_id"` - } + PreSignedExpiry time.Duration `mapstructure:"pre_signed_expiry"` + } `mapstructure:"s3"` Azure *struct { TryTimeout time.Duration `mapstructure:"try_timeout"` StorageAccount string `mapstructure:"storage_account"` StorageAccessKey string `mapstructure:"storage_access_key"` AuthMethod string `mapstructure:"auth_method"` - } + PreSignedExpiry time.Duration `mapstructure:"pre_signed_expiry"` + } `mapstructure:"azure"` GS *struct { - S3Endpoint string `mapstructure:"s3_endpoint"` - CredentialsFile string `mapstructure:"credentials_file"` - CredentialsJSON string `mapstructure:"credentials_json"` - } + S3Endpoint string `mapstructure:"s3_endpoint"` + CredentialsFile string `mapstructure:"credentials_file"` + CredentialsJSON string `mapstructure:"credentials_json"` + PreSignedExpiry time.Duration `mapstructure:"pre_signed_expiry"` + } `mapstructure:"gs"` } Committed struct { LocalCache struct { @@ -458,6 +461,7 @@ func (c *Config) BlockstoreS3Params() (blockparams.S3, error) { SkipVerifyCertificateTestOnly: c.Blockstore.S3.SkipVerifyCertificateTestOnly, ServerSideEncryption: c.Blockstore.S3.ServerSideEncryption, ServerSideEncryptionKmsKeyID: c.Blockstore.S3.ServerSideEncryptionKmsKeyID, + PreSignedExpiry: c.Blockstore.S3.PreSignedExpiry, }, nil } @@ -475,6 +479,7 @@ func (c *Config) BlockstoreGSParams() (blockparams.GS, error) { return blockparams.GS{ CredentialsFile: c.Blockstore.GS.CredentialsFile, CredentialsJSON: c.Blockstore.GS.CredentialsJSON, + PreSignedExpiry: c.Blockstore.GS.PreSignedExpiry, }, nil } @@ -484,6 +489,7 @@ func (c *Config) BlockstoreAzureParams() (blockparams.Azure, error) { StorageAccessKey: c.Blockstore.Azure.StorageAccessKey, AuthMethod: c.Blockstore.Azure.AuthMethod, TryTimeout: c.Blockstore.Azure.TryTimeout, + PreSignedExpiry: c.Blockstore.Azure.PreSignedExpiry, }, nil } diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 337766d1c72..020a5f429d6 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -50,6 +50,7 @@ func setDefaults(local bool) { viper.SetDefault("blockstore.s3.streaming_chunk_timeout", time.Second) // or 1 seconds, whatever comes first viper.SetDefault("blockstore.s3.max_retries", 5) viper.SetDefault("blockstore.s3.discover_bucket_region", true) + viper.SetDefault("blockstore.s3.pre_signed_expiry", 15*time.Minute) viper.SetDefault("committed.local_cache.size_bytes", 1*1024*1024*1024) viper.SetDefault("committed.local_cache.dir", "~/lakefs/data/cache") @@ -67,6 +68,7 @@ func setDefaults(local bool) { viper.SetDefault("gateways.s3.region", "us-east-1") viper.SetDefault("blockstore.gs.s3_endpoint", "https://storage.googleapis.com") + viper.SetDefault("blockstore.gs.pre_signed_expiry", 15*time.Minute) viper.SetDefault("stats.enabled", true) viper.SetDefault("stats.address", "https://stats.treeverse.io") @@ -77,6 +79,7 @@ func setDefaults(local bool) { viper.SetDefault("blockstore.azure.try_timeout", 10*time.Minute) viper.SetDefault("blockstore.azure.auth_method", "access-key") + viper.SetDefault("blockstore.azure.pre_signed_expiry", 15*time.Minute) viper.SetDefault("security.audit_check_interval", 24*time.Hour) viper.SetDefault("security.audit_check_url", "https://audit.lakefs.io/audit")