Skip to content

Commit

Permalink
tenantcostmodel: update tenant cost model
Browse files Browse the repository at this point in the history
The current tenant cost model has several problems:

1. It only takes batches into account. It ignores the requests in each read and
   write batch (e.g. GetRequest, CPutRequest, etc). For example, lookup joins
   typically send a small number of read batches with a large number of requests
   in each batch, which resulted in the model greatly under-costing queries
   containing them.

2. It does not take the replication factor into account. This does not matter
   much right now, because the replication factor is always 3 for Serverless.
   But eventually we will allow developers to change the replication factor
   for tables, and will need to charge proportionally more in that case.

3. It processes batches at the highest level of DistSender, before batches have
   been split by range. The upcoming Streamer component will make the count and
   size of these batches unpredictable and difficult for the model to cost.
   While the already-split batches sent to leaseholders at the lower level of
   DistSender have their own predictability issues, they should be better once
   Streamer is running.

4. Related to #2 and #3 above, DistSender cannot accurately cost either read or write
   batches until the replication factor is known in the case of write batches,
   and until a response is received in the case of read batches. This calls into
   question its current behavior of throttling based on the content of requests.
   Read batches in particular will not throttle until outstanding debt has
   reached fairly high levels, because the read request has a cost close to zero.

5. If an error occurs during message transmission, RUs are still charged against
   the tenant. We'd prefer to not charge customers if there are internal network
   or server problems that are causing errors, since those conditions are
   generally not caused by them or under their control.

The new cost model takes batches, requests, and replication factor into account.
It is also based on a larger number of training scenarios, and so is quite a bit
more accurate for scenarios like IMPORT and TPC-C. Rather than waiting based on
the content of a read/write request batch, it waits based on the average size of
recent batches. It does not charge RUs in error cases.

Release note: None
  • Loading branch information
andy-kimball committed May 14, 2022
1 parent 8a65eef commit 1412117
Show file tree
Hide file tree
Showing 38 changed files with 1,978 additions and 840 deletions.
308 changes: 212 additions & 96 deletions pkg/ccl/multitenantccl/tenantcostclient/limiter.go

Large diffs are not rendered by default.

267 changes: 267 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package tenantcostclient

import (
"context"
"flag"
"fmt"
"io"
"math/rand"
"os"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

var saveDebtCSV = flag.String(
"save-debt-csv", "",
"save latency data from TestTokenBucketDebt to a csv file",
)

// TestLimiter exercises the tokenBucket against a manual clock: the initial
// fill, steady refill at the configured rate, going into debt via
// RemoveTokens, the TryToFulfill retry-delay calculation (including its
// maxTryAgainAfterSeconds cap), low-RU threshold notifications, and the
// exponential moving-average RU/op estimate reported by String().
func TestLimiter(t *testing.T) {
	defer leaktest.AfterTest(t)()

	start := timeutil.Now()
	ts := timeutil.NewManualTime(start)

	// Buffered so the bucket's notification send can never block the test.
	ch := make(chan struct{}, 100)

	var tb tokenBucket
	tb.Init(ts.Now(), ch, 10 /* rate */, 100 /* available */)

	// check asserts the bucket's String() output after refreshing its state
	// to the manual clock's current time.
	check := func(expected string) {
		// Call AvailableRU to trigger time update.
		t.Helper()
		tb.AvailableTokens(ts.Now())
		actual := tb.String()
		if actual != expected {
			t.Errorf("expected: %s\nactual: %s\n", expected, actual)
		}
	}
	check("100.00 RU filling @ 10.00 RU/s (avg 100.00 RU/op)")

	// Verify basic update.
	ts.Advance(1 * time.Second)
	check("110.00 RU filling @ 10.00 RU/s (avg 100.00 RU/op)")
	// Check RemoveRU.
	tb.RemoveTokens(ts.Now(), 200)
	check("-90.00 RU filling @ 10.00 RU/s (avg 100.00 RU/op)")

	ts.Advance(1 * time.Second)
	check("-80.00 RU filling @ 10.00 RU/s (avg 100.00 RU/op)")
	ts.Advance(15 * time.Second)
	check("70.00 RU filling @ 10.00 RU/s (avg 100.00 RU/op)")

	// fulfill performs one operation: TryToFulfill must succeed immediately,
	// then the given amount is removed from the bucket.
	fulfill := func(amount tenantcostmodel.RU) {
		t.Helper()
		if ok, _ := tb.TryToFulfill(ts.Now()); !ok {
			t.Fatalf("failed to fulfill")
		}
		tb.RemoveTokens(ts.Now(), amount)
	}
	fulfill(100)
	check("-30.00 RU filling @ 10.00 RU/s (avg 100.00 RU/op)")

	// TryAgainAfter should be the time to pay off entire debt.
	if ok, tryAgainAfter := tb.TryToFulfill(ts.Now()); ok {
		t.Fatalf("fulfilled incorrectly")
	} else if exp := 3 * time.Second; tryAgainAfter.Round(time.Millisecond) != exp {
		t.Fatalf("tryAgainAfter: expected %s, got %s", exp, tryAgainAfter)
	}
	check("-30.00 RU filling @ 10.00 RU/s (avg 300.00 RU/op)")

	// Create massive debt and ensure that delay is = maxTryAgainAfterSeconds.
	tb.RemoveTokens(ts.Now(), maxTryAgainAfterSeconds*10)
	if ok, tryAgainAfter := tb.TryToFulfill(ts.Now()); ok {
		t.Fatalf("fulfilled incorrectly")
	} else if exp := maxTryAgainAfterSeconds * time.Second; tryAgainAfter.Round(time.Millisecond) != exp {
		t.Fatalf("tryAgainAfter: expected %s, got %s", exp, tryAgainAfter)
	}
	check("-10030.00 RU filling @ 10.00 RU/s (avg 300.00 RU/op)")
	ts.Advance(maxTryAgainAfterSeconds * time.Second)

	// Update average RU/op in order to test case where TryAgainAfter should be
	// the time to pay off debt in excess of maxDebtSeconds.
	// NOTE(review): internal fields are poked directly rather than running
	// many ops to move the average — confirm this stays in sync with
	// tokenBucket's internals if they change.
	tb.avgRUPerOp = 3
	tb.opRU = 0
	tb.opCount = 0
	if ok, tryAgainAfter := tb.TryToFulfill(ts.Now()); ok {
		t.Fatalf("fulfilled incorrectly")
	} else if exp := 1 * time.Second; tryAgainAfter.Round(time.Millisecond) != exp {
		t.Fatalf("tryAgainAfter: expected %s, got %s", exp, tryAgainAfter)
	}
	check("-30.00 RU filling @ 10.00 RU/s (avg 3.00 RU/op)")

	// Wait for 1 second in order to pay off excess debt. Ensure that operations
	// can be fulfilled every 300 ms from then on.
	ts.Advance(1 * time.Second)
	check("-20.00 RU filling @ 10.00 RU/s (avg 3.00 RU/op)")
	fulfill(3)
	ts.Advance(300 * time.Millisecond)
	fulfill(3)
	ts.Advance(300 * time.Millisecond)
	fulfill(3)
	check("-23.00 RU filling @ 10.00 RU/s (avg 3.00 RU/op)")

	// Check notification.
	// checkNoNotification asserts nothing is pending on the notify channel.
	checkNoNotification := func() {
		t.Helper()
		select {
		case <-ch:
			t.Error("unexpected notification")
		default:
		}
	}

	// checkNotification asserts a notification is pending on the channel.
	checkNotification := func() {
		t.Helper()
		select {
		case <-ch:
		default:
			t.Error("expected notification")
		}
	}

	checkNoNotification()
	args := tokenBucketReconfigureArgs{
		NewTokens:       43,
		NewRate:         10,
		NotifyThreshold: 5,
	}
	tb.Reconfigure(ts.Now(), args)

	checkNoNotification()
	ts.Advance(1 * time.Second)
	check("30.00 RU filling @ 10.00 RU/s (avg 3.00 RU/op)")
	fulfill(20)
	// No notification: we did not go below the threshold.
	checkNoNotification()
	// Now we should get a notification.
	fulfill(8)
	checkNotification()
	check("2.00 RU filling @ 10.00 RU/s (avg 3.00 RU/op)")

	// We only get one notification (until we Reconfigure or StartNotification).
	fulfill(1)
	checkNoNotification()

	// Reconfiguring without a NotifyThreshold should arm no notification.
	args = tokenBucketReconfigureArgs{
		NewTokens: 80,
		NewRate:   1,
	}
	tb.Reconfigure(ts.Now(), args)
	check("81.00 RU filling @ 1.00 RU/s (avg 3.00 RU/op)")
	ts.Advance(1 * time.Second)

	// SetupNotification re-arms the threshold notification explicitly.
	checkNoNotification()
	tb.SetupNotification(ts.Now(), 50)
	checkNoNotification()
	fulfill(60)
	checkNotification()
	check("22.00 RU filling @ 1.00 RU/s (avg 4.33 RU/op)")

	// Test exponential moving average RU/op.
	fulfill(60)
	ts.Advance(100 * time.Second)
	check("62.00 RU filling @ 1.00 RU/s (avg 15.47 RU/op)")
}

// TestLimiterDebt simulates a closed-loop workload with parallelism 1 in
// combination with incurring a fixed amount of debt every second. It verifies
// that queue times are never too high, and optionally emits all queue time
// information into a csv file (see the -save-debt-csv flag).
func TestLimiterDebt(t *testing.T) {
	defer leaktest.AfterTest(t)()

	start := timeutil.Now()
	ts := timeutil.NewManualTime(start)

	var lim limiter
	lim.Init(ts, nil /* notifyChan */)
	lim.Reconfigure(limiterReconfigureArgs{NewRate: 100})

	// The simulation advances the manual clock one tick (1ms) per loop
	// iteration and injects a lump of debt once per debtPeriod (1s).
	const tickDuration = time.Millisecond
	const debtPeriod = time.Second
	const debtTicks = int(debtPeriod / tickDuration)
	const totalTicks = 10 * debtTicks
	const debt tenantcostmodel.RU = 50

	// Per-request RU costs are drawn from a normal distribution, clamped at
	// zero below.
	const reqRUMean = 2.0
	const reqRUStdDev = reqRUMean / 2.0

	// Use a fixed seed so the result is reproducible.
	r := rand.New(rand.NewSource(1234))
	randRu := func() tenantcostmodel.RU {
		ru := r.NormFloat64()*reqRUStdDev + reqRUMean
		if ru < 0 {
			return 0
		}
		return tenantcostmodel.RU(ru)
	}

	// Discard the CSV rows unless the -save-debt-csv flag names a file.
	out := io.Discard
	if *saveDebtCSV != "" {
		file, err := os.Create(*saveDebtCSV)
		if err != nil {
			t.Fatalf("error creating csv file: %v", err)
		}
		defer func() {
			if err := file.Close(); err != nil {
				t.Errorf("error closing csv file: %v", err)
			}
		}()
		out = file
	}

	// req is the single outstanding request (closed loop, parallelism 1);
	// reqTick records the tick at which it was created so latency can be
	// measured in ticks (= milliseconds).
	var req *waitRequest
	reqTick := 0
	// NOTE(review): opsCount is written and reset but never read in this
	// function; it appears vestigial — confirm and consider removing.
	opsCount := 0
	ctx := context.Background()

	fmt.Fprintf(out, "time (s), RUs requested, queue latency (ms), debt applied (RU), available RUs, extra RUs\n")
	for tick := 0; tick < totalTicks; tick++ {
		// Inject a lump of debt at the start of each debt period (except
		// tick 0).
		var d tenantcostmodel.RU
		if tick > 0 && tick%debtTicks == 0 {
			d = debt
			lim.OnTick(d)
			opsCount = 0
		}

		if req != nil {
			if tenantcostmodel.RU(req.needed) <= lim.AvailableRU() {
				// This should not block, since we checked above there are enough
				// tokens.
				if err := lim.qp.Acquire(ctx, req); err != nil {
					t.Fatalf("error during Acquire: %v", err)
				}

				opsCount++
				latency := tick - reqTick
				if latency > 100 {
					// A single request took longer than 100ms; we did a poor job smoothing
					// out the debt.
					t.Fatalf("high latency for request: %d ms", latency)
				}
				fmt.Fprintf(out, "%8.3f,%14.2f,%19d,%18.1f,%14.2f,%10.2f\n",
					(time.Duration(tick) * tickDuration).Seconds(),
					req.needed, latency, d, lim.AvailableRU(), lim.extraUsage())

				req = nil
			} else {
				// Request is still queued: emit a CSV row with blank request
				// columns so the timeline stays continuous.
				fmt.Fprintf(out, "%8.3f, , ,%18.1f,%14.2f,%10.2f\n",
					(time.Duration(tick) * tickDuration).Seconds(), d, lim.AvailableRU(), lim.extraUsage())
			}
		}

		if req == nil {
			// Create a request now.
			req = &waitRequest{needed: quotapool.Tokens(lim.amortizeExtraRU(randRu()))}
			reqTick = tick
		}

		ts.Advance(tickDuration)
	}
}
Loading

0 comments on commit 1412117

Please sign in to comment.