Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 224 additions & 107 deletions docs/en/administrator-guide/dynamic-partition.md

Large diffs are not rendered by default.

330 changes: 214 additions & 116 deletions docs/zh-CN/administrator-guide/dynamic-partition.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ protected void batchAddAlterJobV2(List<AlterJobV2> alterJobV2List) {
// return true iff job is actually added this time
private boolean addAlterJobV2ToTableNotFinalStateJobMap(AlterJobV2 alterJobV2) {
if (alterJobV2.isDone()) {
LOG.warn("try to add a final job({}) to a unfinal set", alterJobV2.getJobId());
LOG.warn("try to add a final job({}) to a unfinal set. db: {}, tbl: {}",
alterJobV2.getJobId(), alterJobV2.getDbId(), alterJobV2.getTableId());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.analysis;

import com.google.common.base.Strings;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
Expand All @@ -26,6 +25,8 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.qe.ShowResultSetMetaData;

import com.google.common.base.Strings;

public class ShowDynamicPartitionStmt extends ShowStmt {
private String db;
private static final ShowResultSetMetaData SHOW_DYNAMIC_PARTITION_META_DATA =
Expand All @@ -37,6 +38,7 @@ public class ShowDynamicPartitionStmt extends ShowStmt {
.addColumn(new Column("End", ScalarType.createVarchar(20)))
.addColumn(new Column("Prefix", ScalarType.createVarchar(20)))
.addColumn(new Column("Buckets", ScalarType.createVarchar(20)))
.addColumn(new Column("StartOf", ScalarType.createVarchar(20)))
.addColumn(new Column("LastUpdateTime", ScalarType.createVarchar(20)))
.addColumn(new Column("LastSchedulerTime", ScalarType.createVarchar(20)))
.addColumn(new Column("State", ScalarType.createVarchar(20)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.TimestampArithmeticExpr.TimeUnit;
import org.apache.doris.common.util.DynamicPartitionUtil.StartOfDate;
import org.apache.doris.common.util.TimeUtils;

import java.util.Map;
import java.util.TimeZone;

public class DynamicPartitionProperty{
public static final String TIME_UNIT = "dynamic_partition.time_unit";
Expand All @@ -26,6 +31,10 @@ public class DynamicPartitionProperty{
public static final String PREFIX = "dynamic_partition.prefix";
public static final String BUCKETS = "dynamic_partition.buckets";
public static final String ENABLE = "dynamic_partition.enable";
public static final String START_DAY_OF_WEEK = "dynamic_partition.start_day_of_week";
public static final String START_DAY_OF_MONTH = "dynamic_partition.start_day_of_month";

public static final int MIN_START_OFFSET = Integer.MIN_VALUE;

private boolean exist;

Expand All @@ -35,22 +44,44 @@ public class DynamicPartitionProperty{
private int end;
private String prefix;
private int buckets;
private StartOfDate startOfWeek;
private StartOfDate startOfMonth;
// TODO: support setting timezone.
private TimeZone tz = TimeUtils.getDefaultTimeZone();


DynamicPartitionProperty(Map<String ,String> properties) {
public DynamicPartitionProperty(Map<String, String> properties) {
if (properties != null && !properties.isEmpty()) {
this.exist = true;
this.enable = Boolean.parseBoolean(properties.get(ENABLE));
this.timeUnit = properties.get(TIME_UNIT);
// In order to compatible dynamic add partition version
this.start = Integer.parseInt(properties.getOrDefault(START, String.valueOf(Integer.MIN_VALUE)));
this.start = Integer.parseInt(properties.getOrDefault(START, String.valueOf(MIN_START_OFFSET)));
this.end = Integer.parseInt(properties.get(END));
this.prefix = properties.get(PREFIX);
this.buckets = Integer.parseInt(properties.get(BUCKETS));
createStartOfs(properties);
} else {
this.exist = false;
}
}

private void createStartOfs(Map<String, String> properties) {
if (properties.containsKey(START_DAY_OF_WEEK)) {
startOfWeek = new StartOfDate(-1, -1, Integer.valueOf(properties.get(START_DAY_OF_WEEK)));
} else {
// default:
startOfWeek = new StartOfDate(-1, -1, 1 /* start from MONDAY */);
}

if (properties.containsKey(START_DAY_OF_MONTH)) {
startOfMonth = new StartOfDate(-1, Integer.valueOf(properties.get(START_DAY_OF_MONTH)), -1);
} else {
// default:
startOfMonth = new StartOfDate(-1, 1 /* 1st of month */, -1);
}
}

public boolean isExist() {
return exist;
}
Expand Down Expand Up @@ -79,13 +110,42 @@ public boolean getEnable() {
return enable;
}

public StartOfDate getStartOfWeek() {
return startOfWeek;
}

public StartOfDate getStartOfMonth() {
return startOfMonth;
}

public String getStartOfInfo() {
if (getTimeUnit().equalsIgnoreCase(TimeUnit.WEEK.toString())) {
return startOfWeek.toDisplayInfo();
} else if (getTimeUnit().equalsIgnoreCase(TimeUnit.MONTH.toString())) {
return startOfMonth.toDisplayInfo();
} else {
return "N/A";
}
}

public TimeZone getTimeZone() {
return tz;
}


@Override
public String toString() {
return ",\n\"" + ENABLE + "\" = \"" + enable + "\"" +
String res = ",\n\"" + ENABLE + "\" = \"" + enable + "\"" +
",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\"" +
",\n\"" + START + "\" = \"" + start + "\"" +
",\n\"" + END + "\" = \"" + end + "\"" +
",\n\"" + PREFIX + "\" = \"" + prefix + "\"" +
",\n\"" + BUCKETS + "\" = \"" + buckets + "\"";
if (getTimeUnit().equalsIgnoreCase(TimeUnit.WEEK.toString())) {
res += ",\n\"" + START_DAY_OF_WEEK + "\" = \"" + startOfWeek.dayOfWeek + "\"";
} else if (getTimeUnit().equalsIgnoreCase(TimeUnit.MONTH.toString())) {
res += ",\n\"" + START_DAY_OF_MONTH + "\" = \"" + startOfMonth.day + "\"";
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.doris.catalog.DynamicPartitionProperty;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Table;
Expand Down Expand Up @@ -127,20 +126,20 @@ private Map<String, String> createDefaultRuntimeInfo() {
return defaultRuntimeInfo;
}

private ArrayList<AddPartitionClause> getAddPartitionClause(OlapTable olapTable, Column partitionColumn, String partitionFormat) {
private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
Column partitionColumn, String partitionFormat) {
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
Calendar currentDate = Calendar.getInstance(dynamicPartitionProperty.getTimeZone());
for (int i = 0; i <= dynamicPartitionProperty.getEnd(); i++) {
String prevBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
i, (Calendar) calendar.clone(), partitionFormat);
// continue if partition already exists
String nextBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
i + 1, (Calendar) calendar.clone(), partitionFormat);
String prevBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
(Calendar) currentDate.clone(), i, partitionFormat);
String nextBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
(Calendar) currentDate.clone(), i + 1, partitionFormat);
PartitionValue lowerValue = new PartitionValue(prevBorder);
PartitionValue upperValue = new PartitionValue(nextBorder);
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
RangePartitionInfo info = (RangePartitionInfo) (partitionInfo);

boolean isPartitionExists = false;
Range<PartitionKey> addPartitionKeyRange;
try {
Expand All @@ -149,10 +148,11 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(OlapTable olapTable,
addPartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
} catch (AnalysisException e) {
// keys.size is always equal to column.size, cannot reach this exception
LOG.warn("Keys size is not equal to column size. Error={}", e.getMessage());
LOG.warn("Keys size is not equal to column size. Error={}, db: {}, table: {}", e.getMessage(),
db.getFullName(), olapTable.getName());
Copy link
Contributor

@WingsGo WingsGo May 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as following comment

continue;
}
for (Range<PartitionKey> partitionKeyRange : info.getIdToRange(false).values()) {
for (Range<PartitionKey> partitionKeyRange : rangePartitionInfo.getIdToRange(false).values()) {
// only support single column partition now
try {
RangeUtils.checkRangeIntersect(partitionKeyRange, addPartitionKeyRange);
Expand All @@ -161,7 +161,7 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(OlapTable olapTable,
if (addPartitionKeyRange.equals(partitionKeyRange)) {
clearCreatePartitionFailedMsg(olapTable.getName());
} else {
recordCreatePartitionFailedMsg(olapTable.getName(), e.getMessage());
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), e.getMessage());
}
break;
}
Expand All @@ -174,7 +174,8 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(OlapTable olapTable,
PartitionKeyDesc partitionKeyDesc = new PartitionKeyDesc(Collections.singletonList(lowerValue), Collections.singletonList(upperValue));
HashMap<String, String> partitionProperties = new HashMap<>(1);
partitionProperties.put("replication_num", String.valueOf(DynamicPartitionUtil.estimateReplicateNum(olapTable)));
String partitionName = dynamicPartitionProperty.getPrefix() + DynamicPartitionUtil.getFormattedPartitionName(prevBorder, dynamicPartitionProperty.getTimeUnit());
String partitionName = dynamicPartitionProperty.getPrefix() + DynamicPartitionUtil.getFormattedPartitionName(
dynamicPartitionProperty.getTimeZone(), prevBorder, dynamicPartitionProperty.getTimeUnit());
SingleRangePartitionDesc rangePartitionDesc = new SingleRangePartitionDesc(true, partitionName,
partitionKeyDesc, partitionProperties);

Expand All @@ -192,15 +193,23 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(OlapTable olapTable,
return addPartitionClauses;
}

private ArrayList<DropPartitionClause> getDropPartitionClause(OlapTable olapTable, Column partitionColumn, String partitionFormat) {
/*
* 1. get the range of [start, 0) as a reserved range.
* 2. get DropPartitionClause of partitions which range are before this reserved range.
*/
private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapTable olapTable, Column partitionColumn, String partitionFormat) {
ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
if (dynamicPartitionProperty.getStart() == DynamicPartitionProperty.MIN_START_OFFSET) {
// not set start offset, so not drop any partition
return dropPartitionClauses;
}

String lowerBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
dynamicPartitionProperty.getStart(), (Calendar) calendar.clone(), partitionFormat);
String upperBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
0, (Calendar) calendar.clone(), partitionFormat);
Calendar currentDate = Calendar.getInstance(dynamicPartitionProperty.getTimeZone());
String lowerBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
(Calendar) currentDate.clone(), dynamicPartitionProperty.getStart(), partitionFormat);
String upperBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
(Calendar) currentDate.clone(), 0, partitionFormat);
PartitionValue lowerPartitionValue = new PartitionValue(lowerBorder);
PartitionValue upperPartitionValue = new PartitionValue(upperBorder);
Range<PartitionKey> reservePartitionKeyRange;
Expand All @@ -210,7 +219,8 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(OlapTable olapTabl
reservePartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
} catch (AnalysisException e) {
// keys.size is always equal to column.size, cannot reach this exception
LOG.warn("Keys size is not equal to column size. Error={}", e.getMessage());
LOG.warn("Keys size is not equal to column size. Error={}, db: {}, table: {}", e.getMessage(),
db.getFullName(), olapTable.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should remove the warning, because dynamic_partition.start is default to Integer.MIN_VALUE, so we may got a AnalysisException like this Error=errCode = 2, detailMessage = date literal [5877471-07-26 00:00:00] is invalid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will check this offset to make it reasonable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just make a check that if start offset == Integer.MIN_VALUE, it will not drop any partition.
And for adding partition, I just leave it to user to make sure the value is reasonable.

return dropPartitionClauses;
}
RangePartitionInfo info = (RangePartitionInfo) (olapTable.getPartitionInfo());
Expand Down Expand Up @@ -249,8 +259,8 @@ private void executeDynamicPartition() {
ArrayList<DropPartitionClause> dropPartitionClauses;
String tableName;
boolean skipAddPartition = false;
db.readLock();
OlapTable olapTable;
db.readLock();
try {
olapTable = (OlapTable) db.getTable(tableId);
// Only OlapTable has DynamicPartitionProperty
Expand All @@ -264,8 +274,7 @@ private void executeDynamicPartition() {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
String errorMsg = "Table[" + olapTable.getName() + "]'s state is not NORMAL."
+ "Do not allow doing dynamic add partition. table state=" + olapTable.getState();
recordCreatePartitionFailedMsg(olapTable.getName(), errorMsg);
LOG.info(errorMsg);
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), errorMsg);
skipAddPartition = true;
}

Expand All @@ -275,19 +284,25 @@ private void executeDynamicPartition() {
// scheduler time should be record even no partition added
createOrUpdateRuntimeInfo(olapTable.getName(), LAST_SCHEDULER_TIME, TimeUtils.getCurrentFormatTime());
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
if (rangePartitionInfo.getPartitionColumns().size() != 1) {
// currently only support partition with single column.
iterator.remove();
continue;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single column check work is done when create table, no need to check here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just a self-defence~

Column partitionColumn = rangePartitionInfo.getPartitionColumns().get(0);
String partitionFormat;
try {
partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
} catch (DdlException e) {
recordCreatePartitionFailedMsg(olapTable.getName(), e.getMessage());
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), e.getMessage());
continue;
}

if (!skipAddPartition) {
addPartitionClauses = getAddPartitionClause(olapTable, partitionColumn, partitionFormat);
addPartitionClauses = getAddPartitionClause(db, olapTable, partitionColumn, partitionFormat);
}
dropPartitionClauses = getDropPartitionClause(olapTable, partitionColumn, partitionFormat);
dropPartitionClauses = getDropPartitionClause(db, olapTable, partitionColumn, partitionFormat);
tableName = olapTable.getName();
} finally {
db.readUnlock();
Expand All @@ -299,7 +314,7 @@ private void executeDynamicPartition() {
Catalog.getCurrentCatalog().dropPartition(db, olapTable, dropPartitionClause);
clearDropPartitionFailedMsg(tableName);
} catch (DdlException e) {
recordDropPartitionFailedMsg(tableName, e.getMessage());
recordDropPartitionFailedMsg(db.getFullName(), tableName, e.getMessage());
} finally {
db.writeUnlock();
}
Expand All @@ -311,15 +326,15 @@ private void executeDynamicPartition() {
Catalog.getCurrentCatalog().addPartition(db, tableName, addPartitionClause);
clearCreatePartitionFailedMsg(tableName);
} catch (DdlException e) {
recordCreatePartitionFailedMsg(tableName, e.getMessage());
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage());
}
}
}
}
}

private void recordCreatePartitionFailedMsg(String tableName, String msg) {
LOG.warn("dynamic add partition failed: " + msg);
private void recordCreatePartitionFailedMsg(String dbName, String tableName, String msg) {
LOG.warn("dynamic add partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, msg);
}
Expand All @@ -329,8 +344,8 @@ private void clearCreatePartitionFailedMsg(String tableName) {
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
}

private void recordDropPartitionFailedMsg(String tableName, String msg) {
LOG.warn("dynamic drop partition failed: " + msg);
private void recordDropPartitionFailedMsg(String dbName, String tableName, String msg) {
LOG.warn("dynamic drop partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, msg);
}
Expand Down
Loading