Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
Expand Down Expand Up @@ -242,15 +243,17 @@ public LocalDateTime getDateTime() {
LocalDateTime result;

ArrowType.Timestamp timestampType = ( ArrowType.Timestamp) column.getField().getFieldType().getType();
if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) {
if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) { //DATETIME
result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx);
} else {
} else if (timestampType.getTimezone() == null) { // TIMESTAMP_NTZ
NullableTimeStampNanoHolder valueHoder = new NullableTimeStampNanoHolder();
((TimeStampNanoVector) column).get(idx, valueHoder);
long timestampNanos = valueHoder.value;

result = LocalDateTime.ofEpochSecond(timestampNanos / 1_000_000_000,
(int) (timestampNanos % 1_000_000_000), java.time.ZoneOffset.UTC);
} else { // TIMESTAMP
result = convertToLocalDateTime((TimeStampNanoTZVector) column, idx);
}

/*
Expand Down Expand Up @@ -339,4 +342,9 @@ public LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), timeZone);
}

public LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) {
long timestampNano = nanoTZVector.get(index);
return Instant.ofEpochSecond(timestampNano / 1_000_000_000, timestampNano % 1_000_000_000)
.atZone(timeZone).toLocalDateTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
private int readTimeout;
private int retryTimes;

public boolean dateTimePredicatePushDown;

private static final Map<String, ZoneId> REGION_ZONE_MAP;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.PROJECT,
Expand Down Expand Up @@ -201,7 +203,9 @@ protected void initLocalObjectsImpl() {
accessKey = credential.getAccessKey();
secretKey = credential.getSecretKey();


dateTimePredicatePushDown = Boolean.parseBoolean(
props.getOrDefault(MCProperties.DATETIME_PREDICATE_PUSH_DOWN,
MCProperties.DEFAULT_DATETIME_PREDICATE_PUSH_DOWN));

Account account = new AliyunAccount(accessKey, secretKey);
this.odps = new Odps(account);
Expand Down Expand Up @@ -338,6 +342,10 @@ public int getReadTimeout() {
return readTimeout;
}

public boolean getDateTimePredicatePushDown() {
return dateTimePredicatePushDown;
}

public ZoneId getProjectDateTimeZone() {
makeSureInitialized();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private Type mcTypeToDorisType(TypeInfo typeInfo) {
case DATETIME: {
return ScalarType.createDatetimeV2Type(3);
}
case TIMESTAMP:
case TIMESTAMP_NTZ: {
return ScalarType.createDatetimeV2Type(6);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
import java.util.stream.Collectors;

public class MaxComputeScanNode extends FileQueryScanNode {
static final DateTimeFormatter dateTime3Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
static final DateTimeFormatter dateTime6Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

private final MaxComputeExternalTable table;
private Predicate filterPredicate;
Expand Down Expand Up @@ -492,16 +494,40 @@ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws A
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
}
case DATETIME: {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(3);
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(3);

return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType),
((MaxComputeExternalCatalog) table.getCatalog()).getProjectDateTimeZone()) + "\" ";
return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType), dateTime3Formatter,
ZoneId.of("UTC")) + "\" ";
}
break;
}
/**
* Disable the predicate pushdown to the odps API because the timestamp precision of odps is 9 and the
* mapping precision of Doris is 6. If we insert `2023-02-02 00:00:00.123456789` into odps, doris reads
* it as `2023-02-02 00:00:00.123456`. Since "789" is missing, we cannot push it down correctly.
*/
case TIMESTAMP: {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);

return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType), dateTime6Formatter,
ZoneId.of("UTC")) + "\" ";
}
break;
}
case TIMESTAMP_NTZ: {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
}
break;
}
default: {
break;
Expand All @@ -511,12 +537,11 @@ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws A
}


public static String convertDateTimezone(String dateTimeStr, ZoneId toZone) {
public static String convertDateTimezone(String dateTimeStr, DateTimeFormatter formatter, ZoneId toZone) {
if (DateUtils.getTimeZone().equals(toZone)) {
return dateTimeStr;
}

DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
LocalDateTime localDateTime = LocalDateTime.parse(dateTimeStr, formatter);

ZonedDateTime sourceZonedDateTime = localDateTime.atZone(DateUtils.getTimeZone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public class MCProperties extends BaseProperties {
public static final String SPLIT_CROSS_PARTITION = "mc.split_cross_partition";
public static final String DEFAULT_SPLIT_CROSS_PARTITION = "true";

public static final String DATETIME_PREDICATE_PUSH_DOWN =
"mc.datetime_predicate_push_down";
public static final String DEFAULT_DATETIME_PREDICATE_PUSH_DOWN = "true";

public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
Expand Down
Loading