Skip to content

Commit

Permalink
Fixed the bug that set table properties ttl='inf' cannot be parsed &&…
Browse files Browse the repository at this point in the history
… ttl='INF' does not take effect && set to default may generate NPE on dataNodes && some minor bugs of delete devices && Enable "inf" in databaseSchema / show ttl for databases in table model / table default ttl = database ttl (apache#14147)
  • Loading branch information
Caideyipi authored Nov 21, 2024
1 parent 4a76dfb commit 82a1dc3
Show file tree
Hide file tree
Showing 14 changed files with 150 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public void testIoTDBPatternWithDataBaseAndTable() throws Exception {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"show databases",
"Database,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
"Database,TTL(ms),SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
Collections.emptySet(),
null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testManageDatabase() {
final Statement statement = connection.createStatement()) {

// create
statement.execute("create database test");
statement.execute("create database test with (ttl='INF')");

// create duplicated database without IF NOT EXISTS
try {
Expand All @@ -81,6 +81,7 @@ public void testManageDatabase() {
statement.execute("create database IF NOT EXISTS test");

String[] databaseNames = new String[] {"test"};
String[] TTLs = new String[] {"INF"};
int[] schemaReplicaFactors = new int[] {1};
int[] dataReplicaFactors = new int[] {1};
int[] timePartitionInterval = new int[] {604800000};
Expand All @@ -96,9 +97,10 @@ public void testManageDatabase() {
}
while (resultSet.next()) {
assertEquals(databaseNames[cnt], resultSet.getString(1));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(2));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(4));
assertEquals(TTLs[cnt], resultSet.getString(2));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(4));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(5));
cnt++;
}
assertEquals(databaseNames.length, cnt);
Expand All @@ -115,10 +117,11 @@ public void testManageDatabase() {
}
while (resultSet.next()) {
assertEquals(databaseNames[cnt], resultSet.getString(1));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(2));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(4));
assertEquals(model[cnt], resultSet.getString(5));
assertEquals(TTLs[cnt], resultSet.getString(2));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(4));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(5));
assertEquals(model[cnt], resultSet.getString(6));
cnt++;
}
assertEquals(databaseNames.length, cnt);
Expand Down Expand Up @@ -154,9 +157,9 @@ public void testManageDatabase() {

// Test create database with properties
statement.execute(
"create database test_prop with (schema_replication_factor=DEFAULT, data_replication_factor=3, time_partition_interval=100000)");

"create database test_prop with (ttl=300, schema_replication_factor=DEFAULT, data_replication_factor=3, time_partition_interval=100000)");
databaseNames = new String[] {"test_prop"};
TTLs = new String[] {"300"};
dataReplicaFactors = new int[] {3};
timePartitionInterval = new int[] {100000};

Expand All @@ -170,9 +173,10 @@ public void testManageDatabase() {
}
while (resultSet.next()) {
assertEquals(databaseNames[cnt], resultSet.getString(1));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(2));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(4));
assertEquals(TTLs[cnt], resultSet.getString(2));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(4));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(5));
cnt++;
}
assertEquals(databaseNames.length, cnt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testManageTable() {
final Statement statement = connection.createStatement()) {

statement.execute("create database test1");
statement.execute("create database test2");
statement.execute("create database test2 with (ttl=300)");

// should specify database before create table
try {
Expand Down Expand Up @@ -287,8 +287,11 @@ public void testManageTable() {

statement.execute("alter table if exists table3 add column speed DOUBLE MEASUREMENT");

tableNames = new String[] {"table2"};
ttls = new String[] {"6600000"};
// Test create table with only time column
statement.execute("create table table3()");

tableNames = new String[] {"table3", "table2"};
ttls = new String[] {"300", "6600000"};

// show tables from current database
try (final ResultSet resultSet = statement.executeQuery("SHOW tables")) {
Expand All @@ -307,8 +310,25 @@ public void testManageTable() {
assertEquals(tableNames.length, cnt);
}

// Test create table with only time column
statement.execute("create table table3()");
statement.execute("alter table table3 set properties ttl=300");
statement.execute("alter table table3 set properties ttl=DEFAULT");

// The table3's ttl shall be also 300
try (final ResultSet resultSet = statement.executeQuery("SHOW tables")) {
int cnt = 0;
ResultSetMetaData metaData = resultSet.getMetaData();
assertEquals(showTablesColumnHeaders.size(), metaData.getColumnCount());
for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
assertEquals(
showTablesColumnHeaders.get(i).getColumnName(), metaData.getColumnName(i + 1));
}
while (resultSet.next()) {
assertEquals(tableNames[cnt], resultSet.getString(1));
assertEquals(ttls[cnt], resultSet.getString(2));
cnt++;
}
assertEquals(tableNames.length, cnt);
}

// show tables from a non-exist database
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public TShowDatabaseResp showDatabase(final GetDatabasePlan getDatabasePlan) {
final String database = databaseSchema.getName();
final TDatabaseInfo databaseInfo = new TDatabaseInfo();
databaseInfo.setName(database);
databaseInfo.setTTL(databaseSchema.isSetTTL() ? databaseSchema.getTTL() : Long.MAX_VALUE);
databaseInfo.setSchemaReplicationFactor(databaseSchema.getSchemaReplicationFactor());
databaseInfo.setDataReplicationFactor(databaseSchema.getDataReplicationFactor());
databaseInfo.setTimePartitionOrigin(databaseSchema.getTimePartitionOrigin());
Expand Down Expand Up @@ -1321,7 +1322,11 @@ public synchronized Pair<TSStatus, TsTable> updateTableProperties(
updatedProperties.forEach(
(k, v) -> {
originalProperties.put(k, originalTable.getPropValue(k).orElse(null));
updatedTable.addProp(k, v);
if (Objects.nonNull(v)) {
updatedTable.addProp(k, v);
} else {
updatedTable.removeProp(k);
}
});

return new Pair<>(RpcUtils.SUCCESS_STATUS, updatedTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -1125,9 +1124,7 @@ public ShowTableResp showTables(final ShowTablePlan plan) {
final TTableInfo info =
new TTableInfo(
pair.getLeft().getTableName(),
pair.getLeft()
.getPropValue(TTL_PROPERTY.toLowerCase(Locale.ENGLISH))
.orElse(TTL_INFINITE));
pair.getLeft().getPropValue(TTL_PROPERTY).orElse(TTL_INFINITE));
info.setState(pair.getRight().ordinal());
return info;
})
Expand All @@ -1140,9 +1137,7 @@ public ShowTableResp showTables(final ShowTablePlan plan) {
tsTable ->
new TTableInfo(
tsTable.getTableName(),
tsTable
.getPropValue(TTL_PROPERTY.toLowerCase(Locale.ENGLISH))
.orElse(TTL_INFINITE)))
tsTable.getPropValue(TTL_PROPERTY).orElse(TTL_INFINITE)))
.collect(Collectors.toList()));
} catch (final MetadataException e) {
return new ShowTableResp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,11 +816,19 @@ public void renameTableColumn(
public void setTableProperties(
final PartialPath database, final String tableName, final Map<String, String> tableProperties)
throws MetadataException {
final TsTable table = getTable(database, tableName);
final IConfigMNode databaseNode = getDatabaseNodeByDatabasePath(database).getAsMNode();
if (!databaseNode.hasChild(tableName)) {
throw new TableNotExistsException(
database.getFullPath().substring(ROOT.length() + 1), tableName);
}
final TsTable table = ((ConfigTableNode) databaseNode.getChild(tableName)).getTable();
tableProperties.forEach(
(k, v) -> {
if (Objects.nonNull(v)) {
table.addProp(k, v);
} else if (k.equals(TsTable.TTL_PROPERTY)
&& databaseNode.getDatabaseSchema().isSetTTL()) {
table.addProp(k, String.valueOf(databaseNode.getDatabaseSchema().getTTL()));
} else {
table.removeProp(k);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.RollbackCreateTablePlan;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
Expand All @@ -41,6 +42,7 @@
import org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils;
import org.apache.iotdb.confignode.procedure.state.schema.CreateTableState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -141,9 +143,14 @@ private void checkTableExistence(final ConfigNodeProcedureEnv env) {
database.substring(ROOT.length() + 1), table.getTableName()),
TABLE_ALREADY_EXISTS.getStatusCode())));
} else {
final TDatabaseSchema schema =
env.getConfigManager().getClusterSchemaManager().getDatabaseSchemaByName(database);
if (schema.isSetTTL() && !table.getPropValue(TsTable.TTL_PROPERTY).isPresent()) {
table.addProp(TsTable.TTL_PROPERTY, String.valueOf(schema.getTTL()));
}
setNextState(CreateTableState.PRE_CREATE);
}
} catch (final MetadataException e) {
} catch (final MetadataException | DatabaseNotExistsException e) {
setFailure(new ProcedureException(e));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@

import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.schema.SchemaConstant.ROOT;
import static org.apache.iotdb.confignode.procedure.state.schema.DeleteDevicesState.CHECK_TABLE_EXISTENCE;
import static org.apache.iotdb.confignode.procedure.state.schema.DeleteDevicesState.CLEAN_DATANODE_SCHEMA_CACHE;
import static org.apache.iotdb.confignode.procedure.state.schema.DeleteDevicesState.CONSTRUCT_BLACK_LIST;
import static org.apache.iotdb.confignode.procedure.state.schema.DeleteDevicesState.DELETE_DATA;
import static org.apache.iotdb.confignode.procedure.state.schema.DeleteDevicesState.DELETE_DEVICE_SCHEMA;
import static org.apache.iotdb.rpc.TSStatusCode.TABLE_ALREADY_EXISTS;
import static org.apache.iotdb.rpc.TSStatusCode.TABLE_NOT_EXISTS;

public class DeleteDevicesProcedure extends AbstractAlterOrDropTableProcedure<DeleteDevicesState> {
private static final Logger LOGGER = LoggerFactory.getLogger(DeleteDevicesProcedure.class);
Expand Down Expand Up @@ -103,7 +104,7 @@ protected Flow executeFromState(final ConfigNodeProcedureEnv env, final DeleteDe
try {
switch (state) {
case CHECK_TABLE_EXISTENCE:
LOGGER.info("Check the existence of table {}.{}", database, table.getTableName());
LOGGER.info("Check the existence of table {}.{}", database, tableName);
checkTableExistence(env);
break;
case CONSTRUCT_BLACK_LIST:
Expand Down Expand Up @@ -133,24 +134,23 @@ protected Flow executeFromState(final ConfigNodeProcedureEnv env, final DeleteDe
}
return Flow.HAS_MORE_STATE;
} finally {
LOGGER.info(
"DeleteTimeSeries-[{}] costs {}ms", state, (System.currentTimeMillis() - startTime));
LOGGER.info("DeleteDevices-[{}] costs {}ms", state, (System.currentTimeMillis() - startTime));
}
}

private void checkTableExistence(final ConfigNodeProcedureEnv env) {
try {
if (env.getConfigManager()
if (!env.getConfigManager()
.getClusterSchemaManager()
.getTableIfExists(database, table.getTableName())
.getTableIfExists(database, tableName)
.isPresent()) {
setFailure(
new ProcedureException(
new IoTDBException(
String.format(
"Table '%s.%s' already exists.",
database.substring(ROOT.length() + 1), table.getTableName()),
TABLE_ALREADY_EXISTS.getStatusCode())));
"Table '%s.%s' not exists.",
database.substring(ROOT.length() + 1), tableName),
TABLE_NOT_EXISTS.getStatusCode())));
} else {
setNextState(CONSTRUCT_BLACK_LIST);
}
Expand Down Expand Up @@ -328,7 +328,7 @@ protected int getStateId(final DeleteDevicesState deleteDevicesState) {

@Override
protected DeleteDevicesState getInitialState() {
return CONSTRUCT_BLACK_LIST;
return CHECK_TABLE_EXISTENCE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,15 @@ private ColumnHeaderConstant() {
public static final List<ColumnHeader> showDBColumnHeaders =
ImmutableList.of(
new ColumnHeader(DATABASE, TSDataType.TEXT),
new ColumnHeader(COLUMN_TTL, TSDataType.TEXT),
new ColumnHeader(SCHEMA_REPLICATION_FACTOR, TSDataType.INT32),
new ColumnHeader(DATA_REPLICATION_FACTOR, TSDataType.INT32),
new ColumnHeader(TIME_PARTITION_INTERVAL, TSDataType.INT64));

public static final List<ColumnHeader> showDBDetailsColumnHeaders =
ImmutableList.of(
new ColumnHeader(DATABASE, TSDataType.TEXT),
new ColumnHeader(COLUMN_TTL, TSDataType.TEXT),
new ColumnHeader(SCHEMA_REPLICATION_FACTOR, TSDataType.INT32),
new ColumnHeader(DATA_REPLICATION_FACTOR, TSDataType.INT32),
new ColumnHeader(TIME_PARTITION_INTERVAL, TSDataType.INT64),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@
import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement;

import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;

import java.util.Collections;
Expand All @@ -137,9 +139,11 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH;
import static org.apache.iotdb.commons.schema.table.TsTable.TABLE_ALLOWED_PROPERTIES_2_DEFAULT_VALUE_MAP;
import static org.apache.iotdb.commons.conf.IoTDBConstant.TTL_INFINITE;
import static org.apache.iotdb.commons.schema.table.TsTable.TABLE_ALLOWED_PROPERTIES;
import static org.apache.iotdb.commons.schema.table.TsTable.TTL_PROPERTY;
import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask.DATA_REGION_GROUP_NUM_KEY;
import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask.DATA_REPLICATION_FACTOR_KEY;
Expand Down Expand Up @@ -205,6 +209,14 @@ protected IConfigTask visitCreateDB(final CreateDB node, final MPPQueryContext c

switch (key) {
case TTL_KEY:
final Optional<String> strValue = parseStringFromLiteralIfBinary(value);
if (strValue.isPresent()) {
if (!strValue.get().equalsIgnoreCase(TTL_INFINITE)) {
throw new SemanticException(
"ttl value must be 'INF' or a long literal, but now is: " + value);
}
break;
}
schema.setTTL(parseLongFromLiteral(value, TTL_KEY));
break;
case SCHEMA_REPLICATION_FACTOR_KEY:
Expand Down Expand Up @@ -443,21 +455,20 @@ private Map<String, String> convertPropertiesToMap(
final Map<String, String> map = new HashMap<>();
for (final Property property : propertyList) {
final String key = property.getName().getValue().toLowerCase(Locale.ENGLISH);
if (TABLE_ALLOWED_PROPERTIES_2_DEFAULT_VALUE_MAP.containsKey(key)) {
if (TABLE_ALLOWED_PROPERTIES.contains(key)) {
if (!property.isSetToDefault()) {
final Expression value = property.getNonDefaultValue();
if (value instanceof Literal
&& Objects.equals(
((Literal) value).getTsValue(),
TABLE_ALLOWED_PROPERTIES_2_DEFAULT_VALUE_MAP.get(key))) {
// Ignore default values
final Optional<String> strValue = parseStringFromLiteralIfBinary(value);
if (strValue.isPresent()) {
if (!strValue.get().equalsIgnoreCase(TTL_INFINITE)) {
throw new SemanticException(
"ttl value must be 'INF' or a long literal, but now is: " + value);
}
map.put(key, strValue.get());
continue;
}
// TODO: support validation for other properties
map.put(
key,
String.valueOf(
parseLongFromLiteral(value, TTL_PROPERTY.toLowerCase(Locale.ENGLISH))));
map.put(key, String.valueOf(parseLongFromLiteral(value, TTL_PROPERTY)));
} else if (serializeDefault) {
map.put(key, null);
}
Expand Down Expand Up @@ -542,6 +553,13 @@ protected IConfigTask visitSetConfiguration(SetConfiguration node, MPPQueryConte
return new SetConfigurationTask(((SetConfigurationStatement) node.getInnerTreeStatement()));
}

private Optional<String> parseStringFromLiteralIfBinary(final Object value) {
return value instanceof Literal && ((Literal) value).getTsValue() instanceof Binary
? Optional.of(
((Binary) ((Literal) value).getTsValue()).getStringValue(TSFileConfig.STRING_CHARSET))
: Optional.empty();
}

private long parseLongFromLiteral(final Object value, final String name) {
if (!(value instanceof LongLiteral)) {
throw new SemanticException(
Expand Down
Loading

0 comments on commit 82a1dc3

Please sign in to comment.