Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single round trip commit on BatchExecute #4739

Merged
merged 1 commit into from
Mar 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
562 changes: 284 additions & 278 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,18 @@ func (tsv *TabletServer) ExecuteBatch(ctx context.Context, target *querypb.Targe
defer tsv.endRequest(false)
defer tsv.handlePanicAndSendLogStats("batch", nil, nil)

// When all these conditions are met, we send the queries directly
// to the MySQL without creating a transaction. This optimization
// yields better throughput.
// Setting ExecuteOptions_AUTOCOMMIT will get a connection out of the
// pool without actually begin/commit the transaction.
if (options == nil || options.TransactionIsolation == querypb.ExecuteOptions_DEFAULT) &&
tsv.qe.autoCommit.Get() &&
asTransaction &&
tsv.qe.passthroughDMLs.Get() {
options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT
}

if asTransaction {
transactionID, err = tsv.Begin(ctx, target, options)
if err != nil {
Expand Down Expand Up @@ -1844,9 +1856,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real
target := tsv.target
tsv.mu.Unlock()
shr := &querypb.StreamHealthResponse{
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
TabletExternallyReparentedTimestamp: terTimestamp,
RealtimeStats: stats,
}
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
return 0, err
}

autocommitTransaction := false

if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok {
if queries.setIsolationLevel != "" {
if _, err := conn.Exec(ctx, "set transaction isolation level "+queries.setIsolationLevel, 1, false); err != nil {
Expand All @@ -249,6 +251,8 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
if _, err := conn.Exec(ctx, queries.openTransaction, 1, false); err != nil {
return 0, err
}
} else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT {
autocommitTransaction = true
} else {
return 0, fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation())
}
Expand All @@ -263,6 +267,7 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
axp,
immediateCaller,
effectiveCaller,
autocommitTransaction,
),
options.GetWorkload() != querypb.ExecuteOptions_DBA,
)
Expand Down Expand Up @@ -312,6 +317,12 @@ func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptio
func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) error {
defer conn.conclude(TxCommit, "transaction committed")
defer mc.LockDB(conn.NewMessages, conn.ChangedMessages)()

if conn.Autocommit {
mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages)
return nil
}

if _, err := conn.Exec(ctx, "commit", 1, false); err != nil {
conn.Close()
return err
Expand Down Expand Up @@ -379,9 +390,10 @@ type TxConnection struct {
LogToFile sync2.AtomicInt32
ImmediateCallerID *querypb.VTGateCallerID
EffectiveCallerID *vtrpcpb.CallerID
Autocommit bool
}

func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID) *TxConnection {
func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID, autocommit bool) *TxConnection {
return &TxConnection{
DBConn: conn,
TransactionID: transactionID,
Expand All @@ -391,6 +403,7 @@ func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, i
ChangedMessages: make(map[string][]string),
ImmediateCallerID: immediate,
EffectiveCallerID: effective,
Autocommit: autocommit,
}
}

Expand Down
21 changes: 21 additions & 0 deletions go/vt/vttablet/tabletserver/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,27 @@ func TestTxPoolTransactionIsolation(t *testing.T) {
}
}

func TestTxPoolAutocommit(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
txPool := newTxPool()
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
ctx := context.Background()

// Start a transaction with autocommit. This will ensure that the executor does not send begin/commit statements
// to mysql.
// This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal
// because is not in the list of expected queries (i.e db.AddQuery hasn't been called).
txid, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT})
if err != nil {
t.Fatal(err)
}
err = txPool.Commit(ctx, txid, &fakeMessageCommitter{})
if err != nil {
t.Fatal(err)
}
}

// TestTxPoolBeginWithPoolConnectionError_TransientErrno2006 tests the case
// where we see a transient errno 2006 e.g. because MySQL killed the
// db connection. DBConn.Exec() is going to reconnect and retry automatically
Expand Down
4 changes: 4 additions & 0 deletions proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ message ExecuteOptions {
// This is not an "official" transaction level but it will do a
// START TRANSACTION WITH CONSISTENT SNAPSHOT, READ ONLY
CONSISTENT_SNAPSHOT_READ_ONLY = 5;

// This not an "official" transaction level, it will send queries to mysql
// without wrapping them in a transaction
AUTOCOMMIT = 6;
}

TransactionIsolation transaction_isolation = 9;
Expand Down
Loading