-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support ADLS Gen2 #5027
Support ADLS Gen2 #5027
Conversation
# Conflicts: # go.mod # go.sum
0cf99e2
to
3a5c213
Compare
# Conflicts: # go.mod # pkg/block/azure/adapter.go # pkg/block/factory/build.go # pkg/ingest/store/factory.go
# Conflicts: # esti/import_test.go
PR ready to review. @ozkatz adding you to reviewers as I had to make some changes to the pre signed URL code |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great addition, the new SDK seems much better than the previous one. Moving to the new SDK should even solve more issues other than support ADLS Gen2. Thanks!
Added some comments
case azure.AuthMethodMSI: | ||
credentials, err = azure.GetMSICredentials() | ||
default: | ||
err = ErrAuthMethodNotSupported |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like we aren't supporting MSI anymore, not sure it's used, but this will be a breaking change. Can we still support it?
Another thing, that shouldn't be part of this PR. IIRC the new SDK has a way to connect and take the configurations from the running environment which was once requested, maybe we should open an issue for that and fix it as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right - I originally added the code which checks for default credentials (CLI, ENV etc..)
Re-added it - let me know if it solves the issue
ctx: ctx, | ||
cancel: cancel, | ||
reader: from, | ||
to: to, | ||
id: newID(), | ||
o: o, | ||
errCh: make(chan error, 1), | ||
buffers: buffers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General message on this file, previously it was copy of the chunkwriting file from the SDK with small modifications, that way if there is a significant change in a new version applying it would be straightforward. I noticed that the implementation in the new Azure SDK is different. As it looks now we have a copy of the old implementation with our modifications, is there a way to adjust somehow the implementation from the new SDK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new implementation is using a tracker to track the parts which we are already doing in a different way to support multiple requests. I did use the new mmbPool struct they are using and modified the old code to work similarly to how they do it. I found it difficult to align to their implementation but if you have any suggestions - I would love to hear
func GetAccessKeyCredentials(accountName, accountKey string) (azblob.Credential, error) { | ||
if len(accountName) == 0 && len(accountKey) == 0 { | ||
// fallback to Azure environment variables | ||
accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") | ||
} | ||
return azblob.NewSharedKeyCredential(accountName, accountKey) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC we are losing the support of taking the AZURE configurations... this might be a breaking change, can we keep this support. (Also here I think that the new way of generating an azure client might solve this as well)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
func (m *MultipartBlockWriter) Upload(_ context.Context, _ io.ReadSeekCloser, _ *blockblob.UploadOptions) (blockblob.UploadResponse, error) { | ||
panic("Should not be called") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure we want to panic here? would logging and returning and error be enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not support it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We needed to implement the interface - but we don't really need to use it.
I'm not sure what is the right implementation as I do not know how this will be used.
|
||
// TODO(Guys): remove this work around once azure fixes panic issue and use azblob.UploadStreamToBlockBlob | ||
transferManager, err := azblob.NewStaticBuffer(_1MiB, MaxBuffers) | ||
if err != nil { | ||
return err | ||
} | ||
uploadOpts := a.translatePutOpts(ctx, opts) | ||
uploadOpts.TransferManager = transferManager | ||
defer transferManager.Close() | ||
resp, err := copyFromReader(ctx, reader, blobURL, uploadOpts) | ||
if err != nil { | ||
return err | ||
} | ||
_ = resp == nil // this is done in order to ignore "result 0 is never used" error ( copyFromReader is copied from azure, and we want to keep it with minimum changes) | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙏 😄
pkg/block/azure/adapter.go
Outdated
qp, err := client.GetAccountSASURL(aztables.AccountSASResourceTypes{ | ||
Container: true, | ||
Object: true, | ||
}, permissions, time.Now(), a.preSignedURLDurationGenerator()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure we should use time.Now() it seems like we pass a time that "IsZero" it isn't specified, which is what I believe we want in this case.
pkg/ingest/store/factory.go
Outdated
if err != nil { | ||
return nil, err | ||
} | ||
} else { | ||
var err error | ||
p, err = getAzureClient() | ||
c, err = getAzureClient() | ||
if err != nil { | ||
return nil, err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can extract out the if err != nil
block
func BuildAzureServiceClient(params params.Azure) (*service.Client, error) { | ||
url := fmt.Sprintf(azure.AzURLTemplate, params.StorageAccount) | ||
options := service.ClientOptions{ClientOptions: azcore.ClientOptions{Retry: policy.RetryOptions{TryTimeout: params.TryTimeout}}} | ||
if params.StorageAccessKey != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should change the documentation to reflect this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened #5044
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! nice upgrade from the previous SDK.
pkg/block/azure/adapter.go
Outdated
|
||
preSignedBlobPattern = "https://%s.blob.core.windows.net/%s/%s?%s" | ||
AzURLTemplate = "https://%s.blob.core.windows.net/" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already in azure package - so maybe just URLTemplate? or something without the Az
prefix?
|
||
// TODO (niro): copy is limited to 256MB, should we handle it somehow? | ||
_, err = destClient.CopyFromURL(ctx, sasKey, nil) | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So maybe we should document this limitation as we do not support a working copy for large objects.
Closes #5037