From 2cac11b0b649a7c52ca600a114c60945f77ac5cd Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Fri, 20 Jan 2023 18:30:18 -0500
Subject: [PATCH] kvflowcontrol,raftlog: interfaces for replication control

Follower replication work, today, is not subject to admission control. It
consumes IO tokens without waiting, which both (i) does not prevent the LSM
from being inverted, and (ii) can cause priority inversion where low-pri
follower write work ends up causing IO token exhaustion, which in turn causes
throughput and latency impact for high-pri non-follower write work on that
same store. This latter behavior was especially noticeable with large index
backfills (#82556), where >2/3rds of write traffic on stores could be follower
work for large AddSSTs, causing IO token exhaustion for regular write work
being proposed on those stores.

We last looked at this problem as part of #79215, settling on #83851, which
pauses replication traffic to stores close to exceeding their IO overload
threshold (data that's periodically gossiped). In large index backfill
experiments we found this to help slightly, but it's still a coarse and
imperfect solution -- we're deliberately causing under-replication instead of
being able to shape the rate of incoming writes for low-pri work closer to the
origin.

As part of #95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we expect
to no longer need to bypass follower write work in admission control, solving
the issues mentioned above. Some small degree of familiarity with the design
is assumed below. In this proto{col,buf}/interface-only commit and the
previous raft log encoding commit, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
replication traffic in KV. It will be part of the integration layer between
KV and admission control. In it we have a few central interfaces:

   - kvflowcontrol.Controller, held at the node level, which holds all
     kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store we're
     sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will be held at the replica level (only on
     replicas that are both leaseholder and raft leader), and will be used to
     interface with the node-level kvflowcontrol.Controller. When replicating
     log entries, these replicas choose the log position (term+index) the
     data is to end up at, and use this handle to track the token deductions
     on a per log position basis. Later, when freeing up tokens (after being
     informed of said log entries being admitted on the receiving end of the
     stream), we do so by specifying the log position up to which we free up
     all deducted tokens.

   type Controller interface {
     Admit(admissionpb.WorkPriority, time.Time, Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, Stream) (deducted bool)
     ReturnTokens(admissionpb.WorkPriority, Tokens, Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority, time.Time)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     DeductedTokensUpto(Stream) kvflowcontrolpb.RaftLogPosition
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
     ReturnAllTokensUpto(kvflowcontrolpb.RaftLogPosition, Stream)
     Close()
   }

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding routines.
RaftAdmissionMeta is 'embedded' within a kvserverpb.RaftCommand, and includes
the necessary AC metadata on a per raft entry basis.
Entries that contain this metadata will make use of the AC-specific raft log
entry encodings described earlier. The AC metadata is decoded below-raft when
looking to admit the write work. Also included is the node where this command
originated, who wants to eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
kvserverpb.RaftMessageRequest, the unit of what's sent back-and-forth between
two nodes over their two uni-directional raft transport streams.
AdmittedRaftLogEntries, just like raft heartbeats, is coalesced information
about all raft log entries that were admitted below raft. We'll use the origin
node encoded in the raft entry (admission_origin_node from above) to know
where to send these, which in turn will release flow tokens that were acquired
when replicating the original log entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
admitted raft log entries (see AdmittedRaftLogEntries from above) to the
specific nodes where (i) said entries originated, (ii) flow tokens were
deducted, and (iii) are waiting to be returned. The interface is also used to
read pending dispatches, which will be used in the raft transport layer when
looking to piggyback information on traffic already bound to specific nodes.
Since timely dispatching (read: piggybacking) is not guaranteed, we allow
querying for all long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }

5. Two new encodings for raft log entries,
EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a prefix byte
that informs decoding routines how to interpret the subsequent bytes. To date
we've had two, EntryEncoding{Standard,Sideloaded} (now renamed to
EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether the entry
came with sideloaded data (these are typically AddSSTs, the storage for which
is treated differently for performance). Our two additions here will be used
to indicate whether the particular entry is subject to replication admission
control. If so, right as we persist entries into the raft log storage, we'll
admit the work without blocking.

   - We'll come back to this non-blocking admission in the AdmitRaftEntry
     description below, even though the implementation is left for a future
     PR.
   - The decision to use replication admission control happens above raft,
     and AC-specific metadata is plumbed down as part of the marshaled raft
     command, as described for RaftAdmissionMeta above.

6. AdmitRaftEntry, on the kvadmission.Controller interface. We'll use this as
the integration point for log entries received below raft, right as they're
being written to storage. This will be non-blocking since we'll be below raft
in the raft.Ready() loop, and will effectively enqueue a "virtual" work item
in the underlying StoreWorkQueue mediating store IO. This virtual work item is
what later gets dequeued once the store granter informs the work queue of
newly available IO tokens.
For standard work queue ordering, our work item needs to include the create time and admission pri. The tenant ID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation); the store ID to find the right store work queue on multi-store nodes. The raftpb.Entry encodes within it its origin node (see RaftAdmissionMeta above), which is used post-admission to inform the right node of said admission. It looks like: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage. AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) Release note: None --- .github/CODEOWNERS | 3 +- pkg/BUILD.bazel | 4 + pkg/gen/protobuf.bzl | 1 + pkg/kv/kvserver/kvadmission/BUILD.bazel | 1 + pkg/kv/kvserver/kvadmission/kvadmission.go | 25 +- pkg/kv/kvserver/kvflowcontrol/BUILD.bazel | 19 + pkg/kv/kvserver/kvflowcontrol/doc.go | 355 ++++++++++++++++++ .../kvserver/kvflowcontrol/kvflowcontrol.go | 138 +++++++ .../kvflowcontrol/kvflowcontrolpb/BUILD.bazel | 35 ++ .../kvflowcontrolpb/kvflowcontrol.proto | 119 ++++++ .../kvflowcontrolpb/raft_log_position.go | 42 +++ pkg/kv/kvserver/kvserverpb/BUILD.bazel | 2 + pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 17 + pkg/kv/kvserver/kvserverpb/raft.proto | 5 + pkg/kv/kvserver/raftlog/BUILD.bazel | 5 + pkg/kv/kvserver/raftlog/encoding.go | 22 ++ pkg/kv/kvserver/raftlog/encoding_test.go | 107 ++++++ pkg/util/admission/admission.go | 5 +- pkg/util/admission/admissionpb/admissionpb.go | 38 ++ pkg/util/admission/grant_coordinator.go | 12 +- pkg/util/admission/granter.go | 47 +-- pkg/util/admission/granter_test.go | 36 +- pkg/util/admission/io_load_listener.go | 4 +- pkg/util/admission/io_load_listener_test.go | 19 +- pkg/util/admission/work_queue.go | 19 +- pkg/util/admission/work_queue_test.go | 20 +- 26 files changed, 1004 insertions(+), 96 deletions(-) create mode 100644 pkg/kv/kvserver/kvflowcontrol/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvflowcontrol/doc.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go create mode 100644 pkg/kv/kvserver/raftlog/encoding_test.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 5c8036fe810a..6d5c0db22246 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -222,7 +222,8 @@ /pkg/kv/kvserver/gc/ @cockroachdb/kv-prs /pkg/kv/kvserver/idalloc/ @cockroachdb/kv-prs /pkg/kv/kvserver/intentresolver/ @cockroachdb/kv-prs -/pkg/kv/kvserver/kvadmission/ @cockroachdb/kv-prs +/pkg/kv/kvserver/kvadmission/ @cockroachdb/admission-control +/pkg/kv/kvserver/kvflowcontrol/ @cockroachdb/admission-control /pkg/kv/kvserver/kvserverbase/ @cockroachdb/kv-prs /pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs /pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 57e7afe17925..f4b8f4df8e0e 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1214,6 +1214,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/intentresolver:intentresolver", "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/kvadmission:kvadmission", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol", "//pkg/kv/kvserver/kvserverbase:kvserverbase", "//pkg/kv/kvserver/kvserverpb:kvserverpb", 
"//pkg/kv/kvserver/kvstorage:kvstorage", @@ -2607,6 +2609,8 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/idalloc:get_x_data", "//pkg/kv/kvserver/intentresolver:get_x_data", "//pkg/kv/kvserver/kvadmission:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", "//pkg/kv/kvserver/kvserverpb:get_x_data", "//pkg/kv/kvserver/kvstorage:get_x_data", diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 88e7e8d1ecb3..fdeda998c619 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -30,6 +30,7 @@ PROTOBUF_SRCS = [ "//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto", "//pkg/kv/kvserver/concurrency/lock:lock_go_proto", "//pkg/kv/kvserver/concurrency/poison:poison_go_proto", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto", "//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto", diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 44573b5e43fc..3c8953754572 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//:pebble", + "@io_etcd_go_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index c9d4a98b30cb..92fb01f8a832 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "go.etcd.io/raft/v3/raftpb" ) // elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted @@ -81,6 +82,14 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting( true, ) +// ProvisionedBandwidth set a value of the provisioned +// bandwidth for each store in the cluster. +var ProvisionedBandwidth = settings.RegisterByteSizeSetting( + settings.SystemOnly, "kvadmission.store.provisioned_bandwidth", + "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ + "for each store. It can be over-ridden on a per-store basis using the --store flag", + 0).WithPublic() + // Controller provides admission control for the KV layer. type Controller interface { // AdmitKVWork must be called before performing KV work. @@ -108,6 +117,9 @@ type Controller interface { // replicated to a raft follower, that have not been subject to admission // control. FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes) + // AdmitRaftEntry informs admission control of a raft log entry being + // written to storage. + AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) } // TenantWeightProvider can be periodically asked to provide the tenant @@ -394,13 +406,12 @@ func (n *controllerImpl) FollowerStoreWriteBytes( followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo) } -// ProvisionedBandwidth set a value of the provisioned -// bandwidth for each store in the cluster. 
-var ProvisionedBandwidth = settings.RegisterByteSizeSetting( - settings.SystemOnly, "kvadmission.store.provisioned_bandwidth", - "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ - "for each store. It can be over-ridden on a per-store basis using the --store flag", - 0).WithPublic() +// AdmitRaftEntry implements the Controller interface. +func (n *controllerImpl) AdmitRaftEntry( + roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry, +) { + panic("unimplemented") +} // FollowerStoreWriteBytes captures stats about writes done to a store by a // replica that is not the leaseholder. These are used for admission control. diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel new file mode 100644 index 000000000000..04ef54d4893d --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -0,0 +1,19 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "kvflowcontrol", + srcs = [ + "doc.go", + "kvflowcontrol.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/roachpb", + "//pkg/util/admission/admissionpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go new file mode 100644 index 000000000000..f4b1a0ed64a3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -0,0 +1,355 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontrol + +// This package contains machinery for "replication admission control" -- +// end-to-end flow control for replication traffic. It's part of the integration +// layer between KV and admission control. There are a few components, in and +// out of this package. The [{l,r}] annotations refer to the figure +// below. +// +// A. The central interfaces/types in this package are: +// - kvflowcontrol.Controller, held at the node-level and holds all available +// kvflowcontrol.Tokens for each kvflowcontrol.Stream. +// - kvflowcontrol.Tokens represent the finite capacity of a given stream, +// expressed in bytes we're looking to replicate over the given stream. +// The tokens we deduct are determined post-evaluation, after [3]. +// - kvflowcontrol.Stream models the stream over which we replicate data +// traffic, transmission for which we regulate using flow control. It's +// segmented by the specific store the traffic is bound for, and also the +// tenant driving it. +// - kvflowcontrol.Handle is held at the replica-level (only on those who are +// both leaseholder and raft leader), and is used to interface with +// the node-level kvflowcontrol.Controller. When replicating log entries, +// these replicas choose the log position (term+index) the data is to end +// up at, and use this handle to track the token deductions on a per log +// position basis, see [4]. After being informed of these log entries being +// admitted by the receiving end of the kvflowcontrol.Stream, it frees up +// the tokens. 
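+//
+// As a rough sketch of the proposer-side token lifecycle (hypothetical caller
+// code; the handle h, priority pri, create-time ct, log position pos, byte
+// count tokens and stream below are illustrative stand-ins, not names from
+// this package):
+//
+//   h.Admit(ctx, pri, ct)                     // blocks until flow tokens are available
+//   h.DeductTokensFor(ctx, pri, pos, tokens)  // tracked against log position pos
+//   // ...later, on hearing that entries up to pos were admitted below raft
+//   // on the remote store backing stream:
+//   h.ReturnTokensUpto(ctx, pri, pos, stream)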
+//
+// B. kvflowcontrolpb.RaftAdmissionMeta, embedded within each
+// kvserverpb.RaftCommand, includes all necessary information for below-raft
+// IO admission control. Also included is the node where this command
+// originated, who wants to eventually learn of this command's admission.
+// Entries that contain this metadata make use of the AC-specific raft log
+// entry encodings described below. They're passed through [5r] and [5l].
+//
+// C. kvflowcontrolpb.AdmittedRaftLogEntries, piggybacked as part of
+// kvserverpb.RaftMessageRequest[^1] (see [7r] + [9r] below), contains
+// coalesced information about all raft log entries that were admitted below
+// raft. We use the origin node encoded in the raft entry
+// (RaftAdmissionMeta.AdmissionOriginNode) to know where to send these.
+// This information is used on the origin node to release flow tokens that
+// were acquired when replicating the original log entries. This is [10r] and
+// [8l'] below.
+//
+// D. kvflowcontrol.Dispatch is used to dispatch information about
+// admitted raft log entries (AdmittedRaftLogEntries) to the specific nodes
+// where (i) said entries originated, (ii) flow tokens were deducted and
+// (iii) are waiting to be returned. The interface is also used to read
+// pending dispatches, which will be used in the raft transport layer when
+// looking to piggyback information on traffic already bound to specific
+// nodes; see [8]. Since timely dispatching (read: piggybacking) is not
+// guaranteed, we allow querying for all long-overdue dispatches.
+//
+// E. We use specific encodings for raft log entries that contain AC data:
+// EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a prefix
+// byte that informs decoding routines how to interpret the subsequent bytes.
+// Since we don't want to decode anything if the command is not subject to
+// replication admission control, the encoding type is a convenient place to
+// capture how a specific entry is to be considered.
+// - The decision to use replication admission control happens above raft
+//   (using cluster settings, version gates), and AC-specific metadata is
+//   plumbed down as part of the marshaled raft command (RaftAdmissionMeta).
+//
+// F. AdmitRaftEntry, on the kvadmission.Controller, is the integration
+// point for log entries received below raft right as they're being written
+// to storage. This is [6] in the figure below; it's non-blocking since we're
+// below raft in the raft.Ready() loop. It effectively enqueues a "virtual"
+// work item in the underlying StoreWorkQueue mediating store IO. This virtual
+// work item is what later gets dequeued once the store granter informs the
+// work queue of newly available IO tokens. For standard work queue ordering,
+// our work item needs to include the CreateTime and AdmissionPriority. The
+// tenant ID is plumbed to find the right tenant heap to queue it under (for
+// inter-tenant isolation); the store ID to find the right store work queue
+// on multi-store nodes. Since the raftpb.Entry encodes within it its origin
+// node (AdmissionOriginNode), it's used post-admission to dispatch to the
+// right node; see [7'] below.
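+//
+// Putting B through F together, the below-raft side (in the raft.Ready()
+// loop, while appending an entry ent to the local raft log storage) might
+// look roughly like the following sketch. The surrounding variables (ent,
+// tenantID, storeID, rangeID, admissionQ, dispatch) are illustrative
+// stand-ins, not names from this package:
+//
+//   // Only if ent's prefix byte indicates one of the
+//   // EntryEncoding{Standard,Sideloaded}WithAC encodings:
+//   meta, err := raftlog.DecodeRaftAdmissionMeta(ent.Data)
+//   // (handle err)
+//   // Non-blocking; enqueues a "virtual" work item with the StoreWorkQueue.
+//   admissionQ.AdmitRaftEntry(tenantID, storeID, rangeID, ent)
+//   // Later, once that work item is actually admitted, let the origin node
+//   // know so it can return the flow tokens it deducted for ent:
+//   dispatch.Dispatch(meta.AdmissionOriginNode, kvflowcontrolpb.AdmittedRaftLogEntries{...})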
+// +// --- +// +// Here's how the various pieces fit together: +// +// [1] Admit +// ○ +// │ +// │ +// ┌───────────────▼───────────────┬┬┐ ┌───────────────────────────────────┐ +// │ Replica (proposer) │││ │ kvflowcontrol.Controller │ +// │ ┌──────────────────────┬┬○────┼┼┼ [2] Admit ────────────▶ ┌───────────────────────────────┐ │ +// │ │ │││ │││ │ │ admissionpb.RegularWorkClass │ │ +// │ │ kvflowcontrol.Handle ││○────┼┼┼ [4] DeductTokens ─────▶ │┌──────────────────────┬┬┐ │ │ +// │ │ │││ │││ │ ││ kvflowcontrol.Stream │││ │ │ +// │ └─────────────▲──────○─┴┴○────┼┼┼ [10r] ReturnTokens ───▶ │└──────────────────────┴┴┘ │ │ +// │ ╲ ╱ │││ │ └───────────────────────────────┘ │ +// │ ╲ ╱ │││ │ ┌───────────────────────────────┐ │ +// │ [3] Evaluate │││ │ │ admissionpb.ElasticsWorkClass │ │ +// │ │││ │ │┌──────────────────────┬┬┐ │ │ +// │ │││ │ ││ kvflowcontrol.Stream │││ │ │ +// │ │││ │ │└──────────────────────┴┴┘ │ │ +// │ │││ │ └───────────────────────────────┘ │ +// └○────▲────────────────○───────▲┴┴┘ └──────────────────────▲────────────┘ +// │ │ │ +// │ │ │ │ +// │ │ │ +// [5l] MsgApp │ [5r] MsgApp(s) [9r] MsgAppResp(s) [8l'] ReturnTokens +// │ │ + kvflowcontrolpb.AdmittedRaftLogEntries │ (using kvflowcontrol.Handle) +// │ │ │ │ +// ┌────┴───────┴────┐ ┌───────────○────────────┐ +// │ │ │ RaftTransport ◀──── [8] PendingDispatchFor ────▶ kvflowcontrol.Dispatch │ +// └────┬───────┬────┘ └───────────▲────────────┘ +// │ │ │ │ │ +// │ │ │ +// │ │ │ │ │ +// │ │ [7'] Dispatch +// │ │ │ │ │ +// [7l] MsgAppResp │ [7r] MsgAppResp(s) │ +// │ │ │ │ │ +// ┌─▼────○────────────────▼───────○──┬┬┐ ┌───────────○────────────┐ +// │ Store │││ │ kvadmission.Controller │ +// │ ○┼┼───── [6] AdmitRaftEntry ───────▶ │ +// │ │││ │ │ +// │ │││ │ ┌─────────────────┴┬┬┐ +// │ │││ │ │ StoreWorkQueue │││ +// │ │││ │ ├──────────────────┼┼┤ +// └──────────────────────────────────┴┴┘ └──────┤ Granter │││ +// └──────────────────┴┴┘ +// +// Notation: +// - The top-half of the figure is above raft, the bottom half is below-raft. +// - The paths marked as [l] denote paths taken for the locally held +// store and [r] for remotely held stores where raft traffic crosses an +// RPC boundary (using the RaftTransport). The MsgApp and MsgAppResp shown in +// [5l] and [7l] don't actually exist -- the raft library short circuits +// things, but we include it for symmetry. [8l'] is a fast-path we use whereby +// we return flow tokens for locally admitted raft log entries without going +// through the RaftTransport. +// - The paths marked with ['] happen concurrently. In the diagram +// above, [7'] happens concurrently with [7r]; we're trying to show that a +// subsequent MsgAppResps may end up carrying AdmittedRaftLogEntries from +// earlier. [9r] shows this piggybacking. +// - Stacked boxes (with " │││" on the right hand side) indicate that there are +// multiple of a kind. Like multiple stores, multiple StoreWorkQueues, +// kvflowcontrol.Streams, etc. +// +// --- +// +// There are various interactions to consider: +// +// I1. What happens if the RaftTransport gRPC stream[^2] breaks? +// - When reading pending dispatches to piggyback onto outbound raft messages, +// we're removing it[^3] from the underlying list. So if the stream breaks +// after having done so, we're possibly leaking flow tokens at the origin +// node. Doing some sort of handshake/2PC for every return seems fraught, so +// we do something simpler: return all held tokens[^4] for a given +// kvflowcontrol.Stream when the underlying transport breaks. 
+// We need to ensure that if the stream re-connects we're not doubly returning
+// flow tokens, for which we use the low water mark[^5] on each stream.
+//
+// I2. What happens if a node crashes?
+// - We don't rely on it to return kvflowcontrolpb.AdmittedRaftLogEntries, and
+//   don't want the origin node to leak flow tokens. Here too we react
+//   similarly to the gRPC stream breakage described above. We don't worry
+//   about double returns if the node never restarts, and if it does, we'll
+//   treat it similarly to I3a below.
+// - If the node containing store S2 crashed, we don't want to deduct flow
+//   tokens when proposing writes to (now under-replicated) ranges with
+//   replicas on S2. kvflowcontrol.Controller is made aware of the streams
+//   that are to be 'ignored' for flow token purposes, and informs callers
+//   when they attempt to deduct[^6], who in turn avoid tracking the
+//   (non-)deduction.
+// - See I3a and [^7] for options on what to do when the node comes back up.
+//
+// I3. What happens if a follower store is paused, as configured by
+//     admission.kv.pause_replication_io_threshold?
+// - Proposing replicas are aware of paused followers and don't deduct flow
+//   tokens for those corresponding streams (we're not issuing traffic over
+//   those replication streams so deducting flow tokens is meaningless). When
+//   the follower store is no longer paused, and the replica on that store
+//   is sufficiently caught up, only then do we start deducting flow tokens
+//   for it.
+//
+// I3a. Why do we need to wait for the previously paused replica to be caught
+//      up (via log entries, snapshots) to the raft log position found on a
+//      quorum of replicas? What about replicas that have fallen behind by
+//      being on recently crashed nodes?
+// - If we didn't, we risk write stalls to the range (and others) due to flow
+//   tokens deducted for that store. Take the following example where we start
+//   deducting flow tokens as soon as the follower store is unpaused.
+//   - If the quorum is at <term=T,index=I>, and the now-no-longer-paused
+//     replica at <term=T,index=I-L>, i.e. lagging by L entries.
+//   - As we start proposing commands to <term=T,index=I+1>,
+//     <term=T,index=I+2>, ..., we're deducting flow tokens for the
+//     previously-paused follower at indexes > I.
+//   - These flow tokens will only be released once the
+//     now-no-longer-paused follower stores those entries to its raft log and
+//     also admits them.
+//   - To do so, it has to first catch up to and admit entries from
+//     <term=T,index=I-L+1>, <term=T,index=I-L+2>, ..., <term=T,index=I>,
+//     which could take a while.
+//   - We don't want to stall quorum writes at <term=T,index=I+i> (if going up
+//     to I+i depletes the available flow tokens) because the previously
+//     paused follower here has not yet caught up.
+//   - When the previously-paused replica starts storing log entries from
+//     <term=T,index=I-L+1>, <term=T,index=I-L+2>, ..., and admitting them,
+//     it's going to try and return flow tokens for them. This is because
+//     those entries are encoded to use replication admission control.
+//   - We don't want to add these tokens to the stream's bucket since we
+//     didn't make corresponding deductions at propose time (the follower was
+//     paused then).
+//   - In this case, whenever we start tracking this follower's stream, i.e.
+//     after it's sufficiently caught up to the quorum raft log position,
+//     we'll ensure its low water mark is at this position to ignore these
+//     invalid returns.
+//
+// I4. What happens when a replica gets caught up via raft snapshot? Or needs
+//     to get caught up via snapshot due to log truncation at the leader?
+// - If a replica's fallen far enough behind the quorum with respect to its
+//   log position, we don't deduct flow tokens for its stream, similar to I3a.
+//   The same is true if we've truncated our log ahead of what it has stored
+//   in its raft log. If it's just far behind on a non-truncated raft log,
+//   it's still receiving replication traffic through raft-generated MsgApps
+//   for the older entries, but we don't deduct flow tokens for it.
+// - When the replica gets caught up via snapshot, similar to I3a, given it's
+//   caught up to the quorum raft log position, we can start deducting flow
+//   tokens for its replication stream going forward.
+//
+// I5. What happens when the leaseholder and/or the raft leader changes? When
+//     the raft leader is not the same as the leaseholder?
+// - The per-replica kvflowcontrol.Handle is tied to the lifetime of a
+//   leaseholder replica having raft leadership. When leadership is lost, or
+//   the lease changes hands, we release all held flow tokens.
+// - Avoiding double returns on subsequent AdmittedRaftLogEntries for these
+//   already released flow tokens is easier for raft leadership changes since
+//   there's a term change, and all earlier/stale AdmittedRaftLogEntries with
+//   the lower term can be discarded. We do a similar thing for leases -- when
+//   being granted a lease, the low water mark in kvflowcontrol.Handle is at
+//   least as high as the command that transferred the lease.
+//
+// I6. What happens during replica GC?
+// - It's unlikely that a replica gets GC-ed without first going through the
+//   leaseholder/raft leadership transition described in I5. Regardless, we'll
+//   free up all held flow tokens. If a replica were to be re-added (through
+//   membership change) and become raft leader + range leaseholder, and hear
+//   about stale AdmittedRaftLogEntries from earlier, they'd be ignored due to
+//   the low water mark on kvflowcontrol.Handle.
+//
+// I7. What happens during re-proposals?
+// - Flow token deductions are tied to the first raft log position we propose
+//   at. We don't deduct flow tokens again for reproposed commands at higher
+//   log positions. Ditto for command reproposals at apply-time (if we
+//   violated MLAI).
+//
+// I8. What happens if the proposal is dropped from the raft transport's send
+//     queue? What if it gets dropped from some receive queue?
+// - As described in I7, we're binding the proposal to the first log position
+//   we try to propose at. If the node-level raft transport's send queue is
+//   full, and we drop the proposal, we'll end up reproposing it at a higher
+//   index without deducting flow tokens again. Freeing up the originally held
+//   tokens will only occur when any entry at a higher log position gets
+//   admitted (not necessarily the reproposed command). So we're relying on at
+//   least one future attempt making it to the follower's raft log.
+// - If a replica abandons a proposal that it's deducted tokens for (i.e. it'll
+//   no longer repropose it), we'll need to free up those tokens. That
+//   proposal may have never made it to the follower's logs (if it was dropped
+//   from the raft transport's send/receive queues for example).
+// - Perhaps another way to think about safety (no token leaks) and liveness
+//   (eventual token returns) is that on the sender side, we should only
+//   deduct tokens for a proposal once actually transmitting it over the
+//   relevant, reliable gRPC stream (and reacting to it breaking as described
+//   in I1 above).
+//   On the receiver, since this proposal/message can be dropped due to full
+//   receive queues within the raft transport, we should either signal back
+//   to the sender that it return the corresponding tokens (and re-acquire on
+//   another attempt), or that it simply free up all tokens for this
+//   replication stream. This could also apply to dropped messages on the
+//   sender side queue, where we just free up all held tokens.
+// - If messages containing the entry get dropped from the raft transport
+//   receive queue, we rely on raft re-transmitting said entries. Similar to
+//   the above, we're relying on the logical admission of some entry at an
+//   equal or higher log position to free up the deducted tokens.
+// - Given AdmittedRaftLogEntries travel over the RaftTransport stream, and
+//   dropping them could cause flow token leakage, we guarantee delivery
+//   (aside from the stream breaking, which is covered in I1) by only
+//   piggybacking on requests that exit the send queue, and returning flow
+//   tokens[^8] before potentially dropping the stream message due to full
+//   receive queues.
+//
+// I9. What happens during range splits and merges?
+// - During merges, when the RHS raft group is deleted, we'll free up all held
+//   flow tokens. Any subsequent AdmittedRaftLogEntries will be dropped since
+//   tokens only get returned through the kvflowcontrol.Handle identified by
+//   the RangeID, which, for the RHS, no longer exists.
+// - During splits, the LHS leaseholder+raft leader will construct its own
+//   kvflowcontrol.Handle, which will handle flow tokens for subsequent
+//   proposals.
+//
+// I10. What happens when replicas are added/removed from the raft group?
+// - The leader+leaseholder replica is aware of replicas being added/removed,
+//   and starts and stops deducting flow tokens for the relevant streams
+//   accordingly. If a replica is removed, we free up all flow tokens held for
+//   its particular stream. The low water mark for that particular stream is
+//   set to the raft log position of the command removing the replica, so
+//   stale AdmittedRaftLogEntries messages can be discarded.
+//
+// ---
+//
+// [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent
+//       back-and-forth between two nodes over their two uni-directional raft
+//       transport streams.
+// [^2]: Over which we're dispatching kvflowcontrolpb.AdmittedRaftLogEntries.
+// [^3]: kvflowcontrol.DispatchReader implementations do this as part of
+//       PendingDispatchFor.
+// [^4]: Using DeductedTokensUpto + ReturnAllTokensUpto on
+//       kvflowcontrol.Handle.
+// [^5]: Using ReturnAllTokensUpto on kvflowcontrol.Handle.
+// [^6]: DeductTokens on kvflowcontrol.Controller returns whether the
+//       deduction was done.
+// [^7]: When a node is crashed, instead of ignoring the underlying flow token
+//       buckets, an alternative is to DeductTokens without going through
+//       Admit (which blocks until flow tokens > 0). That way if the node
+//       comes back up and catches up via replay (and not snapshot) we have
+//       accounted for the load being placed on it. Though similar to I3a, the
+//       risk there is that if we start going through Admit() as soon as the
+//       node comes back up, the tokens will still be negative and writes will
+//       stall. Whether we (i) DeductTokens but wait post node-restart lag
+//       before going through Admit(), or (ii) don't DeductTokens (Admit() is
+//       rendered a no-op), we're being somewhat optimistic, which is fine.
+// [^8]: Using ReturnTokensUpto on kvflowcontrol.Handle.
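+//
+// To make the low water mark guard referenced in I1, I3a, I5 and I6 concrete,
+// a handle implementation might track, per stream, the highest
+// RaftLogPosition for which tokens have already been returned (or forgiven),
+// and drop anything at or below it. A hypothetical sketch (the names below
+// are illustrative, not part of this package's interfaces):
+//
+//   // On receiving AdmittedRaftLogEntries for stream up to position pos:
+//   if pos.LessEq(lowWaterMark[stream]) {
+//     return // stale or already returned; ignore to avoid double returns
+//   }
+//   returnTokensUpto(pri, pos, stream)
+//   lowWaterMark[stream] = pos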
+//
+// TODO(irfansharif): These descriptions are too high-level, imprecise and
+// possibly wrong. Fix that. After implementing these interfaces and
+// integrating them into KV, write tests for each one of them and document the
+// precise interactions/integration points. It needs to be distilled to
+// crisper invariants. The guiding principle seems to be 'only grab flow
+// tokens when actively replicating a proposal along specific streams', which
+// excludes dead/paused/lagging/pre-split RHS/no-longer-group-member replicas,
+// and explains why we only do it on replicas that are both leaseholder and
+// leader. It also explains why we don't re-deduct on reproposals, or try to
+// intercept raft-initiated re-transmissions. For each of these scenarios, we
+// know when not to deduct flow tokens. If we find ourselves in one of these
+// scenarios anyway, we simply free up all held tokens and safeguard against
+// subsequent double returns, relying entirely on low water marks or RangeIDs
+// not being re-used.
+// - When framing invariants, talk about how we're achieving safety (no token
+//   leaks, no double returns) and liveness (eventual token returns).
+// - Other than I8 above, are there cases where the sender has deducted tokens
+//   and something happens on the receiver/sender/sender-receiver stream that:
+//   (a) doesn't cause the sender to "return all tokens", i.e. it's relying on
+//       the receiver to send messages to return tokens up to some point, and
+//   (b) the receiver has either not received the message for which we've
+//       deducted tokens, or forgotten about it.
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
new file mode 100644
index 000000000000..68f2cfa7b3a0
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
@@ -0,0 +1,138 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+// Package kvflowcontrol provides flow control for replication traffic in KV.
+// It's part of the integration layer between KV and admission control.
+package kvflowcontrol
+
+import (
+	"context"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+)
+
+// Stream models the stream over which we replicate data traffic, the
+// transmission for which we regulate using flow control. It's segmented by the
+// specific store the traffic is bound for and the tenant driving it. Despite
+// the underlying link/transport being shared across tenants, modeling streams
+// on a per-tenant basis helps provide inter-tenant isolation.
+type Stream struct {
+	TenantID roachpb.TenantID
+	StoreID  roachpb.StoreID
+}
+
+// Tokens represent the finite capacity of a given stream, expressed in bytes
+// for data we're looking to replicate. Use of replication streams is
+// predicated on tokens being available.
+type Tokens uint64
+
+// Controller provides flow control for replication traffic in KV, held at the
+// node-level.
+type Controller interface {
+	// Admit seeks admission to replicate data, regardless of size, for work
+	// with the given priority, create-time, and over the given stream. This
+	// blocks until there are flow tokens available.
+	Admit(context.Context, admissionpb.WorkPriority, time.Time, Stream) error
+	// DeductTokens deducts (without blocking) flow tokens for replicating work
+	// with the given priority over the given stream. Requests are expected to
+	// have been Admit()-ed first.
+	DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream) (deducted bool)
+	// ReturnTokens returns flow tokens for the given stream. These tokens are
+	// expected to have been deducted earlier with the same priority provided
+	// here.
+	ReturnTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream)
+
+	// TODO(irfansharif): We might need the ability to "disable" specific
+	// streams/corresponding token buckets when there are failures or
+	// replication to a specific store is paused due to follower-pausing.
+	// That'll have to show up between the Handle and the Controller somehow.
+	// See I2, I3a and [^7] in kvflowcontrol/doc.go.
+}
+
+// Handle is used to interface with replication flow control; it's typically
+// backed by a node-level kvflowcontrol.Controller. Handles are held on
+// replicas initiating replication traffic, i.e. those that are both the
+// leaseholder and raft leader, and manage multiple Streams (one per active
+// replica) underneath.
+//
+// When replicating log entries, these replicas choose the log position
+// (term+index) the data is to end up at, and use this handle to track the
+// token deductions on a per log position basis. When informed of admitted log
+// entries on the receiving end of the stream, we free up tokens by specifying
+// the highest log position up to which we've admitted (below-raft admission,
+// for a given priority, takes log position into account -- see
+// kvflowcontrolpb.AdmittedRaftLogEntries for more details).
+type Handle interface {
+	// Admit seeks admission to replicate data, regardless of size, for work
+	// with the given priority and create-time. This blocks until there are
+	// flow tokens available.
+	Admit(context.Context, admissionpb.WorkPriority, time.Time)
+	// DeductTokensFor deducts (without blocking) flow tokens for replicating
+	// work with the given priority to members of the raft group. The
+	// deduction, if successful, is tracked with respect to the specific raft
+	// log position it's expected to end up in. Requests are assumed to have
+	// been Admit()-ed first.
+	DeductTokensFor(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
+	// DeductedTokensUpto returns the highest log position for which we've
+	// deducted flow tokens, over the given stream.
+	DeductedTokensUpto(context.Context, Stream) kvflowcontrolpb.RaftLogPosition
+	// ReturnTokensUpto returns all previously deducted tokens of a given
+	// priority for all log positions less than or equal to the one specified.
+	// It does so for the specific stream. Once returned, subsequent attempts
+	// to return tokens up to the same position or lower are no-ops.
+	ReturnTokensUpto(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
+	// ReturnAllTokensUpto is like ReturnTokensUpto but does so across all
+	// priorities.
+	//
+	// NB: This is used when a replica on the other end of a stream gets caught
+	// up via snapshot (say, after a log truncation), where we then don't
+	// expect dispatches for the individual AdmittedRaftLogEntries between what
+	// it admitted last and its latest RaftLogPosition. Another use is during
+	// successive lease changes (out and back) within the same raft term -- we
+	// want to both free up tokens from when we lost the lease, and also ensure
+	// that attempts to return them (on hearing about AdmittedRaftLogEntries
+	// replicated under the earlier lease) are discarded.
+	ReturnAllTokensUpto(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
+	// Close closes the handle and returns all held tokens back to the
+	// underlying controller. Typically used when the replica loses its lease
+	// and/or raft leadership, or ends up getting GC-ed (if it's being
+	// rebalanced, merged away, etc).
+	Close(context.Context)
+}
+
+// Dispatch is used (i) to dispatch information about admitted raft log entries
+// to specific nodes, and (ii) to read pending dispatches.
+type Dispatch interface {
+	DispatchWriter
+	DispatchReader
+}
+
+// DispatchWriter is used to dispatch information about admitted raft log
+// entries to specific nodes (typically where said entries originated, where
+// flow tokens were deducted and are waiting to be returned).
+type DispatchWriter interface {
+	Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
+}
+
+// DispatchReader is used to read pending dispatches. It's used in the raft
+// transport layer when looking to piggyback information on traffic already
+// bound to specific nodes. It's also used when timely dispatching (read:
+// piggybacking) has not taken place.
+//
+// NB: PendingDispatchFor is expected to remove dispatches from the pending
+// list. If the gRPC stream we're sending it over happens to break, we drop
+// these dispatches. The node awaiting these dispatches is expected to react to
+// the stream breaking by freeing up all held tokens.
+type DispatchReader interface { + PendingDispatch() []roachpb.NodeID + PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel new file mode 100644 index 000000000000..0a2fd87eda43 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel @@ -0,0 +1,35 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "kvflowcontrolpb_proto", + srcs = ["kvflowcontrol.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"], +) + +go_proto_library( + name = "kvflowcontrolpb_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + proto = ":kvflowcontrolpb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", # keep + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "kvflowcontrolpb", + srcs = ["raft_log_position.go"], + embed = [":kvflowcontrolpb_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + visibility = ["//visibility:public"], + deps = ["@com_github_cockroachdb_redact//:redact"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto new file mode 100644 index 000000000000..aa0589b5e085 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto @@ -0,0 +1,119 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.kv.kvserver.kvflowcontrol.kvflowcontrolpb; +option go_package = "kvflowcontrolpb"; + +import "gogoproto/gogo.proto"; + +// RaftAdmissionMeta contains information used by admission control for the +// select raft commands that use replication admission control. It contains a +// subset of the fields in kvserverpb.RaftCommand to selectively decode +// state[1]. When marshaling a RaftCommand, we willfully include this data in +// the prefix of the marshaled byte buffer. Information about whether this data +// is present is captured in the first byte of the encoded raft proposal -- see +// raftlog.EntryEncoding. +// +// [1]: The field tags and types must be kept identical with what's found there. +message RaftAdmissionMeta { + // AdmissionPriority of the command (maps to admission.WorkPriority); used + // within a tenant below-raft for replication admission control. + int32 admission_priority = 18; + // AdmissionCreateTime is equivalent to Time.UnixNano() at the creation time + // of the request, or a parent request, for which this command is a part of. 
+  // It's used within a tenant below-raft for replication admission control;
+  // see admission.WorkInfo.CreateTime for details.
+  int64 admission_create_time = 19;
+  // AdmissionOriginNode captures where this raft command originated. It's used
+  // to inform said node of this raft command's (virtual) admission in order
+  // for it to release flow tokens for subsequent commands.
+  int32 admission_origin_node = 20 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"];
+
+  // TODO(irfansharif): If the {marshaling,unmarshaling} performance overhead
+  // proves costly, we could:
+  // - For Admission{Priority,CreateTime}, pack them within a single int64 by
+  //   using 8 bits for the priority (we're using an int8 in Go code) and the
+  //   remaining bits for the create timestamp with lower resolution.
+  // - For AdmissionOriginNodeID, we could re-work the MultiRaft streaming RPCs
+  //   to include upfront, during stream setup, which node the subsequent
+  //   RaftMessageRequests are coming from. But this is awkward to do with our
+  //   current code layering:
+  //   - We want to find out on a per raftpb.Entry level where it came from,
+  //     and to do it once raft.Ready() tells us to persist said entry into our
+  //     raft log.
+  //   - We're currently encoding this data in the raft entry itself, at the
+  //     sender, so it's easy to decode at the right place in the
+  //     raft-ready-handling loop.
+  //   - But if we had to "stitch in" the origin node ID once received off of
+  //     the transport, or tie together raft entries with their origin node IDs
+  //     through some other way (the raft library only wants to "step" through
+  //     message types we can't so easily annotate), we'd have to do a fair bit
+  //     of state tracking.
+  //   If it's still too costly, we could rip all this out and coarsen
+  //   intra-tenant ordering with respect to Admission{Priority,CreateTime}. We
+  //   could instead introduce a WorkQueue-like ordering at the origin where
+  //   requests wait for flow tokens for every stream it intends to write to.
+  //   Below raft we could live with just side-loaded proposals being marked as
+  //   admissionpb.BulkNormalPri. Origin-side ordering would work ok for
+  //   epoch-LIFO. The coarseness comes from this re-ordering only happening on
+  //   individual origin nodes.
+  //
+  // TODO(irfansharif): Get rid of this TODO block after simple performance
+  // benchmarks (say, `cockroach workload run kv` with high concurrency and
+  // small write sizes). The ideas above are too complicated.
+}
+
+// AdmittedRaftLogEntries represents a set of raft log entries that were
+// admitted below raft. These are identified by:
+// - the range ID (there's one per raft group);
+// - the admission priority of all said entries;
+// - the (inclusive) raft log position up to which we've admitted entries;
+// - the store ID on which these raft logs were admitted.
+//
+// This is used as part of replication admission control to release, at the
+// origin, the specific flow tokens acquired when replicating these log entries
+// along this particular "replication stream" (i.e. flowing to a particular
+// store, remote or otherwise).
+message AdmittedRaftLogEntries {
+  // RangeID of the raft group these entries belong to. This is the range on
+  // whose behalf work was admitted.
+  int64 range_id = 1 [(gogoproto.customname) = "RangeID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
+
+  // AdmissionPriority of all admitted entries (maps to admissionpb.WorkPriority).
+  int32 admission_priority = 2;
+
+  // UpToRaftLogPosition (inclusive) of the highest entry that was admitted.
+  // Within a given priority, admission takes place in raft log order (i.e.
+  // entries with lower terms get admitted first, or with lower indexes within
+  // the same term). So the value here implies admission of all entries that
+  // sort before and have the same priority.
+  RaftLogPosition up_to_raft_log_position = 3 [(gogoproto.nullable) = false];
+
+  // StoreID on which these raft log entries were admitted.
+  //
+  // TODO(irfansharif): We can avoid sending this for every logically admitted
+  // message if the raft transport stream we were sending it on had some
+  // handshake protocol at the start, where the client identified itself by its
+  // NodeID. That way the origin replica receiving this information can infer
+  // the StoreID where this work was done since we never store multiple
+  // replicas of a range on the same {single,multi}-store node.
+  uint64 store_id = 4 [(gogoproto.customname) = "StoreID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"];
+}
+
+// RaftLogPosition is a point on the raft log, identified by a term and an
+// index.
+message RaftLogPosition {
+  option (gogoproto.goproto_stringer) = false;
+
+  uint64 term = 1;
+  uint64 index = 2;
+}
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go
new file mode 100644
index 000000000000..c147618928fb
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go
@@ -0,0 +1,42 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvflowcontrolpb
+
+import "github.com/cockroachdb/redact"
+
+func (p *RaftLogPosition) String() string {
+	return redact.StringWithoutMarkers(p)
+}
+
+// SafeFormat implements the redact.SafeFormatter interface.
+func (p *RaftLogPosition) SafeFormat(w redact.SafePrinter, _ rune) {
+	w.Printf("position=%d/%d", p.Term, p.Index)
+}
+
+// Equal returns whether the two raft log positions are identical.
+func (p *RaftLogPosition) Equal(o RaftLogPosition) bool {
+	return p.Term == o.Term && p.Index == o.Index
+}
+
+// Less returns whether one raft log position is less than the other. Those
+// with lower terms sort first, and barring that, those with lower indexes.
+func (p *RaftLogPosition) Less(o RaftLogPosition) bool { + if p.Term != o.Term { + return p.Term < o.Term + } + return p.Index < o.Index +} + +// LessEq returns whether one raft log position is less than or equal to the +// other +func (p *RaftLogPosition) LessEq(o RaftLogPosition) bool { + return p.Less(o) || p.Equal(o) +} diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index c7259fa8dc0a..f2bf110a9e7c 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -34,6 +34,7 @@ proto_library( strip_import_prefix = "/pkg", visibility = ["//visibility:public"], deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto", "//pkg/kv/kvserver/readsummary/rspb:rspb_proto", "//pkg/roachpb:roachpb_proto", @@ -55,6 +56,7 @@ go_proto_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/closedts/ctpb", # keep + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/roachpb", diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index eb9e7cbd4c73..89acafa75709 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -358,6 +358,23 @@ message RaftCommand { // from" the proposer. map trace_data = 16; + // Fields used below-raft for replication admission control. See + // kvflowcontrolpb.RaftAdmissionMeta for how this data is selectively decoded. + // The field tags and types must be kept identical with what's found there. + + // AdmissionPriority of the command (maps to admission.WorkPriority); used + // within a tenant below-raft for replication admission control. + int32 admission_priority = 18; + // AdmissionCreateTime is equivalent to Time.UnixNano() at the creation time + // of the request (or a parent request) for which this command is a part of. + // It's used within a tenant below-raft for replication admission control; see + // admission.WorkInfo.CreateTime for details. + int64 admission_create_time = 19; + // AdmissionOriginNode captures where this raft command originated. It's used + // to inform said node of this raft command's (virtual) admission in order for + // it to release flow tokens for subsequent commands. + int32 admission_origin_node = 20 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + reserved 1, 2, 10001 to 10014; } diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4ec5d0288aed..229af24b873d 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -18,6 +18,7 @@ import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; +import "kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto"; import "raft/v3/raftpb/raft.proto"; import "gogoproto/gogo.proto"; import "util/tracing/tracingpb/recorded_span.proto"; @@ -91,6 +92,10 @@ message RaftMessageRequest { repeated RaftHeartbeat heartbeats = 6 [(gogoproto.nullable) = false]; repeated RaftHeartbeat heartbeat_resps = 7 [(gogoproto.nullable) = false]; + // AdmittedRaftLogEntries is coalesced information about all raft log entries + // that were admitted below raft. 
+ repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedRaftLogEntries admitted_raft_log_entries = 11 [(gogoproto.nullable) = false]; + reserved 10; } diff --git a/pkg/kv/kvserver/raftlog/BUILD.bazel b/pkg/kv/kvserver/raftlog/BUILD.bazel index 710e68417394..628b21ea93d4 100644 --- a/pkg/kv/kvserver/raftlog/BUILD.bazel +++ b/pkg/kv/kvserver/raftlog/BUILD.bazel @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/kv/kvserver/apply", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", @@ -29,6 +30,7 @@ go_library( go_test( name = "raftlog_test", srcs = [ + "encoding_test.go", "entry_bench_test.go", "entry_test.go", "iter_bench_test.go", @@ -38,12 +40,15 @@ go_test( embed = [":raftlog"], deps = [ "//pkg/keys", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/admission/admissionpb", "//pkg/util/hlc", + "//pkg/util/humanizeutil", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", diff --git a/pkg/kv/kvserver/raftlog/encoding.go b/pkg/kv/kvserver/raftlog/encoding.go index bf312d57f233..e0427b51ec9c 100644 --- a/pkg/kv/kvserver/raftlog/encoding.go +++ b/pkg/kv/kvserver/raftlog/encoding.go @@ -13,7 +13,9 @@ package raftlog import ( "fmt" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) // EntryEncoding enumerates the encodings used in CockroachDB for raftpb.Entry's @@ -148,3 +150,23 @@ func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase b[0] = enc.prefixByte() copy(b[1:], commandID) } + +// DecodeRaftAdmissionMeta decodes admission control metadata from a +// raftpb.Entry.Data. Expects an EntryEncoding{Standard,Sideloaded}WithAC +// encoding. +func DecodeRaftAdmissionMeta(data []byte) (kvflowcontrolpb.RaftAdmissionMeta, error) { + prefix := data[0] + if !(prefix == entryEncodingStandardWithACPrefixByte || prefix == entryEncodingSideloadedWithACPrefixByte) { + panic(fmt.Sprintf("invalid encoding: prefix %v", prefix)) + } + + // TODO(irfansharif): If the decoding overhead is noticeable, we can write a + // custom decoder and rely on the encoding for raft admission data being + // present at the start of the marshaled raft command. This could speed it + // up slightly. + var raftAdmissionMeta kvflowcontrolpb.RaftAdmissionMeta + if err := protoutil.Unmarshal(data[1+RaftCommandIDLen:], &raftAdmissionMeta); err != nil { + return kvflowcontrolpb.RaftAdmissionMeta{}, err + } + return raftAdmissionMeta, nil +} diff --git a/pkg/kv/kvserver/raftlog/encoding_test.go b/pkg/kv/kvserver/raftlog/encoding_test.go new file mode 100644 index 000000000000..0e5734d9d5bd --- /dev/null +++ b/pkg/kv/kvserver/raftlog/encoding_test.go @@ -0,0 +1,107 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package raftlog + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3/raftpb" +) + +// BenchmarkRaftAdmissionMetaOverhead measures the overhead of encoding/decoding +// raft metadata, compared to not doing it. It's structured similar to how raft +// command data is encoded + decoded end-to-end, including the optional +// below-raft admission control data, where steps (2) and (4) below are only +// done for proposals subject to below-raft admission. +// +// name old time/op new time/op delta +// RaftAdmissionMetaOverhead/bytes=1.0_KiB,raft-ac-10 1.30µs ± 1% 1.70µs ± 1% +30.43% (p=0.008 n=5+5) +// RaftAdmissionMetaOverhead/bytes=256_KiB,raft-ac-10 51.6µs ± 4% 50.6µs ± 5% ~ (p=0.421 n=5+5) +// RaftAdmissionMetaOverhead/bytes=512_KiB,raft-ac-10 91.9µs ± 4% 91.2µs ± 5% ~ (p=1.000 n=5+5) +// RaftAdmissionMetaOverhead/bytes=1.0_MiB,raft-ac-10 148µs ± 4% 151µs ± 5% ~ (p=0.095 n=5+5) +// RaftAdmissionMetaOverhead/bytes=2.0_MiB,raft-ac-10 290µs ± 3% 292µs ± 1% ~ (p=0.151 n=5+5) +func BenchmarkRaftAdmissionMetaOverhead(b *testing.B) { + defer log.Scope(b).Close(b) + + const KiB = 1 << 10 + const MiB = 1 << 20 + + for _, withRaftAdmissionMeta := range []bool{false, true} { + for _, bytes := range []int64{1 * KiB, 256 * KiB, 512 * KiB, 1 * MiB, 2 * MiB} { + var raftAdmissionMetaLen int + var raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta + entryEnc := EntryEncodingStandardWithoutAC + + raftCmd := mkRaftCommand(100, int(bytes), int(bytes+200)) + marshaledRaftCmd, err := protoutil.Marshal(raftCmd) + require.NoError(b, err) + + if withRaftAdmissionMeta { + raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ + AdmissionPriority: int32(admissionpb.BulkNormalPri), + AdmissionCreateTime: 18581258253, + } + raftAdmissionMetaLen = raftAdmissionMeta.Size() + entryEnc = EntryEncodingStandardWithAC + } + + encodingBuf := make([]byte, RaftCommandPrefixLen+raftAdmissionMeta.Size()+len(marshaledRaftCmd)) + raftEnt := Entry{ + Entry: raftpb.Entry{ + Term: 1, + Index: 1, + Type: raftpb.EntryNormal, + Data: encodingBuf, + }, + } + + b.Run(fmt.Sprintf("bytes=%s,raft-ac=%t", humanizeutil.IBytes(bytes), withRaftAdmissionMeta), + func(b *testing.B) { + for i := 0; i < b.N; i++ { + // 1. Encode the raft command prefix. + EncodeRaftCommandPrefix(encodingBuf[:RaftCommandPrefixLen], entryEnc, "deadbeef") + + // 2. If using below-raft admission, encode the raft + // metadata right after the command prefix. + if withRaftAdmissionMeta { + _, err = protoutil.MarshalTo( + raftAdmissionMeta, + encodingBuf[RaftCommandPrefixLen:RaftCommandPrefixLen+raftAdmissionMetaLen], + ) + require.NoError(b, err) + } + + // 3. Marshal the rest of the command. + _, err = protoutil.MarshalTo(raftCmd, encodingBuf[RaftCommandPrefixLen+raftAdmissionMetaLen:]) + require.NoError(b, err) + + // 4. If using below-raft admission, decode the raft + // metadata. + if withRaftAdmissionMeta { + _, err = DecodeRaftAdmissionMeta(encodingBuf) + require.NoError(b, err) + } + + // 5. Decode the entire raft command. 
+						require.NoError(b, raftEnt.load())
+					}
+				},
+			)
+		}
+	}
+}
diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go
index d9a035428f0a..e8db8e8be5ae 100644
--- a/pkg/util/admission/admission.go
+++ b/pkg/util/admission/admission.go
@@ -131,6 +131,7 @@ package admission
 import (
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble"
 )
@@ -273,7 +274,7 @@ type granterWithIOTokens interface {
 	setAvailableElasticDiskBandwidthTokensLocked(tokens int64)
 	// getDiskTokensUsedAndResetLocked returns the disk bandwidth tokens used
 	// since the last such call.
-	getDiskTokensUsedAndResetLocked() [numWorkClasses]int64
+	getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64
 	// setAdmittedDoneModelsLocked supplies the models to use when
 	// storeWriteDone is called, to adjust token consumption. Note that these
 	// models are not used for token adjustment at admission time -- that is
@@ -320,7 +321,7 @@ type CPULoadListener interface {
 // storeRequester is used to abstract *StoreWorkQueue for testing.
 type storeRequester interface {
 	requesterClose
-	getRequesters() [numWorkClasses]requester
+	getRequesters() [admissionpb.NumWorkClasses]requester
 	getStoreAdmissionStats() storeAdmissionStats
 	setStoreRequestEstimates(estimates storeRequestEstimates)
 }
diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go
index f856ed977e41..cd5f4a80dd0a 100644
--- a/pkg/util/admission/admissionpb/admissionpb.go
+++ b/pkg/util/admission/admissionpb/admissionpb.go
@@ -52,6 +52,44 @@ var WorkPriorityDict = map[WorkPriority]string{
 	HighPri: "high-pri",
 }
 
+// WorkClass represents the class of work, which is defined entirely by its
+// WorkPriority. Namely, everything less than NormalPri is defined to be
+// "Elastic", while everything at or above NormalPri is considered
+// "Regular".
+type WorkClass int8
+
+const (
+	// RegularWorkClass is for work corresponding to workloads that are
+	// throughput and latency sensitive.
+	RegularWorkClass WorkClass = iota
+	// ElasticWorkClass is for work corresponding to workloads that can handle
+	// reduced throughput, possibly by taking longer to finish a workload. It is
+	// not latency sensitive.
+	ElasticWorkClass
+	// NumWorkClasses is the number of work classes.
+	NumWorkClasses
+)
+
+// WorkClassFromPri translates a WorkPriority to its WorkClass.
+func WorkClassFromPri(pri WorkPriority) WorkClass {
+	class := RegularWorkClass
+	if pri < NormalPri {
+		class = ElasticWorkClass
+	}
+	return class
+}
+
+func (w WorkClass) String() string {
+	switch w {
+	case RegularWorkClass:
+		return "regular"
+	case ElasticWorkClass:
+		return "elastic"
+	default:
+		return ""
+	}
+}
+
 // Prevent the linter from emitting unused warnings.
 var _ = LowPri
 var _ = TTLLowPri
diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go
index 49b179806346..c66c5c6da919 100644
--- a/pkg/util/admission/grant_coordinator.go
+++ b/pkg/util/admission/grant_coordinator.go
@@ -156,13 +156,13 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo
 	// This is IO work, so override the usesTokens value.
 	opts.usesTokens = true
 	// TODO(sumeer): add per-store WorkQueue state for debug.zip and db console.
- granters := [numWorkClasses]granterWithStoreWriteDone{ + granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ &kvStoreTokenChildGranter{ - workClass: regularWorkClass, + workClass: admissionpb.RegularWorkClass, parent: kvg, }, &kvStoreTokenChildGranter{ - workClass: elasticWorkClass, + workClass: admissionpb.ElasticWorkClass, parent: kvg, }, } @@ -170,8 +170,8 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, granters, sgc.settings, sgc.workQueueMetrics, opts) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() - kvg.regularRequester = requesters[regularWorkClass] - kvg.elasticRequester = requesters[elasticWorkClass] + kvg.regularRequester = requesters[admissionpb.RegularWorkClass] + kvg.elasticRequester = requesters[admissionpb.ElasticWorkClass] coord.granters[KVWork] = kvg coord.ioLoadListener = &ioLoadListener{ storeID: storeID, @@ -330,7 +330,7 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, granters [numWorkClasses]granterWithStoreWriteDone, + _ log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index acae136a3f97..49da834c509f 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -272,19 +272,6 @@ func (tg *tokenGranter) tryGrantLocked(grantChainID grantChainID) grantResult { return res } -type workClass int8 - -const ( - // regularWorkClass is for work corresponding to workloads that are - // throughput and latency sensitive. - regularWorkClass workClass = iota - // elasticWorkClass is for work corresponding to workloads that can handle - // reduced throughput, possibly by taking longer to finish a workload. It is - // not latency sensitive. - elasticWorkClass - numWorkClasses -) - // kvStoreTokenGranter implements granterWithLockedCalls. It is used for // grants to KVWork to a store, that is limited by IO tokens. It encapsulates // two granter-requester pairs, for the two workClasses. The granter in these @@ -317,7 +304,7 @@ type kvStoreTokenGranter struct { // Disk bandwidth tokens. elasticDiskBWTokensAvailable int64 - diskBWTokensUsed [numWorkClasses]int64 + diskBWTokensUsed [admissionpb.NumWorkClasses]int64 // Estimation models. l0WriteLM, l0IngestLM, ingestLM tokensLinearModel @@ -329,7 +316,7 @@ var _ granterWithIOTokens = &kvStoreTokenGranter{} // kvStoreTokenChildGranter handles a particular workClass. Its methods // pass-through to the parent after adding the workClass as a parameter. type kvStoreTokenChildGranter struct { - workClass workClass + workClass admissionpb.WorkClass parent *kvStoreTokenGranter } @@ -368,13 +355,13 @@ func (cg *kvStoreTokenChildGranter) storeWriteDone( return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo) } -func (sg *kvStoreTokenGranter) tryGet(workClass workClass, count int64) bool { +func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool { return sg.coord.tryGet(KVWork, count, int8(workClass)) } // tryGetLocked implements granterWithLockedCalls. 
func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grantResult { - wc := workClass(demuxHandle) + wc := admissionpb.WorkClass(demuxHandle) // NB: ideally if regularRequester.hasWaitingRequests() returns true and // wc==elasticWorkClass we should reject this request, since it means that // more important regular work is waiting. However, we rely on the @@ -383,13 +370,13 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant // elasticWorkClass is when the queue is empty, this case should be rare // (and not cause a performance isolation failure). switch wc { - case regularWorkClass: + case admissionpb.RegularWorkClass: if sg.availableIOTokens > 0 { sg.subtractTokens(count, false) sg.diskBWTokensUsed[wc] += count return grantSuccess } - case elasticWorkClass: + case admissionpb.ElasticWorkClass: if sg.elasticDiskBWTokensAvailable > 0 && sg.availableIOTokens > 0 { sg.elasticDiskBWTokensAvailable -= count sg.subtractTokens(count, false) @@ -400,31 +387,31 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant return grantFailLocal } -func (sg *kvStoreTokenGranter) returnGrant(workClass workClass, count int64) { +func (sg *kvStoreTokenGranter) returnGrant(workClass admissionpb.WorkClass, count int64) { sg.coord.returnGrant(KVWork, count, int8(workClass)) } // returnGrantLocked implements granterWithLockedCalls. func (sg *kvStoreTokenGranter) returnGrantLocked(count int64, demuxHandle int8) { - wc := workClass(demuxHandle) + wc := admissionpb.WorkClass(demuxHandle) // Return count tokens to the "IO tokens". sg.subtractTokens(-count, false) - if wc == elasticWorkClass { + if wc == admissionpb.ElasticWorkClass { // Return count tokens to the elastic disk bandwidth tokens. sg.elasticDiskBWTokensAvailable += count } sg.diskBWTokensUsed[wc] -= count } -func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass workClass, count int64) { +func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkClass, count int64) { sg.coord.tookWithoutPermission(KVWork, count, int8(workClass)) } // tookWithoutPermissionLocked implements granterWithLockedCalls. func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHandle int8) { - wc := workClass(demuxHandle) + wc := admissionpb.WorkClass(demuxHandle) sg.subtractTokens(count, false) - if wc == elasticWorkClass { + if wc == admissionpb.ElasticWorkClass { sg.elasticDiskBWTokensAvailable -= count } sg.diskBWTokensUsed[wc] += count @@ -462,7 +449,7 @@ func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantRe // First try granting to regular requester. for wc := range sg.diskBWTokensUsed { req := sg.regularRequester - if workClass(wc) == elasticWorkClass { + if admissionpb.WorkClass(wc) == admissionpb.ElasticWorkClass { req = sg.elasticRequester } if req.hasWaitingRequests() { @@ -519,7 +506,7 @@ func (sg *kvStoreTokenGranter) setAvailableElasticDiskBandwidthTokensLocked(toke } // getDiskTokensUsedAndResetLocked implements granterWithIOTokens. -func (sg *kvStoreTokenGranter) getDiskTokensUsedAndResetLocked() [numWorkClasses]int64 { +func (sg *kvStoreTokenGranter) getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64 { result := sg.diskBWTokensUsed for i := range sg.diskBWTokensUsed { sg.diskBWTokensUsed[i] = 0 @@ -538,7 +525,7 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModelsLocked( // storeWriteDone implements granterWithStoreWriteDone. 
func (sg *kvStoreTokenGranter) storeWriteDone( - wc workClass, originalTokens int64, doneInfo StoreWorkDoneInfo, + wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { // Normally, we follow the structure of a foo() method calling into a foo() // method on the GrantCoordinator, which then calls fooLocked() on the @@ -558,7 +545,7 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.coord.mu.Lock() exhaustedFunc := func() bool { return sg.availableIOTokens <= 0 || - (wc == elasticWorkClass && sg.elasticDiskBWTokensAvailable <= 0) + (wc == admissionpb.ElasticWorkClass && sg.elasticDiskBWTokensAvailable <= 0) } wasExhausted := exhaustedFunc() actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes) @@ -568,7 +555,7 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.subtractTokens(additionalL0TokensNeeded, false) actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes) additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens - if wc == elasticWorkClass { + if wc == admissionpb.ElasticWorkClass { sg.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded } sg.diskBWTokensUsed[wc] += additionalDiskBWTokensNeeded diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index 33a1f66d0456..3c4b5d019399 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -109,9 +109,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, granters [numWorkClasses]granterWithStoreWriteDone, + ambientCtx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { - makeTestRequester := func(wc workClass) *testRequester { + makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ workKind: KVWork, granter: granters[wc], @@ -120,18 +120,18 @@ func TestGranterBasic(t *testing.T) { returnValueFromGranted: 0, } switch wc { - case regularWorkClass: + case admissionpb.RegularWorkClass: req.additionalID = "-regular" - case elasticWorkClass: + case admissionpb.ElasticWorkClass: req.additionalID = "-elastic" } return req } req := &storeTestRequester{} - req.requesters[regularWorkClass] = makeTestRequester(regularWorkClass) - req.requesters[elasticWorkClass] = makeTestRequester(elasticWorkClass) - requesters[KVWork] = req.requesters[regularWorkClass] - requesters[numWorkKinds] = req.requesters[elasticWorkClass] + req.requesters[admissionpb.RegularWorkClass] = makeTestRequester(admissionpb.RegularWorkClass) + req.requesters[admissionpb.ElasticWorkClass] = makeTestRequester(admissionpb.ElasticWorkClass) + requesters[KVWork] = req.requesters[admissionpb.RegularWorkClass] + requesters[numWorkKinds] = req.requesters[admissionpb.ElasticWorkClass] return req }, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, @@ -287,15 +287,15 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, granters [numWorkClasses]granterWithStoreWriteDone, + ctx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { - reqReg := makeRequesterFunc(ctx, KVWork, 
granters[regularWorkClass], settings, metrics, opts) - reqElastic := makeRequesterFunc(ctx, KVWork, granters[elasticWorkClass], settings, metrics, opts) + reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) + reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} - str.requesters[regularWorkClass] = reqReg.(*testRequester) - str.requesters[regularWorkClass].additionalID = "-regular" - str.requesters[elasticWorkClass] = reqElastic.(*testRequester) - str.requesters[elasticWorkClass].additionalID = "-elastic" + str.requesters[admissionpb.RegularWorkClass] = reqReg.(*testRequester) + str.requesters[admissionpb.RegularWorkClass].additionalID = "-regular" + str.requesters[admissionpb.ElasticWorkClass] = reqElastic.(*testRequester) + str.requesters[admissionpb.ElasticWorkClass].additionalID = "-elastic" return str }, } @@ -393,13 +393,13 @@ func (tr *testRequester) continueGrantChain() { } type storeTestRequester struct { - requesters [numWorkClasses]*testRequester + requesters [admissionpb.NumWorkClasses]*testRequester } var _ storeRequester = &storeTestRequester{} -func (str *storeTestRequester) getRequesters() [numWorkClasses]requester { - var rv [numWorkClasses]requester +func (str *storeTestRequester) getRequesters() [admissionpb.NumWorkClasses]requester { + var rv [admissionpb.NumWorkClasses]requester for i := range str.requesters { rv[i] = str.requesters[i] } diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index 23c89ad9714c..1f05b1695caf 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -405,8 +405,8 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics io.mu.Unlock() io.aux.diskBW.intervalLSMInfo = intervalLSMInfo{ incomingBytes: int64(cumLSMIncomingBytes) - int64(cumDiskBW.incomingLSMBytes), - regularTokensUsed: diskTokensUsed[regularWorkClass], - elasticTokensUsed: diskTokensUsed[elasticWorkClass], + regularTokensUsed: diskTokensUsed[admissionpb.RegularWorkClass], + elasticTokensUsed: diskTokensUsed[admissionpb.ElasticWorkClass], } if metrics.DiskStats.ProvisionedBandwidth > 0 { io.elasticDiskBWTokens = io.diskBandwidthLimiter.computeElasticTokens(ctx, diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index 7d664c12210c..a62b14052e1c 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" @@ -151,11 +152,11 @@ func TestIOLoadListener(t *testing.T) { if d.HasArg("disk-bw-tokens-used") { var regularTokensUsed, elasticTokensUsed int d.ScanArgs(t, "disk-bw-tokens-used", ®ularTokensUsed, &elasticTokensUsed) - kvGranter.diskBandwidthTokensUsed[regularWorkClass] = int64(regularTokensUsed) - kvGranter.diskBandwidthTokensUsed[elasticWorkClass] = int64(elasticTokensUsed) + kvGranter.diskBandwidthTokensUsed[admissionpb.RegularWorkClass] = int64(regularTokensUsed) + kvGranter.diskBandwidthTokensUsed[admissionpb.ElasticWorkClass] = 
int64(elasticTokensUsed) } else { - kvGranter.diskBandwidthTokensUsed[regularWorkClass] = 0 - kvGranter.diskBandwidthTokensUsed[elasticWorkClass] = 0 + kvGranter.diskBandwidthTokensUsed[admissionpb.RegularWorkClass] = 0 + kvGranter.diskBandwidthTokensUsed[admissionpb.ElasticWorkClass] = 0 } var printOnlyFirstTick bool if d.HasArg("print-only-first-tick") { @@ -337,7 +338,7 @@ var _ storeRequester = &testRequesterForIOLL{} func (r *testRequesterForIOLL) close() {} -func (r *testRequesterForIOLL) getRequesters() [numWorkClasses]requester { +func (r *testRequesterForIOLL) getRequesters() [admissionpb.NumWorkClasses]requester { panic("unimplemented") } @@ -352,7 +353,7 @@ func (r *testRequesterForIOLL) setStoreRequestEstimates(estimates storeRequestEs type testGranterWithIOTokens struct { buf strings.Builder allTokensUsed bool - diskBandwidthTokensUsed [numWorkClasses]int64 + diskBandwidthTokensUsed [admissionpb.NumWorkClasses]int64 } var _ granterWithIOTokens = &testGranterWithIOTokens{} @@ -370,7 +371,7 @@ func (g *testGranterWithIOTokens) setAvailableElasticDiskBandwidthTokensLocked(t tokensForTokenTickDurationToString(tokens)) } -func (g *testGranterWithIOTokens) getDiskTokensUsedAndResetLocked() [numWorkClasses]int64 { +func (g *testGranterWithIOTokens) getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64 { return g.diskBandwidthTokensUsed } @@ -410,8 +411,8 @@ func (g *testGranterNonNegativeTokens) setAvailableElasticDiskBandwidthTokensLoc require.LessOrEqual(g.t, int64(0), tokens) } -func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndResetLocked() [numWorkClasses]int64 { - return [numWorkClasses]int64{} +func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64 { + return [admissionpb.NumWorkClasses]int64{} } func (g *testGranterNonNegativeTokens) setAdmittedDoneModelsLocked( diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 35e20f995806..be9b4fabc3d4 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -1644,10 +1644,10 @@ type StoreWriteWorkInfo struct { // StoreWorkQueue is responsible for admission to a store. type StoreWorkQueue struct { - q [numWorkClasses]WorkQueue + q [admissionpb.NumWorkClasses]WorkQueue // Only calls storeWriteDone. The rest of the interface is used by // WorkQueue. - granters [numWorkClasses]granterWithStoreWriteDone + granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone mu struct { syncutil.RWMutex estimates storeRequestEstimates @@ -1662,7 +1662,7 @@ type StoreWorkHandle struct { tenantID roachpb.TenantID // The writeTokens acquired by this request. Must be > 0. writeTokens int64 - workClass workClass + workClass admissionpb.WorkClass admissionEnabled bool } @@ -1680,10 +1680,7 @@ func (q *StoreWorkQueue) Admit( ctx context.Context, info StoreWriteWorkInfo, ) (handle StoreWorkHandle, err error) { // For now, we compute a workClass based on priority. - wc := regularWorkClass - if info.Priority < admissionpb.NormalPri { - wc = elasticWorkClass - } + wc := admissionpb.WorkClassFromPri(info.Priority) h := StoreWorkHandle{ tenantID: info.TenantID, workClass: wc, @@ -1732,7 +1729,7 @@ func (q *StoreWorkQueue) BypassedWorkDone(workCount int64, doneInfo StoreWorkDon q.updateStoreAdmissionStats(uint64(workCount), doneInfo, true) // Since we have no control over such work, we choose to count it as // regularWorkClass. 
- _ = q.granters[regularWorkClass].storeWriteDone(0, doneInfo) + _ = q.granters[admissionpb.RegularWorkClass].storeWriteDone(0, doneInfo) } // StatsToIgnore is called for range snapshot ingestion -- see the comment in @@ -1767,8 +1764,8 @@ func (q *StoreWorkQueue) SetTenantWeights(tenantWeights map[uint64]uint32) { } // getRequesters implements storeRequester. -func (q *StoreWorkQueue) getRequesters() [numWorkClasses]requester { - var result [numWorkClasses]requester +func (q *StoreWorkQueue) getRequesters() [admissionpb.NumWorkClasses]requester { + var result [admissionpb.NumWorkClasses]requester for i := range q.q { result[i] = &q.q[i] } @@ -1795,7 +1792,7 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, - granters [numWorkClasses]granterWithStoreWriteDone, + granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index 6b97e7509af0..0296c442a7ad 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -454,13 +454,13 @@ func TestPriorityStates(t *testing.T) { }) } -func tryScanWorkClass(t *testing.T, d *datadriven.TestData) workClass { - wc := regularWorkClass +func tryScanWorkClass(t *testing.T, d *datadriven.TestData) admissionpb.WorkClass { + wc := admissionpb.RegularWorkClass if d.HasArg("elastic") { var b bool d.ScanArgs(t, "elastic", &b) if b { - wc = elasticWorkClass + wc = admissionpb.ElasticWorkClass } } return wc @@ -487,7 +487,7 @@ func TestStoreWorkQueueBasic(t *testing.T) { } } defer closeFn() - var tg [numWorkClasses]*testGranter + var tg [admissionpb.NumWorkClasses]*testGranter var wrkMap workMap var buf builderWithMu var st *cluster.Settings @@ -495,7 +495,7 @@ func TestStoreWorkQueueBasic(t *testing.T) { q.mu.Lock() defer q.mu.Unlock() return fmt.Sprintf("regular workqueue: %s\nelastic workqueue: %s\nstats:%+v\nestimates:%+v", - q.q[regularWorkClass].String(), q.q[elasticWorkClass].String(), q.mu.stats, + q.q[admissionpb.RegularWorkClass].String(), q.q[admissionpb.ElasticWorkClass].String(), q.mu.stats, q.mu.estimates) } @@ -506,18 +506,18 @@ func TestStoreWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg[regularWorkClass] = &testGranter{name: " regular", buf: &buf} - tg[elasticWorkClass] = &testGranter{name: " elastic", buf: &buf} + tg[admissionpb.RegularWorkClass] = &testGranter{name: " regular", buf: &buf} + tg[admissionpb.ElasticWorkClass] = &testGranter{name: " elastic", buf: &buf} opts := makeWorkQueueOptions(KVWork) opts.usesTokens = true opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), - [numWorkClasses]granterWithStoreWriteDone{tg[regularWorkClass], tg[elasticWorkClass]}, + [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, st, metrics, opts).(*StoreWorkQueue) - tg[regularWorkClass].r = q.getRequesters()[regularWorkClass] - tg[elasticWorkClass].r = q.getRequesters()[elasticWorkClass] + tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] + tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap() return ""