Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ go_library(
"@com_github_prometheus_client_model//go",
"@com_github_raduberinde_btreemap//:btreemap",
"@io_opentelemetry_go_otel//attribute",
"@io_storj_drpc//:drpc",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_time//rate",
],
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2482,6 +2482,7 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
nodedialer.New(tc.Servers[0].RPCContext(),
gossip.AddressResolver(tc.Servers[0].GossipI().(*gossip.Gossip))),
nil, /* grpcServer */
nil, /* drpcServer */
(*node_rac2.AdmittedPiggybacker)(nil),
nil, /* PiggybackedAdmittedResponseScheduler */
nil, /* knobs */
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3436,6 +3436,7 @@ func TestReplicaGCRace(t *testing.T) {
tc.Servers[0].Clock(),
nodedialer.New(tc.Servers[0].RPCContext(), gossip.AddressResolver(fromStore.Gossip())),
nil, /* grpcServer */
nil, /* drpcServer */
(*node_rac2.AdmittedPiggybacker)(nil),
nil, /* PiggybackedAdmittedResponseScheduler */
nil, /* knobs */
Expand Down Expand Up @@ -3828,6 +3829,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
nodedialer.New(tc.Servers[0].RPCContext(),
gossip.AddressResolver(tc.GetFirstStoreFromServer(t, 0).Gossip())),
nil, /* grpcServer */
nil, /* drpcServer */
(*node_rac2.AdmittedPiggybacker)(nil),
nil, /* PiggybackedAdmittedResponseScheduler */
nil, /* knobs */
Expand Down
59 changes: 54 additions & 5 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
drpc "storj.io/drpc"
)

const (
Expand Down Expand Up @@ -78,7 +79,7 @@ type RaftMessageResponseStream interface {
// Send. Note that the default implementation of grpc.Stream for server
// responses (grpc.serverStream) is not safe for concurrent calls to Send.
type lockedRaftMessageResponseStream struct {
wrapped MultiRaft_RaftMessageBatchServer
wrapped RPCMultiRaft_RaftMessageBatchStream
sendMu syncutil.Mutex
}

Expand Down Expand Up @@ -235,9 +236,7 @@ func NewDummyRaftTransport(
resolver := func(roachpb.NodeID) (net.Addr, roachpb.Locality, error) {
return nil, roachpb.Locality{}, errors.New("dummy resolver")
}
return NewRaftTransport(ambient, st, nil, clock, nodedialer.New(nil, resolver), nil,
nil, nil, nil,
)
return NewRaftTransport(ambient, st, nil, clock, nodedialer.New(nil, resolver), nil, nil, nil, nil, nil)
}

// NewRaftTransport creates a new RaftTransport.
Expand All @@ -248,6 +247,7 @@ func NewRaftTransport(
clock *hlc.Clock,
dialer *nodedialer.Dialer,
grpcServer *grpc.Server,
drpcMux drpc.Mux,
piggybackReader node_rac2.PiggybackMsgReader,
piggybackedResponseScheduler PiggybackedAdmittedResponseScheduler,
knobs *RaftTransportTestingKnobs,
Expand All @@ -271,6 +271,9 @@ func NewRaftTransport(
if grpcServer != nil {
RegisterMultiRaftServer(grpcServer, t)
}
if drpcMux != nil {
_ = DRPCRegisterMultiRaft(drpcMux, t.AsDRPCServer())
}
return t
}

Expand Down Expand Up @@ -375,8 +378,29 @@ func newRaftMessageResponse(
return resp
}

type drpcRaftTransport RaftTransport

// AsDRPCServer returns the DRPC server implementation for the Raft service.
func (t *RaftTransport) AsDRPCServer() DRPCMultiRaftServer {
return (*drpcRaftTransport)(t)
}

// RaftMessageBatch proxies the incoming requests to the listening server interface.
func (t *drpcRaftTransport) RaftMessageBatch(
stream DRPCMultiRaft_RaftMessageBatchStream,
) (lastErr error) {
return (*RaftTransport)(t).raftMessageBatch(stream)
}

// RaftMessageBatch proxies the incoming requests to the listening server interface.
func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer) (lastErr error) {
return t.raftMessageBatch(stream)
}

// raftMessageBatch is the shared implementation for RaftMessageBatch for both gRPC and DRPC.
func (t *RaftTransport) raftMessageBatch(
stream RPCMultiRaft_RaftMessageBatchStream,
) (lastErr error) {
errCh := make(chan error, 1)

// Node stopping error is caught below in the select.
Expand Down Expand Up @@ -444,10 +468,25 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
}
}

// DelegateRaftSnapshot handles incoming delegated snapshot requests and passes
// the request to pass off to the sender store. Errors during the snapshots
// process are sent back as a response.
func (t *drpcRaftTransport) DelegateRaftSnapshot(
stream DRPCMultiRaft_DelegateRaftSnapshotStream,
) error {
return (*RaftTransport)(t).delegateRaftSnapshot(stream)
}

// DelegateRaftSnapshot handles incoming delegated snapshot requests and passes
// the request to pass off to the sender store. Errors during the snapshots
// process are sent back as a response.
func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error {
return t.delegateRaftSnapshot(stream)
}

// delegateRaftSnapshot is the shared implementation for DelegateRaftSnapshot
// for both gRPC and DRPC.
func (t *RaftTransport) delegateRaftSnapshot(stream RPCMultiRaft_DelegateRaftSnapshotStream) error {
ctx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context())
defer cancel()
req, err := stream.Recv()
Expand Down Expand Up @@ -494,8 +533,18 @@ func (t *RaftTransport) InternalDelegateRaftSnapshot(
return incomingMessageHandler.HandleDelegatedSnapshot(ctx, req)
}

// RaftSnapshot handles incoming streaming snapshot requests.
func (t *drpcRaftTransport) RaftSnapshot(stream DRPCMultiRaft_RaftSnapshotStream) error {
return (*RaftTransport)(t).raftSnapshot(stream)
}

// RaftSnapshot handles incoming streaming snapshot requests.
func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error {
return t.raftSnapshot(stream)
}

// raftSnapshot is the shared implementation for RaftSnapshot for both gRPC and DRPC.
func (t *RaftTransport) raftSnapshot(stream RPCMultiRaft_RaftSnapshotStream) error {
ctx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context())
defer cancel()
req, err := stream.Recv()
Expand Down Expand Up @@ -547,7 +596,7 @@ func (t *RaftTransport) StopOutgoingMessage(storeID roachpb.StoreID) {
// lost and a new instance of processQueue will be started by the next message
// to be sent.
func (t *RaftTransport) processQueue(
q *raftSendQueue, stream MultiRaft_RaftMessageBatchClient, _ rpcbase.ConnectionClass,
q *raftSendQueue, stream RPCMultiRaft_RaftMessageBatchClient, _ rpcbase.ConnectionClass,
) error {
errCh := make(chan error, 1)

Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ func (rttc *raftTransportTestContext) AddNodeWithoutGossip(
manual := hlc.NewHybridManualClock()
clock := hlc.NewClockForTesting(manual)
rttc.clocks[nodeID] = clockWithManualSource{manual: manual, clock: clock}
grpcServer, err := rpc.NewServer(context.Background(), rttc.nodeRPCContext)
ctx := context.Background()
grpcServer, err := rpc.NewServer(ctx, rttc.nodeRPCContext)
require.NoError(rttc.t, err)
drpcServer, err := rpc.NewDRPCServer(ctx, rttc.nodeRPCContext)
require.NoError(rttc.t, err)
transport := kvserver.NewRaftTransport(
log.MakeTestingAmbientCtxWithNewTracer(),
Expand All @@ -191,6 +194,7 @@ func (rttc *raftTransportTestContext) AddNodeWithoutGossip(
clock,
nodedialer.New(rttc.nodeRPCContext, gossip.AddressResolver(rttc.gossip)),
grpcServer,
drpcServer,
piggybacker,
piggybackedResponseScheduler,
knobs,
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/raft_transport_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func TestRaftTransportStartNewQueue(t *testing.T) {

grpcServer, err := rpc.NewServer(ctx, rpcC)
require.NoError(t, err)
drpcServer, err := rpc.NewDRPCServer(ctx, rpcC)
require.NoError(t, err)
// RegisterMultiRaftServer(grpcServer, mrs)

var addr net.Addr
Expand All @@ -71,6 +73,7 @@ func TestRaftTransportStartNewQueue(t *testing.T) {
hlc.NewClockForTesting(nil),
nodedialer.New(rpcC, resolver),
grpcServer,
drpcServer,
(*node_rac2.AdmittedPiggybacker)(nil),
nil, /* PiggybackedAdmittedResponseScheduler */
nil, /* knobs */
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func createTestStoreWithoutStart(
}
rpcContext := rpc.NewContext(ctx, rpcOpts)
stopper.SetTracer(cfg.AmbientCtx.Tracer)
server, err := rpc.NewServer(ctx, rpcContext) // never started
grpcServer, err := rpc.NewServer(ctx, rpcContext) // never started
require.NoError(t, err)
drpcServer, err := rpc.NewDRPCServer(ctx, rpcContext) // never started
require.NoError(t, err)

// Some tests inject their own Gossip and StorePool, via
Expand Down Expand Up @@ -235,7 +237,8 @@ func createTestStoreWithoutStart(
stopper,
cfg.Clock,
cfg.NodeDialer,
server,
grpcServer,
drpcServer,
(*node_rac2.AdmittedPiggybacker)(nil),
nil, /* PiggybackedAdmittedResponseScheduler */
nil, /* knobs */
Expand All @@ -246,7 +249,7 @@ func createTestStoreWithoutStart(
supportGracePeriod := rpcContext.StoreLivenessWithdrawalGracePeriod()
options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod)
transport := storeliveness.NewTransport(
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, server, nil, /* knobs */
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, nil, /* knobs */
)
knobs := cfg.TestingKnobs.StoreLivenessKnobs
cfg.StoreLiveness = storeliveness.NewNodeContainer(stopper, options, transport, knobs)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
clock,
kvNodeDialer,
grpcServer.Server,
drpcServer.DRPCServer,
admittedPiggybacker,
storesForRACv2,
raftTransportKnobs,
Expand Down