[DATA-645] Make CLI download binary in parallel. #1659
Conversation
Really awesome and quick work! Excited about this massive speedup — thanks for the thorough writeup, clean code, and stress/performance testing! Mostly minor comments.
cli/cmd/main.go (outdated)
dataFlagMimeTypes           = "mime_types"
dataFlagStart               = "start"
dataFlagEnd                 = "end"
dataFlagConcurrentDownloads = "concurrent"
[optional nit] Even though concurrency and parallelism are different, I think to the user "parallel" is the more common user-friendly term, so would potentially make this --parallel and explain it's concurrent in the usage description.
Yeah parallel is definitely the more accurate word here, updated
cli/data.go (outdated)
go func() {
    defer wg.Done()
    var limit int
    if concurrentDownloads > 100 {
[nit] Extract this into a const at the top of the file
[optional] If we bump up to the latest stable version of urfave/cli, we can use flag validation to only accept a value within a certain range: https://github.com/urfave/cli/blob/main/docs/v2/examples/flags.md#flag-actions
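For reference, a minimal sketch of what that flag-level validation could look like with urfave/cli v2's flag actions (the flag name, default, and 1-100 range here are illustrative, not from this PR):

&cli.IntFlag{
    Name:  "parallel",
    Value: 10, // illustrative default
    Usage: "number of download requests to make in parallel",
    // Flag-level Action runs after parsing and can reject out-of-range values.
    Action: func(ctx *cli.Context, v int) error {
        if v < 1 || v > 100 {
            return fmt.Errorf("flag parallel value %d out of range [1-100]", v)
        }
        return nil
    },
},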
I think we still want to support parallel values higher than this limit, so I'm going to still allow arbitrary values for parallel. This will just get the IDs in multiple requests, but the channel feeding them into the download routines is still parallelRequests big, so we can still do parallelRequests downloads at a time.
Added maxLimit const.
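A rough sketch of the shape being described, with fetchPage and download as hypothetical stand-ins for the real paginated BinaryDataByFilter call and the per-file download (not the PR's actual code):

import "sync"

// downloadAll fetches IDs in batches of at most maxLimit per request, while
// parallelDownloads workers drain the buffered channel concurrently.
func downloadAll(fetchPage func(limit int) ([]string, bool), download func(id string), parallelDownloads, maxLimit int) {
    ids := make(chan string, parallelDownloads)
    go func() {
        defer close(ids)
        limit := parallelDownloads
        if limit > maxLimit {
            limit = maxLimit // cap per-request batch size to bound server load
        }
        for {
            page, done := fetchPage(limit)
            for _, id := range page {
                ids <- id
            }
            if done {
                return
            }
        }
    }()
    var wg sync.WaitGroup
    for i := 0; i < parallelDownloads; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for id := range ids {
                download(id)
            }
        }()
    }
    wg.Wait()
}

The buffered channel lets ID fetching run ahead of downloads without unbounded memory growth.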
That's a really clever way to limit the server load from BinaryDataByIDs calls but parallelize the actual local downloads further. Nice.
cli/cmd/main.go (outdated)
&cli.IntFlag{
    Name:     dataFlagConcurrentDownloads,
    Required: false,
    Usage:    "number of download requests to make in parallel",
Would make a note to explicitly set this value to 1 for consecutive downloads, since we otherwise default to 10 concurrent requests.
Added a note about the default value but left out the note about needing to set it to 1 for non-parallel downloads, because I think it's directly implied.
Agreed
cli/data.go (outdated)
    }(nextID)
}
downloadWG.Wait()
if numFilesDownloaded.Load()%logEveryN == 0 {
I believe this would only print every N if concurrentDownloads % logEveryN == 0. Can move this right after numFilesDownloaded.Add(1) above or, if that somehow runs into concurrency oddness, set logEveryN based on numFilesDownloaded.
Ah yup you're totally right, added it within the download goroutine so this check is done on every download.
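Assuming numFilesDownloaded is a sync/atomic Int64 (the atomic counter is implied by the .Load() calls above; the rest is a sketch), the per-download check could look like:

// Add returns the incremented value, so each worker can test the count it
// just produced without racing other workers.
if n := numFilesDownloaded.Add(1); n%logEveryN == 0 {
    fmt.Fprintf(c.c.App.Writer, "downloaded %d files to %s\n", n, dst)
}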
cli/data.go (outdated)
}

//nolint:gosec
dataFile, err := os.Create(filepath.Join(dst, "data", datum.GetMetadata().GetId()+datum.GetMetadata().GetFileExt()))
Looks like this section also lost the newest changes with dataDir/metadataDir and prepending timestamp to the filename. The latter is especially needed now that we're interleaving downloads :)
If you could locally test a scenario where you're moving a motor while collecting data and, post-sort, seeing all the data in chronological order, that'd be great.
In the future when we support downloads via our cloud service, we can actually sort + package nicely before downloading to browser.
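As a hedged illustration of that naming scheme (the TimeRequested field and exact format are assumptions, not confirmed by this thread), prepending the capture timestamp makes a plain lexical sort of the data directory chronological even when parallel downloads finish out of order:

// Hypothetical reconstruction: prefix the file name with the request time so
// names sort chronologically. Colons are avoided for filesystem portability.
ts := datum.GetMetadata().GetTimeRequested().AsTime().UTC().Format("2006-01-02T15-04-05.000000000")
fileName := ts + "_" + datum.GetMetadata().GetId()
//nolint:gosec
dataFile, err := os.Create(filepath.Join(dst, dataDir, fileName+datum.GetMetadata().GetFileExt()))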
Oops yup bad merging, added the naming stuff back.
If you could locally test a scenario where you're moving a motor while collecting data and, post-sort, seeing all the data in chronological order, that'd be great.
It looks like we weren't sorting tabular data by timestamp before, and this doesn't do tabular export in parallel. Did you mean for binary?
While exporting tabular data, I do think I found a bug in how we're persisting timestamps, though. For any syncInterval, it looks like we record the same time requested/received for all data points. You can see this by running this command:
go run go.viam.com/rdk/cli/cmd data --location_ids=zqegrpxfhl --robot_name=Detector --org_ids=1c614556-2ff9-4234-9a94-d59b0a6d3378 --data_type=tabular --start=2022-11-01T04:00:00.000Z
This data was being collected at 10hz with a sync interval of 1 minute. If you open the file, you can see every 600 messages have the same time requested/received. I think we should prioritize fixing this.
Ah yup found the bug, will tag you on follow up PR
It looks like we weren't sorting tabular data by timestamp before, and this doesn't do tabular export in parallel. Did you mean for binary?
Yep meant to add this in the binary section, and both tabular/binary needed merge updates.
This data was being collected at 10hz with a sync interval of 1 minute. If you open the file, you can see every 600 messages have the same time requested/received. I think we should prioritize fixing this.
Really cool seeing the E2E manual testing uncovering a bug like this. Fantastic job both for spotting it and for pushing out a super quick fix!
Interesting to see that even though it had full test coverage, the tests were expecting the wrong thing. Worth listing out scenarios in PR descriptions to help get high-level consensus on test behavior, since it's sometimes harder to spot when writing/reviewing as code.
if err := r.Close(); err != nil {
    return err
}
return nil
}

// TabularData downloads tabular data matching filter to dst.
Looks like TabularData needs to sync to the newer changes with maxRetryCount, logging, and dataDir/metadataDir
Oof yup more bad merging on my part, fixed
cli/data.go (outdated)
    Limit: uint64(limit),
    Last:  last,
},
CountOnly: false,
Can also explicitly set IncludeBinary to false here.
Done
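For clarity, the request with both options explicit would look something like this (IncludeBinary is the field name used in this thread; the request type name is assumed from the sendBinaryDataByFilterRequest helper seen below):

req := &datapb.BinaryDataByFilterRequest{
    DataRequest: &datapb.DataRequest{
        Filter: filter,
        Limit:  uint64(limit),
        Last:   last,
    },
    CountOnly: false,
    // Only metadata/IDs are needed here; the bytes are fetched later by the
    // parallel BinaryDataByIDs calls.
    IncludeBinary: false,
}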
datum := data[0]
mdJSONBytes, err := protojson.Marshal(datum.GetMetadata())
if err != nil {

// getMatchingIDs queries client for all BinaryData matching filter, and passes each of their ids into ids.
supernit: "// getMatchingIDs queries client for all BinaryData matching filter and passes each of their ids into channel."
nextID = <-ids

// If nextID is zero value, the channel has been closed and there are no more IDs to be read.
if nextID == "" {
supernit: If nextID is empty
if err != nil {
    return err
}
if _, err := jsonFile.Write(mdJSONBytes); err != nil {
    return err

// If no data is returned, there is no more data.
supernit/opt: comment seems redundant, maybe just "No more data is returned."?
Super cool! A couple nit things, but I ran this locally and was very excited by the results!
if err != nil {
    return errors.Wrapf(err, "received error from server")

if numFilesDownloaded.Load()%logEveryN != 0 {
    fmt.Fprintf(c.c.App.Writer, "downloaded %d files to %s\n", numFilesDownloaded.Load(), dst)
nit: I find it helpful as a user to have (downloaded # files / total files) as an indication of what's done and what's still left to go
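A sketch of what that could look like, assuming the total is known up front (for example via an initial CountOnly request; totalFiles is hypothetical):

if n := numFilesDownloaded.Add(1); n%logEveryN == 0 || n == totalFiles {
    fmt.Fprintf(c.c.App.Writer, "downloaded %d/%d files\n", n, totalFiles)
}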
Immeasurably excited about this change. Really amazing work. 💯 🚀
cli/data.go (outdated)
@@ -202,7 +208,7 @@ func downloadBinary(ctx context.Context, client datapb.DataServiceClient, dst, i
 }

 //nolint:gosec
-dataFile, err := os.Create(filepath.Join(dst, "data", datum.GetMetadata().GetId()+datum.GetMetadata().GetFileExt()))
+dataFile, err := os.Create(filepath.Join(dst, dataDir, "data"+".ndjson"))
Think this got copied from tabular, but this would instead be dataFile, err := os.Create(filepath.Join(dst, dataDir, fileName+datum.GetMetadata().GetFileExt())) in order to reuse the timestamped binary file name and its file extension.
Eek yes bad copying/pasting, fixed
cli/data.go (outdated)
DataRequest: &datapb.DataRequest{
    Filter: filter,
    // TODO: For now don't worry about skip/limit. Just do everything in one request. Can implement batching when
    // tabular is implemented.
Checking on this comment which was copied over from before: is this referencing if a TabularDataByIDs is implemented or something else?
It was actually referencing when tabular wasn't implemented in the backend yet. So right now this is actually just trying to get all the sensor data in a single request. This is fine for small values (< default max proto message size, which I believe is like 4MB), but won't work for larger ones. I made a ticket to add this and linked it in the TODO.
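A hypothetical sketch of that follow-up batching, reusing the Limit/Last pagination shape from the binary path above (the tabular method and response field names are assumptions here, not confirmed by this thread):

last := ""
for {
    resp, err := client.TabularDataByFilter(ctx, &datapb.TabularDataByFilterRequest{
        DataRequest: &datapb.DataRequest{
            Filter: filter,
            Limit:  uint64(pageSize), // small enough to stay under the ~4MB default gRPC message cap
            Last:   last,
        },
        CountOnly: false,
    })
    if err != nil {
        return err
    }
    if len(resp.GetData()) == 0 {
        break // no more data
    }
    // ... write this page of rows out ...
    last = resp.GetLast()
}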
for count := 0; count < maxRetryCount; count++ {
    resp, err = c.sendBinaryDataByFilterRequest(filter, last)
    if err == nil {

if parallelDownloads == 0 {
Do we need to handle a negative value?
Great call out, I'll go ahead and change it to a uint flag to validate that from the start
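That change would look roughly like this (the flag constant name here is illustrative):

&cli.UintFlag{
    Name:  dataFlagParallelDownloads, // hypothetical renamed constant
    Value: 10,                        // default parallelism noted earlier in the thread
    Usage: "number of download requests to make in parallel",
},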
Adds a concurrentDownloads parameter that controls the number of binary download requests to make in parallel.
Tested locally with:

go run go.viam.com/rdk/cli/cmd data --org_ids=1c614556-2ff9-4234-9a94-d59b0a6d3378 --data_type=binary --mime_types=image/jpeg,image/png --start=2022-11-01T04:00:00.000Z --destination=/tmp/cli_speed_test/1 --concurrent=25

I suspect that we'd approach an N-times speedup as the amount of total data downloaded increases.

Something to note is that rpi images are very small (~16kb it seems). For files this small, having one goroutine per download is pretty inefficient. A future optimization would be to adjust the number of downloads per routine based on the size of each file (which we can grab from the metadata when grabbing the id). For the time being this seems sufficient, though, especially since this sub-optimal behavior is most relevant in "small" cases where optimal performance is least important (because it will still be pretty damn fast). If we discover that exporting very large numbers of very small files is a common use case, we can prioritize this.
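As a rough sketch of that future optimization (fileRef and batchBySize are invented for illustration): group IDs into batches of roughly equal cumulative size, taken from the metadata returned alongside each ID, and hand each batch to one goroutine instead of spawning a goroutine per tiny file.

type fileRef struct {
    id   string
    size int64 // from the file's metadata
}

// batchBySize greedily packs files into batches of at least targetBytes,
// so many ~16kb images share one download goroutine.
func batchBySize(files []fileRef, targetBytes int64) [][]fileRef {
    var batches [][]fileRef
    var cur []fileRef
    var curSize int64
    for _, f := range files {
        cur = append(cur, f)
        curSize += f.size
        if curSize >= targetBytes {
            batches = append(batches, cur)
            cur, curSize = nil, 0
        }
    }
    if len(cur) > 0 {
        batches = append(batches, cur)
    }
    return batches
}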