kvflowcontrol,raftlog: interfaces for replication control
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeable with large index backfills (#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of #79215, settling on #83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of #95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect to no longer need to bypass follower write work in admission
control and solve the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this first,
proto{col,buf}/interface-only PR, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, held at the node level, which holds all
     kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store
     we're sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will be held at the replica level
     (only on replicas that are both leaseholder and raft leader), and
     will be used to interface with the node-level
     kvflowcontrol.Controller. When replicating log entries, these
     replicas choose the log position (term+index) the data is to end
     up at, and use this handle to track the token deductions on a
     per-log-position basis. Later, tokens are freed (after we're
     informed of said log entries being admitted on the receiving end
     of the stream) by specifying the log position up to which all
     deducted tokens are returned; see the usage sketch after the
     interfaces below.

   type Controller interface {
     Admit(admissionpb.WorkPriority, ...Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
     ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
     TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
     Close()
   }
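
   To make the intended usage concrete, here's a minimal sketch of the
   token lifecycle for a single replicated entry, as seen by a replica
   holding a Handle. It's illustrative only -- the handle, pri, term,
   index and tokens variables (and the surrounding plumbing) are
   assumptions, not APIs introduced in this PR:

   // Block until flow tokens are available for this priority.
   handle.Admit(pri)
   // Deduct tokens (without blocking), tracked at the log position
   // the entry is slated to end up at.
   pos := kvflowcontrolpb.RaftLogPosition{Term: term, Index: index}
   handle.DeductTokensFor(pri, pos, tokens)
   // ... the entry is replicated and admitted below-raft on the
   // receiving store; we hear back via AdmittedRaftLogEntries ...
   handle.ReturnTokensUpto(pri, pos) // frees all deductions up to pos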

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, which wants
   to eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }
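
   As a rough sketch of that below-raft decoding step (hypothetical;
   the usesACEncoding helper and payload variable stand in for
   routines left to future PRs):

   // Only entries whose prefix byte indicates an AC-specific
   // encoding carry this metadata, so we only decode when needed.
   if usesACEncoding(entry) {
     var meta kvflowcontrolpb.RaftAdmissionMeta
     if err := protoutil.Unmarshal(payload, &meta); err != nil {
       return err
     }
     // meta.AdmissionPriority and meta.AdmissionCreateTime feed into
     // work queue ordering; meta.AdmissionOriginNode is who we notify
     // post-admission.
   }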

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in the
   raft entry (admission_origin_node from above) to know where to send
   these. This information is used on the origin node to release flow
   tokens that were acquired when replicating the original log
   entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }
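
   On the origin node, each received AdmittedRaftLogEntries might then
   translate into a token return along the following lines (a sketch;
   the handleFor lookup is an assumption):

   // Release flow tokens on the origin node on hearing about
   // below-raft admission of previously replicated entries.
   for _, admitted := range req.AdmittedRaftLogEntries {
     handle := handleFor(admitted.RangeID) // hypothetical lookup
     pri := admissionpb.WorkPriority(admitted.AdmissionPriority)
     handle.ReturnTokensUpto(pri, admitted.UpToRaftLogPosition)
   }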

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) said tokens are waiting to be returned. The
   interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }
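
   In the raft transport layer, piggybacking might look roughly like
   the following (a sketch; the request field name is an assumption):

   // Attach pending dispatches to a raft message that's already
   // bound for the given node.
   req.AdmittedRaftLogEntries = dispatch.PendingDispatchFor(nodeID)
   // ... req is then sent over the existing raft transport stream ...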

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have
   a prefix byte that informs decoding routines how to interpret the
   subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry section below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.
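
   Above raft, picking an encoding might then reduce to something like
   the following sketch, where the sideloaded and withAC booleans
   stand in for the actual sideloading, cluster setting and version
   gate checks:

   enc := raftlog.EntryEncodingStandardWithoutAC
   switch {
   case sideloaded && withAC:
     enc = raftlog.EntryEncodingSideloadedWithAC
   case sideloaded:
     enc = raftlog.EntryEncodingSideloadedWithoutAC
   case withAC:
     enc = raftlog.EntryEncodingStandardWithAC
   }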

6. An unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData)
   to use replication admission control. Since we're using a different
   prefix byte for raft commands (see EntryEncodings above), one not
   recognized in earlier CRDB versions, we need explicit versioning.
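
   When wired up, the gate would be consulted before opting into the
   new encodings, along these (sketched) lines:

   // Only emit AC-specific encodings once every node in the cluster
   // knows how to decode the new prefix byte.
   withAC := st.Version.IsActive(ctx, clusterversion.V23_1UseEncodingWithBelowRaftAdmissionData)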

7. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission priority. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
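
   Its eventual call site would sit in the raft.Ready() handling loop,
   roughly as sketched below (the usesACEncoding check and the
   surrounding IDs are assumptions):

   // Hand entries to admission control right as they're being
   // written to raft log storage; AdmitRaftEntry doesn't block.
   for _, entry := range rd.Entries {
     if usesACEncoding(entry) {
       kvAdmission.AdmitRaftEntry(tenantID, storeID, rangeID, entry)
     }
   }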

irfansharif committed Jan 23, 2023
1 parent b21379b commit f29a8e5
Showing 33 changed files with 792 additions and 109 deletions.

3 changes: 2 additions & 1 deletion .github/CODEOWNERS
@@ -222,7 +222,8 @@
/pkg/kv/kvserver/gc/ @cockroachdb/kv-prs
/pkg/kv/kvserver/idalloc/ @cockroachdb/kv-prs
/pkg/kv/kvserver/intentresolver/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvadmission/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvadmission/ @cockroachdb/admission-control
/pkg/kv/kvserver/kvflowcontrol/ @cockroachdb/admission-control
/pkg/kv/kvserver/kvserverbase/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-30 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-32 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -235,6 +235,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-30</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-32</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
@@ -1205,6 +1205,8 @@ GO_TARGETS = [
"//pkg/kv/kvserver/intentresolver:intentresolver",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvadmission:kvadmission",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
"//pkg/kv/kvserver/kvstorage:kvstorage",
@@ -2587,6 +2589,8 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/idalloc:get_x_data",
"//pkg/kv/kvserver/intentresolver:get_x_data",
"//pkg/kv/kvserver/kvadmission:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/kvstorage:get_x_data",
10 changes: 10 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -401,6 +401,12 @@ const (
// changefeeds created prior to this version.
V23_1_ChangefeedExpressionProductionReady

// V23_1UseEncodingWithBelowRaftAdmissionData enables the use of raft
// command encodings that include below-raft admission control data.
//
// TODO(irfansharif): Actually use this.
V23_1UseEncodingWithBelowRaftAdmissionData

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
@@ -687,6 +693,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_ChangefeedExpressionProductionReady,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 30},
},
{
Key: V23_1UseEncodingWithBelowRaftAdmissionData,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 32},
},

// *************************************************
// Step (2): Add new versions here.
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
@@ -29,6 +29,7 @@ PROTOBUF_SRCS = [
"//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto",
"//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
"//pkg/kv/kvserver/concurrency/poison:poison_go_proto",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto",
"//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto",
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
"@io_etcd_go_raft_v3//raftpb",
],
)

25 changes: 18 additions & 7 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"go.etcd.io/raft/v3/raftpb"
)

// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted
@@ -81,6 +82,14 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting(
true,
)

// ProvisionedBandwidth sets the value of the provisioned
// bandwidth for each store in the cluster.
var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
"for each store. It can be over-ridden on a per-store basis using the --store flag",
0).WithPublic()

// Controller provides admission control for the KV layer.
type Controller interface {
// AdmitKVWork must be called before performing KV work.
@@ -108,6 +117,9 @@ type Controller interface {
// replicated to a raft follower, that have not been subject to admission
// control.
FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes)
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage.
AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
}

// TenantWeightProvider can be periodically asked to provide the tenant
@@ -394,13 +406,12 @@ func (n *controllerImpl) FollowerStoreWriteBytes(
followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo)
}

// ProvisionedBandwidth set a value of the provisioned
// bandwidth for each store in the cluster.
var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
"for each store. It can be over-ridden on a per-store basis using the --store flag",
0).WithPublic()
// AdmitRaftEntry implements the Controller interface.
func (n *controllerImpl) AdmitRaftEntry(
roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry,
) {
panic("unimplemented")
}

// FollowerStoreWriteBytes captures stats about writes done to a store by a
// replica that is not the leaseholder. These are used for admission control.
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/BUILD.bazel
@@ -0,0 +1,19 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "kvflowcontrol",
srcs = [
"doc.go",
"kvflowcontrol.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
],
)

get_x_data(name = "get_x_data")
94 changes: 94 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/doc.go
@@ -0,0 +1,94 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvflowcontrol

// TODO(irfansharif): After implementing these interfaces and integrating it
// into KV, write a "life of a replicated proposal" timeline here for flow-token
// interactions. Talk about how range splits/merges interact and how we ensure
// no flow tokens are leaked or double returned. Talk also about snapshots, log
// truncations, leader/leaseholder changes, leaseholder != leader, follower
// pausing, re-proposals (the token deduction is tracked for the first attempt),
// lossy raft transport send/recv buffers, and raft membership changing.

// This package contains machinery for "replication admission control" --
// end-to-end flow control for replication traffic. It's part of the integration
// layer between KV and admission control. There are a few components, in and
// out of this package.
//
// 1. The central interfaces/types in this package are:
// - kvflowcontrol.Controller, held at the node-level and holds all available
// kvflowcontrol.Tokens for each kvflowcontrol.Stream.
// - kvflowcontrol.Tokens represent the finite capacity of a given stream,
// expressed in bytes we're looking to replicate over the given stream.
// - kvflowcontrol.Stream models the stream over which we replicate data
// traffic, transmission for which we regulate using flow control. It's
// segmented by the specific store the traffic is bound for, and also the
// tenant driving it.
// - kvflowcontrol.Handle is held at the replica level (only on replicas
// that are both leaseholder and raft leader), and is used to interface with
// the node-level kvflowcontrol.Controller. When replicating log entries,
// these replicas choose the log position (term+index) the data is to end
// up at, and use this handle to track the token deductions on a per log
// position basis. After being informed of these log entries being admitted
// by the receiving end of the kvflowcontrol.Stream, it frees up the
// tokens.
//
// 2. kvflowcontrolpb.RaftAdmissionMeta, embedded within each
// kvserverpb.RaftCommand, includes all necessary information for below-raft
// IO admission control. Also included is the node where this command
// originated, who wants to eventually learn of this command's admission.
// Entries that contain this metadata make use of AC-specific raft log entry
// encodings described below.
//
// 3. kvflowcontrolpb.AdmittedRaftLogEntries, piggybacked as part of
// kvserverpb.RaftMessageRequest[1], contains coalesced information about all
// raft log entries that were admitted below raft. We use the origin node
// encoded in the raft entry (RaftAdmissionMeta.AdmissionOriginNode) to know
// where to send these. This information is used on the origin node to
// release flow tokens that were acquired when replicating the original log
// entries.
//
// 4. kvflowcontrol.Dispatch is used to dispatch information about
// admitted raft log entries (AdmittedRaftLogEntries) to the specific nodes
// where (i) said entries originated, (ii) flow tokens were deducted and (iii)
// said tokens are waiting to be returned. The interface is also used to read
// pending dispatches, which will be used in the raft transport layer when
// looking to piggyback information on traffic already bound to specific
// nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we
// allow querying for all long-overdue dispatches.
//
// 5. We use specific encodings for raft log entries that contain AC data:
// EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a prefix
// byte that informs decoding routines how to interpret the subsequent bytes.
// Since we don't want to decode anything if the command is not subject to
// replication admission control, the encoding type is a convenient place to
// capture how a specific entry is to be considered.
// - The decision to use replication admission control happens above raft
// (using cluster settings, version gates), and AC-specific metadata is
// plumbed down as part of the marshaled raft command (RaftAdmissionMeta).
//
// 6. AdmitRaftEntry, on the kvadmission.Controller is the integration
// point for log entries received below raft right as they're being written
// to storage. This is non-blocking since we're below raft in the
// raft.Ready() loop. It effectively enqueues a "virtual" work item in
// underlying StoreWorkQueue mediating store IO. This virtual work item is
// what later gets dequeued once the store granter informs the work queue of
// newly available IO tokens. For standard work queue ordering, our work item
// needs to include the CreateTime and AdmissionPriority. The tenant ID is
// plumbed to find the right tenant heap to queue it under (for inter-tenant
// isolation); the store ID to find the right store work queue on multi-store
// nodes. Since the raftpb.Entry encodes within it its origin node
// (AdmissionOriginNode), it's used post-admission to dispatch to the right
// node.
//
// [1]: kvserverpb.RaftMessageRequest is the unit of what's sent
// back-and-forth between two nodes over their two uni-directional raft
// transport streams.
125 changes: 125 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
@@ -0,0 +1,125 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package kvflowcontrol provides flow control for replication traffic in KV.
// It's part of the integration layer between KV and admission control.
package kvflowcontrol

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

// Stream models the stream over which we replicate data traffic, the
// transmission for which we regulate using flow control. It's segmented by the
// specific store the traffic is bound for and the tenant driving it. Despite
// the underlying link/transport being shared across tenants, modeling streams
// on a per-tenant basis helps provide inter-tenant isolation.
type Stream struct {
TenantID roachpb.TenantID
StoreID roachpb.StoreID
}

// Tokens represent the finite capacity of a given stream, expressed in bytes
// for data we're looking to replicate. Use of replication streams is
// predicated on tokens being available.
type Tokens uint64

// Controller provides flow control for replication traffic in KV, held at the
// node-level.
type Controller interface {
// Admit seeks admission to replicate data of a given priority (regardless
// of size) over the specified streams. This is a blocking operation;
// requests wait until there are flow tokens available.
Admit(admissionpb.WorkPriority, ...Stream)
// DeductTokens deducts (without blocking) flow tokens for transmission over
// the given streams, for work with a given priority. Requests are expected
// to have been Admit()-ed first.
DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
// ReturnTokens returns flow tokens for the given streams. These tokens are
// expected to have been deducted earlier with a specific priority; that
// same priority is what's specified here.
ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)

// TODO(irfansharif): We might need the ability to "disable" specific
// streams/corresponding token buckets when there are failures or
// replication to a specific store is paused due to follower-pausing.
// That'll have to show up between the Handler and the Controller somehow.
}

// Handle is used to interface with replication flow control; it's typically
// backed by a node-level Controller. Handles are held on replicas initiating
// replication traffic, i.e. are both the leaseholder and raft leader. When
// replicating log entries, these replicas choose the log position (term+index)
// the data is to end up at, and use this handle to track the token deductions
// on a per log position basis. Later when freeing up tokens (typically after
// being informed of said log entries being admitted on the receiving end of the
// stream), it's done so by specifying the log position up to which we free up
// all deducted tokens. See kvflowcontrolpb.AdmittedRaftLogEntries for more
// details.
type Handle interface {
// Admit seeks admission to replicate data of a given priority (regardless
// of size). This is a blocking operation; requests wait until there are
// flow tokens available.
Admit(admissionpb.WorkPriority)
// DeductTokensFor deducts flow tokens for replicating data of a given
// priority to members of the raft group, and tracks it with respect to the
// specific raft log position it's expecting it to end up in. Requests are
// assumed to have been Admit()-ed first.
DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
// ReturnTokensUpto returns all previously deducted tokens of a given
// priority for all log positions less than or equal to the one specified.
// Once returned, subsequent attempts to return up to the same position or
// lower are no-ops.
ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
// TrackLowWater is used to set a low-water mark for a given replication
// stream. Tokens held below this position are returned back to the
// underlying Controller, regardless of priority. All subsequent returns at
// that position or lower are ignored.
//
// NB: This is used when a replica on the other end of a stream gets caught
// up via snapshot (say, after a log truncation), where we then don't expect
// dispatches for the individual AdmittedRaftLogEntries between what it
// admitted last and its latest RaftLogPosition. Another use is during
// successive lease changes (out and back) within the same raft term -- we
// want to both free up tokens from when we lost the lease, and also ensure
// that attempts to return them (on hearing about AdmittedRaftLogEntries
// replicated under the earlier lease) are discarded.
TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
// Close closes the handle and returns all held tokens back to the
// underlying controller. Typically used when the replica loses its lease
// and/or raft leadership, or ends up getting GC-ed (if it's being
// rebalanced, merged away, etc).
Close()
}

// Dispatch is used to dispatch information about admitted raft log entries to
// specific nodes and read pending dispatches.
type Dispatch interface {
DispatchWriter
DispatchReader
}

// DispatchWriter is used to dispatch information about admitted raft log
// entries to specific nodes (typically where said entries originated, where
// flow tokens were deducted and waiting to be returned).
type DispatchWriter interface {
Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
}

// DispatchReader is used to read pending dispatches. It's used in the raft
// transport layer when looking to piggyback information on traffic already
// bound to specific nodes. It's also used when timely dispatching (read:
// piggybacking) has not taken place.
type DispatchReader interface {
PendingDispatch() []roachpb.NodeID
PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
}