diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java index 8553c8d..3adee19 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java @@ -3,6 +3,7 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import tech.ydb.core.Result; @@ -120,9 +121,28 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S String msg = QueryType.SCAN_QUERY + " >>\n" + yql; StreamQueryResult lazy = validator.call(msg, () -> { - GrpcReadStream stream = session.executeScanQuery(yql, params, settings); - StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); - return result.execute(stream, session::close); + final CompletableFuture> future = new CompletableFuture<>(); + final GrpcReadStream stream = session.executeScanQuery(yql, params, settings); + final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); + + stream.start((rsr) -> { + future.complete(Result.success(result)); + result.onStreamResultSet(0, rsr); + }).whenComplete((st, th) -> { + session.close(); + + if (th != null) { + result.onStreamFinished(th); + future.completeExceptionally(th); + } + if (st != null) { + validator.addStatusIssues(st); + result.onStreamFinished(st); + future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st)); + } + }); + + return future; }); return updateCurrentResult(lazy); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 713a5a0..dea2023 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -8,7 +8,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Issue; @@ -50,8 +52,8 @@ public class QueryServiceExecutor extends BaseYdbExecutor { private boolean isAutoCommit; private TxMode txMode; - private QueryTransaction tx; - private boolean isClosed; + private final AtomicReference tx = new AtomicReference<>(); + private volatile boolean isClosed; public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { super(ctx); @@ -63,7 +65,6 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE; this.isAutoCommit = autoCommit; this.txMode = txMode(transactionLevel, isReadOnly); - this.tx = null; this.isClosed = false; } @@ -81,14 +82,10 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE @Override public void close() throws SQLException { closeCurrentResult(); - cleanTx(); isClosed = true; - } - - private void cleanTx() { - if (tx != null) { - tx.getSession().close(); - tx = null; + QueryTransaction old = tx.getAndSet(null); + if (old != null) { + old.getSession().close(); } } @@ -100,7 +97,8 @@ public void setTransactionLevel(int level) throws SQLException { return; } - if (tx != null && tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx != null && localTx.isActive()) { throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX); } @@ -117,7 +115,8 @@ public void setReadOnly(boolean readOnly) throws SQLException { return; } - if (tx != null && tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx != null && localTx.isActive()) { throw new SQLFeatureNotSupportedException(YdbConst.READONLY_INSIDE_TRANSACTION); } @@ -133,7 +132,8 @@ public void setAutoCommit(boolean autoCommit) throws SQLException { return; } - if (tx != null && tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx != null && localTx.isActive()) { throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX); } @@ -149,13 +149,15 @@ public boolean isClosed() throws SQLException { @Override public String txID() throws SQLException { closeCurrentResult(); - return tx != null ? tx.getId() : null; + QueryTransaction localTx = tx.get(); + return localTx != null ? localTx.getId() : null; } @Override public boolean isInsideTransaction() throws SQLException { ensureOpened(); - return tx != null && tx.isActive(); + QueryTransaction localTx = tx.get(); + return localTx != null && localTx.isActive(); } @Override @@ -180,16 +182,19 @@ public int transactionLevel() throws SQLException { public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { ensureOpened(); - if (tx == null || !tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx == null || !localTx.isActive()) { return; } CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build(); try { validator.clearWarnings(); - validator.call("Commit TxId: " + tx.getId(), () -> tx.commit(settings)); + validator.call("Commit TxId: " + localTx.getId(), () -> localTx.commit(settings)); } finally { - cleanTx(); + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } } } @@ -197,7 +202,8 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { ensureOpened(); - if (tx == null || !tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx == null || !localTx.isActive()) { return; } @@ -206,9 +212,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException try { validator.clearWarnings(); - validator.execute("Rollback TxId: " + tx.getId(), () -> tx.rollback(settings)); + validator.execute("Rollback TxId: " + localTx.getId(), () -> localTx.rollback(settings)); } finally { - cleanTx(); + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } } } @@ -226,20 +234,55 @@ public YdbQueryResult executeDataQuery( } final ExecuteQuerySettings settings = builder.build(); - if (tx == null) { - tx = createNewQuerySession(validator).createNewTransaction(txMode); + QueryTransaction nextTx = tx.get(); + while (nextTx == null) { + nextTx = createNewQuerySession(validator).createNewTransaction(txMode); + if (!tx.compareAndSet(null, nextTx)) { + nextTx.getSession().close(); + nextTx = tx.get(); + } } + final QueryTransaction localTx = nextTx; + if (useStreamResultSet) { String msg = "STREAM_QUERY >>\n" + yql; StreamQueryResult lazy = validator.call(msg, () -> { - QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings); - StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); - return result.execute(stream, () -> { - if (!tx.isActive()) { - cleanTx(); + final CompletableFuture> future = new CompletableFuture<>(); + final QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings); + final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); + + + stream.execute(new QueryStream.PartsHandler() { + @Override + public void onIssues(Issue[] issues) { + validator.addStatusIssues(Arrays.asList(issues)); + } + + @Override + public void onNextPart(QueryResultPart part) { + result.onStreamResultSet((int) part.getResultSetIndex(), part.getResultSetReader()); + future.complete(Result.success(result)); + } + }).whenComplete((res, th) -> { + if (!localTx.isActive()) { + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } + } + + if (th != null) { + future.completeExceptionally(th); + result.onStreamFinished(th); + } + if (res != null) { + validator.addStatusIssues(res.getStatus()); + future.complete(res.isSuccess() ? Result.success(result) : Result.fail(res.getStatus())); + result.onStreamFinished(res.getStatus()); } }); + + return future; }); return updateCurrentResult(lazy); @@ -247,7 +290,7 @@ public YdbQueryResult executeDataQuery( try { QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql, - () -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings)) + () -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, settings)) ); validator.addStatusIssues(result.getIssueList()); @@ -257,8 +300,10 @@ public YdbQueryResult executeDataQuery( } return updateCurrentResult(new StaticQueryResult(query, readers)); } finally { - if (!tx.isActive()) { - cleanTx(); + if (!localTx.isActive()) { + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } } } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java index c64a098..7870d96 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java @@ -5,7 +5,6 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -16,11 +15,9 @@ import java.util.logging.Level; import java.util.logging.Logger; -import tech.ydb.core.Issue; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; @@ -30,8 +27,6 @@ import tech.ydb.jdbc.impl.YdbQueryResult; import tech.ydb.jdbc.query.QueryStatement; import tech.ydb.jdbc.query.YdbQuery; -import tech.ydb.query.QueryStream; -import tech.ydb.query.result.QueryResultPart; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.result.ValueReader; @@ -47,22 +42,21 @@ public class StreamQueryResult implements YdbQueryResult { private final String msg; private final YdbStatement statement; - private final Runnable stopRunnable; + private final Runnable streamStopper; private final CompletableFuture streamFuture = new CompletableFuture<>(); - private final CompletableFuture> startFuture = new CompletableFuture<>(); + private final AtomicBoolean streamCancelled = new AtomicBoolean(false); private final int[] resultIndexes; private final List>> resultFutures = new ArrayList<>(); - private final AtomicBoolean streamCancelled = new AtomicBoolean(false); private int resultIndex = 0; private volatile boolean resultClosed = false; - public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Runnable stopRunnable) { + public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Runnable streamStopper) { this.msg = msg; this.statement = statement; - this.stopRunnable = stopRunnable; + this.streamStopper = streamStopper; this.resultIndexes = new int[query.getStatements().size()]; @@ -84,49 +78,42 @@ public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Run } } - public CompletableFuture> execute(QueryStream stream, Runnable finish) { - LOGGER.log(Level.FINE, "Stream executed by QueryStream"); - stream.execute(new QueryPartsHandler()) - .thenApply(Result::getStatus) - .whenComplete(this::onStreamFinished) - .thenRun(finish); - return startFuture; - } + public void onStreamResultSet(int index, ResultSetReader rsr) { + CompletableFuture> future = resultFutures.get(index); + if (!future.isDone()) { + ColumnInfo[] columns = ColumnInfo.fromResultSetReader(rsr); + future.complete(Result.success(new LazyResultSet(statement, columns))); + } - public CompletableFuture> execute( - GrpcReadStream stream, Runnable finish - ) { - LOGGER.log(Level.FINE, "Stream executed by ScanQuery"); - stream.start(new ScanQueryHandler()) - .whenComplete(this::onStreamFinished) - .thenRun(finish); - return startFuture; + Result res = future.join(); + if (res.isSuccess()) { + res.getValue().addResultSet(rsr); + } } - private void onStreamFinished(Status status, Throwable th) { - if (th != null) { - streamFuture.completeExceptionally(th); - for (CompletableFuture> future: resultFutures) { - future.completeExceptionally(th); - } - startFuture.completeExceptionally(th); + public void onStreamFinished(Throwable th) { + streamFuture.completeExceptionally(th); + for (CompletableFuture> future: resultFutures) { + future.completeExceptionally(th); } - if (status != null) { - streamFuture.complete(status); + completeAllSets(); + } + + public void onStreamFinished(Status status) { + for (CompletableFuture> future : resultFutures) { if (status.isSuccess()) { - for (CompletableFuture> future: resultFutures) { - future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status)); - } - startFuture.complete(Result.success(this)); + future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status)); } else { - for (CompletableFuture> future: resultFutures) { - future.complete(Result.fail(status)); - } - startFuture.complete(Result.fail(status)); + future.complete(Result.fail(status)); } } + streamFuture.complete(status); + + completeAllSets(); + } + private void completeAllSets() { for (CompletableFuture> future: resultFutures) { if (!future.isCompletedExceptionally()) { Result rs = future.join(); @@ -155,13 +142,13 @@ private void checkStream() { if (!streamFuture.isDone() && streamCancelled.compareAndSet(false, true)) { LOGGER.log(Level.FINE, "Stream cancel"); - stopRunnable.run(); + streamStopper.run(); } } @Override public void close() throws SQLException { - if (startFuture.isDone() && resultClosed) { + if (streamFuture.isDone() && resultClosed) { return; } @@ -170,8 +157,6 @@ public void close() throws SQLException { resultClosed = true; Status status = streamFuture.join(); - statement.getValidator().addStatusIssues(status); - if (streamCancelled.get()) { LOGGER.log(Level.FINE, "Stream canceled and finished with status {0}", status); return; @@ -252,40 +237,6 @@ public boolean getMoreResults(int current) throws SQLException { } - private void onResultSet(int index, ResultSetReader rsr) { - CompletableFuture> future = resultFutures.get(index); - if (!future.isDone()) { - ColumnInfo[] columns = ColumnInfo.fromResultSetReader(rsr); - future.complete(Result.success(new LazyResultSet(statement, columns))); - } - - Result res = future.join(); - if (res.isSuccess()) { - res.getValue().addResultSet(rsr); - } - } - - private class ScanQueryHandler implements GrpcReadStream.Observer { - @Override - public void onNext(ResultSetReader part) { - onResultSet(0, part); - startFuture.complete(Result.success(StreamQueryResult.this)); - } - } - - private class QueryPartsHandler implements QueryStream.PartsHandler { - @Override - public void onIssues(Issue[] issues) { - statement.getValidator().addStatusIssues(Arrays.asList(issues)); - } - - @Override - public void onNextPart(QueryResultPart part) { - onResultSet((int) part.getResultSetIndex(), part.getResultSetReader()); - startFuture.complete(Result.success(StreamQueryResult.this)); - } - } - private class LazyResultSet extends BaseYdbResultSet { private final BlockingQueue readers = new ArrayBlockingQueue<>(5); private final AtomicLong rowsCount = new AtomicLong(); @@ -314,7 +265,7 @@ public void addResultSet(ResultSetReader rsr) { } catch (InterruptedException ex) { if (streamFuture.completeExceptionally(ex)) { LOGGER.log(Level.WARNING, "LazyResultSet offer interrupted"); - stopRunnable.run(); + streamStopper.run(); } return; }