From 504b8c802f1133702c8aee3b67461e6eebcc90cb Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Wed, 4 Dec 2024 21:55:17 -0500 Subject: [PATCH] wip --- components/camera/collectors.go | 5 ++-- data/capture_buffer.go | 16 +++++------ data/capture_buffer_test.go | 24 ++++++++--------- data/collector.go | 8 +++++- data/collector_test.go | 4 +-- data/collector_types.go | 12 +++++++++ data/collector_types_test.go | 48 ++++++++++++++------------------- testutils/file_utils.go | 12 ++++----- 8 files changed, 67 insertions(+), 62 deletions(-) diff --git a/components/camera/collectors.go b/components/camera/collectors.go index 74910928763..ea13239a8aa 100644 --- a/components/camera/collectors.go +++ b/components/camera/collectors.go @@ -157,8 +157,9 @@ func newGetImagesCollector(resource interface{}, params data.CollectorParams) (d return res, err } binaries = append(binaries, data.Binary{ - Payload: imgBytes, - MimeType: data.CameraFormatToMimeType(format), + Annotations: data.Annotations{Classifications: []data.Classification{{Label: img.SourceName}}}, + Payload: imgBytes, + MimeType: data.CameraFormatToMimeType(format), }) } ts := data.Timestamps{ diff --git a/data/capture_buffer.go b/data/capture_buffer.go index c7dc1cfb689..e4419e4f594 100644 --- a/data/capture_buffer.go +++ b/data/capture_buffer.go @@ -10,7 +10,7 @@ import ( // CaptureBufferedWriter is a buffered, persistent queue of SensorData. type CaptureBufferedWriter interface { WriteBinary(items []*v1.SensorData) error - WriteTabular(items []*v1.SensorData) error + WriteTabular(items *v1.SensorData) error Flush() error Path() string } @@ -76,14 +76,12 @@ func (b *CaptureBuffer) WriteBinary(items []*v1.SensorData) error { // '.prog'. // Files that have finished being written to are indicated by // '.capture'. -func (b *CaptureBuffer) WriteTabular(items []*v1.SensorData) error { +func (b *CaptureBuffer) WriteTabular(item *v1.SensorData) error { b.lock.Lock() defer b.lock.Unlock() - for _, item := range items { - if IsBinary(item) { - return errInvalidTabularSensorData - } + if IsBinary(item) { + return errInvalidTabularSensorData } if b.nextFile == nil { @@ -103,10 +101,8 @@ func (b *CaptureBuffer) WriteTabular(items []*v1.SensorData) error { b.nextFile = nextFile } - for _, item := range items { - if err := b.nextFile.WriteNext(item); err != nil { - return err - } + if err := b.nextFile.WriteNext(item); err != nil { + return err } return nil diff --git a/data/capture_buffer_test.go b/data/capture_buffer_test.go index 7012eb4b91c..31f53dcb859 100644 --- a/data/capture_buffer_test.go +++ b/data/capture_buffer_test.go @@ -91,7 +91,7 @@ func TestCaptureQueue(t *testing.T) { err := sut.WriteBinary([]*v1.SensorData{binarySensorData}) test.That(t, err, test.ShouldBeNil) case tc.dataType == CaptureTypeTabular.ToProto(): - err := sut.WriteTabular([]*v1.SensorData{structSensorData}) + err := sut.WriteTabular(structSensorData) test.That(t, err, test.ShouldBeNil) default: t.Error("unknown data type") @@ -250,7 +250,7 @@ func TestCaptureBufferReader(t *testing.T) { now := time.Now() timeRequested := timestamppb.New(now.UTC()) timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) - msg := []*v1.SensorData{{ + msg := &v1.SensorData{ Metadata: &v1.SensorMetadata{ TimeRequested: timeRequested, TimeReceived: timeReceived, @@ -258,7 +258,7 @@ func TestCaptureBufferReader(t *testing.T) { Data: &v1.SensorData_Struct{ Struct: tc.readings[0], }, - }} + } test.That(t, b.WriteTabular(msg), test.ShouldBeNil) test.That(t, b.Flush(), test.ShouldBeNil) dirEntries, err := os.ReadDir(b.Path()) @@ -275,7 +275,7 @@ func TestCaptureBufferReader(t *testing.T) { sd, err := cf.ReadNext() test.That(t, err, test.ShouldBeNil) - test.That(t, sd, test.ShouldResemble, msg[0]) + test.That(t, sd, test.ShouldResemble, msg) _, err = cf.ReadNext() test.That(t, err, test.ShouldBeError, io.EOF) @@ -283,7 +283,7 @@ func TestCaptureBufferReader(t *testing.T) { now = time.Now() timeRequested = timestamppb.New(now.UTC()) timeReceived = timestamppb.New(now.Add(time.Millisecond).UTC()) - msg2 := []*v1.SensorData{{ + msg2 := &v1.SensorData{ Metadata: &v1.SensorMetadata{ TimeRequested: timeRequested, TimeReceived: timeReceived, @@ -291,13 +291,13 @@ func TestCaptureBufferReader(t *testing.T) { Data: &v1.SensorData_Struct{ Struct: tc.readings[1], }, - }} + } test.That(t, b.WriteTabular(msg2), test.ShouldBeNil) now = time.Now() timeRequested = timestamppb.New(now.UTC()) timeReceived = timestamppb.New(now.Add(time.Millisecond).UTC()) - msg3 := []*v1.SensorData{{ + msg3 := &v1.SensorData{ Metadata: &v1.SensorMetadata{ TimeRequested: timeRequested, TimeReceived: timeReceived, @@ -305,7 +305,7 @@ func TestCaptureBufferReader(t *testing.T) { Data: &v1.SensorData_Struct{ Struct: tc.readings[2], }, - }} + } test.That(t, b.WriteTabular(msg3), test.ShouldBeNil) dirEntries2, err := os.ReadDir(b.Path()) @@ -343,11 +343,11 @@ func TestCaptureBufferReader(t *testing.T) { sd2, err := cf2.ReadNext() test.That(t, err, test.ShouldBeNil) - test.That(t, sd2, test.ShouldResemble, msg2[0]) + test.That(t, sd2, test.ShouldResemble, msg2) sd3, err := cf2.ReadNext() test.That(t, err, test.ShouldBeNil) - test.That(t, sd3, test.ShouldResemble, msg3[0]) + test.That(t, sd3, test.ShouldResemble, msg3) _, err = cf2.ReadNext() test.That(t, err, test.ShouldBeError, io.EOF) @@ -391,7 +391,7 @@ func TestCaptureBufferReader(t *testing.T) { now := time.Now() timeRequested := timestamppb.New(now.UTC()) timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) - msg := []*v1.SensorData{{ + msg := &v1.SensorData{ Metadata: &v1.SensorMetadata{ TimeRequested: timeRequested, TimeReceived: timeReceived, @@ -399,7 +399,7 @@ func TestCaptureBufferReader(t *testing.T) { Data: &v1.SensorData_Binary{ Binary: []byte("this is a fake image"), }, - }} + } test.That(t, b.WriteTabular(msg), test.ShouldBeError, errInvalidTabularSensorData) test.That(t, b.Flush(), test.ShouldBeNil) dirEntries, err := os.ReadDir(b.Path()) diff --git a/data/collector.go b/data/collector.go index b90db70c2cf..039afa9b444 100644 --- a/data/collector.go +++ b/data/collector.go @@ -285,7 +285,13 @@ func (c *collector) writeCaptureResults() { switch msg.Type { case CaptureTypeTabular: - if err := c.target.WriteTabular(proto); err != nil { + if len(proto) != 1 { + // This is impossible and could only happen if a future code change breaks CaptureResult.ToProto() + err := errors.New("tabular CaptureResult returned more than one tabular result") + c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write tabular data to prog file %s", c.target.Path())).Error()) + return + } + if err := c.target.WriteTabular(proto[0]); err != nil { c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write tabular data to prog file %s", c.target.Path())).Error()) return } diff --git a/data/collector_test.go b/data/collector_test.go index 73aae1e8085..011d49b9808 100644 --- a/data/collector_test.go +++ b/data/collector_test.go @@ -387,8 +387,8 @@ func (b *signalingBuffer) WriteBinary(items []*v1.SensorData) error { return ret } -func (b *signalingBuffer) WriteTabular(items []*v1.SensorData) error { - ret := b.bw.WriteTabular(items) +func (b *signalingBuffer) WriteTabular(item *v1.SensorData) error { + ret := b.bw.WriteTabular(item) select { case b.wrote <- struct{}{}: case <-b.ctx.Done(): diff --git a/data/collector_types.go b/data/collector_types.go index 9f2e34c39da..253beafacc5 100644 --- a/data/collector_types.go +++ b/data/collector_types.go @@ -58,6 +58,18 @@ func NewTabularCaptureResultReadings(ts Timestamps, readings map[string]interfac TabularData: TabularData{ Payload: &structpb.Struct{ Fields: map[string]*structpb.Value{ + // In previous versions of the code, we decided to special-case the + // GetReadingsResponse because it already contains + // structpb.Values in it, and the StructToStructPb logic at the time + // didnt't handle that cleanly. + // With the clarity of hindsight, this decision was a mistake as + // it would actually have been easy to convert the + // readings map[string]interface{} into a structpb.Struct (removing the + // need for a top level "readings" key for all future readings responses). + // We didn't know that at the time. + // Unfortunately this top level key needs to be maintained for backwards + // compatibility. + // C'est la vie. "readings": structpb.NewStructValue(&structpb.Struct{Fields: values}), }, }, diff --git a/data/collector_types_test.go b/data/collector_types_test.go index 322a3d160ca..16c07ae8bab 100644 --- a/data/collector_types_test.go +++ b/data/collector_types_test.go @@ -23,7 +23,7 @@ func TestNewBinaryCaptureResult(t *testing.T) { timeReceived := time.Now() ts := Timestamps{TimeRequested: timeRequested, TimeReceived: timeReceived} type testCase struct { - input CaptureResult + input []Binary output CaptureResult validateErr error } @@ -64,15 +64,6 @@ func TestNewBinaryCaptureResult(t *testing.T) { }, }, }, - { - Payload: []byte("hi too am here here"), - MimeType: MimeTypeImageJpeg, - Annotations: Annotations{ - Classifications: []Classification{ - {Label: "something completely different"}, - }, - }, - }, } multipleComplexBinaries := []Binary{ @@ -115,12 +106,12 @@ func TestNewBinaryCaptureResult(t *testing.T) { } tcs := []testCase{ { - input: NewBinaryCaptureResult(ts, nil), + input: nil, output: CaptureResult{Type: CaptureTypeBinary, Timestamps: ts}, validateErr: errors.New("binary result must have non empty binary data"), }, { - input: NewBinaryCaptureResult(ts, emptyBinaries), + input: emptyBinaries, output: CaptureResult{ Type: CaptureTypeBinary, Timestamps: ts, @@ -129,7 +120,7 @@ func TestNewBinaryCaptureResult(t *testing.T) { validateErr: errors.New("binary result must have non empty binary data"), }, { - input: NewBinaryCaptureResult(ts, singleSimpleBinaries), + input: singleSimpleBinaries, output: CaptureResult{ Type: CaptureTypeBinary, Timestamps: ts, @@ -137,7 +128,7 @@ func TestNewBinaryCaptureResult(t *testing.T) { }, }, { - input: NewBinaryCaptureResult(ts, singleSimpleBinariesWithMimeType), + input: singleSimpleBinariesWithMimeType, output: CaptureResult{ Type: CaptureTypeBinary, Timestamps: ts, @@ -145,7 +136,7 @@ func TestNewBinaryCaptureResult(t *testing.T) { }, }, { - input: NewBinaryCaptureResult(ts, singleComplexBinaries), + input: singleComplexBinaries, output: CaptureResult{ Type: CaptureTypeBinary, Timestamps: ts, @@ -153,7 +144,7 @@ func TestNewBinaryCaptureResult(t *testing.T) { }, }, { - input: NewBinaryCaptureResult(ts, multipleComplexBinaries), + input: multipleComplexBinaries, output: CaptureResult{ Type: CaptureTypeBinary, Timestamps: ts, @@ -164,29 +155,30 @@ func TestNewBinaryCaptureResult(t *testing.T) { for i, tc := range tcs { t.Logf("index: %d", i) - // confirm input resembles output - test.That(t, tc.input, test.ShouldResemble, tc.output) + // confirm response resembles output + res := NewBinaryCaptureResult(ts, tc.input) + test.That(t, res, test.ShouldResemble, tc.output) - // confirm input conforms to validation expectations + // confirm response conforms to validation expectations if tc.validateErr != nil { - test.That(t, tc.input.Validate(), test.ShouldBeError, tc.validateErr) + test.That(t, res.Validate(), test.ShouldBeError, tc.validateErr) continue } - test.That(t, tc.input.Validate(), test.ShouldBeNil) + test.That(t, res.Validate(), test.ShouldBeNil) - // confirm input conforms to ToProto expectations - proto := tc.input.ToProto() - test.That(t, len(proto), test.ShouldEqual, len(tc.input.Binaries)) - for j := range tc.input.Binaries { + // confirm response conforms to ToProto expectations + proto := res.ToProto() + test.That(t, len(proto), test.ShouldEqual, len(res.Binaries)) + for j := range res.Binaries { test.That(t, proto[j].Metadata, test.ShouldResemble, &datasyncPB.SensorMetadata{ TimeRequested: timestamppb.New(timeRequested.UTC()), TimeReceived: timestamppb.New(timeReceived.UTC()), - MimeType: tc.input.Binaries[j].MimeType.ToProto(), - Annotations: tc.input.Binaries[j].Annotations.ToProto(), + MimeType: res.Binaries[j].MimeType.ToProto(), + Annotations: res.Binaries[j].Annotations.ToProto(), }) test.That(t, proto[j].Data, test.ShouldResemble, &datasyncPB.SensorData_Binary{ - Binary: tc.input.Binaries[j].Payload, + Binary: res.Binaries[j].Payload, }) } } diff --git a/testutils/file_utils.go b/testutils/file_utils.go index a160fa45141..452d035afa6 100644 --- a/testutils/file_utils.go +++ b/testutils/file_utils.go @@ -205,18 +205,16 @@ func (m *MockBuffer) WriteBinary(items []*v1.SensorData) error { } // WriteTabular writes tabular sensor data to the Writes channel. -func (m *MockBuffer) WriteTabular(items []*v1.SensorData) error { +func (m *MockBuffer) WriteTabular(item *v1.SensorData) error { if err := m.ctx.Err(); err != nil { return err } - for i, item := range items { - if isBinary(item) { - m.t.Errorf("MockBuffer.WriteTabular called with binary data. index: %d, items: %#v\n", i, items) - m.t.FailNow() - } + if isBinary(item) { + m.t.Errorf("MockBuffer.WriteTabular called with binary data. item: %#v\n", item) + m.t.FailNow() } select { - case m.Writes <- items: + case m.Writes <- []*v1.SensorData{item}: case <-m.ctx.Done(): } return nil