Skip to content

Commit

Permalink
More simplify implementation of StreamQueryResult
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Oct 14, 2024
1 parent f8b825d commit 483976d
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 116 deletions.
26 changes: 23 additions & 3 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.core.Result;
Expand Down Expand Up @@ -120,9 +121,28 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S

String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
return result.execute(stream, session::close);
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);

stream.start((rsr) -> {
future.complete(Result.success(result));
result.onStreamResultSet(0, rsr);
}).whenComplete((st, th) -> {
session.close();

if (th != null) {
result.onStreamFinished(th);
future.completeExceptionally(th);
}
if (st != null) {
validator.addStatusIssues(st);
result.onStreamFinished(st);
future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));
}
});

return future;
});

return updateCurrentResult(lazy);
Expand Down
107 changes: 76 additions & 31 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Issue;
Expand Down Expand Up @@ -50,8 +52,8 @@ public class QueryServiceExecutor extends BaseYdbExecutor {
private boolean isAutoCommit;
private TxMode txMode;

private QueryTransaction tx;
private boolean isClosed;
private final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
private volatile boolean isClosed;

public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
super(ctx);
Expand All @@ -63,7 +65,6 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE;
this.isAutoCommit = autoCommit;
this.txMode = txMode(transactionLevel, isReadOnly);
this.tx = null;
this.isClosed = false;
}

Expand All @@ -81,14 +82,10 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
@Override
public void close() throws SQLException {
closeCurrentResult();
cleanTx();
isClosed = true;
}

private void cleanTx() {
if (tx != null) {
tx.getSession().close();
tx = null;
QueryTransaction old = tx.getAndSet(null);
if (old != null) {
old.getSession().close();
}
}

Expand All @@ -100,7 +97,8 @@ public void setTransactionLevel(int level) throws SQLException {
return;
}

if (tx != null && tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx != null && localTx.isActive()) {
throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX);
}

Expand All @@ -117,7 +115,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
return;
}

if (tx != null && tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx != null && localTx.isActive()) {
throw new SQLFeatureNotSupportedException(YdbConst.READONLY_INSIDE_TRANSACTION);
}

Expand All @@ -133,7 +132,8 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
return;
}

if (tx != null && tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx != null && localTx.isActive()) {
throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX);
}

Expand All @@ -149,13 +149,15 @@ public boolean isClosed() throws SQLException {
@Override
public String txID() throws SQLException {
closeCurrentResult();
return tx != null ? tx.getId() : null;
QueryTransaction localTx = tx.get();
return localTx != null ? localTx.getId() : null;
}

@Override
public boolean isInsideTransaction() throws SQLException {
ensureOpened();
return tx != null && tx.isActive();
QueryTransaction localTx = tx.get();
return localTx != null && localTx.isActive();
}

@Override
Expand All @@ -180,24 +182,28 @@ public int transactionLevel() throws SQLException {
public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
ensureOpened();

if (tx == null || !tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx == null || !localTx.isActive()) {
return;
}

CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
try {
validator.clearWarnings();
validator.call("Commit TxId: " + tx.getId(), () -> tx.commit(settings));
validator.call("Commit TxId: " + localTx.getId(), () -> localTx.commit(settings));
} finally {
cleanTx();
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}
}

@Override
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
ensureOpened();

if (tx == null || !tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx == null || !localTx.isActive()) {
return;
}

Expand All @@ -206,9 +212,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException

try {
validator.clearWarnings();
validator.execute("Rollback TxId: " + tx.getId(), () -> tx.rollback(settings));
validator.execute("Rollback TxId: " + localTx.getId(), () -> localTx.rollback(settings));
} finally {
cleanTx();
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}
}

Expand All @@ -226,28 +234,63 @@ public YdbQueryResult executeDataQuery(
}
final ExecuteQuerySettings settings = builder.build();

if (tx == null) {
tx = createNewQuerySession(validator).createNewTransaction(txMode);
QueryTransaction nextTx = tx.get();
while (nextTx == null) {
nextTx = createNewQuerySession(validator).createNewTransaction(txMode);
if (!tx.compareAndSet(null, nextTx)) {
nextTx.getSession().close();
nextTx = tx.get();
}
}

final QueryTransaction localTx = nextTx;

if (useStreamResultSet) {
String msg = "STREAM_QUERY >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings);
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
return result.execute(stream, () -> {
if (!tx.isActive()) {
cleanTx();
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);


stream.execute(new QueryStream.PartsHandler() {
@Override
public void onIssues(Issue[] issues) {
validator.addStatusIssues(Arrays.asList(issues));
}

@Override
public void onNextPart(QueryResultPart part) {
result.onStreamResultSet((int) part.getResultSetIndex(), part.getResultSetReader());
future.complete(Result.success(result));
}
}).whenComplete((res, th) -> {
if (!localTx.isActive()) {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}

if (th != null) {
future.completeExceptionally(th);
result.onStreamFinished(th);
}
if (res != null) {
validator.addStatusIssues(res.getStatus());
future.complete(res.isSuccess() ? Result.success(result) : Result.fail(res.getStatus()));
result.onStreamFinished(res.getStatus());
}
});

return future;
});

return updateCurrentResult(lazy);
}

try {
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql,
() -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings))
() -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, settings))
);
validator.addStatusIssues(result.getIssueList());

Expand All @@ -257,8 +300,10 @@ public YdbQueryResult executeDataQuery(
}
return updateCurrentResult(new StaticQueryResult(query, readers));
} finally {
if (!tx.isActive()) {
cleanTx();
if (!localTx.isActive()) {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}
}
}
Expand Down
Loading

0 comments on commit 483976d

Please sign in to comment.