Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decouple olap tx timeout from oltp tx timeout #10946

Merged
merged 15 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions doc/releasenotes/15_0_0_summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,24 @@ The reason you cannot change all the values together is because the restore proc
should be used to process the previous backup. Please make sure you have thought out all possible scenarios for restore before transitioning from one
compression engine to another.

#### Independent OLAP and OLTP transactional timeouts

`--queryserver-config-olap-transaction-timeout` specifies the timeout applied
to a transaction created within an OLAP workload. The default value is `30`
seconds, but this can be raised, lowered, or set to zero to disable the timeout
altogether.

Until now, while OLAP queries would bypass the query timeout, transactions
created within an OLAP session would be rolled back
`--queryserver-config-transaction-timeout` seconds after the transaction was
started.

As of now, OLTP and OLAP transaction timeouts can be configured independently of each
other.

The main use case is to run queries spanning a long period of time which
require transactional guarantees such as consistency or atomicity.

### Online DDL changes

#### Concurrent vitess migrations
Expand Down
1 change: 1 addition & 0 deletions go/cmd/vttestserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (t *topoFlags) buildTopology() (*vttestpb.VTTestTopology, error) {
// Annoying, but in unit tests, parseFlags gets called multiple times per process
// (anytime startCluster is called), so we need to guard against the second test
// to run failing with:
//
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
// flag redefined: log_rotate_max_size
var logFlagsOnce sync.Once

Expand Down
3 changes: 2 additions & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ Usage of vtctld:
--queryserver-config-idle-timeout float query server idle timeout (in seconds), vttablet manages various mysql connection pools. This config means if a connection has not been used in given idle timeout, this connection will be removed from pool. This effectively manages number of connection objects and optimize the pool performance. (default 1800)
--queryserver-config-max-result-size int query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries. (default 10000)
--queryserver-config-message-postpone-cap int query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem. (default 4)
--queryserver-config-olap-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed (default 30)
--queryserver-config-passthrough-dmls query server pass through all dml statements without rewriting
--queryserver-config-pool-prefill-parallelism int (DEPRECATED) query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism.
--queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16)
Expand All @@ -159,7 +160,7 @@ Usage of vtctld:
--queryserver-config-terse-errors prevent bind vars from escaping in client error messages
--queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20)
--queryserver-config-transaction-prefill-parallelism int (DEPRECATED) query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism.
--queryserver-config-transaction-timeout float query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value (default 30)
--queryserver-config-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed (default 30)
--queryserver-config-txpool-timeout float query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1)
--queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000)
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
Expand Down
3 changes: 2 additions & 1 deletion go/flags/endtoend/vtexplain.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Usage of vtexplain:
--queryserver-config-idle-timeout float query server idle timeout (in seconds), vttablet manages various mysql connection pools. This config means if a connection has not been used in given idle timeout, this connection will be removed from pool. This effectively manages number of connection objects and optimize the pool performance. (default 1800)
--queryserver-config-max-result-size int query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries. (default 10000)
--queryserver-config-message-postpone-cap int query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem. (default 4)
--queryserver-config-olap-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed (default 30)
--queryserver-config-passthrough-dmls query server pass through all dml statements without rewriting
--queryserver-config-pool-prefill-parallelism int (DEPRECATED) query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism.
--queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16)
Expand All @@ -173,7 +174,7 @@ Usage of vtexplain:
--queryserver-config-terse-errors prevent bind vars from escaping in client error messages
--queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20)
--queryserver-config-transaction-prefill-parallelism int (DEPRECATED) query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism.
--queryserver-config-transaction-timeout float query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value (default 30)
--queryserver-config-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed (default 30)
--queryserver-config-txpool-timeout float query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1)
--queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000)
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
Expand Down
3 changes: 2 additions & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ Usage of vttablet:
--queryserver-config-idle-timeout float query server idle timeout (in seconds), vttablet manages various mysql connection pools. This config means if a connection has not been used in given idle timeout, this connection will be removed from pool. This effectively manages number of connection objects and optimize the pool performance. (default 1800)
--queryserver-config-max-result-size int query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries. (default 10000)
--queryserver-config-message-postpone-cap int query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem. (default 4)
--queryserver-config-olap-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed (default 30)
--queryserver-config-passthrough-dmls query server pass through all dml statements without rewriting
--queryserver-config-pool-prefill-parallelism int (DEPRECATED) query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism.
--queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16)
Expand All @@ -338,7 +339,7 @@ Usage of vttablet:
--queryserver-config-terse-errors prevent bind vars from escaping in client error messages
--queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20)
--queryserver-config-transaction-prefill-parallelism int (DEPRECATED) query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism.
--queryserver-config-transaction-timeout float query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value (default 30)
--queryserver-config-transaction-timeout float query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed (default 30)
--queryserver-config-txpool-timeout float query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1)
--queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000)
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
Expand Down
65 changes: 9 additions & 56 deletions go/pools/numbered.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ type Numbered struct {
}

type numberedWrapper struct {
val any
inUse bool
purpose string
timeCreated time.Time
timeUsed time.Time
enforceTimeout bool
val any
inUse bool
purpose string
}

type unregistered struct {
Expand All @@ -62,14 +59,10 @@ func NewNumbered() *Numbered {
// Register starts tracking a resource by the supplied id.
// It does not lock the object.
// It returns an error if the id already exists.
func (nu *Numbered) Register(id int64, val any, enforceTimeout bool) error {
func (nu *Numbered) Register(id int64, val any) error {
// Optimistically assume we're not double registering.
now := time.Now()
resource := &numberedWrapper{
val: val,
timeCreated: now,
timeUsed: now,
enforceTimeout: enforceTimeout,
val: val,
}

nu.mu.Lock()
Expand Down Expand Up @@ -129,16 +122,15 @@ func (nu *Numbered) Get(id int64, purpose string) (val any, err error) {
}

// Put unlocks a resource for someone else to use.
func (nu *Numbered) Put(id int64, updateTime bool) {
func (nu *Numbered) Put(id int64) bool {
nu.mu.Lock()
defer nu.mu.Unlock()
if nw, ok := nu.resources[id]; ok {
nw.inUse = false
nw.purpose = ""
if updateTime {
nw.timeUsed = time.Now()
}
return true
}
return false
}

// GetAll returns the list of all resources in the pool.
Expand All @@ -157,50 +149,11 @@ func (nu *Numbered) GetAll() (vals []any) {
func (nu *Numbered) GetByFilter(purpose string, match func(val any) bool) (vals []any) {
nu.mu.Lock()
defer nu.mu.Unlock()
for _, nw := range nu.resources {
if nw.inUse || !nw.enforceTimeout {
continue
}
if match(nw.val) {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
}
}
return vals
}

// GetOutdated returns a list of resources that are older than age, and locks them.
// It does not return any resources that are already locked.
func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []any) {
nu.mu.Lock()
defer nu.mu.Unlock()
now := time.Now()
for _, nw := range nu.resources {
if nw.inUse || !nw.enforceTimeout {
continue
}
if nw.timeUsed.Add(age).Sub(now) <= 0 {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
}
}
return vals
}

// GetIdle returns a list of resurces that have been idle for longer
// than timeout, and locks them. It does not return any resources that
// are already locked.
func (nu *Numbered) GetIdle(timeout time.Duration, purpose string) (vals []any) {
nu.mu.Lock()
defer nu.mu.Unlock()
now := time.Now()
for _, nw := range nu.resources {
if nw.inUse {
continue
}
if nw.timeUsed.Add(timeout).Sub(now) <= 0 {
if match(nw.val) {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
Expand Down
65 changes: 10 additions & 55 deletions go/pools/numbered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -30,10 +29,10 @@ func TestNumberedGeneral(t *testing.T) {
id := int64(0)
p := NewNumbered()

err := p.Register(id, id, true)
err := p.Register(id, id)
require.NoError(t, err)

err = p.Register(id, id, true)
err = p.Register(id, id)
assert.Contains(t, "already present", err.Error())

var v any
Expand All @@ -44,7 +43,7 @@ func TestNumberedGeneral(t *testing.T) {
_, err = p.Get(id, "test1")
assert.Contains(t, "in use: test", err.Error())

p.Put(id, true)
p.Put(id)
_, err = p.Get(1, "test2")
assert.Contains(t, "not found", err.Error())
p.Unregister(1, "test") // Should not fail
Expand All @@ -55,61 +54,17 @@ func TestNumberedGeneral(t *testing.T) {
t.Errorf("want prefix 'ended at' and suffix '(test)', got '%v'", err)
}

id = 0
p.Register(id, id, true)
id = 1
p.Register(id, id, true)
id = 2
p.Register(id, id, false)
time.Sleep(300 * time.Millisecond)
id = 3
p.Register(id, id, true)
time.Sleep(100 * time.Millisecond)

// p has 0, 1, 2, 3 (0, 1, 2 are aged, but 2 is not enforced)
vals := p.GetOutdated(200*time.Millisecond, "by outdated")
if num := len(vals); num != 2 {
t.Errorf("want 2, got %v", num)
if p.Size() != 0 {
t.Errorf("want 0, got %v", p.Size())
}
if _, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by outdated" {
t.Errorf("want 'in use: by outdated', got '%v'", err)
}
for _, v := range vals {
p.Put(v.(int64), true)
}
p.Put(2, true) // put to 2 to ensure it's not idle
time.Sleep(100 * time.Millisecond)

// p has 0, 1, 2 (2 is idle)
vals = p.GetIdle(200*time.Millisecond, "by idle")
if len(vals) != 1 {
t.Errorf("want 1, got %v", len(vals))
}
if _, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by idle" {
t.Errorf("want 'in use: by idle', got '%v'", err)
}
if vals[0].(int64) != 3 {
t.Errorf("want 3, got %v", vals[0])
}
p.Unregister(vals[0].(int64), "test")

// p has 0, 1, and 2
if p.Size() != 3 {
t.Errorf("want 3, got %v", p.Size())
}
go func() {
p.Unregister(0, "test")
p.Unregister(1, "test")
p.Unregister(2, "test")
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
}()
p.WaitForEmpty()
}

func TestNumberedGetByFilter(t *testing.T) {
p := NewNumbered()
p.Register(1, 1, true)
p.Register(2, 2, true)
p.Register(3, 3, true)
p.Register(1, 1)
p.Register(2, 2)
p.Register(3, 3)
p.Get(1, "locked")

vals := p.GetByFilter("filtered", func(v any) bool {
Expand All @@ -133,7 +88,7 @@ func BenchmarkRegisterUnregister(b *testing.B) {
id := int64(1)
val := "foobarbazdummyval"
for i := 0; i < b.N; i++ {
p.Register(id, val, false)
p.Register(id, val)
p.Unregister(id, "some reason")
}
}
Expand All @@ -145,7 +100,7 @@ func BenchmarkRegisterUnregisterParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
id := rand.Int63()
p.Register(id, val, false)
p.Register(id, val)
p.Unregister(id, "some reason")
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (bt *BufferingTest) createCluster() (*cluster.LocalProcessCluster, int) {
SchemaSQL: sqlSchema,
VSchema: bt.VSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s",
clusterInstance.VtTabletExtraArgs = []string{
"--health_check_interval", "1s",
"--queryserver-config-transaction-timeout", "20",
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vtgate/godriver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func TestMain(m *testing.M) {
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "3"}
clusterInstance.VtTabletExtraArgs = []string{
"--queryserver-config-transaction-timeout", "3",
}
if err := clusterInstance.StartKeyspace(*Keyspace, []string{"-80", "80-"}, 1, false); err != nil {
log.Fatal(err.Error())
return 1
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vtgate/readafterwrite/raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func TestMain(m *testing.M) {
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
clusterInstance.VtTabletExtraArgs = []string{
"--queryserver-config-transaction-timeout", "5",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/servenv/grpc_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var grpcAuthServerFlagHooks []func(*pflag.FlagSet)
// RegisterGRPCServerAuthFlags registers flags required to enable server-side
// authentication in vitess gRPC services.
//
// `go/cmd/*`` entrypoints should call this function before
// `go/cmd/* entrypoints should call this function before
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
// ParseFlags(WithArgs)? if they wish to expose Authenticator functionality.
func RegisterGRPCServerAuthFlags() {
OnParse(func(fs *pflag.FlagSet) {
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,11 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) {

func TestShortTxTimeout(t *testing.T) {
client := framework.NewClient()
defer framework.Server.SetTxTimeout(framework.Server.TxTimeout())
framework.Server.SetTxTimeout(10 * time.Millisecond)
defer framework.Server.Config().SetTxTimeoutForWorkload(
framework.Server.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP),
querypb.ExecuteOptions_OLTP,
)
framework.Server.Config().SetTxTimeoutForWorkload(10*time.Millisecond, querypb.ExecuteOptions_OLTP)

err := client.Begin(false)
require.NoError(t, err)
Expand Down
Loading