Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner: Block nested transactions #3628

Merged
merged 7 commits into from
Sep 19, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google-cloud-clients/google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
</parent>
<properties>
<site.installationModule>google-cloud-spanner</site.installationModule>
<skipUnitTests>${skipTests}</skipUnitTests>
snehashah16 marked this conversation as resolved.
Show resolved Hide resolved
</properties>
<build>
<plugins>
Expand All @@ -27,6 +28,7 @@
<version>2.12.4</version>
<configuration>
<excludedGroups>com.google.cloud.spanner.IntegrationTest</excludedGroups>
<skipTests>${skipUnitTests}</skipTests>
snehashah16 marked this conversation as resolved.
Show resolved Hide resolved
<reportNameSuffix>sponge_log</reportNameSuffix>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@ public <T> T run(TransactionCallable<T> callable) {
public Timestamp getCommitTimestamp() {
return runner.getCommitTimestamp();
}

@Override
public TransactionRunner allowNestedTransaction() {
runner.allowNestedTransaction();
return runner;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;

import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractList;
Expand Down Expand Up @@ -129,6 +128,19 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
private static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery";
private static final String READ = "CloudSpannerOperation.ExecuteStreamingRead";

private static final ThreadLocal<Boolean> hasPendingTransaction = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};

private static void throwIfTransactionsPending() {
if (hasPendingTransaction.get() == Boolean.TRUE) {
throw newSpannerException(ErrorCode.INTERNAL, "Nested transactions are not supported");
}
}

static {
TraceUtil.exportSpans(CREATE_SESSION, DELETE_SESSION, BEGIN_TRANSACTION, COMMIT, QUERY, READ);
}
Expand Down Expand Up @@ -905,6 +917,8 @@ TransactionContextImpl newTransaction() {
}

<T extends SessionTransaction> T setActive(@Nullable T ctx) {
throwIfTransactionsPending();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


if (activeTransaction != null) {
activeTransaction.invalidate();
}
Expand Down Expand Up @@ -1209,6 +1223,7 @@ public void execute(Runnable command) {

@VisibleForTesting
static class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
private boolean blockNestedTxn = true;

/** Allow for testing of backoff logic */
static class Sleeper {
Expand All @@ -1223,6 +1238,11 @@ void backoffSleep(Context context, long backoffMillis) {
private TransactionContextImpl txn;
private volatile boolean isValid = true;

public TransactionRunner allowNestedTransaction() {
blockNestedTxn = false;
return this;
}

TransactionRunnerImpl(
SessionImpl session, SpannerRpc rpc, Sleeper sleeper, int defaultPrefetchChunks) {
this.session = session;
Expand All @@ -1239,11 +1259,19 @@ void backoffSleep(Context context, long backoffMillis) {
@Override
public <T> T run(TransactionCallable<T> callable) {
try (Scope s = tracer.withSpan(span)) {
if (blockNestedTxn) {
hasPendingTransaction.set(Boolean.TRUE);
}

return runInternal(callable);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
} finally {
if (blockNestedTxn) {
hasPendingTransaction.set(Boolean.FALSE);

This comment was marked as spam.

This comment was marked as spam.

}

span.end();
}
}
Expand Down Expand Up @@ -1660,6 +1688,8 @@ ByteString getTransactionId() {
}

void initTransaction() {
throwIfTransactionsPending();

// Since we only support synchronous calls, just block on "txnLock" while the RPC is in
// flight. Note that we use the strategy of sending an explicit BeginTransaction() RPC,
// rather than using the first read in the transaction to begin it implicitly. The chosen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,23 @@ interface TransactionCallable<T> {
* {@link #run(TransactionCallable)} has returned normally.
*/
Timestamp getCommitTimestamp();

/**
* Allows overriding the default behaviour of blocking nested transactions.
*
* Note that the client library does not maintain any information regarding the nesting structure.
* If an outer transaction fails and an inner transaction succeeds, upon retry of the outer
* transaction, the inner transaction will be re-executed.
*
* Use with care when certain that the inner transaction is idempotent. Avoid doing this when
* accessing the same db. There might be legitimate uses where access need to be made across DBs
* for instance.
*
* E.g. of nesting that is discouraged, see {@code nestedReadWriteTxnThrows}
* {@code nestedReadOnlyTxnThrows}, {@code nestedBatchTxnThrows},
* {@code nestedSingleUseReadTxnThrows}
*
* @return this object
*/
TransactionRunner allowNestedTransaction();

This comment was marked as spam.

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.IntegrationTest;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -349,4 +353,126 @@ public Void run(TransactionContext transaction) throws SpannerException {
.getLong(0))
.isEqualTo(2);
}

private void doNestedRwTransaction() {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});

return null;
}
});
}

@Test
public void nestedReadWriteTxnThrows() {
try {
doNestedRwTransaction();
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedReadOnlyTxnThrows() {
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client
.readOnlyTransaction()
.getReadTimestamp();

return null;
}
});
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedBatchTxnThrows() {
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
BatchClient batchClient = env.getTestHelper().getBatchClient(db);
BatchReadOnlyTransaction batchTxn = batchClient
.batchReadOnlyTransaction(TimestampBound.strong());
batchTxn.partitionReadUsingIndex(
PartitionOptions.getDefaultInstance(),
"Test",
"Index",
KeySet.all(),
Arrays.asList("Fingerprint"));

return null;
}
});
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedSingleUseReadTxnThrows() {
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client.singleUseReadOnlyTransaction();

return null;
}
});
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedTxnSucceedsWhenAllowed() {
client
.readWriteTransaction()
.allowNestedTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client.singleUseReadOnlyTransaction();

return null;
}
});
}
}