Skip to content

Commit

Permalink
[feat-#1293][chunjun-connector-hdfs] add assembleFieldProps for hdfs-…
Browse files Browse the repository at this point in the history
…connector (#1294)

* [feat-#1293][chunjun-connector-hdfs] add assembleFieldProps for hdfs connector during column converter

* [feat-#1293][chunjun-connector-hdfs] add assembleFieldProps for hdfs connector during column converter orc&parquet&text
  • Loading branch information
liumengkai authored Oct 9, 2022
1 parent 6b4add6 commit a2f6b22
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.dtstack.chunjun.connector.hdfs.converter;

import com.dtstack.chunjun.conf.FieldConf;
import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf;
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
Expand Down Expand Up @@ -61,8 +62,8 @@ public class HdfsOrcColumnConverter
private List<String> ColumnNameList;
private transient Map<String, ColumnTypeUtil.DecimalInfo> decimalColInfo;

public HdfsOrcColumnConverter(List<FieldConf> fieldConfList) {
super(fieldConfList.size());
public HdfsOrcColumnConverter(List<FieldConf> fieldConfList, HdfsConf hdfsConf) {
super(fieldConfList.size(), hdfsConf);
for (int i = 0; i < fieldConfList.size(); i++) {
String type = fieldConfList.get(i).getType();
int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL);
Expand All @@ -83,12 +84,15 @@ public RowData toInternal(RowData input) throws Exception {
ColumnRowData row = new ColumnRowData(input.getArity());
if (input instanceof GenericRowData) {
GenericRowData genericRowData = (GenericRowData) input;
List<FieldConf> fieldConfList = commonConf.getColumn();
for (int i = 0; i < input.getArity(); i++) {
row.addField(
(AbstractBaseColumn)
toInternalConverters
.get(i)
.deserialize(genericRowData.getField(i)));
assembleFieldProps(
fieldConfList.get(i),
(AbstractBaseColumn)
toInternalConverters
.get(i)
.deserialize(genericRowData.getField(i))));
}
} else {
throw new ChunJunRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.dtstack.chunjun.connector.hdfs.converter;

import com.dtstack.chunjun.conf.FieldConf;
import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf;
import com.dtstack.chunjun.connector.hdfs.util.HdfsUtil;
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.converter.AbstractRowConverter;
Expand Down Expand Up @@ -64,8 +65,8 @@ public class HdfsParquetColumnConverter
private List<String> columnNameList;
private transient Map<String, ColumnTypeUtil.DecimalInfo> decimalColInfo;

public HdfsParquetColumnConverter(List<FieldConf> fieldConfList) {
super(fieldConfList.size());
public HdfsParquetColumnConverter(List<FieldConf> fieldConfList, HdfsConf hdfsConf) {
super(fieldConfList.size(), hdfsConf);
for (int i = 0; i < fieldConfList.size(); i++) {
String type = fieldConfList.get(i).getType();
int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL);
Expand All @@ -86,12 +87,15 @@ public RowData toInternal(RowData input) throws Exception {
ColumnRowData row = new ColumnRowData(input.getArity());
if (input instanceof GenericRowData) {
GenericRowData genericRowData = (GenericRowData) input;
List<FieldConf> fieldConfList = commonConf.getColumn();
for (int i = 0; i < input.getArity(); i++) {
row.addField(
(AbstractBaseColumn)
toInternalConverters
.get(i)
.deserialize(genericRowData.getField(i)));
assembleFieldProps(
fieldConfList.get(i),
(AbstractBaseColumn)
toInternalConverters
.get(i)
.deserialize(genericRowData.getField(i))));
}
} else {
throw new ChunJunRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.dtstack.chunjun.connector.hdfs.converter;

import com.dtstack.chunjun.conf.FieldConf;
import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf;
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
Expand Down Expand Up @@ -51,8 +52,8 @@
public class HdfsTextColumnConverter
extends AbstractRowConverter<RowData, RowData, String[], String> {

public HdfsTextColumnConverter(List<FieldConf> fieldConfList) {
super(fieldConfList.size());
public HdfsTextColumnConverter(List<FieldConf> fieldConfList, HdfsConf hdfsConf) {
super(fieldConfList.size(), hdfsConf);
for (int i = 0; i < fieldConfList.size(); i++) {
String type = fieldConfList.get(i).getType();
int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL);
Expand All @@ -73,12 +74,15 @@ public RowData toInternal(RowData input) throws Exception {
ColumnRowData row = new ColumnRowData(input.getArity());
if (input instanceof GenericRowData) {
GenericRowData genericRowData = (GenericRowData) input;
List<FieldConf> fieldConfList = commonConf.getColumn();
for (int i = 0; i < input.getArity(); i++) {
row.addField(
(AbstractBaseColumn)
toInternalConverters
.get(i)
.deserialize(genericRowData.getField(i)));
assembleFieldProps(
fieldConfList.get(i),
(AbstractBaseColumn)
toInternalConverters
.get(i)
.deserialize(genericRowData.getField(i))));
}
} else {
throw new ChunJunRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
useAbstractBaseColumn,
hdfsConf.getFileType(),
hdfsConf.getColumn(),
getRawTypeConverter());
getRawTypeConverter(),
hdfsConf);

builder.setRowConverter(rowConverter, useAbstractBaseColumn);
return createOutput(dataSet, builder.finish());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public DataStream<RowData> createSource() {
useAbstractBaseColumn,
hdfsConf.getFileType(),
hdfsConf.getColumn(),
getRawTypeConverter());
getRawTypeConverter(),
hdfsConf);

builder.setRowConverter(rowConverter, useAbstractBaseColumn);
return createInput(builder.finish());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.dtstack.chunjun.connector.hdfs.util;

import com.dtstack.chunjun.conf.FieldConf;
import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf;
import com.dtstack.chunjun.connector.hdfs.converter.HdfsOrcColumnConverter;
import com.dtstack.chunjun.connector.hdfs.converter.HdfsOrcRowConverter;
import com.dtstack.chunjun.connector.hdfs.converter.HdfsParquetColumnConverter;
Expand Down Expand Up @@ -331,18 +332,19 @@ public static AbstractRowConverter createRowConverter(
boolean useAbstractBaseColumn,
String fileType,
List<FieldConf> fieldConfList,
RawTypeConverter converter) {
RawTypeConverter converter,
HdfsConf hdfsConf) {
AbstractRowConverter rowConverter;
if (useAbstractBaseColumn) {
switch (FileType.getByName(fileType)) {
case ORC:
rowConverter = new HdfsOrcColumnConverter(fieldConfList);
rowConverter = new HdfsOrcColumnConverter(fieldConfList, hdfsConf);
break;
case PARQUET:
rowConverter = new HdfsParquetColumnConverter(fieldConfList);
rowConverter = new HdfsParquetColumnConverter(fieldConfList, hdfsConf);
break;
default:
rowConverter = new HdfsTextColumnConverter(fieldConfList);
rowConverter = new HdfsTextColumnConverter(fieldConfList, hdfsConf);
}
} else {
RowType rowType = TableUtil.createRowType(fieldConfList, converter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ private BaseHdfsOutputFormat createHdfsOutputFormat(
useAbstractBaseColumn,
copyHiveConf.getFileType(),
fieldConfList,
HdfsRawTypeConverter::apply),
HdfsRawTypeConverter::apply,
hiveConf),
useAbstractBaseColumn);
builder.setInitAccumulatorAndDirty(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public AbstractRowConverter(int converterSize) {
this.toExternalConverters = new ArrayList<>(converterSize);
}

/**
 * Creates a row converter with converter lists pre-sized to the expected
 * field count, and attaches the common connector configuration so that
 * subclasses can read per-field settings (e.g. when assembling field props).
 *
 * @param converterSize expected number of per-field converters
 * @param commonConf    shared connector configuration made available to subclasses
 */
public AbstractRowConverter(int converterSize, ChunJunCommonConf commonConf) {
    this.commonConf = commonConf;
    this.toInternalConverters = new ArrayList<>(converterSize);
    this.toExternalConverters = new ArrayList<>(converterSize);
}

protected IDeserializationConverter wrapIntoNullableInternalConverter(
IDeserializationConverter IDeserializationConverter) {
return val -> {
Expand Down Expand Up @@ -187,6 +193,7 @@ protected ISerializationConverter<SinkT> wrapIntoNullableExternalConverter(
/**
 * Converts a record fetched by a lookup source into Flink's internal
 * {@code RowData} representation.
 *
 * <p>The base implementation is a stub: converters that participate in
 * lookup joins must override this method.
 *
 * @param input raw record produced by the lookup source
 * @return the converted internal row
 * @throws UnsupportedOperationException always, unless overridden by a subclass
 */
public RowData toInternalLookup(LookupT input) throws Exception {
    // UnsupportedOperationException is the idiomatic "not implemented" signal
    // and extends RuntimeException, so existing callers that catch
    // RuntimeException are unaffected.
    throw new UnsupportedOperationException(
            "toInternalLookup is not implemented; subclasses must override it");
}

/**
* BinaryRowData
*
Expand Down

0 comments on commit a2f6b22

Please sign in to comment.