Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DATA-3338] fix-stability-of-vision-capture-all-from-camera-and-camera-get-images #4557

Merged
27 changes: 17 additions & 10 deletions components/arm/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package arm
import (
"context"
"errors"
"time"

v1 "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/arm/v1"
Expand Down Expand Up @@ -39,18 +40,21 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.EndPosition(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
return res, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
}
o := v.Orientation().OrientationVectorDegrees()
return pb.GetEndPositionResponse{
ts := data.Timestamps{TimeRequested: timeRequested, TimeReceived: time.Now()}
return data.NewTabularCaptureResult(ts, pb.GetEndPositionResponse{
Pose: &v1.Pose{
X: v.Point().X,
Y: v.Point().Y,
Expand All @@ -60,7 +64,7 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
OZ: o.OZ,
Theta: o.Theta,
},
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -73,21 +77,24 @@ func newJointPositionsCollector(resource interface{}, params data.CollectorParam
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.JointPositions(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
jp, err := referenceframe.JointPositionsFromInputs(arm.ModelFrame(), v)
if err != nil {
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
return pb.GetJointPositionsResponse{Positions: jp}, nil
ts := data.Timestamps{TimeRequested: timeRequested, TimeReceived: time.Now()}
return data.NewTabularCaptureResult(ts, pb.GetJointPositionsResponse{Positions: jp})
})
return data.NewCollector(cFunc, params)
}
Expand Down
13 changes: 7 additions & 6 deletions components/arm/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func TestCollectors(t *testing.T) {
tests := []struct {
name string
collector data.CollectorConstructor
expected *datasyncpb.SensorData
expected []*datasyncpb.SensorData
}{
{
name: "End position collector should write a pose",
collector: arm.NewEndPositionCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"pose": map[string]any{
Expand All @@ -49,27 +49,28 @@ func TestCollectors(t *testing.T) {
"z": 3,
},
})},
},
}},
},
{
name: "Joint positions collector should write a list of positions",
collector: arm.NewJointPositionsCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"positions": map[string]any{
"values": []any{1.0, 2.0, 3.0},
},
})},
},
}},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
start := time.Now()
buf := tu.NewMockBuffer()
buf := tu.NewMockBuffer(t)
params := data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand Down
32 changes: 20 additions & 12 deletions components/board/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package board

import (
"context"
"time"

"github.com/pkg/errors"
pb "go.viam.com/api/component/board/v1"
Expand Down Expand Up @@ -39,10 +40,12 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var analogValue AnalogValue
if _, ok := arg[analogReaderNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(),
return res, data.FailedToReadErr(params.ComponentName, analogs.String(),
errors.New("Must supply reader_name in additional_params for analog collector"))
}
if reader, err := board.AnalogByName(arg[analogReaderNameKey].String()); err == nil {
Expand All @@ -51,17 +54,19 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
return res, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
}
}
return pb.ReadAnalogReaderResponse{

ts := data.Timestamps{TimeRequested: timeRequested, TimeReceived: time.Now()}
return data.NewTabularCaptureResult(ts, pb.ReadAnalogReaderResponse{
Value: int32(analogValue.Value),
MinRange: analogValue.Min,
MaxRange: analogValue.Max,
StepSize: analogValue.StepSize,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -74,10 +79,12 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var value bool
if _, ok := arg[gpioPinNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(),
return res, data.FailedToReadErr(params.ComponentName, gpios.String(),
errors.New("Must supply pin_name in additional params for gpio collector"))
}
if gpio, err := board.GPIOPinByName(arg[gpioPinNameKey].String()); err == nil {
Expand All @@ -86,14 +93,15 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
return res, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
}
}
return pb.GetGPIOResponse{
ts := data.Timestamps{TimeRequested: timeRequested, TimeReceived: time.Now()}
return data.NewTabularCaptureResult(ts, pb.GetGPIOResponse{
High: value,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand Down
14 changes: 8 additions & 6 deletions components/board/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ func TestCollectors(t *testing.T) {
name string
params data.CollectorParams
collector data.CollectorConstructor
expected *datasyncpb.SensorData
expected []*datasyncpb.SensorData
}{
{
name: "Board analog collector should write an analog response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -43,19 +44,20 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewAnalogCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"value": 1,
"min_range": 0,
"max_range": 10,
"step_size": float64(float32(0.1)),
})},
},
}},
},
{
name: "Board gpio collector should write a gpio response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -64,19 +66,19 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewGPIOCollector,
expected: &datasyncpb.SensorData{
expected: []*datasyncpb.SensorData{{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: tu.ToStructPBStruct(t, map[string]any{
"high": true,
})},
},
}},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
start := time.Now()
buf := tu.NewMockBuffer()
buf := tu.NewMockBuffer(t)
tc.params.Clock = clock.New()
tc.params.Target = buf

Expand Down
Loading
Loading