kv: decompose roles of Gossip dependency in DistSender
This commit decomposes the roles of Gossip as a dependency in DistSender
into that of a NodeDescStore, a source of node descriptors, and that of
a FirstRangeProvider, a provider of information on the first range in a
cluster.

This decomposition will be used to address cockroachdb#47909 by:
1. replacing Gossip with a TenantService as a NodeDescStore
2. providing a custom RangeDescriptorDB (also backed by a TenantService)
   instead of a FirstRangeProvider.

Together, these changes will allow us to remove DistSender's dependency
on Gossip for SQL-only tenant processes.

The next step after this will be to introduce a TenantService that can
satisfy these two dependencies (NodeDescStore and RangeDescriptorDB) and
also use the new NodeDescStore-to-AddressResolver binding to replace the
use of Gossip with the TenantService in nodedialer instances.
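[Editor's note] The NodeDescStore half of the split is referenced throughout this diff but not defined in the hunks shown. A minimal sketch of what such an interface plausibly looks like, assuming a single lookup method keyed by node ID (the real method set may differ):

// NodeDescStore is an illustrative sketch of the node-descriptor half of
// the decomposed Gossip dependency; not the verbatim interface from this
// commit.
type NodeDescStore interface {
	// GetNodeDescriptor returns the descriptor of the node with the given
	// ID, or an error if no descriptor is known.
	GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error)
}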
nvanbenschoten committed Jun 11, 2020
1 parent 646a8b0 commit ec3d0e5
Showing 15 changed files with 378 additions and 222 deletions.
26 changes: 26 additions & 0 deletions pkg/gossip/gossip.go
@@ -1609,6 +1609,32 @@ func (g *Gossip) findClient(match func(*client) bool) *client {
 	return nil
 }
 
+// A firstRangeMissingError indicates that the first range has not yet
+// been gossiped. This will be the case for a node which hasn't yet
+// joined the gossip network.
+type firstRangeMissingError struct{}
+
+// Error is part of the error interface.
+func (f firstRangeMissingError) Error() string {
+	return "the descriptor for the first range is not available via gossip"
+}
+
+// GetFirstRangeDescriptor implements kvcoord.FirstRangeProvider.
+func (g *Gossip) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) {
+	desc := &roachpb.RangeDescriptor{}
+	if err := g.GetInfoProto(KeyFirstRangeDescriptor, desc); err != nil {
+		return nil, firstRangeMissingError{}
+	}
+	return desc, nil
+}
+
+// OnFirstRangeChanged implements kvcoord.FirstRangeProvider.
+func (g *Gossip) OnFirstRangeChanged(cb func(roachpb.Value)) {
+	g.RegisterCallback(KeyFirstRangeDescriptor, func(_ string, value roachpb.Value) {
+		cb(value)
+	})
+}
+
 // MakeExposedGossip initializes a DeprecatedGossip instance which exposes a
 // wrapped Gossip instance via Optional(). This is used on SQL servers running
 // inside of a KV server (i.e. single-tenant deployments).
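[Editor's note] These two methods make *gossip.Gossip satisfy the new kvcoord.FirstRangeProvider interface shown in the next file. A compile-time assertion of the kind Go code commonly uses to pin this down would look like the following sketch; it is illustrative, and since pkg/gossip cannot import kvcoord without creating an import cycle, it would have to live on the kvcoord side:

// Illustrative compile-time check that *gossip.Gossip implements the new
// interface (placed in kvcoord to avoid an import cycle with pkg/gossip).
var _ FirstRangeProvider = (*gossip.Gossip)(nil)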
162 changes: 100 additions & 62 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -175,14 +175,18 @@ func makeDistSenderMetrics() DistSenderMetrics {
 	}
 }
 
-// A firstRangeMissingError indicates that the first range has not yet
-// been gossiped. This will be the case for a node which hasn't yet
-// joined the gossip network.
-type firstRangeMissingError struct{}
-
-// Error is part of the error interface.
-func (f firstRangeMissingError) Error() string {
-	return "the descriptor for the first range is not available via gossip"
+// FirstRangeProvider is capable of providing DistSender with the descriptor of
+// the first range in the cluster and notifying the DistSender when the first
+// range in the cluster has changed.
+type FirstRangeProvider interface {
+	// GetFirstRangeDescriptor returns the RangeDescriptor for the first range
+	// in the cluster.
+	GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error)
+
+	// OnFirstRangeChanged calls the provided callback when the descriptor for
+	// the first range has changed. The callback is passed the RangeDescriptor
+	// in its marshaled form.
+	OnFirstRangeChanged(func(roachpb.Value))
 }
 
 // A DistSender provides methods to access Cockroach's monolithic,
@@ -202,13 +206,16 @@ type DistSender struct {
 	// clock is used to set time for some calls. E.g. read-only ops
 	// which span ranges and don't require read consistency.
 	clock *hlc.Clock
-	// gossip provides up-to-date information about the start of the
-	// key range, used to find the replica metadata for arbitrary key
-	// ranges.
-	gossip *gossip.Gossip
+	// nodeDescs provides information on other nodes that DistSender may
+	// consider routing requests to.
+	nodeDescs NodeDescStore
+	// metrics stores DistSender-related metrics.
 	metrics DistSenderMetrics
 	// rangeCache caches replica metadata for key ranges.
 	rangeCache *RangeDescriptorCache
+	// firstRangeProvider provides the range descriptor for range one.
+	// The dependency is optional.
+	firstRangeProvider FirstRangeProvider
 	// leaseHolderCache caches range lease holders by range ID.
 	leaseHolderCache *LeaseHolderCache
 	transportFactory TransportFactory
@@ -238,17 +245,30 @@ var _ kv.Sender = &DistSender{}
 type DistSenderConfig struct {
 	AmbientCtx log.AmbientContext
 
-	Settings        *cluster.Settings
-	Clock           *hlc.Clock
-	RPCRetryOptions *retry.Options
-	// nodeDescriptor, if provided, is used to describe which node the DistSender
-	// lives on, for instance when deciding where to send RPCs.
+	Settings  *cluster.Settings
+	Clock     *hlc.Clock
+	NodeDescs NodeDescStore
+	// nodeDescriptor, if provided, is used to describe which node the
+	// DistSender lives on, for instance when deciding where to send RPCs.
 	// Usually it is filled in from the Gossip network on demand.
-	nodeDescriptor *roachpb.NodeDescriptor
-	RPCContext *rpc.Context
-	RangeDescriptorDB RangeDescriptorDB
+	nodeDescriptor  *roachpb.NodeDescriptor
+	RPCRetryOptions *retry.Options
+	RPCContext      *rpc.Context
+	NodeDialer      *nodedialer.Dialer
 
-	NodeDialer *nodedialer.Dialer
+	// At least one of the following dependencies must be provided.
+	//
+	// If FirstRangeProvider is provided, DistSender will issue ScanRequest
+	// RPCs to satisfy range lookups when possible (e.g. looking up descriptors
+	// for meta2 and user ranges) and will delegate to the FirstRangeProvider
+	// when the descriptor for meta1 (range 1) is needed. If RangeDescriptorDB
+	// is provided, all range lookups will be delegated to it.
+	//
+	// If both are provided (not required, but allowed), range lookups will be
+	// delegated to the RangeDescriptorDB but the FirstRangeProvider will still
+	// be used to listen for updates to the first range's descriptor.
+	FirstRangeProvider FirstRangeProvider
+	RangeDescriptorDB  RangeDescriptorDB
 
 	TestingKnobs ClientTestingKnobs
 }
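[Editor's note] As a usage sketch (hypothetical wiring, not code from this commit): a KV node can pass its Gossip instance for both new dependencies, while a SQL-only tenant process would substitute tenant-side implementations, which is exactly the substitution cockroachdb#47909 is after. Names like tenantConnector are illustrative:

// Sketch: on a KV node, Gossip plays both roles (other fields elided).
ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
	NodeDescs:          g, // *gossip.Gossip as the NodeDescStore
	FirstRangeProvider: g, // *gossip.Gossip as the FirstRangeProvider
})

// Sketch: in a SQL-only tenant, a hypothetical tenant-side connector
// stands in for Gossip.
ds = kvcoord.NewDistSender(kvcoord.DistSenderConfig{
	NodeDescs:         tenantConnector, // NodeDescStore backed by a TenantService
	RangeDescriptorDB: tenantConnector, // range lookups served by the TenantService
})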
@@ -257,12 +277,12 @@ type DistSenderConfig struct {
 // Cockroach cluster via the supplied gossip instance. Supplying a
 // DistSenderContext or the fields within is optional. For omitted values, sane
 // defaults will be used.
-func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
+func NewDistSender(cfg DistSenderConfig) *DistSender {
 	ds := &DistSender{
-		st:      cfg.Settings,
-		clock:   cfg.Clock,
-		gossip:  g,
-		metrics: makeDistSenderMetrics(),
+		st:        cfg.Settings,
+		clock:     cfg.Clock,
+		nodeDescs: cfg.NodeDescs,
+		metrics:   makeDistSenderMetrics(),
 	}
 	if ds.st == nil {
 		ds.st = cluster.MakeTestingClusterSettings()
@@ -276,10 +296,17 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
 	if cfg.nodeDescriptor != nil {
 		atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(cfg.nodeDescriptor))
 	}
-	rdb := cfg.RangeDescriptorDB
-	if rdb == nil {
+	var rdb RangeDescriptorDB
+	if cfg.FirstRangeProvider != nil {
+		ds.firstRangeProvider = cfg.FirstRangeProvider
 		rdb = ds
 	}
+	if cfg.RangeDescriptorDB != nil {
+		rdb = cfg.RangeDescriptorDB
+	}
+	if rdb == nil {
+		panic("DistSenderConfig must contain either FirstRangeProvider or RangeDescriptorDB")
+	}
 	getRangeDescCacheSize := func() int64 {
 		return rangeDescriptorCacheSize.Get(&ds.st.SV)
 	}
@@ -310,23 +337,22 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
 	})
 	ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
 
-	if g != nil {
+	if ds.firstRangeProvider != nil {
 		ctx := ds.AnnotateCtx(context.Background())
-		g.RegisterCallback(gossip.KeyFirstRangeDescriptor,
-			func(_ string, value roachpb.Value) {
-				if atomic.LoadInt32(&ds.disableFirstRangeUpdates) == 1 {
-					return
-				}
-				if log.V(1) {
-					var desc roachpb.RangeDescriptor
-					if err := value.GetProto(&desc); err != nil {
-						log.Errorf(ctx, "unable to parse gossiped first range descriptor: %s", err)
-					} else {
-						log.Infof(ctx, "gossiped first range descriptor: %+v", desc.Replicas())
-					}
-				}
-				ds.rangeCache.EvictByKey(ctx, roachpb.RKeyMin)
-			})
-	}
+		ds.firstRangeProvider.OnFirstRangeChanged(func(value roachpb.Value) {
+			if atomic.LoadInt32(&ds.disableFirstRangeUpdates) == 1 {
+				return
+			}
+			if log.V(1) {
+				var desc roachpb.RangeDescriptor
+				if err := value.GetProto(&desc); err != nil {
+					log.Errorf(ctx, "unable to parse gossiped first range descriptor: %s", err)
+				} else {
+					log.Infof(ctx, "gossiped first range descriptor: %+v", desc.Replicas())
+				}
+			}
+			ds.rangeCache.EvictByKey(ctx, roachpb.RKeyMin)
+		})
+	}
 	return ds
 }
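[Editor's note] The callback wiring above no longer mentions gossip at all, which makes it straightforward to drive from a test double. A minimal in-memory FirstRangeProvider sketch (hypothetical; not part of this commit, and ignoring synchronization for brevity):

// fakeFirstRangeProvider is an illustrative in-memory FirstRangeProvider.
type fakeFirstRangeProvider struct {
	desc *roachpb.RangeDescriptor
	cbs  []func(roachpb.Value)
}

func (f *fakeFirstRangeProvider) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) {
	return f.desc, nil
}

func (f *fakeFirstRangeProvider) OnFirstRangeChanged(cb func(roachpb.Value)) {
	f.cbs = append(f.cbs, cb)
}

// update installs a new descriptor and notifies listeners with its
// marshaled form, mirroring how gossip delivers the value.
func (f *fakeFirstRangeProvider) update(desc *roachpb.RangeDescriptor) error {
	var v roachpb.Value
	if err := v.SetProto(desc); err != nil {
		return err
	}
	f.desc = desc
	for _, cb := range f.cbs {
		cb(v)
	}
	return nil
}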
@@ -360,11 +386,12 @@ func (ds *DistSender) LeaseHolderCache() *LeaseHolderCache {
 	return ds.leaseHolderCache
 }
 
-// RangeLookup implements the RangeDescriptorDB interface. It uses LookupRange
-// to perform a lookup scan for the provided key, using DistSender itself as the
-// client.Sender. This means that the scan will recurse into DistSender, which
-// will in turn use the RangeDescriptorCache again to lookup the RangeDescriptor
-// necessary to perform the scan.
+// RangeLookup implements the RangeDescriptorDB interface.
+//
+// It uses LookupRange to perform a lookup scan for the provided key, using
+// DistSender itself as the client.Sender. This means that the scan will recurse
+// into DistSender, which will in turn use the RangeDescriptorCache again to
+// lookup the RangeDescriptor necessary to perform the scan.
 func (ds *DistSender) RangeLookup(
 	ctx context.Context, key roachpb.RKey, useReverseScan bool,
 ) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) {
@@ -386,17 +413,26 @@ func (ds *DistSender) RangeLookup(
 }
 
 // FirstRange implements the RangeDescriptorDB interface.
-// FirstRange returns the RangeDescriptor for the first range on the cluster,
-// which is retrieved from the gossip protocol instead of the datastore.
+//
+// It returns the RangeDescriptor for the first range in the cluster using the
+// FirstRangeProvider, which is typically implemented using the gossip protocol
+// instead of the datastore.
 func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) {
-	if ds.gossip == nil {
-		panic("with `nil` Gossip, DistSender must not use itself as rangeDescriptorDB")
+	if ds.firstRangeProvider == nil {
+		panic("with `nil` firstRangeProvider, DistSender must not use itself as RangeDescriptorDB")
 	}
-	rangeDesc := &roachpb.RangeDescriptor{}
-	if err := ds.gossip.GetInfoProto(gossip.KeyFirstRangeDescriptor, rangeDesc); err != nil {
-		return nil, firstRangeMissingError{}
-	}
-	return rangeDesc, nil
+	return ds.firstRangeProvider.GetFirstRangeDescriptor()
+}
+
+// getNodeID attempts to return the local node ID. It returns 0 if the DistSender
+// does not have access to the Gossip network.
+func (ds *DistSender) getNodeID() roachpb.NodeID {
+	// TODO(nvanbenschoten): open an issue about the effect of this.
+	g, ok := ds.nodeDescs.(*gossip.Gossip)
+	if !ok {
+		return 0
+	}
+	return g.NodeID.Get()
 }
 
 // getNodeDescriptor returns ds.nodeDescriptor, but makes an attempt to load
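[Editor's note] The type assertion in getNodeID (and in getNodeDescriptor below) keeps the node-ID-dependent paths working in the Gossip-backed case while degrading gracefully otherwise. For the SQL-only tenant direction described in the commit message, a gossip-free store might look like this sketch (hypothetical; the eventual TenantService-backed implementation may differ):

// tenantNodeDescStore is an illustrative NodeDescStore backed by a map of
// descriptors received from a tenant connector rather than from gossip.
type tenantNodeDescStore struct {
	mu    syncutil.Mutex
	descs map[roachpb.NodeID]*roachpb.NodeDescriptor
}

func (s *tenantNodeDescStore) GetNodeDescriptor(
	id roachpb.NodeID,
) (*roachpb.NodeDescriptor, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if desc, ok := s.descs[id]; ok {
		return desc, nil
	}
	return nil, errors.Errorf("unable to look up descriptor for n%d", id)
}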
@@ -408,17 +444,19 @@ func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor {
 	if desc := atomic.LoadPointer(&ds.nodeDescriptor); desc != nil {
 		return (*roachpb.NodeDescriptor)(desc)
 	}
-	if ds.gossip == nil {
+	// TODO(nvanbenschoten): open an issue about the effect of this.
+	g, ok := ds.nodeDescs.(*gossip.Gossip)
+	if !ok {
 		return nil
 	}
 
-	ownNodeID := ds.gossip.NodeID.Get()
+	ownNodeID := g.NodeID.Get()
 	if ownNodeID > 0 {
 		// TODO(tschottdorf): Consider instead adding the NodeID of the
 		// coordinator to the header, so we can get this from incoming
 		// requests. Just in case we want to mostly eliminate gossip here.
 		nodeDesc := &roachpb.NodeDescriptor{}
-		if err := ds.gossip.GetInfoProto(gossip.MakeNodeIDKey(ownNodeID), nodeDesc); err == nil {
+		if err := g.GetInfoProto(gossip.MakeNodeIDKey(ownNodeID), nodeDesc); err == nil {
 			atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(nodeDesc))
 			return nodeDesc
 		}
@@ -494,8 +532,8 @@ func (ds *DistSender) initAndVerifyBatch(
 	ctx context.Context, ba *roachpb.BatchRequest,
 ) *roachpb.Error {
 	// Attach the local node ID to each request.
-	if ba.Header.GatewayNodeID == 0 && ds.gossip != nil {
-		ba.Header.GatewayNodeID = ds.gossip.NodeID.Get()
+	if ba.Header.GatewayNodeID == 0 {
+		ba.Header.GatewayNodeID = ds.getNodeID()
 	}
 
 	// In the event that timestamp isn't set and read consistency isn't
@@ -1611,7 +1649,7 @@ func (ds *DistSender) sendToReplicas(
 	ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor, withCommit bool,
 ) (*roachpb.BatchResponse, error) {
 	ba.RangeID = desc.RangeID
-	replicas, err := NewReplicaSlice(ctx, ds.gossip, desc)
+	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc)
 	if err != nil {
 		return nil, err
 	}
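[Editor's note] NewReplicaSlice now takes the narrow NodeDescStore instead of a *gossip.Gossip. Its job, in sketch form (the real function has more filtering and error handling, and the type names here are illustrative stand-ins; Replicas().All() is assumed to enumerate the descriptor's replicas):

// replicaAndNode pairs a replica with its resolved node descriptor.
type replicaAndNode struct {
	replica  roachpb.ReplicaDescriptor
	nodeDesc *roachpb.NodeDescriptor
}

// replicasWithNodeDescs resolves each replica's node descriptor through the
// NodeDescStore and drops replicas on nodes it cannot resolve.
func replicasWithNodeDescs(
	nodeDescs NodeDescStore, desc *roachpb.RangeDescriptor,
) []replicaAndNode {
	var out []replicaAndNode
	for _, r := range desc.Replicas().All() {
		nd, err := nodeDescs.GetNodeDescriptor(r.NodeID)
		if err != nil {
			// Skip replicas whose node descriptor (and thus address) is unknown.
			continue
		}
		out = append(out, replicaAndNode{replica: r, nodeDesc: nd})
	}
	return out
}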
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -230,7 +230,7 @@ func (ds *DistSender) singleRangeFeed(
 	if ds.rpcContext != nil {
 		latencyFn = ds.rpcContext.RemoteClocks.Latency
 	}
-	replicas, err := NewReplicaSlice(ctx, ds.gossip, desc)
+	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc)
 	if err != nil {
 		return args.Timestamp, err
 	}
56 changes: 27 additions & 29 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -88,16 +88,15 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
 	// Create a new DistSender and client.DB so that the Get below is guaranteed
 	// to not hit in the range descriptor cache forcing a RangeLookup operation.
 	ambient := log.AmbientContext{Tracer: s.ClusterSettings().Tracer}
-	ds := kvcoord.NewDistSender(
-		kvcoord.DistSenderConfig{
-			AmbientCtx: ambient,
-			Clock:      s.Clock(),
-			RPCContext: s.RPCContext(),
-			NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
-			Settings:   cluster.MakeTestingClusterSettings(),
-		},
-		s.(*server.TestServer).Gossip(),
-	)
+	ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
+		AmbientCtx:         ambient,
+		Settings:           cluster.MakeTestingClusterSettings(),
+		Clock:              s.Clock(),
+		NodeDescs:          s.(*server.TestServer).Gossip(),
+		RPCContext:         s.RPCContext(),
+		NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
+		FirstRangeProvider: s.(*server.TestServer).Gossip(),
+	})
 	tsf := kvcoord.NewTxnCoordSenderFactory(
 		kvcoord.TxnCoordSenderFactoryConfig{
 			AmbientCtx: ambient,
@@ -1210,16 +1209,15 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) {
 	} {
 		manual := hlc.NewManualClock(ts[0].WallTime + 1)
 		clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
-		ds := kvcoord.NewDistSender(
-			kvcoord.DistSenderConfig{
-				AmbientCtx: log.AmbientContext{Tracer: s.ClusterSettings().Tracer},
-				Clock:      clock,
-				RPCContext: s.RPCContext(),
-				NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
-				Settings:   cluster.MakeTestingClusterSettings(),
-			},
-			s.(*server.TestServer).Gossip(),
-		)
+		ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
+			AmbientCtx:         log.AmbientContext{Tracer: s.ClusterSettings().Tracer},
+			Settings:           cluster.MakeTestingClusterSettings(),
+			Clock:              clock,
+			NodeDescs:          s.(*server.TestServer).Gossip(),
+			RPCContext:         s.RPCContext(),
+			NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
+			FirstRangeProvider: s.(*server.TestServer).Gossip(),
+		})
 
 		reply, err := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{
 			ReadConsistency: rc,
@@ -1416,15 +1414,15 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) {
 
 	// Now, split further at the given keys, but use a new dist sender so
 	// we don't update the caches on the default dist sender-backed client.
-	ds := kvcoord.NewDistSender(
-		kvcoord.DistSenderConfig{
-			AmbientCtx: log.AmbientContext{Tracer: s.ClusterSettings().Tracer},
-			Clock:      s.Clock(),
-			RPCContext: s.RPCContext(),
-			NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
-			Settings:   cluster.MakeTestingClusterSettings(),
-		}, s.(*server.TestServer).Gossip(),
-	)
+	ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
+		AmbientCtx:         log.AmbientContext{Tracer: s.ClusterSettings().Tracer},
+		Clock:              s.Clock(),
+		NodeDescs:          s.(*server.TestServer).Gossip(),
+		RPCContext:         s.RPCContext(),
+		NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
+		Settings:           cluster.MakeTestingClusterSettings(),
+		FirstRangeProvider: s.(*server.TestServer).Gossip(),
+	})
 	for _, key := range []string{"c"} {
 		req := &roachpb.AdminSplitRequest{
 			RequestHeader: roachpb.RequestHeader{