nameValueMap = new HashMap<>((rowKeyColumnIndex.size() << 2) / 3);
+ for (Integer keyColumnIndex : rowKeyColumnIndex) {
+ nameValueMap.put(
+ columnNames.get(keyColumnIndex),
+ ((ColumnRowData) record).getField(keyColumnIndex).getData());
+ }
+
+ String rowKeyStr = functionTree.evaluate(nameValueMap);
+ return rowKeyStr.getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+  * Resolves the HBase cell version (timestamp) to use when writing {@code record}.
+  *
+  * <p>The value is read from the column at {@code versionColumnIndex} when an index is
+  * configured, otherwise the constant {@code versionColumnValue} is used. Numeric values are
+  * converted with {@link Number#longValue()}, dates with {@code getTime()}, and strings are
+  * parsed first as a plain long and then with {@code timeMillisecondFormat} followed by
+  * {@code timeSecondFormat}.
+  *
+  * @param record row being written; cast to {@code ColumnRowData} when a version column is set
+  * @return the resolved version, or {@code null} when neither a version column index nor a
+  *     version value is configured
+  * @throws IllegalArgumentException if the configured index is outside the record, or the
+  *     version field is null
+  * @throws RuntimeException if a string value cannot be parsed or the value type is unsupported
+  */
+ public Long getVersion(RowData record) {
+     if (versionColumnIndex == null && StringUtils.isBlank(versionColumnValue)) {
+         return null;
+     }
+
+     Object timeStampValue = versionColumnValue;
+     if (versionColumnIndex != null) {
+         // A column is designated as the version: numeric values are used directly as a long,
+         // other types are parsed with "yyyy-MM-dd HH:mm:ss SSS" or "yyyy-MM-dd HH:mm:ss".
+         if (versionColumnIndex >= record.getArity() || versionColumnIndex < 0) {
+             throw new IllegalArgumentException(
+                     "version column index out of range: " + versionColumnIndex);
+         }
+         if (record.isNullAt(versionColumnIndex)) {
+             throw new IllegalArgumentException("null version column!");
+         }
+
+         timeStampValue = ((ColumnRowData) record).getField(versionColumnIndex).getData();
+     }
+
+     // The column may report non-null while carrying no payload; fail with a clear message
+     // instead of a NullPointerException from getClass() below.
+     if (timeStampValue == null) {
+         throw new IllegalArgumentException("null version column!");
+     }
+
+     if (timeStampValue instanceof Number) {
+         // Covers Long and Double as before, plus every other numeric type.
+         return ((Number) timeStampValue).longValue();
+     }
+
+     if (timeStampValue instanceof java.util.Date) {
+         // Cast to the same fully-qualified type that was just tested.
+         return ((java.util.Date) timeStampValue).getTime();
+     }
+
+     if (timeStampValue instanceof String) {
+         String text = (String) timeStampValue;
+         try {
+             return Long.valueOf(text);
+         } catch (NumberFormatException ignored) {
+             // Not a plain number: fall through and try the date patterns.
+         }
+
+         // NOTE(review): the two formatters look like shared SimpleDateFormat instances, which
+         // are not thread-safe - confirm this method is only called from a single thread.
+         java.util.Date date;
+         try {
+             date = timeMillisecondFormat.parse(text);
+         } catch (ParseException e) {
+             try {
+                 date = timeSecondFormat.parse(text);
+             } catch (ParseException e1) {
+                 LOG.info(
+                         String.format(
+                                 "您指定第[%s]列作为hbase写入版本,但在尝试用yyyy-MM-dd HH:mm:ss 和 yyyy-MM-dd HH:mm:ss SSS 去解析为Date时均出错,请检查并修改",
+                                 versionColumnIndex));
+                 throw new RuntimeException(e1);
+             }
+         }
+         return date.getTime();
+     }
+
+     throw new RuntimeException("version类型不兼容: " + timeStampValue.getClass());
+ }
+
+ /**
+  * Creates a new date formatter for the requested precision.
+  *
+  * @param sign precision marker; a value equal to {@code ConstantValue.TIME_SECOND_SUFFIX}
+  *     selects the second-precision pattern, anything else the millisecond-precision pattern
+  * @return a freshly constructed {@link SimpleDateFormat}
+  */
+ private SimpleDateFormat getSimpleDateFormat(String sign) {
+     String pattern =
+             ConstantValue.TIME_SECOND_SUFFIX.equals(sign)
+                     ? "yyyy-MM-dd HH:mm:ss"
+                     : "yyyy-MM-dd HH:mm:ss SSS";
+     return new SimpleDateFormat(pattern);
+ }
+
+ /**
+  * Prepares row key generation from the configured row key expression.
+  *
+  * <p>When an expression is configured it is parsed into {@code functionTree}, the columns it
+  * references are stored in {@code rowKeyColumns}, and each column's position within
+  * {@code columnNames} is recorded in {@code rowKeyColumnIndex}. Nothing happens when the
+  * expression is blank.
+  *
+  * @throws RuntimeException if the expression references a column missing from
+  *     {@code columnNames}
+  */
+ private void initRowKeyConfig() {
+     String express = hBaseConf.getRowkeyExpress();
+     if (StringUtils.isBlank(express)) {
+         return;
+     }
+
+     this.functionTree = FunctionParser.parse(express);
+     this.rowKeyColumns = FunctionParser.parseRowKeyCol(express);
+     this.rowKeyColumnIndex = new ArrayList<>(rowKeyColumns.size());
+
+     for (String name : rowKeyColumns) {
+         int position = columnNames.indexOf(name);
+         if (position < 0) {
+             throw new RuntimeException("Can not get row key column from columns:" + name);
+         }
+         rowKeyColumnIndex.add(position);
+     }
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseSerde.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseSerde.java
similarity index 88%
rename from chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseSerde.java
rename to chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseSerde.java
index 9fbb99059d..e76ec5c2a0 100644
--- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseSerde.java
+++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseSerde.java
@@ -16,7 +16,9 @@
* limitations under the License.
*/
-package com.dtstack.chunjun.connector.hbase;
+package com.dtstack.chunjun.connector.hbase14.converter;
+
+import com.dtstack.chunjun.connector.hbase.HBaseTableSchema;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
@@ -47,16 +49,16 @@
import static org.apache.flink.util.Preconditions.checkArgument;
/** Utilities for HBase serialization and deserialization. */
-public class HBaseSerde {
+public class HBaseSerde implements Serializable {
- protected static final byte[] EMPTY_BYTES = new byte[] {};
+ private static final byte[] EMPTY_BYTES = new byte[] {};
- protected static final int MIN_TIMESTAMP_PRECISION = 0;
- protected static final int MAX_TIMESTAMP_PRECISION = 3;
- protected static final int MIN_TIME_PRECISION = 0;
- protected static final int MAX_TIME_PRECISION = 3;
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+ private static final int MAX_TIMESTAMP_PRECISION = 3;
+ private static final int MIN_TIME_PRECISION = 0;
+ private static final int MAX_TIME_PRECISION = 3;
- protected final byte[] nullStringBytes;
+ private final byte[] nullStringBytes;
// row key index in output row
protected final int rowkeyIndex;
@@ -68,14 +70,14 @@ public class HBaseSerde {
protected final int fieldLength;
- protected GenericRowData reusedRow;
- protected GenericRowData[] reusedFamilyRows;
+ private GenericRowData reusedRow;
+ private GenericRowData[] reusedFamilyRows;
- protected final @Nullable FieldEncoder keyEncoder;
+ private final @Nullable FieldEncoder keyEncoder;
protected final @Nullable FieldDecoder keyDecoder;
- protected final FieldEncoder[][] qualifierEncoders;
+ private final FieldEncoder[][] qualifierEncoders;
protected final FieldDecoder[][] qualifierDecoders;
- protected final GenericRowData rowWithRowKey;
+ private final GenericRowData rowWithRowKey;
public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) {
this.families = hbaseSchema.getFamilyKeys();
@@ -223,43 +225,18 @@ public Get createGet(Object rowKey) {
return get;
}
- /**
- * Converts HBase {@link Result} into a new {@link RowData} instance.
- *
- * Note: this method is thread-safe.
- */
- public RowData convertToNewRow(Result result) {
- // The output rows needs to be initialized each time
- // to prevent the possibility of putting the output object into the cache.
- GenericRowData resultRow = new GenericRowData(fieldLength);
- GenericRowData[] familyRows = new GenericRowData[families.length];
- for (int f = 0; f < families.length; f++) {
- familyRows[f] = new GenericRowData(qualifiers[f].length);
- }
- return convertToRow(result, resultRow, familyRows);
- }
-
- /**
- * Converts HBase {@link Result} into a reused {@link RowData} instance.
- *
- *
Note: this method is NOT thread-safe.
- */
- public RowData convertToReusedRow(Result result) {
- return convertToRow(result, reusedRow, reusedFamilyRows);
- }
-
- protected RowData convertToRow(
- Result result, GenericRowData resultRow, GenericRowData[] familyRows) {
+ /** Converts HBase {@link Result} into {@link RowData}. */
+ public RowData convertToRow(Result result) {
for (int i = 0; i < fieldLength; i++) {
if (rowkeyIndex == i) {
assert keyDecoder != null;
Object rowkey = keyDecoder.decode(result.getRow());
- resultRow.setField(rowkeyIndex, rowkey);
+ reusedRow.setField(rowkeyIndex, rowkey);
} else {
int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
// get family key
byte[] familyKey = families[f];
- GenericRowData familyRow = familyRows[f];
+ GenericRowData familyRow = reusedFamilyRows[f];
for (int q = 0; q < this.qualifiers[f].length; q++) {
// get quantifier key
byte[] qualifier = qualifiers[f][q];
@@ -267,29 +244,25 @@ protected RowData convertToRow(
byte[] value = result.getValue(familyKey, qualifier);
familyRow.setField(q, qualifierDecoders[f][q].decode(value));
}
- resultRow.setField(i, familyRow);
+ reusedRow.setField(i, familyRow);
}
}
- return resultRow;
+ return reusedRow;
}
- /**
- * Converts HBase {@link Result} into {@link RowData}.
- *
- * @deprecated Use {@link #convertToReusedRow(Result)} instead.
- */
- @Deprecated
- public RowData convertToRow(Result result) {
+ /** Converts HBase {@link Result} into {@link RowData}. */
+ public RowData convertToNewRow(Result result) {
+ GenericRowData rowData = new GenericRowData(fieldLength);
for (int i = 0; i < fieldLength; i++) {
if (rowkeyIndex == i) {
assert keyDecoder != null;
Object rowkey = keyDecoder.decode(result.getRow());
- reusedRow.setField(rowkeyIndex, rowkey);
+ rowData.setField(rowkeyIndex, rowkey);
} else {
int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
// get family key
byte[] familyKey = families[f];
- GenericRowData familyRow = reusedFamilyRows[f];
+ GenericRowData familyRow = new GenericRowData(this.qualifiers[f].length);
for (int q = 0; q < this.qualifiers[f].length; q++) {
// get quantifier key
byte[] qualifier = qualifiers[f][q];
@@ -297,10 +270,10 @@ public RowData convertToRow(Result result) {
byte[] value = result.getValue(familyKey, qualifier);
familyRow.setField(q, qualifierDecoders[f][q].decode(value));
}
- reusedRow.setField(i, familyRow);
+ rowData.setField(i, familyRow);
}
}
- return reusedRow;
+ return rowData;
}
// ------------------------------------------------------------------------------------
@@ -309,11 +282,11 @@ public RowData convertToRow(Result result) {
/** Runtime encoder that encodes a specified field in {@link RowData} into byte[]. */
@FunctionalInterface
- protected interface FieldEncoder extends Serializable {
+ public interface FieldEncoder extends Serializable {
byte[] encode(RowData row, int pos);
}
- protected static FieldEncoder createNullableFieldEncoder(
+ private static FieldEncoder createNullableFieldEncoder(
LogicalType fieldType, final byte[] nullStringBytes) {
final FieldEncoder encoder = createFieldEncoder(fieldType);
if (fieldType.isNullable()) {
@@ -342,7 +315,7 @@ protected static FieldEncoder createNullableFieldEncoder(
}
}
- protected static FieldEncoder createFieldEncoder(LogicalType fieldType) {
+ private static FieldEncoder createFieldEncoder(LogicalType fieldType) {
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case CHAR:
@@ -400,7 +373,7 @@ protected static FieldEncoder createFieldEncoder(LogicalType fieldType) {
}
}
- protected static FieldEncoder createDecimalEncoder(DecimalType decimalType) {
+ private static FieldEncoder createDecimalEncoder(DecimalType decimalType) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return (row, pos) -> {
@@ -409,7 +382,7 @@ protected static FieldEncoder createDecimalEncoder(DecimalType decimalType) {
};
}
- protected static FieldEncoder createTimestampEncoder(final int precision) {
+ private static FieldEncoder createTimestampEncoder(final int precision) {
return (row, pos) -> {
long millisecond = row.getTimestamp(pos, precision).getMillisecond();
return Bytes.toBytes(millisecond);
@@ -427,7 +400,7 @@ protected interface FieldDecoder extends Serializable {
Object decode(byte[] value);
}
- protected static FieldDecoder createNullableFieldDecoder(
+ private static FieldDecoder createNullableFieldDecoder(
LogicalType fieldType, final byte[] nullStringBytes) {
final FieldDecoder decoder = createFieldDecoder(fieldType);
if (fieldType.isNullable()) {
@@ -453,7 +426,7 @@ protected static FieldDecoder createNullableFieldDecoder(
}
}
- protected static FieldDecoder createFieldDecoder(LogicalType fieldType) {
+ private static FieldDecoder createFieldDecoder(LogicalType fieldType) {
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case CHAR:
@@ -511,7 +484,7 @@ protected static FieldDecoder createFieldDecoder(LogicalType fieldType) {
}
}
- protected static FieldDecoder createDecimalDecoder(DecimalType decimalType) {
+ private static FieldDecoder createDecimalDecoder(DecimalType decimalType) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return value -> {
@@ -520,7 +493,7 @@ protected static FieldDecoder createDecimalDecoder(DecimalType decimalType) {
};
}
- protected static FieldDecoder createTimestampDecoder() {
+ private static FieldDecoder createTimestampDecoder() {
return value -> {
// TODO: support higher precision
long milliseconds = Bytes.toLong(value);
@@ -528,6 +501,16 @@ protected static FieldDecoder createTimestampDecoder() {
};
}
+ /**
+  * Returns the encoder used for the row key field.
+  *
+  * @return the row key encoder; may be {@code null} (presumably when no row key is set)
+  */
+ @Nullable
+ public FieldEncoder getKeyEncoder() {
+     return this.keyEncoder;
+ }
+
+ /**
+  * Returns the decoder used for the row key field.
+  *
+  * @return the row key decoder; may be {@code null}
+  */
+ @Nullable
+ public FieldDecoder getKeyDecoder() {
+     return this.keyDecoder;
+ }
+
public byte[] getRowKey(Object rowKey) {
checkArgument(keyEncoder != null, "row key is not set.");
rowWithRowKey.setField(0, rowKey);
diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HbaseRowConverter.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HbaseRowConverter.java
new file mode 100644
index 0000000000..7379964795
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HbaseRowConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.hbase14.converter;
+
+import com.dtstack.chunjun.connector.hbase.HBaseTableSchema;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Row converter that delegates HBase (de)serialization to {@link HBaseSerde}.
+ *
+ * <p>The serde is held in a {@code transient} field and is built on first use from the table
+ * schema and the null string literal supplied at construction time.
+ */
+public class HbaseRowConverter
+        extends AbstractRowConverter {
+    private HBaseTableSchema schema;
+    private String nullStringLiteral;
+    private transient HBaseSerde serde;
+
+    /**
+     * @param schema HBase table schema handed to the serde
+     * @param nullStringLiteral null string literal handed to the serde
+     */
+    public HbaseRowConverter(HBaseTableSchema schema, String nullStringLiteral) {
+        this.schema = schema;
+        this.nullStringLiteral = nullStringLiteral;
+    }
+
+    /** Converts an HBase {@link Result} into a row via {@code HBaseSerde#convertToRow}. */
+    @Override
+    public RowData toInternal(Result input) throws Exception {
+        return obtainSerde().convertToRow(input);
+    }
+
+    /**
+     * Builds the mutation for {@code rowData}: a put for {@code INSERT} and {@code UPDATE_AFTER}
+     * rows, a delete for every other row kind. The {@code output} argument is not used.
+     */
+    @Override
+    public Mutation toExternal(RowData rowData, Mutation output) throws Exception {
+        HBaseSerde activeSerde = obtainSerde();
+        RowKind kind = rowData.getRowKind();
+        if (kind != RowKind.INSERT && kind != RowKind.UPDATE_AFTER) {
+            return activeSerde.createDeleteMutation(rowData);
+        }
+        return activeSerde.createPutMutation(rowData);
+    }
+
+    /** Returns the lookup row unchanged. */
+    @Override
+    public RowData toInternalLookup(RowData input) throws Exception {
+        return input;
+    }
+
+    /** Returns the serde, creating it from the schema and null literal on first use. */
+    private HBaseSerde obtainSerde() {
+        if (serde == null) {
+            this.serde = new HBaseSerde(schema, nullStringLiteral);
+        }
+        return serde;
+    }
+}
diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java
index 00db12d245..f73624d372 100644
--- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java
+++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java
@@ -16,17 +16,28 @@
package com.dtstack.chunjun.connector.hbase14.sink;
import com.dtstack.chunjun.conf.SyncConf;
-import com.dtstack.chunjun.connector.hbase.HBaseColumnConverter;
-import com.dtstack.chunjun.connector.hbase14.conf.HBaseConf;
-import com.dtstack.chunjun.connector.hbase14.converter.HBaseRawTypeConverter;
+import com.dtstack.chunjun.connector.hbase.HBaseTableSchema;
+import com.dtstack.chunjun.connector.hbase.conf.HBaseConf;
+import com.dtstack.chunjun.connector.hbase.converter.HBaseRawTypeConverter;
+import com.dtstack.chunjun.connector.hbase14.converter.HBaseColumnConverter;
+import com.dtstack.chunjun.connector.hbase14.converter.HbaseRowConverter;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.RawTypeConverter;
import com.dtstack.chunjun.sink.SinkFactory;
import com.dtstack.chunjun.util.GsonUtil;
+import com.dtstack.chunjun.util.TableUtil;
+import com.dtstack.chunjun.util.ValueUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.List;
+import java.util.Map;
public class HBase14SinkFactory extends SinkFactory {
@@ -38,32 +49,91 @@ public HBase14SinkFactory(SyncConf config) {
GsonUtil.GSON.fromJson(
GsonUtil.GSON.toJson(config.getWriter().getParameter()), HBaseConf.class);
super.initCommonConf(hbaseConf);
- hbaseConf.setColumnMetaInfos(syncConf.getWriter().getFieldList());
+ hbaseConf.setColumn(syncConf.getWriter().getFieldList());
+
+ if (config.getWriter().getParameter().get("rowkeyColumn") != null) {
+ String rowkeyColumn =
+ buildRowKeyExpress(config.getWriter().getParameter().get("rowkeyColumn"));
+ hbaseConf.setRowkeyExpress(rowkeyColumn);
+ }
+
+ if (config.getWriter().getParameter().get("versionColumn") != null) {
+ Map versionColumn =
+ (Map) config.getWriter().getParameter().get("versionColumn");
+ if (null != versionColumn.get("index")
+ && StringUtils.isNotBlank(versionColumn.get("index").toString())) {
+ hbaseConf.setVersionColumnIndex(
+ Integer.valueOf(versionColumn.get("index").toString()));
+ }
+
+ if (null != versionColumn.get("value")
+ && StringUtils.isNotBlank(versionColumn.get("value").toString())) {
+ hbaseConf.setVersionColumnValue(versionColumn.get("value").toString());
+ }
+ }
}
@Override
public DataStreamSink createSink(DataStream dataSet) {
HBaseOutputFormatBuilder builder = new HBaseOutputFormatBuilder();
builder.setConfig(hbaseConf);
- builder.setColumnMetaInfos(hbaseConf.getColumnMetaInfos());
- builder.setEncoding(hbaseConf.getEncoding());
+ builder.setHbaseConf(hbaseConf);
+
builder.setHbaseConfig(hbaseConf.getHbaseConfig());
- builder.setNullMode(hbaseConf.getNullMode());
- builder.setRowkeyExpress(hbaseConf.getRowkeyExpress());
builder.setTableName(hbaseConf.getTable());
- builder.setVersionColumnIndex(hbaseConf.getVersionColumnIndex());
- builder.setVersionColumnValues(hbaseConf.getVersionColumnValue());
- builder.setWalFlag(hbaseConf.getWalFlag());
- builder.setRowkeyExpress(hbaseConf.getRowkeyExpress());
builder.setWriteBufferSize(hbaseConf.getWriteBufferSize());
- AbstractRowConverter rowConverter =
- new HBaseColumnConverter(hbaseConf.getColumnMetaInfos());
+ AbstractRowConverter rowConverter;
+ if (useAbstractBaseColumn) {
+ final RowType rowType =
+ TableUtil.createRowType(hbaseConf.getColumn(), getRawTypeConverter());
+ rowConverter = new HBaseColumnConverter(hbaseConf, rowType);
+ } else {
+ TableSchema tableSchema =
+ TableUtil.createTableSchema(hbaseConf.getColumn(), getRawTypeConverter());
+ HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+ String nullStringLiteral = hbaseConf.getNullStringLiteral();
+ rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral);
+ }
+
builder.setRowConverter(rowConverter);
return createOutput(dataSet, builder.finish());
}
@Override
public RawTypeConverter getRawTypeConverter() {
- return new HBaseRawTypeConverter();
+ return HBaseRawTypeConverter::apply;
+ }
+
+ /** Compatible with old formats */
+ private String buildRowKeyExpress(Object rowKeyInfo) {
+ if (rowKeyInfo == null) {
+ return null;
+ }
+
+ if (rowKeyInfo instanceof String) {
+ return rowKeyInfo.toString();
+ }
+
+ if (!(rowKeyInfo instanceof List)) {
+ return null;
+ }
+
+ StringBuilder expressBuilder = new StringBuilder();
+
+ for (Map item : ((List