Skip to content

Commit bdb959c

Browse files
committed
Implement scanner API for Consensus Commit
1 parent e4db961 commit bdb959c

File tree

16 files changed

+1908
-96
lines changed

16 files changed

+1908
-96
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,18 @@ public enum CoreError implements ScalarDbError {
911911
Category.USER_ERROR, "0203", "Delimiter must not be null", "", ""),
912912
DATA_LOADER_CONFIG_FILE_PATH_BLANK(
913913
Category.USER_ERROR, "0204", "Config file path must not be blank", "", ""),
914+
CONSENSUS_COMMIT_SCANNER_NOT_CLOSED(
915+
Category.USER_ERROR,
916+
"0205",
917+
"Some scanners were not closed. All scanners must be closed before committing the transaction.",
918+
"",
919+
""),
920+
TWO_PHASE_CONSENSUS_COMMIT_SCANNER_NOT_CLOSED(
921+
Category.USER_ERROR,
922+
"0206",
923+
"Some scanners were not closed. All scanners must be closed before preparing the transaction.",
924+
"",
925+
""),
914926

915927
//
916928
// Errors for the concurrency error category

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import com.scalar.db.exception.transaction.UnsatisfiedConditionException;
2525
import com.scalar.db.util.ScalarDbUtils;
2626
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27+
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Optional;
30+
import javax.annotation.Nonnull;
2931
import javax.annotation.Nullable;
3032
import javax.annotation.concurrent.NotThreadSafe;
3133
import org.slf4j.Logger;
@@ -98,7 +100,41 @@ public List<Result> scan(Scan scan) throws CrudException {
98100

99101
@Override
100102
public Scanner getScanner(Scan scan) throws CrudException {
101-
throw new UnsupportedOperationException("Implement later");
103+
scan = copyAndSetTargetToIfNot(scan);
104+
Scanner scanner = crud.getScanner(scan);
105+
106+
return new Scanner() {
107+
@Override
108+
public Optional<Result> one() throws CrudException {
109+
try {
110+
return scanner.one();
111+
} catch (UncommittedRecordException e) {
112+
lazyRecovery(e);
113+
throw e;
114+
}
115+
}
116+
117+
@Override
118+
public List<Result> all() throws CrudException {
119+
try {
120+
return scanner.all();
121+
} catch (UncommittedRecordException e) {
122+
lazyRecovery(e);
123+
throw e;
124+
}
125+
}
126+
127+
@Override
128+
public void close() throws CrudException {
129+
scanner.close();
130+
}
131+
132+
@Nonnull
133+
@Override
134+
public Iterator<Result> iterator() {
135+
return scanner.iterator();
136+
}
137+
};
102138
}
103139

104140
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -213,6 +249,10 @@ public void mutate(List<? extends Mutation> mutations) throws CrudException {
213249

214250
@Override
215251
public void commit() throws CommitException, UnknownTransactionStatusException {
252+
if (!crud.areAllScannersClosed()) {
253+
throw new IllegalStateException(CoreError.CONSENSUS_COMMIT_SCANNER_NOT_CLOSED.buildMessage());
254+
}
255+
216256
// Execute implicit pre-read
217257
try {
218258
crud.readIfImplicitPreReadEnabled();
@@ -234,6 +274,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
234274

235275
@Override
236276
public void rollback() {
277+
try {
278+
crud.closeScanners();
279+
} catch (CrudException e) {
280+
logger.warn("Failed to close the scanner", e);
281+
}
282+
237283
if (groupCommitter != null) {
238284
groupCommitter.remove(crud.getSnapshot().getId());
239285
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
import com.scalar.db.api.Put;
1717
import com.scalar.db.api.Result;
1818
import com.scalar.db.api.Scan;
19+
import com.scalar.db.api.TransactionCrudOperable;
1920
import com.scalar.db.api.TransactionState;
2021
import com.scalar.db.api.Update;
2122
import com.scalar.db.api.Upsert;
2223
import com.scalar.db.common.AbstractDistributedTransactionManager;
24+
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
2325
import com.scalar.db.config.DatabaseConfig;
2426
import com.scalar.db.exception.transaction.CommitConflictException;
2527
import com.scalar.db.exception.transaction.CrudConflictException;
@@ -34,6 +36,7 @@
3436
import java.util.List;
3537
import java.util.Optional;
3638
import java.util.UUID;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3740
import javax.annotation.Nullable;
3841
import javax.annotation.concurrent.ThreadSafe;
3942
import org.slf4j.Logger;
@@ -231,9 +234,86 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat
231234

232235
@Override
233236
public Scanner getScanner(Scan scan) throws CrudException {
234-
throw new UnsupportedOperationException("Implement later");
237+
DistributedTransaction transaction = begin();
238+
239+
TransactionCrudOperable.Scanner scanner;
240+
try {
241+
scanner = transaction.getScanner(copyAndSetTargetToIfNot(scan));
242+
} catch (CrudException e) {
243+
rollbackTransaction(transaction);
244+
throw e;
245+
}
246+
247+
return new AbstractTransactionManagerCrudOperableScanner() {
248+
249+
private final AtomicBoolean closed = new AtomicBoolean();
250+
251+
@Override
252+
public Optional<Result> one() throws CrudException {
253+
try {
254+
return scanner.one();
255+
} catch (CrudException e) {
256+
closed.set(true);
257+
258+
try {
259+
scanner.close();
260+
} catch (CrudException ex) {
261+
e.addSuppressed(ex);
262+
}
263+
264+
rollbackTransaction(transaction);
265+
throw e;
266+
}
267+
}
268+
269+
@Override
270+
public List<Result> all() throws CrudException {
271+
try {
272+
return scanner.all();
273+
} catch (CrudException e) {
274+
closed.set(true);
275+
276+
try {
277+
scanner.close();
278+
} catch (CrudException ex) {
279+
e.addSuppressed(ex);
280+
}
281+
282+
rollbackTransaction(transaction);
283+
throw e;
284+
}
285+
}
286+
287+
@Override
288+
public void close() throws CrudException, UnknownTransactionStatusException {
289+
if (closed.get()) {
290+
return;
291+
}
292+
closed.set(true);
293+
294+
try {
295+
scanner.close();
296+
} catch (CrudException e) {
297+
rollbackTransaction(transaction);
298+
throw e;
299+
}
300+
301+
try {
302+
transaction.commit();
303+
} catch (CommitConflictException e) {
304+
rollbackTransaction(transaction);
305+
throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
306+
} catch (UnknownTransactionStatusException e) {
307+
throw e;
308+
} catch (TransactionException e) {
309+
rollbackTransaction(transaction);
310+
throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null));
311+
}
312+
}
313+
};
235314
}
236315

316+
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
237317
@Deprecated
238318
@Override
239319
public void put(Put put) throws CrudException, UnknownTransactionStatusException {
@@ -244,6 +324,7 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException
244324
});
245325
}
246326

327+
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
247328
@Deprecated
248329
@Override
249330
public void put(List<Put> puts) throws CrudException, UnknownTransactionStatusException {
@@ -290,6 +371,7 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus
290371
});
291372
}
292373

374+
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
293375
@Deprecated
294376
@Override
295377
public void delete(List<Delete> deletes) throws CrudException, UnknownTransactionStatusException {

0 commit comments

Comments
 (0)