diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index c47d84e05741..fc07299d8d0b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -544,7 +544,7 @@ public void testLoadWithOnSuccess() throws Exception { statement.execute( String.format( - "load \"%s\" with ('database-level'='2', 'on-success'='none')", + "load \"%s\" with ('database-level'='2', 'on-success'='none', 'convert-on-type-mismatch'='true')", file1.getAbsolutePath())); try (final ResultSet resultSet = @@ -564,7 +564,7 @@ public void testLoadWithOnSuccess() throws Exception { statement.execute( String.format( - "load \"%s\" with ('database-level'='2', 'on-success'='delete')", + "load \"%s\" with ('database-level'='2', 'on-success'='delete', 'convert-on-type-mismatch'='false')", file2.getAbsolutePath())); try (final ResultSet resultSet = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java index 868785ec8fbc..dc0cc690c70a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java @@ -34,9 +34,12 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.io.FileUtils; +import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,7 +124,23 @@ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { } if (loadTsFileStatement.isDeleteAfterLoad()) { - loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + loadTsFileStatement + .getTsFiles() + .forEach( + file -> { + // delete resource if exist + FileUtils.deleteQuietly( + FSFactoryProducer.getFSFactory() + .getFile(file.getAbsoluteFile() + TsFileResource.RESOURCE_SUFFIX)); + + // delete mods if exist + FileUtils.deleteQuietly( + FSFactoryProducer.getFSFactory() + .getFile(file.getAbsoluteFile() + ModificationFile.FILE_SUFFIX)); + + // delete file if exist + FileUtils.deleteQuietly(file); + }); } LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index f0a91e5e766f..db0ade8a9d5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -3020,6 +3020,14 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC e.getMessage() == null ? e.getClass().getName() : e.getMessage()); logger.warn(exceptionMessage, e); final Analysis analysis = new Analysis(); + + if (loadTsFileStatement.isShouldConvertDataTypeOnTypeMismatch()) { + loadTsFileStatement.setTypeMismatchDetected(true); + analysis.setRealStatement(loadTsFileStatement); + analysis.setFinishQueryAfterAnalyze(false); + return analysis; + } + analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage)); return analysis; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java index c3482a3c1fef..348bf236f676 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java @@ -209,14 +209,29 @@ public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) { } catch (AuthException e) { return createFailAnalysisForAuthException(e); } catch (Exception e) { - final String exceptionMessage = - String.format( - "Auto create or verify schema error when executing statement %s. Detail: %s.", - loadTsFileStatement, - e.getMessage() == null ? e.getClass().getName() : e.getMessage()); + final String exceptionMessage; + + if (loadTsFileStatement.isShouldConvertDataTypeOnTypeMismatch()) { + loadTsFileStatement.setTypeMismatchDetected(true); + analysis.setFinishQueryAfterAnalyze(false); + analysis.setRealStatement(loadTsFileStatement); + + exceptionMessage = + String.format( + "Auto create or verify schema error when executing statement %s. Will try to auto convert data type", + loadTsFileStatement); + } else { + exceptionMessage = + String.format( + "Auto create or verify schema error when executing statement %s. Detail: %s.", + loadTsFileStatement, + e.getMessage() == null ? e.getClass().getName() : e.getMessage()); + + analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage)); + analysis.setFinishQueryAfterAnalyze(true); + } + LOGGER.warn(exceptionMessage, e); - analysis.setFinishQueryAfterAnalyze(true); - analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage)); return analysis; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index 828f0bd96110..0bcf8a05f51a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -513,8 +513,7 @@ public PlanNode visitPipeEnrichedStatement( @Override public PlanNode visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { - return new LoadTsFileNode( - context.getQueryId().genPlanNodeId(), loadTsFileStatement.getResources()); + return new LoadTsFileNode(context.getQueryId().genPlanNodeId(), loadTsFileStatement); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 3623e9760076..390025bd863a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -60,15 +61,22 @@ public class LoadSingleTsFileNode extends WritePlanNode { private final long writePointCount; private boolean needDecodeTsFile; + private final LoadTsFileStatement loadTsFileStatement; + private TRegionReplicaSet localRegionReplicaSet; public LoadSingleTsFileNode( - PlanNodeId id, TsFileResource resource, boolean deleteAfterLoad, long writePointCount) { + PlanNodeId id, + TsFileResource resource, + boolean deleteAfterLoad, + long writePointCount, + LoadTsFileStatement loadTsFileStatement) { super(id); this.tsFile = resource.getTsFile(); this.resource = resource; this.deleteAfterLoad = deleteAfterLoad; this.writePointCount = writePointCount; + this.loadTsFileStatement = loadTsFileStatement; } public boolean isTsFileEmpty() { @@ -134,6 +142,10 @@ public long getWritePointCount() { return writePointCount; } + public LoadTsFileStatement getLoadTsFileStatement() { + return loadTsFileStatement; + } + /** * only used for load locally. * diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java index be5de4cc819f..a69e57acbee1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java @@ -42,10 +42,12 @@ public class LoadTsFileNode extends WritePlanNode { private final List resources; + private final LoadTsFileStatement loadTsFileStatement; - public LoadTsFileNode(PlanNodeId id, List resources) { + public LoadTsFileNode(PlanNodeId id, LoadTsFileStatement loadTsFileStatement) { super(id); - this.resources = resources; + this.resources = loadTsFileStatement.getResources(); + this.loadTsFileStatement = loadTsFileStatement; } @Override @@ -104,7 +106,8 @@ public List splitByPartition(IAnalysis analysis) { getPlanNodeId(), resources.get(i), statement.isDeleteAfterLoad(), - statement.getWritePointCount(i))); + statement.getWritePointCount(i), + loadTsFileStatement)); } return res; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 09b67b9ad76b..a8b2f8160f0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -35,17 +35,24 @@ import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.LoadReadOnlyException; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor; +import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; @@ -53,6 +60,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask; @@ -78,6 +87,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -118,6 +128,10 @@ public class LoadTsFileScheduler implements IScheduler { private static final Set LOADING_FILE_SET = new HashSet<>(); + private static final PipeStatementDataTypeConvertExecutionVisitor + STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR = + new PipeStatementDataTypeConvertExecutionVisitor(LoadTsFileScheduler::executeStatement); + private final MPPQueryContext queryContext; private final QueryStateMachine stateMachine; private final LoadTsFileDispatcherImpl dispatcher; @@ -161,86 +175,100 @@ public void start() { final LoadSingleTsFileNode node = tsFileNodeList.get(i); final String filePath = node.getTsFileResource().getTsFilePath(); - boolean isLoadSingleTsFileSuccess = true; - boolean shouldRemoveFileFromLoadingSet = false; - try { - synchronized (LOADING_FILE_SET) { - if (LOADING_FILE_SET.contains(filePath)) { - throw new LoadFileException( - String.format("TsFile %s is loading by another scheduler.", filePath)); - } - LOADING_FILE_SET.add(filePath); + if (node.getLoadTsFileStatement().isShouldConvertDataTypeOnTypeMismatch() + && node.getLoadTsFileStatement().isTypeMismatchDetected()) { + final long startTime = System.nanoTime(); + + try { + STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR.visitLoadFile( + node.getLoadTsFileStatement(), + new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())); + } finally { + LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( + LoadTsFileCostMetricsSet.CONVERT_ON_TYPE_MISMATCH, System.nanoTime() - startTime); } - shouldRemoveFileFromLoadingSet = true; - - if (node.isTsFileEmpty()) { - LOGGER.info("Load skip TsFile {}, because it has no data.", filePath); - } else if (!node.needDecodeTsFile( - slotList -> - partitionFetcher.queryDataPartition( - slotList, - queryContext.getSession().getUserName()))) { // do not decode, load locally - final long startTime = System.nanoTime(); - try { - isLoadSingleTsFileSuccess = loadLocally(node); - } finally { - LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( - LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() - startTime); - } - - node.clean(); - } else { // need decode, load locally or remotely, use two phases method - String uuid = UUID.randomUUID().toString(); - dispatcher.setUuid(uuid); - allReplicaSets.clear(); - - long startTime = System.nanoTime(); - final boolean isFirstPhaseSuccess; - try { - isFirstPhaseSuccess = firstPhase(node); - } finally { - LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( - LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - startTime); + } else { + boolean isLoadSingleTsFileSuccess = true; + boolean shouldRemoveFileFromLoadingSet = false; + try { + synchronized (LOADING_FILE_SET) { + if (LOADING_FILE_SET.contains(filePath)) { + throw new LoadFileException( + String.format("TsFile %s is loading by another scheduler.", filePath)); + } + LOADING_FILE_SET.add(filePath); } - - startTime = System.nanoTime(); - final boolean isSecondPhaseSuccess; - try { - isSecondPhaseSuccess = - secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource()); - } finally { - LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( - LoadTsFileCostMetricsSet.SECOND_PHASE, System.nanoTime() - startTime); + shouldRemoveFileFromLoadingSet = true; + + if (node.isTsFileEmpty()) { + LOGGER.info("Load skip TsFile {}, because it has no data.", filePath); + } else if (!node.needDecodeTsFile( + slotList -> + partitionFetcher.queryDataPartition( + slotList, + queryContext.getSession().getUserName()))) { // do not decode, load locally + final long startTime = System.nanoTime(); + try { + isLoadSingleTsFileSuccess = loadLocally(node); + } finally { + LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( + LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() - startTime); + } + + node.clean(); + } else { // need decode, load locally or remotely, use two phases method + String uuid = UUID.randomUUID().toString(); + dispatcher.setUuid(uuid); + allReplicaSets.clear(); + + long startTime = System.nanoTime(); + final boolean isFirstPhaseSuccess; + try { + isFirstPhaseSuccess = firstPhase(node); + } finally { + LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( + LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - startTime); + } + + startTime = System.nanoTime(); + final boolean isSecondPhaseSuccess; + try { + isSecondPhaseSuccess = + secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource()); + } finally { + LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( + LoadTsFileCostMetricsSet.SECOND_PHASE, System.nanoTime() - startTime); + } + + node.clean(); + if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) { + isLoadSingleTsFileSuccess = false; + } } - node.clean(); - if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) { - isLoadSingleTsFileSuccess = false; + if (isLoadSingleTsFileSuccess) { + LOGGER.info( + "Load TsFile {} Successfully, load process [{}/{}]", + filePath, + i + 1, + tsFileNodeListSize); + } else { + isLoadSuccess = false; + LOGGER.warn( + "Can not Load TsFile {}, load process [{}/{}]", + filePath, + i + 1, + tsFileNodeListSize); } - } - - if (isLoadSingleTsFileSuccess) { - LOGGER.info( - "Load TsFile {} Successfully, load process [{}/{}]", - filePath, - i + 1, - tsFileNodeListSize); - } else { + } catch (Exception e) { isLoadSuccess = false; - LOGGER.warn( - "Can not Load TsFile {}, load process [{}/{}]", - filePath, - i + 1, - tsFileNodeListSize); - } - } catch (Exception e) { - isLoadSuccess = false; - stateMachine.transitionToFailed(e); - LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, e); - } finally { - if (shouldRemoveFileFromLoadingSet) { - synchronized (LOADING_FILE_SET) { - LOADING_FILE_SET.remove(filePath); + stateMachine.transitionToFailed(e); + LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, e); + } finally { + if (shouldRemoveFileFromLoadingSet) { + synchronized (LOADING_FILE_SET) { + LOADING_FILE_SET.remove(filePath); + } } } } @@ -253,6 +281,19 @@ public void start() { } } + private static TSStatus executeStatement(final Statement statement) { + return Coordinator.getInstance() + .executeForTreeModel( + new PipeEnrichedStatement(statement), + SessionManager.getInstance().requestQueryId(), + new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), + "", + ClusterPartitionFetcher.getInstance(), + ClusterSchemaFetcher.getInstance(), + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) + .status; + } + private boolean firstPhase(LoadSingleTsFileNode node) { final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block); try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 1fb3e5f1b92c..6136476a3b90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -48,6 +48,8 @@ public class LoadTsFileStatement extends Statement { private boolean autoCreateDatabase; private Map loadAttributes; + private boolean shouldConvertDataTypeOnTypeMismatch; + private boolean typeMismatchDetected; private final List tsFiles; private final List resources; @@ -173,6 +175,18 @@ public long getWritePointCount(int resourceIndex) { return writePointCountList.get(resourceIndex); } + public boolean isShouldConvertDataTypeOnTypeMismatch() { + return shouldConvertDataTypeOnTypeMismatch; + } + + public boolean isTypeMismatchDetected() { + return typeMismatchDetected; + } + + public void setTypeMismatchDetected(final boolean typeMismatchDetected) { + this.typeMismatchDetected = typeMismatchDetected; + } + public void setLoadAttributes(final Map loadAttributes) { this.loadAttributes = loadAttributes; initAttributes(); @@ -181,6 +195,9 @@ public void setLoadAttributes(final Map loadAttributes) { private void initAttributes() { this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes); this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes); + + this.shouldConvertDataTypeOnTypeMismatch = + LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes); } @Override @@ -210,6 +227,8 @@ public String toString() { + databaseLevel + ", verifySchema=" + verifySchema + + ", shouldConvertDataTypeOnTypeMismatch=" + + shouldConvertDataTypeOnTypeMismatch + ", tsFiles Size=" + tsFiles.size() + '}'; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index d09540133ab4..1ca8a0f4c1e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -32,6 +32,7 @@ public class LoadTsFileConfigurator { + // param validate public static void validateParameters(final String key, final String value) { switch (key) { case DATABASE_LEVEL_KEY: @@ -40,11 +41,30 @@ public static void validateParameters(final String key, final String value) { case ON_SUCCESS_KEY: validateOnSuccessParam(value); break; + case CONVERT_ON_TYPE_MISMATCH_KEY: + validateBooleanParam(key, value); + break; default: throw new SemanticException("Invalid parameter '" + key + "' for LOAD TSFILE command."); } } + // all boolean param validate + private static final String BOOLEAN_TRUE_VALUE = "true"; + private static final String BOOLEAN_FALSE_VALUE = "false"; + private static final Set BOOLEAN_VALUE_SET = + Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(BOOLEAN_TRUE_VALUE, BOOLEAN_FALSE_VALUE))); + + public static void validateBooleanParam(final String key, final String value) { + if (!BOOLEAN_VALUE_SET.contains(value)) { + throw new SemanticException( + String.format( + "Given %s value '%s' is not supported, please input 'true' or 'false'.", key, value)); + } + } + + // param database-level private static final String DATABASE_LEVEL_KEY = "database-level"; private static final int DATABASE_LEVEL_DEFAULT_VALUE = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(); @@ -73,6 +93,7 @@ public static int parseOrGetDefaultDatabaseLevel(final Map loadA DATABASE_LEVEL_KEY, String.valueOf(DATABASE_LEVEL_DEFAULT_VALUE))); } + // param on-success private static final String ON_SUCCESS_KEY = "on-success"; private static final String ON_SUCCESS_DELETE_VALUE = "delete"; private static final String ON_SUCCESS_NONE_VALUE = "none"; @@ -94,6 +115,15 @@ public static boolean parseOrGetDefaultOnSuccess(final Map loadA return StringUtils.isEmpty(value) || ON_SUCCESS_DELETE_VALUE.equals(value); } + // param convert-on-type-mismatch (boolean) + private static final String CONVERT_ON_TYPE_MISMATCH_KEY = "convert-on-type-mismatch"; + + public static boolean parseOrGetDefaultConvertOnTypeMismatch( + final Map loadAttributes) { + final String value = loadAttributes.get(CONVERT_ON_TYPE_MISMATCH_KEY); + return StringUtils.isEmpty(value) || BOOLEAN_TRUE_VALUE.equals(value); + } + private LoadTsFileConfigurator() { throw new IllegalStateException("Utility class"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java index fd0f398644d1..d0cf2bd4a710 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java @@ -40,6 +40,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet { public static final String FIRST_PHASE = "first_phase"; public static final String SECOND_PHASE = "second_phase"; public static final String LOAD_LOCALLY = "load_locally"; + public static final String CONVERT_ON_TYPE_MISMATCH = "convert_on_type_mismatch"; private LoadTsFileCostMetricsSet() { // empty constructor @@ -49,6 +50,7 @@ private LoadTsFileCostMetricsSet() { private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer convertOnTypeMismatchTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; @@ -66,6 +68,9 @@ public void recordPhaseTimeCost(String stage, long costTimeInNanos) { case LOAD_LOCALLY: loadLocallyTimer.updateNanos(costTimeInNanos); break; + case CONVERT_ON_TYPE_MISMATCH: + convertOnTypeMismatchTimer.updateNanos(costTimeInNanos); + break; default: throw new UnsupportedOperationException("Unsupported stage: " + stage); } @@ -98,6 +103,12 @@ public void bindTo(AbstractMetricService metricService) { MetricLevel.IMPORTANT, Tag.NAME.toString(), LOAD_LOCALLY); + convertOnTypeMismatchTimer = + metricService.getOrCreateTimer( + Metric.LOAD_TIME_COST.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + CONVERT_ON_TYPE_MISMATCH); diskIOCounter = metricService.getOrCreateCounter( @@ -109,7 +120,7 @@ public void bindTo(AbstractMetricService metricService) { @Override public void unbindFrom(AbstractMetricService metricService) { - Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY) + Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY, CONVERT_ON_TYPE_MISMATCH) .forEach( stage -> metricService.remove( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java index 849d45c61c8b..4fda10a9cd5e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import java.io.File; +import java.io.FileNotFoundException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -37,9 +38,10 @@ public class LoadTsFileNodeTest { @Test - public void testLoadSingleTsFileNode() { + public void testLoadSingleTsFileNode() throws FileNotFoundException { TsFileResource resource = new TsFileResource(new File("1")); - LoadSingleTsFileNode node = new LoadSingleTsFileNode(new PlanNodeId(""), resource, true, 0L); + LoadSingleTsFileNode node = + new LoadSingleTsFileNode(new PlanNodeId(""), resource, true, 0L, null); Assert.assertTrue(node.isDeleteAfterLoad()); Assert.assertEquals(resource, node.getTsFileResource()); Assert.assertNull(node.getLocalRegionReplicaSet());