Skip to content

Commit

Permalink
Added auto batch for REPLACE, UPDATE and DELETE statements (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 authored Oct 2, 2024
2 parents f840a6d + 66c5a85 commit 2affd1c
Show file tree
Hide file tree
Showing 8 changed files with 558 additions and 111 deletions.
24 changes: 12 additions & 12 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand All @@ -29,6 +28,7 @@
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbPreparedQuery;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.query.YqlBatcher;
import tech.ydb.jdbc.query.params.BatchedQuery;
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
import tech.ydb.jdbc.query.params.InMemoryQuery;
Expand All @@ -43,7 +43,6 @@
import tech.ydb.scheme.SchemeClient;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.description.TableColumn;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.impl.PooledTableClient;
import tech.ydb.table.query.ExplainDataQueryResult;
Expand Down Expand Up @@ -79,6 +78,7 @@ public class YdbContext implements AutoCloseable {
private final Cache<String, YdbQuery> queriesCache;
private final Cache<String, QueryStat> statsCache;
private final Cache<String, Map<String, Type>> queryParamsCache;
private final Cache<String, TableDescription> tableDescribeCache;

private final boolean autoResizeSessionPool;
private final AtomicInteger connectionsCount = new AtomicInteger();
Expand Down Expand Up @@ -108,6 +108,7 @@ private YdbContext(
if (cacheSize > 0) {
queriesCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
queryParamsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
tableDescribeCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
if (config.isFullScanDetectorEnabled()) {
statsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
} else {
Expand All @@ -117,6 +118,7 @@ private YdbContext(
queriesCache = null;
statsCache = null;
queryParamsCache = null;
tableDescribeCache = null;
}
}

Expand Down Expand Up @@ -356,7 +358,7 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
QueryType type = query.getType();

if (type == QueryType.BULK_QUERY) {
if (query.getYqlBatcher() == null || query.getYqlBatcher().isInsert()) {
if (query.getYqlBatcher() == null || query.getYqlBatcher().getCommand() != YqlBatcher.Cmd.UPSERT) {
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
}
}
Expand All @@ -368,30 +370,28 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
if (query.getYqlBatcher() != null && (mode == YdbPrepareMode.AUTO || type == QueryType.BULK_QUERY)) {
String tableName = query.getYqlBatcher().getTableName();
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;
Map<String, Type> types = queryParamsCache.getIfPresent(query.getOriginQuery());
if (types == null) {
TableDescription description = tableDescribeCache.getIfPresent(tablePath);
if (description == null) {
DescribeTableSettings settings = withDefaultTimeout(new DescribeTableSettings());
Result<TableDescription> result = retryCtx.supplyResult(
session -> session.describeTable(tablePath, settings)
).join();

if (result.isSuccess()) {
TableDescription descrtiption = result.getValue();
types = descrtiption.getColumns().stream()
.collect(Collectors.toMap(TableColumn::getName, TableColumn::getType));
queryParamsCache.put(query.getOriginQuery(), types);
description = result.getValue();
tableDescribeCache.put(query.getOriginQuery(), description);
} else {
if (type == QueryType.BULK_QUERY) {
throw new SQLException(YdbConst.BULKS_DESCRIBE_ERROR + result.getStatus());
}
}
}
if (type == QueryType.BULK_QUERY) {
return BulkUpsertQuery.build(tablePath, query.getYqlBatcher().getColumns(), types);
return BulkUpsertQuery.build(tablePath, query.getYqlBatcher().getColumns(), description);
}

if (types != null) {
BatchedQuery params = BatchedQuery.createAutoBatched(query.getYqlBatcher(), types);
if (description != null) {
BatchedQuery params = BatchedQuery.createAutoBatched(query.getYqlBatcher(), description);
if (params != null) {
return params;
}
Expand Down
18 changes: 13 additions & 5 deletions jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,20 @@ public static YdbQuery parseQuery(String query, YdbQueryProperties opts) throws
QueryType type = opts.getForcedQueryType();
if (type == null) {
type = parser.detectQueryType();
YqlBatcher batcher = parser.getYqlBatcher();

if (opts.isForcedScanAndBulks()) {
if (batcher.isValidBatch()) {
if (batcher.getCommand() == YqlBatcher.Cmd.UPSERT) {
type = QueryType.BULK_QUERY;
}
if (batcher.getCommand() == YqlBatcher.Cmd.INSERT) {
parser.getYqlBatcher().setForcedUpsert();
type = QueryType.BULK_QUERY;
}
}

if (opts.isForcedScanAndBulks() && type != QueryType.SCHEME_QUERY && type != QueryType.EXPLAIN_QUERY) {
if (parser.getYqlBatcher().isValidBatch()) {
parser.getYqlBatcher().setForcedUpsert();
type = QueryType.BULK_QUERY;
} else {
if (parser.getStatements().size() == 1 && parser.getStatements().get(0).getCmd() == QueryCmd.SELECT) {
type = QueryType.SCAN_QUERY;
}
}
Expand Down
17 changes: 13 additions & 4 deletions jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,17 @@ public String parseSQL(String origin) throws SQLException {
}

// starts with UPDATE, REPLACE, DELETE
if (parseUpdateKeyword(chars, keywordStart, keywordLength)
|| parseDeleteKeyword(chars, keywordStart, keywordLength)
|| parseReplaceKeyword(chars, keywordStart, keywordLength)) {
if (parseUpdateKeyword(chars, keywordStart, keywordLength)) {
statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE);
batcher.readIdentifier(chars, keywordStart, keywordLength);
batcher.readUpdate();
}
if (parseDeleteKeyword(chars, keywordStart, keywordLength)) {
statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE);
batcher.readDelete();
}
if (parseReplaceKeyword(chars, keywordStart, keywordLength)) {
statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE);
batcher.readReplace();
}

// Detect scheme expression - starts with ALTER, DROP, CREATE
Expand Down Expand Up @@ -242,6 +248,9 @@ public String parseSQL(String origin) throws SQLException {
case ',':
batcher.readComma();
break;
case '=':
batcher.readEqual();
break;
case ';':
batcher.readSemiColon();
if (parenLevel == 0) {
Expand Down
Loading

0 comments on commit 2affd1c

Please sign in to comment.