Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Project name added to user-agent #678

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func download_http(sourceUrl *url.URL, destination string, payload *payloadStruc
// Start the workers
for i := 1; i <= 5; i++ {
wg.Add(1)
go startDownloadWorker(sourceUrl.Path, destination, token, transfers, &wg, workChan, results)
go startDownloadWorker(sourceUrl.Path, destination, token, transfers, payload, &wg, workChan, results)
}

// For each file, send it to the worker
Expand Down Expand Up @@ -389,7 +389,7 @@ func download_http(sourceUrl *url.URL, destination string, payload *payloadStruc

}

func startDownloadWorker(source string, destination string, token string, transfers []TransferDetails, wg *sync.WaitGroup, workChan <-chan string, results chan<- TransferResults) {
func startDownloadWorker(source string, destination string, token string, transfers []TransferDetails, payload *payloadStruct, wg *sync.WaitGroup, workChan <-chan string, results chan<- TransferResults) {

defer wg.Done()
var success bool
Expand All @@ -407,7 +407,7 @@ func startDownloadWorker(source string, destination string, token string, transf
for _, transfer := range transfers {
transfer.Url.Path = file
log.Debugln("Constructed URL:", transfer.Url.String())
if downloaded, err = DownloadHTTP(transfer, finalDest, token); err != nil {
if downloaded, err = DownloadHTTP(transfer, finalDest, token, payload); err != nil {
log.Debugln("Failed to download:", err)
var ope *net.OpError
var cse *ConnectionSetupError
Expand Down Expand Up @@ -468,7 +468,7 @@ func parseTransferStatus(status string) (int, string) {
}

// DownloadHTTP - Perform the actual download of the file
func DownloadHTTP(transfer TransferDetails, dest string, token string) (int64, error) {
func DownloadHTTP(transfer TransferDetails, dest string, token string, payload *payloadStruct) (int64, error) {

// Create the client, request, and context
client := grab.NewClient()
Expand Down Expand Up @@ -513,6 +513,9 @@ func DownloadHTTP(transfer TransferDetails, dest string, token string) (int64, e
// Set the headers
req.HTTPRequest.Header.Set("X-Transfer-Status", "true")
req.HTTPRequest.Header.Set("TE", "trailers")
if payload != nil && payload.ProjectName != "" {
req.HTTPRequest.Header.Set("User-Agent", payload.ProjectName)
}
req.WithContext(ctx)

// Test the transfer speed every 5 seconds
Expand Down Expand Up @@ -778,7 +781,7 @@ func (pr *ProgressReader) Size() int64 {
}

// Recursively uploads a directory with all files and nested dirs, keeping file structure on server side
func UploadDirectory(src string, dest *url.URL, token string, namespace namespaces.Namespace) (int64, error) {
func UploadDirectory(src string, dest *url.URL, token string, namespace namespaces.Namespace, projectName string) (int64, error) {
var files []string
var amountDownloaded int64
srcUrl := url.URL{Path: src}
Expand All @@ -798,7 +801,7 @@ func UploadDirectory(src string, dest *url.URL, token string, namespace namespac
if err != nil {
return 0, err
}
downloaded, err := UploadFile(file, &tempDest, token, namespace)
downloaded, err := UploadFile(file, &tempDest, token, namespace, projectName)
if err != nil {
return 0, err
}
Expand All @@ -813,7 +816,7 @@ func UploadDirectory(src string, dest *url.URL, token string, namespace namespac
}

// UploadFile Uploads a file using HTTP
func UploadFile(src string, origDest *url.URL, token string, namespace namespaces.Namespace) (int64, error) {
func UploadFile(src string, origDest *url.URL, token string, namespace namespaces.Namespace, projectName string) (int64, error) {

log.Debugln("In UploadFile")
log.Debugln("Dest", origDest.String())
Expand Down Expand Up @@ -888,6 +891,9 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace
}
// Set the authorization header
request.Header.Set("Authorization", "Bearer "+token)
if projectName != "" {
request.Header.Set("User-Agent", projectName)
}
var lastKnownWritten int64
t := time.NewTicker(20 * time.Second)
defer t.Stop()
Expand Down
8 changes: 4 additions & 4 deletions client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestSlowTransfers(t *testing.T) {
var err error
// Do a quick timeout
go func() {
_, err = DownloadHTTP(transfers[0], filepath.Join(t.TempDir(), "test.txt"), "")
_, err = DownloadHTTP(transfers[0], filepath.Join(t.TempDir(), "test.txt"), "", nil)
finishedChannel <- true
}()

Expand Down Expand Up @@ -256,7 +256,7 @@ func TestStoppedTransfer(t *testing.T) {
var err error

go func() {
_, err = DownloadHTTP(transfers[0], filepath.Join(t.TempDir(), "test.txt"), "")
_, err = DownloadHTTP(transfers[0], filepath.Join(t.TempDir(), "test.txt"), "", nil)
finishedChannel <- true
}()

Expand Down Expand Up @@ -286,7 +286,7 @@ func TestConnectionError(t *testing.T) {
addr := l.Addr().String()
l.Close()

_, err = DownloadHTTP(TransferDetails{Url: url.URL{Host: addr, Scheme: "http"}, Proxy: false}, filepath.Join(t.TempDir(), "test.txt"), "")
_, err = DownloadHTTP(TransferDetails{Url: url.URL{Host: addr, Scheme: "http"}, Proxy: false}, filepath.Join(t.TempDir(), "test.txt"), "", nil)

assert.IsType(t, &ConnectionSetupError{}, err)

Expand Down Expand Up @@ -319,7 +319,7 @@ func TestTrailerError(t *testing.T) {
assert.Equal(t, svr.URL, transfers[0].Url.String())

// Call DownloadHTTP and check if the error is returned correctly
_, err := DownloadHTTP(transfers[0], filepath.Join(t.TempDir(), "test.txt"), "")
_, err := DownloadHTTP(transfers[0], filepath.Join(t.TempDir(), "test.txt"), "", nil)

assert.NotNil(t, err)
assert.EqualError(t, err, "transfer error: Unable to read test.txt; input/output error")
Expand Down
50 changes: 32 additions & 18 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ func getTokenName(destination *url.URL) (scheme, tokenName string) {
}

// Do writeback to stash using SciTokens
func doWriteBack(source string, destination *url.URL, namespace namespaces.Namespace, recursive bool) (int64, error) {
func doWriteBack(source string, destination *url.URL, namespace namespaces.Namespace, recursive bool, projectName string) (int64, error) {

scitoken_contents, err := getToken(destination, namespace, true, "")
if err != nil {
return 0, err
}
if recursive {
return UploadDirectory(source, destination, scitoken_contents, namespace)
return UploadDirectory(source, destination, scitoken_contents, namespace, projectName)
} else {
return UploadFile(source, destination, scitoken_contents, namespace)
return UploadFile(source, destination, scitoken_contents, namespace, projectName)
}
}

Expand Down Expand Up @@ -543,7 +543,7 @@ func DoPut(localObject string, remoteDestination string, recursive bool) (bytesT
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from source")
}
uploadedBytes, err := doWriteBack(localObjectUrl.Path, remoteDestUrl, ns, recursive)
uploadedBytes, err := doWriteBack(localObjectUrl.Path, remoteDestUrl, ns, recursive, "")
AddError(err)
return uploadedBytes, err

Expand Down Expand Up @@ -646,7 +646,7 @@ func DoGet(remoteObject string, localDestination string, recursive bool) (bytesT
//Fill out the payload as much as possible
payload.filename = remoteObjectUrl.Path

parse_job_ad(payload)
parse_job_ad(&payload)

payload.start1 = time.Now().Unix()

Expand Down Expand Up @@ -764,6 +764,9 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
return 0, errors.New("Do not understand destination scheme")
}

payload := payloadStruct{}
parse_job_ad(&payload)

// Get the namespace of the remote filesystem
// For write back, it will be the destination
// For read it will be the source.
Expand All @@ -778,7 +781,7 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from destination")
}
uploadedBytes, err := doWriteBack(source_url.Path, dest_url, ns, recursive)
uploadedBytes, err := doWriteBack(source_url.Path, dest_url, ns, recursive, payload.ProjectName)
AddError(err)
return uploadedBytes, err
}
Expand Down Expand Up @@ -814,16 +817,11 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
destination = path.Join(destPath, sourceFilename)
}

payload := payloadStruct{}
payload.version = version

//Fill out the payload as much as possible
payload.filename = source_url.Path

// ??

parse_job_ad(payload)

payload.start1 = time.Now().Unix()

// Go thru the download methods
Expand Down Expand Up @@ -918,7 +916,7 @@ func get_ips(name string) []string {

}

func parse_job_ad(payload payloadStruct) { // TODO: needs the payload
func parse_job_ad(payload *payloadStruct) {

//Parse the .job.ad file for the Owner (username) and ProjectName of the callee.

Expand All @@ -940,18 +938,34 @@ func parse_job_ad(payload payloadStruct) { // TODO: needs the payload
}

// Get all matches from file
classadRegex, e := regexp.Compile(`^\s*(Owner|ProjectName)\s=\s"(.*)"`)
// Note: This appears to be invalid regex but is the only thing that appears to work. This way it successfully finds our matches
classadRegex, e := regexp.Compile(`^*\s*(Owner|ProjectName)\s=\s"(.*)"`)
if e != nil {
log.Fatal(e)
}

matches := classadRegex.FindAll(b, -1)

for _, match := range matches {
if string(match[0]) == "Owner" {
payload.Owner = string(match[1])
} else if string(match) == "ProjectName" {
payload.ProjectName = string(match[1])
matchString := strings.TrimSpace(string(match))

if strings.HasPrefix(matchString, "Owner") {
matchParts := strings.Split(strings.TrimSpace(matchString), "=")

if len(matchParts) == 2 { // just confirm we get 2 parts of the string
matchValue := strings.TrimSpace(matchParts[1])
matchValue = strings.Trim(matchValue, "\"") //trim any "" around the match if present
payload.Owner = matchValue
}
}

if strings.HasPrefix(matchString, "ProjectName") {
matchParts := strings.Split(strings.TrimSpace(matchString), "=")

if len(matchParts) == 2 { // just confirm we get 2 parts of the string
matchValue := strings.TrimSpace(matchParts[1])
matchValue = strings.Trim(matchValue, "\"") //trim any "" around the match if present
payload.ProjectName = matchValue
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,5 +331,5 @@ func TestParseNoJobAd(t *testing.T) {
os.Setenv("_CONDOR_JOB_AD", path)

payload := payloadStruct{}
parse_job_ad(payload)
parse_job_ad(&payload)
}
Loading