Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load: Support converting mini TsFile into Tablets #14784

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,8 @@ public class IoTDBConfig {

private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s

private int loadTabletConversionThreshold = -1;

private boolean loadActiveListeningEnable = true;

private String[] loadActiveListeningDirs =
Expand Down Expand Up @@ -4164,6 +4166,14 @@ public void setLoadWriteThroughputBytesPerSecond(double loadWriteThroughputBytes
this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
}

public int getLoadTabletConversionThreshold() {
return loadTabletConversionThreshold;
}

public void setLoadTabletConversionThreshold(int loadTabletConversionThreshold) {
this.loadTabletConversionThreshold = loadTabletConversionThreshold;
}

public int getLoadActiveListeningMaxThreadNum() {
return loadActiveListeningMaxThreadNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2457,6 +2457,12 @@ private void loadLoadTsFileProps(TrimProperties properties) {
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));

conf.setLoadTabletConversionThreshold(
Integer.parseInt(
properties.getProperty(
"load_tablet_conversion_threshold",
String.valueOf(conf.getLoadTabletConversionThreshold()))));

conf.setLoadActiveListeningEnable(
Boolean.parseBoolean(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void load() {
LoadTsFileStatement statement = new LoadTsFileStatement(tsFile.getAbsolutePath());
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setTabletConversionThreshold(-1);
statement.setDatabaseLevel(parseSgLevel());
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbso
final LoadTsFileStatement statement = new LoadTsFileStatement(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setTabletConversionThreshold(-1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 means Load doesn't convert into Tablet

statement.setVerifySchema(validateTsFile.get());
statement.setAutoCreateDatabase(false);
statement.setDatabase(dataBaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final List<File> tsFiles;
private final List<Boolean> isTableModelTsFile;
private int isTableModelTsFileReliableIndex = -1;
private final List<File> tabletConvertionList = new java.util.ArrayList<>();

// User specified configs
private final int databaseLevel;
Expand All @@ -113,6 +114,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final boolean isDeleteAfterLoad;
private final boolean isConvertOnTypeMismatch;
private final boolean isAutoCreateDatabase;
private final int tabletConversionThreshold;

// Schema creators for tree and table
private TreeSchemaAutoCreatorAndVerifier treeSchemaAutoCreatorAndVerifier;
Expand All @@ -136,6 +138,7 @@ public LoadTsFileAnalyzer(
this.isVerifySchema = loadTsFileStatement.isVerifySchema();
this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch = loadTsFileStatement.isConvertOnTypeMismatch();
this.tabletConversionThreshold = loadTsFileStatement.getTabletConversionThreshold();
this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
}

Expand All @@ -157,6 +160,7 @@ public LoadTsFileAnalyzer(
this.isVerifySchema = loadTsFileTableStatement.isVerifySchema();
this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch = loadTsFileTableStatement.isConvertOnTypeMismatch();
this.tabletConversionThreshold = loadTsFileTableStatement.getTabletConversionThreshold();
this.isAutoCreateDatabase = loadTsFileTableStatement.isAutoCreateDatabase();
}

Expand Down Expand Up @@ -257,6 +261,19 @@ private boolean doAnalyzeFileByFile(IAnalysis analysis) {
continue;
}

if (tsFile.length() < tabletConversionThreshold) {
tabletConvertionList.add(tsFile);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Load - Analysis Stage: {}/{} tsfiles have been analyzed, {} tsfiles have been added to the conversion list, progress: {}%",
i + 1,
tsfileNum,
tabletConvertionList.size(),
String.format("%.3f", (i + 1) * 100.00 / tsfileNum));
}
continue;
}

try {
analyzeSingleTsFile(tsFile, i);
if (LOGGER.isInfoEnabled()) {
Expand Down Expand Up @@ -290,13 +307,13 @@ private boolean doAnalyzeFileByFile(IAnalysis analysis) {
return false;
}
}
return true;

return handleTabletConversionList(analysis);
}

private void analyzeSingleTsFile(final File tsFile, int i)
throws IOException, AuthException, LoadAnalyzeException {
try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {

// check whether the tsfile is tree-model or not
final Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap();
final boolean isTableModelFile = Objects.nonNull(tableSchemaMap) && !tableSchemaMap.isEmpty();
Expand Down Expand Up @@ -340,6 +357,34 @@ private void analyzeSingleTsFile(final File tsFile, int i)
}
}

private boolean handleTabletConversionList(IAnalysis analysis) {
if (tabletConvertionList.isEmpty()) {
return true;
}

// 1. all mini files are converted to tablets
setStatementTsFiles(tabletConvertionList);
executeTabletConversion(
analysis, new LoadAnalyzeException("Failed to convert mini file to tablet"));
if (analysis.isFailed()) {
return false;
}

// 2. remove the converted mini files from the tsFiles and load the rest
tsFiles.removeAll(tabletConvertionList);
setStatementTsFiles(tsFiles);
analysis.setFinishQueryAfterAnalyze(false);
return true;
}

private void setStatementTsFiles(List<File> files) {
if (isTableModelStatement) {
loadTsFileTableStatement.setTsFiles(files);
} else {
loadTsFileTreeStatement.setTsFiles(files);
}
}

private void doAnalyzeSingleTreeFile(
final File tsFile,
final TsFileSequenceReader reader,
Expand Down Expand Up @@ -540,6 +585,14 @@ private void setTsFileModelInfoToStatement() {
}

private void executeTabletConversion(final IAnalysis analysis, final LoadAnalyzeException e) {
if (shouldSkipConversion(e)) {
analysis.setFailStatus(
new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
analysis.setFinishQueryAfterAnalyze(true);
setRealStatement(analysis);
return;
}

if (isTableModelTsFileReliableIndex < tsFiles.size() - 1) {
try {
getFileModelInfoBeforeTabletConversion();
Expand All @@ -562,17 +615,15 @@ private void executeTabletConversion(final IAnalysis analysis, final LoadAnalyze
for (int i = 0; i < tsFiles.size(); i++) {
try {
final TSStatus status =
(!(e instanceof LoadAnalyzeTypeMismatchException) || isConvertOnTypeMismatch)
? (isTableModelTsFile.get(i)
? loadTsFileDataTypeConverter
.convertForTableModel(
new LoadTsFile(null, tsFiles.get(i).getPath(), Collections.emptyMap())
.setDatabase(databaseForTableData))
.orElse(null)
: loadTsFileDataTypeConverter
.convertForTreeModel(new LoadTsFileStatement(tsFiles.get(i).getPath()))
.orElse(null))
: null;
isTableModelTsFile.get(i)
? loadTsFileDataTypeConverter
.convertForTableModel(
new LoadTsFile(null, tsFiles.get(i).getPath(), Collections.emptyMap())
.setDatabase(databaseForTableData))
.orElse(null)
: loadTsFileDataTypeConverter
.convertForTreeModel(new LoadTsFileStatement(tsFiles.get(i).getPath()))
.orElse(null);

if (status == null) {
LOGGER.warn(
Expand Down Expand Up @@ -605,6 +656,10 @@ private void executeTabletConversion(final IAnalysis analysis, final LoadAnalyze
setRealStatement(analysis);
}

private boolean shouldSkipConversion(LoadAnalyzeException e) {
return (e instanceof LoadAnalyzeTypeMismatchException) && !isConvertOnTypeMismatch;
}

private void getFileModelInfoBeforeTabletConversion() throws IOException {
for (int i = isTableModelTsFileReliableIndex + 1; i < tsFiles.size(); i++) {
try (final TsFileSequenceReader reader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class LoadTsFile extends Statement {
private String database; // For loading to table-model only
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
private int tabletConversionThreshold = -1;
private boolean autoCreateDatabase = true;
private boolean verify;
private boolean isGeneratedByPipe = false;
Expand All @@ -62,6 +63,7 @@ public LoadTsFile(NodeLocation location, String filePath, Map<String, String> lo
this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
this.tabletConversionThreshold = -1;
this.verify = true;
this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.resources = new ArrayList<>();
Expand Down Expand Up @@ -103,6 +105,10 @@ public boolean isConvertOnTypeMismatch() {
return convertOnTypeMismatch;
}

public int getTabletConversionThreshold() {
return tabletConversionThreshold;
}

public boolean isVerifySchema() {
return verify;
}
Expand Down Expand Up @@ -140,6 +146,11 @@ public List<File> getTsFiles() {
return tsFiles;
}

public void setTsFiles(List<File> tsFiles) {
this.tsFiles.clear();
this.tsFiles.addAll(tsFiles);
}

public void addTsFileResource(TsFileResource resource) {
resources.add(resource);
}
Expand All @@ -162,6 +173,8 @@ private void initAttributes() {
this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
this.tabletConversionThreshold =
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThreshold(loadAttributes);
this.verify = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY;

public class LoadTsFileStatement extends Statement {

Expand All @@ -58,6 +59,7 @@ public class LoadTsFileStatement extends Statement {
private boolean verifySchema = true;
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
private int tabletConversionThreshold = -1;
private boolean autoCreateDatabase = true;
private boolean isGeneratedByPipe = false;

Expand All @@ -74,6 +76,7 @@ public LoadTsFileStatement(String filePath) throws FileNotFoundException {
this.verifySchema = true;
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
this.tabletConversionThreshold = -1;
this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
Expand Down Expand Up @@ -106,6 +109,7 @@ protected LoadTsFileStatement() {
this.verifySchema = true;
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
this.tabletConversionThreshold = -1;
this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
Expand Down Expand Up @@ -183,6 +187,14 @@ public boolean isConvertOnTypeMismatch() {
return convertOnTypeMismatch;
}

public void setTabletConversionThreshold(int tabletConversionThreshold) {
this.tabletConversionThreshold = tabletConversionThreshold;
}

public int getTabletConversionThreshold() {
return tabletConversionThreshold;
}

public void setAutoCreateDatabase(boolean autoCreateDatabase) {
this.autoCreateDatabase = autoCreateDatabase;
}
Expand Down Expand Up @@ -211,6 +223,11 @@ public List<File> getTsFiles() {
return tsFiles;
}

public void setTsFiles(List<File> tsFiles) {
this.tsFiles.clear();
this.tsFiles.addAll(tsFiles);
}

public void addTsFileResource(TsFileResource resource) {
resources.add(resource);
}
Expand Down Expand Up @@ -238,6 +255,8 @@ private void initAttributes() {
this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
this.tabletConversionThreshold =
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThreshold(loadAttributes);
}

@Override
Expand All @@ -264,6 +283,7 @@ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelat
loadAttributes.put(
ON_SUCCESS_KEY, deleteAfterLoad ? ON_SUCCESS_DELETE_VALUE : ON_SUCCESS_NONE_VALUE);
loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY, String.valueOf(convertOnTypeMismatch));
loadAttributes.put(TABLET_CONVERSION_THRESHOLD_KEY, String.valueOf(tabletConversionThreshold));

return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
}
Expand All @@ -286,6 +306,8 @@ public String toString() {
+ verifySchema
+ ", convert-on-type-mismatch="
+ convertOnTypeMismatch
+ ", tablet-conversion-threshold="
+ tabletConversionThreshold
+ ", tsFiles size="
+ tsFiles.size()
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws FileNot
final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft());
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setTabletConversionThreshold(-1);
statement.setVerifySchema(isVerify);
statement.setAutoCreateDatabase(false);
return executeStatement(filePair.getRight() ? new PipeEnrichedStatement(statement) : statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static void validateParameters(final String key, final String value) {
validateOnSuccessParam(value);
break;
case DATABASE_NAME_KEY:
case TABLET_CONVERSION_THRESHOLD_KEY:
break;
case CONVERT_ON_TYPE_MISMATCH_KEY:
validateConvertOnTypeMismatchParam(value);
Expand Down Expand Up @@ -133,6 +134,17 @@ public static boolean parseOrGetDefaultConvertOnTypeMismatch(
CONVERT_ON_TYPE_MISMATCH_KEY, String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE)));
}

public static final String TABLET_CONVERSION_THRESHOLD_KEY = "tablet-conversion-threshold";

public static int parseOrGetDefaultTabletConversionThreshold(
final Map<String, String> loadAttributes) {
return Integer.parseInt(
loadAttributes.getOrDefault(
TABLET_CONVERSION_THRESHOLD_KEY,
String.valueOf(
IoTDBDescriptor.getInstance().getConfig().getLoadTabletConversionThreshold())));
}

public static final String VERIFY_KEY = "verify";
private static final boolean VERIFY_DEFAULT_VALUE = true;

Expand Down
Loading