Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DATA-3338] fix-stability-of-vision-capture-all-from-camera-and-camera-get-images #4557

Merged
5 changes: 3 additions & 2 deletions components/camera/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func newGetImagesCollector(resource interface{}, params data.CollectorParams) (d
return res, err
}
binaries = append(binaries, data.Binary{
Payload: imgBytes,
MimeType: data.CameraFormatToMimeType(format),
Annotations: data.Annotations{Classifications: []data.Classification{{Label: img.SourceName}}},
dmhilly marked this conversation as resolved.
Show resolved Hide resolved
Payload: imgBytes,
MimeType: data.CameraFormatToMimeType(format),
})
}
ts := data.Timestamps{
Expand Down
7 changes: 5 additions & 2 deletions components/camera/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/benbjohnson/clock"
v1 "go.viam.com/api/app/data/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"
"go.viam.com/test"
"go.viam.com/utils/artifact"
Expand Down Expand Up @@ -123,13 +124,15 @@ func TestCollectors(t *testing.T) {
expected: []*datasyncpb.SensorData{
{
Metadata: &datasyncpb.SensorMetadata{
MimeType: datasyncpb.MimeType_MIME_TYPE_IMAGE_JPEG,
MimeType: datasyncpb.MimeType_MIME_TYPE_IMAGE_JPEG,
Annotations: &v1.Annotations{Classifications: []*v1.Classification{{Label: "left"}}},
},
Data: &datasyncpb.SensorData_Binary{Binary: viamLogoJpeg},
},
{
Metadata: &datasyncpb.SensorMetadata{
MimeType: datasyncpb.MimeType_MIME_TYPE_IMAGE_JPEG,
MimeType: datasyncpb.MimeType_MIME_TYPE_IMAGE_JPEG,
Annotations: &v1.Annotations{Classifications: []*v1.Classification{{Label: "right"}}},
},
Data: &datasyncpb.SensorData_Binary{Binary: viamLogoJpeg},
},
Expand Down
16 changes: 6 additions & 10 deletions data/capture_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// CaptureBufferedWriter is a buffered, persistent queue of SensorData.
type CaptureBufferedWriter interface {
WriteBinary(items []*v1.SensorData) error
WriteTabular(items []*v1.SensorData) error
WriteTabular(items *v1.SensorData) error
Flush() error
Path() string
}
Expand Down Expand Up @@ -76,14 +76,12 @@ func (b *CaptureBuffer) WriteBinary(items []*v1.SensorData) error {
// '.prog'.
// Files that have finished being written to are indicated by
// '.capture'.
func (b *CaptureBuffer) WriteTabular(items []*v1.SensorData) error {
func (b *CaptureBuffer) WriteTabular(item *v1.SensorData) error {
b.lock.Lock()
defer b.lock.Unlock()

for _, item := range items {
if IsBinary(item) {
return errInvalidTabularSensorData
}
if IsBinary(item) {
return errInvalidTabularSensorData
}

if b.nextFile == nil {
Expand All @@ -103,10 +101,8 @@ func (b *CaptureBuffer) WriteTabular(items []*v1.SensorData) error {
b.nextFile = nextFile
}

for _, item := range items {
if err := b.nextFile.WriteNext(item); err != nil {
return err
}
if err := b.nextFile.WriteNext(item); err != nil {
return err
}

return nil
Expand Down
24 changes: 12 additions & 12 deletions data/capture_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestCaptureQueue(t *testing.T) {
err := sut.WriteBinary([]*v1.SensorData{binarySensorData})
test.That(t, err, test.ShouldBeNil)
case tc.dataType == CaptureTypeTabular.ToProto():
err := sut.WriteTabular([]*v1.SensorData{structSensorData})
err := sut.WriteTabular(structSensorData)
test.That(t, err, test.ShouldBeNil)
default:
t.Error("unknown data type")
Expand Down Expand Up @@ -250,15 +250,15 @@ func TestCaptureBufferReader(t *testing.T) {
now := time.Now()
timeRequested := timestamppb.New(now.UTC())
timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC())
msg := []*v1.SensorData{{
msg := &v1.SensorData{
Metadata: &v1.SensorMetadata{
TimeRequested: timeRequested,
TimeReceived: timeReceived,
},
Data: &v1.SensorData_Struct{
Struct: tc.readings[0],
},
}}
}
test.That(t, b.WriteTabular(msg), test.ShouldBeNil)
test.That(t, b.Flush(), test.ShouldBeNil)
dirEntries, err := os.ReadDir(b.Path())
Expand All @@ -275,37 +275,37 @@ func TestCaptureBufferReader(t *testing.T) {

sd, err := cf.ReadNext()
test.That(t, err, test.ShouldBeNil)
test.That(t, sd, test.ShouldResemble, msg[0])
test.That(t, sd, test.ShouldResemble, msg)

_, err = cf.ReadNext()
test.That(t, err, test.ShouldBeError, io.EOF)

now = time.Now()
timeRequested = timestamppb.New(now.UTC())
timeReceived = timestamppb.New(now.Add(time.Millisecond).UTC())
msg2 := []*v1.SensorData{{
msg2 := &v1.SensorData{
Metadata: &v1.SensorMetadata{
TimeRequested: timeRequested,
TimeReceived: timeReceived,
},
Data: &v1.SensorData_Struct{
Struct: tc.readings[1],
},
}}
}
test.That(t, b.WriteTabular(msg2), test.ShouldBeNil)

now = time.Now()
timeRequested = timestamppb.New(now.UTC())
timeReceived = timestamppb.New(now.Add(time.Millisecond).UTC())
msg3 := []*v1.SensorData{{
msg3 := &v1.SensorData{
Metadata: &v1.SensorMetadata{
TimeRequested: timeRequested,
TimeReceived: timeReceived,
},
Data: &v1.SensorData_Struct{
Struct: tc.readings[2],
},
}}
}
test.That(t, b.WriteTabular(msg3), test.ShouldBeNil)

dirEntries2, err := os.ReadDir(b.Path())
Expand Down Expand Up @@ -343,11 +343,11 @@ func TestCaptureBufferReader(t *testing.T) {

sd2, err := cf2.ReadNext()
test.That(t, err, test.ShouldBeNil)
test.That(t, sd2, test.ShouldResemble, msg2[0])
test.That(t, sd2, test.ShouldResemble, msg2)

sd3, err := cf2.ReadNext()
test.That(t, err, test.ShouldBeNil)
test.That(t, sd3, test.ShouldResemble, msg3[0])
test.That(t, sd3, test.ShouldResemble, msg3)

_, err = cf2.ReadNext()
test.That(t, err, test.ShouldBeError, io.EOF)
Expand Down Expand Up @@ -391,15 +391,15 @@ func TestCaptureBufferReader(t *testing.T) {
now := time.Now()
timeRequested := timestamppb.New(now.UTC())
timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC())
msg := []*v1.SensorData{{
msg := &v1.SensorData{
Metadata: &v1.SensorMetadata{
TimeRequested: timeRequested,
TimeReceived: timeReceived,
},
Data: &v1.SensorData_Binary{
Binary: []byte("this is a fake image"),
},
}}
}
test.That(t, b.WriteTabular(msg), test.ShouldBeError, errInvalidTabularSensorData)
test.That(t, b.Flush(), test.ShouldBeNil)
dirEntries, err := os.ReadDir(b.Path())
Expand Down
10 changes: 7 additions & 3 deletions data/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.opencensus.io/trace"
"go.viam.com/utils"
"go.viam.com/utils/protoutils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -285,7 +283,13 @@ func (c *collector) writeCaptureResults() {

switch msg.Type {
case CaptureTypeTabular:
if err := c.target.WriteTabular(proto); err != nil {
if len(proto) != 1 {
// This is impossible and could only happen if a future code change breaks CaptureResult.ToProto()
err := errors.New("tabular CaptureResult returned more than one tabular result")
c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write tabular data to prog file %s", c.target.Path())).Error())
return
}
if err := c.target.WriteTabular(proto[0]); err != nil {
c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write tabular data to prog file %s", c.target.Path())).Error())
return
}
Expand Down
5 changes: 2 additions & 3 deletions data/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ func TestLogErrorsOnlyOnce(t *testing.T) {
c.Collect()
mockClock.Add(interval * 5)

// close(wrote)
test.That(t, logs.FilterLevelExact(zapcore.ErrorLevel).Len(), test.ShouldEqual, 1)
mockClock.Add(3 * time.Second)
test.That(t, logs.FilterLevelExact(zapcore.ErrorLevel).Len(), test.ShouldEqual, 2)
Expand Down Expand Up @@ -388,8 +387,8 @@ func (b *signalingBuffer) WriteBinary(items []*v1.SensorData) error {
return ret
}

func (b *signalingBuffer) WriteTabular(items []*v1.SensorData) error {
ret := b.bw.WriteTabular(items)
func (b *signalingBuffer) WriteTabular(item *v1.SensorData) error {
ret := b.bw.WriteTabular(item)
select {
case b.wrote <- struct{}{}:
case <-b.ctx.Done():
Expand Down
91 changes: 57 additions & 34 deletions data/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ func NewTabularCaptureResultReadings(ts Timestamps, readings map[string]interfac
TabularData: TabularData{
Payload: &structpb.Struct{
Fields: map[string]*structpb.Value{
// In previous versions of the code, we decided to special-case the
// GetReadingsResponse because it already contains
// structpb.Values in it, and the StructToStructPb logic at the time
// didnt't handle that cleanly.
// With the clarity of hindsight, this decision was a mistake as
// it would actually have been easy to convert the
// readings map[string]interface{} into a structpb.Struct (removing the
// need for a top level "readings" key for all future readings responses).
// We didn't know that at the time.
// Unfortunately this top level key needs to be maintained for backwards
// compatibility.
// C'est la vie.
nicksanford marked this conversation as resolved.
Show resolved Hide resolved
"readings": structpb.NewStructValue(&structpb.Struct{Fields: values}),
dmhilly marked this conversation as resolved.
Show resolved Hide resolved
},
},
Expand Down Expand Up @@ -87,43 +99,47 @@ func NewTabularCaptureResult(ts Timestamps, i interface{}) (CaptureResult, error
// ToProto converts a CaptureResult into a []*datasyncPB.SensorData{}.
func (cr *CaptureResult) ToProto() []*datasyncPB.SensorData {
ts := cr.Timestamps
if td := cr.TabularData.Payload; td != nil {
if cr.Type == CaptureTypeTabular {
return []*datasyncPB.SensorData{{
Metadata: &datasyncPB.SensorMetadata{
TimeRequested: timestamppb.New(ts.TimeRequested.UTC()),
TimeReceived: timestamppb.New(ts.TimeReceived.UTC()),
},
Data: &datasyncPB.SensorData_Struct{
Struct: td,
Struct: cr.TabularData.Payload,
},
}}
}

var sd []*datasyncPB.SensorData
for _, b := range cr.Binaries {
sd = append(sd, &datasyncPB.SensorData{
Metadata: &datasyncPB.SensorMetadata{
TimeRequested: timestamppb.New(ts.TimeRequested.UTC()),
TimeReceived: timestamppb.New(ts.TimeReceived.UTC()),
MimeType: b.MimeType.ToProto(),
Annotations: b.Annotations.ToProto(),
},
Data: &datasyncPB.SensorData_Binary{
Binary: b.Payload,
},
})
if cr.Type == CaptureTypeBinary {
var sd []*datasyncPB.SensorData
for _, b := range cr.Binaries {
sd = append(sd, &datasyncPB.SensorData{
Metadata: &datasyncPB.SensorMetadata{
TimeRequested: timestamppb.New(ts.TimeRequested.UTC()),
TimeReceived: timestamppb.New(ts.TimeReceived.UTC()),
MimeType: b.MimeType.ToProto(),
Annotations: b.Annotations.ToProto(),
},
Data: &datasyncPB.SensorData_Binary{
Binary: b.Payload,
},
})
}
return sd
}
return sd

// This should never happen
return nil
}

// Validate returns an error if the *CaptureResult is invalid.
func (cr *CaptureResult) Validate() error {
var ts Timestamps
if cr.Timestamps.TimeRequested == ts.TimeRequested {
if cr.Timestamps.TimeRequested.IsZero() {
return errors.New("Timestamps.TimeRequested must be set")
}

if cr.Timestamps.TimeReceived == ts.TimeReceived {
if cr.Timestamps.TimeReceived.IsZero() {
return errors.New("Timestamps.TimeRequested must be set")
}

Expand Down Expand Up @@ -206,7 +222,7 @@ type Timestamps struct {
type MimeType int

// This follows the mime types supported in
// https://github.com/viamrobotics/api/pull/571/files#diff-b77927298d8d5d5228beeea47bd0860d9b322b4f3ef45e129bc238ec17704826R75
// https://github.com/viamrobotics/api/blob/d7436a969cbc03bf7e84bb456b6dbfeb51f664d7/proto/viam/app/datasync/v1/data_sync.proto#L69
const (
// MimeTypeUnspecified means that the mime type was not specified.
MimeTypeUnspecified MimeType = iota
Expand Down Expand Up @@ -260,8 +276,10 @@ func CameraFormatToMimeType(f camerapb.Format) MimeType {
case camerapb.Format_FORMAT_PNG:
return MimeTypeImagePng
case camerapb.Format_FORMAT_RAW_RGBA:
// TODO: https://viam.atlassian.net/browse/DATA-3497
fallthrough
case camerapb.Format_FORMAT_RAW_DEPTH:
// TODO: https://viam.atlassian.net/browse/DATA-3497
fallthrough
default:
return MimeTypeUnspecified
Expand Down Expand Up @@ -357,39 +375,44 @@ func (mt Annotations) ToProto() *dataPB.Annotations {
}

const (
extDefault = ""
extDat = ".dat"
extPcd = ".pcd"
extJpeg = ".jpeg"
extPng = ".png"
// ExtDefault is the default file extension.
ExtDefault = ""
// ExtDat is the file extension for tabular data.
ExtDat = ".dat"
// ExtPcd is the file extension for pcd files.
ExtPcd = ".pcd"
// ExtJpeg is the file extension for jpeg files.
ExtJpeg = ".jpeg"
// ExtPng is the file extension for png files.
ExtPng = ".png"
)

// getFileExt gets the file extension for a capture file.
func getFileExt(dataType CaptureType, methodName string, parameters map[string]string) string {
switch dataType {
case CaptureTypeTabular:
return extDat
return ExtDat
case CaptureTypeBinary:
if methodName == nextPointCloud {
return extPcd
return ExtPcd
}
if methodName == readImage {
// TODO: Add explicit file extensions for all mime types.
switch parameters["mime_type"] {
case rutils.MimeTypeJPEG:
return extJpeg
return ExtJpeg
case rutils.MimeTypePNG:
return extPng
return ExtPng
case rutils.MimeTypePCD:
return extPcd
return ExtPcd
default:
return extDefault
return ExtDefault
}
}
case CaptureTypeUnspecified:
return extDefault
return ExtDefault
default:
return extDefault
return ExtDefault
}
return extDefault
return ExtDefault
}
Loading
Loading