Skip to content

Commit 5042c49

Browse files
authored
Implement scanner API for JDBC transactions (#2702)
1 parent 8ea08fa commit 5042c49

File tree

9 files changed

+748
-20
lines changed

9 files changed

+748
-20
lines changed

core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,4 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {}
4242
@Override
4343
@Test
4444
public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {}
45-
46-
@Disabled("Implement later")
47-
@Override
48-
@Test
49-
public void getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {}
50-
51-
@Disabled("Implement later")
52-
@Override
53-
@Test
54-
public void manager_getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {}
5545
}

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,6 +1182,8 @@ public enum CoreError implements ScalarDbError {
11821182
Category.INTERNAL_ERROR, "0052", "Failed to read JSON file. Details: %s.", "", ""),
11831183
DATA_LOADER_JSONLINES_FILE_READ_FAILED(
11841184
Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""),
1185+
JDBC_TRANSACTION_GETTING_SCANNER_FAILED(
1186+
Category.INTERNAL_ERROR, "0054", "Getting the scanner failed. Details: %s", "", ""),
11851187

11861188
//
11871189
// Errors for the unknown transaction status error category

core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,14 @@ public Optional<Result> get(Get get, Connection connection)
9696
}
9797
}
9898

99-
@SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE")
10099
public Scanner getScanner(Scan scan, Connection connection)
101100
throws SQLException, ExecutionException {
101+
return getScanner(scan, connection, true);
102+
}
103+
104+
@SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE")
105+
public Scanner getScanner(Scan scan, Connection connection, boolean closeConnectionOnScannerClose)
106+
throws SQLException, ExecutionException {
102107
operationChecker.check(scan);
103108

104109
TableMetadata tableMetadata = tableMetadataManager.getTableMetadata(scan);
@@ -111,7 +116,8 @@ public Scanner getScanner(Scan scan, Connection connection)
111116
new ResultInterpreter(scan.getProjections(), tableMetadata, rdbEngine),
112117
connection,
113118
preparedStatement,
114-
resultSet);
119+
resultSet,
120+
closeConnectionOnScannerClose);
115121
}
116122

117123
public List<Result> scan(Scan scan, Connection connection)

core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,20 @@ public class ScannerImpl extends AbstractScanner {
2525
private final Connection connection;
2626
private final PreparedStatement preparedStatement;
2727
private final ResultSet resultSet;
28+
private final boolean closeConnectionOnClose;
2829

2930
@SuppressFBWarnings("EI_EXPOSE_REP2")
3031
public ScannerImpl(
3132
ResultInterpreter resultInterpreter,
3233
Connection connection,
3334
PreparedStatement preparedStatement,
34-
ResultSet resultSet) {
35+
ResultSet resultSet,
36+
boolean closeConnectionOnClose) {
3537
this.resultInterpreter = Objects.requireNonNull(resultInterpreter);
3638
this.connection = Objects.requireNonNull(connection);
3739
this.preparedStatement = Objects.requireNonNull(preparedStatement);
3840
this.resultSet = Objects.requireNonNull(resultSet);
41+
this.closeConnectionOnClose = closeConnectionOnClose;
3942
}
4043

4144
@Override
@@ -75,10 +78,13 @@ public void close() {
7578
} catch (SQLException e) {
7679
logger.warn("Failed to close the preparedStatement", e);
7780
}
78-
try {
79-
connection.close();
80-
} catch (SQLException e) {
81-
logger.warn("Failed to close the connection", e);
81+
82+
if (closeConnectionOnClose) {
83+
try {
84+
connection.close();
85+
} catch (SQLException e) {
86+
logger.warn("Failed to close the connection", e);
87+
}
8288
}
8389
}
8490
}

core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransaction.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.scalar.db.api.UpdateIfExists;
2121
import com.scalar.db.api.Upsert;
2222
import com.scalar.db.common.AbstractDistributedTransaction;
23+
import com.scalar.db.common.AbstractTransactionCrudOperableScanner;
2324
import com.scalar.db.common.error.CoreError;
2425
import com.scalar.db.exception.storage.ExecutionException;
2526
import com.scalar.db.exception.transaction.CommitConflictException;
@@ -32,6 +33,7 @@
3233
import com.scalar.db.storage.jdbc.JdbcService;
3334
import com.scalar.db.storage.jdbc.RdbEngineStrategy;
3435
import com.scalar.db.util.ScalarDbUtils;
36+
import java.io.IOException;
3537
import java.sql.Connection;
3638
import java.sql.SQLException;
3739
import java.util.List;
@@ -96,7 +98,46 @@ public List<Result> scan(Scan scan) throws CrudException {
9698

9799
@Override
98100
public Scanner getScanner(Scan scan) throws CrudException {
99-
throw new UnsupportedOperationException("Implement later");
101+
scan = copyAndSetTargetToIfNot(scan);
102+
103+
com.scalar.db.api.Scanner scanner;
104+
try {
105+
scanner = jdbcService.getScanner(scan, connection, false);
106+
} catch (SQLException e) {
107+
throw createCrudException(
108+
e, CoreError.JDBC_TRANSACTION_GETTING_SCANNER_FAILED.buildMessage(e.getMessage()));
109+
} catch (ExecutionException e) {
110+
throw new CrudException(e.getMessage(), e, txId);
111+
}
112+
113+
return new AbstractTransactionCrudOperableScanner() {
114+
@Override
115+
public Optional<Result> one() throws CrudException {
116+
try {
117+
return scanner.one();
118+
} catch (ExecutionException e) {
119+
throw new CrudException(e.getMessage(), e, txId);
120+
}
121+
}
122+
123+
@Override
124+
public List<Result> all() throws CrudException {
125+
try {
126+
return scanner.all();
127+
} catch (ExecutionException e) {
128+
throw new CrudException(e.getMessage(), e, txId);
129+
}
130+
}
131+
132+
@Override
133+
public void close() throws CrudException {
134+
try {
135+
scanner.close();
136+
} catch (IOException e) {
137+
throw new CrudException(e.getMessage(), e, txId);
138+
}
139+
}
140+
};
100141
}
101142

102143
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */

core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
import com.scalar.db.api.Result;
1313
import com.scalar.db.api.Scan;
1414
import com.scalar.db.api.SerializableStrategy;
15+
import com.scalar.db.api.TransactionCrudOperable;
1516
import com.scalar.db.api.TransactionState;
1617
import com.scalar.db.api.Update;
1718
import com.scalar.db.api.Upsert;
1819
import com.scalar.db.common.AbstractDistributedTransactionManager;
20+
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
1921
import com.scalar.db.common.TableMetadataManager;
2022
import com.scalar.db.common.checker.OperationChecker;
2123
import com.scalar.db.common.error.CoreError;
@@ -38,6 +40,7 @@
3840
import java.util.List;
3941
import java.util.Optional;
4042
import java.util.UUID;
43+
import java.util.concurrent.atomic.AtomicBoolean;
4144
import javax.annotation.concurrent.ThreadSafe;
4245
import org.apache.commons.dbcp2.BasicDataSource;
4346
import org.slf4j.Logger;
@@ -170,9 +173,93 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat
170173

171174
@Override
172175
public Scanner getScanner(Scan scan) throws CrudException {
173-
throw new UnsupportedOperationException("Implement later");
176+
DistributedTransaction transaction;
177+
try {
178+
transaction = begin();
179+
} catch (TransactionNotFoundException e) {
180+
throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
181+
} catch (TransactionException e) {
182+
throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null));
183+
}
184+
185+
TransactionCrudOperable.Scanner scanner;
186+
try {
187+
scanner = transaction.getScanner(copyAndSetTargetToIfNot(scan));
188+
} catch (CrudException e) {
189+
rollbackTransaction(transaction);
190+
throw e;
191+
}
192+
193+
return new AbstractTransactionManagerCrudOperableScanner() {
194+
195+
private final AtomicBoolean closed = new AtomicBoolean();
196+
197+
@Override
198+
public Optional<Result> one() throws CrudException {
199+
try {
200+
return scanner.one();
201+
} catch (CrudException e) {
202+
closed.set(true);
203+
204+
try {
205+
scanner.close();
206+
} catch (CrudException ex) {
207+
e.addSuppressed(ex);
208+
}
209+
210+
rollbackTransaction(transaction);
211+
throw e;
212+
}
213+
}
214+
215+
@Override
216+
public List<Result> all() throws CrudException {
217+
try {
218+
return scanner.all();
219+
} catch (CrudException e) {
220+
closed.set(true);
221+
222+
try {
223+
scanner.close();
224+
} catch (CrudException ex) {
225+
e.addSuppressed(ex);
226+
}
227+
228+
rollbackTransaction(transaction);
229+
throw e;
230+
}
231+
}
232+
233+
@Override
234+
public void close() throws CrudException, UnknownTransactionStatusException {
235+
if (closed.get()) {
236+
return;
237+
}
238+
closed.set(true);
239+
240+
try {
241+
scanner.close();
242+
} catch (CrudException e) {
243+
rollbackTransaction(transaction);
244+
throw e;
245+
}
246+
247+
try {
248+
transaction.commit();
249+
} catch (CommitConflictException e) {
250+
rollbackTransaction(transaction);
251+
throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
252+
} catch (UnknownTransactionStatusException e) {
253+
throw e;
254+
} catch (TransactionException e) {
255+
rollbackTransaction(transaction);
256+
throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null));
257+
}
258+
}
259+
};
174260
}
175261

262+
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
176263
@Deprecated
177264
@Override
178265
public void put(Put put) throws CrudException, UnknownTransactionStatusException {
@@ -183,6 +270,7 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException
183270
});
184271
}
185272

273+
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
186274
@Deprecated
187275
@Override
188276
public void put(List<Put> puts) throws CrudException, UnknownTransactionStatusException {
@@ -229,6 +317,7 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus
229317
});
230318
}
231319

320+
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
232321
@Deprecated
233322
@Override
234323
public void delete(List<Delete> deletes) throws CrudException, UnknownTransactionStatusException {

core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public void whenGetOperationExecuted_shouldCallJdbcService() throws Exception {
9898
public void whenScanOperationExecutedAndScannerClosed_shouldCallJdbcService() throws Exception {
9999
// Arrange
100100
when(jdbcService.getScanner(any(), any()))
101-
.thenReturn(new ScannerImpl(resultInterpreter, connection, preparedStatement, resultSet));
101+
.thenReturn(
102+
new ScannerImpl(resultInterpreter, connection, preparedStatement, resultSet, true));
102103

103104
// Act
104105
Scan scan = new Scan(new Key("p1", "val")).forNamespace(NAMESPACE).forTable(TABLE);

0 commit comments

Comments
 (0)