Skip to content

Commit

Permalink
Load: New SQL grammer LOAD <filePath> with ('k'='v', ...) (apache#1…
Browse files Browse the repository at this point in the history
…3466)

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
YC27 and SteveYurongSu authored Sep 20, 2024
1 parent 5387a10 commit 3f922e7
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,9 @@ public void testLoadWithOnSuccess() throws Exception {
final Statement statement = connection.createStatement()) {

statement.execute(
String.format("load \"%s\" sglevel=2 onSuccess=none", file1.getAbsolutePath()));
String.format(
"load \"%s\" with ('database-level'='2', 'on-success'='none')",
file1.getAbsolutePath()));

try (final ResultSet resultSet =
statement.executeQuery("select count(*) from root.** group by level=1,2")) {
Expand All @@ -561,7 +563,9 @@ public void testLoadWithOnSuccess() throws Exception {
final Statement statement = connection.createStatement()) {

statement.execute(
String.format("load \"%s\" sglevel=2 onSuccess=delete", file2.getAbsolutePath()));
String.format(
"load \"%s\" with ('database-level'='2', 'on-success'='delete')",
file2.getAbsolutePath()));

try (final ResultSet resultSet =
statement.executeQuery("select count(*) from root.** group by level=1,2")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ loadTimeseries

// Load TsFile
loadFile
: LOAD fileName=STRING_LITERAL loadFileAttributeClauses?
: LOAD fileName=STRING_LITERAL ((loadFileAttributeClauses?) | (loadFileWithAttributeClauses))
;

loadFileAttributeClauses
Expand All @@ -1141,6 +1141,17 @@ loadFileAttributeClause
| ONSUCCESS operator_eq (DELETE|NONE)
;

loadFileWithAttributeClauses
: WITH
LR_BRACKET
(loadFileWithAttributeClause COMMA)* loadFileWithAttributeClause?
RR_BRACKET
;

loadFileWithAttributeClause
: loadFileWithKey=STRING_LITERAL OPERATOR_SEQ loadFileWithValue=STRING_LITERAL
;

// Remove TsFile
removeFile
: REMOVE fileName=STRING_LITERAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowThrottleQuotaStatement;
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
Expand Down Expand Up @@ -2005,8 +2006,27 @@ private long parseTimeValue(ConstantContext constant) {
@Override
public Statement visitLoadFile(IoTDBSqlParser.LoadFileContext ctx) {
try {
LoadTsFileStatement loadTsFileStatement =
final LoadTsFileStatement loadTsFileStatement =
new LoadTsFileStatement(parseStringLiteral(ctx.fileName.getText()));

// if sql have with, return new load sql statement
if (ctx.loadFileWithAttributeClauses() != null) {
final Map<String, String> loadTsFileAttributes = new HashMap<>();
for (IoTDBSqlParser.LoadFileWithAttributeClauseContext attributeContext :
ctx.loadFileWithAttributeClauses().loadFileWithAttributeClause()) {
final String key =
parseStringLiteral(attributeContext.loadFileWithKey.getText()).trim().toLowerCase();
final String value =
parseStringLiteral(attributeContext.loadFileWithValue.getText()).trim().toLowerCase();

LoadTsFileConfigurator.validateParameters(key, value);
loadTsFileAttributes.put(key, value);
}

loadTsFileStatement.setLoadAttributes(loadTsFileAttributes);
return loadTsFileStatement;
}

if (ctx.loadFileAttributeClauses() != null) {
for (IoTDBSqlParser.LoadFileAttributeClauseContext attributeContext :
ctx.loadFileAttributeClauses().loadFileAttributeClause()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.common.constant.TsFileConstant;
Expand All @@ -36,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class LoadTsFileStatement extends Statement {

Expand All @@ -45,6 +47,8 @@ public class LoadTsFileStatement extends Statement {
private boolean deleteAfterLoad;
private boolean autoCreateDatabase;

private Map<String, String> loadAttributes;

private final List<File> tsFiles;
private final List<TsFileResource> resources;
private final List<Long> writePointCountList;
Expand All @@ -60,6 +64,10 @@ public LoadTsFileStatement(String filePath) throws FileNotFoundException {
this.writePointCountList = new ArrayList<>();
this.statementType = StatementType.MULTI_BATCH_INSERT;

processTsFile(filePath);
}

private void processTsFile(final String filePath) throws FileNotFoundException {
if (file.isFile()) {
tsFiles.add(file);
} else {
Expand Down Expand Up @@ -165,6 +173,16 @@ public long getWritePointCount(int resourceIndex) {
return writePointCountList.get(resourceIndex);
}

public void setLoadAttributes(final Map<String, String> loadAttributes) {
this.loadAttributes = loadAttributes;
initAttributes();
}

private void initAttributes() {
this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
}

@Override
public List<PartialPath> getPaths() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.load.config;

import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.SemanticException;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LoadTsFileConfigurator {

public static void validateParameters(final String key, final String value) {
switch (key) {
case DATABASE_LEVEL_KEY:
validateDatabaseLevelParam(value);
break;
case ON_SUCCESS_KEY:
validateOnSuccessParam(value);
break;
default:
throw new SemanticException("Invalid parameter '" + key + "' for LOAD TSFILE command.");
}
}

private static final String DATABASE_LEVEL_KEY = "database-level";
private static final int DATABASE_LEVEL_DEFAULT_VALUE =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
private static final int DATABASE_LEVEL_MIN_VALUE = 1;

public static void validateDatabaseLevelParam(final String databaseLevel) {
try {
int level = Integer.parseInt(databaseLevel);
if (level < DATABASE_LEVEL_MIN_VALUE) {
throw new SemanticException(
String.format(
"Given database level %d is less than the minimum value %d, please input a valid database level.",
level, DATABASE_LEVEL_MIN_VALUE));
}
} catch (Exception e) {
throw new SemanticException(
String.format(
"Given database level %s is not a valid integer, please input a valid database level.",
databaseLevel));
}
}

public static int parseOrGetDefaultDatabaseLevel(final Map<String, String> loadAttributes) {
return Integer.parseInt(
loadAttributes.getOrDefault(
DATABASE_LEVEL_KEY, String.valueOf(DATABASE_LEVEL_DEFAULT_VALUE)));
}

private static final String ON_SUCCESS_KEY = "on-success";
private static final String ON_SUCCESS_DELETE_VALUE = "delete";
private static final String ON_SUCCESS_NONE_VALUE = "none";
private static final Set<String> ON_SUCCESS_VALUE_SET =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(ON_SUCCESS_DELETE_VALUE, ON_SUCCESS_NONE_VALUE)));

public static void validateOnSuccessParam(final String onSuccess) {
if (!ON_SUCCESS_VALUE_SET.contains(onSuccess)) {
throw new SemanticException(
String.format(
"Given on-success value '%s' is not supported, please input a valid on-success value.",
onSuccess));
}
}

public static boolean parseOrGetDefaultOnSuccess(final Map<String, String> loadAttributes) {
final String value = loadAttributes.get(ON_SUCCESS_KEY);
return StringUtils.isEmpty(value) || ON_SUCCESS_DELETE_VALUE.equals(value);
}

private LoadTsFileConfigurator() {
throw new IllegalStateException("Utility class");
}
}

0 comments on commit 3f922e7

Please sign in to comment.