Skip to content

Commit

Permalink
Added execution of batched updates and deletes
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Oct 2, 2024
1 parent 67a5be1 commit 66c5a85
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 73 deletions.
24 changes: 12 additions & 12 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand All @@ -29,6 +28,7 @@
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbPreparedQuery;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.query.YqlBatcher;
import tech.ydb.jdbc.query.params.BatchedQuery;
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
import tech.ydb.jdbc.query.params.InMemoryQuery;
Expand All @@ -43,7 +43,6 @@
import tech.ydb.scheme.SchemeClient;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.description.TableColumn;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.impl.PooledTableClient;
import tech.ydb.table.query.ExplainDataQueryResult;
Expand Down Expand Up @@ -79,6 +78,7 @@ public class YdbContext implements AutoCloseable {
private final Cache<String, YdbQuery> queriesCache;
private final Cache<String, QueryStat> statsCache;
private final Cache<String, Map<String, Type>> queryParamsCache;
private final Cache<String, TableDescription> tableDescribeCache;

private final boolean autoResizeSessionPool;
private final AtomicInteger connectionsCount = new AtomicInteger();
Expand Down Expand Up @@ -108,6 +108,7 @@ private YdbContext(
if (cacheSize > 0) {
queriesCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
queryParamsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
tableDescribeCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
if (config.isFullScanDetectorEnabled()) {
statsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
} else {
Expand All @@ -117,6 +118,7 @@ private YdbContext(
queriesCache = null;
statsCache = null;
queryParamsCache = null;
tableDescribeCache = null;
}
}

Expand Down Expand Up @@ -356,7 +358,7 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
QueryType type = query.getType();

if (type == QueryType.BULK_QUERY) {
if (query.getYqlBatcher() == null || !query.getYqlBatcher().isUpsert()) {
if (query.getYqlBatcher() == null || query.getYqlBatcher().getCommand() != YqlBatcher.Cmd.UPSERT) {
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
}
}
Expand All @@ -368,30 +370,28 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
if (query.getYqlBatcher() != null && (mode == YdbPrepareMode.AUTO || type == QueryType.BULK_QUERY)) {
String tableName = query.getYqlBatcher().getTableName();
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;
Map<String, Type> types = queryParamsCache.getIfPresent(query.getOriginQuery());
if (types == null) {
TableDescription description = tableDescribeCache.getIfPresent(tablePath);
if (description == null) {
DescribeTableSettings settings = withDefaultTimeout(new DescribeTableSettings());
Result<TableDescription> result = retryCtx.supplyResult(
session -> session.describeTable(tablePath, settings)
).join();

if (result.isSuccess()) {
TableDescription descrtiption = result.getValue();
types = descrtiption.getColumns().stream()
.collect(Collectors.toMap(TableColumn::getName, TableColumn::getType));
queryParamsCache.put(query.getOriginQuery(), types);
description = result.getValue();
tableDescribeCache.put(query.getOriginQuery(), description);
} else {
if (type == QueryType.BULK_QUERY) {
throw new SQLException(YdbConst.BULKS_DESCRIBE_ERROR + result.getStatus());
}
}
}
if (type == QueryType.BULK_QUERY) {
return BulkUpsertQuery.build(tablePath, query.getYqlBatcher().getColumns(), types);
return BulkUpsertQuery.build(tablePath, query.getYqlBatcher().getColumns(), description);
}

if (types != null) {
BatchedQuery params = BatchedQuery.createAutoBatched(query.getYqlBatcher(), types);
if (description != null) {
BatchedQuery params = BatchedQuery.createAutoBatched(query.getYqlBatcher(), description);
if (params != null) {
return params;
}
Expand Down
5 changes: 4 additions & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ public static YdbQuery parseQuery(String query, YdbQueryProperties opts) throws

if (opts.isForcedScanAndBulks()) {
if (batcher.isValidBatch()) {
if (batcher.isInsert() || batcher.isUpdate()) {
if (batcher.getCommand() == YqlBatcher.Cmd.UPSERT) {
type = QueryType.BULK_QUERY;
}
if (batcher.getCommand() == YqlBatcher.Cmd.INSERT) {
parser.getYqlBatcher().setForcedUpsert();
type = QueryType.BULK_QUERY;
}
Expand Down
22 changes: 3 additions & 19 deletions jdbc/src/main/java/tech/ydb/jdbc/query/YqlBatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @author Aleksandr Gorshenin
*/
public class YqlBatcher {
private enum Cmd {
public enum Cmd {
UPSERT,
INSERT,
REPLACE,
Expand Down Expand Up @@ -59,24 +59,8 @@ public void setForcedUpsert() {
cmd = Cmd.UPSERT;
}

public boolean isInsert() {
return cmd == Cmd.INSERT;
}

public boolean isUpsert() {
return cmd == Cmd.UPSERT;
}

public boolean isReplace() {
return cmd == Cmd.REPLACE;
}

public boolean isUpdate() {
return cmd == Cmd.UPDATE;
}

public boolean isDelete() {
return cmd == Cmd.DELETE;
public Cmd getCommand() {
return cmd;
}

public String getTableName() {
Expand Down
64 changes: 47 additions & 17 deletions jdbc/src/main/java/tech/ydb/jdbc/query/params/BatchedQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import tech.ydb.jdbc.query.YdbPreparedQuery;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.query.YqlBatcher;
import tech.ydb.table.description.TableColumn;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.Params;
import tech.ydb.table.values.ListType;
import tech.ydb.table.values.ListValue;
Expand Down Expand Up @@ -222,16 +224,39 @@ public static BatchedQuery tryCreateBatched(YdbQuery query, Map<String, Type> pr
return new BatchedQuery(query.getPreparedYql(), listName, Arrays.asList(columns), types);
}

public static BatchedQuery createAutoBatched(YqlBatcher batcher, Map<String, Type> tableColumns)
public static BatchedQuery createAutoBatched(YqlBatcher batcher, TableDescription description)
throws SQLException {

// DELETE and UPDATE may be batched only if WHERE contains only primary key columns
if (batcher.getCommand() == YqlBatcher.Cmd.DELETE || batcher.getCommand() == YqlBatcher.Cmd.UPDATE) {
Set<String> primaryKey = new HashSet<>(description.getPrimaryKeys());
for (String keyColumn: batcher.getKeyColumns()) {
if (!primaryKey.remove(keyColumn)) {
return null;
}
}
if (!primaryKey.isEmpty()) {
return null;
}
}

StringBuilder sb = new StringBuilder();
Map<String, Type> columnTypes = new HashMap<>();
Map<String, Type> structTypes = new HashMap<>();
List<String> columns = new ArrayList<>();

for (TableColumn column: description.getColumns()) {
columnTypes.put(column.getName(), column.getType());
}

List<String> params = new ArrayList<>();
params.addAll(batcher.getColumns());
params.addAll(batcher.getKeyColumns());

sb.append("DECLARE $batch AS List<Struct<");
int idx = 1;
for (String column: batcher.getColumns()) {
Type type = tableColumns.get(column);
for (String column: params) {
Type type = columnTypes.get(column);
if (type == null) {
return null;
}
Expand All @@ -245,23 +270,28 @@ public static BatchedQuery createAutoBatched(YqlBatcher batcher, Map<String, Typ
}
sb.append(">>;\n");

if (batcher.isInsert()) {
sb.append("INSERT INTO `");
}
if (batcher.isUpsert()) {
sb.append("UPSERT INTO `");
}
if (batcher.isReplace()) {
sb.append("REPLACE INTO `");
}
if (batcher.isDelete() || batcher.isUpdate()) {
return null;
switch (batcher.getCommand()) {
case UPSERT:
sb.append("UPSERT INTO `").append(batcher.getTableName()).append("` SELECT ");
break;
case INSERT:
sb.append("INSERT INTO `").append(batcher.getTableName()).append("` SELECT ");
break;
case REPLACE:
sb.append("REPLACE INTO `").append(batcher.getTableName()).append("` SELECT ");
break;
case UPDATE:
sb.append("UPDATE `").append(batcher.getTableName()).append("` ON SELECT ");
break;
case DELETE:
sb.append("DELETE FROM `").append(batcher.getTableName()).append("` ON SELECT ");
break;
default:
return null;
}

sb.append(batcher.getTableName()).append("` SELECT ");

idx = 1;
for (String column: batcher.getColumns()) {
for (String column: params) {
if (idx > 1) {
sb.append(", ");
}
Expand Down
23 changes: 15 additions & 8 deletions jdbc/src/main/java/tech/ydb/jdbc/query/params/BulkUpsertQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import tech.ydb.table.description.TableColumn;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.values.ListType;
import tech.ydb.table.values.ListValue;
import tech.ydb.table.values.StructType;
Expand All @@ -24,12 +26,7 @@ private BulkUpsertQuery(String tablePath, String yql, List<String> columns, Map<
throws SQLException {
super(yql, "$bulk", columns, types);
this.tablePath = tablePath;

Map<String, Type> reducedTypes = new HashMap<>();
for (String column: columns) {
reducedTypes.put(column, types.get(column));
}
this.bulkType = ListType.of(StructType.of(reducedTypes));
this.bulkType = ListType.of(StructType.of(types));
}

public String getTablePath() {
Expand All @@ -44,7 +41,7 @@ public ListValue getBatchedBulk() {
return bulkType.newValue(getBatchedValues());
}

public static BulkUpsertQuery build(String tablePath, List<String> columns, Map<String, Type> types)
public static BulkUpsertQuery build(String tablePath, List<String> columns, TableDescription description)
throws SQLException {
StringBuilder yql = new StringBuilder();
yql.append("BULK UPSERT INTO `");
Expand All @@ -53,6 +50,16 @@ public static BulkUpsertQuery build(String tablePath, List<String> columns, Map<
yql.append(columns.stream().collect(Collectors.joining(", ")));
yql.append(")");

return new BulkUpsertQuery(tablePath, yql.toString(), columns, types);
Map<String, Type> columnTypes = new HashMap<>();
for (TableColumn column: description.getColumns()) {
columnTypes.put(column.getName(), column.getType());
}

Map<String, Type> structTypes = new HashMap<>();
for (String column: columns) {
structTypes.put(column, columnTypes.get(column));
}

return new BulkUpsertQuery(tablePath, yql.toString(), columns, structTypes);
}
}
4 changes: 2 additions & 2 deletions jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,13 @@ public void forceScanAndBulkTest() throws SQLException {
try (PreparedStatement ps = conn.prepareStatement(UPDATE_ROW)) {
ps.setString(1, "updated-value");
ps.setInt(2, 1);
ExceptionAssert.ydbException(ERROR_SCAN_QUERY, ps::execute);
ps.executeUpdate();
}

// single delete
try (PreparedStatement ps = conn.prepareStatement(DELETE_ROW)) {
ps.setInt(1, 2);
ExceptionAssert.ydbException(ERROR_SCAN_QUERY, ps::execute);
ps.executeUpdate();
}
}
}
Expand Down
Loading

0 comments on commit 66c5a85

Please sign in to comment.