diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index ad074b811629..786240650835 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -210,12 +210,18 @@ func (gt *grpcTransport) sendBatch( // If we queried a remote node, perform extra validation and // import trace spans. if reply != nil && !rpc.IsLocal(iface) { - for i := range reply.Responses { - if err := reply.Responses[i].GetInner().Verify(ba.Requests[i].GetInner()); err != nil { - log.Errorf(ctx, "%v", err) + if err == nil { + for i := range reply.Responses { + err = reply.Responses[i].GetInner().Verify(ba.Requests[i].GetInner()) + if err != nil { + log.Errorf(ctx, "verification of response for %s failed: %v", ba.Requests[i].GetInner(), err) + break + } } } - // Import the remotely collected spans, if any. + + // Import the remotely collected spans, if any. Do this on error too, + // to get traces in that case as well (or to at least have a chance). if len(reply.CollectedSpans) != 0 { span := tracing.SpanFromContext(ctx) if span == nil { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 6a254903ab59..7e29c58ffd34 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -141,10 +141,43 @@ func TestSpanImport(t *testing.T) { } } +func TestResponseVerifyFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + metrics := makeDistSenderMetrics() + gt := grpcTransport{ + opts: SendOptions{ + metrics: &metrics, + }, + } + + ba := &roachpb.BatchRequest{} + req := roachpb.NewScan(roachpb.KeyMin, roachpb.KeyMax, false /* forUpdate */) + ba.Add(req) + br := ba.CreateReply() + resp := br.Responses[0].GetInner().(*roachpb.ScanResponse) + val := roachpb.MakeValueFromString("hi") + val.InitChecksum(roachpb.Key("not the right key")) + resp.Rows = append(resp.Rows, roachpb.KeyValue{ + Key: roachpb.Key("x"), + Value: val, + }) + require.Error(t, resp.Verify(req)) // we set this up to fail + + server := mockInternalClient{ + br: br, + } + + _, err := gt.sendBatch(ctx, roachpb.NodeID(1), &server, ba) + require.ErrorContains(t, err, "invalid checksum") +} + // mockInternalClient is an implementation of roachpb.InternalClient. // It simulates aspects of how the Node normally handles tracing in gRPC calls. type mockInternalClient struct { tr *tracing.Tracer + br *roachpb.BatchResponse pErr *roachpb.Error } @@ -160,15 +193,24 @@ func (*mockInternalClient) ResetQuorum( func (m *mockInternalClient) Batch( ctx context.Context, in *roachpb.BatchRequest, opts ...grpc.CallOption, ) (*roachpb.BatchResponse, error) { - sp := m.tr.StartSpan("mock", tracing.WithRecording(tracingpb.RecordingVerbose)) - defer sp.Finish() - ctx = tracing.ContextWithSpan(ctx, sp) + var sp *tracing.Span + if m.tr != nil { + sp = m.tr.StartSpan("mock", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer sp.Finish() + ctx = tracing.ContextWithSpan(ctx, sp) + } log.Eventf(ctx, "mockInternalClient processing batch") - br := &roachpb.BatchResponse{} + br := m.br + if br == nil { + br = &roachpb.BatchResponse{} + } br.Error = m.pErr - if rec := sp.GetConfiguredRecording(); rec != nil { - br.CollectedSpans = append(br.CollectedSpans, rec...) + + if sp != nil { + if rec := sp.GetConfiguredRecording(); rec != nil { + br.CollectedSpans = append(br.CollectedSpans, rec...) + } } return br, nil }