Skip to content

Commit

Permalink
New barrier implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Sep 25, 2024
1 parent 257e579 commit 60d25c5
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 43 deletions.
39 changes: 38 additions & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.QueryType;
Expand All @@ -17,9 +20,31 @@
*/
public abstract class BaseYdbExecutor implements YdbExecutor {
private final SessionRetryContext retryCtx;
private final AtomicReference<Barrier> barrier;

public BaseYdbExecutor(YdbContext ctx) {
this.retryCtx = ctx.getRetryCtx();
this.barrier = new AtomicReference<>(new Barrier());
this.barrier.get().open();
}

protected void checkBarrier() {
barrier.get().waitOpening();
}

protected Barrier createBarrier() {
Barrier newBarrier = new Barrier();
Barrier prev = barrier.getAndSet(newBarrier);
prev.waitOpening();
return newBarrier;
}

@Override
public void ensureOpened() throws SQLException {
checkBarrier();
if (isClosed()) {
throw new SQLException(YdbConst.CLOSED_CONNECTION);
}
}

@Override
Expand All @@ -43,13 +68,25 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, String tablePath, ListValue rows)
throws SQLException {
ensureOpened();

String yql = query.getPreparedYql();
YdbValidator validator = statement.getValidator();

validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
);

return new StaticQueryResult(query, Collections.emptyList());
}

public class Barrier {
private final CompletableFuture<Void> future = new CompletableFuture<>();

public void open() {
future.complete(null);
}

public void waitOpening() {
future.join();
}
}
}
15 changes: 12 additions & 3 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE

@Override
public void close() {
checkBarrier();
cleanTx();
isClosed = true;
}
Expand All @@ -96,6 +97,8 @@ private void cleanTx() {

@Override
public void setTransactionLevel(int level) throws SQLException {
ensureOpened();

if (level == transactionLevel) {
return;
}
Expand All @@ -111,6 +114,8 @@ public void setTransactionLevel(int level) throws SQLException {

@Override
public void setReadOnly(boolean readOnly) throws SQLException {
ensureOpened();

if (readOnly == isReadOnly) {
return;
}
Expand All @@ -125,6 +130,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {

@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
ensureOpened();

if (autoCommit == isAutoCommit) {
return;
}
Expand All @@ -138,11 +145,13 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {

@Override
public boolean isClosed() {
checkBarrier();
return isClosed;
}

@Override
public String txID() {
checkBarrier();
return tx != null ? tx.getId() : null;
}

Expand Down Expand Up @@ -247,6 +256,8 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
throws SQLException {
ensureOpened();

Barrier barrier = createBarrier();

YdbContext ctx = statement.getConnection().getCtx();
YdbValidator validator = statement.getValidator();

Expand All @@ -260,9 +271,7 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
return validator.call(msg, () -> {
QueryStream stream = session.createQuery(yql, TxMode.SNAPSHOT_RO, params, settings);
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
return result.execute(stream, () -> {
session.close();
});
return result.execute(stream, barrier);
});
}

Expand Down
11 changes: 7 additions & 4 deletions jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,20 @@ public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Run
}
}

public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream, Runnable finish) {
public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream, BaseYdbExecutor.Barrier barrier) {
stream.execute(new QueryPartsHandler())
.thenApply(Result::getStatus)
.whenComplete(this::onStreamFinished)
.thenRun(finish);
.thenRun(barrier::open);
return startFuture;
}

public CompletableFuture<Result<StreamQueryResult>> execute(GrpcReadStream<ResultSetReader> stream) {
public CompletableFuture<Result<StreamQueryResult>> execute(
GrpcReadStream<ResultSetReader> stream, BaseYdbExecutor.Barrier barrier
) {
stream.start(rsr -> onResultSet(0, rsr))
.whenComplete(this::onStreamFinished);
.whenComplete(this::onStreamFinished)
.thenRun(barrier::open);
return startFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo

@Override
public void close() {
checkBarrier();
tx = null;
}

Expand All @@ -74,26 +75,31 @@ protected Session createNewTableSession(YdbValidator validator) throws SQLExcept

@Override
public void setTransactionLevel(int level) throws SQLException {
ensureOpened();
updateState(tx.withTransactionLevel(level));
}

@Override
public void setReadOnly(boolean readOnly) throws SQLException {
ensureOpened();
updateState(tx.withReadOnly(readOnly));
}

@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
ensureOpened();
updateState(tx.withAutoCommit(autoCommit));
}

@Override
public boolean isClosed() {
checkBarrier();
return tx == null;
}

@Override
public String txID() {
checkBarrier();
return tx != null ? tx.txID() : null;
}

Expand Down Expand Up @@ -232,6 +238,8 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
throws SQLException {
ensureOpened();

Barrier barrier = createBarrier();

YdbContext ctx = statement.getConnection().getCtx();
YdbValidator validator = statement.getValidator();
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
Expand All @@ -245,7 +253,7 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
return validator.call(msg, () -> {
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
return result.execute(stream);
return result.execute(stream, barrier);
});
}

Expand Down
11 changes: 2 additions & 9 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.sql.SQLException;

import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.YdbQuery;
Expand All @@ -14,13 +13,9 @@
* @author Aleksandr Gorshenin
*/
public interface YdbExecutor {
default void ensureOpened() throws SQLException {
if (isClosed()) {
throw new SQLException(YdbConst.CLOSED_CONNECTION);
}
}

void close();
boolean isClosed();
void ensureOpened() throws SQLException;

String txID();
int transactionLevel() throws SQLException;
Expand All @@ -46,6 +41,4 @@ YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String y
void rollback(YdbContext ctx, YdbValidator validator) throws SQLException;

boolean isValid(YdbValidator validator, int timeout) throws SQLException;

void close();
}
28 changes: 3 additions & 25 deletions jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -39,7 +38,6 @@ public class YdbConnectionImpl implements YdbConnection {
private final YdbContext ctx;
private final YdbValidator validator;
private final YdbExecutor executor;
private final AtomicReference<YdbStatement> currState = new AtomicReference<>();

public YdbConnectionImpl(YdbContext context) throws SQLException {
this.ctx = context;
Expand Down Expand Up @@ -85,35 +83,18 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
executor.setAutoCommit(autoCommit);
}

private <T extends YdbStatement> T updateStatement(T statement) throws SQLException {
Statement prev = currState.getAndSet(statement);
if (prev != null) {
prev.close();
}
return statement;
}

private void waitStatementReady() throws SQLException {
YdbStatement curr = currState.get();
if (curr != null) {
curr.waitReady();
}
}

@Override
public boolean getAutoCommit() throws SQLException {
return executor.isAutoCommit();
}

@Override
public void commit() throws SQLException {
waitStatementReady();
executor.commit(ctx, validator);
}

@Override
public void rollback() throws SQLException {
waitStatementReady();
executor.rollback(ctx, validator);
}

Expand All @@ -130,7 +111,7 @@ public void close() throws SQLException {
}

@Override
public boolean isClosed() {
public boolean isClosed() throws SQLException {
return executor.isClosed();
}

Expand Down Expand Up @@ -186,8 +167,6 @@ public SQLWarning getWarnings() throws SQLException {
@Override
public void clearWarnings() throws SQLException {
executor.ensureOpened();

waitStatementReady();
validator.clearWarnings();
}

Expand Down Expand Up @@ -231,7 +210,7 @@ public YdbStatement createStatement(int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
executor.ensureOpened();
checkStatementParams(resultSetType, resultSetConcurrency, resultSetHoldability);
return updateStatement(new YdbStatementImpl(this, resultSetType));
return new YdbStatementImpl(this, resultSetType);
}

@Override
Expand Down Expand Up @@ -263,7 +242,7 @@ private YdbPreparedStatement prepareStatement(String sql, int resultSetType, Ydb
YdbQuery query = ctx.findOrParseYdbQuery(sql);

YdbPreparedQuery params = ctx.findOrPrepareParams(query, mode);
return updateStatement(new YdbPreparedStatementImpl(this, query, params, resultSetType));
return new YdbPreparedStatementImpl(this, query, params, resultSetType);
}

@Override
Expand Down Expand Up @@ -309,7 +288,6 @@ public int getNetworkTimeout() throws SQLException {

@Override
public String getYdbTxId() throws SQLException {
waitStatementReady();
return executor.txID();
}

Expand Down

0 comments on commit 60d25c5

Please sign in to comment.