Skip to content

Commit

Permalink
on a reserve connection retry ignore the existing shard session
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Nov 21, 2024
1 parent b487724 commit 6ddf65e
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (stc *ScatterConn) ExecuteMultiShard(
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserve
info.ignoreOldSession = true
var state queryservice.ReservedState
state, innerqr, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts)
reservedID = state.ReservedID
Expand All @@ -237,6 +238,7 @@ func (stc *ScatterConn) ExecuteMultiShard(
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserveBegin
info.ignoreOldSession = true
var state queryservice.ReservedTransactionState
state, innerqr, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts)
transactionID = state.TransactionID
Expand Down Expand Up @@ -670,16 +672,19 @@ func (stc *ScatterConn) multiGoTransaction(
if err != nil {
return
}
updated, err := action(rs, i, info)
if updated == nil {
info, err = action(rs, i, info)
if info == nil {
return
}
if shardSession != nil && updated.rowsAffected {
if info.ignoreOldSession {
shardSession = nil
}
if shardSession != nil && info.rowsAffected {
// We might not always update or append in the session.
// We need to track if rows were affected in the transaction.
shardSession.RowsAffected = updated.rowsAffected
shardSession.RowsAffected = info.rowsAffected
}
if updated.actionNeeded != nothing && (updated.transactionID != 0 || updated.reservedID != 0) {
if info.actionNeeded != nothing && (info.transactionID != 0 || info.reservedID != 0) {
appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.mode)
if appendErr != nil {
err = appendErr
Expand Down Expand Up @@ -901,6 +906,7 @@ type shardActionInfo struct {
actionNeeded actionNeeded
reservedID, transactionID int64
alias *topodatapb.TabletAlias
ignoreOldSession bool
rowsAffected bool
}

Expand Down

0 comments on commit 6ddf65e

Please sign in to comment.