Skip to content

Commit dc89b4d

Browse files
feat: add option for multiplexed sessions with partitioned operations (#3635)
* chore(spanner): Release Multiplexed session for Partitioned Ops. * fix(spanner): Fix unit tests for Partitioned Query and Partitioned DML Multiplexed support launch. * fix(spanner): Fix multiplexed session environment variable validation. * fix(spanner): Enabled multiplexed session for partitioned ops in integration test. * fix(spanner): Disable multiplexed session for partitioned ops in emulator integration test. * chore(spanner): fix test * chore(spanner): fix test --------- Co-authored-by: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Co-authored-by: Sri Harsha CH <sriharshach@google.com>
1 parent bda78ed commit dc89b4d

File tree

5 files changed

+36
-11
lines changed

5 files changed

+36
-11
lines changed

.github/workflows/integration-tests-against-emulator-with-multiplexed-session.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,4 @@ jobs:
3939
env:
4040
JOB_TYPE: test
4141
SPANNER_EMULATOR_HOST: localhost:9010
42-
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
42+
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ public boolean getUseMultiplexedSessionForRW() {
360360
@VisibleForTesting
361361
@InternalApi
362362
public boolean getUseMultiplexedSessionPartitionedOps() {
363-
return useMultiplexedSessionForPartitionedOps;
363+
return getUseMultiplexedSession() && useMultiplexedSessionForPartitionedOps;
364364
}
365365

366366
private static Boolean getUseMultiplexedSessionFromEnvVariable() {
@@ -370,9 +370,7 @@ private static Boolean getUseMultiplexedSessionFromEnvVariable() {
370370
@VisibleForTesting
371371
@InternalApi
372372
protected static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() {
373-
// Checks the value of env, GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS
374-
// This returns null until Partitioned Operations is supported.
375-
return null;
373+
return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS");
376374
}
377375

378376
private static Boolean parseBooleanEnvVariable(String variableName) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void setUp() {
102102
@SuppressWarnings("resource")
103103
SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions);
104104
client = new BatchClientImpl(spanner.getSessionClient(db), isMultiplexedSession);
105+
BatchClientImpl.unimplementedForPartitionedOps.set(false);
105106
}
106107

107108
@SuppressWarnings("unchecked")

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,13 @@ boolean isMultiplexedSessionsEnabled(Spanner spanner) {
328328
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
329329
}
330330

331+
boolean isMultiplexedSessionsEnabledForPartitionedOps(Spanner spanner) {
332+
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
333+
return false;
334+
}
335+
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps();
336+
}
337+
331338
boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) {
332339
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
333340
return false;

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/PartitionedQueryMockServerTest.java

+25-6
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ public void testPartitionQuery() {
9393
assertFalse(resultSet.next());
9494
}
9595
}
96-
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
96+
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
97+
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
98+
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
9799
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
98100
} else {
99101
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
@@ -155,7 +157,9 @@ public void testMixNormalAndPartitionQueryInReadOnlyTransaction() {
155157
readTimestamps.add(connection.getReadTimestamp());
156158
connection.commit();
157159
}
158-
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
160+
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
161+
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
162+
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
159163
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
160164
} else {
161165
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
@@ -228,6 +232,10 @@ public void testRunPartition() {
228232
if (createSessionRequestCounts == expectedCreateSessionsRPC + 1) {
229233
isMultiplexedSessionCreated = true;
230234
}
235+
} else if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())
236+
&& isMultiplexedSessionCreated) {
237+
// When multiplexed session will be reused for each iteration.
238+
assertEquals(0, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
231239
} else {
232240
assertEquals(
233241
expectedCreateSessionsRPC,
@@ -261,6 +269,7 @@ public void testRunPartitionUsingSql() {
261269
String prefix = dialect == Dialect.POSTGRESQL ? "spanner." : "";
262270

263271
int maxPartitions = 5;
272+
boolean isMultiplexedSessionCreated = false;
264273
try (Connection connection = createConnection()) {
265274
connection.execute(Statement.of("set autocommit=true"));
266275
assertTrue(connection.isAutocommit());
@@ -284,7 +293,6 @@ public void testRunPartitionUsingSql() {
284293
assertFalse(resultSet.next());
285294
}
286295

287-
boolean isMultiplexedSessionCreated = false;
288296
for (boolean useLiteral : new boolean[] {true, false}) {
289297
try (ResultSet partitions =
290298
connection.executeQuery(
@@ -328,6 +336,10 @@ public void testRunPartitionUsingSql() {
328336
if (createSessionRequestCounts == expectedCreateSessionsRPC + 1) {
329337
isMultiplexedSessionCreated = true;
330338
}
339+
} else if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())
340+
&& isMultiplexedSessionCreated) {
341+
// When multiplexed session will be reused for each iteration.
342+
assertEquals(0, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
331343
} else {
332344
assertEquals(
333345
expectedCreateSessionsRPC,
@@ -570,7 +582,9 @@ public void testRunPartitionedQueryUsingSql() {
570582
assertEquals(maxPartitions * generatedRowCount, rowCount);
571583
}
572584
}
573-
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
585+
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
586+
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
587+
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
574588
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
575589
} else {
576590
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
@@ -679,7 +693,9 @@ public void testRunPartitionedQueryWithMaxParallelism() {
679693
assertEquals(maxPartitions * generatedRowCount, rowCount);
680694
}
681695
}
682-
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
696+
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
697+
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
698+
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
683699
assertEquals(6, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
684700
} else {
685701
assertEquals(5, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
@@ -758,7 +774,10 @@ public void testAutoPartitionMode() {
758774
exception
759775
.getMessage()
760776
.contains("Partition query is not supported for read/write transaction"));
761-
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
777+
778+
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
779+
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
780+
} else if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
762781
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
763782
} else {
764783
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));

0 commit comments

Comments
 (0)