diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java index 38fc1887a887..d0164717e158 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import com.google.api.gax.longrunning.OperationFuture; -import com.google.cloud.spanner.BatchClient; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; @@ -45,12 +44,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; @@ -70,7 +65,6 @@ public class SpannerReadIT { private static final int MAX_DB_NAME_LENGTH = 30; - private static final int CLEANUP_PROPAGATION_DELAY_MS = 5000; @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -275,55 +269,6 @@ public void testReadFailsBadTable() throws Exception { p.run().waitUntilFinish(); } - private static class CloseTransactionFn extends SimpleFunction { - private final SpannerConfig spannerConfig; - - private CloseTransactionFn(SpannerConfig spannerConfig) { - this.spannerConfig = spannerConfig; - } - - @Override - public Transaction apply(Transaction tx) { - BatchClient batchClient = SpannerAccessor.getOrCreate(spannerConfig).getBatchClient(); - batchClient.batchReadOnlyTransaction(tx.transactionId()).cleanup(); - try { - // Wait for cleanup to propagate. - Thread.sleep(CLEANUP_PROPAGATION_DELAY_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return tx; - } - } - - @Test - public void testReadFailsBadSession() throws Exception { - - thrown.expect(new SpannerWriteIT.StackTraceContainsString("SpannerException")); - thrown.expect(new SpannerWriteIT.StackTraceContainsString("NOT_FOUND: Session not found")); - - SpannerConfig spannerConfig = createSpannerConfig(); - - // This creates a transaction then closes the session. - // The (closed) transaction is then passed to SpannerIO.read() and should - // raise SessionNotFound errors. - PCollectionView tx = - p.apply("Transaction seed", Create.of(1)) - .apply( - "Create transaction", - ParDo.of(new CreateTransactionFn(spannerConfig, TimestampBound.strong()))) - .apply("Close Transaction", MapElements.via(new CloseTransactionFn(spannerConfig))) - .apply("As PCollectionView", View.asSingleton()); - p.apply( - "read db", - SpannerIO.read() - .withSpannerConfig(spannerConfig) - .withTable(options.getTable()) - .withColumns("Key", "Value") - .withTransaction(tx)); - p.run().waitUntilFinish(); - } - @Test public void testQuery() throws Exception { SpannerConfig spannerConfig = createSpannerConfig();