Skip to content

Commit

Permalink
Merge pull request #6451 from planetscale/comquit
Browse files Browse the repository at this point in the history
Close reserved connections when client hangs up
  • Loading branch information
harshit-gangal authored Jul 17, 2020
2 parents 9ef859b + 48cdd37 commit 709dbed
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession,
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries))
err := e.CloseSession(ctx, safeSession)
err := e.txConn.Rollback(ctx, safeSession)
logStats.CommitTime = time.Since(execStart)
return &sqltypes.Result{}, err
}
Expand Down Expand Up @@ -342,10 +342,10 @@ func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession
return qr, nil
}

// CloseSession closes the current transaction, if any. It is called both for explicit "rollback"
// statements and implicitly when the mysql server closes the connection.
// CloseSession releases the current connection, which rollbacks open transactions and closes reserved connections.
// It is called then the MySQL servers closes the connection to its client.
func (e *Executor) CloseSession(ctx context.Context, safeSession *SafeSession) error {
return e.txConn.Rollback(ctx, safeSession)
return e.txConn.Release(ctx, safeSession)
}

func (e *Executor) handleSet(ctx context.Context, safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) {
Expand Down
65 changes: 59 additions & 6 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,71 @@ func (q *query) VStreamResults(request *binlogdatapb.VStreamResultsRequest, stre
}

//ReserveExecute implements the QueryServer interface
func (q *query) ReserveExecute(ctx context.Context, request *querypb.ReserveExecuteRequest) (*querypb.ReserveExecuteResponse, error) {
panic("implement me")
func (q *query) ReserveExecute(ctx context.Context, request *querypb.ReserveExecuteRequest) (response *querypb.ReserveExecuteResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.EffectiveCallerId,
request.ImmediateCallerId,
)
result, reservedID, alias, err := q.server.ReserveExecute(ctx, request.Target, request.PreQueries, request.Query.Sql, request.Query.BindVariables, request.TransactionId, request.Options)
if err != nil {
// if we have a valid reservedID, return the error in-band
if reservedID != 0 {
return &querypb.ReserveExecuteResponse{
Error: vterrors.ToVTRPC(err),
ReservedId: reservedID,
TabletAlias: alias,
}, nil
}
return nil, vterrors.ToGRPC(err)
}
return &querypb.ReserveExecuteResponse{
Result: sqltypes.ResultToProto3(result),
ReservedId: reservedID,
TabletAlias: alias,
}, nil
}

//ReserveBeginExecute implements the QueryServer interface
func (q *query) ReserveBeginExecute(ctx context.Context, request *querypb.ReserveBeginExecuteRequest) (*querypb.ReserveBeginExecuteResponse, error) {
panic("implement me")
func (q *query) ReserveBeginExecute(ctx context.Context, request *querypb.ReserveBeginExecuteRequest) (response *querypb.ReserveBeginExecuteResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.EffectiveCallerId,
request.ImmediateCallerId,
)
result, transactionID, reservedID, alias, err := q.server.ReserveBeginExecute(ctx, request.Target, request.PreQueries, request.Query.Sql, request.Query.BindVariables, request.Options)
if err != nil {
// if we have a valid reservedID, return the error in-band
if reservedID != 0 {
return &querypb.ReserveBeginExecuteResponse{
Error: vterrors.ToVTRPC(err),
TransactionId: transactionID,
ReservedId: reservedID,
TabletAlias: alias,
}, nil
}
return nil, vterrors.ToGRPC(err)
}
return &querypb.ReserveBeginExecuteResponse{
Result: sqltypes.ResultToProto3(result),
TransactionId: transactionID,
ReservedId: reservedID,
TabletAlias: alias,
}, nil
}

//Release implements the QueryServer interface
func (q *query) Release(ctx context.Context, request *querypb.ReleaseRequest) (*querypb.ReleaseResponse, error) {
panic("implement me")
func (q *query) Release(ctx context.Context, request *querypb.ReleaseRequest) (response *querypb.ReleaseResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.EffectiveCallerId,
request.ImmediateCallerId,
)
err = q.server.Release(ctx, request.Target, request.TransactionId, request.ReservedId)
if err != nil {
return nil, vterrors.ToGRPC(err)
}
return &querypb.ReleaseResponse{}, nil
}

// Register registers the implementation on the provide gRPC Server.
Expand Down

0 comments on commit 709dbed

Please sign in to comment.