Skip to content

Commit

Permalink
more refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Apr 20, 2021
1 parent 7aad79c commit 7614155
Showing 1 changed file with 20 additions and 51 deletions.
71 changes: 20 additions & 51 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ func (stc *ScatterConn) ExecuteMultiShard(

if autocommit {
// As this is auto-commit, the transactionID is supposed to be zero.
if info.transactionID != int64(0) {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", info.transactionID)
if transactionID != int64(0) {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", transactionID)
}
}

Expand All @@ -240,7 +240,7 @@ func (stc *ScatterConn) ExecuteMultiShard(
qs = rs.Gateway
}

retryConnection := func(resetTabletConnAndExec func(), failUpdate func() *shardActionInfo) {
retryRequest := func(exec func()) {
retry := checkAndResetShardSession(info, err, session)
switch retry {
case newQS:
Expand All @@ -250,69 +250,54 @@ func (stc *ScatterConn) ExecuteMultiShard(
case shard:
// if we need to reset a reserved connection, here is our chance to try executing again,
// against a new connection
resetTabletConnAndExec()
}
// err will have been changed by the call above
if err != nil {
info = failUpdate()
exec()
}
}

switch info.actionNeeded {
case nothing:
innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts)
if err != nil {
retryConnection(func() {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserve
innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts)
}, func() *shardActionInfo {
// we failed, let's clear out any lingering reserve ID
return info.updateReservedID(reservedID, alias)
})
if err != nil {
return info, err
}
}

case begin:
innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, info.reservedID, opts)
innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, reservedID, opts)
if err != nil {
if transactionID != 0 {
return info.updateTransactionID(transactionID, alias), err
// if we had an open transaction, we can't repair anything and have to exit here.
// we still keep the transaction open - an error doesn't immediately close the transaction
break
}
retryConnection(func() {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserveBegin
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts)
}, func() *shardActionInfo {
return info.updateTransactionAndReservedID(transactionID, reservedID, alias)
})
if err != nil {
return info, err
}
}
case reserve:
innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts)
if err != nil {
return info.updateReservedID(reservedID, alias), err
}
innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts)
case reserveBegin:
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts)
if err != nil {
return info.updateTransactionAndReservedID(transactionID, reservedID, alias), err
}
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded)
}
// We need to new shard info irrespective of the error.
newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias)
if err != nil {
return newInfo, err
}
mu.Lock()
defer mu.Unlock()

// Don't append more rows if row count is exceeded.
if ignoreMaxMemoryRows || len(qr.Rows) <= *maxMemoryRows {
qr.AppendResult(innerqr)
}
return info.updateTransactionAndReservedID(transactionID, reservedID, alias), nil
return newInfo, nil
},
)

Expand All @@ -334,7 +319,7 @@ func checkAndResetShardSession(info *shardActionInfo, err error, session *SafeSe
}
}
if retry != none {
session.ResetShard(info.alias)
_ = session.ResetShard(info.alias)
}
return retry
}
Expand Down Expand Up @@ -796,25 +781,9 @@ type shardActionInfo struct {
alias *topodatapb.TabletAlias
}

func (sai *shardActionInfo) updateTransactionID(txID int64, alias *topodatapb.TabletAlias) *shardActionInfo {
if txID == 0 {
// As transaction id is ZERO, there is nothing to update in session shard sessions.
return nil
}
return sai.updateTransactionAndReservedID(txID, sai.reservedID, alias)
}

func (sai *shardActionInfo) updateReservedID(rID int64, alias *topodatapb.TabletAlias) *shardActionInfo {
if rID == 0 {
// As reserved id is ZERO, there is nothing to update in session shard sessions.
return nil
}
return sai.updateTransactionAndReservedID(sai.transactionID, rID, alias)
}

func (sai *shardActionInfo) updateTransactionAndReservedID(txID int64, rID int64, alias *topodatapb.TabletAlias) *shardActionInfo {
if txID == 0 && rID == 0 {
// As transaction id and reserved id is ZERO, there is nothing to update in session shard sessions.
if txID == sai.transactionID && rID == sai.reservedID {
// As transaction id and reserved id have not changed, there is nothing to update in session shard sessions.
return nil
}
newInfo := *sai
Expand Down

0 comments on commit 7614155

Please sign in to comment.