diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index 26c76bf00420..7fbb0f3a7179 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -448,4 +448,76 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,", "3,1.0,", "4,1.0,")))); } } + + @Test + public void testSyncLoadTsFileWithoutVerify() throws Exception { + testLoadTsFileWithoutVerify("sync"); + } + + @Test + public void testAsyncLoadTsFileWithoutVerify() throws Exception { + testLoadTsFileWithoutVerify("async"); + } + + @Test + private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"))) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.realtime.mode", "forced-log"); + + connectorAttributes.put("sink", "iotdb-thrift-sink"); + connectorAttributes.put("sink.batch.enable", "false"); + connectorAttributes.put("sink.ip", receiverIp); + connectorAttributes.put("sink.port", Integer.toString(receiverPort)); + connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy); + connectorAttributes.put("sink.tsfile.validation", "false"); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)) + .getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "create timeSeries root.vehicle.d0.s1 int32", + "insert into root.vehicle.d0(time, s1) values (2, 1)", + "flush"))) { + return; + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.**", + "Time,root.vehicle.d0.s1,", + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,")))); + } + } } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java index 50cd1a030625..cdfbd5b4437a 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java @@ -60,6 +60,9 @@ public class ImportTsFile extends AbstractTsFileTool { private static final String THREAD_NUM_ARGS = "tn"; private static final String THREAD_NUM_NAME = "thread_num"; + protected static final String VERIFY_ARGS = "v"; + protected static final String VERIFY_NAME = "verify"; + private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out); private static final String TS_FILE_CLI_PREFIX = "ImportTsFile"; @@ -80,7 +83,7 @@ public class ImportTsFile extends AbstractTsFileTool { private static int threadNum = 8; private static boolean isRemoteLoad = true; - + protected static boolean verify = true; private static SessionPool sessionPool; private static void createOptions() { @@ -299,6 +302,11 @@ private static void parseSpecialParams(CommandLine commandLine) { if (commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS) != null) { timestampPrecision = commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS); } + + verify = + null != commandLine.getOptionValue(VERIFY_ARGS) + ? Boolean.parseBoolean(commandLine.getOptionValue(VERIFY_ARGS)) + : verify; } public static boolean isFileStoreEquals(String pathString, File dir) { @@ -387,12 +395,14 @@ private static void processSetParams() { } ImportTsFileLocally.setSessionPool(sessionPool); + ImportTsFileLocally.setVerify(verify); // ImportTsFileRemotely ImportTsFileRemotely.setHost(host); ImportTsFileRemotely.setPort(port); ImportTsFileRemotely.setUsername(username); ImportTsFileRemotely.setPassword(password); + ImportTsFileRemotely.setValidateTsFile(verify); // ImportTsFileBase ImportTsFileBase.setSuccessAndFailDirAndOperation( diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java index 8f65d26fd327..f94050e98eb0 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java @@ -27,13 +27,15 @@ public class ImportTsFileLocally extends ImportTsFileBase implements Runnable { private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out); private static SessionPool sessionPool; + private static boolean verify; @Override public void loadTsFile() { String filePath; try { while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) { - final String sql = "load '" + filePath + "' onSuccess=none "; + final String sql = + "load '" + filePath + "' onSuccess=none " + (verify ? "" : "verify=false"); try { sessionPool.executeNonQueryStatement(sql); @@ -50,4 +52,8 @@ public void loadTsFile() { public static void setSessionPool(SessionPool sessionPool) { ImportTsFileLocally.sessionPool = sessionPool; } + + public static void setVerify(boolean verify) { + ImportTsFileLocally.verify = verify; + } } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java index 5459b0448ef6..9f4e09fa3640 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java @@ -74,6 +74,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase { private static String username = SessionConfig.DEFAULT_USER; private static String password = SessionConfig.DEFAULT_PASSWORD; + private static boolean validateTsFile; public ImportTsFileRemotely(String timePrecision) { setTimePrecision(timePrecision); @@ -189,6 +190,9 @@ private Map constructParamsMap() { params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, LOAD_STRATEGY); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, + Boolean.toString(validateTsFile)); return params; } @@ -346,4 +350,8 @@ public static void setUsername(final String username) { public static void setPassword(final String password) { ImportTsFileRemotely.password = password; } + + public static void setValidateTsFile(final boolean validateTsFile) { + ImportTsFileRemotely.validateTsFile = validateTsFile; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java index 00a06c926c83..ab678f98864c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java @@ -42,7 +42,8 @@ public IoTDBConfigNodeSyncClientManager( String trustStorePwd, String loadBalanceStrategy, boolean shouldReceiverConvertOnTypeMismatch, - String loadTsFileStrategy) { + String loadTsFileStrategy, + boolean validateTsFile) { super( endPoints, username, @@ -53,7 +54,8 @@ public IoTDBConfigNodeSyncClientManager( false, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + validateTsFile); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index 7dd90f18dee5..de8b8fe5d986 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -75,6 +75,9 @@ protected byte[] generateHandShakeV2Payload() throws IOException { PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, + Boolean.toString(loadTsFileValidation)); return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 809daf044c00..c2a327fb853d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -67,7 +67,8 @@ protected IoTDBSyncClientManager constructClient( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy) { + final String loadTsFileStrategy, + final boolean validateTsFile) { return new IoTDBConfigNodeSyncClientManager( nodeUrls, username, @@ -77,7 +78,8 @@ protected IoTDBSyncClientManager constructClient( trustStorePwd, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + validateTsFile); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 5113e8c045c0..b2e997330725 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1172,11 +1172,12 @@ public class IoTDBConfig { + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME + File.separator + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME; - private long loadActiveListeningCheckIntervalSeconds = 5L; private int loadActiveListeningMaxThreadNum = Runtime.getRuntime().availableProcessors(); + private boolean loadActiveListeningVerifyEnable = true; + /** Pipe related */ /** initialized as empty, updated based on the latest `systemDir` during querying */ private String[] pipeReceiverFileDirs = new String[0]; @@ -4024,6 +4025,14 @@ public void setLoadActiveListeningMaxThreadNum(int loadActiveListeningMaxThreadN this.loadActiveListeningMaxThreadNum = loadActiveListeningMaxThreadNum; } + public boolean isLoadActiveListeningVerifyEnable() { + return loadActiveListeningVerifyEnable; + } + + public void setLoadActiveListeningVerifyEnable(boolean loadActiveListeningVerifyEnable) { + this.loadActiveListeningVerifyEnable = loadActiveListeningVerifyEnable; + } + public long getLoadActiveListeningCheckIntervalSeconds() { return loadActiveListeningCheckIntervalSeconds; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 3fcb72d36864..c725b336e901 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2422,6 +2422,12 @@ private void loadLoadTsFileProps(TrimProperties properties) { if (conf.getLoadActiveListeningMaxThreadNum() <= 0) { conf.setLoadActiveListeningMaxThreadNum(Runtime.getRuntime().availableProcessors()); } + + conf.setLoadActiveListeningVerifyEnable( + Boolean.parseBoolean( + properties.getProperty( + "load_active_listening_verify_enable", + Boolean.toString(conf.isLoadActiveListeningVerifyEnable())))); } private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 74e56f728dc1..11ef6aec7f4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -84,14 +84,16 @@ public IoTDBDataNodeAsyncClientManager( final String username, final String password, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy) { + final String loadTsFileStrategy, + final boolean validateTsFile) { super( endPoints, username, password, shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy, - useLeaderCache); + useLeaderCache, + validateTsFile); endPointSet = new HashSet<>(endPoints); @@ -248,6 +250,9 @@ public void onError(final Exception e) { PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, + Boolean.toString(validateTsFile)); client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs()); client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java index ae3f07b068c1..62b8b5630756 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java @@ -52,7 +52,8 @@ public IoTDBDataNodeSyncClientManager( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy) { + final String loadTsFileStrategy, + final boolean validateTsFile) { super( endPoints, username, @@ -63,7 +64,8 @@ public IoTDBDataNodeSyncClientManager( useLeaderCache, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + validateTsFile); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index 788244f738c8..537d06bf767f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -107,6 +107,9 @@ protected byte[] generateHandShakeV2Payload() throws IOException { PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, + Boolean.toString(loadTsFileValidation)); return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 7641eb30f8e3..56c3f08d647e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -133,7 +133,8 @@ public void customize( username, password, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + loadTsFileValidation); if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index 0ae8ae8743a4..5e8624397e99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -91,7 +91,8 @@ protected IoTDBSyncClientManager constructClient( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy) { + final String loadTsFileStrategy, + final boolean validateTsFile) { clientManager = new IoTDBDataNodeSyncClientManager( nodeUrls, @@ -103,7 +104,8 @@ protected IoTDBSyncClientManager constructClient( useLeaderCache, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + validateTsFile); return clientManager; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index afb15822f8f7..6b6ea69de342 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -485,7 +485,7 @@ private TSStatus loadTsFileSync(final String fileAbsolutePath) throws FileNotFou statement.setDeleteAfterLoad(true); statement.setConvertOnTypeMismatch(true); - statement.setVerifySchema(true); + statement.setVerifySchema(validateTsFile.get()); statement.setAutoCreateDatabase(false); return executeStatementAndClassifyExceptions(statement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index f3643e4436b6..c86af7cfb381 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -2069,10 +2069,7 @@ private void parseLoadFileAttributeClause( } else if (ctx.SGLEVEL() != null) { loadTsFileStatement.setDatabaseLevel(Integer.parseInt(ctx.INTEGER_LITERAL().getText())); } else if (ctx.VERIFY() != null) { - if (!Boolean.parseBoolean(ctx.boolean_literal().getText())) { - throw new SemanticException("Load option VERIFY can only be set to true."); - } - loadTsFileStatement.setVerifySchema(true); + loadTsFileStatement.setVerifySchema(Boolean.parseBoolean(ctx.boolean_literal().getText())); } else { throw new SemanticException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java index 8cb79658b668..65665f5e3d4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java @@ -73,6 +73,7 @@ public class ActiveLoadTsFileLoader { private final AtomicReference activeLoadExecutor = new AtomicReference<>(); private final AtomicReference failDir = new AtomicReference<>(); + private final boolean isVerify = IOTDB_CONFIG.isLoadActiveListeningVerifyEnable(); public int getCurrentAllowedPendingSize() { return MAX_PENDING_SIZE - pendingQueue.size(); @@ -198,7 +199,7 @@ private TSStatus loadTsFile(final Pair filePair) throws FileNot final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft()); statement.setDeleteAfterLoad(true); statement.setConvertOnTypeMismatch(true); - statement.setVerifySchema(true); + statement.setVerifySchema(isVerify); statement.setAutoCreateDatabase(false); return executeStatement(filePair.getRight() ? new PipeEnrichedStatement(statement) : statement); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index d5672028369d..00dca6b61b38 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -43,6 +43,9 @@ public static void validateParameters(final String key, final String value) { case CONVERT_ON_TYPE_MISMATCH_KEY: validateConvertOnTypeMismatchParam(value); break; + case VERIFY_KEY: + validateVerifyParam(value); + break; default: throw new SemanticException("Invalid parameter '" + key + "' for LOAD TSFILE command."); } @@ -117,6 +120,23 @@ public static boolean parseOrGetDefaultConvertOnTypeMismatch( CONVERT_ON_TYPE_MISMATCH_KEY, String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE))); } + public static final String VERIFY_KEY = "verify"; + private static final boolean VERIFY_DEFAULT_VALUE = true; + + public static void validateVerifyParam(final String verify) { + if (!"true".equalsIgnoreCase(verify) && !"false".equalsIgnoreCase(verify)) { + throw new SemanticException( + String.format( + "Given %s value '%s' is not supported, please input a valid boolean value.", + VERIFY_KEY, verify)); + } + } + + public static boolean parseOrGetDefaultVerify(final Map loadAttributes) { + return Boolean.parseBoolean( + loadAttributes.getOrDefault(VERIFY_KEY, String.valueOf(VERIFY_DEFAULT_VALUE))); + } + private LoadTsFileConfigurator() { throw new IllegalStateException("Utility class"); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index afee3e9a0a27..1f7b78703030 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -238,6 +238,11 @@ public class PipeConnectorConstant { CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE, CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE))); + public static final String CONNECTOR_LOAD_TSFILE_VALIDATION_KEY = + "connector.load-tsfile-validation"; + public static final String SINK_LOAD_TSFILE_VALIDATION_KEY = "sink.load-tsfile-validation"; + public static final boolean CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE = true; + private PipeConnectorConstant() { throw new IllegalStateException("Utility class"); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java index ed3334b2459c..18ef38ad47a3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java @@ -40,6 +40,8 @@ public abstract class IoTDBClientManager { protected final String username; protected final String password; + protected final boolean validateTsFile; + protected final boolean shouldReceiverConvertOnTypeMismatch; protected final String loadTsFileStrategy; @@ -60,13 +62,15 @@ protected IoTDBClientManager( String password, boolean shouldReceiverConvertOnTypeMismatch, final String loadTsFileStrategy, - boolean useLeaderCache) { + boolean useLeaderCache, + final boolean validateTsFile) { this.endPointList = endPointList; this.username = username; this.password = password; this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch; this.loadTsFileStrategy = loadTsFileStrategy; this.useLeaderCache = useLeaderCache; + this.validateTsFile = validateTsFile; } public boolean supportModsIfIsDataNodeReceiver() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index 0e4884d92464..13ee064fafbe 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -71,14 +71,16 @@ protected IoTDBSyncClientManager( boolean useLeaderCache, String loadBalanceStrategy, boolean shouldReceiverConvertOnTypeMismatch, - String loadTsFileStrategy) { + String loadTsFileStrategy, + boolean validateTsFile) { super( endPoints, username, password, shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy, - useLeaderCache); + useLeaderCache, + validateTsFile); this.useSSL = useSSL; this.trustStorePath = trustStorePath; @@ -211,6 +213,9 @@ public void sendHandshakeReq(final Pair clientAndStatu PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username); params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, + Boolean.toString(validateTsFile)); // Try to handshake by PipeTransferHandshakeV2Req. TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java index 46bd38ba45a2..4291427a9bcc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java @@ -27,6 +27,7 @@ public class PipeTransferHandshakeConstant { public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY = "loadTsFileStrategy"; public static final String HANDSHAKE_KEY_USERNAME = "username"; public static final String HANDSHAKE_KEY_PASSWORD = "password"; + public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile"; private PipeTransferHandshakeConstant() { // Utility class diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index b64b3df20667..22228f101401 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -91,6 +91,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SET; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY; @@ -113,6 +115,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY; public abstract class IoTDBConnector implements PipeConnector { @@ -132,6 +135,7 @@ public abstract class IoTDBConnector implements PipeConnector { protected String loadBalanceStrategy; protected String loadTsFileStrategy; + protected boolean loadTsFileValidation; private boolean isRpcCompressionEnabled; private final List compressors = new ArrayList<>(); @@ -232,6 +236,10 @@ public void validate(final PipeParameterValidator validator) throws Exception { "Load tsfile strategy should be one of %s, but got %s.", CONNECTOR_LOAD_TSFILE_STRATEGY_SET, loadTsFileStrategy), loadTsFileStrategy); + loadTsFileValidation = + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_LOAD_TSFILE_VALIDATION_KEY, SINK_LOAD_TSFILE_VALIDATION_KEY), + CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE); final int zstdCompressionLevel = parameters.getIntOrDefault( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index ffdc9f55b18e..e3a5dd218764 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -126,7 +126,8 @@ public void customize( useLeaderCache, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + loadTsFileValidation); } protected abstract IoTDBSyncClientManager constructClient( @@ -139,7 +140,8 @@ protected abstract IoTDBSyncClientManager constructClient( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy); + final String loadTsFileStrategy, + final boolean validateTsFile); @Override public void handshake() throws Exception { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index b3cbca89f12a..ed4b85cfec65 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -84,6 +84,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // Used to determine current strategy is sync or async protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new AtomicBoolean(false); + protected final AtomicBoolean validateTsFile = new AtomicBoolean(true); @Override public IoTDBConnectorRequestVersion getVersion() { @@ -282,6 +283,12 @@ protected TPipeTransferResp handleTransferHandshakeV2(final PipeTransferHandshak loadTsFileStrategyString)); } + validateTsFile.set( + Boolean.parseBoolean( + req.getParams() + .getOrDefault( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, "true"))); + // Handle the handshake request as a v1 request. // Here we construct a fake "dataNode" request to valid from v1 validation logic, though // it may not require the actual type of the v1 request.