Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -9222,6 +9222,16 @@ def go_deps():
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/assert/com_github_zeebo_assert-v1.3.0.zip",
],
)
go_repository(
name = "com_github_zeebo_errs",
build_file_proto_mode = "disable_global",
importpath = "github.com/zeebo/errs",
sha256 = "d2fa293e275c21bfb413e2968d79036931a55f503d8b62381563ed189b523cd2",
strip_prefix = "github.com/zeebo/errs@v1.2.2",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/errs/com_github_zeebo_errs-v1.2.2.zip",
],
)
go_repository(
name = "com_github_zeebo_xxh3",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -11355,6 +11365,16 @@ def go_deps():
"https://storage.googleapis.com/cockroach-godeps/gomod/rsc.io/sampler/io_rsc_sampler-v1.3.0.zip",
],
)
go_repository(
name = "io_storj_drpc",
build_file_proto_mode = "disable_global",
importpath = "storj.io/drpc",
sha256 = "e297ccead2763d354959a3c04b0c9c27c9c84c99d129f216ec07da663ee0091a",
strip_prefix = "storj.io/drpc@v0.0.34",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/storj.io/drpc/io_storj_drpc-v0.0.34.zip",
],
)
go_repository(
name = "io_vitess_vitess",
build_file_proto_mode = "disable_global",
Expand Down
2 changes: 2 additions & 0 deletions build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/z-division/go-zookeeper/com_github_z_division_go_zookeeper-v0.0.0-20190128072838-6d7457066b9b.zip": "b0a67a3bb3cfbb1be18618b84b02588979795966e040f18c5bb4be036888cabd",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zabawaba99/go-gitignore/com_github_zabawaba99_go_gitignore-v0.0.0-20200117185801-39e6bddfb292.zip": "6c837b93e1c73e53123941c8e866de1deae6b645cc49a7d30d493c146178f8e8",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/assert/com_github_zeebo_assert-v1.3.0.zip": "1f01421d74ff37cb8247988155be9e6877d336029bcd887a1d035fd32d7ab6ae",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/errs/com_github_zeebo_errs-v1.2.2.zip": "d2fa293e275c21bfb413e2968d79036931a55f503d8b62381563ed189b523cd2",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/xxh3/com_github_zeebo_xxh3-v1.0.2.zip": "190e5ef1f672e9321a1580bdd31c6440fde6044ca8168d2b489cf50cdc4f58a6",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zenazn/goji/com_github_zenazn_goji-v0.9.0.zip": "0807a255d9d715d18427a6eedd8e4f5a22670b09e5f45fddd229c1ae38da25a9",
"https://storage.googleapis.com/cockroach-godeps/gomod/gitlab.com/golang-commonmark/html/com_gitlab_golang_commonmark_html-v0.0.0-20191124015941-a22733972181.zip": "f2ba8985dc9d6be347a17d9200a0be0cee5ab3bce4dc601c0651a77ef2bbffc3",
Expand Down Expand Up @@ -1190,6 +1191,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/sigs.k8s.io/structured-merge-diff/v4/io_k8s_sigs_structured_merge_diff_v4-v4.1.2.zip": "b32af97dadd79179a8f62aaf4ef1e0562e051be77053a60c7a4e724a5cbd00ce",
"https://storage.googleapis.com/cockroach-godeps/gomod/sigs.k8s.io/yaml/io_k8s_sigs_yaml-v1.2.0.zip": "55ed08c5df448a033bf7e2c2912d4daa85b856a05c854b0c87ccc85c7f3fbfc7",
"https://storage.googleapis.com/cockroach-godeps/gomod/sourcegraph.com/sourcegraph/appdash/com_sourcegraph_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip": "bd2492d9db05362c2fecd0b3d0f6002c89a6d90d678fb93b4158298ab883736f",
"https://storage.googleapis.com/cockroach-godeps/gomod/storj.io/drpc/io_storj_drpc-v0.0.34.zip": "e297ccead2763d354959a3c04b0c9c27c9c84c99d129f216ec07da663ee0091a",
"https://storage.googleapis.com/public-bazel-artifacts/bazel/88ef31b429631b787ceb5e4556d773b20ad797c8.zip": "92a89a2bbe6c6db2a8b87da4ce723aff6253656e8417f37e50d362817c39b98b",
"https://storage.googleapis.com/public-bazel-artifacts/bazel/bazel-gazelle-v0.39.1.tar.gz": "b760f7fe75173886007f7c2e616a21241208f3d90e8657dc65d36a771e916b6a",
"https://storage.googleapis.com/public-bazel-artifacts/bazel/bazel-lib-v1.42.3.tar.gz": "d0529773764ac61184eb3ad3c687fb835df5bee01afedf07f0cf1a45515c96bc",
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.35.1
storj.io/drpc v0.0.34
)

// If any of the following dependencies get updated as a side-effect
Expand Down Expand Up @@ -429,6 +430,7 @@ require (
github.com/twpayne/go-kml v1.5.2 // indirect
github.com/urfave/cli/v2 v2.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zeebo/errs v1.2.2 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
gitlab.com/golang-commonmark/html v0.0.0-20191124015941-a22733972181 // indirect
gitlab.com/golang-commonmark/linkify v0.0.0-20191026162114-a0c2df6c8f82 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2363,6 +2363,8 @@ github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 h1:vpcCVk+
github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292/go.mod h1:qcqv8IHwbR0JmjY1LZy4PeytlwxDPn1vUkjX7Wq0VaY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
Expand Down Expand Up @@ -3323,3 +3325,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZa
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
storj.io/drpc v0.0.34 h1:q9zlQKfJ5A7x8NQNFk8x7eKUF78FMhmAbZLnFK+og7I=
storj.io/drpc v0.0.34/go.mod h1:Y9LZaa8esL1PW2IDMqJE7CFSNq7d5bQ3RI7mGPtmKMg=
12 changes: 9 additions & 3 deletions pkg/kv/kvpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ load(":gen.bzl", "batch_gen")
go_library(
name = "kvpb",
srcs = [
":gen-batch-generated", # keep
":gen-errordetailtype-stringer", # keep
":gen-method-stringer", # keep
"ambiguous_result_error.go",
"api.go",
# DRPC protobuf file (api_drpc.pb.go) is currently generated manually.
# TODO (chandrat): Remove this once DRPC protobuf generation is
# integrated into the build process.
"api_drpc_hacky.go",
"batch.go",
"data.go",
"errors.go",
"method.go",
"node_decommissioned_error.go",
"replica_unavailable_error.go",
":gen-batch-generated", # keep
":gen-errordetailtype-stringer", # keep
":gen-method-stringer", # keep
],
embed = [":kvpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvpb",
Expand Down Expand Up @@ -46,6 +50,8 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_github_gogo_status//:status",
"@com_github_golang_mock//gomock", # keep
"@io_storj_drpc//:drpc",
"@io_storj_drpc//drpcerr",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata", # keep
],
Expand Down
207 changes: 207 additions & 0 deletions pkg/kv/kvpb/api_drpc_hacky.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

// This file was manually generated with the DRPC protogen plugin using a dummy
// `api.proto` that includes a subset of relevant service methods.
//
// For instance, to generate this file, following proto file was used:
//
// -- api.proto -- begin --
// syntax = "proto3";
// package cockroach.kv.kvpb;
// option go_package = "github.com/cockroachdb/cockroach/pkg/kv/kvpb";
// service Batch {
// rpc Batch (BatchRequest) returns (BatchResponse) {}
// rpc BatchStream (stream BatchRequest) returns (stream BatchResponse) {}
// }
// message BatchRequest{}
// message BatchResponse{}
// -- api.proto -- end --
//
// NB: The use of empty BatchRequest and BatchResponse messages is a deliberate
// decision to avoid dependencies.
//
//
// To generate this file using DRPC protogen plugin from the dummy `api.proto`
// defined above, use the following command:
//
// ```
// protoc --gogo_out=paths=source_relative:. \
// --go-drpc_out=paths=source_relative,protolib=github.com/gogo/protobuf:. \
// api.proto
// ```
//
// NB: Make sure you have `protoc` installed and `protoc-gen-gogoroach` is
// built from $COCKROACH_SRC/pkg/cmd/protoc-gen-gogoroach.
//
// This code-gen should be automated as part of productionizing drpc.

package kvpb

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"storj.io/drpc"
"storj.io/drpc/drpcerr"
)

type drpcEncoding_File_api_proto struct{}

func (drpcEncoding_File_api_proto) Marshal(msg drpc.Message) ([]byte, error) {
return protoutil.Marshal(msg.(protoutil.Message))
}

func (drpcEncoding_File_api_proto) Unmarshal(buf []byte, msg drpc.Message) error {
return protoutil.Unmarshal(buf, msg.(protoutil.Message))
}

type DRPCBatchClient interface {
DRPCConn() drpc.Conn

Batch(ctx context.Context, in *BatchRequest) (*BatchResponse, error)
BatchStream(ctx context.Context) (DRPCBatch_BatchStreamClient, error)
}

type drpcBatchClient struct {
cc drpc.Conn
}

func NewDRPCBatchClient(cc drpc.Conn) DRPCBatchClient {
return &drpcBatchClient{cc}
}

func (c *drpcBatchClient) DRPCConn() drpc.Conn { return c.cc }

func (c *drpcBatchClient) Batch(ctx context.Context, in *BatchRequest) (*BatchResponse, error) {
out := new(BatchResponse)
err := c.cc.Invoke(ctx, "/cockroach.kv.kvpb.Batch/Batch", drpcEncoding_File_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}

func (c *drpcBatchClient) BatchStream(ctx context.Context) (DRPCBatch_BatchStreamClient, error) {
stream, err := c.cc.NewStream(ctx, "/cockroach.kv.kvpb.Batch/BatchStream", drpcEncoding_File_api_proto{})
if err != nil {
return nil, err
}
x := &drpcBatch_BatchStreamClient{stream}
return x, nil
}

type DRPCBatch_BatchStreamClient interface {
drpc.Stream
Send(*BatchRequest) error
Recv() (*BatchResponse, error)
}

type drpcBatch_BatchStreamClient struct {
drpc.Stream
}

func (x *drpcBatch_BatchStreamClient) GetStream() drpc.Stream {
return x.Stream
}

func (x *drpcBatch_BatchStreamClient) Send(m *BatchRequest) error {
return x.MsgSend(m, drpcEncoding_File_api_proto{})
}

func (x *drpcBatch_BatchStreamClient) Recv() (*BatchResponse, error) {
m := new(BatchResponse)
if err := x.MsgRecv(m, drpcEncoding_File_api_proto{}); err != nil {
return nil, err
}
return m, nil
}

func (x *drpcBatch_BatchStreamClient) RecvMsg(m *BatchResponse) error {
return x.MsgRecv(m, drpcEncoding_File_api_proto{})
}

type DRPCBatchServer interface {
Batch(context.Context, *BatchRequest) (*BatchResponse, error)
BatchStream(DRPCBatch_BatchStreamStream) error
}

type DRPCBatchUnimplementedServer struct{}

func (s *DRPCBatchUnimplementedServer) Batch(
context.Context, *BatchRequest,
) (*BatchResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}

func (s *DRPCBatchUnimplementedServer) BatchStream(DRPCBatch_BatchStreamStream) error {
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}

type DRPCBatchDescription struct{}

func (DRPCBatchDescription) NumMethods() int { return 2 }

func (DRPCBatchDescription) Method(
n int,
) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n {
case 0:
return "/cockroach.kv.kvpb.Batch/Batch", drpcEncoding_File_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCBatchServer).
Batch(
ctx,
in1.(*BatchRequest),
)
}, DRPCBatchServer.Batch, true
case 1:
return "/cockroach.kv.kvpb.Batch/BatchStream", drpcEncoding_File_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return nil, srv.(DRPCBatchServer).
BatchStream(
&drpcBatch_BatchStreamStream{in1.(drpc.Stream)},
)
}, DRPCBatchServer.BatchStream, true
default:
return "", nil, nil, nil, false
}
}

func DRPCRegisterBatch(mux drpc.Mux, impl DRPCBatchServer) error {
return mux.Register(impl, DRPCBatchDescription{})
}

type DRPCBatch_BatchStream interface {
drpc.Stream
SendAndClose(*BatchResponse) error
}

type DRPCBatch_BatchStreamStream interface {
drpc.Stream
Send(*BatchResponse) error
Recv() (*BatchRequest, error)
}

type drpcBatch_BatchStreamStream struct {
drpc.Stream
}

func (x *drpcBatch_BatchStreamStream) Send(m *BatchResponse) error {
return x.MsgSend(m, drpcEncoding_File_api_proto{})
}

func (x *drpcBatch_BatchStreamStream) Recv() (*BatchRequest, error) {
m := new(BatchRequest)
if err := x.MsgRecv(m, drpcEncoding_File_api_proto{}); err != nil {
return nil, err
}
return m, nil
}

func (x *drpcBatch_BatchStreamStream) RecvMsg(m *BatchRequest) error {
return x.MsgRecv(m, drpcEncoding_File_api_proto{})
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/loqrecovery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func visitNodeWithRetry(
// Note that we use ConnectNoBreaker here to avoid any race with probe
// running on current node and target node restarting. Errors from circuit
// breaker probes could confuse us and present node as unavailable.
conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
conn, _, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
// Nodes would contain dead nodes that we don't need to visit. We can skip
// them and let caller handle incomplete info.
if err != nil {
Expand Down Expand Up @@ -803,7 +803,7 @@ func makeVisitNode(g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context)
// Note that we use ConnectNoBreaker here to avoid any race with probe
// running on current node and target node restarting. Errors from circuit
// breaker probes could confuse us and present node as unavailable.
conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
conn, _, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
if err != nil {
if grpcutil.IsClosedConnection(err) {
log.Infof(ctx, "can't dial node n%d because connection is permanently closed: %s",
Expand Down
12 changes: 12 additions & 0 deletions pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ go_library(
"connection_class.go",
"context.go",
"context_testutils.go",
"drpc.go",
"errors.go",
"heartbeat.go",
"keepalive.go",
"metrics.go",
"peer.go",
"peer_drpc.go",
"peer_map.go",
"restricted_internal_client.go",
"settings.go",
Expand All @@ -45,6 +47,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/envutil",
"//pkg/util/growstack",
Expand Down Expand Up @@ -72,6 +75,15 @@ go_library(
"@com_github_montanaflynn_stats//:stats",
"@com_github_vividcortex_ewma//:ewma",
"@io_opentelemetry_go_otel//attribute",
"@io_storj_drpc//:drpc",
"@io_storj_drpc//drpcconn",
"@io_storj_drpc//drpcmanager",
"@io_storj_drpc//drpcmigrate",
"@io_storj_drpc//drpcmux",
"@io_storj_drpc//drpcpool",
"@io_storj_drpc//drpcserver",
"@io_storj_drpc//drpcstream",
"@io_storj_drpc//drpcwire",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//codes",
Expand Down
Loading
Loading