Skip to content

Commit

Permalink
Merge pull request #4739 from tinyspeck/autocommit-batch-execute
Browse files Browse the repository at this point in the history
Single round trip commit on BatchExecute
  • Loading branch information
demmer authored Mar 22, 2019
2 parents 7a201e3 + 20b59bb commit 56c2a93
Show file tree
Hide file tree
Showing 6 changed files with 815 additions and 758 deletions.
562 changes: 284 additions & 278 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,18 @@ func (tsv *TabletServer) ExecuteBatch(ctx context.Context, target *querypb.Targe
defer tsv.endRequest(false)
defer tsv.handlePanicAndSendLogStats("batch", nil, nil)

// When all these conditions are met, we send the queries directly
// to the MySQL without creating a transaction. This optimization
// yields better throughput.
// Setting ExecuteOptions_AUTOCOMMIT will get a connection out of the
// pool without actually begin/commit the transaction.
if (options == nil || options.TransactionIsolation == querypb.ExecuteOptions_DEFAULT) &&
tsv.qe.autoCommit.Get() &&
asTransaction &&
tsv.qe.passthroughDMLs.Get() {
options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT
}

if asTransaction {
transactionID, err = tsv.Begin(ctx, target, options)
if err != nil {
Expand Down Expand Up @@ -1844,9 +1856,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real
target := tsv.target
tsv.mu.Unlock()
shr := &querypb.StreamHealthResponse{
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
TabletExternallyReparentedTimestamp: terTimestamp,
RealtimeStats: stats,
}
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
return 0, err
}

autocommitTransaction := false

if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok {
if queries.setIsolationLevel != "" {
if _, err := conn.Exec(ctx, "set transaction isolation level "+queries.setIsolationLevel, 1, false); err != nil {
Expand All @@ -249,6 +251,8 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
if _, err := conn.Exec(ctx, queries.openTransaction, 1, false); err != nil {
return 0, err
}
} else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT {
autocommitTransaction = true
} else {
return 0, fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation())
}
Expand All @@ -263,6 +267,7 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
axp,
immediateCaller,
effectiveCaller,
autocommitTransaction,
),
options.GetWorkload() != querypb.ExecuteOptions_DBA,
)
Expand Down Expand Up @@ -312,6 +317,12 @@ func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptio
func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) error {
defer conn.conclude(TxCommit, "transaction committed")
defer mc.LockDB(conn.NewMessages, conn.ChangedMessages)()

if conn.Autocommit {
mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages)
return nil
}

if _, err := conn.Exec(ctx, "commit", 1, false); err != nil {
conn.Close()
return err
Expand Down Expand Up @@ -379,9 +390,10 @@ type TxConnection struct {
LogToFile sync2.AtomicInt32
ImmediateCallerID *querypb.VTGateCallerID
EffectiveCallerID *vtrpcpb.CallerID
Autocommit bool
}

func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID) *TxConnection {
func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID, autocommit bool) *TxConnection {
return &TxConnection{
DBConn: conn,
TransactionID: transactionID,
Expand All @@ -391,6 +403,7 @@ func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, i
ChangedMessages: make(map[string][]string),
ImmediateCallerID: immediate,
EffectiveCallerID: effective,
Autocommit: autocommit,
}
}

Expand Down
21 changes: 21 additions & 0 deletions go/vt/vttablet/tabletserver/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,27 @@ func TestTxPoolTransactionIsolation(t *testing.T) {
}
}

func TestTxPoolAutocommit(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
txPool := newTxPool()
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
ctx := context.Background()

// Start a transaction with autocommit. This will ensure that the executor does not send begin/commit statements
// to mysql.
// This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal
// because is not in the list of expected queries (i.e db.AddQuery hasn't been called).
txid, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT})
if err != nil {
t.Fatal(err)
}
err = txPool.Commit(ctx, txid, &fakeMessageCommitter{})
if err != nil {
t.Fatal(err)
}
}

// TestTxPoolBeginWithPoolConnectionError_TransientErrno2006 tests the case
// where we see a transient errno 2006 e.g. because MySQL killed the
// db connection. DBConn.Exec() is going to reconnect and retry automatically
Expand Down
4 changes: 4 additions & 0 deletions proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ message ExecuteOptions {
// This is not an "official" transaction level but it will do a
// START TRANSACTION WITH CONSISTENT SNAPSHOT, READ ONLY
CONSISTENT_SNAPSHOT_READ_ONLY = 5;

// This not an "official" transaction level, it will send queries to mysql
// without wrapping them in a transaction
AUTOCOMMIT = 6;
}

TransactionIsolation transaction_isolation = 9;
Expand Down
Loading

0 comments on commit 56c2a93

Please sign in to comment.