Skip to content

Commit

Permalink
decouple ol{a,t}p tx timeouts: pr comments vitessio#3 -txProps.timeou…
Browse files Browse the repository at this point in the history
…t, +sc.expiryTime

Signed-off-by: Max Englander <max@planetscale.com>
  • Loading branch information
maxenglander committed Aug 23, 2022
1 parent 98b3941 commit 6aba785
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 32 deletions.
34 changes: 19 additions & 15 deletions go/vt/vttablet/tabletserver/stateful_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type StatefulConnection struct {
tainted bool
enforceTimeout bool
timeout time.Duration
timeUsed *atomic.Value
expiryTime *atomic.Value
}

// Properties contains meta information about the connection
Expand Down Expand Up @@ -84,12 +84,10 @@ func (sc *StatefulConnection) ElapsedTimeout() bool {
if !sc.enforceTimeout {
return false
}
timeout := sc.Timeout()
if timeout <= 0 {
if sc.timeout <= 0 {
return false
}
now := time.Now()
return sc.TimeUsed().Before(now.Add(-sc.timeout))
return sc.ExpiryTime().Before(time.Now())
}

// Exec executes the statement in the dedicated connection
Expand Down Expand Up @@ -117,13 +115,6 @@ func (sc *StatefulConnection) Exec(ctx context.Context, query string, maxrows in
return r, nil
}

func (sc *StatefulConnection) Timeout() time.Duration {
if sc.txProps != nil {
return sc.txProps.Timeout
}
return sc.timeout
}

func (sc *StatefulConnection) execWithRetry(ctx context.Context, query string, maxrows int, wantfields bool) (string, error) {
if sc.IsClosed() {
return "", vterrors.New(vtrpcpb.Code_CANCELED, "connection is closed")
Expand Down Expand Up @@ -295,15 +286,24 @@ func (sc *StatefulConnection) LogTransaction(reason tx.ReleaseReason) {
tabletenv.TxLogger.Send(sc)
}

func (sc *StatefulConnection) TimeUsed() time.Time {
tu := sc.timeUsed.Load()
func (sc *StatefulConnection) ExpiryTime() time.Time {
tu := sc.expiryTime.Load()
if tu == nil {
// This is a bug. Better to panic?
return time.Now()
return time.Unix(1<<63-1, 0)
}
return tu.(time.Time)
}

// SetTimeout stores timeout on the connection and then resets the expiry
// time, so the new timeout is measured from the moment of this call.
func (sc *StatefulConnection) SetTimeout(timeout time.Duration) {
sc.timeout = timeout
sc.resetExpiryTime()
}

// Timeout returns the timeout currently stored on the connection.
func (sc *StatefulConnection) Timeout() time.Duration {
return sc.timeout
}

// logReservedConn logs reserved connection related stats.
func (sc *StatefulConnection) logReservedConn() {
if sc.reservedProps == nil {
Expand All @@ -323,3 +323,7 @@ func (sc *StatefulConnection) getUsername() string {
}
return callerid.GetUsername(sc.reservedProps.ImmediateCaller)
}

// resetExpiryTime restarts the timeout window: the stored expiry becomes the
// current time plus sc.timeout.
func (sc *StatefulConnection) resetExpiryTime() {
	deadline := time.Now().Add(sc.timeout)
	sc.expiryTime.Store(deadline)
}
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/stateful_connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ func (sf *StatefulConnectionPool) NewConn(ctx context.Context, options *querypb.
pool: sf,
env: sf.env,
enforceTimeout: options.GetWorkload() != querypb.ExecuteOptions_DBA,
timeout: sf.env.Config().TxTimeoutForWorkload(options.GetWorkload()),
timeUsed: &atomic.Value{},
expiryTime: &atomic.Value{},
}
sfConn.timeUsed.Store(time.Now())
// This will set both the timeout and initialize the expiryTime.
sfConn.SetTimeout(sf.env.Config().TxTimeoutForWorkload(options.GetWorkload()))

err = sf.active.Register(sfConn.ConnID, sfConn)
if err != nil {
Expand Down Expand Up @@ -238,7 +238,7 @@ func (sf *StatefulConnectionPool) markAsNotInUse(sc *StatefulConnection, updateT
}
if sf.active.Put(sc.ConnID) {
if updateTime {
sc.timeUsed.Store(time.Now())
sc.resetExpiryTime()
}
}
}
Expand All @@ -252,6 +252,6 @@ func (sf *StatefulConnectionPool) Capacity() int {
// renewConn unregisters sc from the active pool, assigns it a freshly
// allocated connection ID, and registers it again under that ID. It returns
// the error from the final registration, if any.
func (sf *StatefulConnectionPool) renewConn(sc *StatefulConnection) error {
	sf.active.Unregister(sc.ConnID, "renew existing connection")
	sc.ConnID = sf.lastID.Add(1)
	// Push the expiry out by the connection's timeout. Storing time.Now()
	// directly would leave the renewed connection with an expiry that has
	// already passed.
	sc.resetExpiryTime()
	return sf.active.Register(sc.ConnID, sc)
}
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,15 @@ func (c *TabletConfig) Clone() *TabletConfig {
return &tc
}

// SetTxTimeoutForWorkload sets the transaction timeout for the given workload
// to val. Used in tests only.
//
// Only OLAP and OLTP are accepted; any other workload panics.
func (c *TabletConfig) SetTxTimeoutForWorkload(val time.Duration, workload querypb.ExecuteOptions_Workload) {
	if workload == querypb.ExecuteOptions_OLAP {
		c.Olap.TxTimeoutSeconds.Set(val)
		return
	}
	if workload == querypb.ExecuteOptions_OLTP {
		c.Oltp.TxTimeoutSeconds.Set(val)
		return
	}
	panic(fmt.Sprintf("unsupported workload type: %v", workload))
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,9 +1182,9 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
timeout := tsv.QueryTimeout.Get()
if transactionID != 0 {
allowOnShutdown = true
// ReserveExecute is for OLAP only, so we can directly fetch the OLAP
// ReserveExecute is for OLTP only, so we can directly fetch the OLTP
// TX timeout.
txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLAP)
txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP)
// Use the smaller of the two values (0 means infinity).
timeout = smallerTimeout(timeout, txTimeout)
}
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/tx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type (
Autocommit bool
Conclusion string
LogToFile bool
Timeout time.Duration

Stats *servenv.TimingsWrapper
}
Expand Down
15 changes: 6 additions & 9 deletions go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (tp *TxPool) Shutdown(ctx context.Context) {
func (tp *TxPool) transactionKiller() {
defer tp.env.LogError()
for _, conn := range tp.scp.GetElapsedTimeout(vterrors.TxKillerRollback) {
timeout := conn.Timeout()
timeout := conn.timeout
log.Warningf("killing transaction (exceeded timeout: %v): %s", timeout, conn.String(tp.env.Config().SanitizeLogMessages))
switch {
case conn.IsTainted():
Expand Down Expand Up @@ -158,14 +158,13 @@ func (tp *TxPool) WaitForEmpty() {
}

// NewTxProps creates a new TxProperties struct
func (tp *TxPool) NewTxProps(immediateCaller *querypb.VTGateCallerID, effectiveCaller *vtrpcpb.CallerID, autocommit bool, timeout time.Duration) *tx.Properties {
func (tp *TxPool) NewTxProps(immediateCaller *querypb.VTGateCallerID, effectiveCaller *vtrpcpb.CallerID, autocommit bool) *tx.Properties {
return &tx.Properties{
StartTime: time.Now(),
EffectiveCaller: effectiveCaller,
ImmediateCaller: immediateCaller,
Autocommit: autocommit,
Stats: tp.txStats,
Timeout: timeout,
}
}

Expand Down Expand Up @@ -241,6 +240,9 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
if err != nil {
return nil, "", "", vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction %d: %v", reservedID, err)
}
// Update conn timeout.
timeout := tp.env.Config().TxTimeoutForWorkload(options.GetWorkload())
conn.SetTimeout(timeout)
} else {
immediateCaller := callerid.ImmediateCallerIDFromContext(ctx)
effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx)
Expand Down Expand Up @@ -276,12 +278,7 @@ func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, re
return "", "", err
}

workload := options.GetWorkload()
if workload == querypb.ExecuteOptions_UNSPECIFIED {
workload = querypb.ExecuteOptions_OLTP
}
timeout := tp.env.Config().TxTimeoutForWorkload(workload)
conn.txProps = tp.NewTxProps(immediateCaller, effectiveCaller, autocommit, timeout)
conn.txProps = tp.NewTxProps(immediateCaller, effectiveCaller, autocommit)

return beginQueries, sessionStateChanges, nil
}
Expand Down

0 comments on commit 6aba785

Please sign in to comment.