From 1a15c87ba3eb3f43d7b1f68b38ea61d4bf4bf04c Mon Sep 17 00:00:00 2001
From: dujie
Date: Wed, 25 May 2022 01:13:48 +0800
Subject: [PATCH] [feat-880][hdfs] support orc writer when fullColumnName size
 is greater than column size

---
 .../hdfs/sink/HdfsOrcOutputFormat.java | 37 ++++++++++++++++---
 1 file changed, 32 insertions(+), 5 deletions(-)

diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java
index d365e8b8e3..e4cff5c448 100644
--- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java
+++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java
@@ -75,6 +75,8 @@ public class HdfsOrcOutputFormat extends BaseHdfsOutputFormat {
     private FileOutputFormat outputFormat;
     private JobConf jobConf;
 
+    protected int[] colIndices;
+
     /** Initialize the object size calculator */
     protected void initRowSizeCalculator() {
         rowSizeCalculator = RowSizeCalculator.getRowSizeCalculator();
@@ -108,17 +110,16 @@ protected void openSource() {
         }
         FileOutputFormat.setOutputCompressorClass(jobConf, codecClass);
 
-        int size = hdfsConf.getColumn().size();
+        int size = hdfsConf.getFullColumnType().size();
         decimalColInfo = Maps.newHashMapWithExpectedSize(size);
         List<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
         for (int i = 0; i < size; i++) {
-            FieldConf fieldConf = hdfsConf.getColumn().get(i);
-            String columnType = fieldConf.getType();
+            String columnType = hdfsConf.getFullColumnType().get(i);
 
             if (ColumnTypeUtil.isDecimalType(columnType)) {
                 ColumnTypeUtil.DecimalInfo decimalInfo =
                         ColumnTypeUtil.getDecimalInfo(columnType, ORC_DEFAULT_DECIMAL_INFO);
-                decimalColInfo.put(fieldConf.getName(), decimalInfo);
+                decimalColInfo.put(hdfsConf.getFullColumnName().get(i), decimalInfo);
             }
             ColumnType type = ColumnType.getType(columnType);
             structFieldObjectInspectors.add(HdfsUtil.columnTypeToObjectInspetor(type));
@@ -135,6 +136,22 @@ protected void openSource() {
         this.inspector =
                 ObjectInspectorFactory.getStandardStructObjectInspector(
                         fullColumnNameList, structFieldObjectInspectors);
+
+        colIndices = new int[hdfsConf.getFullColumnName().size()];
+        for (int i = 0; i < hdfsConf.getFullColumnName().size(); ++i) {
+            int j = 0;
+            for (; j < hdfsConf.getColumn().size(); ++j) {
+                if (hdfsConf.getFullColumnName()
+                        .get(i)
+                        .equalsIgnoreCase(hdfsConf.getColumn().get(j).getName())) {
+                    colIndices[i] = j;
+                    break;
+                }
+            }
+            if (j == hdfsConf.getColumn().size()) {
+                colIndices[i] = -1;
+            }
+        }
     }
 
     @Override
@@ -204,8 +221,18 @@ public void writeSingleRecordToFile(RowData rowData) throws WriteRecordException
         }
 
         try {
+            List<Object> recordList = new ArrayList<>();
+            for (int i = 0; i < hdfsConf.getFullColumnName().size(); ++i) {
+                int colIndex = colIndices[i];
+                if (colIndex == -1) {
+                    recordList.add(null);
+                } else {
+                    recordList.add(data[colIndex]);
+                }
+            }
+
             this.recordWriter.write(
-                    NullWritable.get(), this.orcSerde.serialize(data, this.inspector));
+                    NullWritable.get(), this.orcSerde.serialize(recordList, this.inspector));
             rowsOfCurrentBlock++;
             lastRow = rowData;
         } catch (IOException e) {
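
The patch maps each entry of fullColumnName to the matching configured column by name (colIndices, -1 when there is no match) and pads the missing positions with null before handing the row to the ORC serde. A minimal standalone sketch of that remapping, using hypothetical column lists in place of hdfsConf.getFullColumnName() and the names from hdfsConf.getColumn():

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Standalone sketch of the column remapping this patch introduces.
 * Not ChunJun code: the lists below stand in for hdfsConf.getFullColumnName()
 * and the names of hdfsConf.getColumn().
 */
public class ColIndicesDemo {

    /** For each full column, the index of the configured column with the same name, or -1. */
    static int[] buildColIndices(List<String> fullColumnNames, List<String> configuredColumnNames) {
        int[] colIndices = new int[fullColumnNames.size()];
        for (int i = 0; i < fullColumnNames.size(); ++i) {
            colIndices[i] = -1;
            for (int j = 0; j < configuredColumnNames.size(); ++j) {
                if (fullColumnNames.get(i).equalsIgnoreCase(configuredColumnNames.get(j))) {
                    colIndices[i] = j;
                    break;
                }
            }
        }
        return colIndices;
    }

    /** Expands a row holding only the configured columns to the full column layout, padding misses with null. */
    static List<Object> remapRow(Object[] data, int[] colIndices) {
        List<Object> recordList = new ArrayList<>(colIndices.length);
        for (int colIndex : colIndices) {
            recordList.add(colIndex == -1 ? null : data[colIndex]);
        }
        return recordList;
    }

    public static void main(String[] args) {
        // The full ORC schema has three columns; the job only writes two of them.
        List<String> fullColumnNames = Arrays.asList("id", "name", "age");
        List<String> configuredColumnNames = Arrays.asList("name", "id");

        int[] colIndices = buildColIndices(fullColumnNames, configuredColumnNames);
        System.out.println(Arrays.toString(colIndices)); // [1, 0, -1]

        // Incoming row data is ordered by the configured columns: [name, id].
        Object[] data = {"alice", 42L};
        System.out.println(remapRow(data, colIndices)); // [42, alice, null]
    }
}

Matching by name (case-insensitively, as in the patch) rather than by position is what lets the configured columns be a reordered subset of the full ORC schema; positions with no match are written as null, so the file always keeps the full schema width.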