Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,4 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {}
@Override
@Test
public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {}

@Disabled("Implement later")
@Override
@Test
public void getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {}

@Disabled("Implement later")
@Override
@Test
public void manager_getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {}
}
2 changes: 2 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,8 @@ public enum CoreError implements ScalarDbError {
Category.INTERNAL_ERROR, "0052", "Failed to read JSON file. Details: %s.", "", ""),
DATA_LOADER_JSONLINES_FILE_READ_FAILED(
Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""),
JDBC_TRANSACTION_GETTING_SCANNER_FAILED(
Category.INTERNAL_ERROR, "0054", "Getting the scanner failed. Details: %s", "", ""),

//
// Errors for the unknown transaction status error category
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ public Optional<Result> get(Get get, Connection connection)
}
}

@SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE")
public Scanner getScanner(Scan scan, Connection connection)
throws SQLException, ExecutionException {
return getScanner(scan, connection, true);
}

@SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE")
public Scanner getScanner(Scan scan, Connection connection, boolean closeConnectionOnScannerClose)
throws SQLException, ExecutionException {
operationChecker.check(scan);

TableMetadata tableMetadata = tableMetadataManager.getTableMetadata(scan);
Expand All @@ -111,7 +116,8 @@ public Scanner getScanner(Scan scan, Connection connection)
new ResultInterpreter(scan.getProjections(), tableMetadata, rdbEngine),
connection,
preparedStatement,
resultSet);
resultSet,
closeConnectionOnScannerClose);
}

public List<Result> scan(Scan scan, Connection connection)
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ public class ScannerImpl extends AbstractScanner {
private final Connection connection;
private final PreparedStatement preparedStatement;
private final ResultSet resultSet;
private final boolean closeConnectionOnClose;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public ScannerImpl(
ResultInterpreter resultInterpreter,
Connection connection,
PreparedStatement preparedStatement,
ResultSet resultSet) {
ResultSet resultSet,
boolean closeConnectionOnClose) {
this.resultInterpreter = Objects.requireNonNull(resultInterpreter);
this.connection = Objects.requireNonNull(connection);
this.preparedStatement = Objects.requireNonNull(preparedStatement);
this.resultSet = Objects.requireNonNull(resultSet);
this.closeConnectionOnClose = closeConnectionOnClose;
}

@Override
Expand Down Expand Up @@ -75,10 +78,13 @@ public void close() {
} catch (SQLException e) {
logger.warn("Failed to close the preparedStatement", e);
}
try {
connection.close();
} catch (SQLException e) {
logger.warn("Failed to close the connection", e);

if (closeConnectionOnClose) {
try {
connection.close();
} catch (SQLException e) {
logger.warn("Failed to close the connection", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.scalar.db.api.UpdateIfExists;
import com.scalar.db.api.Upsert;
import com.scalar.db.common.AbstractDistributedTransaction;
import com.scalar.db.common.AbstractTransactionCrudOperableScanner;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CommitConflictException;
Expand All @@ -32,6 +33,7 @@
import com.scalar.db.storage.jdbc.JdbcService;
import com.scalar.db.storage.jdbc.RdbEngineStrategy;
import com.scalar.db.util.ScalarDbUtils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
Expand Down Expand Up @@ -96,7 +98,46 @@ public List<Result> scan(Scan scan) throws CrudException {

@Override
public Scanner getScanner(Scan scan) throws CrudException {
throw new UnsupportedOperationException("Implement later");
scan = copyAndSetTargetToIfNot(scan);

com.scalar.db.api.Scanner scanner;
try {
scanner = jdbcService.getScanner(scan, connection, false);
} catch (SQLException e) {
throw createCrudException(
e, CoreError.JDBC_TRANSACTION_GETTING_SCANNER_FAILED.buildMessage(e.getMessage()));
} catch (ExecutionException e) {
throw new CrudException(e.getMessage(), e, txId);
}

return new AbstractTransactionCrudOperableScanner() {
@Override
public Optional<Result> one() throws CrudException {
try {
return scanner.one();
} catch (ExecutionException e) {
throw new CrudException(e.getMessage(), e, txId);
}
}

@Override
public List<Result> all() throws CrudException {
try {
return scanner.all();
} catch (ExecutionException e) {
throw new CrudException(e.getMessage(), e, txId);
}
}

@Override
public void close() throws CrudException {
try {
scanner.close();
} catch (IOException e) {
throw new CrudException(e.getMessage(), e, txId);
}
}
};
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.SerializableStrategy;
import com.scalar.db.api.TransactionCrudOperable;
import com.scalar.db.api.TransactionState;
import com.scalar.db.api.Update;
import com.scalar.db.api.Upsert;
import com.scalar.db.common.AbstractDistributedTransactionManager;
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
import com.scalar.db.common.TableMetadataManager;
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.common.error.CoreError;
Expand All @@ -38,6 +40,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.dbcp2.BasicDataSource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -170,9 +173,93 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat

@Override
public Scanner getScanner(Scan scan) throws CrudException {
throw new UnsupportedOperationException("Implement later");
DistributedTransaction transaction;
try {
transaction = begin();
} catch (TransactionNotFoundException e) {
throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (TransactionException e) {
throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null));
}

TransactionCrudOperable.Scanner scanner;
try {
scanner = transaction.getScanner(copyAndSetTargetToIfNot(scan));
} catch (CrudException e) {
rollbackTransaction(transaction);
throw e;
}

return new AbstractTransactionManagerCrudOperableScanner() {

private final AtomicBoolean closed = new AtomicBoolean();

@Override
public Optional<Result> one() throws CrudException {
try {
return scanner.one();
} catch (CrudException e) {
closed.set(true);

try {
scanner.close();
} catch (CrudException ex) {
e.addSuppressed(ex);
}

rollbackTransaction(transaction);
throw e;
}
}

@Override
public List<Result> all() throws CrudException {
try {
return scanner.all();
} catch (CrudException e) {
closed.set(true);
Copy link
Contributor

@komamitsu komamitsu May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] How about moving this after scanner.close() so that calling closed() close() method can close scanner even if calling scanner.close() throws an exception?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If scanner.close() throws an exception, we catch it:
https://github.com/scalar-labs/scalardb/pull/2702/files#diff-ca47f7038a18b284fd927596315c778f3c0dc5258d0ae101bb73c158c64e4917R224

Do you still think it’s necessary to move closed.set(true)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only thought in the current implementation there is no way to re-try to close scanner by calling close(), since the flag is set to true before scanner.close() finishes successfully. The concern was not based on actual usages, and probably never mind.


try {
scanner.close();
} catch (CrudException ex) {
e.addSuppressed(ex);
}

rollbackTransaction(transaction);
throw e;
}
}

@Override
public void close() throws CrudException, UnknownTransactionStatusException {
if (closed.get()) {
return;
}
closed.set(true);

try {
scanner.close();
} catch (CrudException e) {
rollbackTransaction(transaction);
throw e;
}

try {
transaction.commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If CrudException is thrown from one() or all(), only scanner is closed and closed is set to true. In this case, it seems calling this close() method won't close this transaction?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If CrudException is thrown from one() or all(), we call rollbackTransaction(transaction) to roll back the transaction. After that, even if a user call the close() method, the method does nothing. Is there any problem?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Sounds good!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confused this with com.scalar.db.transaction.jdbc.JdbcTransaction#getScanner...

} catch (CommitConflictException e) {
rollbackTransaction(transaction);
throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (UnknownTransactionStatusException e) {
throw e;
} catch (TransactionException e) {
rollbackTransaction(transaction);
throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
}
};
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@Deprecated
@Override
public void put(Put put) throws CrudException, UnknownTransactionStatusException {
Expand All @@ -183,6 +270,7 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException
});
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@Deprecated
@Override
public void put(List<Put> puts) throws CrudException, UnknownTransactionStatusException {
Expand Down Expand Up @@ -229,6 +317,7 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus
});
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@Deprecated
@Override
public void delete(List<Delete> deletes) throws CrudException, UnknownTransactionStatusException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public void whenGetOperationExecuted_shouldCallJdbcService() throws Exception {
public void whenScanOperationExecutedAndScannerClosed_shouldCallJdbcService() throws Exception {
// Arrange
when(jdbcService.getScanner(any(), any()))
.thenReturn(new ScannerImpl(resultInterpreter, connection, preparedStatement, resultSet));
.thenReturn(
new ScannerImpl(resultInterpreter, connection, preparedStatement, resultSet, true));

// Act
Scan scan = new Scan(new Key("p1", "val")).forNamespace(NAMESPACE).forTable(TABLE);
Expand Down
Loading