diff --git a/java/common/src/main/java/org/apache/tsfile/utils/BitMap.java b/java/common/src/main/java/org/apache/tsfile/utils/BitMap.java index bc4542957..61fc4b56b 100644 --- a/java/common/src/main/java/org/apache/tsfile/utils/BitMap.java +++ b/java/common/src/main/java/org/apache/tsfile/utils/BitMap.java @@ -100,6 +100,24 @@ public boolean isAllUnmarked() { return true; } + // whether all bits in the range are unmarked + public boolean isAllUnmarked(int rangeSize) { + int j; + for (j = 0; j < rangeSize / Byte.SIZE; j++) { + if (bits[j] != (byte) 0) { + return false; + } + } + int remainingBits = rangeSize % Byte.SIZE; + if (remainingBits > 0) { + byte mask = (byte) (0xFF >> (Byte.SIZE - remainingBits)); + if ((bits[rangeSize / Byte.SIZE] & mask) != 0) { + return false; + } + } + return true; + } + /** whether all bits are one, i.e., all are Null */ public boolean isAllMarked() { int j; @@ -147,6 +165,41 @@ public boolean equals(Object obj) { return this.size == other.size && Arrays.equals(this.bits, other.bits); } + public boolean equalsInRange(Object obj, int rangeSize) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof BitMap)) { + return false; + } + BitMap other = (BitMap) obj; + if (rangeSize > size || rangeSize > other.size) { + throw new IllegalArgumentException( + "range size " + + rangeSize + + " should <= the minimal bitmap size " + + Math.min(this.size, other.size)); + } + + int byteSize = rangeSize / Byte.SIZE; + for (int i = 0; i < byteSize; i++) { + if (this.bits[i] != other.bits[i]) { + return false; + } + } + int remainingBits = rangeSize % Byte.SIZE; + if (remainingBits > 0) { + byte mask = (byte) (0xFF >> (Byte.SIZE - remainingBits)); + if ((this.bits[byteSize] & mask) != (other.bits[byteSize] & mask)) { + return false; + } + } + return true; + } + @Override public BitMap clone() { byte[] cloneBytes = new byte[this.bits.length]; @@ -191,4 +244,12 @@ public BitMap getRegion(int positionOffset, int length) { copyOfRange(this, positionOffset, newBitMap, 0, length); return newBitMap; } + + public int getTruncatedSize(int size) { + return size / Byte.SIZE + (size % Byte.SIZE == 0 ? 0 : 1); + } + + public byte[] getTruncatedByteArray(int size) { + return Arrays.copyOf(this.bits, getTruncatedSize(size)); + } } diff --git a/java/examples/src/main/java/org/apache/tsfile/TsFileForceAppendWrite.java b/java/examples/src/main/java/org/apache/tsfile/TsFileForceAppendWrite.java index 4bea2092e..e8650318e 100644 --- a/java/examples/src/main/java/org/apache/tsfile/TsFileForceAppendWrite.java +++ b/java/examples/src/main/java/org/apache/tsfile/TsFileForceAppendWrite.java @@ -65,7 +65,7 @@ public static void main(String[] args) throws IOException { // construct TSRecord for (int i = 0; i < 100; i++) { - TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4)); + TSRecord tsRecord = new TSRecord(Constant.DEVICE_PREFIX + (i % 4), i); DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i); DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i); DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i); @@ -74,7 +74,7 @@ public static void main(String[] args) throws IOException { tsRecord.addTuple(dPoint3); // write TSRecord - tsFileWriter.write(tsRecord); + tsFileWriter.writeRecord(tsRecord); } } catch (Exception e) { LOGGER.error("meet error in TsFileWrite ", e); @@ -106,7 +106,7 @@ private static void write(ForceAppendTsFileWriter fwriter) { } // construct TSRecord for (int i = 100; i < 120; i++) { - TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4)); + TSRecord tsRecord = new TSRecord(Constant.DEVICE_PREFIX + (i % 4), i); DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i); DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i); DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i); @@ -115,7 +115,7 @@ private static void write(ForceAppendTsFileWriter fwriter) { tsRecord.addTuple(dPoint3); // write TSRecord - tsFileWriter1.write(tsRecord); + tsFileWriter1.writeRecord(tsRecord); } } catch (Exception e) { LOGGER.error("meet error in TsFileWrite ", e); diff --git a/java/examples/src/main/java/org/apache/tsfile/TsFileWriteAlignedWithTablet.java b/java/examples/src/main/java/org/apache/tsfile/TsFileWriteAlignedWithTablet.java index 73704ca06..784df63ba 100644 --- a/java/examples/src/main/java/org/apache/tsfile/TsFileWriteAlignedWithTablet.java +++ b/java/examples/src/main/java/org/apache/tsfile/TsFileWriteAlignedWithTablet.java @@ -100,22 +100,22 @@ private static void writeAlignedWithTablet( long sensorNum = schemas.size(); for (long r = 0; r < rowNum; r++, startValue++) { - int row = tablet.rowSize++; - timestamps[row] = startTime++; + int row = tablet.getRowSize(); + tablet.addTimestamp(row, startTime++); for (int i = 0; i < sensorNum; i++) { tablet.addValue( - schemas.get(i).getMeasurementId(), + schemas.get(i).getMeasurementName(), row, DataGenerator.generate(schemas.get(i).getType(), (int) r)); } // write - if (tablet.rowSize == tablet.getMaxRowNumber()) { + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { tsFileWriter.writeAligned(tablet); tablet.reset(); } } // write - if (tablet.rowSize != 0) { + if (tablet.getRowSize() != 0) { tsFileWriter.writeAligned(tablet); tablet.reset(); } @@ -140,21 +140,21 @@ private static void writeNonAlignedWithTablet(TsFileWriter tsFileWriter) long timestamp = 1; long value = 1000000L; for (int r = 0; r < rowNum; r++, value++) { - int row = tablet.rowSize++; - timestamps[row] = timestamp++; + int row = tablet.getRowSize(); + tablet.addTimestamp(row, timestamp++); for (int i = 0; i < sensorNum; i++) { long[] sensor = (long[]) values[i]; sensor[row] = value; } // write - if (tablet.rowSize == tablet.getMaxRowNumber()) { - tsFileWriter.write(tablet); + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + tsFileWriter.writeTree(tablet); tablet.reset(); } } // write - if (tablet.rowSize != 0) { - tsFileWriter.write(tablet); + if (tablet.getRowSize() != 0) { + tsFileWriter.writeTree(tablet); tablet.reset(); } } diff --git a/java/examples/src/main/java/org/apache/tsfile/v4/ITsFileReaderAndITsFileWriter.java b/java/examples/src/main/java/org/apache/tsfile/v4/ITsFileReaderAndITsFileWriter.java new file mode 100644 index 000000000..e57b95de2 --- /dev/null +++ b/java/examples/src/main/java/org/apache/tsfile/v4/ITsFileReaderAndITsFileWriter.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.v4; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchemaBuilder; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.fileSystem.FSFactoryProducer; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.query.dataset.ResultSetMetadata; +import org.apache.tsfile.read.v4.ITsFileReader; +import org.apache.tsfile.read.v4.TsFileReaderBuilder; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.StringJoiner; + +public class ITsFileReaderAndITsFileWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(ITsFileReaderAndITsFileWriter.class); + + public static void main(String[] args) throws IOException { + String path = "test.tsfile"; + File f = FSFactoryProducer.getFSFactory().getFile(path); + if (f.exists()) { + Files.delete(f.toPath()); + } + + String tableName = "table1"; + + TableSchema tableSchema = + new TableSchema( + tableName, + Arrays.asList( + new ColumnSchemaBuilder() + .name("id1") + .dataType(TSDataType.STRING) + .category(Tablet.ColumnCategory.ID) + .build(), + new ColumnSchemaBuilder() + .name("id2") + .dataType(TSDataType.STRING) + .category(Tablet.ColumnCategory.ID) + .build(), + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(Tablet.ColumnCategory.MEASUREMENT) + .build(), + new ColumnSchemaBuilder().name("s2").dataType(TSDataType.BOOLEAN).build())); + + Tablet tablet = + new Tablet( + Arrays.asList("id1", "id2", "s1", "s2"), + Arrays.asList( + TSDataType.STRING, TSDataType.STRING, TSDataType.INT32, TSDataType.BOOLEAN)); + for (int row = 0; row < 5; row++) { + long timestamp = row; + tablet.addTimestamp(row, timestamp); + tablet.addValue(row, "id1", "id1_filed_1"); + tablet.addValue(row, "id2", "id2_filed_1"); + tablet.addValue(row, "s1", row); + // null value + // tablet.addValue(row, "s2", true); + } + for (int row = 5; row < 10; row++) { + long timestamp = row; + tablet.addTimestamp(row, timestamp); + + // id1 column + tablet.addValue(row, 0, "id1_field_2"); + + // id2 column + tablet.addValue(row, 1, "id1_field_2"); + + // s1 column: null value + // tablet.addValue(row, 2, row); + + // s2 column + tablet.addValue(row, 3, false); + } + + long memoryThreshold = 10 * 1024 * 1024; + // tableSchema and file are required. memoryThreshold is an optional parameter, default value is + // 32 * 1024 * 1024 byte. + try (ITsFileWriter writer = + new TsFileWriterBuilder() + .file(f) + .tableSchema(tableSchema) + .memoryThreshold(memoryThreshold) + .build()) { + writer.write(tablet); + } catch (WriteProcessException e) { + LOGGER.error("meet error in TsFileWrite ", e); + } + + // file is a required parameter + try (ITsFileReader reader = new TsFileReaderBuilder().file(f).build(); + ResultSet resultSet = + reader.query(tableName, Arrays.asList("id1", "id2", "s1", "s2"), 2, 8)) { + // first column is Time + ResultSetMetadata metadata = resultSet.getMetadata(); + System.out.println(metadata); + StringJoiner sj = new StringJoiner(" "); + for (int column = 1; column <= 5; column++) { + sj.add(metadata.getColumnName(column) + "(" + metadata.getColumnType(column) + ") "); + } + System.out.println(sj.toString()); + while (resultSet.next()) { + // columnIndex starts from 1 + // Time id1 id2 s1 s2 + Long timeField = resultSet.getLong("Time"); + String id1Field = resultSet.isNull("id1") ? null : resultSet.getString("id1"); + String id2Field = resultSet.isNull("id2") ? null : resultSet.getString("id2"); + Integer s1Field = resultSet.isNull("s1") ? null : resultSet.getInt(4); + Boolean s2Field = resultSet.isNull("s2") ? null : resultSet.getBoolean(5); + sj = new StringJoiner(" "); + System.out.println( + sj.add(timeField + "") + .add(id1Field) + .add(id2Field) + .add(s1Field + "") + .add(s2Field + "") + .toString()); + } + } catch (Exception e) { + LOGGER.error("meet error in TsFileRead ", e); + } + } +} diff --git a/java/examples/src/main/java/org/apache/tsfile/v4/WriteTabletWithITsFileWriter.java b/java/examples/src/main/java/org/apache/tsfile/v4/WriteTabletWithITsFileWriter.java new file mode 100644 index 000000000..d7dca8310 --- /dev/null +++ b/java/examples/src/main/java/org/apache/tsfile/v4/WriteTabletWithITsFileWriter.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.v4; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchemaBuilder; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.fileSystem.FSFactoryProducer; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; + +public class WriteTabletWithITsFileWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(WriteTabletWithITsFileWriter.class); + + public static void main(String[] args) throws IOException { + String path = "test.tsfile"; + File f = FSFactoryProducer.getFSFactory().getFile(path); + if (f.exists()) { + Files.delete(f.toPath()); + } + + String tableName = "table1"; + + TableSchema tableSchema = + new TableSchema( + tableName, + Arrays.asList( + new ColumnSchemaBuilder() + .name("id1") + .dataType(TSDataType.STRING) + .category(Tablet.ColumnCategory.ID) + .build(), + new ColumnSchemaBuilder() + .name("id2") + .dataType(TSDataType.STRING) + .category(Tablet.ColumnCategory.ID) + .build(), + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(Tablet.ColumnCategory.MEASUREMENT) + .build(), + new ColumnSchemaBuilder().name("s2").dataType(TSDataType.BOOLEAN).build())); + + long memoryThreshold = 10 * 1024 * 1024; + // tableSchema and file are required. memoryThreshold is an optional parameter, default value is + // 32 * 1024 * 1024 byte. + try (ITsFileWriter writer = + new TsFileWriterBuilder() + .file(f) + .tableSchema(tableSchema) + .memoryThreshold(memoryThreshold) + .build()) { + Tablet tablet = + new Tablet( + Arrays.asList("id1", "id2", "s1", "s2"), + Arrays.asList( + TSDataType.STRING, TSDataType.STRING, TSDataType.INT32, TSDataType.BOOLEAN)); + for (int row = 0; row < 5; row++) { + long timestamp = row; + tablet.addTimestamp(row, timestamp); + tablet.addValue(row, "id1", "id1_filed_1"); + tablet.addValue(row, "id2", "id2_filed_1"); + tablet.addValue(row, "s1", row); + tablet.addValue(row, "s2", true); + } + writer.write(tablet); + + // reset tablet + tablet.reset(); + + for (long timestamp = 0; timestamp < 5; timestamp++) { + int rowIndex = tablet.getRowSize(); + // rowSize may be changed after addTimestamp + tablet.addTimestamp(rowIndex, timestamp); + + // id1 column + tablet.addValue(rowIndex, 0, "id1_field_2"); + + // id2 column + tablet.addValue(rowIndex, 1, "id1_field_2"); + + // s1 column + tablet.addValue(rowIndex, 2, 1); + + // s2 column + tablet.addValue(rowIndex, 3, false); + } + writer.write(tablet); + } catch (WriteProcessException e) { + LOGGER.error("meet error in TsFileWrite ", e); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/exception/NullFieldException.java b/java/tsfile/src/main/java/org/apache/tsfile/exception/NullFieldException.java index 65e9497cb..fb146f0bd 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/exception/NullFieldException.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/exception/NullFieldException.java @@ -24,4 +24,8 @@ public class NullFieldException extends TsFileRuntimeException { public NullFieldException() { super("Field is null"); } + + public NullFieldException(String msg) { + super(msg); + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchema.java new file mode 100644 index 000000000..db089bce6 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchema.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.record.Tablet.ColumnCategory; + +public class ColumnSchema { + private String columnName; + private TSDataType dataType; + private ColumnCategory columnCategory; + + public ColumnSchema(String columnName, TSDataType dataType, ColumnCategory columnCategory) { + this.columnName = columnName; + this.dataType = dataType; + this.columnCategory = columnCategory; + } + + public String getColumnName() { + return columnName; + } + + public TSDataType getDataType() { + return dataType; + } + + public Tablet.ColumnCategory getColumnCategory() { + return columnCategory; + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchemaBuilder.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchemaBuilder.java new file mode 100644 index 000000000..8e6747649 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchemaBuilder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet.ColumnCategory; + +public class ColumnSchemaBuilder { + + private String columnName; + private TSDataType columnDataType; + private ColumnCategory columnCategory = ColumnCategory.MEASUREMENT; + + @TsFileApi + public ColumnSchema build() { + validateParameters(); + return new ColumnSchema(columnName, columnDataType, columnCategory); + } + + @TsFileApi + public ColumnSchemaBuilder name(String columnName) { + this.columnName = columnName == null ? null : columnName.trim(); + if (this.columnName == null || this.columnName.isEmpty()) { + throw new IllegalArgumentException("Column name must be a non empty string"); + } + return this; + } + + @TsFileApi + public ColumnSchemaBuilder dataType(TSDataType columnType) { + this.columnDataType = columnType; + return this; + } + + @TsFileApi + public ColumnSchemaBuilder category(ColumnCategory columnCategory) { + this.columnCategory = columnCategory; + return this; + } + + private void validateParameters() { + if (columnName == null) { + throw new IllegalStateException("Column name must be set before building"); + } + if (columnDataType == null) { + throw new IllegalStateException("Column data type must be set before building"); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java index 1fea110b5..07925aeaa 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java @@ -68,9 +68,9 @@ public void finalizeColumnSchema() { List allColumns = new ArrayList<>(generateIdColumns()); List allColumnCategories = ColumnCategory.nCopy(ColumnCategory.ID, allColumns.size()); - allColumns.addAll(columnSchemas); + allColumns.addAll(measurementSchemas); allColumnCategories.addAll(columnCategories); - columnSchemas = allColumns; + measurementSchemas = allColumns; columnCategories = allColumnCategories; updatable = false; } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java index 5ba25685a..e012cdfb9 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java @@ -76,8 +76,8 @@ public IDeviceID create(String[] segments) { // or we can just use a tuple like Relational DB. private final String[] segments; - public StringArrayDeviceID(String... segments) { - this.segments = formalize(segments); + public StringArrayDeviceID(String... deviceIdSegments) { + this.segments = formalize(deviceIdSegments); } public StringArrayDeviceID(String deviceIdString) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java index b2b6c096f..21d444250 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java @@ -19,6 +19,7 @@ package org.apache.tsfile.file.metadata; +import org.apache.tsfile.common.TsFileApi; import org.apache.tsfile.compatibility.DeserializeConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; @@ -42,7 +43,7 @@ public class TableSchema { // the tableName is not serialized since the TableSchema is always stored in a Map, from whose // key the tableName can be known protected String tableName; - protected List columnSchemas; + protected List measurementSchemas; protected List columnCategories; protected boolean updatable = false; @@ -53,7 +54,7 @@ public class TableSchema { public TableSchema(String tableName) { this.tableName = tableName; - this.columnSchemas = new ArrayList<>(); + this.measurementSchemas = new ArrayList<>(); this.columnCategories = new ArrayList<>(); this.updatable = true; } @@ -63,10 +64,35 @@ public TableSchema( List columnSchemas, List columnCategories) { this.tableName = tableName; - this.columnSchemas = columnSchemas; + this.measurementSchemas = columnSchemas; this.columnCategories = columnCategories; } + public TableSchema( + String tableName, + List columnNameList, + List dataTypeList, + List categoryList) { + this.tableName = tableName; + this.measurementSchemas = new ArrayList<>(columnNameList.size()); + for (int i = 0; i < columnNameList.size(); i++) { + measurementSchemas.add(new MeasurementSchema(columnNameList.get(i), dataTypeList.get(i))); + } + this.columnCategories = categoryList; + } + + @TsFileApi + public TableSchema(String tableName, List columnSchemaList) { + this.tableName = tableName; + this.measurementSchemas = new ArrayList<>(columnSchemaList.size()); + this.columnCategories = new ArrayList<>(columnSchemaList.size()); + for (ColumnSchema columnSchema : columnSchemaList) { + this.measurementSchemas.add( + new MeasurementSchema(columnSchema.getColumnName(), columnSchema.getDataType())); + this.columnCategories.add(columnSchema.getColumnCategory()); + } + } + public Map getColumnPosIndex() { if (columnPosIndex == null) { columnPosIndex = new HashMap<>(); @@ -74,6 +100,21 @@ public Map getColumnPosIndex() { return columnPosIndex; } + // Only for deserialized TableSchema + public Map buildColumnPosIndex() { + if (columnPosIndex == null) { + columnPosIndex = new HashMap<>(); + } + if (columnPosIndex.size() >= measurementSchemas.size()) { + return columnPosIndex; + } + for (int i = 0; i < measurementSchemas.size(); i++) { + IMeasurementSchema currentColumnSchema = measurementSchemas.get(i); + columnPosIndex.putIfAbsent(currentColumnSchema.getMeasurementName(), i); + } + return columnPosIndex; + } + public Map getIdColumnOrder() { if (idColumnOrder == null) { idColumnOrder = new HashMap<>(); @@ -89,8 +130,8 @@ public int findColumnIndex(String columnName) { .computeIfAbsent( columnName, colName -> { - for (int i = 0; i < columnSchemas.size(); i++) { - if (columnSchemas.get(i).getMeasurementName().equals(columnName)) { + for (int i = 0; i < measurementSchemas.size(); i++) { + if (measurementSchemas.get(i).getMeasurementName().equals(columnName)) { return i; } } @@ -108,8 +149,8 @@ public int findIdColumnOrder(String columnName) { columnName, colName -> { int columnOrder = 0; - for (int i = 0; i < columnSchemas.size(); i++) { - if (columnSchemas.get(i).getMeasurementName().equals(columnName) + for (int i = 0; i < measurementSchemas.size(); i++) { + if (measurementSchemas.get(i).getMeasurementName().equals(columnName) && columnCategories.get(i) == ColumnCategory.ID) { return columnOrder; } else if (columnCategories.get(i) == ColumnCategory.ID) { @@ -122,7 +163,7 @@ public int findIdColumnOrder(String columnName) { public IMeasurementSchema findColumnSchema(String columnName) { final int columnIndex = findColumnIndex(columnName); - return columnIndex >= 0 ? columnSchemas.get(columnIndex) : null; + return columnIndex >= 0 ? measurementSchemas.get(columnIndex) : null; } public void update(ChunkGroupMetadata chunkGroupMetadata) { @@ -134,11 +175,11 @@ public void update(ChunkGroupMetadata chunkGroupMetadata) { int columnIndex = findColumnIndex(chunkMetadata.getMeasurementUid()); // if the measurement is not found in the column list, add it if (columnIndex == -1) { - columnSchemas.add(chunkMetadata.toMeasurementSchema()); + measurementSchemas.add(chunkMetadata.toMeasurementSchema()); columnCategories.add(ColumnCategory.MEASUREMENT); - getColumnPosIndex().put(chunkMetadata.getMeasurementUid(), columnSchemas.size() - 1); + getColumnPosIndex().put(chunkMetadata.getMeasurementUid(), measurementSchemas.size() - 1); } else { - final IMeasurementSchema originSchema = columnSchemas.get(columnIndex); + final IMeasurementSchema originSchema = measurementSchemas.get(columnIndex); if (originSchema.getType() != chunkMetadata.getDataType()) { originSchema.setDataType(TSDataType.STRING); } @@ -147,7 +188,7 @@ public void update(ChunkGroupMetadata chunkGroupMetadata) { } public List getColumnSchemas() { - return columnSchemas; + return measurementSchemas; } public List getColumnTypes() { @@ -156,10 +197,10 @@ public List getColumnTypes() { public int serialize(OutputStream out) throws IOException { int cnt = 0; - if (columnSchemas != null) { - cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(columnSchemas.size(), out); - for (int i = 0; i < columnSchemas.size(); i++) { - IMeasurementSchema columnSchema = columnSchemas.get(i); + if (measurementSchemas != null) { + cnt += ReadWriteForEncodingUtils.writeUnsignedVarInt(measurementSchemas.size(), out); + for (int i = 0; i < measurementSchemas.size(); i++) { + IMeasurementSchema columnSchema = measurementSchemas.get(i); ColumnCategory columnCategory = columnCategories.get(i); cnt += columnSchema.serializeTo(out); cnt += ReadWriteIOUtils.write(columnCategory.ordinal(), out); @@ -207,7 +248,7 @@ public String toString() { + tableName + '\'' + ", columnSchemas=" - + columnSchemas + + measurementSchemas + ", columnTypes=" + columnCategories + '}'; @@ -223,12 +264,12 @@ public boolean equals(Object o) { } TableSchema that = (TableSchema) o; return Objects.equals(tableName, that.tableName) - && Objects.equals(columnSchemas, that.columnSchemas) + && Objects.equals(measurementSchemas, that.measurementSchemas) && Objects.equals(columnCategories, that.columnCategories); } @Override public int hashCode() { - return Objects.hash(tableName, columnSchemas, columnCategories); + return Objects.hash(tableName, measurementSchemas, columnCategories); } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java index 9f6c23c2d..bf0a3440c 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileReader.java @@ -20,39 +20,23 @@ package org.apache.tsfile.read; import org.apache.tsfile.common.TsFileApi; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.MetadataIndexNode; -import org.apache.tsfile.file.metadata.TableSchema; -import org.apache.tsfile.file.metadata.TimeseriesMetadata; -import org.apache.tsfile.file.metadata.TsFileMetadata; -import org.apache.tsfile.read.common.Path; import org.apache.tsfile.read.controller.CachedChunkLoaderImpl; import org.apache.tsfile.read.controller.IChunkLoader; import org.apache.tsfile.read.controller.IMetadataQuerier; import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; import org.apache.tsfile.read.expression.QueryExpression; -import org.apache.tsfile.read.expression.impl.GlobalTimeExpression; -import org.apache.tsfile.read.filter.operator.TimeFilterOperators; import org.apache.tsfile.read.query.dataset.QueryDataSet; -import org.apache.tsfile.read.query.dataset.ResultSet; import org.apache.tsfile.read.query.executor.TsFileExecutor; -import org.apache.tsfile.write.schema.IMeasurementSchema; -import org.apache.tsfile.write.schema.MeasurementSchema; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; public class TsFileReader implements AutoCloseable { private TsFileSequenceReader fileReader; private IMetadataQuerier metadataQuerier; private IChunkLoader chunkLoader; - private TsFileExecutor tsFileExecutor; + private TsFileExecutor tsfileExecutor; @TsFileApi public TsFileReader(File file) throws IOException { @@ -64,78 +48,17 @@ public TsFileReader(TsFileSequenceReader fileReader) throws IOException { this.fileReader = fileReader; this.metadataQuerier = new MetadataQuerierByFileImpl(fileReader); this.chunkLoader = new CachedChunkLoaderImpl(fileReader); - tsFileExecutor = new TsFileExecutor(metadataQuerier, chunkLoader); + this.tsfileExecutor = new TsFileExecutor(metadataQuerier, chunkLoader); } - @TsFileApi - public List getAllDevices() throws IOException { - return fileReader.getAllDevices().stream() - .map(IDeviceID::toString) - .collect(Collectors.toList()); - } - - @TsFileApi - public List getTimeseriesSchema(String deviceId) throws IOException { - IDeviceID iDeviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId); - List deviceTimeseriesMetadata = - fileReader.getDeviceTimeseriesMetadataWithoutChunkMetadata(iDeviceID); - List measurementSchemaList = new ArrayList<>(); - for (TimeseriesMetadata timeseriesMetadata : deviceTimeseriesMetadata) { - measurementSchemaList.add( - new MeasurementSchema( - timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTsDataType())); - } - return measurementSchemaList; - } - - @TsFileApi - public List getAllTables() throws IOException { - Map tableSchemaMap = fileReader.readFileMetadata().getTableSchemaMap(); - return new ArrayList<>(tableSchemaMap.keySet()); - } - - @TsFileApi - public List getAllTableDevices(String tableName) throws IOException { - MetadataIndexNode tableMetadataIndexNode = - fileReader.readFileMetadata().getTableMetadataIndexNode(tableName); - if (tableMetadataIndexNode == null) { - return Collections.emptyList(); - } - return fileReader.getAllDevices(tableMetadataIndexNode); - } - - @TsFileApi - public List getTableSchema(List tableNames) throws IOException { - TsFileMetadata tsFileMetadata = fileReader.readFileMetadata(); - Map tableSchemaMap = tsFileMetadata.getTableSchemaMap(); - List result = new ArrayList<>(tableNames.size()); - for (String tableName : tableNames) { - result.add(tableSchemaMap.get(tableName)); - } - return result; - } - - @Deprecated public QueryDataSet query(QueryExpression queryExpression) throws IOException { - return tsFileExecutor.execute(queryExpression); - } - - @TsFileApi - public ResultSet query(List pathList, long startTime, long endTime) throws IOException { - QueryExpression queryExpression = QueryExpression.create(); - for (String path : pathList) { - queryExpression.addSelectedPath(new Path(path, true)); - } - queryExpression.setExpression( - new GlobalTimeExpression(new TimeFilterOperators.TimeBetweenAnd(startTime, endTime))); - return new ResultSet(tsFileExecutor.execute(queryExpression)); + return tsfileExecutor.execute(queryExpression); } - @Deprecated public QueryDataSet query( QueryExpression queryExpression, long partitionStartOffset, long partitionEndOffset) throws IOException { - return tsFileExecutor.execute(queryExpression, partitionStartOffset, partitionEndOffset); + return tsfileExecutor.execute(queryExpression, partitionStartOffset, partitionEndOffset); } @Override diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index d7a3e1600..2f61eb093 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -881,8 +881,13 @@ public List readITimeseriesMetadata( mergeAlignedSeries); } if (valueTimeseriesMetadataList != null && !valueTimeseriesMetadataList.isEmpty()) { - resultTimeseriesMetadataList.add( - new AlignedTimeSeriesMetadata(timeColumnMetadata, valueTimeseriesMetadataList)); + if (this.tsFileMetaData.getTableSchemaMap().containsKey(device.getTableName())) { + resultTimeseriesMetadataList.add( + new TableDeviceMetadata(timeColumnMetadata, valueTimeseriesMetadataList)); + } else { + resultTimeseriesMetadataList.add( + new AlignedTimeSeriesMetadata(timeColumnMetadata, valueTimeseriesMetadataList)); + } } return resultTimeseriesMetadataList; } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/TimeSeries.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/TimeSeries.java new file mode 100644 index 000000000..540a6d106 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/TimeSeries.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.common; + +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.util.ArrayList; +import java.util.List; + +public class TimeSeries { + private IDeviceID deviceId; + private String measurementName; + + public TimeSeries(IDeviceID deviceId, String measurementName) { + this.deviceId = deviceId; + this.measurementName = measurementName; + } + + public IDeviceID getDeviceId() { + return deviceId; + } + + public void setDeviceId(IDeviceID deviceId) { + this.deviceId = deviceId; + } + + public String getMeasurementName() { + return measurementName; + } + + public void setMeasurementName(String measurementName) { + this.measurementName = measurementName; + } + + public static List getPathList(IDeviceID deviceId, String... measurements) { + List pathList = new ArrayList<>(measurements.length); + for (String measurement : measurements) { + pathList.add(new TimeSeries(deviceId, measurement)); + } + return pathList; + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java index 00e3ae1c9..47888d017 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java @@ -21,7 +21,7 @@ import org.apache.tsfile.common.cache.LRUCache; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.metadata.AlignedTimeSeriesMetadata; +import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; @@ -158,9 +158,9 @@ public void loadChunkMetaDatas(List paths) throws IOException { List chunkMetadataList = tsFileReader.readIChunkMetaDataList(timeseriesMetadata); String measurementId; - if (timeseriesMetadata instanceof AlignedTimeSeriesMetadata) { + if (timeseriesMetadata instanceof AbstractAlignedTimeSeriesMetadata) { measurementId = - ((AlignedTimeSeriesMetadata) timeseriesMetadata) + ((AbstractAlignedTimeSeriesMetadata) timeseriesMetadata) .getValueTimeseriesMetadataList() .get(0) .getMeasurementId(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java b/java/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java index fa0b11389..3a829e496 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java @@ -19,10 +19,58 @@ package org.apache.tsfile.read.expression; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.read.filter.factory.TimeFilterApi; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; public interface ExpressionTree { boolean satisfy(Object value); Filter toFilter(); + + class TimeBetweenAnd implements ExpressionTree { + private long startTime; + private long endTime; + + public TimeBetweenAnd(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public boolean satisfy(Object value) { + long v = (Long) value; + return v >= startTime && v <= endTime; + } + + @Override + public Filter toFilter() { + return TimeFilterApi.between(startTime, endTime); + } + } + + class IdColumnMatch implements ExpressionTree { + private Set satisfiedDeviceIds; + + public IdColumnMatch(List satisfiedDeviceIdList) { + this.satisfiedDeviceIds = + satisfiedDeviceIdList == null ? null : new HashSet<>(satisfiedDeviceIdList); + } + + @Override + public boolean satisfy(Object value) { + return satisfiedDeviceIds == null + || satisfiedDeviceIds.isEmpty() + || satisfiedDeviceIds.contains(value); + } + + @Override + public Filter toFilter() { + return null; + } + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/AbstractResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/AbstractResultSet.java new file mode 100644 index 000000000..8a28866de --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/AbstractResultSet.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.NullFieldException; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class AbstractResultSet implements ResultSet { + + protected ResultSetMetadata resultSetMetadata; + protected Map columnNameToColumnIndexMap; + protected RowRecord currentRow; + + protected AbstractResultSet(List columnNameList, List tsDataTypeList) { + // Add Time at first column + this.resultSetMetadata = new ResultSetMetadataImpl(columnNameList, tsDataTypeList); + int columnNum = tsDataTypeList.size() + 1; + this.columnNameToColumnIndexMap = new HashMap<>(tsDataTypeList.size()); + for (int columnIndex = 1; columnIndex <= columnNum; columnIndex++) { + this.columnNameToColumnIndexMap.put( + resultSetMetadata.getColumnName(columnIndex), columnIndex); + } + } + + @TsFileApi + public ResultSetMetadata getMetadata() { + return this.resultSetMetadata; + } + + @TsFileApi + public abstract boolean next() throws IOException; + + @TsFileApi + public int getInt(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getInt(columnIndex); + } + + @TsFileApi + public int getInt(int columnIndex) { + return getNonNullField(columnIndex).getIntV(); + } + + @TsFileApi + public long getLong(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getLong(columnIndex); + } + + @TsFileApi + public long getLong(int columnIndex) { + return getNonNullField(columnIndex).getLongV(); + } + + @TsFileApi + public float getFloat(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getFloat(columnIndex); + } + + @TsFileApi + public float getFloat(int columnIndex) { + return getNonNullField(columnIndex).getFloatV(); + } + + @TsFileApi + public double getDouble(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getDouble(columnIndex); + } + + @TsFileApi + public double getDouble(int columnIndex) { + return getNonNullField(columnIndex).getDoubleV(); + } + + @TsFileApi + public boolean getBoolean(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getBoolean(columnIndex); + } + + @TsFileApi + public boolean getBoolean(int columnIndex) { + return getNonNullField(columnIndex).getBoolV(); + } + + @TsFileApi + public String getString(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getString(columnIndex); + } + + @TsFileApi + public String getString(int columnIndex) { + return getNonNullField(columnIndex).getStringValue(); + } + + @TsFileApi + public LocalDate getDate(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getDate(columnIndex); + } + + @TsFileApi + public LocalDate getDate(int columnIndex) { + return getNonNullField(columnIndex).getDateV(); + } + + @TsFileApi + public byte[] getBinary(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + return getBinary(columnIndex); + } + + @TsFileApi + public byte[] getBinary(int columnIndex) { + return getNonNullField(columnIndex).getBinaryV().getValues(); + } + + @TsFileApi + public boolean isNull(String columnName) { + Integer columnIndex = columnNameToColumnIndexMap.get(columnName); + if (columnIndex == null) { + throw new IllegalArgumentException( + "Can't find columnName " + columnName + " from result set"); + } + return isNull(columnIndex); + } + + @TsFileApi + public boolean isNull(int columnIndex) { + return getField(columnIndex) == null; + } + + protected Field getNonNullField(int columnIndex) { + Field field = getField(columnIndex); + if (field == null) { + throw new NullFieldException("Field in columnIndex " + columnIndex + " is null"); + } + return field; + } + + protected Field getField(int columnIndex) { + if (columnIndex > this.columnNameToColumnIndexMap.size()) { + throw new IndexOutOfBoundsException("column index " + columnIndex + " out of bound"); + } + Field field; + if (columnIndex == 1) { + field = new Field(TSDataType.INT64); + field.setLongV(currentRow.getTimestamp()); + } else { + field = currentRow.getField(columnIndex - 2); + } + return field; + } + + @TsFileApi + public abstract void close(); +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSet.java index e72681055..02f23071a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSet.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSet.java @@ -20,197 +20,72 @@ package org.apache.tsfile.read.query.dataset; import org.apache.tsfile.common.TsFileApi; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.read.common.Field; -import org.apache.tsfile.read.common.Path; -import org.apache.tsfile.read.common.RowRecord; -import org.apache.tsfile.utils.Binary; import java.io.IOException; import java.time.LocalDate; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -public class ResultSet { - private QueryDataSet queryDataSet; - private ResultSetMetadata resultSetMetadata; - private RowRecord currentRow; - private Map columnNameToColumnIndexMap; - - public ResultSet(QueryDataSet queryDataSet) { - this.queryDataSet = queryDataSet; - // add Time column at first position - this.resultSetMetadata = - new ResultSetMetadata(queryDataSet.getPaths(), queryDataSet.getDataTypes()); - this.columnNameToColumnIndexMap = new HashMap<>(resultSetMetadata.getColumnNum()); - for (int columnIndex = 1; columnIndex <= resultSetMetadata.getColumnNum(); columnIndex++) { - this.columnNameToColumnIndexMap.put( - resultSetMetadata.getColumnName(columnIndex), columnIndex); - } - } +public interface ResultSet extends AutoCloseable { @TsFileApi - public ResultSetMetadata getMetadata() { - return this.resultSetMetadata; - } + ResultSetMetadata getMetadata(); @TsFileApi - public boolean next() throws IOException { - while (queryDataSet.hasNext()) { - currentRow = queryDataSet.next(); - if (currentRow.isAllNull()) { - continue; - } - return true; - } - return false; - } + boolean next() throws IOException; @TsFileApi - public int getInt(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getInt(columnIndex); - } + int getInt(String columnName); @TsFileApi - public int getInt(int columnIndex) { - return getField(columnIndex).getIntV(); - } + int getInt(int columnIndex); @TsFileApi - public long getLong(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getLong(columnIndex); - } + long getLong(String columnName); @TsFileApi - public long getLong(int columnIndex) { - return getField(columnIndex).getLongV(); - } + long getLong(int columnIndex); @TsFileApi - public float getFloat(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getFloat(columnIndex); - } + float getFloat(String columnName); @TsFileApi - public float getFloat(int columnIndex) { - return getField(columnIndex).getFloatV(); - } + float getFloat(int columnIndex); @TsFileApi - public double getDouble(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getDouble(columnIndex); - } + double getDouble(String columnName); @TsFileApi - public double getDouble(int columnIndex) { - return getField(columnIndex).getDoubleV(); - } + double getDouble(int columnIndex); @TsFileApi - public boolean getBoolean(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getBoolean(columnIndex); - } + boolean getBoolean(String columnName); @TsFileApi - public boolean getBoolean(int columnIndex) { - return getField(columnIndex).getBoolV(); - } + boolean getBoolean(int columnIndex); @TsFileApi - public String getString(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getString(columnIndex); - } + String getString(String columnName); @TsFileApi - public String getString(int columnIndex) { - return getField(columnIndex).getStringValue(); - } + String getString(int columnIndex); @TsFileApi - public LocalDate getDate(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getDate(columnIndex); - } + LocalDate getDate(String columnName); @TsFileApi - public LocalDate getDate(int columnIndex) { - return getField(columnIndex).getDateV(); - } + LocalDate getDate(int columnIndex); @TsFileApi - public Binary getBinary(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return getBinary(columnIndex); - } + byte[] getBinary(String columnName); @TsFileApi - public Binary getBinary(int columnIndex) { - return getField(columnIndex).getBinaryV(); - } + byte[] getBinary(int columnIndex); @TsFileApi - public boolean isNull(String columnName) { - Integer columnIndex = columnNameToColumnIndexMap.get(columnName); - return isNull(columnIndex); - } + boolean isNull(String columnName); @TsFileApi - public boolean isNull(int columnIndex) { - return getField(columnIndex) == null; - } - - protected Field getField(int columnIndex) { - Field field; - if (columnIndex == 1) { - field = new Field(TSDataType.INT64); - field.setLongV(currentRow.getTimestamp()); - } else { - field = currentRow.getField(columnIndex - 2); - } - return field; - } + boolean isNull(int columnIndex); @TsFileApi - public void close() {} - - public static class ResultSetMetadata { - - private List columnNameList; - private List dataTypeList; - - public ResultSetMetadata(List paths, List dataTypeList) { - this.columnNameList = new ArrayList<>(paths.size() + 1); - this.dataTypeList = new ArrayList<>(paths.size() + 1); - // add time column - this.columnNameList.add("Time"); - this.dataTypeList.add(TSDataType.INT64); - // add other columns - paths.forEach(path -> columnNameList.add(path.getFullPath())); - this.dataTypeList.addAll(dataTypeList); - } - - // columnIndex starting from 1 - @TsFileApi - public String getColumnName(int columnIndex) { - return columnNameList.get(columnIndex - 1); - } - - // columnIndex starting from 1 - @TsFileApi - public TSDataType getColumnType(int columnIndex) { - return dataTypeList.get(columnIndex - 1); - } - - public int getColumnNum() { - return dataTypeList.size(); - } - } + public abstract void close(); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSetMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSetMetadata.java new file mode 100644 index 000000000..506346b31 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSetMetadata.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.enums.TSDataType; + +public interface ResultSetMetadata { + // columnIndex starting from 1 + @TsFileApi + String getColumnName(int columnIndex); + + // columnIndex starting from 1 + @TsFileApi + TSDataType getColumnType(int columnIndex); +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSetMetadataImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSetMetadataImpl.java new file mode 100644 index 000000000..72180908a --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/ResultSetMetadataImpl.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.enums.TSDataType; + +import java.util.ArrayList; +import java.util.List; + +public class ResultSetMetadataImpl implements ResultSetMetadata { + + private List columnNameList; + private List dataTypeList; + + public ResultSetMetadataImpl(List columnNameList, List dataTypeList) { + int capacity = columnNameList.size() + 1; + this.columnNameList = new ArrayList<>(capacity); + this.dataTypeList = new ArrayList<>(capacity); + // add time column + this.columnNameList.add("Time"); + this.dataTypeList.add(TSDataType.INT64); + // add other columns + this.columnNameList.addAll(columnNameList); + this.dataTypeList.addAll(dataTypeList); + } + + // columnIndex starting from 1 + public String getColumnName(int columnIndex) { + return columnNameList.get(columnIndex - 1); + } + + // columnIndex starting from 1 + public TSDataType getColumnType(int columnIndex) { + return dataTypeList.get(columnIndex - 1); + } + + @Override + public String toString() { + return "ResultSetMetadataImpl{" + + "columnNameList=" + + columnNameList + + ", dataTypeList=" + + dataTypeList + + '}'; + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/TableResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/TableResultSet.java new file mode 100644 index 000000000..5e0507742 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/TableResultSet.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.read.reader.block.TsBlockReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class TableResultSet extends AbstractResultSet { + private static final Logger LOG = LoggerFactory.getLogger(TableResultSet.class); + + private TsBlockReader tsBlockReader; + private IPointReader tsBlockPointReader; + private List columnNameList; + private List dataTypeList; + + public TableResultSet( + TsBlockReader tsBlockReader, List columnNameList, List dataTypeList) { + super(columnNameList, dataTypeList); + this.tsBlockReader = tsBlockReader; + this.columnNameList = columnNameList; + this.dataTypeList = dataTypeList; + } + + @Override + public boolean next() throws IOException { + while ((tsBlockPointReader == null || !tsBlockPointReader.hasNextTimeValuePair()) + && tsBlockReader.hasNext()) { + TsBlock currentTsBlock = tsBlockReader.next(); + tsBlockPointReader = currentTsBlock.getTsBlockAlignedRowIterator(); + } + if (tsBlockPointReader == null || !tsBlockPointReader.hasNextTimeValuePair()) { + return false; + } + TimeValuePair currentTimeValuePair = tsBlockPointReader.nextTimeValuePair(); + currentRow = convertTimeValuePairToRowRecord(currentTimeValuePair); + return true; + } + + private RowRecord convertTimeValuePairToRowRecord(TimeValuePair timeValuePair) { + RowRecord rowRecord = new RowRecord(timeValuePair.getValues().length); + rowRecord.setTimestamp(timeValuePair.getTimestamp()); + for (int i = 0; i < timeValuePair.getValues().length; i++) { + Object value = timeValuePair.getValues()[i]; + rowRecord.addField(Field.getField(value, dataTypeList.get(i))); + } + return rowRecord; + } + + @Override + public void close() { + if (tsBlockReader == null) { + return; + } + try { + tsBlockReader.close(); + } catch (Exception e) { + LOG.error("Failed to close tsBlockReader"); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/TreeResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/TreeResultSet.java new file mode 100644 index 000000000..e9c162cb8 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/TreeResultSet.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.read.common.Path; + +import java.io.IOException; +import java.util.stream.Collectors; + +public class TreeResultSet extends AbstractResultSet { + private QueryDataSet queryDataSet; + + public TreeResultSet(QueryDataSet queryDataSet) { + super( + queryDataSet.getPaths().stream().map(Path::toString).collect(Collectors.toList()), + queryDataSet.getDataTypes()); + this.queryDataSet = queryDataSet; + } + + @TsFileApi + public boolean next() throws IOException { + while (queryDataSet.hasNext()) { + currentRow = queryDataSet.next(); + if (currentRow.isAllNull()) { + continue; + } + return true; + } + return false; + } + + @TsFileApi + public void close() { + // nothing to be done + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java index 5bcbcf005..a1fc5b29b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java @@ -105,7 +105,7 @@ private void constructColumnContext( } final IChunkMetadata chunkMetadata = chunkMetadataList.get(0); AbstractFileSeriesReader seriesReader = - new FileSeriesReader(chunkLoader, chunkMetadataList, timeFilter); + new FileSeriesReader(chunkLoader, chunkMetadataList, timeFilter, false); if (seriesReader.hasNextBatch()) { if (chunkMetadata instanceof AlignedChunkMetadata) { final List currentChunkMeasurementNames = diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java index 735b44c9b..361615015 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java @@ -38,16 +38,26 @@ public abstract class AbstractFileSeriesReader implements IBatchReader { protected IChunkReader chunkReader; protected List currentChunkMeasurementNames = new ArrayList<>(); private int chunkToRead; + protected boolean ignoreAllNullRows; protected Filter filter; /** constructor of FileSeriesReader. */ protected AbstractFileSeriesReader( IChunkLoader chunkLoader, List chunkMetadataList, Filter filter) { + this(chunkLoader, chunkMetadataList, filter, true); + } + + protected AbstractFileSeriesReader( + IChunkLoader chunkLoader, + List chunkMetadataList, + Filter filter, + boolean ignoreAllNullRows) { this.chunkLoader = chunkLoader; this.chunkMetadataList = chunkMetadataList; this.filter = filter; this.chunkToRead = 0; + this.ignoreAllNullRows = ignoreAllNullRows; } @Override diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java index c131c9757..bb6b38108 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java @@ -27,6 +27,7 @@ import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.reader.chunk.AlignedChunkReader; import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.read.reader.chunk.TableChunkReader; import java.io.IOException; import java.util.ArrayList; @@ -43,6 +44,14 @@ public FileSeriesReader( super(chunkLoader, chunkMetadataList, filter); } + public FileSeriesReader( + IChunkLoader chunkLoader, + List chunkMetadataList, + Filter filter, + boolean ignoreAllNullRows) { + super(chunkLoader, chunkMetadataList, filter, ignoreAllNullRows); + } + @Override protected void initChunkReader(IChunkMetadata chunkMetaData) throws IOException { currentChunkMeasurementNames.clear(); @@ -64,7 +73,11 @@ protected void initChunkReader(IChunkMetadata chunkMetaData) throws IOException valueChunkList.add(null); currentChunkMeasurementNames.add(null); } - this.chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, filter); + if (ignoreAllNullRows) { + this.chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, filter); + } else { + this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, filter); + } } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java new file mode 100644 index 000000000..a354df773 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.v4; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TsFileMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.controller.CachedChunkLoaderImpl; +import org.apache.tsfile.read.controller.IChunkLoader; +import org.apache.tsfile.read.controller.IMetadataQuerier; +import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; +import org.apache.tsfile.read.expression.ExpressionTree; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.query.dataset.TableResultSet; +import org.apache.tsfile.read.query.executor.TableQueryExecutor; +import org.apache.tsfile.read.reader.block.TsBlockReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class DeviceTableModelReader implements ITsFileReader { + + protected TsFileSequenceReader fileReader; + protected IMetadataQuerier metadataQuerier; + protected IChunkLoader chunkLoader; + protected TableQueryExecutor queryExecutor; + private static final Logger LOG = LoggerFactory.getLogger(DeviceTableModelReader.class); + + public DeviceTableModelReader(File file) throws IOException { + this.fileReader = new TsFileSequenceReader(file.getPath()); + this.metadataQuerier = new MetadataQuerierByFileImpl(fileReader); + this.chunkLoader = new CachedChunkLoaderImpl(fileReader); + this.queryExecutor = + new TableQueryExecutor( + metadataQuerier, chunkLoader, TableQueryExecutor.TableQueryOrdering.DEVICE); + } + + @TsFileApi + public List getAllTableSchema() throws IOException { + Map tableSchemaMap = fileReader.readFileMetadata().getTableSchemaMap(); + return new ArrayList<>(tableSchemaMap.values()); + } + + @TsFileApi + public Optional getTableSchemas(String tableName) throws IOException { + TsFileMetadata tsFileMetadata = fileReader.readFileMetadata(); + Map tableSchemaMap = tsFileMetadata.getTableSchemaMap(); + return Optional.ofNullable(tableSchemaMap.get(tableName)); + } + + @TsFileApi + public ResultSet query(String tableName, List columnNames, long startTime, long endTime) + throws IOException, NoTableException, NoMeasurementException, ReadProcessException { + TsFileMetadata tsFileMetadata = fileReader.readFileMetadata(); + TableSchema tableSchema = tsFileMetadata.getTableSchemaMap().get(tableName); + if (tableSchema == null) { + throw new NoTableException(tableName); + } + List dataTypeList = new ArrayList<>(columnNames.size()); + for (String columnName : columnNames) { + Map column2IndexMap = tableSchema.buildColumnPosIndex(); + Integer columnIndex = column2IndexMap.get(columnName); + if (columnIndex == null) { + throw new NoMeasurementException(columnName); + } + dataTypeList.add(tableSchema.getColumnSchemas().get(columnIndex).getType()); + } + TsBlockReader tsBlockReader = + queryExecutor.query( + tableName, + columnNames, + new ExpressionTree.TimeBetweenAnd(startTime, endTime), + null, + null); + return new TableResultSet(tsBlockReader, columnNames, dataTypeList); + } + + @Override + public void close() { + try { + this.fileReader.close(); + } catch (IOException e) { + LOG.warn("Meet exception when close file reader: ", e); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java new file mode 100644 index 000000000..9f0312378 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.v4; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.query.dataset.ResultSet; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +public interface ITsFileReader extends AutoCloseable { + + @TsFileApi + ResultSet query(String tableName, List columnNames, long startTime, long endTime) + throws ReadProcessException, IOException, NoTableException, NoMeasurementException; + + @TsFileApi + Optional getTableSchemas(String tableName) throws IOException; + + @TsFileApi + List getAllTableSchema() throws IOException; + + @TsFileApi + void close(); +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileReaderBuilder.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileReaderBuilder.java new file mode 100644 index 000000000..a6fcc196d --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileReaderBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.v4; + +import org.apache.tsfile.common.TsFileApi; + +import java.io.File; +import java.io.IOException; + +public class TsFileReaderBuilder { + + private File file; + + @TsFileApi + public ITsFileReader build() throws IOException { + validateParameters(); + return new DeviceTableModelReader(file); + } + + @TsFileApi + public TsFileReaderBuilder file(File file) { + this.file = file; + return this; + } + + @TsFileApi + private void validateParameters() { + if (file == null || !file.exists() || file.isDirectory()) { + throw new IllegalArgumentException("The file must be a non-null and non-directory File."); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileGeneratorUtils.java b/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileGeneratorUtils.java index 6db55669d..6b87f6e17 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileGeneratorUtils.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/TsFileGeneratorUtils.java @@ -133,7 +133,7 @@ public static void writeWithTablet( if (isAligned) { tsFileWriter.writeAligned(tablet); } else { - tsFileWriter.write(tablet); + tsFileWriter.writeTree(tablet); } tablet.reset(); } @@ -143,7 +143,7 @@ public static void writeWithTablet( if (isAligned) { tsFileWriter.writeAligned(tablet); } else { - tsFileWriter.write(tablet); + tsFileWriter.writeTree(tablet); } tablet.reset(); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java index 01116b1e9..20a38078a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java @@ -382,13 +382,16 @@ && getSchema().getSchemaTemplates().size() == 1) { return true; } - private void checkIsTableExist(Tablet tablet) throws WriteProcessException { + private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet) + throws WriteProcessException { String tableName = tablet.getTableName(); final TableSchema tableSchema = getSchema().getTableSchemaMap().get(tableName); if (tableSchema == null) { throw new NoTableException(tableName); } + List columnCategoryListForTablet = + new ArrayList<>(tablet.getSchemas().size()); for (IMeasurementSchema writingColumnSchema : tablet.getSchemas()) { final int columnIndex = tableSchema.findColumnIndex(writingColumnSchema.getMeasurementName()); if (columnIndex < 0) { @@ -400,7 +403,9 @@ private void checkIsTableExist(Tablet tablet) throws WriteProcessException { throw new ConflictDataTypeException( writingColumnSchema.getType(), registeredColumnSchema.getType()); } + columnCategoryListForTablet.add(tableSchema.getColumnTypes().get(columnIndex)); } + tablet.setColumnCategories(columnCategoryListForTablet); } private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned) @@ -539,7 +544,7 @@ public boolean writeRecord(TSRecord record) throws IOException, WriteProcessExce * @throws WriteProcessException exception in write process */ @TsFileApi - public boolean write(Tablet tablet) throws IOException, WriteProcessException { + public boolean writeTree(Tablet tablet) throws IOException, WriteProcessException { IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()); MeasurementGroup measurementGroup = getSchema().getSeriesSchema(deviceID); if (measurementGroup == null) { @@ -692,14 +697,14 @@ public Schema getSchema() { * tablet, List> deviceIdEndIndexPairs). One typical case where the other * method should be used is that all rows in the tablet belong to the same device. * - * @param tablet data to write + * @param table data to write * @return true if a flush is triggered after write, false otherwise * @throws IOException if the file cannot be written * @throws WriteProcessException if the schema is not registered first */ @TsFileApi - public boolean writeTable(Tablet tablet) throws IOException, WriteProcessException { - return writeTable(tablet, null); + public boolean writeTable(Tablet table) throws IOException, WriteProcessException { + return writeTable(table, null); } /** @@ -718,7 +723,7 @@ public boolean writeTable(Tablet tablet) throws IOException, WriteProcessExcepti public boolean writeTable(Tablet tablet, List> deviceIdEndIndexPairs) throws IOException, WriteProcessException { // make sure the ChunkGroupWriter for this Tablet exist and there is no type conflict - checkIsTableExist(tablet); + checkIsTableExistAndSetColumnCategoryList(tablet); // spilt the tablet by deviceId if (deviceIdEndIndexPairs == null) { deviceIdEndIndexPairs = WriteUtils.splitTabletByDevice(tablet); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java index 7185fffba..e39d2d06e 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java @@ -122,30 +122,39 @@ public Tablet(String deviceId, List schemas, int maxRowNumbe reset(); } + public Tablet(IDeviceID deviceID, List measurementList, List dataTypeList) { + this(deviceID, measurementList, dataTypeList, DEFAULT_SIZE); + } + + public Tablet( + IDeviceID deviceID, + List measurementList, + List dataTypeList, + int maxRowNumber) { + this( + deviceID.toString(), + measurementList, + dataTypeList, + ColumnCategory.nCopy(ColumnCategory.MEASUREMENT, measurementList.size()), + maxRowNumber); + } + @TsFileApi - public Tablet(String deviceId, List measurementList, List dataTypeList) { - this(deviceId, measurementList, dataTypeList, DEFAULT_SIZE); + public Tablet(List columnNameList, List dataTypeList) { + this(columnNameList, dataTypeList, DEFAULT_SIZE); } /** - * Return a {@link Tablet} with the specified number of rows (maxBatchSize). Only call this - * constructor directly for testing purposes. {@link Tablet} should normally always be default - * size. + * Return a {@link Tablet} with the specified number of rows (maxBatchSize). Only for writing in + * DeviceTableModelWriter. * - * @param deviceId the name of the device specified to be written in - * @param measurementList the list of measurement names for creating the row batch + * @param columnNameList the list of measurement names for creating the row batch * @param dataTypeList the list of {@link TSDataType}s for creating the row batch * @param maxRowNum the maximum number of rows for this tablet */ @TsFileApi - public Tablet( - String deviceId, List measurementList, List dataTypeList, int maxRowNum) { - this( - deviceId, - measurementList, - dataTypeList, - ColumnCategory.nCopy(ColumnCategory.MEASUREMENT, measurementList.size()), - maxRowNum); + public Tablet(List columnNameList, List dataTypeList, int maxRowNum) { + this(null, columnNameList, dataTypeList, null, maxRowNum, false); } public Tablet( @@ -156,19 +165,30 @@ public Tablet( this(tableName, measurementList, dataTypeList, columnCategoryList, DEFAULT_SIZE); } - @TsFileApi public Tablet( - String tableName, + String insertTargetName, List measurementList, List dataTypeList, List columnCategoryList, int maxRowNum) { - this.insertTargetName = tableName; + this(insertTargetName, measurementList, dataTypeList, columnCategoryList, maxRowNum, true); + } + + protected Tablet( + String insertTargetName, + List measurementList, + List dataTypeList, + List columnCategoryList, + int maxRowNum, + boolean hasColumnCategory) { + this.insertTargetName = insertTargetName; this.schemas = new ArrayList<>(measurementList.size()); for (int i = 0; i < measurementList.size(); i++) { this.schemas.add(new MeasurementSchema(measurementList.get(i), dataTypeList.get(i))); } - setColumnCategories(columnCategoryList); + if (hasColumnCategory) { + setColumnCategories(columnCategoryList); + } this.maxRowNumber = maxRowNum; measurementIndex = new HashMap<>(); constructMeasurementIndexMap(); @@ -248,9 +268,6 @@ public void initBitMaps() { this.bitMaps = new BitMap[schemas.size()]; for (int column = 0; column < schemas.size(); column++) { BitMap bitMap = new BitMap(getMaxRowNumber()); - if (autoUpdateBitMaps) { - bitMap.markAll(); - } this.bitMaps[column] = bitMap; } } @@ -435,10 +452,15 @@ public void addValue(int rowIndex, int columnIndex, LocalDate val) { } private void updateBitMap(int rowIndex, int columnIndex, boolean mark) { - autoUpdateBitMaps = true; if (bitMaps == null) { initBitMaps(); } + if (!autoUpdateBitMaps) { + autoUpdateBitMaps = true; + for (BitMap bitMap : bitMaps) { + bitMap.markAll(); + } + } if (mark) { bitMaps[columnIndex].mark(rowIndex); } else { @@ -484,14 +506,13 @@ private void createColumns() { int columnIndex = 0; for (int i = 0; i < schemas.size(); i++) { IMeasurementSchema schema = schemas.get(i); - ColumnCategory columnCategory = columnCategories.get(i); TSDataType dataType = schema.getType(); - values[columnIndex] = createValueColumnOfDataType(dataType, columnCategory); + values[columnIndex] = createValueColumnOfDataType(dataType); columnIndex++; } } - private Object createValueColumnOfDataType(TSDataType dataType, ColumnCategory columnCategory) { + private Object createValueColumnOfDataType(TSDataType dataType) { Object valueColumn; switch (dataType) { @@ -577,12 +598,12 @@ private void writeBitMaps(DataOutputStream stream) throws IOException { if (bitMaps != null) { int size = (schemas == null ? 0 : schemas.size()); for (int i = 0; i < size; i++) { - if (bitMaps[i] == null) { + if (bitMaps[i] == null || bitMaps[i].isAllUnmarked(rowSize)) { ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream); } else { ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream); - ReadWriteIOUtils.write(bitMaps[i].getSize(), stream); - ReadWriteIOUtils.write(new Binary(bitMaps[i].getByteArray()), stream); + ReadWriteIOUtils.write(rowSize, stream); + ReadWriteIOUtils.write(new Binary(bitMaps[i].getTruncatedByteArray(rowSize)), stream); } } } @@ -1008,13 +1029,26 @@ private boolean isBitMapsEqual(BitMap[] thisBitMaps, BitMap[] thatBitMaps, int c } for (int i = 0; i < columns; i++) { - if (!thisBitMaps[i].equals(thatBitMaps[i])) { + if (!isBitMapEqual(thisBitMaps[i], thatBitMaps[i])) { return false; } } return true; } + private boolean isBitMapEqual(BitMap thisBitMap, BitMap thatBitMap) { + if (thisBitMap == thatBitMap) { + return true; + } + if (thisBitMap == null) { + return thatBitMap.isAllUnmarked(rowSize); + } + if (thatBitMap == null) { + return thisBitMap.isAllUnmarked(rowSize); + } + return thisBitMap.equalsInRange(thatBitMap, rowSize); + } + public boolean isNull(int i, int j) { return bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(i); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java new file mode 100644 index 000000000..102fcd691 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.v4; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.encrypt.EncryptParameter; +import org.apache.tsfile.encrypt.IEncryptor; +import org.apache.tsfile.exception.encrypt.EncryptException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl; +import org.apache.tsfile.write.chunk.IChunkGroupWriter; +import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl; +import org.apache.tsfile.write.schema.Schema; +import org.apache.tsfile.write.writer.TsFileIOWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +abstract class AbstractTableModelTsFileWriter implements ITsFileWriter { + + protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); + protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableModelTsFileWriter.class); + + /** IO writer of this TsFile. */ + protected final TsFileIOWriter fileWriter; + + protected EncryptParameter encryptParam; + + protected final int pageSize; + protected long recordCount = 0; + + // deviceId -> measurementIdList + protected Map> flushedMeasurementsInDeviceMap = new HashMap<>(); + + // DeviceId -> LastTime + protected Map alignedDeviceLastTimeMap = new HashMap<>(); + + // TimeseriesId -> LastTime + protected Map> nonAlignedTimeseriesLastTimeMap = new HashMap<>(); + + protected Map groupWriters = new TreeMap<>(); + + /** min value of threshold of data points num check. */ + protected long recordCountForNextMemCheck = 100; + + protected long chunkGroupSizeThreshold; + + /** + * init this Writer. + * + * @param file the File to be written by this TsFileWriter + */ + @TsFileApi + protected AbstractTableModelTsFileWriter(File file, long chunkGroupSizeThreshold) + throws IOException { + Schema schema = new Schema(); + TSFileConfig conf = TSFileDescriptor.getInstance().getConfig(); + this.fileWriter = new TsFileIOWriter(file); + fileWriter.setSchema(schema); + + this.pageSize = conf.getPageSizeInByte(); + this.chunkGroupSizeThreshold = chunkGroupSizeThreshold; + if (this.pageSize >= chunkGroupSizeThreshold) { + LOG.warn( + "TsFile's page size {} is greater than chunk group size {}, please enlarge the chunk group" + + " size or decrease page size. ", + pageSize, + chunkGroupSizeThreshold); + } + + String encryptLevel; + byte[] encryptKey; + byte[] dataEncryptKey; + String encryptType; + if (config.getEncryptFlag()) { + encryptLevel = "2"; + encryptType = config.getEncryptType(); + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + md.update("IoTDB is the best".getBytes()); + md.update(config.getEncryptKey().getBytes()); + dataEncryptKey = Arrays.copyOfRange(md.digest(), 0, 16); + encryptKey = + IEncryptor.getEncryptor(config.getEncryptType(), config.getEncryptKey().getBytes()) + .encrypt(dataEncryptKey); + } catch (Exception e) { + throw new EncryptException( + "SHA-256 function not found while using SHA-256 to generate data key"); + } + } else { + encryptLevel = "0"; + encryptType = "org.apache.tsfile.encrypt.UNENCRYPTED"; + encryptKey = null; + dataEncryptKey = null; + } + this.encryptParam = new EncryptParameter(encryptType, dataEncryptKey); + if (encryptKey != null) { + StringBuilder valueStr = new StringBuilder(); + + for (byte b : encryptKey) { + valueStr.append(b).append(","); + } + + valueStr.deleteCharAt(valueStr.length() - 1); + String str = valueStr.toString(); + + fileWriter.setEncryptParam(encryptLevel, encryptType, str); + } else { + fileWriter.setEncryptParam(encryptLevel, encryptType, ""); + } + } + + protected IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean isAligned) { + IChunkGroupWriter groupWriter = groupWriters.get(deviceId); + if (groupWriter == null) { + if (isAligned) { + groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam); + ((AlignedChunkGroupWriterImpl) groupWriter) + .setLastTime(alignedDeviceLastTimeMap.get(deviceId)); + } else { + groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId, encryptParam); + ((NonAlignedChunkGroupWriterImpl) groupWriter) + .setLastTimeMap( + nonAlignedTimeseriesLastTimeMap.getOrDefault(deviceId, new HashMap<>())); + } + groupWriters.put(deviceId, groupWriter); + } + return groupWriter; + } + + /** + * calculate total memory size occupied by all ChunkGroupWriter instances currently. + * + * @return total memory size used + */ + protected long calculateMemSizeForAllGroup() { + long memTotalSize = 0; + for (IChunkGroupWriter group : groupWriters.values()) { + memTotalSize += group.updateMaxGroupMemSize(); + } + return memTotalSize; + } + + /** + * check occupied memory size, if it exceeds the chunkGroupSize threshold, flush them to given + * OutputStream. + * + * @throws IOException exception in IO + */ + protected void checkMemorySizeAndMayFlushChunks() throws IOException { + if (recordCount >= recordCountForNextMemCheck) { + long memSize = calculateMemSizeForAllGroup(); + if (memSize > chunkGroupSizeThreshold) { + LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize); + recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize; + flush(); + } else { + recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize; + } + } + } + + /** + * flush the data in all series writers of all chunk group writers and their page writers to + * outputStream. + * + * @throws IOException exception in IO + */ + @TsFileApi + protected void flush() throws IOException { + if (recordCount > 0) { + for (Map.Entry entry : groupWriters.entrySet()) { + IDeviceID deviceId = entry.getKey(); + IChunkGroupWriter groupWriter = entry.getValue(); + fileWriter.startChunkGroup(deviceId); + long pos = fileWriter.getPos(); + long dataSize = groupWriter.flushToFileWriter(fileWriter); + if (fileWriter.getPos() - pos != dataSize) { + throw new IOException( + String.format( + "Flushed data size is inconsistent with computation! Estimated: %d, Actual: %d", + dataSize, fileWriter.getPos() - pos)); + } + fileWriter.endChunkGroup(); + if (groupWriter instanceof AlignedChunkGroupWriterImpl) { + // add flushed measurements + List measurementList = + flushedMeasurementsInDeviceMap.computeIfAbsent(deviceId, p -> new ArrayList<>()); + ((AlignedChunkGroupWriterImpl) groupWriter) + .getMeasurements() + .forEach( + measurementId -> { + if (!measurementList.contains(measurementId)) { + measurementList.add(measurementId); + } + }); + // add lastTime + this.alignedDeviceLastTimeMap.put( + deviceId, ((AlignedChunkGroupWriterImpl) groupWriter).getLastTime()); + } else { + // add lastTime + this.nonAlignedTimeseriesLastTimeMap.put( + deviceId, ((NonAlignedChunkGroupWriterImpl) groupWriter).getLastTimeMap()); + } + } + reset(); + } + } + + protected void reset() { + groupWriters.clear(); + recordCount = 0; + } + + protected TsFileIOWriter getIOWriter() { + return this.fileWriter; + } + + protected Schema getSchema() { + return fileWriter.getSchema(); + } + + /** + * calling this method to write the last data remaining in memory and close the normal and error + * OutputStream. + */ + @Override + @TsFileApi + public void close() { + LOG.info("start close file"); + try { + flush(); + fileWriter.endFile(); + } catch (IOException e) { + LOG.warn("Meet exception when close file writer. ", e); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java new file mode 100644 index 000000000..117fdc120 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.v4; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.exception.write.ConflictDataTypeException; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.WriteUtils; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter { + + private String tableName; + private boolean isTableWriteAligned = true; + + public DeviceTableModelWriter(File file, TableSchema tableSchema, long memoryThreshold) + throws IOException { + super(file, memoryThreshold); + registerTableSchema(tableSchema); + } + + /** + * Write the tablet in to the TsFile with the table-view. The method will try to split the tablet + * by device. + * + * @param table data to write + * @throws IOException if the file cannot be written + * @throws WriteProcessException if the schema is not registered first + */ + @TsFileApi + public void write(Tablet table) throws IOException, WriteProcessException { + // make sure the ChunkGroupWriter for this Tablet exist and there is no type conflict + checkIsTableExistAndSetColumnCategoryList(table); + // spilt the tablet by deviceId + List> deviceIdEndIndexPairs = WriteUtils.splitTabletByDevice(table); + + int startIndex = 0; + for (Pair pair : deviceIdEndIndexPairs) { + // get corresponding ChunkGroupWriter and write this Tablet + recordCount += + tryToInitialGroupWriter(pair.left, isTableWriteAligned) + .write(table, startIndex, pair.right); + startIndex = pair.right; + } + checkMemorySizeAndMayFlushChunks(); + } + + private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet) + throws WriteProcessException { + String tabletTableName = tablet.getTableName(); + if (tabletTableName != null && !this.tableName.equals(tabletTableName)) { + throw new NoTableException(tabletTableName); + } + tablet.setTableName(this.tableName); + final TableSchema tableSchema = getSchema().getTableSchemaMap().get(tableName); + + List columnCategoryListForTablet = + new ArrayList<>(tablet.getSchemas().size()); + for (IMeasurementSchema writingColumnSchema : tablet.getSchemas()) { + final int columnIndex = tableSchema.findColumnIndex(writingColumnSchema.getMeasurementName()); + if (columnIndex < 0) { + throw new NoMeasurementException(writingColumnSchema.getMeasurementName()); + } + final IMeasurementSchema registeredColumnSchema = + tableSchema.getColumnSchemas().get(columnIndex); + if (!writingColumnSchema.getType().equals(registeredColumnSchema.getType())) { + throw new ConflictDataTypeException( + writingColumnSchema.getType(), registeredColumnSchema.getType()); + } + columnCategoryListForTablet.add(tableSchema.getColumnTypes().get(columnIndex)); + } + tablet.setColumnCategories(columnCategoryListForTablet); + } + + private void registerTableSchema(TableSchema tableSchema) { + this.tableName = tableSchema.getTableName(); + getSchema().registerTableSchema(tableSchema); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java new file mode 100644 index 000000000..e3a34634c --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.v4; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.write.record.Tablet; + +import java.io.IOException; + +public interface ITsFileWriter extends AutoCloseable { + + @TsFileApi + void write(Tablet tablet) throws IOException, WriteProcessException; + + @TsFileApi + void close(); +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TsFileWriterBuilder.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TsFileWriterBuilder.java new file mode 100644 index 000000000..da3ebc4c3 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TsFileWriterBuilder.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.write.v4; + +import org.apache.tsfile.common.TsFileApi; +import org.apache.tsfile.file.metadata.TableSchema; + +import java.io.File; +import java.io.IOException; + +public class TsFileWriterBuilder { + + private static final long defaultMemoryThresholdInByte = 32 * 1024 * 1024; + private File file; + private TableSchema tableSchema; + private long memoryThresholdInByte = defaultMemoryThresholdInByte; + + @TsFileApi + public ITsFileWriter build() throws IOException { + validateParameters(); + return new DeviceTableModelWriter(file, tableSchema, memoryThresholdInByte); + } + + @TsFileApi + public TsFileWriterBuilder file(File file) { + this.file = file; + return this; + } + + @TsFileApi + public TsFileWriterBuilder tableSchema(TableSchema schema) { + this.tableSchema = schema; + return this; + } + + @TsFileApi + public TsFileWriterBuilder memoryThreshold(long memoryThreshold) { + this.memoryThresholdInByte = memoryThreshold; + return this; + } + + private void validateParameters() { + if (file == null || file.isDirectory()) { + throw new IllegalArgumentException("The file must be a non-null and non-directory File."); + } + if (this.tableSchema == null) { + throw new IllegalArgumentException("TableSchema must not be null."); + } + if (this.memoryThresholdInByte <= 0) { + throw new IllegalArgumentException("Memory threshold must be > 0 bytes."); + } + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java index 9445c559d..7353e59b9 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java @@ -26,8 +26,6 @@ import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.StringArrayDeviceID; -import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.common.Path; @@ -44,12 +42,9 @@ import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.TsFileGeneratorForTest; -import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.record.TSRecord; -import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.record.datapoint.IntDataPoint; -import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.apache.tsfile.write.schema.Schema; @@ -58,12 +53,8 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import static org.apache.tsfile.read.filter.factory.ValueFilterApi.DEFAULT_MEASUREMENT_INDEX; @@ -562,91 +553,4 @@ public void testGetAlignedChunkMetadata() throws IOException { } TsFileGeneratorForTest.closeAlignedTsFile(); } - - @Test - public void testGetDeviceMethods() throws IOException, WriteProcessException { - String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg", 0, 0, 0); - try { - File file = TsFileGeneratorUtils.generateAlignedTsFile(filePath, 5, 1, 10, 1, 1, 10, 100); - try (TsFileReader tsFileReader = new TsFileReader(file)) { - Assert.assertEquals( - Arrays.asList( - "root.testsg.d10000", - "root.testsg.d10001", - "root.testsg.d10002", - "root.testsg.d10003", - "root.testsg.d10004"), - tsFileReader.getAllDevices()); - List timeseriesSchema = - tsFileReader.getTimeseriesSchema("root.testsg.d10000"); - Assert.assertEquals(2, timeseriesSchema.size()); - Assert.assertEquals("", timeseriesSchema.get(0).getMeasurementName()); - Assert.assertEquals("s0", timeseriesSchema.get(1).getMeasurementName()); - } - } finally { - Files.deleteIfExists(Paths.get(filePath)); - } - } - - @Test - public void testGetTableDeviceMethods() throws IOException, WriteProcessException { - String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg", 0, 0, 0); - try { - File file = TsFileGeneratorUtils.generateAlignedTsFile(filePath, 5, 1, 10, 1, 1, 10, 100); - List deviceIDList = new ArrayList<>(); - TableSchema tableSchema = - new TableSchema( - "t1", - Arrays.asList( - new MeasurementSchema("id1", TSDataType.STRING), - new MeasurementSchema("id2", TSDataType.STRING), - new MeasurementSchema("id3", TSDataType.STRING), - new MeasurementSchema("s1", TSDataType.INT32)), - Arrays.asList( - Tablet.ColumnCategory.ID, - Tablet.ColumnCategory.ID, - Tablet.ColumnCategory.ID, - Tablet.ColumnCategory.MEASUREMENT)); - try (TsFileWriter writer = new TsFileWriter(file)) { - writer.registerTableSchema(tableSchema); - Tablet tablet = - new Tablet( - tableSchema.getTableName(), - IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()), - IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()), - tableSchema.getColumnTypes()); - - String[][] ids = - new String[][] { - {null, null, null}, - {null, null, "id3-4"}, - {null, "id2-1", "id3-1"}, - {null, "id2-5", null}, - {"id1-2", null, "id3-2"}, - {"id1-3", "id2-3", null}, - {"id1-6", null, null}, - }; - for (int i = 0; i < ids.length; i++) { - tablet.addTimestamp(i, i); - tablet.addValue("id1", i, ids[i][0]); - tablet.addValue("id2", i, ids[i][1]); - tablet.addValue("id3", i, ids[i][2]); - deviceIDList.add( - new StringArrayDeviceID(tableSchema.getTableName(), ids[i][0], ids[i][1], ids[i][2])); - tablet.addValue("s1", i, i); - } - tablet.setRowSize(ids.length); - writer.writeTable(tablet); - } - try (TsFileReader tsFileReader = new TsFileReader(file)) { - Assert.assertEquals( - new HashSet<>(deviceIDList), new HashSet<>(tsFileReader.getAllTableDevices("t1"))); - Assert.assertEquals("t1", tsFileReader.getAllTables().get(0)); - Assert.assertEquals( - tableSchema, tsFileReader.getTableSchema(Collections.singletonList("t1")).get(0)); - } - } finally { - Files.deleteIfExists(Paths.get(filePath)); - } - } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java new file mode 100644 index 000000000..5f39d4f8e --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.v4.DeviceTableModelReader; +import org.apache.tsfile.utils.TsFileGeneratorForTest; +import org.apache.tsfile.utils.TsFileGeneratorUtils; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TsFileV4ReadWriteInterfacesTest { + + @Test + public void testGetTableDeviceMethods() throws Exception { + String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg", 0, 0, 0); + try { + int deviceNum = 5; + int measurementNum = 1; + int pointNum = 10; + long startTime = 1; + int startValue = 1; + int chunkGroupSize = 10; + int pageSize = 100; + File file = + TsFileGeneratorUtils.generateAlignedTsFile( + filePath, + deviceNum, + measurementNum, + pointNum, + startTime, + startValue, + chunkGroupSize, + pageSize); + List deviceIDList = new ArrayList<>(); + TableSchema tableSchema = + new TableSchema( + "t1", + Arrays.asList( + new MeasurementSchema("id1", TSDataType.STRING), + new MeasurementSchema("id2", TSDataType.STRING), + new MeasurementSchema("id3", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32)), + Arrays.asList( + Tablet.ColumnCategory.ID, + Tablet.ColumnCategory.ID, + Tablet.ColumnCategory.ID, + Tablet.ColumnCategory.MEASUREMENT)); + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(file).tableSchema(tableSchema).build()) { + Tablet tablet = + new Tablet( + tableSchema.getTableName(), + IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()), + IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()), + tableSchema.getColumnTypes()); + + String[][] ids = + new String[][] { + {null, null, null}, + {null, null, "id3-4"}, + {null, "id2-1", "id3-1"}, + {null, "id2-5", null}, + {"id1-2", null, "id3-2"}, + {"id1-3", "id2-3", null}, + {"id1-6", null, null}, + }; + for (int i = 0; i < ids.length; i++) { + tablet.addTimestamp(i, i); + tablet.addValue("id1", i, ids[i][0]); + tablet.addValue("id2", i, ids[i][1]); + tablet.addValue("id3", i, ids[i][2]); + deviceIDList.add( + new StringArrayDeviceID(tableSchema.getTableName(), ids[i][0], ids[i][1], ids[i][2])); + tablet.addValue("s1", i, i); + } + tablet.setRowSize(ids.length); + writer.write(tablet); + } + try (DeviceTableModelReader tsFileReader = new DeviceTableModelReader(file)) { + Assert.assertEquals("t1", tsFileReader.getAllTableSchema().get(0).getTableName()); + Assert.assertEquals(tableSchema, tsFileReader.getTableSchemas("t1").get()); + } + } finally { + Files.deleteIfExists(Paths.get(filePath)); + } + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/query/ResultSetTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/query/ResultSetTest.java index 176045e68..2bc65d200 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/query/ResultSetTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/query/ResultSetTest.java @@ -20,13 +20,15 @@ package org.apache.tsfile.read.query; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.exception.write.WriteProcessException; -import org.apache.tsfile.read.TsFileReader; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.query.dataset.ResultSetMetadata; +import org.apache.tsfile.read.v4.DeviceTableModelReader; import org.apache.tsfile.utils.TsFileGeneratorForTest; -import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; import org.junit.After; import org.junit.Assert; @@ -59,49 +61,80 @@ public void deleteFile() throws IOException { } @Test - public void test1() throws IOException, WriteProcessException { + public void testQueryTable() throws Exception { + TableSchema tableSchema = + new TableSchema( + "t1", + Arrays.asList( + new MeasurementSchema("id1", TSDataType.STRING), + new MeasurementSchema("id2", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.BOOLEAN), + new MeasurementSchema("s2", TSDataType.BOOLEAN)), + Arrays.asList( + Tablet.ColumnCategory.ID, + Tablet.ColumnCategory.ID, + Tablet.ColumnCategory.MEASUREMENT, + Tablet.ColumnCategory.MEASUREMENT)); Tablet tablet = new Tablet( - "root.sg1.d1", + Arrays.asList("id1", "id2", "s1", "s2"), Arrays.asList( - new MeasurementSchema("s1", TSDataType.BOOLEAN), - new MeasurementSchema("s2", TSDataType.BOOLEAN))); - tablet.addTimestamp(0, 1); + TSDataType.STRING, TSDataType.STRING, TSDataType.BOOLEAN, TSDataType.BOOLEAN), + 1024); + tablet.addTimestamp(0, 0); + tablet.addValue("id1", 0, "id_field1"); + tablet.addValue("id2", 0, "id_field2"); tablet.addValue("s1", 0, true); tablet.addValue("s2", 0, false); - tablet.addTimestamp(1, 2); + + tablet.addTimestamp(1, 1); + tablet.addValue("id1", 1, "id_field1_2"); tablet.addValue("s2", 1, true); - try (TsFileWriter writer = new TsFileWriter(tsfile)) { - writer.registerTimeseries("root.sg1.d1", new MeasurementSchema("s1", TSDataType.BOOLEAN)); - writer.registerTimeseries("root.sg1.d1", new MeasurementSchema("s2", TSDataType.BOOLEAN)); + tablet.addTimestamp(2, 2); + + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(tsfile).tableSchema(tableSchema).build()) { writer.write(tablet); } - try (TsFileReader tsFileReader = new TsFileReader(tsfile)) { - // s1 s2 s3 s4 - ResultSet resultSet = - tsFileReader.query( - Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4"), - 0, - 2); - ResultSet.ResultSetMetadata resultSetMetadata = resultSet.getMetadata(); - // Time s1 s2 - Assert.assertEquals(3, resultSetMetadata.getColumnNum()); + try (DeviceTableModelReader tsFileReader = new DeviceTableModelReader(tsfile); + ResultSet resultSet = + tsFileReader.query("t1", Arrays.asList("id1", "id2", "s2", "s1"), 0, 2); ) { + // id1 id2 s2 s1 + ResultSetMetadata resultSetMetadata = resultSet.getMetadata(); + // Time id1 id2 s2 s1 Assert.assertEquals("Time", resultSetMetadata.getColumnName(1)); Assert.assertEquals(TSDataType.INT64, resultSetMetadata.getColumnType(1)); - Assert.assertEquals("root.sg1.d1.s1", resultSetMetadata.getColumnName(2)); - Assert.assertEquals(TSDataType.BOOLEAN, resultSetMetadata.getColumnType(2)); - Assert.assertEquals("root.sg1.d1.s2", resultSetMetadata.getColumnName(3)); - Assert.assertEquals(TSDataType.BOOLEAN, resultSetMetadata.getColumnType(3)); - Assert.assertTrue(resultSet.next()); - Assert.assertEquals(1, resultSet.getLong(1)); - Assert.assertTrue(resultSet.getBoolean(2)); - Assert.assertFalse(resultSet.getBoolean(3)); + Assert.assertEquals("id1", resultSetMetadata.getColumnName(2)); + Assert.assertEquals(TSDataType.STRING, resultSetMetadata.getColumnType(2)); + Assert.assertEquals("id2", resultSetMetadata.getColumnName(3)); + Assert.assertEquals(TSDataType.STRING, resultSetMetadata.getColumnType(3)); + Assert.assertEquals("s2", resultSetMetadata.getColumnName(4)); + Assert.assertEquals(TSDataType.BOOLEAN, resultSetMetadata.getColumnType(4)); + Assert.assertEquals("s1", resultSetMetadata.getColumnName(5)); + Assert.assertEquals(TSDataType.BOOLEAN, resultSetMetadata.getColumnType(5)); + Assert.assertTrue(resultSet.next()); Assert.assertEquals(2, resultSet.getLong(1)); Assert.assertTrue(resultSet.isNull(2)); - Assert.assertTrue(resultSet.getBoolean(3)); + Assert.assertTrue(resultSet.isNull(3)); + Assert.assertTrue(resultSet.isNull(4)); + Assert.assertTrue(resultSet.isNull(5)); + + Assert.assertTrue(resultSet.next()); + Assert.assertEquals(0, resultSet.getLong(1)); + Assert.assertEquals("id_field1", resultSet.getString(2)); + Assert.assertEquals("id_field2", resultSet.getString(3)); + Assert.assertFalse(resultSet.getBoolean(4)); + Assert.assertTrue(resultSet.getBoolean(5)); + + Assert.assertTrue(resultSet.next()); + Assert.assertEquals(1, resultSet.getLong(1)); + Assert.assertEquals("id_field1_2", resultSet.getString(2)); + Assert.assertTrue(resultSet.isNull(3)); + Assert.assertTrue(resultSet.getBoolean(4)); + Assert.assertTrue(resultSet.isNull(5)); } } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/utils/BitMapTest.java b/java/tsfile/src/test/java/org/apache/tsfile/utils/BitMapTest.java index d441e6b1a..03b3630dc 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/utils/BitMapTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/utils/BitMapTest.java @@ -20,6 +20,7 @@ import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -64,4 +65,43 @@ public void testInitFromBytes() { assertEquals(bitmap1.isMarked(i), bitmap2.isMarked(i)); } } + + @Test + public void testIsAllUnmarkedInRange() { + BitMap bitMap = new BitMap(16); + assertTrue(bitMap.isAllUnmarked(6)); + assertTrue(bitMap.isAllUnmarked(8)); + assertTrue(bitMap.isAllUnmarked(9)); + assertTrue(bitMap.isAllUnmarked(16)); + + bitMap.mark(3); + assertTrue(bitMap.isAllUnmarked(2)); + assertTrue(bitMap.isAllUnmarked(3)); + assertFalse(bitMap.isAllUnmarked(4)); + assertFalse(bitMap.isAllUnmarked(16)); + bitMap.unmark(3); + + bitMap.mark(9); + assertTrue(bitMap.isAllUnmarked(9)); + assertFalse(bitMap.isAllUnmarked(10)); + } + + @Test + public void testGetTruncatedByteArray() { + BitMap bitMap = new BitMap(16); + assertArrayEquals(new byte[2], bitMap.getTruncatedByteArray(13)); + assertArrayEquals(new byte[2], bitMap.getTruncatedByteArray(16)); + + bitMap.mark(3); + byte[] truncatedArray = bitMap.getTruncatedByteArray(12); + assertEquals(2, truncatedArray.length); + + assertEquals((byte) 0b00001000, truncatedArray[0]); + assertEquals((byte) 0b00000000, truncatedArray[1]); + + truncatedArray = bitMap.getTruncatedByteArray(8); + assertEquals(1, truncatedArray.length); + + assertEquals((byte) 0b00001000, truncatedArray[0]); + } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/DefaultSchemaTemplateTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/DefaultSchemaTemplateTest.java index 5e7564b96..3ff19a1b7 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/DefaultSchemaTemplateTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/DefaultSchemaTemplateTest.java @@ -81,13 +81,13 @@ public void testUsingDefaultSchemaTemplate() throws IOException, WriteProcessExc } // write Tablet to TsFile if (tablet.getRowSize() == tablet.getMaxRowNumber()) { - writer.write(tablet); + writer.writeTree(tablet); tablet.reset(); } } // write Tablet to TsFile if (tablet.getRowSize() != 0) { - writer.write(tablet); + writer.writeTree(tablet); tablet.reset(); } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java index edade15f4..abf7e63ab 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java @@ -387,13 +387,13 @@ public void writeNonAlignedWithTabletWithNullValue() { } // write if (tablet.getRowSize() == tablet.getMaxRowNumber()) { - tsFileWriter.write(tablet); + tsFileWriter.writeTree(tablet); tablet.reset(); } } // write if (tablet.getRowSize() != 0) { - tsFileWriter.write(tablet); + tsFileWriter.writeTree(tablet); tablet.reset(); } @@ -440,13 +440,13 @@ public void writeNonAlignedWithTabletWithNegativeTimestamps() { } // write if (tablet.getRowSize() == tablet.getMaxRowNumber()) { - tsFileWriter.write(tablet); + tsFileWriter.writeTree(tablet); tablet.reset(); } } // write if (tablet.getRowSize() != 0) { - tsFileWriter.write(tablet); + tsFileWriter.writeTree(tablet); tablet.reset(); } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriterTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriterTest.java index 5a925c664..676aecfce 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriterTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriterTest.java @@ -248,7 +248,7 @@ public void writeTablet() throws IOException, WriteProcessException { ((float[]) tablet.values[0])[0] = 5.0f; ((int[]) tablet.values[1])[0] = 5; tablet.setRowSize(1); - writer.write(tablet); + writer.writeTree(tablet); closeFile(); readOneRow(); } @@ -266,7 +266,7 @@ public void writeTablet2() throws IOException, WriteProcessException { tablet.timestamps[0] = 10000; ((float[]) tablet.values[0])[0] = 5.0f; tablet.setRowSize(1); - writer.write(tablet); + writer.writeTree(tablet); closeFile(); // in this case, the value of s2 = 0 at time 10000. readOneRow(0);