Skip to content

Commit

Permalink
kvcoord: return error on failed scan response checksum
Browse files Browse the repository at this point in the history
Found in cockroachdb#89477.

Epic: none
Release note: None
  • Loading branch information
tbg committed Dec 7, 2022
1 parent 1b4947b commit c7537a3
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 10 deletions.
14 changes: 10 additions & 4 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,18 @@ func (gt *grpcTransport) sendBatch(
// If we queried a remote node, perform extra validation and
// import trace spans.
if reply != nil && !rpc.IsLocal(iface) {
for i := range reply.Responses {
if err := reply.Responses[i].GetInner().Verify(ba.Requests[i].GetInner()); err != nil {
log.Errorf(ctx, "%v", err)
if err == nil {
for i := range reply.Responses {
err = reply.Responses[i].GetInner().Verify(ba.Requests[i].GetInner())
if err != nil {
log.Errorf(ctx, "verification of response for %s failed: %v", ba.Requests[i].GetInner(), err)
break
}
}
}
// Import the remotely collected spans, if any.

// Import the remotely collected spans, if any. Do this on error too,
// to get traces in that case as well (or to at least have a chance).
if len(reply.CollectedSpans) != 0 {
span := tracing.SpanFromContext(ctx)
if span == nil {
Expand Down
54 changes: 48 additions & 6 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,43 @@ func TestSpanImport(t *testing.T) {
}
}

func TestResponseVerifyFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
metrics := makeDistSenderMetrics()
gt := grpcTransport{
opts: SendOptions{
metrics: &metrics,
},
}

ba := &roachpb.BatchRequest{}
req := roachpb.NewScan(roachpb.KeyMin, roachpb.KeyMax, false /* forUpdate */)
ba.Add(req)
br := ba.CreateReply()
resp := br.Responses[0].GetInner().(*roachpb.ScanResponse)
val := roachpb.MakeValueFromString("hi")
val.InitChecksum(roachpb.Key("not the right key"))
resp.Rows = append(resp.Rows, roachpb.KeyValue{
Key: roachpb.Key("x"),
Value: val,
})
require.Error(t, resp.Verify(req)) // we set this up to fail

server := mockInternalClient{
br: br,
}

_, err := gt.sendBatch(ctx, roachpb.NodeID(1), &server, ba)
require.ErrorContains(t, err, "invalid checksum")
}

// mockInternalClient is an implementation of roachpb.InternalClient.
// It simulates aspects of how the Node normally handles tracing in gRPC calls.
type mockInternalClient struct {
tr *tracing.Tracer
br *roachpb.BatchResponse
pErr *roachpb.Error
}

Expand All @@ -160,15 +193,24 @@ func (*mockInternalClient) ResetQuorum(
func (m *mockInternalClient) Batch(
ctx context.Context, in *roachpb.BatchRequest, opts ...grpc.CallOption,
) (*roachpb.BatchResponse, error) {
sp := m.tr.StartSpan("mock", tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()
ctx = tracing.ContextWithSpan(ctx, sp)
var sp *tracing.Span
if m.tr != nil {
sp = m.tr.StartSpan("mock", tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()
ctx = tracing.ContextWithSpan(ctx, sp)
}

log.Eventf(ctx, "mockInternalClient processing batch")
br := &roachpb.BatchResponse{}
br := m.br
if br == nil {
br = &roachpb.BatchResponse{}
}
br.Error = m.pErr
if rec := sp.GetConfiguredRecording(); rec != nil {
br.CollectedSpans = append(br.CollectedSpans, rec...)

if sp != nil {
if rec := sp.GetConfiguredRecording(); rec != nil {
br.CollectedSpans = append(br.CollectedSpans, rec...)
}
}
return br, nil
}
Expand Down

0 comments on commit c7537a3

Please sign in to comment.