Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[To dev/1.3] Pipe / Load: Enable validation skip for load tsFile (#14774) #14776

Merged
merged 8 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,4 +448,76 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,", "3,1.0,", "4,1.0,"))));
}
}

@Test
public void testSyncLoadTsFileWithoutVerify() throws Exception {
testLoadTsFileWithoutVerify("sync");
}

@Test
public void testAsyncLoadTsFileWithoutVerify() throws Exception {
testLoadTsFileWithoutVerify("async");
}

@Test
private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"))) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.realtime.mode", "forced-log");

connectorAttributes.put("sink", "iotdb-thrift-sink");
connectorAttributes.put("sink.batch.enable", "false");
connectorAttributes.put("sink.ip", receiverIp);
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
connectorAttributes.put("sink.tsfile.validation", "false");

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes))
.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"create timeSeries root.vehicle.d0.s1 int32",
"insert into root.vehicle.d0(time, s1) values (2, 1)",
"flush"))) {
return;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.**",
"Time,root.vehicle.d0.s1,",
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public class ImportTsFile extends AbstractTsFileTool {
private static final String THREAD_NUM_ARGS = "tn";
private static final String THREAD_NUM_NAME = "thread_num";

protected static final String VERIFY_ARGS = "v";
protected static final String VERIFY_NAME = "verify";

private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);

private static final String TS_FILE_CLI_PREFIX = "ImportTsFile";
Expand All @@ -80,7 +83,7 @@ public class ImportTsFile extends AbstractTsFileTool {
private static int threadNum = 8;

private static boolean isRemoteLoad = true;

protected static boolean verify = true;
private static SessionPool sessionPool;

private static void createOptions() {
Expand Down Expand Up @@ -299,6 +302,11 @@ private static void parseSpecialParams(CommandLine commandLine) {
if (commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS) != null) {
timestampPrecision = commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS);
}

verify =
null != commandLine.getOptionValue(VERIFY_ARGS)
? Boolean.parseBoolean(commandLine.getOptionValue(VERIFY_ARGS))
: verify;
}

public static boolean isFileStoreEquals(String pathString, File dir) {
Expand Down Expand Up @@ -387,12 +395,14 @@ private static void processSetParams() {
}

ImportTsFileLocally.setSessionPool(sessionPool);
ImportTsFileLocally.setVerify(verify);

// ImportTsFileRemotely
ImportTsFileRemotely.setHost(host);
ImportTsFileRemotely.setPort(port);
ImportTsFileRemotely.setUsername(username);
ImportTsFileRemotely.setPassword(password);
ImportTsFileRemotely.setValidateTsFile(verify);

// ImportTsFileBase
ImportTsFileBase.setSuccessAndFailDirAndOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public class ImportTsFileLocally extends ImportTsFileBase implements Runnable {
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);

private static SessionPool sessionPool;
private static boolean verify;

@Override
public void loadTsFile() {
String filePath;
try {
while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) {
final String sql = "load '" + filePath + "' onSuccess=none ";
final String sql =
"load '" + filePath + "' onSuccess=none " + (verify ? "" : "verify=false");
try {
sessionPool.executeNonQueryStatement(sql);

Expand All @@ -50,4 +52,8 @@ public void loadTsFile() {
public static void setSessionPool(SessionPool sessionPool) {
ImportTsFileLocally.sessionPool = sessionPool;
}

public static void setVerify(boolean verify) {
ImportTsFileLocally.verify = verify;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {

private static String username = SessionConfig.DEFAULT_USER;
private static String password = SessionConfig.DEFAULT_PASSWORD;
private static boolean validateTsFile;

public ImportTsFileRemotely(String timePrecision) {
setTimePrecision(timePrecision);
Expand Down Expand Up @@ -189,6 +190,9 @@ private Map<String, String> constructParamsMap() {
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, LOAD_STRATEGY);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(validateTsFile));
return params;
}

Expand Down Expand Up @@ -346,4 +350,8 @@ public static void setUsername(final String username) {
public static void setPassword(final String password) {
ImportTsFileRemotely.password = password;
}

public static void setValidateTsFile(final boolean validateTsFile) {
ImportTsFileRemotely.validateTsFile = validateTsFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public IoTDBConfigNodeSyncClientManager(
String trustStorePwd,
String loadBalanceStrategy,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy) {
String loadTsFileStrategy,
boolean validateTsFile) {
super(
endPoints,
username,
Expand All @@ -53,7 +54,8 @@ public IoTDBConfigNodeSyncClientManager(
false,
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
loadTsFileStrategy,
validateTsFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(loadTsFileValidation));

return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ protected IoTDBSyncClientManager constructClient(
final boolean useLeaderCache,
final String loadBalanceStrategy,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy) {
final String loadTsFileStrategy,
final boolean validateTsFile) {
return new IoTDBConfigNodeSyncClientManager(
nodeUrls,
username,
Expand All @@ -77,7 +78,8 @@ protected IoTDBSyncClientManager constructClient(
trustStorePwd,
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
loadTsFileStrategy,
validateTsFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,11 +1172,12 @@ public class IoTDBConfig {
+ IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+ File.separator
+ IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME;

private long loadActiveListeningCheckIntervalSeconds = 5L;

private int loadActiveListeningMaxThreadNum = Runtime.getRuntime().availableProcessors();

private boolean loadActiveListeningVerifyEnable = true;

/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during querying */
private String[] pipeReceiverFileDirs = new String[0];
Expand Down Expand Up @@ -4024,6 +4025,14 @@ public void setLoadActiveListeningMaxThreadNum(int loadActiveListeningMaxThreadN
this.loadActiveListeningMaxThreadNum = loadActiveListeningMaxThreadNum;
}

public boolean isLoadActiveListeningVerifyEnable() {
return loadActiveListeningVerifyEnable;
}

public void setLoadActiveListeningVerifyEnable(boolean loadActiveListeningVerifyEnable) {
this.loadActiveListeningVerifyEnable = loadActiveListeningVerifyEnable;
}

public long getLoadActiveListeningCheckIntervalSeconds() {
return loadActiveListeningCheckIntervalSeconds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,12 @@ private void loadLoadTsFileProps(TrimProperties properties) {
if (conf.getLoadActiveListeningMaxThreadNum() <= 0) {
conf.setLoadActiveListeningMaxThreadNum(Runtime.getRuntime().availableProcessors());
}

conf.setLoadActiveListeningVerifyEnable(
Boolean.parseBoolean(
properties.getProperty(
"load_active_listening_verify_enable",
Boolean.toString(conf.isLoadActiveListeningVerifyEnable()))));
}

private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@ public IoTDBDataNodeAsyncClientManager(
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy) {
final String loadTsFileStrategy,
final boolean validateTsFile) {
super(
endPoints,
username,
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
useLeaderCache);
useLeaderCache,
validateTsFile);

endPointSet = new HashSet<>(endPoints);

Expand Down Expand Up @@ -248,6 +250,9 @@ public void onError(final Exception e) {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(validateTsFile));

client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public IoTDBDataNodeSyncClientManager(
final boolean useLeaderCache,
final String loadBalanceStrategy,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy) {
final String loadTsFileStrategy,
final boolean validateTsFile) {
super(
endPoints,
username,
Expand All @@ -63,7 +64,8 @@ public IoTDBDataNodeSyncClientManager(
useLeaderCache,
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
loadTsFileStrategy,
validateTsFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(loadTsFileValidation));

return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public void customize(
username,
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
loadTsFileStrategy,
loadTsFileValidation);

if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ protected IoTDBSyncClientManager constructClient(
final boolean useLeaderCache,
final String loadBalanceStrategy,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy) {
final String loadTsFileStrategy,
final boolean validateTsFile) {
clientManager =
new IoTDBDataNodeSyncClientManager(
nodeUrls,
Expand All @@ -103,7 +104,8 @@ protected IoTDBSyncClientManager constructClient(
useLeaderCache,
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
loadTsFileStrategy,
validateTsFile);
return clientManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ private TSStatus loadTsFileSync(final String fileAbsolutePath) throws FileNotFou

statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(true);
statement.setVerifySchema(validateTsFile.get());
statement.setAutoCreateDatabase(false);

return executeStatementAndClassifyExceptions(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2069,10 +2069,7 @@ private void parseLoadFileAttributeClause(
} else if (ctx.SGLEVEL() != null) {
loadTsFileStatement.setDatabaseLevel(Integer.parseInt(ctx.INTEGER_LITERAL().getText()));
} else if (ctx.VERIFY() != null) {
if (!Boolean.parseBoolean(ctx.boolean_literal().getText())) {
throw new SemanticException("Load option VERIFY can only be set to true.");
}
loadTsFileStatement.setVerifySchema(true);
loadTsFileStatement.setVerifySchema(Boolean.parseBoolean(ctx.boolean_literal().getText()));
} else {
throw new SemanticException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class ActiveLoadTsFileLoader {
private final AtomicReference<WrappedThreadPoolExecutor> activeLoadExecutor =
new AtomicReference<>();
private final AtomicReference<String> failDir = new AtomicReference<>();
private final boolean isVerify = IOTDB_CONFIG.isLoadActiveListeningVerifyEnable();

public int getCurrentAllowedPendingSize() {
return MAX_PENDING_SIZE - pendingQueue.size();
Expand Down Expand Up @@ -198,7 +199,7 @@ private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws FileNot
final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft());
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(true);
statement.setVerifySchema(isVerify);
statement.setAutoCreateDatabase(false);
return executeStatement(filePair.getRight() ? new PipeEnrichedStatement(statement) : statement);
}
Expand Down
Loading
Loading