Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public MaxComputeExternalTable(long id, String name, String remoteName, MaxCompu
super(id, name, remoteName, catalog, db, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
}

private Map<String, com.aliyun.odps.Column> columnNameToOdpsColumn = new HashMap();

@Override
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
Expand Down Expand Up @@ -166,7 +164,10 @@ private static List<String> parsePartitionValues(List<String> partitionColumns,
}

public Map<String, com.aliyun.odps.Column> getColumnNameToOdpsColumn() {
return columnNameToOdpsColumn;
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getColumnNameToOdpsColumn())
.orElse(Collections.emptyMap());
}

@Override
Expand All @@ -179,7 +180,7 @@ public Optional<SchemaCacheValue> initSchema() {
TableIdentifier tableIdentifier = mcCatalog.getOdpsTableIdentifier(dbName, name);

List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();

Map<String, com.aliyun.odps.Column> columnNameToOdpsColumn = new HashMap<>();
for (com.aliyun.odps.Column column : columns) {
columnNameToOdpsColumn.put(column.getName(), column);
}
Expand Down Expand Up @@ -218,7 +219,7 @@ public Optional<SchemaCacheValue> initSchema() {
}

return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable, tableIdentifier,
partitionColumnNames, partitionSpecs, partitionDorisColumns, partitionTypes));
partitionColumnNames, partitionSpecs, partitionDorisColumns, partitionTypes, columnNameToOdpsColumn));
}

private Type mcTypeToDorisType(TypeInfo typeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.Setter;

import java.util.List;
import java.util.Map;

@Getter
@Setter
Expand All @@ -37,17 +38,19 @@ public class MaxComputeSchemaCacheValue extends SchemaCacheValue {
private List<String> partitionSpecs;
private List<Column> partitionColumns;
private List<Type> partitionTypes;
private Map<String, com.aliyun.odps.Column> columnNameToOdpsColumn;

public MaxComputeSchemaCacheValue(List<Column> schema, Table odpsTable, TableIdentifier tableIdentifier,
List<String> partitionColumnNames, List<String> partitionSpecs, List<Column> partitionColumns,
List<Type> partitionTypes) {
List<Type> partitionTypes, Map<String, com.aliyun.odps.Column> columnNameToOdpsColumn) {
super(schema);
this.odpsTable = odpsTable;
this.tableIdentifier = tableIdentifier;
this.partitionSpecs = partitionSpecs;
this.partitionColumnNames = partitionColumnNames;
this.partitionColumns = partitionColumns;
this.partitionTypes = partitionTypes;
this.columnNameToOdpsColumn = columnNameToOdpsColumn;
}

public List<Column> getPartitionColumns() {
Expand All @@ -57,4 +60,8 @@ public List<Column> getPartitionColumns() {
public List<String> getPartitionColumnNames() {
return partitionColumnNames;
}

public Map<String, com.aliyun.odps.Column> getColumnNameToOdpsColumn() {
return columnNameToOdpsColumn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
import jline.internal.Log;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -88,6 +90,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
static final DateTimeFormatter dateTime3Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
static final DateTimeFormatter dateTime6Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

private static final Logger LOG = LogManager.getLogger(MaxComputeScanNode.class);

private final MaxComputeExternalTable table;
private Predicate filterPredicate;
List<String> requiredPartitionColumns = new ArrayList<>();
Expand Down Expand Up @@ -355,6 +359,9 @@ private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException

String columnName = convertSlotRefToColumnName(expr.getChild(0));
if (!table.getColumnNameToOdpsColumn().containsKey(columnName)) {
Map<String, com.aliyun.odps.Column> columnMap = table.getColumnNameToOdpsColumn();
LOG.warn("ColumnNameToOdpsColumn size=" + columnMap.size()
+ ", keys=[" + String.join(", ", columnMap.keySet()) + "]");
throw new AnalysisException("Column " + columnName + " not found in table, can not push "
+ "down predicate to MaxCompute " + table.getName());
}
Expand Down Expand Up @@ -415,6 +422,9 @@ private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException
if (odpsOp != null) {
String columnName = convertSlotRefToColumnName(expr.getChild(0));
if (!table.getColumnNameToOdpsColumn().containsKey(columnName)) {
Map<String, com.aliyun.odps.Column> columnMap = table.getColumnNameToOdpsColumn();
LOG.warn("ColumnNameToOdpsColumn size=" + columnMap.size()
+ ", keys=[" + String.join(", ", columnMap.keySet()) + "]");
throw new AnalysisException("Column " + columnName + " not found in table, can not push "
+ "down predicate to MaxCompute " + table.getName());
}
Expand Down
Loading