Skip to content

Commit

Permalink
feat: add fallback to PDML mode (#1841)
Browse files Browse the repository at this point in the history
DML statements that are executed in auto-commit mode can use either
atomic transactions, or partitioned non-atomic transactions. The
former is bound by the mutation limits in Spanner. The latter may
update/delete any number of rows.

The transaction type that is used to execute DML statements in
auto-commit mode is determined by the connection variable
`autocommit_dml_mode`. This connection variable now supports a
third value. The supported values are:

  - TRANSACTIONAL (default): Uses atomic read/write transactions.
  - PARTITIONED_NON_ATOMIC: Use Partitioned DML for DML statements in auto-commit mode. Use this mode
    to execute DML statements that exceed the transaction mutation limit in Spanner.
  - TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC: Execute DML statements using atomic read/write
    transactions. If this fails because the mutation limit on Spanner has been exceeded, the DML statement
    is retried using a Partitioned DML transaction.
  • Loading branch information
olavloite authored Dec 4, 2024
1 parent 3cd9cd6 commit 1e81863
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 0 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ these can also be supplied in a Properties instance that is passed to the
- maxSessions (int): Sets the maximum number of sessions in the backing session pool. Defaults to 400.
- numChannels (int): Sets the number of gRPC channels to use. Defaults to 4.
- retryAbortsInternally (boolean): The JDBC driver will by default automatically retry aborted transactions internally. This is done by keeping track of all statements and the results of these during a transaction, and if the transaction is aborted by Cloud Spanner, it will replay the statements on a new transaction and compare the results with the initial attempt. Disable this option if you want to handle aborted transactions in your own application.
- autocommit_dml_mode (string): Determines the transaction type that is used to execute
[DML statements](https://cloud.google.com/spanner/docs/dml-tasks#using-dml) when the connection is
in auto-commit mode. The following values are supported:
- TRANSACTIONAL (default): Uses atomic read/write transactions.
- PARTITIONED_NON_ATOMIC: Use Partitioned DML for DML statements in auto-commit mode. Use this mode
to execute DML statements that exceed the transaction mutation limit in Spanner.
- TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC: Execute DML statements using atomic read/write
transactions. If this fails because the mutation limit on Spanner has been exceeded, the DML statement
is retried using a Partitioned DML transaction.
- auto_batch_dml (boolean): Automatically buffer DML statements and execute them as one batch,
instead of executing them on Spanner directly. The buffered DML statements are executed on Spanner
in one batch when a query is executed, or when the transaction is committed. This option can for
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.jdbc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.TransactionMutationLimitExceededException;
import com.google.cloud.spanner.connection.AbstractMockServerTest;
import com.google.cloud.spanner.connection.AutocommitDmlMode;
import com.google.cloud.spanner.connection.SpannerPool;
import com.google.protobuf.Any;
import com.google.rpc.Help;
import com.google.rpc.Help.Link;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class FallbackToPartitionedDMLMockServerTest extends AbstractMockServerTest {

static StatusRuntimeException createTransactionMutationLimitExceededException() {
Metadata.Key<byte[]> key =
Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);
Help help =
Help.newBuilder()
.addLinks(
Link.newBuilder()
.setDescription("Cloud Spanner limits documentation.")
.setUrl("https://cloud.google.com/spanner/docs/limits")
.build())
.build();
com.google.rpc.Status status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(help)).build();

Metadata trailers = new Metadata();
trailers.put(key, status.toByteArray());

return Status.INVALID_ARGUMENT
.withDescription("The transaction contains too many mutations.")
.asRuntimeException(trailers);
}

@Test
public void testConnectionProperty() throws SQLException {
for (AutocommitDmlMode mode : AutocommitDmlMode.values()) {
Properties properties = new Properties();
properties.put("autocommit_dml_mode", mode.name());
try (Connection connection =
DriverManager.getConnection("jdbc:" + getBaseUrl(), properties)) {
assertEquals(
mode, connection.unwrap(CloudSpannerJdbcConnection.class).getAutocommitDmlMode());
}
}
}

@Test
public void testTransactionMutationLimitExceeded_isNotRetriedByDefault() throws SQLException {
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(createTransactionMutationLimitExceededException()));

try (Connection connection = createJdbcConnection()) {
connection.setAutoCommit(true);
assertEquals(
AutocommitDmlMode.TRANSACTIONAL,
connection.unwrap(CloudSpannerJdbcConnection.class).getAutocommitDmlMode());

SQLException exception =
assertThrows(
SQLException.class,
() ->
connection.createStatement().executeUpdate("update test set value=1 where true"));
assertNotNull(exception.getCause());
assertEquals(
TransactionMutationLimitExceededException.class, exception.getCause().getClass());
TransactionMutationLimitExceededException transactionMutationLimitExceededException =
(TransactionMutationLimitExceededException) exception.getCause();
assertEquals(0, transactionMutationLimitExceededException.getSuppressed().length);
}
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class));
}

@Test
public void testTransactionMutationLimitExceeded_canBeRetriedAsPDML() throws SQLException {
String sql = "update test set value=1 where true";
com.google.cloud.spanner.Statement statement = com.google.cloud.spanner.Statement.of(sql);
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(createTransactionMutationLimitExceededException()));
mockSpanner.putStatementResult(
MockSpannerServiceImpl.StatementResult.update(statement, 100000L));

try (Connection connection = createJdbcConnection()) {
connection.setAutoCommit(true);
connection
.unwrap(CloudSpannerJdbcConnection.class)
.setAutocommitDmlMode(
AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC);

long updateCount = connection.createStatement().executeUpdate(sql);
assertEquals(100000L, updateCount);
}
// Verify that the request is retried as Partitioned DML.
assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
// The transactional request uses inline-begin.
ExecuteSqlRequest transactionalRequest =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
assertTrue(transactionalRequest.getTransaction().getBegin().hasReadWrite());

// Partitioned DML uses an explicit BeginTransaction RPC.
assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
BeginTransactionRequest beginRequest =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0);
assertTrue(beginRequest.getOptions().hasPartitionedDml());
ExecuteSqlRequest partitionedDmlRequest =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(1);
assertTrue(partitionedDmlRequest.getTransaction().hasId());

// Partitioned DML transactions are not committed.
assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class));
}

@Test
public void testTransactionMutationLimitExceeded_retryAsPDMLFails() throws SQLException {
String sql = "insert into test (id, value) select -id, value from test";
com.google.cloud.spanner.Statement statement = com.google.cloud.spanner.Statement.of(sql);
// The transactional update statement uses ExecuteSql(..).
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofException(createTransactionMutationLimitExceededException()));
mockSpanner.putStatementResult(
MockSpannerServiceImpl.StatementResult.exception(
statement,
Status.INVALID_ARGUMENT
.withDescription("This statement is not supported with Partitioned DML")
.asRuntimeException()));

try (Connection connection = createJdbcConnection()) {
connection.setAutoCommit(true);
connection
.unwrap(CloudSpannerJdbcConnection.class)
.setAutocommitDmlMode(
AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC);

// The connection throws TransactionMutationLimitExceededException if the retry using
// partitioned DML fails. The exception from the failed retry is returned as a suppressed
// exception of the TransactionMutationLimitExceededException.
SQLException exception =
assertThrows(SQLException.class, () -> connection.createStatement().executeUpdate(sql));
assertNotNull(exception.getCause());
assertEquals(
TransactionMutationLimitExceededException.class, exception.getCause().getClass());
TransactionMutationLimitExceededException transactionMutationLimitExceededException =
(TransactionMutationLimitExceededException) exception.getCause();
assertEquals(1, transactionMutationLimitExceededException.getSuppressed().length);
assertEquals(
SpannerException.class,
transactionMutationLimitExceededException.getSuppressed()[0].getClass());
SpannerException spannerException =
(SpannerException) transactionMutationLimitExceededException.getSuppressed()[0];
assertEquals(ErrorCode.INVALID_ARGUMENT, spannerException.getErrorCode());
assertTrue(
spannerException.getMessage(),
spannerException
.getMessage()
.contains("This statement is not supported with Partitioned DML"));
}
// Verify that the request was retried as Partitioned DML.
assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
// The transactional request uses inline-begin.
ExecuteSqlRequest transactionalRequest =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
assertTrue(transactionalRequest.getTransaction().getBegin().hasReadWrite());

// Partitioned DML uses an explicit BeginTransaction RPC.
assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
BeginTransactionRequest beginRequest =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0);
assertTrue(beginRequest.getOptions().hasPartitionedDml());
ExecuteSqlRequest partitionedDmlRequest =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(1);
assertTrue(partitionedDmlRequest.getTransaction().hasId());

// Partitioned DML transactions are not committed.
assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class));
}

@Test
public void testSqlStatements() throws SQLException {
for (Dialect dialect : Dialect.values()) {
SpannerPool.closeSpannerPool();
mockSpanner.putStatementResult(
MockSpannerServiceImpl.StatementResult.detectDialectResult(dialect));
String prefix = dialect == Dialect.POSTGRESQL ? "SPANNER." : "";

try (Connection connection = createJdbcConnection()) {
connection.setAutoCommit(true);
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(String.format("show variable %sautocommit_dml_mode", prefix))) {
assertTrue(resultSet.next());
assertEquals(
AutocommitDmlMode.TRANSACTIONAL.name(),
resultSet.getString(String.format("%sAUTOCOMMIT_DML_MODE", prefix)));
assertFalse(resultSet.next());
}
connection
.createStatement()
.execute(
String.format(
"set %sautocommit_dml_mode = 'transactional_with_fallback_to_partitioned_non_atomic'",
prefix));
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(String.format("show variable %sautocommit_dml_mode", prefix))) {
assertTrue(resultSet.next());
assertEquals(
AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC.name(),
resultSet.getString(String.format("%sAUTOCOMMIT_DML_MODE", prefix)));
assertFalse(resultSet.next());
}
}
}
}
}

0 comments on commit 1e81863

Please sign in to comment.