Skip to content

Commit

Permalink
Merge pull request #41 from atlanhq/DVX-318
Browse files Browse the repository at this point in the history
DVX: 318: Added support for file upload and download using `presigned URLs`
  • Loading branch information
Aryamanz29 authored Jul 9, 2024
2 parents 47edf23 + 75430c6 commit 9f376d6
Show file tree
Hide file tree
Showing 11 changed files with 572 additions and 21 deletions.
5 changes: 3 additions & 2 deletions atlan/assets/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/atlanhq/atlan-go/atlan/model"
"github.com/atlanhq/atlan-go/atlan/model/structs"
"hash/fnv"
"reflect"
"strings"
"time"

"github.com/atlanhq/atlan-go/atlan/model"
"github.com/atlanhq/atlan-go/atlan/model/structs"
)

// AtlanObject is an interface that all asset types should implement
Expand Down
244 changes: 227 additions & 17 deletions atlan/assets/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"

"github.com/atlanhq/atlan-go/atlan/logger"
"github.com/k0kubun/go-ansi"
"github.com/schollz/progressbar/v3"
)

// AtlanClient defines the Atlan API client structure.
Expand Down Expand Up @@ -188,8 +190,129 @@ func (ac *AtlanClient) DisableLogging() {
ac.SetLogger(false, "")
}

// Removes authorization from header when using
// s3PresignedUrlFileUpload/Download and returns the removed value.
func (ac *AtlanClient) removeAuthorization() (string, error) {
if headers, ok := ac.requestParams["headers"].(map[string]string); ok {
auth, exists := headers["Authorization"]
if exists {
delete(headers, "Authorization")
return auth, nil
}
return "", nil
} else {
return "", InvalidRequestError{
AtlanError{
ErrorCode: errorCodes[UNABLE_TO_PERFORM_OPERATION_ON_AUTHORIZATION],
Args: []interface{}{"remove", "from"},
},
}
}
}

// Restores the authorization to the header when using s3PresignedUrlFileUpload/Download .
func (ac *AtlanClient) restoreAuthorization(auth string) error {
if headers, ok := ac.requestParams["headers"].(map[string]string); ok {
headers["Authorization"] = auth
} else {
return InvalidRequestError{
AtlanError{
ErrorCode: errorCodes[UNABLE_TO_PERFORM_OPERATION_ON_AUTHORIZATION],
Args: []interface{}{"restore", "to"},
},
}
}
return nil
}

// Initialize the file progress bar using default configuration settings
func initFileProgressBar(fileSize int64, description string) *progressbar.ProgressBar {
bar := progressbar.NewOptions(int(fileSize),
progressbar.OptionSetWidth(50),
progressbar.OptionShowBytes(true),
progressbar.OptionSetPredictTime(false),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionSetDescription(description),
progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
progressbar.OptionOnCompletion(func() {
fmt.Printf("\n")
}),
progressbar.OptionSetTheme(progressbar.Theme{
Saucer: "[blue]=[reset]",
SaucerHead: "[blue]>[reset]",
SaucerPadding: " ",
BarStart: "[",
BarEnd: "]",
}))
return bar
}

func (ac *AtlanClient) s3PresignedUrlFileUpload(api *API, uploadFile *os.File, uploadFileSize int64) error {
// Remove authorization and returns the auth value
auth, err := ac.removeAuthorization()
if err != nil {
return err
}

// Call the API with upload file options
uploadProgressBarDescription := "Uploading file to the object store:"
uploadProgressBar := initFileProgressBar(uploadFileSize, uploadProgressBarDescription)
options := map[string]interface{}{
"use_presigned_url": true,
"file_size": uploadFileSize,
"progress_bar": uploadProgressBar,
}
_, err = ac.CallAPI(api, nil, uploadFile, options)
if err != nil {
return err
}

// Restore authorization after API call
err = ac.restoreAuthorization(auth)
if err != nil {
ac.logger.Errorf("failed to restore authorization: %s", err)
return err
}

return nil
}

func (ac *AtlanClient) s3PresignedUrlFileDownload(api *API, downloadFilePath string) error {
// Remove authorization and returns the auth value
auth, err := ac.removeAuthorization()
if err != nil {
return err
}

// Call the API with download file options
downloadProgressBarDescription := "Downloading file from the object store:"
downloadProgressBar := initFileProgressBar(0, downloadProgressBarDescription)
options := map[string]interface{}{
"use_presigned_url": true,
"save_file": true,
"file_path": downloadFilePath,
"progress_bar": downloadProgressBar,
}
_, err = ac.CallAPI(api, nil, nil, options)
if err != nil {
return err
}

// Restore authorization after API call
err = ac.restoreAuthorization(auth)
if err != nil {
ac.logger.Errorf("failed to restore authorization: %s", err)
return err
}

return nil
}

// CallAPI makes a generic API call.
func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj interface{}) ([]byte, error) {
func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj interface{}, options ...interface{}) ([]byte, error) {
var saveFile bool
var filePath string
var fileProgressBar *progressbar.ProgressBar
params := deepCopy(ac.requestParams)
path := ac.host + api.Endpoint.Atlas + api.Path

Expand All @@ -213,36 +336,96 @@ func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj int
path += "?" + query.Encode()
}

// Check for extra any API call options
if len(options) > 0 {
if optMap, ok := options[0].(map[string]interface{}); ok {
if _, ok := optMap["save_file"].(bool); ok {
saveFile = ok
}
if path, ok := optMap["file_path"].(string); ok {
filePath = path
}
if fs, ok := optMap["file_size"].(int64); ok {
params["content_length"] = fs
}
if _, ok := optMap["use_presigned_url"].(bool); ok {
path = api.Path
}
if bar, ok := optMap["progress_bar"].(*progressbar.ProgressBar); ok {
fileProgressBar = bar
}
}
}

if requestObj != nil {
//fmt.Println("Request Object:", requestObj)
requestJSON, err := json.Marshal(requestObj)
logger.Log.Debugf("Request JSON: %s", string(requestJSON))
if err != nil {
ac.logger.Errorf("error marshaling request object: %v", err)
return nil, fmt.Errorf("error marshaling request object: %v", err)
switch reqObj := requestObj.(type) {
// In case of file upload/download
case *os.File:
if fileProgressBar != nil {
params["progress_bar"] = fileProgressBar
params["data"] = progressbar.NewReader(reqObj, fileProgressBar)
}
params["content_type"] = "application/octet-stream"
default:
// Otherwise just use `json.Marshal()`
requestJSON, err := json.Marshal(requestObj)
ac.logger.Debugf("Request JSON: %s", string(requestJSON))
if err != nil {
ac.logger.Errorf("error marshaling request object: %v", err)
return nil, fmt.Errorf("error marshaling request object: %v", err)
}
params["data"] = bytes.NewBuffer(requestJSON)
}
params["data"] = bytes.NewBuffer(requestJSON)
}

ac.logAPICall(api.Method, path)

//logger.Log.Debugf("Params: %v", params)
// Send the request
response, err := ac.makeRequest(api.Method, path, params)
if err != nil {
return nil, handleApiError(response, err)
}

ac.logHTTPStatus(response)

// Handle API error based on response status code
if response.StatusCode != api.Status {
return nil, handleApiError(response, err)
}

// Handle file download
if saveFile {
file, err := os.Create(filePath)
if err != nil {
return nil, AtlanError{
ErrorCode: errorCodes[UNABLE_TO_PREPARE_DOWNLOAD_FILE],
Args: []interface{}{err.Error()},
}
}
defer file.Close()

// Set the progress bar size based on the response content-length
fileProgressBar.ChangeMax64(response.ContentLength)
_, err = io.Copy(io.MultiWriter(file, fileProgressBar), response.Body)
if err != nil {
return nil, AtlanError{
ErrorCode: errorCodes[UNABLE_TO_COPY_DOWNLOAD_FILE_CONTENTS],
Args: []interface{}{err.Error()},
}
}

ac.logger.Infof("Successfully downloaded file: %s", file.Name())
return []byte{}, nil
}

// Handle JSON response
responseJSON, err := io.ReadAll(response.Body)
response.Body.Close()
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
}

if response.StatusCode != api.Status {
return nil, handleApiError(response, err)
}
// Finally, close the request body
response.Body.Close()

ac.logResponse(responseJSON)

Expand All @@ -253,22 +436,34 @@ func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj int
func (ac *AtlanClient) makeRequest(method, path string, params map[string]interface{}) (*http.Response, error) {
var req *http.Request
var err error
var contentType string

switch method {
case http.MethodGet:
req, err = http.NewRequest(method, path, nil)
if err != nil {
return nil, ThrowAtlanError(err, CONNECTION_ERROR, nil)
}
case http.MethodPost, http.MethodPut:
body, ok := params["data"].(io.Reader)
var body io.Reader
data, ok := params["data"]
if !ok {
return nil, fmt.Errorf("missing or invalid 'data' parameter for POST/PUT/DELETE request")
return nil, fmt.Errorf("missing 'data' parameter for POST/PUT request")
}
switch requestData := data.(type) {
case progressbar.Reader:
// File data upload with progressbar reader
body = &requestData
case io.Reader:
// JSON payload
body = requestData
default:
return nil, fmt.Errorf("invalid 'data' parameter type for POST/PUT request")
}
req, err = http.NewRequest(method, path, body)
if err != nil {
return nil, ThrowAtlanError(err, CONNECTION_ERROR, nil)
}
req.Header.Set("Content-Type", "application/json")
case http.MethodDelete:
// DELETE requests may not always have a body.
var body io.Reader
Expand All @@ -278,14 +473,14 @@ func (ac *AtlanClient) makeRequest(method, path string, params map[string]interf
return nil, fmt.Errorf("invalid 'data' parameter for DELETE request")
}
}
// Create a new http request
req, err = http.NewRequest(method, path, body)
if err != nil {
return nil, ThrowAtlanError(err, CONNECTION_ERROR, nil)
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}

default:
return nil, fmt.Errorf("unsupported HTTP method: %s", method)
}
Expand All @@ -302,6 +497,20 @@ func (ac *AtlanClient) makeRequest(method, path string, params map[string]interf
}
}

// Set content-type
if ct, ok := params["content_type"].(string); ok {
contentType = ct
} else {
// Default content type
contentType = "application/json"
}
req.Header.Set("Content-Type", contentType)

// Set content-length
if contentLength, ok := params["content_length"].(int64); ok {
req.ContentLength = contentLength
}

// Set query parameters
queryParams, ok := params["params"].(map[string]string)
if ok {
Expand All @@ -328,6 +537,7 @@ func (ac *AtlanClient) makeRequest(method, path string, params map[string]interf
req.URL.RawQuery = query
}

// Finally, execute the request
return ac.Session.Do(req)
}

Expand Down
16 changes: 15 additions & 1 deletion atlan/assets/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
// Entities API
ENTITY_API = "entity/"
ENTITY_BULK_API = "entity/bulk/"

// Files API
FILES_API = "files/"
)

// API defines the structure of an API call.
Expand All @@ -34,7 +37,11 @@ var AtlasEndpoint = Endpoint{
Atlas: "/api/meta/",
}

// API calls for Atlas
var HeraclesEndpoint = Endpoint{
Atlas: "/api/service/",
}

// API calls to various services (Atlas, Heracles etc)
var (
GET_TYPEDEF_BY_NAME = API{
Path: TYPEDEF_BY_NAME,
Expand Down Expand Up @@ -126,6 +133,13 @@ var (
Status: http.StatusOK,
Endpoint: AtlasEndpoint,
}

PRESIGNED_URL = API{
Path: FILES_API + "presignedUrl",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}
)

// Constants for the Atlas search DSL
Expand Down
Loading

0 comments on commit 9f376d6

Please sign in to comment.