Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load: Add param convert-on-type-mismatch for new SQL #13557

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ public void testLoadWithOnSuccess() throws Exception {

statement.execute(
String.format(
"load \"%s\" with ('database-level'='2', 'on-success'='none')",
"load \"%s\" with ('database-level'='2', 'on-success'='none', 'convert-on-type-mismatch'='true')",
file1.getAbsolutePath()));

try (final ResultSet resultSet =
Expand All @@ -564,7 +564,7 @@ public void testLoadWithOnSuccess() throws Exception {

statement.execute(
String.format(
"load \"%s\" with ('database-level'='2', 'on-success'='delete')",
"load \"%s\" with ('database-level'='2', 'on-success'='delete', 'convert-on-type-mismatch'='false')",
file2.getAbsolutePath()));

try (final ResultSet resultSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -122,6 +125,20 @@ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {

if (loadTsFileStatement.isDeleteAfterLoad()) {
loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);

if (loadTsFileStatement.isSecondLoad()) {
loadTsFileStatement
.getTsFiles()
.forEach(
file -> {
FileUtils.deleteQuietly(
FSFactoryProducer.getFSFactory()
.getFile(file.getAbsoluteFile() + TsFileResource.RESOURCE_SUFFIX));
FileUtils.deleteQuietly(
FSFactoryProducer.getFSFactory()
.getFile(file.getAbsoluteFile() + ModificationFile.FILE_SUFFIX));
});
}
}

LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,28 @@ public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
} catch (Exception e) {
final String exceptionMessage =
String.format(
"Auto create or verify schema error when executing statement %s. Detail: %s.",
loadTsFileStatement,
e.getMessage() == null ? e.getClass().getName() : e.getMessage());
final String exceptionMessage;

if (loadTsFileStatement.isShouldConvertDataTypeOnTypeMismatch()) {
analysis.setFinishQueryAfterAnalyze(false);
analysis.setRealStatement(loadTsFileStatement);

exceptionMessage =
String.format(
"Auto create or verify schema error when executing statement %s. Will try to auto convert data type",
loadTsFileStatement);
} else {
exceptionMessage =
String.format(
"Auto create or verify schema error when executing statement %s. Detail: %s.",
loadTsFileStatement,
e.getMessage() == null ? e.getClass().getName() : e.getMessage());

analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
analysis.setFinishQueryAfterAnalyze(true);
}

LOGGER.warn(exceptionMessage, e);
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
return analysis;
}

Expand Down Expand Up @@ -661,7 +675,8 @@ private void verifySchema(ISchemaTree schemaTree)
}

// check datatype
if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
if (!loadTsFileStatement.isShouldConvertDataTypeOnTypeMismatch()
&& !tsFileSchema.getType().equals(iotdbSchema.getType())) {
throw new VerifyMetadataException(
String.format(
"Measurement %s%s%s datatype not match, TsFile: %s, IoTDB: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,7 @@ public PlanNode visitPipeEnrichedStatement(

@Override
public PlanNode visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
return new LoadTsFileNode(
context.getQueryId().genPlanNodeId(), loadTsFileStatement.getResources());
return new LoadTsFileNode(context.getQueryId().genPlanNodeId(), loadTsFileStatement);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;

Expand Down Expand Up @@ -60,15 +61,22 @@ public class LoadSingleTsFileNode extends WritePlanNode {
private final long writePointCount;
private boolean needDecodeTsFile;

private final LoadTsFileStatement loadTsFileStatement;

private TRegionReplicaSet localRegionReplicaSet;

public LoadSingleTsFileNode(
PlanNodeId id, TsFileResource resource, boolean deleteAfterLoad, long writePointCount) {
PlanNodeId id,
TsFileResource resource,
boolean deleteAfterLoad,
long writePointCount,
LoadTsFileStatement loadTsFileStatement) {
super(id);
this.tsFile = resource.getTsFile();
this.resource = resource;
this.deleteAfterLoad = deleteAfterLoad;
this.writePointCount = writePointCount;
this.loadTsFileStatement = loadTsFileStatement;
}

public boolean isTsFileEmpty() {
Expand Down Expand Up @@ -134,6 +142,10 @@ public long getWritePointCount() {
return writePointCount;
}

public LoadTsFileStatement getLoadTsFileStatement() {
return loadTsFileStatement;
}

/**
* only used for load locally.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
public class LoadTsFileNode extends WritePlanNode {

private final List<TsFileResource> resources;
private final LoadTsFileStatement loadTsFileStatement;

public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
public LoadTsFileNode(PlanNodeId id, LoadTsFileStatement loadTsFileStatement) {
super(id);
this.resources = resources;
this.resources = loadTsFileStatement.getResources();
this.loadTsFileStatement = loadTsFileStatement;
}

@Override
Expand Down Expand Up @@ -104,7 +106,8 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
getPlanNodeId(),
resources.get(i),
statement.isDeleteAfterLoad(),
statement.getWritePointCount(i)));
statement.getWritePointCount(i),
loadTsFileStatement));
}
return res;
}
Expand Down
Loading