-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DATA-refactor] Make data capture files a well defined data structure. #1490
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really fantastic work on a massive refactor that moves all of the file and protobuf logic down a layer of abstraction away from the Data Manager Service and Collectors, adds consistency to all helper methods to be on os.File*, and will make it significantly easier to plug in other file types. Thank you for your thorough work on this. Looks great to me, just some minor comments and will go back through one more time to make sure there's nothing I missed on my first pass!
@@ -563,6 +563,8 @@ func (svc *builtIn) uploadData(cancelCtx context.Context, intervalMins float64) | |||
if err != nil { | |||
svc.logger.Errorw("data capture files failed to sync", "error", err) | |||
} | |||
// TODO: deadlock where we're in this case when Close is called, which acquires svc.lock, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch of a tricky edge case. Could you add a ticket for this and include in the TODO comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, created https://viam.atlassian.net/browse/DATA-660 to generally clean up how we're handling mutual exclusion in the whole service
func CreateDataCaptureFile(captureDir string, md *v1.DataCaptureMetadata) (*os.File, error) { | ||
// File is the data structure containing data captured by collectors. It is backed by a file on disk containing | ||
// length delimited protobuf messages, where the first message is the CaptureMetadata for the file, and ensuing | ||
// messages contain the captured metadata. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*contain the captured data.
if err := f.file.Close(); err != nil { | ||
return err | ||
} | ||
return os.Remove(f.file.Name()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can reuse f.path
instead of f.file.Name()
@@ -56,7 +174,7 @@ func BuildCaptureMetadata(compType resource.SubtypeName, compName, compModel, me | |||
return nil, err | |||
} | |||
|
|||
dataType := getDataType(method) | |||
dataType := getDataType(string(compType), method) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can remove this change + the additional param to getDataType
since unused
|
||
// Create []v1.UploadRequest object from test case input 'expData []*structpb.Struct'. | ||
expectedMsgs := buildSensorDataUploadRequests(tc.toSend, v1.DataType_DATA_TYPE_BINARY_SENSOR, filepath.Base(tf.Name())) | ||
// Create []v1.UploadRequest object from test case input 'expData []*structpb.Struct'. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: from test case input 'toSend []*v1.SensorData'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went ahead and just removed the comment, I don't think it's necessary
f, err := datacapture.NewFile(tmpDir, &captureMetadata) | ||
test.That(t, err, test.ShouldBeNil) | ||
|
||
for i := range tc.toSend { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supersupernit here and above: checking on preferred readability/succinctness on this vs.
for _, msg := range tc.toSend {
err := f.WriteNext(msg)
test.That(t, err, test.ShouldBeNil)
}
actMsgs = mockService.getUploadRequests() | ||
compareTabularUploadRequests(t, actMsgs, expMsgs) | ||
|
||
// Validate progress file exists and has correct value. | ||
progressFile := filepath.Join(viamProgressDotDir, filepath.Base(captureFile.Name())) | ||
// TODO: directly testing this is a big abstraction leak |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also make a small ticket that's linked here for this, along with a brief (1-2 sentence) summary of how you'd rather test this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went ahead and just did it. Removed this check, replaced the check at the bottom with just verifying that the progress dir is empty. The behavior we care about is just that we're properly resuming uploads (== not resending acked data), so now this only tests that
sut, err := NewManager(logger, partID, client, conn) | ||
test.That(t, err, test.ShouldBeNil) | ||
sut.Sync([]string{tf.Name()}) | ||
// Upload the contents from the created file. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR is already a massive cleanup, but checking if there's an easy enough extraction of this server/client/manager setup since it's repeated throughout this test file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It definitely can be extracted, but not that easily because in each of these cases we set different mockService parameters so would need to pass those in. I'd rather not include it in this PR.
return err | ||
} | ||
} | ||
|
||
// Check remaining data capture file contents so we know whether to continue upload process. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking on the removal here. Redundant since ReadNext's ReadDelimited will return an io.EOF error at the end?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly
return f.size | ||
} | ||
|
||
// GetPath returns the path of the underlying os.File. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supernit: Since we often call filepath.Base(f.GetPath()), is it worth adding a helper f.GetPathBase() method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On second look, ok to leave this interface closer to os.File
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really great work, LGTM! Thanks for updating some of the tests directly within the PR.
|
The goal of this PR is to create an explicitly defined data structure for dealing with data captured by collectors/the data manager service.
We store collected data in "data capture files" which are files containing:
Currently, this is all handled implicitly. We deal with
os
andpbutil
primitives for reading/writing/generally interacting with these files. This has gotten increasingly cumbersome as more functionality has been added.This PR defines a struct datacapture.File with all of the higher level operations that the datamanager service needs (e.g. Read/Write next, New, ReadMetadata, etc). I think this makes it significantly easier to reason about these things while working on the data manager, because it pushes the complexity down a layer of abstraction and allows maintainers of datamanager to not need to know the implementation details of our storage format. It also replaces free floating functions in the datacapture package with methods on File, which I think makes discovery significantly easier.
I know there's a lot of code change here, and I'm happy to talk/slack through any of them. The changes fall into two broad categories:
Tested locally with
go test -race -count 50 ./...
in the datamanager directory. This runs all tests in all subdirs 50 times, failing if any fail. It ran successfully, even with t.Skip() removed from TestPartialUpload and TestExponentialRetry. I think this may have fixed these race conditions, but I'm leaving t.Skip() in for now because I'd rather spend some time more thoroughly testing that those flakiness issues are gone.EDIT:
The above is an essay. Here's a super tldr summary.
Related to data capture files, got rid of these functions, as well as all the *os.File/filepath/pbutil/etc operations surrounding usages of these functions:
ReadDataCaptureMetadata(f *os.File) (*v1.DataCaptureMetadata, error
ReadNextSensorData(f *os.File) (*v1.SensorData, error)
writeSensorData(f *os.File, sds []*v1.SensorData) error
createTmpDataCaptureFile() (file *os.File, err error)
Added a type File with the methods:
func ReadFile(f *os.File) (*File, error)
func NewFile(captureDir string, md *v1.DataCaptureMetadata) (*File, error)
func (f *File) ReadMetadata() (*v1.DataCaptureMetadata, error)
func (f *File) ReadNext() (*v1.SensorData, error
func (f *File) WriteNext(data *v1.SensorData) error
func (f *File) Sync() error
func (f *File) Size() int64
func (f *File) GetPath() string
func (f *File) Close() error
func (f *File) Delete() error