From 48f2e54fcc8d5b8f0d58fad36e483f70e7070d23 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 10 Jun 2020 16:41:27 -0400 Subject: [PATCH 1/6] kv: don't assign DistSender.nodeDialer twice I'm not sure why we were doing this. --- pkg/kv/kvclient/kvcoord/dist_sender.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 5e94c7b34ed1..b926ed17817f 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -259,11 +259,10 @@ type DistSenderConfig struct { // defaults will be used. func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { ds := &DistSender{ - st: cfg.Settings, - clock: cfg.Clock, - gossip: g, - metrics: makeDistSenderMetrics(), - nodeDialer: cfg.NodeDialer, + st: cfg.Settings, + clock: cfg.Clock, + gossip: g, + metrics: makeDistSenderMetrics(), } if ds.st == nil { ds.st = cluster.MakeTestingClusterSettings() @@ -299,11 +298,11 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { panic("no RPCContext set in DistSenderConfig") } ds.rpcContext = cfg.RPCContext + ds.nodeDialer = cfg.NodeDialer if ds.rpcRetryOptions.Closer == nil { ds.rpcRetryOptions.Closer = ds.rpcContext.Stopper.ShouldQuiesce() } ds.clusterID = &cfg.RPCContext.ClusterID - ds.nodeDialer = cfg.NodeDialer ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency", uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV))) senderConcurrencyLimit.SetOnChange(&cfg.Settings.SV, func() { From da0f853cbb8e82277bfadf5b8861b9b254efac4c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 10 Jun 2020 17:47:31 -0400 Subject: [PATCH 2/6] gossip: fix comments on gossip keys A few of the comments were stale, primarily around the structured value that was associated with a given key. --- pkg/gossip/keys.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/gossip/keys.go b/pkg/gossip/keys.go index 4da7ab39628c..7a8c771dadd7 100644 --- a/pkg/gossip/keys.go +++ b/pkg/gossip/keys.go @@ -28,27 +28,27 @@ const separator = ":" // Constants for gossip keys. const ( // KeyClusterID is the unique UUID for this Cockroach cluster. - // The value is a string UUID for the cluster. The cluster ID is + // The value is a string UUID for the cluster. The cluster ID is // gossiped by all nodes that contain a replica of the first range, // and it serves as a check for basic gossip connectivity. The // Gossip.Connected channel is closed when we see this key. KeyClusterID = "cluster-id" // KeyStorePrefix is the key prefix for gossiping stores in the network. - // The suffix is a store ID and the value is roachpb.StoreDescriptor. + // The suffix is a store ID and the value is a roachpb.StoreDescriptor. KeyStorePrefix = "store" - // KeyNodeIDPrefix is the key prefix for gossiping node id - // addresses. The actual key is suffixed with the decimal - // representation of the node id and the value is the host:port - // string address of the node. E.g. node:1 => 127.0.0.1:24001 + // KeyNodeIDPrefix is the key prefix for gossiping node id addresses. + // The actual key is suffixed with the decimal representation of the + // node id (e.g. 'node:1') and the value is a roachpb.NodeDescriptor. KeyNodeIDPrefix = "node" - // KeyHealthAlertPrefix is the key prefix for gossiping health alerts. The - // value is a proto of type HealthCheckResult. 
+ // KeyHealthAlertPrefix is the key prefix for gossiping health alerts. + // The value is a proto of type HealthCheckResult. KeyNodeHealthAlertPrefix = "health-alert" - // KeyNodeLivenessPrefix is the key prefix for gossiping node liveness info. + // KeyNodeLivenessPrefix is the key prefix for gossiping node liveness + // info. KeyNodeLivenessPrefix = "liveness" // KeySentinel is a key for gossip which must not expire or @@ -57,10 +57,9 @@ const ( // the range lease for the first range. KeySentinel = "sentinel" - // KeyFirstRangeDescriptor is the descriptor for the "first" - // range. The "first" range contains the meta1 key range, the first - // level of the bi-level key addressing scheme. The value is a slice - // of storage.Replica structs. + // KeyFirstRangeDescriptor is the descriptor for the "first" range. The + // "first" range contains the meta1 key range, the first level of the + // bi-level key addressing scheme. The value is a roachpb.RangeDescriptor. KeyFirstRangeDescriptor = "first-range" // KeySystemConfig is the gossip key for the system DB span. From 1fe95af49852b1b09f1ea5b26041725461c63f54 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 10 Jun 2020 19:15:54 -0400 Subject: [PATCH 3/6] replicaoracle: thread context through replicaSliceOrErr Removes a context.TODO(). --- pkg/sql/physicalplan/replicaoracle/oracle.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index a9e110fce00d..8d8ab95e3038 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -133,7 +133,7 @@ func (o *randomOracle) Oracle(_ *kv.Txn) Oracle { func (o *randomOracle) ChoosePreferredReplica( ctx context.Context, desc *roachpb.RangeDescriptor, _ QueryState, ) (roachpb.ReplicaDescriptor, error) { - replicas, err := replicaSliceOrErr(desc, o.gossip) + replicas, err := replicaSliceOrErr(ctx, desc, o.gossip) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -163,7 +163,7 @@ func (o *closestOracle) Oracle(_ *kv.Txn) Oracle { func (o *closestOracle) ChoosePreferredReplica( ctx context.Context, desc *roachpb.RangeDescriptor, _ QueryState, ) (roachpb.ReplicaDescriptor, error) { - replicas, err := replicaSliceOrErr(desc, o.gossip) + replicas, err := replicaSliceOrErr(ctx, desc, o.gossip) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -229,7 +229,7 @@ func (o *binPackingOracle) ChoosePreferredReplica( } } - replicas, err := replicaSliceOrErr(desc, o.gossip) + replicas, err := replicaSliceOrErr(ctx, desc, o.gossip) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -259,9 +259,9 @@ func (o *binPackingOracle) ChoosePreferredReplica( // is available in gossip. If no nodes are available, a RangeUnavailableError is // returned. 
 func replicaSliceOrErr(
-	desc *roachpb.RangeDescriptor, gsp gossip.DeprecatedOracleGossip,
+	ctx context.Context, desc *roachpb.RangeDescriptor, gsp gossip.DeprecatedOracleGossip,
 ) (kvcoord.ReplicaSlice, error) {
-	replicas, err := kvcoord.NewReplicaSlice(context.TODO(), gsp, desc)
+	replicas, err := kvcoord.NewReplicaSlice(ctx, gsp, desc)
 	if err != nil {
 		return kvcoord.ReplicaSlice{}, sqlbase.NewRangeUnavailableError(desc.RangeID, err)
 	}

From 86657178ec08c6d13354dc495d4c50831952e0fa Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 10 Jun 2020 20:09:50 -0400
Subject: [PATCH 4/6] roachpb: extract AddressForLocality method

While doing this, remove a pessimization. Not only was the localityTierMap
not worth it given how few locality tiers we expect in practice, but it was
represented as a `map[string]struct{}` instead of a `map[roachpb.Tier]struct{}`,
meaning that we needed to stringify each LocalityAddress tier before each map
access, which forced memory allocations and extra work.

An n^2 approach is simpler and almost certainly faster up to ~100 locality
tiers.
---
 pkg/gossip/gossip.go         | 24 ++++++------------
 pkg/roachpb/metadata.go      | 21 ++++++++++++++++
 pkg/roachpb/metadata_test.go | 49 ++++++++++++++++++++++++++++++++++++
 3 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go
index 3a40a5ee3017..b543c102e385 100644
--- a/pkg/gossip/gossip.go
+++ b/pkg/gossip/gossip.go
@@ -196,7 +196,7 @@ func NewKeyNotPresentError(key string) error {
 }
 
 // AddressResolver is a thin wrapper around gossip's GetNodeIDAddress
-// that allows it to be used as a nodedialer.AddressResolver
+// that allows it to be used as a nodedialer.AddressResolver.
 func AddressResolver(gossip *Gossip) nodedialer.AddressResolver {
 	return func(nodeID roachpb.NodeID) (net.Addr, error) {
 		return gossip.GetNodeIDAddress(nodeID)
@@ -274,7 +274,7 @@ type Gossip struct {
 	resolverAddrs  map[util.UnresolvedAddr]resolver.Resolver
 	bootstrapAddrs map[util.UnresolvedAddr]roachpb.NodeID
 
-	localityTierMap map[string]struct{}
+	locality roachpb.Locality
 
 	lastConnectivity redact.RedactableString
 
@@ -320,13 +320,10 @@ func New(
 		storeMap:          make(map[roachpb.StoreID]roachpb.NodeID),
 		resolverAddrs:     map[util.UnresolvedAddr]resolver.Resolver{},
 		bootstrapAddrs:    map[util.UnresolvedAddr]roachpb.NodeID{},
-		localityTierMap:   map[string]struct{}{},
+		locality:          locality,
 		defaultZoneConfig: defaultZoneConfig,
 	}
 
-	for _, loc := range locality.Tiers {
-		g.localityTierMap[loc.String()] = struct{}{}
-	}
 	stopper.AddCloser(stop.CloserFn(g.server.AmbientContext.FinishEventLog))
 
 	registry.AddMetric(g.outgoing.gauge)
@@ -980,19 +977,12 @@ func (g *Gossip) getNodeIDAddressLocked(nodeID roachpb.NodeID) (*util.Unresolved
 	if err != nil {
 		return nil, err
 	}
-	for i := range nd.LocalityAddress {
-		locality := &nd.LocalityAddress[i]
-		if _, ok := g.localityTierMap[locality.LocalityTier.String()]; ok {
-			return &locality.Address, nil
-		}
-	}
-	return &nd.Address, nil
+	return nd.AddressForLocality(g.locality), nil
 }
 
-// getNodeIDAddressLocked looks up the SQL address of the node by ID. The mutex is
-// assumed held by the caller. This method is called externally via
-// GetNodeIDSQLAddress or internally when looking up a "distant" node address to
-// connect directly to.
+// getNodeIDSQLAddressLocked looks up the SQL address of the node by ID. The
+// mutex is assumed held by the caller. This method is called externally via
+// GetNodeIDSQLAddress.
 func (g *Gossip) getNodeIDSQLAddressLocked(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) {
 	nd, err := g.getNodeDescriptorLocked(nodeID)
 	if err != nil {
diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go
index 70a34a1cf24e..4e241466cb1b 100644
--- a/pkg/roachpb/metadata.go
+++ b/pkg/roachpb/metadata.go
@@ -16,6 +16,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/redact"
@@ -449,6 +450,26 @@ func (sc StoreCapacity) FractionUsed() float64 {
 	return float64(sc.Used) / float64(sc.Available+sc.Used)
 }
 
+// AddressForLocality returns the network address that nodes in the specified
+// locality should use when connecting to the node described by the descriptor.
+func (n *NodeDescriptor) AddressForLocality(loc Locality) *util.UnresolvedAddr {
+	// If the provided locality has any tiers that are an exact match (key
+	// and value) with a tier in the node descriptor's custom LocalityAddress
+	// list, return the corresponding address. Otherwise, return the default
+	// address.
+	//
+	// O(n^2), but we expect very few locality tiers in practice.
+	for i := range n.LocalityAddress {
+		nLoc := &n.LocalityAddress[i]
+		for _, loc := range loc.Tiers {
+			if loc == nLoc.LocalityTier {
+				return &nLoc.Address
+			}
+		}
+	}
+	return &n.Address
+}
+
 // String returns a string representation of the Tier.
 func (t Tier) String() string {
 	return fmt.Sprintf("%s=%s", t.Key, t.Value)
diff --git a/pkg/roachpb/metadata_test.go b/pkg/roachpb/metadata_test.go
index e7357ddcb292..ee7ff4218da0 100644
--- a/pkg/roachpb/metadata_test.go
+++ b/pkg/roachpb/metadata_test.go
@@ -16,8 +16,10 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/redact"
 	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
 )
 
 func TestPercentilesFromData(t *testing.T) {
@@ -111,6 +113,53 @@ func TestRangeDescriptorMissingReplica(t *testing.T) {
 	}
 }
 
+func TestNodeDescriptorAddressForLocality(t *testing.T) {
+	addr := func(name string) util.UnresolvedAddr {
+		return util.UnresolvedAddr{NetworkField: name}
+	}
+	desc := NodeDescriptor{
+		Address: addr("1"),
+		LocalityAddress: []LocalityAddress{
+			{Address: addr("2"), LocalityTier: Tier{Key: "region", Value: "east"}},
+			{Address: addr("3"), LocalityTier: Tier{Key: "zone", Value: "a"}},
+		},
+	}
+	for _, tc := range []struct {
+		locality Locality
+		expected util.UnresolvedAddr
+	}{
+		{
+			locality: Locality{},
+			expected: addr("1"),
+		},
+		{
+			locality: Locality{Tiers: []Tier{
+				{Key: "region", Value: "west"},
+				{Key: "zone", Value: "b"},
+			}},
+			expected: addr("1"),
+		},
+		{
+			locality: Locality{Tiers: []Tier{
+				{Key: "region", Value: "east"},
+				{Key: "zone", Value: "b"},
+			}},
+			expected: addr("2"),
+		},
+		{
+			locality: Locality{Tiers: []Tier{
+				{Key: "region", Value: "west"},
+				{Key: "zone", Value: "a"},
+			}},
+			expected: addr("3"),
+		},
+	} {
+		t.Run(tc.locality.String(), func(t *testing.T) {
+			require.Equal(t, tc.expected, *desc.AddressForLocality(tc.locality))
+		})
+	}
+}
+
 // TestLocalityConversions verifies that setting the value from the CLI short
 // hand format works correctly.
func TestLocalityConversions(t *testing.T) { From 54333ab69ff5ac63778d8618212a8c490ad72610 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 11 Jun 2020 16:07:43 -0400 Subject: [PATCH 5/6] kv: decompose roles of Gossip dependency in DistSender This commit decomposes the roles of Gossip as a dependency in DistSender into that of a NodeDescStore, a source of node descriptors, and that of a FirstRangeProvider, a provider of information on the first range in a cluster. This decomposition will be used to address #47909 by: 1. replacing Gossip with a TenantService as a NodeDescStore 2. providing a custom RangeDescriptorDB (also backed by a TenantService) instead of a FirstRangeProvider. Together, these changes will allow us to remove DistSender's dependency on Gossip for SQL-only tenant processes. The next step after this will be to introduce a TenantService that can satisfy these two dependencies (NodeDescStore and RangeDescriptorDB) and also use the new NodeDescStore-to-AddressResolver binding to replace the use of Gossip with the TenantService in nodedialer instances. --- pkg/gossip/gossip.go | 32 ++++ pkg/kv/kvclient/kvcoord/dist_sender.go | 159 +++++++++++------- .../kvclient/kvcoord/dist_sender_rangefeed.go | 2 +- .../kvcoord/dist_sender_server_test.go | 56 +++--- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 149 ++++++++++------ .../kvcoord/local_test_cluster_util.go | 18 +- pkg/kv/kvclient/kvcoord/node_store.go | 42 +++++ pkg/kv/kvclient/kvcoord/range_iter_test.go | 12 +- pkg/kv/kvclient/kvcoord/replica_slice.go | 8 +- pkg/kv/kvclient/kvcoord/send_test.go | 12 +- .../kvclient/kvcoord/txn_coord_sender_test.go | 22 ++- pkg/kv/kvserver/client_test.go | 17 +- pkg/server/server.go | 20 ++- pkg/server/testserver.go | 22 +-- pkg/sql/physicalplan/replicaoracle/oracle.go | 32 ++-- 15 files changed, 380 insertions(+), 223 deletions(-) create mode 100644 pkg/kv/kvclient/kvcoord/node_store.go diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index b543c102e385..8d33b374f62f 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -1609,6 +1609,38 @@ func (g *Gossip) findClient(match func(*client) bool) *client { return nil } +// A firstRangeMissingError indicates that the first range has not yet +// been gossiped. This will be the case for a node which hasn't yet +// joined the gossip network. +type firstRangeMissingError struct{} + +// Error is part of the error interface. +func (f firstRangeMissingError) Error() string { + return "the descriptor for the first range is not available via gossip" +} + +// GetFirstRangeDescriptor implements kvcoord.FirstRangeProvider. +func (g *Gossip) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) { + desc := &roachpb.RangeDescriptor{} + if err := g.GetInfoProto(KeyFirstRangeDescriptor, desc); err != nil { + return nil, firstRangeMissingError{} + } + return desc, nil +} + +// OnFirstRangeChanged implements kvcoord.FirstRangeProvider. +func (g *Gossip) OnFirstRangeChanged(cb func(*roachpb.RangeDescriptor)) { + g.RegisterCallback(KeyFirstRangeDescriptor, func(_ string, value roachpb.Value) { + ctx := context.Background() + desc := &roachpb.RangeDescriptor{} + if err := value.GetProto(desc); err != nil { + log.Errorf(ctx, "unable to parse gossiped first range descriptor: %s", err) + } else { + cb(desc) + } + }) +} + // MakeExposedGossip initializes a DeprecatedGossip instance which exposes a // wrapped Gossip instance via Optional(). This is used on SQL servers running // inside of a KV server (i.e. single-tenant deployments). 
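The two methods added to Gossip above are what satisfy the kvcoord.FirstRangeProvider interface introduced in dist_sender.go below. As a rough sketch of how a non-gossip implementation might look (say, in a test or a SQL-only tenant process), something along these lines would satisfy the same interface; the staticFirstRangeProvider type is hypothetical and not part of this patch:

// staticFirstRangeProvider is a hypothetical FirstRangeProvider that serves a
// fixed descriptor for range one and never reports changes. Illustrative only;
// not part of this change.
type staticFirstRangeProvider struct {
	desc roachpb.RangeDescriptor
}

// GetFirstRangeDescriptor implements kvcoord.FirstRangeProvider.
func (p staticFirstRangeProvider) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) {
	return &p.desc, nil
}

// OnFirstRangeChanged implements kvcoord.FirstRangeProvider. The static
// descriptor never changes, so the callback is never invoked.
func (p staticFirstRangeProvider) OnFirstRangeChanged(func(*roachpb.RangeDescriptor)) {}

A real tenant-side implementation would instead be backed by the TenantService mentioned in the commit message, or made unnecessary altogether by supplying a RangeDescriptorDB.
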
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index b926ed17817f..c36100005e12 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -175,14 +175,17 @@ func makeDistSenderMetrics() DistSenderMetrics { } } -// A firstRangeMissingError indicates that the first range has not yet -// been gossiped. This will be the case for a node which hasn't yet -// joined the gossip network. -type firstRangeMissingError struct{} - -// Error is part of the error interface. -func (f firstRangeMissingError) Error() string { - return "the descriptor for the first range is not available via gossip" +// FirstRangeProvider is capable of providing DistSender with the descriptor of +// the first range in the cluster and notifying the DistSender when the first +// range in the cluster has changed. +type FirstRangeProvider interface { + // GetFirstRangeDescriptor returns the RangeDescriptor for the first range + // in the cluster. + GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) + + // OnFirstRangeChanged calls the provided callback when the RangeDescriptor + // for the first range has changed. + OnFirstRangeChanged(func(*roachpb.RangeDescriptor)) } // A DistSender provides methods to access Cockroach's monolithic, @@ -202,13 +205,16 @@ type DistSender struct { // clock is used to set time for some calls. E.g. read-only ops // which span ranges and don't require read consistency. clock *hlc.Clock - // gossip provides up-to-date information about the start of the - // key range, used to find the replica metadata for arbitrary key - // ranges. - gossip *gossip.Gossip + // nodeDescs provides information on the KV nodes that DistSender may + // consider routing requests to. + nodeDescs NodeDescStore + // metrics stored DistSender-related metrics. metrics DistSenderMetrics // rangeCache caches replica metadata for key ranges. rangeCache *RangeDescriptorCache + // firstRangeProvider provides the range descriptor for range one. + // This is not required if a RangeDescriptorDB is supplied. + firstRangeProvider FirstRangeProvider // leaseHolderCache caches range lease holders by range ID. leaseHolderCache *LeaseHolderCache transportFactory TransportFactory @@ -238,17 +244,33 @@ var _ kv.Sender = &DistSender{} type DistSenderConfig struct { AmbientCtx log.AmbientContext - Settings *cluster.Settings - Clock *hlc.Clock - RPCRetryOptions *retry.Options - // nodeDescriptor, if provided, is used to describe which node the DistSender - // lives on, for instance when deciding where to send RPCs. + Settings *cluster.Settings + Clock *hlc.Clock + NodeDescs NodeDescStore + // nodeDescriptor, if provided, is used to describe which node the + // DistSender lives on, for instance when deciding where to send RPCs. // Usually it is filled in from the Gossip network on demand. - nodeDescriptor *roachpb.NodeDescriptor - RPCContext *rpc.Context - RangeDescriptorDB RangeDescriptorDB + nodeDescriptor *roachpb.NodeDescriptor + RPCRetryOptions *retry.Options + RPCContext *rpc.Context + NodeDialer *nodedialer.Dialer - NodeDialer *nodedialer.Dialer + // One of the following two must be provided, but not both. + // + // If only FirstRangeProvider is supplied, DistSender will use itself as a + // RangeDescriptorDB and scan the meta ranges directly to satisfy range + // lookups, using the FirstRangeProvider to bootstrap the location of the + // meta1 range. 
Additionally, it will proactively update its range + // descriptor cache with any meta1 updates from the provider. + // + // If only RangeDescriptorDB is provided, all range lookups will be + // delegated to it. + // + // If both are provided (not required, but allowed for tests) range lookups + // will be delegated to the RangeDescriptorDB but FirstRangeProvider will + // still be used to listen for updates to the first range's descriptor. + FirstRangeProvider FirstRangeProvider + RangeDescriptorDB RangeDescriptorDB TestingKnobs ClientTestingKnobs } @@ -257,12 +279,12 @@ type DistSenderConfig struct { // Cockroach cluster via the supplied gossip instance. Supplying a // DistSenderContext or the fields within is optional. For omitted values, sane // defaults will be used. -func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { +func NewDistSender(cfg DistSenderConfig) *DistSender { ds := &DistSender{ - st: cfg.Settings, - clock: cfg.Clock, - gossip: g, - metrics: makeDistSenderMetrics(), + st: cfg.Settings, + clock: cfg.Clock, + nodeDescs: cfg.NodeDescs, + metrics: makeDistSenderMetrics(), } if ds.st == nil { ds.st = cluster.MakeTestingClusterSettings() @@ -276,10 +298,17 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { if cfg.nodeDescriptor != nil { atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(cfg.nodeDescriptor)) } - rdb := cfg.RangeDescriptorDB - if rdb == nil { + var rdb RangeDescriptorDB + if cfg.FirstRangeProvider != nil { + ds.firstRangeProvider = cfg.FirstRangeProvider rdb = ds } + if cfg.RangeDescriptorDB != nil { + rdb = cfg.RangeDescriptorDB + } + if rdb == nil { + panic("DistSenderConfig must contain either FirstRangeProvider or RangeDescriptorDB") + } getRangeDescCacheSize := func() int64 { return rangeDescriptorCacheSize.Get(&ds.st.SV) } @@ -310,23 +339,15 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { }) ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) - if g != nil { + if ds.firstRangeProvider != nil { ctx := ds.AnnotateCtx(context.Background()) - g.RegisterCallback(gossip.KeyFirstRangeDescriptor, - func(_ string, value roachpb.Value) { - if atomic.LoadInt32(&ds.disableFirstRangeUpdates) == 1 { - return - } - if log.V(1) { - var desc roachpb.RangeDescriptor - if err := value.GetProto(&desc); err != nil { - log.Errorf(ctx, "unable to parse gossiped first range descriptor: %s", err) - } else { - log.Infof(ctx, "gossiped first range descriptor: %+v", desc.Replicas()) - } - } - ds.rangeCache.EvictByKey(ctx, roachpb.RKeyMin) - }) + ds.firstRangeProvider.OnFirstRangeChanged(func(desc *roachpb.RangeDescriptor) { + if atomic.LoadInt32(&ds.disableFirstRangeUpdates) == 1 { + return + } + log.VEventf(ctx, 1, "gossiped first range descriptor: %+v", desc.Replicas()) + ds.rangeCache.EvictByKey(ctx, roachpb.RKeyMin) + }) } return ds } @@ -360,11 +381,12 @@ func (ds *DistSender) LeaseHolderCache() *LeaseHolderCache { return ds.leaseHolderCache } -// RangeLookup implements the RangeDescriptorDB interface. It uses LookupRange -// to perform a lookup scan for the provided key, using DistSender itself as the -// client.Sender. This means that the scan will recurse into DistSender, which -// will in turn use the RangeDescriptorCache again to lookup the RangeDescriptor -// necessary to perform the scan. +// RangeLookup implements the RangeDescriptorDB interface. +// +// It uses LookupRange to perform a lookup scan for the provided key, using +// DistSender itself as the client.Sender. 
This means that the scan will recurse +// into DistSender, which will in turn use the RangeDescriptorCache again to +// lookup the RangeDescriptor necessary to perform the scan. func (ds *DistSender) RangeLookup( ctx context.Context, key roachpb.RKey, useReverseScan bool, ) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) { @@ -386,17 +408,26 @@ func (ds *DistSender) RangeLookup( } // FirstRange implements the RangeDescriptorDB interface. -// FirstRange returns the RangeDescriptor for the first range on the cluster, -// which is retrieved from the gossip protocol instead of the datastore. +// +// It returns the RangeDescriptor for the first range in the cluster using the +// FirstRangeProvider, which is typically implemented using the gossip protocol +// instead of the datastore. func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) { - if ds.gossip == nil { - panic("with `nil` Gossip, DistSender must not use itself as rangeDescriptorDB") + if ds.firstRangeProvider == nil { + panic("with `nil` firstRangeProvider, DistSender must not use itself as RangeDescriptorDB") } - rangeDesc := &roachpb.RangeDescriptor{} - if err := ds.gossip.GetInfoProto(gossip.KeyFirstRangeDescriptor, rangeDesc); err != nil { - return nil, firstRangeMissingError{} + return ds.firstRangeProvider.GetFirstRangeDescriptor() +} + +// getNodeID attempts to return the local node ID. It returns 0 if the DistSender +// does not have access to the Gossip network. +func (ds *DistSender) getNodeID() roachpb.NodeID { + // TODO(nvanbenschoten): open an issue about the effect of this. + g, ok := ds.nodeDescs.(*gossip.Gossip) + if !ok { + return 0 } - return rangeDesc, nil + return g.NodeID.Get() } // getNodeDescriptor returns ds.nodeDescriptor, but makes an attempt to load @@ -408,17 +439,19 @@ func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor { if desc := atomic.LoadPointer(&ds.nodeDescriptor); desc != nil { return (*roachpb.NodeDescriptor)(desc) } - if ds.gossip == nil { + // TODO(nvanbenschoten): open an issue about the effect of this. + g, ok := ds.nodeDescs.(*gossip.Gossip) + if !ok { return nil } - ownNodeID := ds.gossip.NodeID.Get() + ownNodeID := g.NodeID.Get() if ownNodeID > 0 { // TODO(tschottdorf): Consider instead adding the NodeID of the // coordinator to the header, so we can get this from incoming // requests. Just in case we want to mostly eliminate gossip here. nodeDesc := &roachpb.NodeDescriptor{} - if err := ds.gossip.GetInfoProto(gossip.MakeNodeIDKey(ownNodeID), nodeDesc); err == nil { + if err := g.GetInfoProto(gossip.MakeNodeIDKey(ownNodeID), nodeDesc); err == nil { atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(nodeDesc)) return nodeDesc } @@ -494,8 +527,8 @@ func (ds *DistSender) initAndVerifyBatch( ctx context.Context, ba *roachpb.BatchRequest, ) *roachpb.Error { // Attach the local node ID to each request. 
- if ba.Header.GatewayNodeID == 0 && ds.gossip != nil { - ba.Header.GatewayNodeID = ds.gossip.NodeID.Get() + if ba.Header.GatewayNodeID == 0 { + ba.Header.GatewayNodeID = ds.getNodeID() } // In the event that timestamp isn't set and read consistency isn't @@ -1611,7 +1644,7 @@ func (ds *DistSender) sendToReplicas( ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor, withCommit bool, ) (*roachpb.BatchResponse, error) { ba.RangeID = desc.RangeID - replicas, err := NewReplicaSlice(ctx, ds.gossip, desc) + replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc) if err != nil { return nil, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 4a1c88c63ca1..16d7ed8168ea 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -230,7 +230,7 @@ func (ds *DistSender) singleRangeFeed( if ds.rpcContext != nil { latencyFn = ds.rpcContext.RemoteClocks.Latency } - replicas, err := NewReplicaSlice(ctx, ds.gossip, desc) + replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc) if err != nil { return args.Timestamp, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index cb3c568b0e9b..fdef99ef6555 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -88,16 +88,15 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) { // Create a new DistSender and client.DB so that the Get below is guaranteed // to not hit in the range descriptor cache forcing a RangeLookup operation. ambient := log.AmbientContext{Tracer: s.ClusterSettings().Tracer} - ds := kvcoord.NewDistSender( - kvcoord.DistSenderConfig{ - AmbientCtx: ambient, - Clock: s.Clock(), - RPCContext: s.RPCContext(), - NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())), - Settings: cluster.MakeTestingClusterSettings(), - }, - s.(*server.TestServer).Gossip(), - ) + ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ + AmbientCtx: ambient, + Settings: cluster.MakeTestingClusterSettings(), + Clock: s.Clock(), + NodeDescs: s.(*server.TestServer).Gossip(), + RPCContext: s.RPCContext(), + NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())), + FirstRangeProvider: s.(*server.TestServer).Gossip(), + }) tsf := kvcoord.NewTxnCoordSenderFactory( kvcoord.TxnCoordSenderFactoryConfig{ AmbientCtx: ambient, @@ -1210,16 +1209,15 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) { } { manual := hlc.NewManualClock(ts[0].WallTime + 1) clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) - ds := kvcoord.NewDistSender( - kvcoord.DistSenderConfig{ - AmbientCtx: log.AmbientContext{Tracer: s.ClusterSettings().Tracer}, - Clock: clock, - RPCContext: s.RPCContext(), - NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())), - Settings: cluster.MakeTestingClusterSettings(), - }, - s.(*server.TestServer).Gossip(), - ) + ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: s.ClusterSettings().Tracer}, + Settings: cluster.MakeTestingClusterSettings(), + Clock: clock, + NodeDescs: s.(*server.TestServer).Gossip(), + RPCContext: s.RPCContext(), + NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())), + FirstRangeProvider: 
s.(*server.TestServer).Gossip(), + }) reply, err := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{ ReadConsistency: rc, @@ -1416,15 +1414,15 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) { // Now, split further at the given keys, but use a new dist sender so // we don't update the caches on the default dist sender-backed client. - ds := kvcoord.NewDistSender( - kvcoord.DistSenderConfig{ - AmbientCtx: log.AmbientContext{Tracer: s.ClusterSettings().Tracer}, - Clock: s.Clock(), - RPCContext: s.RPCContext(), - NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())), - Settings: cluster.MakeTestingClusterSettings(), - }, s.(*server.TestServer).Gossip(), - ) + ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: s.ClusterSettings().Tracer}, + Clock: s.Clock(), + NodeDescs: s.(*server.TestServer).Gossip(), + RPCContext: s.RPCContext(), + NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())), + Settings: cluster.MakeTestingClusterSettings(), + FirstRangeProvider: s.(*server.TestServer).Gossip(), + }) for _, key := range []string{"c"} { req := &roachpb.AdminSplitRequest{ RequestHeader: roachpb.RequestHeader{ diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 6c885db0abdd..e459153aade2 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -348,6 +348,7 @@ func TestSendRPCOrder(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -357,7 +358,7 @@ func TestSendRPCOrder(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) for n, tc := range testCases { t.Run("", func(t *testing.T) { @@ -518,6 +519,7 @@ func TestImmutableBatchArgs(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -527,7 +529,7 @@ func TestImmutableBatchArgs(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) txn := roachpb.MakeTransaction( "test", nil /* baseKey */, roachpb.NormalUserPriority, @@ -590,6 +592,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -598,7 +601,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) v := roachpb.MakeValueFromString("value") put := roachpb.NewPut(roachpb.Key("a"), v) if _, pErr := kv.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") { @@ -667,6 +670,7 @@ func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ 
TransportFactory: adaptSimpleTransport(testFn), @@ -688,7 +692,7 @@ func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) { {[]roachpb.LeaseSequence{1, 0, 1, 2, 1}, 3}, } { sequences = c.leaseSequences - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) v := roachpb.MakeValueFromString("value") put := roachpb.NewPut(roachpb.Key("a"), v) if _, pErr := kv.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") { @@ -743,6 +747,7 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transport), @@ -767,7 +772,7 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) ds.LeaseHolderCache().Update(ctx, roachpb.RangeID(1), roachpb.StoreID(1)) var ba roachpb.BatchRequest @@ -810,6 +815,7 @@ func TestRetryOnDescriptorLookupError(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), @@ -828,7 +834,7 @@ func TestRetryOnDescriptorLookupError(t *testing.T) { NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) put := roachpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value")) // Error on descriptor lookup, second attempt successful. if _, pErr := kv.SendWrapped(context.Background(), ds, put); pErr != nil { @@ -883,6 +889,7 @@ func TestEvictOnFirstRangeGossip(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: SenderTransportFactory( @@ -890,12 +897,15 @@ func TestEvictOnFirstRangeGossip(t *testing.T) { kv.SenderFunc(sender), ), }, - RangeDescriptorDB: rDB, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), + // Provide both FirstRangeProvider and RangeDescriptorDB to listen to + // changes to the first range while still using a MockRangeDescriptorDB. 
+ FirstRangeProvider: g, + RangeDescriptorDB: rDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g).withMetaRecursion() + ds := NewDistSender(cfg).withMetaRecursion() anyKey := roachpb.Key("anything") rAnyKey := keys.MustAddr(anyKey) @@ -1010,6 +1020,7 @@ func TestEvictCacheOnError(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -1018,7 +1029,7 @@ func TestEvictCacheOnError(t *testing.T) { NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) ds.leaseHolderCache.Update(context.Background(), 1, leaseHolder.StoreID) key := roachpb.Key("a") put := roachpb.NewPut(key, roachpb.MakeValueFromString("value")) @@ -1078,6 +1089,7 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -1086,7 +1098,7 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) { NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) key := roachpb.Key("a") put := roachpb.NewPut(key, roachpb.MakeValueFromString("value")) @@ -1180,14 +1192,16 @@ func TestRetryOnWrongReplicaError(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + FirstRangeProvider: g, + Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) scan := roachpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), false) if _, err := kv.SendWrapped(context.Background(), ds, scan); err != nil { t.Errorf("scan encountered error: %s", err) @@ -1283,14 +1297,16 @@ func TestRetryOnWrongReplicaErrorWithSuggestion(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + FirstRangeProvider: g, + Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) scan := roachpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), false) if _, err := kv.SendWrapped(context.Background(), ds, scan); err != nil { t.Errorf("scan encountered error: %s", err) @@ -1309,11 +1325,13 @@ func TestGetFirstRangeDescriptor(t *testing.T) { } n.Start() ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, - RPCContext: n.RPCContext, - NodeDialer: nodedialer.New(n.RPCContext, 
gossip.AddressResolver(n.Nodes[0].Gossip)), - Settings: cluster.MakeTestingClusterSettings(), - }, n.Nodes[0].Gossip) + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + NodeDescs: n.Nodes[0].Gossip, + RPCContext: n.RPCContext, + NodeDialer: nodedialer.New(n.RPCContext, gossip.AddressResolver(n.Nodes[0].Gossip)), + FirstRangeProvider: n.Nodes[0].Gossip, + Settings: cluster.MakeTestingClusterSettings(), + }) if _, err := ds.FirstRange(); err == nil { t.Errorf("expected not to find first range descriptor") } @@ -1401,6 +1419,7 @@ func TestSendRPCRetry(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -1408,7 +1427,7 @@ func TestSendRPCRetry(t *testing.T) { RangeDescriptorDB: descDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) scan := roachpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), false) sr, err := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{MaxSpanRequestKeys: 1}, scan) if err != nil { @@ -1488,6 +1507,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -1495,7 +1515,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { RangeDescriptorDB: descDB, Settings: cluster.MakeTestingClusterSettings(), } - ds = NewDistSender(cfg, g) + ds = NewDistSender(cfg) get := roachpb.NewGet(roachpb.Key("b")) _, err := kv.SendWrapped(context.Background(), ds, get) if err != nil { @@ -1519,11 +1539,13 @@ func TestGetNodeDescriptor(t *testing.T) { rpcContext := rpc.NewInsecureTestingContext(clock, stopper) g := makeGossip(t, stopper, rpcContext) ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, - RPCContext: rpcContext, - Clock: clock, - Settings: cluster.MakeTestingClusterSettings(), - }, g) + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + FirstRangeProvider: g, + Settings: cluster.MakeTestingClusterSettings(), + }) g.NodeID.Reset(5) if err := g.SetNodeDescriptor(newNodeDesc(5)); err != nil { t.Fatal(err) @@ -1595,6 +1617,7 @@ func TestMultiRangeGapReverse(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, RangeDescriptorDB: rdb, TestingKnobs: ClientTestingKnobs{ @@ -1606,7 +1629,7 @@ func TestMultiRangeGapReverse(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) txn := roachpb.MakeTransaction("foo", nil, 1.0, clock.Now(), 0) @@ -1698,6 +1721,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -1715,7 +1739,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) { }), Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) scan := roachpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), false) // Set the Txn info to avoid an 
OpRequiresTxnError. reply, err := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{ @@ -1744,6 +1768,7 @@ func TestRangeLookupOptionOnReverseScan(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), @@ -1759,7 +1784,7 @@ func TestRangeLookupOptionOnReverseScan(t *testing.T) { }), Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) rScan := &roachpb.ReverseScanRequest{ RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, } @@ -1781,12 +1806,13 @@ func TestClockUpdateOnResponse(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, RangeDescriptorDB: defaultMockRangeDescriptorDB, NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) expectedErr := roachpb.NewError(errors.New("boom")) @@ -1914,6 +1940,7 @@ func TestTruncateWithSpanAndDescriptor(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendStub), @@ -1921,7 +1948,7 @@ func TestTruncateWithSpanAndDescriptor(t *testing.T) { RangeDescriptorDB: descDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) // Send a batch request containing two puts. In the first // attempt, the span of the descriptor found in the cache is @@ -2042,6 +2069,7 @@ func TestTruncateWithLocalSpanAndDescriptor(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendStub), @@ -2049,7 +2077,7 @@ func TestTruncateWithLocalSpanAndDescriptor(t *testing.T) { RangeDescriptorDB: descDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) // Send a batch request contains two scans. In the first // attempt, the range of the descriptor found in the cache is @@ -2235,6 +2263,7 @@ func TestMultiRangeWithEndTxn(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -2242,7 +2271,7 @@ func TestMultiRangeWithEndTxn(t *testing.T) { RangeDescriptorDB: descDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) ds.DisableParallelBatches() // Send a batch request containing two puts. 
@@ -2370,6 +2399,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -2377,7 +2407,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { RangeDescriptorDB: defaultMockRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) ds.DisableParallelBatches() // Send a batch request containing the requests. @@ -2486,6 +2516,7 @@ func TestParallelCommitsDetectIntentMissingCause(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -2493,7 +2524,7 @@ func TestParallelCommitsDetectIntentMissingCause(t *testing.T) { RangeDescriptorDB: defaultMockRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) // Send a parallel commit batch request. var ba roachpb.BatchRequest @@ -2564,6 +2595,7 @@ func TestCountRanges(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), @@ -2571,7 +2603,7 @@ func TestCountRanges(t *testing.T) { RangeDescriptorDB: descDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) // Verify counted ranges. keyIn := func(desc roachpb.RangeDescriptor) roachpb.RKey { @@ -2657,6 +2689,7 @@ func TestGatewayNodeID(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -2664,7 +2697,7 @@ func TestGatewayNodeID(t *testing.T) { RangeDescriptorDB: defaultMockRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) var ba roachpb.BatchRequest ba.Add(roachpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value"))) if _, err := ds.Send(context.Background(), ba); err != nil { @@ -2852,6 +2885,7 @@ func TestMultipleErrorsMerged(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -2860,7 +2894,7 @@ func TestMultipleErrorsMerged(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), RPCRetryOptions: &retry.Options{MaxRetries: 1}, } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) var ba roachpb.BatchRequest ba.Txn = txn.Clone() @@ -2993,6 +3027,7 @@ func TestErrorIndexAlignment(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -3000,7 +3035,7 @@ func TestErrorIndexAlignment(t *testing.T) { RangeDescriptorDB: descDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) 
ds.DisableParallelBatches() var ba roachpb.BatchRequest @@ -3070,6 +3105,7 @@ func TestCanSendToFollower(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -3120,7 +3156,7 @@ func TestCanSendToFollower(t *testing.T) { t.Run("", func(t *testing.T) { sentTo = ReplicaInfo{} canSend = c.canSendToFollower - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) ds.clusterID = &base.ClusterIDContainer{} // set 2 to be the leaseholder ds.LeaseHolderCache().Update(context.Background(), 2 /* rangeID */, 2 /* storeID */) @@ -3286,14 +3322,16 @@ func TestEvictMetaRange(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + FirstRangeProvider: g, + Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) scan := roachpb.NewScan(roachpb.Key("a"), roachpb.Key("b"), false) if _, pErr := kv.SendWrapped(context.Background(), ds, scan); pErr != nil { @@ -3391,6 +3429,7 @@ func TestConnectionClass(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), @@ -3402,7 +3441,7 @@ func TestConnectionClass(t *testing.T) { RangeDescriptorDB: rDB, Settings: cluster.MakeTestingClusterSettings(), } - ds := NewDistSender(cfg, g) + ds := NewDistSender(cfg) // Check the three important cases to ensure they are sent with the correct // ConnectionClass. 
@@ -3525,17 +3564,19 @@ func TestEvictionTokenCoalesce(t *testing.T) { cfg := DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: adaptSimpleTransport(testFn), - }, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), RPCRetryOptions: &retry.Options{ MaxRetries: 1, }, - Settings: cluster.MakeTestingClusterSettings(), + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + FirstRangeProvider: g, + Settings: cluster.MakeTestingClusterSettings(), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(testFn), + }, } - ds = NewDistSender(cfg, g) + ds = NewDistSender(cfg) var batchWaitGroup sync.WaitGroup putFn := func(key, value string) { diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index d6f4ddebb715..eece86fbffea 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -83,13 +83,15 @@ func NewDistSenderForLocalTestCluster( rpcContext := rpc.NewInsecureTestingContext(clock, stopper) senderTransportFactory := SenderTransportFactory(tracer, stores) return NewDistSender(DistSenderConfig{ - AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, - Settings: st, - Clock: clock, - RPCContext: rpcContext, - RPCRetryOptions: &retryOpts, - nodeDescriptor: nodeDesc, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, + Settings: st, + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + RPCRetryOptions: &retryOpts, + nodeDescriptor: nodeDesc, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + FirstRangeProvider: g, TestingKnobs: ClientTestingKnobs{ TransportFactory: func( opts SendOptions, @@ -103,5 +105,5 @@ func NewDistSenderForLocalTestCluster( return &localTestClusterTransport{transport, latency}, nil }, }, - }, g) + }) } diff --git a/pkg/kv/kvclient/kvcoord/node_store.go b/pkg/kv/kvclient/kvcoord/node_store.go new file mode 100644 index 000000000000..a2d8482122ba --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/node_store.go @@ -0,0 +1,42 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "net" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +) + +// NodeDescStore stores a collection of NodeDescriptors. +// +// Implementations of the interface are expected to be threadsafe. +type NodeDescStore interface { + // GetNodeDescriptor looks up the descriptor of the node by ID. + // It returns an error if the node is not known by the store. + GetNodeDescriptor(roachpb.NodeID) (*roachpb.NodeDescriptor, error) +} + +// AddressResolver wraps a NodeDescStore and a Locality and allows the pair to +// be used as a nodedialer.AddressResolver. 
+func AddressResolver(ns NodeDescStore, loc roachpb.Locality) nodedialer.AddressResolver { + return func(nodeID roachpb.NodeID) (net.Addr, error) { + nd, err := ns.GetNodeDescriptor(nodeID) + if err != nil { + return nil, err + } + return nd.AddressForLocality(loc), nil + } +} + +// Silence unused warning. +var _ = AddressResolver diff --git a/pkg/kv/kvclient/kvcoord/range_iter_test.go b/pkg/kv/kvclient/kvcoord/range_iter_test.go index dda122ff6bee..e3a632145d43 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter_test.go +++ b/pkg/kv/kvclient/kvcoord/range_iter_test.go @@ -62,10 +62,11 @@ func TestRangeIterForward(t *testing.T) { ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), - }, g) + }) ctx := context.Background() @@ -97,10 +98,11 @@ func TestRangeIterSeekForward(t *testing.T) { ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), - }, g) + }) ctx := context.Background() @@ -135,10 +137,11 @@ func TestRangeIterReverse(t *testing.T) { ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), - }, g) + }) ctx := context.Background() @@ -170,10 +173,11 @@ func TestRangeIterSeekReverse(t *testing.T) { ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, Clock: clock, + NodeDescs: g, RPCContext: rpcContext, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), - }, g) + }) ctx := context.Background() diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index d24262296c8c..513647031364 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -46,11 +46,7 @@ type ReplicaSlice []ReplicaInfo // If there's no info in gossip for any of the nodes in the descriptor, a // sendError is returned. func NewReplicaSlice( - ctx context.Context, - gossip interface { - GetNodeDescriptor(roachpb.NodeID) (*roachpb.NodeDescriptor, error) - }, - desc *roachpb.RangeDescriptor, + ctx context.Context, nodeDescs NodeDescStore, desc *roachpb.RangeDescriptor, ) (ReplicaSlice, error) { // Learner replicas won't serve reads/writes, so we'll send only to the // `Voters` replicas. 
This is just an optimization to save a network hop, @@ -58,7 +54,7 @@ func NewReplicaSlice( voters := desc.Replicas().Voters() rs := make(ReplicaSlice, 0, len(voters)) for _, r := range voters { - nd, err := gossip.GetNodeDescriptor(r.NodeID) + nd, err := nodeDescs.GetNodeDescriptor(r.NodeID) if err != nil { if log.V(1) { log.Infof(ctx, "node %d is not gossiped: %v", r.NodeID, err) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 8dca452b0e15..9be4ec14ccf7 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -287,14 +287,16 @@ func sendBatch( } ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, - RPCContext: rpcContext, + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Settings: cluster.MakeTestingClusterSettings(), + NodeDescs: g, + RPCContext: rpcContext, + NodeDialer: nodeDialer, + FirstRangeProvider: g, TestingKnobs: ClientTestingKnobs{ TransportFactory: transportFactory, }, - Settings: cluster.MakeTestingClusterSettings(), - NodeDialer: nodeDialer, - }, g) + }) return ds.sendToReplicas(ctx, roachpb.BatchRequest{}, desc, false /* withCommit */) } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 0964fe7846be..118ce8d17cc9 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -221,19 +221,17 @@ func TestTxnCoordSenderCondenseLockSpans(t *testing.T) { return resp, nil } ambient := log.AmbientContext{Tracer: tracing.NewTracer()} - ds := NewDistSender( - DistSenderConfig{ - AmbientCtx: ambient, - Clock: s.Clock, - RPCContext: s.Cfg.RPCContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: adaptSimpleTransport(sendFn), - }, - RangeDescriptorDB: descDB, - Settings: cluster.MakeTestingClusterSettings(), + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: ambient, + Clock: s.Clock, + NodeDescs: s.Gossip, + RPCContext: s.Cfg.RPCContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(sendFn), }, - s.Gossip, - ) + RangeDescriptorDB: descDB, + Settings: cluster.MakeTestingClusterSettings(), + }) tsf := NewTxnCoordSenderFactory( TxnCoordSenderFactoryConfig{ AmbientCtx: ambient, diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 5096b90ec3f3..c7782a779929 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -156,15 +156,17 @@ func createTestStoreWithOpts( retryOpts := base.DefaultRetryOptions() retryOpts.Closer = stopper.ShouldQuiesce() distSender := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ - AmbientCtx: ac, - Clock: storeCfg.Clock, - Settings: storeCfg.Settings, - RPCContext: rpcContext, + AmbientCtx: ac, + Settings: storeCfg.Settings, + Clock: storeCfg.Clock, + NodeDescs: storeCfg.Gossip, + RPCContext: rpcContext, + RPCRetryOptions: &retryOpts, + FirstRangeProvider: storeCfg.Gossip, TestingKnobs: kvcoord.ClientTestingKnobs{ TransportFactory: kvcoord.SenderTransportFactory(tracer, stores), }, - RPCRetryOptions: &retryOpts, - }, storeCfg.Gossip) + }) tcsFactory := kvcoord.NewTxnCoordSenderFactory( kvcoord.TxnCoordSenderFactoryConfig{ @@ -793,6 +795,7 @@ func (m *multiTestContext) populateDB(idx int, st *cluster.Settings, stopper *st m.distSenders[idx] = kvcoord.NewDistSender(kvcoord.DistSenderConfig{ AmbientCtx: ambient, Clock: m.clocks[idx], + NodeDescs: m.gossips[idx], RPCContext: m.rpcContext, 
RangeDescriptorDB: mtcRangeDescriptorDB{ multiTestContext: m, @@ -803,7 +806,7 @@ func (m *multiTestContext) populateDB(idx int, st *cluster.Settings, stopper *st TransportFactory: m.kvTransportFactory, }, RPCRetryOptions: &retryOpts, - }, m.gossips[idx]) + }) tcsFactory := kvcoord.NewTxnCoordSenderFactory( kvcoord.TxnCoordSenderFactoryConfig{ AmbientCtx: ambient, diff --git a/pkg/server/server.go b/pkg/server/server.go index f6eca32c0026..62757c13b393 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -316,15 +316,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } retryOpts.Closer = stopper.ShouldQuiesce() distSenderCfg := kvcoord.DistSenderConfig{ - AmbientCtx: cfg.AmbientCtx, - Settings: st, - Clock: clock, - RPCContext: rpcContext, - RPCRetryOptions: &retryOpts, - TestingKnobs: clientTestingKnobs, - NodeDialer: nodeDialer, - } - distSender := kvcoord.NewDistSender(distSenderCfg, g) + AmbientCtx: cfg.AmbientCtx, + Settings: st, + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + RPCRetryOptions: &retryOpts, + NodeDialer: nodeDialer, + FirstRangeProvider: g, + TestingKnobs: clientTestingKnobs, + } + distSender := kvcoord.NewDistSender(distSenderCfg) registry.AddMetricStruct(distSender.Metrics()) txnMetrics := kvcoord.MakeTxnMetrics(cfg.HistogramWindowInterval()) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 4f386f16befb..5678617f10da 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -498,16 +498,18 @@ func makeSQLServerArgs( gossip.AddressResolver(g), // TODO(nvb): break gossip dep ) dsCfg := kvcoord.DistSenderConfig{ - AmbientCtx: baseCfg.AmbientCtx, - Settings: st, - Clock: clock, - RPCRetryOptions: &rpcRetryOptions, - RPCContext: rpcContext, - RangeDescriptorDB: nil, // use DistSender itself - NodeDialer: nodeDialer, - TestingKnobs: dsKnobs, - } - ds := kvcoord.NewDistSender(dsCfg, g) + AmbientCtx: baseCfg.AmbientCtx, + Settings: st, + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &rpcRetryOptions, + RPCContext: rpcContext, + NodeDialer: nodeDialer, + RangeDescriptorDB: nil, // use DistSender itself + FirstRangeProvider: g, + TestingKnobs: dsKnobs, + } + ds := kvcoord.NewDistSender(dsCfg) var clientKnobs kvcoord.ClientTestingKnobs if p, ok := baseCfg.TestingKnobs.KVClient.(*kvcoord.ClientTestingKnobs); ok { diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index 8d8ab95e3038..986f6e33a133 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -42,8 +42,9 @@ var ( // Config is used to construct an OracleFactory. type Config struct { - NodeDesc roachpb.NodeDescriptor - Settings *cluster.Settings + NodeDesc roachpb.NodeDescriptor + Settings *cluster.Settings + // TODO(nvanbenschoten): replace with NodeDescStore once #49997 merges. Gossip gossip.DeprecatedOracleGossip RPCContext *rpc.Context LeaseHolderCache *kvcoord.LeaseHolderCache @@ -117,13 +118,13 @@ func MakeQueryState() QueryState { // randomOracle is a Oracle that chooses the lease holder randomly // among the replicas in a range descriptor. 
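 //
 // Note: the random and closest oracles below store cfg.Gossip in fields typed
 // as kvcoord.NodeDescStore (the bin-packing oracle keeps the deprecated gossip
 // handle for now, per the TODO). This presumably typechecks because
 // gossip.DeprecatedOracleGossip exposes
 // GetNodeDescriptor(roachpb.NodeID) (*roachpb.NodeDescriptor, error), the one
 // method NodeDescStore requires and the same method the old NewReplicaSlice
 // signature asked for via an anonymous interface.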
type randomOracle struct { - gossip gossip.DeprecatedOracleGossip + nodeDescs kvcoord.NodeDescStore } var _ OracleFactory = &randomOracle{} func newRandomOracleFactory(cfg Config) OracleFactory { - return &randomOracle{gossip: cfg.Gossip} + return &randomOracle{nodeDescs: cfg.Gossip} } func (o *randomOracle) Oracle(_ *kv.Txn) Oracle { @@ -133,7 +134,7 @@ func (o *randomOracle) Oracle(_ *kv.Txn) Oracle { func (o *randomOracle) ChoosePreferredReplica( ctx context.Context, desc *roachpb.RangeDescriptor, _ QueryState, ) (roachpb.ReplicaDescriptor, error) { - replicas, err := replicaSliceOrErr(ctx, desc, o.gossip) + replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -141,8 +142,8 @@ func (o *randomOracle) ChoosePreferredReplica( } type closestOracle struct { - gossip gossip.DeprecatedOracleGossip latencyFunc kvcoord.LatencyFunc + nodeDescs kvcoord.NodeDescStore // nodeDesc is the descriptor of the current node. It will be used to give // preference to the current node and others "close" to it. nodeDesc roachpb.NodeDescriptor @@ -151,7 +152,7 @@ type closestOracle struct { func newClosestOracleFactory(cfg Config) OracleFactory { return &closestOracle{ latencyFunc: latencyFunc(cfg.RPCContext), - gossip: cfg.Gossip, + nodeDescs: cfg.Gossip, nodeDesc: cfg.NodeDesc, } } @@ -163,7 +164,7 @@ func (o *closestOracle) Oracle(_ *kv.Txn) Oracle { func (o *closestOracle) ChoosePreferredReplica( ctx context.Context, desc *roachpb.RangeDescriptor, _ QueryState, ) (roachpb.ReplicaDescriptor, error) { - replicas, err := replicaSliceOrErr(ctx, desc, o.gossip) + replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -189,8 +190,9 @@ const maxPreferredRangesPerLeaseHolder = 10 type binPackingOracle struct { leaseHolderCache *kvcoord.LeaseHolderCache maxPreferredRangesPerLeaseHolder int - gossip gossip.DeprecatedOracleGossip - latencyFunc kvcoord.LatencyFunc + // TODO(nvanbenschoten): replace with NodeDescStore once #49997 merges. + gossip gossip.DeprecatedOracleGossip + latencyFunc kvcoord.LatencyFunc // nodeDesc is the descriptor of the current node. It will be used to give // preference to the current node and others "close" to it. nodeDesc roachpb.NodeDescriptor @@ -229,7 +231,7 @@ func (o *binPackingOracle) ChoosePreferredReplica( } } - replicas, err := replicaSliceOrErr(ctx, desc, o.gossip) + replicas, err := replicaSliceOrErr(ctx, o.gossip, desc) if err != nil { return roachpb.ReplicaDescriptor{}, err } @@ -256,12 +258,12 @@ func (o *binPackingOracle) ChoosePreferredReplica( // replicaSliceOrErr returns a ReplicaSlice for the given range descriptor. // ReplicaSlices are restricted to replicas on nodes for which a NodeDescriptor -// is available in gossip. If no nodes are available, a RangeUnavailableError is -// returned. +// is available in the provided NodeDescStore. If no nodes are available, a +// RangeUnavailableError is returned. 
 func replicaSliceOrErr(
-	ctx context.Context, desc *roachpb.RangeDescriptor, gsp gossip.DeprecatedOracleGossip,
+	ctx context.Context, nodeDescs kvcoord.NodeDescStore, desc *roachpb.RangeDescriptor,
 ) (kvcoord.ReplicaSlice, error) {
-	replicas, err := kvcoord.NewReplicaSlice(ctx, gsp, desc)
+	replicas, err := kvcoord.NewReplicaSlice(ctx, nodeDescs, desc)
 	if err != nil {
 		return kvcoord.ReplicaSlice{}, sqlbase.NewRangeUnavailableError(desc.RangeID, err)
 	}

From c3b8bcd4db976e64561d4153bc0a108c28c2c52f Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Tue, 16 Jun 2020 12:58:21 -0400
Subject: [PATCH 6/6] server: Wrap ExternalStorage factory methods in
 externalStorageBuilder struct

Previously, we initialized the ExternalStorage factory methods in
NewServer(), since all the required config params were ready to use at
that point.

Future work on user-scoped storage will require access to the
underlying storage.Engine, so this change introduces a wrapper around
these factory methods. Using a builder struct lets us split the
"creation" and "initialization" of the builder between the NewServer()
and Start() methods respectively, so that params which are only
initialized in server.Start() can be propagated to the builder for
later use.

This is part of a gradual refactor of the ExternalStorage factory
interface and is primarily meant to unblock development of #47211.

Release note: None
---
 pkg/server/server.go | 138 ++++++++++++++++++++++++++++---------------
 1 file changed, 91 insertions(+), 47 deletions(-)

diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7de30611ddd7..5a0dee60351c 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -169,12 +169,59 @@ type Server struct {

 	sqlServer *sqlServer

+	// Created in NewServer but initialized (made usable) in `(*Server).Start`.
+	externalStorageBuilder *externalStorageBuilder
+
 	// The following fields are populated at start time, i.e. in `(*Server).Start`.
 	startTime time.Time
 	engines Engines
 }

+// externalStorageBuilder is a wrapper around the ExternalStorage factory
+// methods. It allows us to separate the creation and initialization of the
+// builder between NewServer() and Start() respectively.
+// TODO(adityamaru): Consider moving this to pkg/storage/cloud at a future
+// stage of the ongoing refactor.
+type externalStorageBuilder struct {
+	conf base.ExternalIODirConfig
+	settings *cluster.Settings
+	blobClientFactory blobs.BlobClientFactory
+	engine storage.Engine
+	initCalled bool
+}
+
+func (e *externalStorageBuilder) init(
+	conf base.ExternalIODirConfig,
+	settings *cluster.Settings,
+	blobClientFactory blobs.BlobClientFactory,
+	engine storage.Engine,
+) {
+	e.conf = conf
+	e.settings = settings
+	e.blobClientFactory = blobClientFactory
+	e.engine = engine
+	e.initCalled = true
+}
+
+func (e *externalStorageBuilder) makeExternalStorage(
+	ctx context.Context, dest roachpb.ExternalStorage,
+) (cloud.ExternalStorage, error) {
+	if !e.initCalled {
+		return nil, errors.New("cannot create external storage before init")
+	}
+	return cloud.MakeExternalStorage(ctx, dest, e.conf, e.settings, e.blobClientFactory)
+}
+
+func (e *externalStorageBuilder) makeExternalStorageFromURI(
+	ctx context.Context, uri string,
+) (cloud.ExternalStorage, error) {
+	if !e.initCalled {
+		return nil, errors.New("cannot create external storage before init")
+	}
+	return cloud.ExternalStorageFromURI(ctx, uri, e.conf, e.settings, e.blobClientFactory)
+}
+
 // NewServer creates a Server from a server.Config.
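 //
 // A rough sketch of the externalStorageBuilder lifecycle introduced by this
 // patch (names as in the hunk above; the real call sites are NewServer and
 // (*Server).Start):
 //
 //   b := &externalStorageBuilder{}                     // NewServer: create only
 //   ...
 //   b.init(conf, settings, blobClientFactory, engine)  // Start: late-bound deps
 //   es, err := b.makeExternalStorage(ctx, dest)        // usable only after init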
func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if err := cfg.ValidateAddrs(context.Background()); err != nil { @@ -388,26 +435,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { internalExecutor := &sql.InternalExecutor{} jobRegistry := &jobs.Registry{} // ditto - // This function defines how ExternalStorage objects are created. - externalStorage := func(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) { - return cloud.MakeExternalStorage( - ctx, dest, cfg.ExternalIODirConfig, st, - blobs.NewBlobClientFactory( - nodeIDContainer.Get(), - nodeDialer, - st.ExternalIODir, - ), - ) + // Create an ExternalStorageBuilder. This is only usable after Start() where + // we initialize all the configuration params. + externalStorageBuilder := &externalStorageBuilder{} + externalStorage := func(ctx context.Context, dest roachpb.ExternalStorage) (cloud. + ExternalStorage, error) { + return externalStorageBuilder.makeExternalStorage(ctx, dest) } externalStorageFromURI := func(ctx context.Context, uri string) (cloud.ExternalStorage, error) { - return cloud.ExternalStorageFromURI( - ctx, uri, cfg.ExternalIODirConfig, st, - blobs.NewBlobClientFactory( - nodeIDContainer.Get(), - nodeDialer, - st.ExternalIODir, - ), - ) + return externalStorageBuilder.makeExternalStorageFromURI(ctx, uri) } protectedtsProvider, err := ptprovider.New(ptprovider.Config{ @@ -574,35 +610,36 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { node.InitLogger(sqlServer.execCfg) *lateBoundServer = Server{ - nodeIDContainer: nodeIDContainer, - cfg: cfg, - st: st, - clock: clock, - rpcContext: rpcContext, - grpc: grpcServer, - gossip: g, - nodeDialer: nodeDialer, - nodeLiveness: nodeLiveness, - storePool: storePool, - tcsFactory: tcsFactory, - distSender: distSender, - db: db, - node: node, - registry: registry, - recorder: recorder, - runtime: runtimeSampler, - admin: sAdmin, - status: sStatus, - authentication: sAuth, - tsDB: tsDB, - tsServer: &sTS, - raftTransport: raftTransport, - stopper: stopper, - debug: debugServer, - replicationReporter: replicationReporter, - protectedtsProvider: protectedtsProvider, - protectedtsReconciler: protectedtsReconciler, - sqlServer: sqlServer, + nodeIDContainer: nodeIDContainer, + cfg: cfg, + st: st, + clock: clock, + rpcContext: rpcContext, + grpc: grpcServer, + gossip: g, + nodeDialer: nodeDialer, + nodeLiveness: nodeLiveness, + storePool: storePool, + tcsFactory: tcsFactory, + distSender: distSender, + db: db, + node: node, + registry: registry, + recorder: recorder, + runtime: runtimeSampler, + admin: sAdmin, + status: sStatus, + authentication: sAuth, + tsDB: tsDB, + tsServer: &sTS, + raftTransport: raftTransport, + stopper: stopper, + debug: debugServer, + replicationReporter: replicationReporter, + protectedtsProvider: protectedtsProvider, + protectedtsReconciler: protectedtsReconciler, + sqlServer: sqlServer, + externalStorageBuilder: externalStorageBuilder, } return lateBoundServer, err } @@ -1004,6 +1041,13 @@ func (s *Server) Start(ctx context.Context) error { } s.stopper.AddCloser(&s.engines) + // Initialize the external storage builders configuration params now that the + // engines have been created. The object can be used to create ExternalStorage + // objects hereafter. 
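+	// NB: the storage.Engine argument is passed as nil here; per the commit
+	// message, only the upcoming user-scoped storage work needs the engine, so
+	// it is presumably wired up in a follow-up.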
+ s.externalStorageBuilder.init(s.cfg.ExternalIODirConfig, s.st, + blobs.NewBlobClientFactory(s.nodeIDContainer.Get(), + s.nodeDialer, s.st.ExternalIODir), nil) + bootstrapVersion := s.cfg.Settings.Version.BinaryVersion() if knobs := s.cfg.TestingKnobs.Server; knobs != nil { if ov := knobs.(*TestingKnobs).BootstrapVersionOverride; ov != (roachpb.Version{}) {