Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: incorporate remote tracing spans from snapshots #86436

Merged
merged 1 commit into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ message SnapshotResponse {
Status status = 1;
string message = 2;
reserved 3;

// Traces from snapshot processing, returned on status APPLIED or ERROR.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false];
}

// DelegateSnapshotRequest is the request used to delegate send snapshot requests.
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/storage_services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ import "kv/kvserver/api.proto";

service MultiRaft {
rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {}
// RaftSnapshot asks the server to accept and apply a range snapshot.
// The client is expected to initially send a message consisting solely of
// a Header, upon which the server will respond with a message with status
// ACCEPTED, or ERROR if it cannot accept the snapshot. Once accepted, the
// client will send multiple messages with KVBatch data followed by a
// terminal message with the final flag set to true. Once finalized,
// the server will ultimately send a message back with status APPLIED, or
// ERROR, including any collected traces from processing.
rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {}
// DelegateRaftSnapshot asks the server to send a range snapshot to a target
// (so the client delegates the sending of the snapshot to the server). The
Expand Down
45 changes: 28 additions & 17 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -906,6 +907,8 @@ func (s *Store) checkSnapshotOverlapLocked(
func (s *Store) receiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream,
) error {
sp := tracing.SpanFromContext(ctx)

// Draining nodes will generally not be rebalanced to (see the filtering that
// happens in getStoreListFromIDsLocked()), but in case they are, they should
// reject the incoming rebalancing snapshots.
Expand Down Expand Up @@ -1028,29 +1031,43 @@ func (s *Store) receiveSnapshot(
s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc)
}
}
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data")
ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data")
defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors
inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived)
sp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors
if err != nil {
return err
}
inSnap.placeholder = placeholder

rec := sp.GetConfiguredRecording()

// Use a background context for applying the snapshot, as handleRaftReady is
// not prepared to deal with arbitrary context cancellation. Also, we've
// already received the entire snapshot here, so there's no point in
// abandoning application half-way through if the caller goes away.
applyCtx := s.AnnotateCtx(context.Background())
if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil {
return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot"))
return sendSnapshotErrorWithTrace(stream,
errors.Wrap(err.GoError(), "failed to apply snapshot"), rec,
)
}
return stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_APPLIED})
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
CollectedSpans: rec,
})
}

func sendSnapshotError(stream incomingSnapshotStream, err error) error {
return sendSnapshotErrorWithTrace(stream, err, nil /* trace */)
}

func sendSnapshotErrorWithTrace(
stream incomingSnapshotStream, err error, trace tracingpb.Recording,
) error {
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
CollectedSpans: trace,
})
}

Expand Down Expand Up @@ -1449,6 +1466,7 @@ func sendSnapshot(
}
switch resp.Status {
case kvserverpb.SnapshotResponse_ERROR:
sp.ImportRemoteRecording(resp.CollectedSpans)
storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID)
return errors.Errorf("%s: remote couldn't accept %s with error: %s",
to, snap, resp.Message)
Expand Down Expand Up @@ -1526,6 +1544,7 @@ func sendSnapshot(
if err != nil {
return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
}
sp.ImportRemoteRecording(resp.CollectedSpans)
// NB: wait for EOF which ensures that all processing on the server side has
// completed (such as defers that might be run after the previous message was
// received).
Expand Down Expand Up @@ -1601,17 +1620,9 @@ func delegateSnapshot(
unexpectedResp,
)
}
// Import the remotely collected spans, if any.
if len(resp.CollectedSpans) != 0 {
span := tracing.SpanFromContext(ctx)
if span == nil {
log.Warningf(
ctx,
"trying to ingest remote spans but there is no recording span set up",
)
} else {
span.ImportRemoteRecording(resp.CollectedSpans)
}
sp := tracing.SpanFromContext(ctx)
if sp != nil {
sp.ImportRemoteRecording(resp.CollectedSpans)
}
switch resp.SnapResponse.Status {
case kvserverpb.SnapshotResponse_ERROR:
Expand Down