Skip to content

Commit

Permalink
[Feature-#1451] add column replace for hbase column (#1452)
Browse files Browse the repository at this point in the history
(cherry picked from commit d3db08b)
  • Loading branch information
liumengkai authored and OT-XY committed Mar 3, 2023
1 parent 99f35a2 commit 1c893db
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,15 @@ public static String replaceColToStringFunc(String express) {

return express;
}

public static List<String> getRegexColumnName(String qualifier) {
Matcher matcher = COL_PATTERN.matcher(qualifier);
ArrayList<String> columnQualifier = new ArrayList<>();
while (matcher.find()) {
String columnGroup = matcher.group();
String column = columnGroup.substring(2, columnGroup.length() - 1);
columnQualifier.add(column);
}
return columnQualifier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@
import java.text.SimpleDateFormat;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -96,6 +98,8 @@ public class HBaseColumnConverter

private final List<String> columnNames = new ArrayList<>();

private final List<String> columnNamesWithoutcf = new ArrayList<>();

private final String encoding;

private final HBaseConf hBaseConf;
Expand All @@ -106,7 +110,13 @@ public class HBaseColumnConverter

private final List<FieldConf> fieldList;

private final byte[][][] familyAndQualifier;
private byte[][][] familyAndQualifier;

private final byte[][][] familyAndQualifierBack;

private final ArrayList<HashMap<String, Integer>> columnConfig;

private final HashSet<Integer> columnConfigIndex;

public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) {
super(rowType, hBaseConf);
Expand All @@ -123,6 +133,9 @@ public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) {
createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i)));
}
this.familyAndQualifier = new byte[rowType.getFieldCount()][][];
this.familyAndQualifierBack = new byte[rowType.getFieldCount()][][];
this.columnConfig = new ArrayList<>(rowType.getFieldCount());
this.columnConfigIndex = new HashSet<>(rowType.getFieldCount());
for (int i = 0; i < hBaseConf.getColumn().size(); i++) {
FieldConf fieldConf = hBaseConf.getColumn().get(i);
String name = fieldConf.getName();
Expand All @@ -131,20 +144,26 @@ public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) {
if (cfAndQualifier.length == 2
&& StringUtils.isNotBlank(cfAndQualifier[0])
&& StringUtils.isNotBlank(cfAndQualifier[1])) {

columnNamesWithoutcf.add(cfAndQualifier[1]);
byte[][] qualifierKeys = new byte[2][];
qualifierKeys[0] = Bytes.toBytes(cfAndQualifier[0]);
qualifierKeys[1] = Bytes.toBytes(cfAndQualifier[1]);
qualifierKeys[0] = Bytes.toBytes(cfAndQualifier[0]); // 列族
qualifierKeys[1] = Bytes.toBytes(cfAndQualifier[1]); // 列名
columnConfig.add(i, handleColumnConfig(cfAndQualifier[1]));
familyAndQualifier[i] = qualifierKeys;
familyAndQualifierBack[i] = Arrays.copyOf(qualifierKeys, qualifierKeys.length);
} else if (KEY_ROW_KEY.equals(name)) {
rowKeyIndex = i;
columnNamesWithoutcf.add(KEY_ROW_KEY);
columnConfig.add(i, null);
} else if (!StringUtils.isBlank(fieldConf.getValue())) {
familyAndQualifier[i] = new byte[2][];
familyAndQualifierBack[i] = new byte[2][];
} else {
throw new IllegalArgumentException(
"hbase 中,column 的列配置格式应该是:列族:列名. 您配置的列错误:" + name);
}
}

fieldList = hBaseConf.getColumnMetaInfos();

this.hBaseConf = hBaseConf;
Expand Down Expand Up @@ -193,10 +212,28 @@ public Mutation toExternal(RowData rowData, Mutation output) throws Exception {
}

for (int i = 0; i < rowData.getArity(); i++) {
if (rowKeyIndex == i) {
if (rowKeyIndex == i || columnConfigIndex.contains(i)) {
continue;
}
if (columnConfig.get(i) != null) {
byte[][] qualifier = familyAndQualifier[i];
qualifier[1] =
fillColumnConfig(new String(qualifier[1]), columnConfig.get(i), rowData);
familyAndQualifier[i] = qualifier;
}
toExternalConverters.get(i).serialize(rowData, i, put);
if (i == rowData.getArity() - 1) {
for (int x = 0; x < familyAndQualifierBack.length; x++) {
familyAndQualifier[x] =
Arrays.copyOf(
familyAndQualifierBack[x], familyAndQualifierBack[x].length);
if (x + 1 < familyAndQualifierBack.length
&& familyAndQualifierBack[x + 1] == null) {
familyAndQualifier[x + 1] = null;
x = x + 1;
}
}
}
}
return put;
}
Expand Down Expand Up @@ -594,4 +631,34 @@ private static SimpleDateFormat getSimpleDateFormat(String sign) {
}
return format;
}

private HashMap<String, Integer> handleColumnConfig(String qualifier) {
HashMap<String, Integer> columnConfigMap = new HashMap<>(columnNames.size());
List<String> regexColumnNameList = FunctionParser.getRegexColumnName(qualifier);
if (!regexColumnNameList.isEmpty()) {
for (int i = 0; i < regexColumnNameList.size(); i++) {
columnConfigMap.put(
regexColumnNameList.get(i),
columnNamesWithoutcf.indexOf(regexColumnNameList.get(i)));
columnConfigIndex.add(columnNamesWithoutcf.indexOf(regexColumnNameList.get(i)));
}
} else {
columnConfigMap = null;
}
return columnConfigMap;
}

private byte[] fillColumnConfig(
String columnValue, HashMap<String, Integer> columnConfigMap, RowData rowData) {
List<String> regexColumnNameList = FunctionParser.getRegexColumnName(columnValue);
for (String regrexColumn : regexColumnNameList) {
Integer columnIndex = columnConfigMap.get(regrexColumn);
columnValue =
StringUtils.replace(
columnValue,
"$(" + regrexColumn + ")",
rowData.getString(columnIndex).toString());
}
return Bytes.toBytes(columnValue);
}
}

0 comments on commit 1c893db

Please sign in to comment.