From d6277829ed589be3d18d782447ecae00bbbacf63 Mon Sep 17 00:00:00 2001 From: Gongjiang Tang Date: Mon, 15 Aug 2022 16:06:23 +0800 Subject: [PATCH] [feat] refactor HBase connector to support transformer and fix some bugs --- .../hbase14/common/HBaseTypeUtils.java | 229 ++++ .../hbase14/converter/AsyncHBaseSerde.java | 86 --- .../converter/HBaseColumnConverter.java | 695 ++++++++++-------- .../converter/HBaseFlatRowConverter.java | 441 +++++++++++ .../converter/HBaseRawTypeConverter.java | 78 +- .../hbase14/converter/HBaseSerde.java | 8 +- .../hbase14/converter/HbaseRowConverter.java | 87 +++ .../hbase14/sink/HBase14SinkFactory.java | 87 +-- .../hbase14/sink/HBaseOutputFormat.java | 49 +- .../sink/HBaseOutputFormatBuilder.java | 7 +- .../hbase14/source/HBase14SourceFactory.java | 165 ++++- .../hbase14/source/HBaseInputFormat.java | 136 ++-- .../source/HBaseInputFormatBuilder.java | 84 ++- .../hbase14/table/HBaseDynamicTableSink.java | 51 +- .../table/HBaseDynamicTableSource.java | 45 +- .../table/Hbase14DynamicTableFactory.java | 53 +- .../table/lookup/HBaseAllTableFunction.java | 27 +- .../table/lookup/HBaseLruTableFunction.java | 229 ++---- .../connector/hbase14/util/DtFileUtils.java | 36 - .../connector/hbase14/util/ScanBuilder.java | 68 ++ .../hbase/HBaseMutationConverter.java | 50 -- .../connector/hbase/HBaseTableSchema.java | 2 +- .../connector/hbase/conf/HBaseConf.java | 12 + .../connector/hbase/conf/HBaseConfigKeys.java | 70 -- .../hbase/converter/type/BINARYSTRING.java | 78 -- .../connector/hbase}/table/HBaseOptions.java | 2 +- .../hbase/util/HBaseConfigUtils.java | 7 +- .../connector/hbase}/util/HBaseHelper.java | 15 +- .../main/java/com/dtstack/chunjun/Main.java | 2 +- .../dtstack/chunjun/conf/OperatorConf.java | 4 + .../com/dtstack/chunjun/util/TableUtil.java | 15 +- .../json/hbase/hbase_transformer_hbase.json | 146 ++++ 32 files changed, 1979 insertions(+), 1085 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/common/HBaseTypeUtils.java delete mode 100644 chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/AsyncHBaseSerde.java create mode 100644 chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseFlatRowConverter.java rename chunjun-connectors/{chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase => chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14}/converter/HBaseRawTypeConverter.java (50%) delete mode 100644 chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/DtFileUtils.java create mode 100644 chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/ScanBuilder.java delete mode 100644 chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseMutationConverter.java delete mode 100644 chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConfigKeys.java delete mode 100644 chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/type/BINARYSTRING.java rename chunjun-connectors/{chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14 => chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase}/table/HBaseOptions.java (98%) rename
chunjun-connectors/{chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14 => chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase}/util/HBaseHelper.java (92%) create mode 100644 chunjun-examples/json/hbase/hbase_transformer_hbase.json diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/common/HBaseTypeUtils.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/common/HBaseTypeUtils.java new file mode 100644 index 0000000000..32559cec4a --- /dev/null +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/common/HBaseTypeUtils.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.hbase14.common; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.hadoop.hbase.util.Bytes; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; + +/** A utility class to process data exchange with HBase type system. */ +@Internal +public class HBaseTypeUtils { + + private static final byte[] EMPTY_BYTES = new byte[] {}; + + private static final int MIN_TIMESTAMP_PRECISION = 0; + private static final int MAX_TIMESTAMP_PRECISION = 3; + private static final int MIN_TIME_PRECISION = 0; + private static final int MAX_TIME_PRECISION = 3; + + /** Deserialize byte array to Java Object with the given type. */ + public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) { + switch (typeIdx) { + case 0: // byte[] + return value; + case 1: // String + return Arrays.equals(EMPTY_BYTES, value) ? 
null : new String(value, stringCharset); + case 2: // byte + return value[0]; + case 3: + return Bytes.toShort(value); + case 4: + return Bytes.toInt(value); + case 5: + return Bytes.toLong(value); + case 6: + return Bytes.toFloat(value); + case 7: + return Bytes.toDouble(value); + case 8: + return Bytes.toBoolean(value); + case 9: // sql.Timestamp encoded as long + return new Timestamp(Bytes.toLong(value)); + case 10: // sql.Date encoded as long + return new Date(Bytes.toLong(value)); + case 11: // sql.Time encoded as long + return new Time(Bytes.toLong(value)); + case 12: + return Bytes.toBigDecimal(value); + case 13: + return new BigInteger(value); + + default: + throw new IllegalArgumentException("unsupported type index:" + typeIdx); + } + } + + /** Serialize the Java Object to byte array with the given type. */ + public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset) { + switch (typeIdx) { + case 0: // byte[] + return (byte[]) value; + case 1: // external String + return value == null ? EMPTY_BYTES : ((String) value).getBytes(stringCharset); + case 2: // byte + return value == null ? EMPTY_BYTES : new byte[] {(byte) value}; + case 3: + return Bytes.toBytes((short) value); + case 4: + return Bytes.toBytes((int) value); + case 5: + return Bytes.toBytes((long) value); + case 6: + return Bytes.toBytes((float) value); + case 7: + return Bytes.toBytes((double) value); + case 8: + return Bytes.toBytes((boolean) value); + case 9: // sql.Timestamp encoded to Long + return Bytes.toBytes(((Timestamp) value).getTime()); + case 10: // sql.Date encoded as long + return Bytes.toBytes(((Date) value).getTime()); + case 11: // sql.Time encoded as long + return Bytes.toBytes(((Time) value).getTime()); + case 12: + return Bytes.toBytes((BigDecimal) value); + case 13: + return ((BigInteger) value).toByteArray(); + + default: + throw new IllegalArgumentException("unsupported type index:" + typeIdx); + } + } + + /** + * Gets the type index (type representation in HBase connector) from the {@link + * TypeInformation}. + */ + public static int getTypeIndex(TypeInformation typeInfo) { + return getTypeIndex(typeInfo.getTypeClass()); + } + + /** Checks whether the given Class is a supported type in HBase connector. */ + public static boolean isSupportedType(Class clazz) { + return getTypeIndex(clazz) != -1; + } + + private static int getTypeIndex(Class clazz) { + if (byte[].class.equals(clazz)) { + return 0; + } else if (String.class.equals(clazz)) { + return 1; + } else if (Byte.class.equals(clazz)) { + return 2; + } else if (Short.class.equals(clazz)) { + return 3; + } else if (Integer.class.equals(clazz)) { + return 4; + } else if (Long.class.equals(clazz)) { + return 5; + } else if (Float.class.equals(clazz)) { + return 6; + } else if (Double.class.equals(clazz)) { + return 7; + } else if (Boolean.class.equals(clazz)) { + return 8; + } else if (Timestamp.class.equals(clazz)) { + return 9; + } else if (Date.class.equals(clazz)) { + return 10; + } else if (Time.class.equals(clazz)) { + return 11; + } else if (BigDecimal.class.equals(clazz)) { + return 12; + } else if (BigInteger.class.equals(clazz)) { + return 13; + } else { + return -1; + } + } + + /** Checks whether the given {@link LogicalType} is supported in HBase connector. 
*/ + public static boolean isSupportedType(LogicalType type) { + // ordered by type root definition + switch (type.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BOOLEAN: + case BINARY: + case VARBINARY: + case DECIMAL: + case TINYINT: + case SMALLINT: + case INTEGER: + case DATE: + case INTERVAL_YEAR_MONTH: + case BIGINT: + case INTERVAL_DAY_TIME: + case FLOAT: + case DOUBLE: + return true; + case TIME_WITHOUT_TIME_ZONE: + final int timePrecision = getPrecision(type); + if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIME type is out of the range [%s, %s] supported by " + + "HBase connector", + timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION)); + } + return true; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(type); + if (timestampPrecision < MIN_TIMESTAMP_PRECISION + || timestampPrecision > MAX_TIMESTAMP_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " + + "HBase connector", + timestampPrecision, + MIN_TIMESTAMP_PRECISION, + MAX_TIMESTAMP_PRECISION)); + } + return true; + case TIMESTAMP_WITH_TIME_ZONE: + case ARRAY: + case MULTISET: + case MAP: + case ROW: + case STRUCTURED_TYPE: + case DISTINCT_TYPE: + case RAW: + case NULL: + case SYMBOL: + case UNRESOLVED: + return false; + default: + throw new IllegalArgumentException(); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/AsyncHBaseSerde.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/AsyncHBaseSerde.java deleted file mode 100644 index 7d2ac6609c..0000000000 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/AsyncHBaseSerde.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.chunjun.connector.hbase14.converter; - -import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; - -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; - -import org.apache.hadoop.hbase.client.Result; - -import java.util.Map; - -/** - * @program: chunjun - * @author: wuren - * @create: 2021/10/19 - */ -public class AsyncHBaseSerde extends HBaseSerde { - public AsyncHBaseSerde(HBaseTableSchema hbaseSchema, String nullStringLiteral) { - super(hbaseSchema, nullStringLiteral); - } - - /** - * Converts HBase {@link Result} into a new {@link RowData} instance. - * - *

Note: this method is thread-safe. - */ - public RowData convertToNewRow(Map> result, byte[] rowkey) { - // The output rows needs to be initialized each time - // to prevent the possibility of putting the output object into the cache. - GenericRowData resultRow = new GenericRowData(fieldLength); - GenericRowData[] familyRows = new GenericRowData[families.length]; - for (int f = 0; f < families.length; f++) { - familyRows[f] = new GenericRowData(qualifiers[f].length); - } - - return convertToRow(result, resultRow, familyRows, rowkey); - } - - protected RowData convertToRow( - Map> result, - GenericRowData resultRow, - GenericRowData[] familyRows, - byte[] rowkey) { - for (int i = 0; i < fieldLength; i++) { - if (rowkeyIndex == i) { - resultRow.setField(rowkeyIndex, keyDecoder.decode(rowkey)); - } else { - int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i; - // get family key - byte[] familyKey = families[f]; - GenericRowData familyRow = familyRows[f]; - for (int q = 0; q < this.qualifiers[f].length; q++) { - // get quantifier key - byte[] qualifier = qualifiers[f][q]; - // read value - if (result.get(new String(familyKey)) == null) { - familyRow.setField(q, null); - continue; - } - byte[] value = result.get(new String(familyKey)).get(new String(qualifier)); - familyRow.setField(q, qualifierDecoders[f][q].decode(value)); - } - resultRow.setField(i, familyRow); - } - } - return resultRow; - } -} diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseColumnConverter.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseColumnConverter.java index 473dcd6401..4aa1769fc6 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseColumnConverter.java @@ -22,25 +22,19 @@ import com.dtstack.chunjun.connector.hbase.FunctionParser; import com.dtstack.chunjun.connector.hbase.FunctionTree; import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; -import com.dtstack.chunjun.connector.hbase.converter.type.BINARYSTRING; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; import com.dtstack.chunjun.element.AbstractBaseColumn; import com.dtstack.chunjun.element.ColumnRowData; -import com.dtstack.chunjun.element.column.BigDecimalColumn; -import com.dtstack.chunjun.element.column.BooleanColumn; -import com.dtstack.chunjun.element.column.BytesColumn; -import com.dtstack.chunjun.element.column.SqlDateColumn; -import com.dtstack.chunjun.element.column.StringColumn; -import com.dtstack.chunjun.element.column.TimeColumn; -import com.dtstack.chunjun.element.column.TimestampColumn; +import com.dtstack.chunjun.element.column.*; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; +import com.dtstack.chunjun.throwable.UnsupportedTypeException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.TimestampType; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; @@ -52,52 +46,81 @@ import 
org.apache.hadoop.hbase.util.Bytes; import java.math.BigDecimal; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.sql.Time; +import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; +import static com.dtstack.chunjun.connector.hbase.HBaseTypeUtils.MAX_TIMESTAMP_PRECISION; +import static com.dtstack.chunjun.connector.hbase.HBaseTypeUtils.MIN_TIMESTAMP_PRECISION; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; + +/** + * @author jier + * @program flinkx + * @create 2021/04/30 + */ public class HBaseColumnConverter extends AbstractRowConverter { public static final String KEY_ROW_KEY = "rowkey"; - private final HBaseConf hBaseConf; - private final List fieldList; - private final List columnNames; + private FunctionTree functionTree; - // sink - private final boolean walFlag; - // qualifier keys - private final byte[][][] familyAndQualifier; - private final String encoding; - private final String nullMode; private List rowKeyColumnIndex; - private List rowKeyColumns; + private final Integer versionColumnIndex; + private final String versionColumnValue; - private final SimpleDateFormat timeSecondFormat; - private final SimpleDateFormat timeMillisecondFormat; - private FunctionTree functionTree; + private final SimpleDateFormat timeSecondFormat = + getSimpleDateFormat(ConstantValue.TIME_SECOND_SUFFIX); + private final SimpleDateFormat timeMillisecondFormat = + getSimpleDateFormat(ConstantValue.TIME_MILLISECOND_SUFFIX); + + private int rowKeyIndex = -1; + + private final List columnNames = new ArrayList<>(); + + private final String encoding; + + private final HBaseConf hBaseConf; + + private List rowKeyColumns; + + private final String nullMode; + + private final List fieldList; + + private byte[][][] familyAndQualifier; public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) { super(rowType); - this.hBaseConf = hBaseConf; - this.fieldList = hBaseConf.getColumn(); - this.encoding = hBaseConf.getEncoding(); - this.nullMode = hBaseConf.getNullMode(); - this.versionColumnIndex = hBaseConf.getVersionColumnIndex(); - this.versionColumnValue = hBaseConf.getVersionColumnValue(); - this.walFlag = hBaseConf.getWalFlag(); - this.familyAndQualifier = new byte[hBaseConf.getColumn().size()][][]; - this.columnNames = new ArrayList<>(hBaseConf.getColumn().size()); + encoding = StringUtils.isEmpty(hBaseConf.getEncoding()) ? 
"utf-8" : hBaseConf.getEncoding(); + nullMode = hBaseConf.getNullMode(); + for (int i = 0; i < hBaseConf.getColumn().size(); i++) { + toInternalConverters.add( + i, + wrapIntoNullableInternalConverter( + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( + i, + wrapIntoNullableExternalConverter( + createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i))); + } + this.familyAndQualifier = new byte[rowType.getFieldCount()][][]; for (int i = 0; i < hBaseConf.getColumn().size(); i++) { - String name = hBaseConf.getColumn().get(i).getName(); + FieldConf fieldConf = hBaseConf.getColumn().get(i); + String name = fieldConf.getName(); columnNames.add(name); String[] cfAndQualifier = name.split(":"); if (cfAndQualifier.length == 2 @@ -108,52 +131,25 @@ public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) { qualifierKeys[0] = Bytes.toBytes(cfAndQualifier[0]); qualifierKeys[1] = Bytes.toBytes(cfAndQualifier[1]); familyAndQualifier[i] = qualifierKeys; - } else if (!KEY_ROW_KEY.equals(name)) { + } else if (KEY_ROW_KEY.equals(name)) { + rowKeyIndex = i; + } else if (!StringUtils.isBlank(fieldConf.getValue())) { + familyAndQualifier[i] = new byte[2][]; + } else { throw new IllegalArgumentException( "hbase 中,column 的列配置格式应该是:列族:列名. 您配置的列错误:" + name); } } + fieldList = hBaseConf.getColumnMetaInfos(); + this.hBaseConf = hBaseConf; initRowKeyConfig(); - - for (int i = 0; i < hBaseConf.getColumn().size(); i++) { - - toInternalConverters.add( - wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i)))); - toExternalConverters.add( - wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i])); - } - this.timeSecondFormat = getSimpleDateFormat(ConstantValue.TIME_SECOND_SUFFIX); - this.timeMillisecondFormat = getSimpleDateFormat(ConstantValue.TIME_MILLISECOND_SUFFIX); - } - - @Override - protected ISerializationConverter wrapIntoNullableExternalConverter( - ISerializationConverter ISerializationConverter, LogicalType type) { - return ((rowData, index, mutation) -> { - if (rowData != null && !rowData.isNullAt(index)) { - ISerializationConverter.serialize(rowData, index, mutation); - } else { - switch (nullMode.toUpperCase()) { - case "SKIP": - return; - case "EMPTY": - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - HConstants.EMPTY_BYTE_ARRAY); - return; - default: - throw new IllegalArgumentException("Unsupported null mode: " + nullMode); - } - } - }); + this.versionColumnIndex = hBaseConf.getVersionColumnIndex(); + this.versionColumnValue = hBaseConf.getVersionColumnValue(); } @Override + @SuppressWarnings("unchecked") public RowData toInternal(Result input) throws Exception { ColumnRowData result = new ColumnRowData(fieldList.size()); for (int i = 0; i < fieldList.size(); i++) { @@ -169,10 +165,14 @@ public RowData toInternal(Result input) throws Exception { } result.addField(assembleFieldProps(fieldList.get(i), baseColumn)); } - return result; } + @Override + public RowData toInternalLookup(RowData input) { + throw new ChunJunRuntimeException("Hbase Connector doesn't support Lookup Table Function."); + } + @Override public Mutation toExternal(RowData rowData, Mutation output) throws Exception { byte[] rowkey = getRowkey(rowData); @@ -180,243 +180,362 @@ public Mutation toExternal(RowData rowData, Mutation output) throws Exception { Put put; if (version == null) { put = new Put(rowkey); - if (!walFlag) { + if (!hBaseConf.getWalFlag()) { 
put.setDurability(Durability.SKIP_WAL); } } else { put = new Put(rowkey, version); } - for (int i = 0; i < fieldList.size(); i++) { - if (rowKeyColumnIndex.contains(i)) { + for (int i = 0; i < rowData.getArity(); i++) { + if (rowKeyIndex == i) { continue; } - this.toExternalConverters.get(i).serialize(rowData, i, put); + toExternalConverters.get(i).serialize(rowData, i, put); } - return put; } - /** - * 将外部数据库类型转换为flink内部类型 - * - * @param type type - * @return return - */ @Override - protected IDeserializationConverter createInternalConverter(LogicalType type) { - switch (type.getTypeRoot()) { - case CHAR: - case VARCHAR: - if (type instanceof BINARYSTRING) { - return val -> new StringColumn(Bytes.toStringBinary((byte[]) val)); + @SuppressWarnings("unchecked") + protected ISerializationConverter wrapIntoNullableExternalConverter( + ISerializationConverter serializationConverter, LogicalType type) { + return ((rowData, index, mutation) -> { + if (rowData != null && !rowData.isNullAt(index)) { + serializationConverter.serialize(rowData, index, mutation); + } else { + switch (nullMode.toUpperCase()) { + case "SKIP": + return; + case "EMPTY": + ((Put) mutation) + .addColumn( + familyAndQualifier[index][0], + familyAndQualifier[index][1], + HConstants.EMPTY_BYTE_ARRAY); + return; + default: + throw new IllegalArgumentException("Unsupported null mode: " + nullMode); } - return val -> new StringColumn(new String((byte[]) val, encoding)); + } + }); + } + + @Override + @SuppressWarnings("all") + protected IDeserializationConverter createInternalConverter(LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case TINYINT: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + return new BigDecimalColumn(bytes[0]); + } + }; case BOOLEAN: - return val -> { - // from flink - if (((byte[]) val).length == 1) { - return new BooleanColumn(((byte[]) val)[0] != 0); - } else { - return new BooleanColumn(Boolean.parseBoolean(val.toString())); + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + Boolean result = Bytes.toBoolean(bytes); + return new BooleanColumn(result); } }; - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return val -> - new TimestampColumn( - new BigDecimal(new String((byte[]) val, encoding)).longValue()); - case DECIMAL: - case INTEGER: - case INTERVAL_YEAR_MONTH: case INTERVAL_DAY_TIME: - case FLOAT: + case BIGINT: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + Long value = Bytes.toLong(bytes); + return new BigDecimalColumn(value); + } + }; + case SMALLINT: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + Short value = Bytes.toShort(bytes); + return new BigDecimalColumn(value); + } + }; case DOUBLE: - return val -> { - try { - return new BigDecimalColumn(Bytes.toDouble((byte[]) val)); - } catch (Exception e) { - return new BigDecimalColumn(new String((byte[]) val, encoding)); + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + Double value = Bytes.toDouble(bytes); + return new BigDecimalColumn(value); } }; - case BIGINT: - return val -> { - try { - return new BigDecimalColumn(Bytes.toLong((byte[]) val)); - } catch (Exception e) { - return new BigDecimalColumn(new String((byte[]) val, encoding)); + 
case FLOAT: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + Float value = Bytes.toFloat(bytes); + return new BigDecimalColumn(value); + } + }; + case DECIMAL: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + BigDecimal value = Bytes.toBigDecimal(bytes); + return new BigDecimalColumn(value); + } + }; + case INTERVAL_YEAR_MONTH: + case INTEGER: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + Integer value = Bytes.toInt(bytes); + return new BigDecimalColumn(value); + } + }; + case DATE: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + Date date; + try { + date = new Date(Bytes.toInt((bytes))); + } catch (Exception e) { + String dateValue = Bytes.toStringBinary((bytes)); + date = DateUtils.parseDate(dateValue); + } + return new SqlDateColumn(date.getTime()); + } + }; + case CHAR: + case VARCHAR: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + String value = new String(bytes, encoding); + return new StringColumn(value); } }; - case TINYINT: - return val -> new BigDecimalColumn(((byte[]) val)[0]); - case SMALLINT: - return val -> new BigDecimalColumn(Bytes.toShort(((byte[]) val))); - case TIME_WITHOUT_TIME_ZONE: - return val -> new TimeColumn(Bytes.toInt(((byte[]) val))); case BINARY: case VARBINARY: - return val -> new BytesColumn(((byte[]) val)); - case DATE: - return val -> { - Date date; - try { - date = new Date(Bytes.toInt(((byte[]) val))); - } catch (Exception e) { - String dateValue = Bytes.toStringBinary(((byte[]) val)); - date = DateUtils.parseDate(dateValue); + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + return new BytesColumn(bytes); } - return new SqlDateColumn(date.getTime()); }; - - case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - return (IDeserializationConverter) - val -> - new TimestampColumn( - Bytes.toLong((byte[]) val), - ((TimestampType) (type)).getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + final int timestampPrecision = getPrecision(logicalType); + if (timestampPrecision < MIN_TIMESTAMP_PRECISION + || timestampPrecision > MAX_TIMESTAMP_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " + + "HBase connector", + timestampPrecision, + MIN_TIMESTAMP_PRECISION, + MAX_TIMESTAMP_PRECISION)); + } + long value = Bytes.toLong(bytes); + Timestamp timestamp = new Timestamp(value); + return new TimestampColumn(timestamp, timestampPrecision); + } + }; + case TIME_WITHOUT_TIME_ZONE: + return new IDeserializationConverter() { + @Override + public AbstractBaseColumn deserialize(byte[] bytes) throws Exception { + int value = Bytes.toInt(bytes); + LocalTime localTime = LocalTime.ofNanoOfDay(value * 1_000_000L); + Time time = Time.valueOf(localTime); + return new TimeColumn(time); + } + }; default: - throw new UnsupportedOperationException("Unsupported type:" + type); + throw new UnsupportedTypeException(logicalType.getTypeRoot()); } } - /** - * 
将flink内部的数据类型转换为外部数据库系统类型 - * - * @param type type - * @return return - */ @Override - protected ISerializationConverter createExternalConverter(LogicalType type) { - - switch (type.getTypeRoot()) { - case CHAR: - case VARCHAR: - // get the underlying UTF-8 bytes - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - ((ColumnRowData) rowData) - .getField(index) - .asString() - .getBytes(encoding)); - case BOOLEAN: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asBoolean())); - case BINARY: - case VARBINARY: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - ((ColumnRowData) rowData).getField(index).asBinary()); - case DECIMAL: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asBigDecimal())); + protected ISerializationConverter createExternalConverter(LogicalType logicalType) { + Function valueDigger; + switch (logicalType.getTypeRoot()) { case TINYINT: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - new byte[] { - ((ColumnRowData) rowData) - .getField(index) - .asInt() - .byteValue() - }); + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + byte value = baseColumn.asInt().byteValue(); + byte[] bytes = new byte[] {value}; + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case BOOLEAN: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Boolean value = baseColumn.asBoolean(); + byte[] bytes = Bytes.toBytes(value); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case INTERVAL_DAY_TIME: + case BIGINT: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Long value = baseColumn.asLong(); + byte[] bytes = Bytes.toBytes(value); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; case SMALLINT: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asShort())); - case INTEGER: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Short value = 
baseColumn.asShort(); + byte[] bytes = Bytes.toBytes(value); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case DOUBLE: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Double value = baseColumn.asDouble(); + byte[] bytes = Bytes.toBytes(value); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case FLOAT: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Float value = baseColumn.asFloat(); + byte[] bytes = Bytes.toBytes(value); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case DECIMAL: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + BigDecimal value = baseColumn.asBigDecimal(); + byte[] bytes = Bytes.toBytes(value); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; case DATE: case INTERVAL_YEAR_MONTH: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData).getField(index).asInt())); - case TIME_WITHOUT_TIME_ZONE: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asTime() - .getTime())); - case BIGINT: - case INTERVAL_DAY_TIME: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asLong())); - case FLOAT: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asFloat())); - case DOUBLE: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asDouble())); + case INTEGER: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Integer value = baseColumn.asInt(); + byte[] bytes = Bytes.toBytes(value); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case CHAR: + case VARCHAR: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + 
AbstractBaseColumn baseColumn = columnRowData.getField(pos); + String value = baseColumn.asString(); + byte[] bytes = value.getBytes(Charset.forName(encoding)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case BINARY: + case VARBINARY: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + byte[] bytes = baseColumn.asBinary(); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return (rowData, index, mutation) -> - ((Put) mutation) - .addColumn( - familyAndQualifier[index][0], - familyAndQualifier[index][1], - Bytes.toBytes( - ((ColumnRowData) rowData) - .getField(index) - .asTimestamp() - .getTime())); + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + final int timestampPrecision = getPrecision(logicalType); + if (timestampPrecision < MIN_TIMESTAMP_PRECISION + || timestampPrecision > MAX_TIMESTAMP_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " + + "HBase connector", + timestampPrecision, + MIN_TIMESTAMP_PRECISION, + MAX_TIMESTAMP_PRECISION)); + } + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Timestamp timestamp = baseColumn.asTimestamp(); + byte[] bytes = Bytes.toBytes(timestamp.getTime()); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case TIME_WITHOUT_TIME_ZONE: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + ColumnRowData columnRowData = (ColumnRowData) rowData; + AbstractBaseColumn baseColumn = columnRowData.getField(pos); + Time time = baseColumn.asTime(); + int data = (int) (time.toLocalTime().toNanoOfDay() / 1_000_000L); + byte[] bytes = Bytes.toBytes(data); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; default: - throw new UnsupportedOperationException("Unsupported type: " + type); + throw new UnsupportedTypeException(logicalType.getTypeRoot()); } } @@ -432,6 +551,22 @@ private byte[] getRowkey(RowData record) throws Exception { return rowKeyStr.getBytes(StandardCharsets.UTF_8); } + private void initRowKeyConfig() { + if (StringUtils.isNotBlank(hBaseConf.getRowkeyExpress())) { + this.functionTree = FunctionParser.parse(hBaseConf.getRowkeyExpress()); + this.rowKeyColumns = FunctionParser.parseRowKeyCol(hBaseConf.getRowkeyExpress()); + this.rowKeyColumnIndex = new ArrayList<>(rowKeyColumns.size()); + for (String rowKeyColumn : rowKeyColumns) { + int index = columnNames.indexOf(rowKeyColumn); + if (index == -1) { + throw new RuntimeException( + "Can not get row key column from columns:" + rowKeyColumn); + } + rowKeyColumnIndex.add(index); + } + } + } + public Long getVersion(RowData record) { if (versionColumnIndex == null && StringUtils.isBlank(versionColumnValue)) { return null; @@ -463,7 +598,7 @@ public Long getVersion(RowData record) { } catch 
(Exception e) { // ignore } - java.util.Date date; + Date date; try { date = timeMillisecondFormat.parse(timeStampValue.toString()); } catch (ParseException e) { @@ -478,14 +613,16 @@ public Long getVersion(RowData record) { } } return date.getTime(); - } else if (timeStampValue instanceof java.util.Date) { + } else if (timeStampValue instanceof Date) { return ((Date) timeStampValue).getTime(); + } else if (timeStampValue instanceof BigDecimal) { + return ((BigDecimal) timeStampValue).longValue(); } else { - throw new RuntimeException("rowkey类型不兼容: " + timeStampValue.getClass()); + throw new RuntimeException("Incompatible version type: " + timeStampValue.getClass()); } } - private SimpleDateFormat getSimpleDateFormat(String sign) { + private static SimpleDateFormat getSimpleDateFormat(String sign) { SimpleDateFormat format; if (ConstantValue.TIME_SECOND_SUFFIX.equals(sign)) { format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -495,20 +632,4 @@ private SimpleDateFormat getSimpleDateFormat(String sign) { format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS"); } return format; } - - private void initRowKeyConfig() { - if (StringUtils.isNotBlank(hBaseConf.getRowkeyExpress())) { - this.functionTree = FunctionParser.parse(hBaseConf.getRowkeyExpress()); - this.rowKeyColumns = FunctionParser.parseRowKeyCol(hBaseConf.getRowkeyExpress()); - this.rowKeyColumnIndex = new ArrayList<>(rowKeyColumns.size()); - for (String rowKeyColumn : rowKeyColumns) { - int index = columnNames.indexOf(rowKeyColumn); - if (index == -1) { - throw new RuntimeException( - "Can not get row key column from columns:" + rowKeyColumn); - } - rowKeyColumnIndex.add(index); - } - } - } } diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseFlatRowConverter.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseFlatRowConverter.java new file mode 100644 index 0000000000..f327a2d6aa --- /dev/null +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseFlatRowConverter.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.dtstack.chunjun.connector.hbase14.converter; + +import com.dtstack.chunjun.connector.hbase.FunctionParser; +import com.dtstack.chunjun.connector.hbase.FunctionTree; +import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; +import com.dtstack.chunjun.constants.ConstantValue; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; +import com.dtstack.chunjun.throwable.UnsupportedTypeException; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + +import static com.dtstack.chunjun.connector.hbase.HBaseTypeUtils.MAX_TIMESTAMP_PRECISION; +import static com.dtstack.chunjun.connector.hbase.HBaseTypeUtils.MAX_TIME_PRECISION; +import static com.dtstack.chunjun.connector.hbase.HBaseTypeUtils.MIN_TIMESTAMP_PRECISION; +import static com.dtstack.chunjun.connector.hbase.HBaseTypeUtils.MIN_TIME_PRECISION; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; + +/** + * @author jier + * @program chunjun FlatRowConverter for sync tasks that use a transformer + * @create 2021/04/30 + */ +public class HBaseFlatRowConverter + extends AbstractRowConverter<Result, RowData, Mutation, LogicalType> { + + public static final String KEY_ROW_KEY = "rowkey"; + + private FunctionTree functionTree; + + private List<Integer> rowKeyColumnIndex; + + private final String encoding; + + private final Integer versionColumnIndex; + + private final String versionColumnValue; + + private final SimpleDateFormat timeSecondFormat = + getSimpleDateFormat(ConstantValue.TIME_SECOND_SUFFIX); + private final SimpleDateFormat timeMillisecondFormat = + getSimpleDateFormat(ConstantValue.TIME_MILLISECOND_SUFFIX); + + private int rowKeyIndex = -1; + + private final List<String> columnNames = new ArrayList<>(); + + private final HBaseConf hBaseConf; + + private List<String> rowKeyColumns; + + private final String nullMode; + + private byte[][][] familyAndQualifier; + + public HBaseFlatRowConverter(HBaseConf hBaseConf, RowType rowType) { + super(rowType); + + nullMode = hBaseConf.getNullMode(); + encoding = StringUtils.isEmpty(hBaseConf.getEncoding()) ?
"utf-8" : hBaseConf.getEncoding(); + + for (int i = 0; i < hBaseConf.getColumn().size(); i++) { + toExternalConverters.add( + i, + wrapIntoNullableExternalConverter( + createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i))); + } + this.familyAndQualifier = new byte[rowType.getFieldCount()][][]; + for (int i = 0; i < hBaseConf.getColumn().size(); i++) { + String name = hBaseConf.getColumn().get(i).getName(); + columnNames.add(name); + String[] cfAndQualifier = name.split(":"); + if (cfAndQualifier.length == 2 + && org.apache.commons.lang.StringUtils.isNotBlank(cfAndQualifier[0]) + && org.apache.commons.lang.StringUtils.isNotBlank(cfAndQualifier[1])) { + + byte[][] qualifierKeys = new byte[2][]; + qualifierKeys[0] = Bytes.toBytes(cfAndQualifier[0]); + qualifierKeys[1] = Bytes.toBytes(cfAndQualifier[1]); + familyAndQualifier[i] = qualifierKeys; + } else if (KEY_ROW_KEY.equals(name)) { + rowKeyIndex = i; + } else { + throw new IllegalArgumentException( + "hbase 中,column 的列配置格式应该是:列族:列名. 您配置的列错误:" + name); + } + } + + this.hBaseConf = hBaseConf; + initRowKeyConfig(); + this.versionColumnIndex = hBaseConf.getVersionColumnIndex(); + this.versionColumnValue = hBaseConf.getVersionColumnValue(); + } + + @Override + public Mutation toExternal(RowData rowData, Mutation output) throws Exception { + byte[] rowkey = getRowkey(rowData); + Long version = getVersion(rowData); + Put put; + if (version == null) { + put = new Put(rowkey); + if (!hBaseConf.getWalFlag()) { + put.setDurability(Durability.SKIP_WAL); + } + } else { + put = new Put(rowkey, version); + } + + for (int i = 0; i < rowData.getArity(); i++) { + if (rowKeyIndex == i) { + continue; + } + toExternalConverters.get(i).serialize(rowData, i, put); + } + return put; + } + + @Override + @SuppressWarnings("unchecked") + protected ISerializationConverter wrapIntoNullableExternalConverter( + ISerializationConverter serializationConverter, LogicalType type) { + return ((rowData, index, mutation) -> { + if (rowData != null && !rowData.isNullAt(index)) { + serializationConverter.serialize(rowData, index, mutation); + } else { + switch (nullMode.toUpperCase()) { + case "SKIP": + return; + case "EMPTY": + ((Put) mutation) + .addColumn( + familyAndQualifier[index][0], + familyAndQualifier[index][1], + HConstants.EMPTY_BYTE_ARRAY); + return; + default: + throw new IllegalArgumentException("Unsupported null mode: " + nullMode); + } + } + }); + } + + @Override + public RowData toInternal(Result input) throws Exception { + throw new ChunJunRuntimeException("This Hbase Convertor doesn't support toInternal."); + } + + @Override + public RowData toInternalLookup(RowData input) { + throw new ChunJunRuntimeException("Hbase Connector doesn't support Lookup Table Function."); + } + + @Override + protected ISerializationConverter createExternalConverter(LogicalType logicalType) { + Function valueDigger; + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = Bytes.toBytes(rowData.getBoolean(pos)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case INTERVAL_DAY_TIME: + case BIGINT: + return new ISerializationConverter() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = Bytes.toBytes(rowData.getLong(pos)); + byte[][] qualifier = familyAndQualifier[pos]; + 
((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case SMALLINT: + return new ISerializationConverter<Mutation>() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = Bytes.toBytes(rowData.getShort(pos)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case DOUBLE: + return new ISerializationConverter<Mutation>() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = Bytes.toBytes(rowData.getDouble(pos)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case FLOAT: + return new ISerializationConverter<Mutation>() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = Bytes.toBytes(rowData.getFloat(pos)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case DECIMAL: + return new ISerializationConverter<Mutation>() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + DecimalType decimalType = (DecimalType) logicalType; + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + byte[] bytes = + Bytes.toBytes( + rowData.getDecimal(pos, precision, scale).toBigDecimal()); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case DATE: + case INTERVAL_YEAR_MONTH: + case INTEGER: + return new ISerializationConverter<Mutation>() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = Bytes.toBytes(rowData.getInt(pos)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case CHAR: + case VARCHAR: + return new ISerializationConverter<Mutation>() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = + rowData.getString(pos) + .toString() + .getBytes(Charset.forName(encoding)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case BINARY: + case VARBINARY: + return new ISerializationConverter<Mutation>() { + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + byte[] bytes = rowData.getBinary(pos); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return new ISerializationConverter<Mutation>() { + + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + final int timestampPrecision = getPrecision(logicalType); + if (timestampPrecision < MIN_TIMESTAMP_PRECISION + || timestampPrecision > MAX_TIMESTAMP_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " + + "HBase connector", + timestampPrecision, + MIN_TIMESTAMP_PRECISION, + MAX_TIMESTAMP_PRECISION)); + } + long millisecond = + rowData.getTimestamp(pos, timestampPrecision).getMillisecond(); + byte[] bytes = Bytes.toBytes(millisecond); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1],
bytes); + } + }; + case TIME_WITHOUT_TIME_ZONE: + return new ISerializationConverter<Mutation>() { + + @Override + public void serialize(RowData rowData, int pos, Mutation output) + throws Exception { + final int timePrecision = getPrecision(logicalType); + if (timePrecision < MIN_TIME_PRECISION + || timePrecision > MAX_TIME_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIME type is out of the range [%s, %s] supported by " + + "HBase connector", + timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION)); + } + byte[] bytes = Bytes.toBytes(rowData.getInt(pos)); + byte[][] qualifier = familyAndQualifier[pos]; + ((Put) output).addColumn(qualifier[0], qualifier[1], bytes); + } + }; + default: + throw new UnsupportedTypeException(logicalType.getTypeRoot()); + } + } + + private byte[] getRowkey(RowData record) throws Exception { + Map<String, Object> nameValueMap = new HashMap<>((rowKeyColumnIndex.size() << 2) / 3); + for (Integer keyColumnIndex : rowKeyColumnIndex) { + // TODO: when Flink disables operator chaining, the record is a BinaryRowData and will throw a + // class cast exception + nameValueMap.put( + columnNames.get(keyColumnIndex), + ((GenericRowData) record).getField(keyColumnIndex)); + } + + String rowKeyStr = functionTree.evaluate(nameValueMap); + return rowKeyStr.getBytes(StandardCharsets.UTF_8); + } + + private void initRowKeyConfig() { + if (StringUtils.isNotBlank(hBaseConf.getRowkeyExpress())) { + this.functionTree = FunctionParser.parse(hBaseConf.getRowkeyExpress()); + this.rowKeyColumns = FunctionParser.parseRowKeyCol(hBaseConf.getRowkeyExpress()); + this.rowKeyColumnIndex = new ArrayList<>(rowKeyColumns.size()); + for (String rowKeyColumn : rowKeyColumns) { + int index = columnNames.indexOf(rowKeyColumn); + if (index == -1) { + throw new RuntimeException( + "Can not get row key column from columns:" + rowKeyColumn); + } + rowKeyColumnIndex.add(index); + } + } + } + + public Long getVersion(RowData record) { + if (versionColumnIndex == null && StringUtils.isBlank(versionColumnValue)) { + return null; + } + + Object timeStampValue = versionColumnValue; + if (versionColumnIndex != null) { + // If a column is used as the version, long/double columns are read directly via record.asLong; other + // types are parsed with yyyy-MM-dd HH:mm:ss or yyyy-MM-dd HH:mm:ss SSS + if (versionColumnIndex >= record.getArity() || versionColumnIndex < 0) { + throw new IllegalArgumentException( + "version column index out of range: " + versionColumnIndex); + } + if (record.isNullAt(versionColumnIndex)) { + throw new IllegalArgumentException("null version column!"); + } + + timeStampValue = ((ColumnRowData) record).getField(versionColumnIndex).getData(); + } + + if (timeStampValue instanceof Long) { + return (Long) timeStampValue; + } else if (timeStampValue instanceof Double) { + return ((Double) timeStampValue).longValue(); + } else if (timeStampValue instanceof String) { + + try { + return Long.valueOf(timeStampValue.toString()); + } catch (Exception e) { + // ignore + } + Date date; + try { + date = timeMillisecondFormat.parse(timeStampValue.toString()); + } catch (ParseException e) { + try { + date = timeSecondFormat.parse(timeStampValue.toString()); + } catch (ParseException e1) { + LOG.info( + String.format( + "Column [%s] is configured as the HBase write version, but parsing it with both yyyy-MM-dd HH:mm:ss and yyyy-MM-dd HH:mm:ss SSS failed. Please check and fix the value", + versionColumnIndex)); + throw new RuntimeException(e1); + } + } + return date.getTime(); + } else if (timeStampValue instanceof Date) { + return ((Date) timeStampValue).getTime(); + } else { + throw new RuntimeException("Incompatible version type: " +
timeStampValue.getClass()); + } + } + + private static SimpleDateFormat getSimpleDateFormat(String sign) { + SimpleDateFormat format; + if (ConstantValue.TIME_SECOND_SUFFIX.equals(sign)) { + format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + } else { + format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS"); + } + return format; + } +} diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseRawTypeConverter.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseRawTypeConverter.java similarity index 50% rename from chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseRawTypeConverter.java rename to chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseRawTypeConverter.java index 2881588e8f..3f690ba80f 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseRawTypeConverter.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseRawTypeConverter.java @@ -1,46 +1,54 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Copyright 2021 the original author or authors. * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package com.dtstack.chunjun.connector.hbase14.converter; -package com.dtstack.chunjun.connector.hbase.converter; - -import com.dtstack.chunjun.connector.hbase.converter.type.BINARYSTRING; +import com.dtstack.chunjun.constants.ConstantValue; +import com.dtstack.chunjun.converter.RawTypeConverter; import com.dtstack.chunjun.throwable.UnsupportedTypeException; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import java.util.Locale; -public class HBaseRawTypeConverter { - public static DataType apply(String type) { - switch (type.toUpperCase(Locale.ENGLISH)) { +public class HBaseRawTypeConverter implements RawTypeConverter { + + public static final HBaseRawTypeConverter INSTANCE = new HBaseRawTypeConverter(); + + private HBaseRawTypeConverter() {} + + public DataType apply(String type) { + type = type.toUpperCase(Locale.ENGLISH); + int leftIndex = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); + int rightIndex = type.indexOf(ConstantValue.RIGHT_PARENTHESIS_SYMBOL); + String dataType = type; + String precision = null; + String[] split = null; + if (leftIndex > 0 && rightIndex > 0) { + dataType = type.substring(0, leftIndex); + precision = type.substring(leftIndex + 1, type.length() - 1); + } + switch (dataType) { case "BOOLEAN": return DataTypes.BOOLEAN(); case "TINYINT": case "INT8": case "UINT8": return DataTypes.TINYINT(); - case "BINARY_STRING": - return new AtomicDataType(new BINARYSTRING(true, LogicalTypeRoot.VARCHAR)); case "SMALLINT": - case "SHORT": case "UINT16": case "INT16": return DataTypes.SMALLINT(); @@ -60,7 +68,6 @@ public static DataType apply(String type) { case "UINT64": case "INT64": case "BIGINT": - case "LONG": return DataTypes.BIGINT(); case "FLOAT": case "FLOAT32": @@ -70,6 +77,14 @@ public static DataType apply(String type) { case "DECIMAL64": case "DECIMAL128": case "DEC": + if (precision != null) { + split = precision.split(ConstantValue.COMMA_SYMBOL); + if (split.length == 2) { + return DataTypes.DECIMAL( + Integer.parseInt(split[0].trim()), + Integer.parseInt(split[1].trim())); + } + } return DataTypes.DECIMAL(38, 18); case "DOUBLE": case "FLOAT64": @@ -85,7 +100,6 @@ public static DataType apply(String type) { case "TINYBLOB": case "MEDIUMBLOB": case "LONGBLOB": - case "BINARY": case "STRUCT": case "VARCHAR": case "STRING": @@ -99,8 +113,18 @@ public static DataType apply(String type) { case "TIME": return DataTypes.TIME(); case "TIMESTAMP": + if (precision != null) { + split = precision.split(ConstantValue.COMMA_SYMBOL); + if (split.length == 1) { + return DataTypes.TIMESTAMP(Integer.parseInt(split[0].trim())); + } + } + return DataTypes.TIMESTAMP(3); case "DATETIME": - return DataTypes.TIMESTAMP(); + return DataTypes.TIMESTAMP(3); + case "BYTES": + case "BINARY": + return DataTypes.BYTES(); case "NOTHING": case "NULLABLE": case "NULL": diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseSerde.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseSerde.java index e76ec5c2a0..50d4edd288 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseSerde.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HBaseSerde.java @@ -53,10 
+53,10 @@ public class HBaseSerde implements Serializable { private static final byte[] EMPTY_BYTES = new byte[] {}; - private static final int MIN_TIMESTAMP_PRECISION = 0; - private static final int MAX_TIMESTAMP_PRECISION = 3; - private static final int MIN_TIME_PRECISION = 0; - private static final int MAX_TIME_PRECISION = 3; + public static final int MIN_TIMESTAMP_PRECISION = 0; + public static final int MAX_TIMESTAMP_PRECISION = 3; + public static final int MIN_TIME_PRECISION = 0; + public static final int MAX_TIME_PRECISION = 3; private final byte[] nullStringBytes; diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HbaseRowConverter.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HbaseRowConverter.java index 7379964795..6cf7737e5e 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HbaseRowConverter.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/converter/HbaseRowConverter.java @@ -20,16 +20,30 @@ import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.IDeserializationConverter; +import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.RowKind; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; + +import java.math.BigDecimal; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; public class HbaseRowConverter extends AbstractRowConverter { + private static final int MIN_TIMESTAMP_PRECISION = 0; + private static final int MAX_TIMESTAMP_PRECISION = 3; + private static final int MIN_TIME_PRECISION = 0; + private static final int MAX_TIME_PRECISION = 3; private HBaseTableSchema schema; private String nullStringLiteral; private transient HBaseSerde serde; @@ -37,11 +51,13 @@ public class HbaseRowConverter public HbaseRowConverter(HBaseTableSchema schema, String nullStringLiteral) { // super(rowType); this.schema = schema; + this.nullStringLiteral = nullStringLiteral; } @Override public RowData toInternal(Result input) throws Exception { + if (serde == null) { this.serde = new HBaseSerde(schema, nullStringLiteral); } @@ -49,6 +65,77 @@ public RowData toInternal(Result input) throws Exception { return serde.convertToRow(input); } + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + // ordered by type root definition + switch (type.getTypeRoot()) { + case CHAR: + case VARCHAR: + // reuse bytes + return (IDeserializationConverter) StringData::fromBytes; + case BOOLEAN: + return (IDeserializationConverter) Bytes::toBoolean; + case BINARY: + case VARBINARY: + return value -> value; + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return (IDeserializationConverter) + value -> { + BigDecimal decimal = Bytes.toBigDecimal(value); + return DecimalData.fromBigDecimal(decimal, 
precision, scale); + }; + case TINYINT: + return (IDeserializationConverter) value -> value[0]; + case SMALLINT: + return (IDeserializationConverter) Bytes::toShort; + case INTEGER: + case DATE: + case INTERVAL_YEAR_MONTH: + return (IDeserializationConverter) Bytes::toInt; + case TIME_WITHOUT_TIME_ZONE: + final int timePrecision = getPrecision(type); + if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIME type is out of the range [%s, %s] supported by " + + "HBase connector", + timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION)); + } + return (IDeserializationConverter) Bytes::toInt; + case BIGINT: + case INTERVAL_DAY_TIME: + return (IDeserializationConverter) Bytes::toLong; + case FLOAT: + return (IDeserializationConverter) Bytes::toFloat; + case DOUBLE: + return (IDeserializationConverter) Bytes::toDouble; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(type); + if (timestampPrecision < MIN_TIMESTAMP_PRECISION + || timestampPrecision > MAX_TIMESTAMP_PRECISION) { + throw new UnsupportedOperationException( + String.format( + "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " + + "HBase connector", + timestampPrecision, + MIN_TIMESTAMP_PRECISION, + MAX_TIMESTAMP_PRECISION)); + } + return (IDeserializationConverter) + value -> { + // TODO: support higher precision + long milliseconds = Bytes.toLong(value); + return TimestampData.fromEpochMillis(milliseconds); + }; + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + @Override public Mutation toExternal(RowData rowData, Mutation output) throws Exception { if (serde == null) { diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java index 9804865e56..9d80d5e8a1 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java @@ -1,26 +1,12 @@ -/* - * Copyright 2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ package com.dtstack.chunjun.connector.hbase14.sink; +import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.conf.SyncConf; import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; -import com.dtstack.chunjun.connector.hbase.converter.HBaseRawTypeConverter; import com.dtstack.chunjun.connector.hbase14.converter.HBaseColumnConverter; -import com.dtstack.chunjun.connector.hbase14.converter.HbaseRowConverter; +import com.dtstack.chunjun.connector.hbase14.converter.HBaseFlatRowConverter; +import com.dtstack.chunjun.connector.hbase14.converter.HBaseRawTypeConverter; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.RawTypeConverter; import com.dtstack.chunjun.sink.SinkFactory; @@ -30,8 +16,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.commons.lang.StringUtils; @@ -40,21 +26,25 @@ import java.util.Map; public class HBase14SinkFactory extends SinkFactory { - - private final HBaseConf hbaseConf; + private final HBaseConf hBaseConf; public HBase14SinkFactory(SyncConf config) { super(config); - hbaseConf = + hBaseConf = GsonUtil.GSON.fromJson( GsonUtil.GSON.toJson(config.getWriter().getParameter()), HBaseConf.class); - super.initCommonConf(hbaseConf); - hbaseConf.setColumn(syncConf.getWriter().getFieldList()); + super.initCommonConf(hBaseConf); + if (hBaseConf.getTable() == null || "".equals(hBaseConf.getTable().trim())) { + // adapt to the chunjun 1.10 script + hBaseConf.setTable(syncConf.getWriter().getTable().getTableName()); + } + hBaseConf.setColumn(syncConf.getWriter().getFieldList()); + hBaseConf.setColumnMetaInfos(syncConf.getReader().getFieldList()); if (config.getWriter().getParameter().get("rowkeyColumn") != null) { String rowkeyColumn = buildRowKeyExpress(config.getWriter().getParameter().get("rowkeyColumn")); - hbaseConf.setRowkeyExpress(rowkeyColumn); + hBaseConf.setRowkeyExpress(rowkeyColumn); } if (config.getWriter().getParameter().get("versionColumn") != null) { @@ -62,13 +52,13 @@ public HBase14SinkFactory(SyncConf config) { (Map) config.getWriter().getParameter().get("versionColumn"); if (null != versionColumn.get("index") && StringUtils.isNotBlank(versionColumn.get("index").toString())) { - hbaseConf.setVersionColumnIndex( + hBaseConf.setVersionColumnIndex( Integer.valueOf(versionColumn.get("index").toString())); } if (null != versionColumn.get("value") && StringUtils.isNotBlank(versionColumn.get("value").toString())) { - hbaseConf.setVersionColumnValue(versionColumn.get("value").toString()); + hBaseConf.setVersionColumnValue(versionColumn.get("value").toString()); } } } @@ -76,32 +66,30 @@ public HBase14SinkFactory(SyncConf config) { @Override public DataStreamSink createSink(DataStream dataSet) { HBaseOutputFormatBuilder builder = new HBaseOutputFormatBuilder(); - builder.setConfig(hbaseConf); - builder.setHbaseConf(hbaseConf); + builder.setConfig(hBaseConf); - builder.setHbaseConfig(hbaseConf.getHbaseConfig()); - builder.setTableName(hbaseConf.getTable()); - builder.setWriteBufferSize(hbaseConf.getWriteBufferSize()); + builder.setHbaseConfig(hBaseConf.getHbaseConfig()); + builder.setTableName(hBaseConf.getTable()); + 
builder.setWriteBufferSize(hBaseConf.getWriteBufferSize());
 
         AbstractRowConverter rowConverter;
         if (useAbstractBaseColumn) {
             final RowType rowType =
-                    TableUtil.createRowType(hbaseConf.getColumn(), getRawTypeConverter());
-            rowConverter = new HBaseColumnConverter(hbaseConf, rowType);
+                    TableUtil.createRowType(hBaseConf.getColumn(), getRawTypeConverter());
+            rowConverter = new HBaseColumnConverter(hBaseConf, rowType);
         } else {
-            TableSchema tableSchema =
-                    TableUtil.createTableSchema(hbaseConf.getColumn(), getRawTypeConverter());
-            HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
-            String nullStringLiteral = hbaseConf.getNullStringLiteral();
-            rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral);
+            // when transforms are applied, use HBaseFlatRowConverter
+            final RowType rowType =
+                    TableUtil.createRowType(hBaseConf.getColumn(), getRawTypeConverter());
+            rowConverter = new HBaseFlatRowConverter(hBaseConf, rowType);
         }
-        builder.setRowConverter(rowConverter, useAbstractBaseColumn);
+        builder.setRowConverter(rowConverter);
         return createOutput(dataSet, builder.finish());
     }
 
     @Override
     public RawTypeConverter getRawTypeConverter() {
-        return HBaseRawTypeConverter::apply;
+        return HBaseRawTypeConverter.INSTANCE;
     }
 
     /** Compatible with old formats */
@@ -124,7 +112,7 @@ private String buildRowKeyExpress(Object rowKeyInfo) {
             Integer index = ValueUtil.getInt(item.get("index"));
             if (index != null && index != -1) {
                 expressBuilder.append(
-                        String.format("$(%s)", hbaseConf.getColumn().get(index).getName()));
+                        String.format("$(%s)", hBaseConf.getColumn().get(index).getName()));
                 continue;
             }
 
@@ -136,4 +124,21 @@ private String buildRowKeyExpress(Object rowKeyInfo) {
         return expressBuilder.toString();
     }
+
+    HBaseTableSchema buildHBaseTableSchema(String tableName, List<FieldConf> fieldConfList) {
+        HBaseTableSchema hbaseSchema = new HBaseTableSchema();
+        hbaseSchema.setTableName(tableName);
+        RawTypeConverter rawTypeConverter = getRawTypeConverter();
+        for (FieldConf fieldConf : fieldConfList) {
+            String fieldName = fieldConf.getName();
+            DataType dataType = rawTypeConverter.apply(fieldConf.getType());
+            if ("rowkey".equalsIgnoreCase(fieldName)) {
+                hbaseSchema.setRowKey(fieldName, dataType);
+            } else if (fieldName.contains(":")) {
+                String[] fields = fieldName.split(":");
+                hbaseSchema.addColumn(fields[0], fields[1], dataType);
+            }
+        }
+        return hbaseSchema;
+    }
 }
diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormat.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormat.java
index 12e362dddd..e397544b36 100644
--- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormat.java
+++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormat.java
@@ -18,9 +18,7 @@
 
 package com.dtstack.chunjun.connector.hbase14.sink;
 
-import com.dtstack.chunjun.connector.hbase.conf.HBaseConf;
-import com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils;
-import com.dtstack.chunjun.connector.hbase14.util.HBaseHelper;
+import com.dtstack.chunjun.connector.hbase.util.HBaseHelper;
 import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
 import com.dtstack.chunjun.throwable.WriteRecordException;
 
@@ -29,17 +27,15 @@
 import org.apache.commons.lang3.Validate;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +57,6 @@ public class HBaseOutputFormat extends BaseRichOutputFormat {
 
     private transient Connection connection;
     private transient BufferedMutator bufferedMutator;
-
     private transient Table table;
 
     @Override
@@ -71,9 +66,10 @@ public void configure(Configuration parameters) {}
     protected void writeSingleRecordInternal(RowData rawRecord) throws WriteRecordException {
         int i = 0;
         try {
-
-            bufferedMutator.mutate((Mutation) rowConverter.toExternal(rawRecord, null));
-
+            Mutation mutation = null;
+            mutation = (Mutation) rowConverter.toExternal(rawRecord, mutation);
+            bufferedMutator.mutate(mutation);
+            bufferedMutator.flush();
         } catch (Exception ex) {
             if (i < rawRecord.getArity()) {
                 throw new WriteRecordException(
@@ -85,18 +81,7 @@ protected void writeSingleRecordInternal(RowData rawRecord) throws WriteRecordEx
 
     @Override
     public void openInternal(int taskNumber, int numTasks) throws IOException {
-        boolean openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
-        if (openKerberos) {
-            UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig);
-            ugi.doAs(
-                    (PrivilegedAction)
-                            () -> {
-                                openConnection();
-                                return null;
-                            });
-        } else {
-            openConnection();
-        }
+        openConnection();
     }
 
     public void openConnection() {
@@ -104,9 +89,16 @@ public void openConnection() {
         Validate.isTrue(hbaseConfig != null && hbaseConfig.size() != 0, "hbaseConfig must not be an empty map!");
 
         try {
+            connection = HBaseHelper.getHbaseConnection(hbaseConfig);
             org.apache.hadoop.conf.Configuration hConfiguration =
                     HBaseHelper.getConfig(hbaseConfig);
-            connection = ConnectionFactory.createConnection(hConfiguration);
+            try (Admin admin = this.connection.getAdmin()) {
+                boolean exist = admin.tableExists(TableName.valueOf(tableName));
+                if (!exist) {
+                    throw new IOException(
+                            "Target table does not exist, please check the table: " + tableName);
+                }
+            }
 
             bufferedMutator =
                     connection.getBufferedMutator(
@@ -129,7 +121,9 @@ protected void writeMultipleRecordsInternal() throws Exception {
         try {
             List<Mutation> mutations = new ArrayList<>();
             for (RowData record : rows) {
-                mutations.add((Mutation) rowConverter.toExternal(record, null));
+                Mutation mutation = null;
+                mutation = (Mutation) rowConverter.toExternal(record, mutation);
+                mutations.add(mutation);
             }
             results = new Object[mutations.size()];
             table.batch(mutations, results);
@@ -147,7 +141,6 @@ protected void writeMultipleRecordsInternal() throws Exception {
 
     @Override
     public void closeInternal() throws IOException {
-
         HBaseHelper.closeBufferedMutator(bufferedMutator);
         HBaseHelper.closeConnection(connection);
     }
@@ -156,7 +149,7 @@ public void setTableName(String tableName) {
         this.tableName = tableName;
     }
 
-    public void setHbaseConf(Map<String, Object> hbaseConfig) {
+    public void setHbaseConfig(Map<String, Object> hbaseConfig) {
         this.hbaseConfig = hbaseConfig;
     }
@@ -171,8 +164,4 @@ public String getTableName() {
     public Map<String, Object> getHbaseConfig() {
         return hbaseConfig;
     }
-
-    public void setHbaseConf(HBaseConf config) {
-        this.config = config;
-    }
 }
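Editor's note on the write path above: the refactored HBaseOutputFormat now asks the row converter for one Mutation per record and hands it to a BufferedMutator, flushing after every record in writeSingleRecordInternal. The following self-contained sketch shows that same flow against the plain HBase 1.x client API; the quorum address, table name, column family, qualifier, and row key are hypothetical placeholders for illustration, not values taken from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedMutatorSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection settings; the real job reads these from hbaseConfig.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
                BufferedMutator mutator =
                        connection.getBufferedMutator(
                                new BufferedMutatorParams(TableName.valueOf("demo_table"))
                                        .writeBufferSize(8 * 1024 * 1024L))) {
            // One Put per record: a row key plus family/qualifier/value cells, which is
            // what the serialization converters in this patch assemble field by field.
            Put put = new Put(Bytes.toBytes("row-001"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("alice"));
            mutator.mutate(put);
            mutator.flush(); // writeSingleRecordInternal above also flushes per record
        }
    }
}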
diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormatBuilder.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormatBuilder.java index e4b0e6ef15..d60a2ab9ed 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBaseOutputFormatBuilder.java @@ -18,7 +18,6 @@ package com.dtstack.chunjun.connector.hbase14.sink; -import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; import com.dtstack.chunjun.connector.hbase.conf.HBaseConfigConstants; import com.dtstack.chunjun.sink.format.BaseRichOutputFormatBuilder; @@ -40,16 +39,12 @@ public HBaseOutputFormatBuilder() { super(new HBaseOutputFormat()); } - public void setHbaseConf(HBaseConf config) { - format.setHbaseConf(config); - } - public void setTableName(String tableName) { format.setTableName(tableName); } public void setHbaseConfig(Map hbaseConfig) { - format.setHbaseConf(hbaseConfig); + format.setHbaseConfig(hbaseConfig); } public void setWriteBufferSize(Long writeBufferSize) { diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java index 61ee91a543..e461fde716 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java @@ -15,89 +15,206 @@ */ package com.dtstack.chunjun.connector.hbase14.source; +import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.conf.OperatorConf; import com.dtstack.chunjun.conf.SyncConf; import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; -import com.dtstack.chunjun.connector.hbase.converter.HBaseRawTypeConverter; import com.dtstack.chunjun.connector.hbase14.converter.HBaseColumnConverter; +import com.dtstack.chunjun.connector.hbase14.converter.HBaseRawTypeConverter; import com.dtstack.chunjun.connector.hbase14.converter.HbaseRowConverter; +import com.dtstack.chunjun.connector.hbase14.util.ScanBuilder; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.RawTypeConverter; import com.dtstack.chunjun.source.SourceFactory; import com.dtstack.chunjun.util.GsonUtil; import com.dtstack.chunjun.util.TableUtil; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.LinkedHashMap; 
+import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class HBase14SourceFactory extends SourceFactory { private static final Logger LOG = LoggerFactory.getLogger(HBase14SourceFactory.class); - private final HBaseConf config; + private final HBaseConf hBaseConf; public HBase14SourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { super(syncConf, env); - config = + OperatorConf reader = syncConf.getReader(); + hBaseConf = GsonUtil.GSON.fromJson( - GsonUtil.GSON.toJson(syncConf.getReader().getParameter()), HBaseConf.class); + GsonUtil.GSON.toJson(reader.getParameter()), HBaseConf.class); + super.initCommonConf(hBaseConf); + if (hBaseConf.getTable() == null || "".equals(hBaseConf.getTable().trim())) { + // adapt to the chunjun 1.10 script + hBaseConf.setTable(syncConf.getReader().getTable().getTableName()); + } + hBaseConf.setColumnMetaInfos(reader.getFieldList()); Map range = (Map) syncConf.getReader().getParameter().get("range"); if (range != null) { if (range.get("startRowkey") != null && StringUtils.isNotBlank(range.get("startRowkey").toString())) { - config.setStartRowkey(range.get("startRowkey").toString()); + hBaseConf.setStartRowkey(range.get("startRowkey").toString()); } if (range.get("endRowkey") != null && StringUtils.isNotBlank(range.get("endRowkey").toString())) { - config.setEndRowkey(range.get("endRowkey").toString()); + hBaseConf.setEndRowkey(range.get("endRowkey").toString()); } if (range.get("isBinaryRowkey") != null) { - config.setBinaryRowkey((Boolean) range.get("isBinaryRowkey")); + hBaseConf.setBinaryRowkey((Boolean) range.get("isBinaryRowkey")); } } - - super.initCommonConf(config); - config.setColumn(syncConf.getReader().getFieldList()); + typeInformation = + TableUtil.getTypeInformation( + fieldList.stream() + .peek( + fieldConf -> + fieldConf.setName( + fieldConf.getName().replace(":", "."))) + .collect(Collectors.toList()), + getRawTypeConverter(), + useAbstractBaseColumn); + if (hBaseConf.getNullStringLiteral() == null + || "".equals(hBaseConf.getNullStringLiteral().trim())) { + hBaseConf.setNullStringLiteral("null"); + } } @Override public RawTypeConverter getRawTypeConverter() { - return HBaseRawTypeConverter::apply; + return HBaseRawTypeConverter.INSTANCE; } @Override @SuppressWarnings("all") public DataStream createSource() { - HBaseInputFormatBuilder builder = new HBaseInputFormatBuilder(); - builder.setConfig(config); - builder.sethHBaseConf(config); - - builder.setHbaseConfig(config.getHbaseConfig()); + List fieldConfList = hBaseConf.getColumnMetaInfos(); AbstractRowConverter rowConverter; + ScanBuilder scanBuilder; if (useAbstractBaseColumn) { final RowType rowType = - TableUtil.createRowType(config.getColumn(), getRawTypeConverter()); - rowConverter = new HBaseColumnConverter(config, rowType); + TableUtil.createRowType(hBaseConf.getColumn(), getRawTypeConverter()); + rowConverter = new HBaseColumnConverter(hBaseConf, rowType); + scanBuilder = ScanBuilder.forSync(fieldConfList); } else { - TableSchema tableSchema = - TableUtil.createTableSchema(config.getColumn(), getRawTypeConverter()); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); - String nullStringLiteral = config.getNullStringLiteral(); - rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral); + String nullStringLiteral = hBaseConf.getNullStringLiteral(); + Set columnSet = new LinkedHashSet<>(); + for (FieldConf fieldConf : fieldConfList) { + String 
fieldName = fieldConf.getName(); + if ("rowkey".equalsIgnoreCase(fieldName)) { + columnSet.add(fieldName); + } else if (fieldName.contains(".")) { + String[] familyQualifier = fieldName.split("\\."); + columnSet.add(familyQualifier[0]); + } + } + this.typeInformation = buildType(columnSet); + HBaseTableSchema hBaseTableSchema = + buildHBaseTableSchema(hBaseConf.getTable(), fieldConfList); + syncConf.getReader().setFieldNameList(new ArrayList<>(columnSet)); + final RowType rowType = + TableUtil.createRowType(hBaseConf.getColumn(), getRawTypeConverter()); + rowConverter = new HbaseRowConverter(hBaseTableSchema, nullStringLiteral); + scanBuilder = ScanBuilder.forSql(hBaseTableSchema); } - builder.setRowConverter(rowConverter, useAbstractBaseColumn); + HBaseInputFormatBuilder builder = + HBaseInputFormatBuilder.newBuild(hBaseConf.getTable(), scanBuilder); + + builder.setConfig(hBaseConf); + builder.setColumnMetaInfos(hBaseConf.getColumnMetaInfos()); + builder.setHbaseConfig(hBaseConf.getHbaseConfig()); + builder.setEndRowKey(hBaseConf.getEndRowkey()); + builder.setIsBinaryRowkey(hBaseConf.isBinaryRowkey()); + builder.setScanCacheSize(hBaseConf.getScanCacheSize()); + builder.setStartRowKey(hBaseConf.getStartRowkey()); + + builder.setRowConverter(rowConverter); return createInput(builder.finish()); } + + private TypeInformation buildType(Set columnSet) { + if (columnSet.size() == 0) { + return new GenericTypeInfo<>(RowData.class); + } + Map> family = new LinkedHashMap<>(); + for (String colName : columnSet) { + if (!"rowkey".equalsIgnoreCase(colName)) { + family.put(colName, new LinkedHashMap<>()); + } + } + + TableSchema.Builder builder = TableSchema.builder(); + for (int i = 0; i < fieldList.size(); i++) { + FieldConf fieldConf = fieldList.get(i); + String fieldName = fieldConf.getName(); + if ("rowkey".equalsIgnoreCase(fieldConf.getName())) { + DataType dataType = this.getRawTypeConverter().apply(fieldConf.getType()); + builder.add(TableColumn.physical(fieldName, dataType)); + } else if (fieldName.contains(".")) { + String[] familyQualifier = fieldName.split("\\."); + Map qualifier = family.get(familyQualifier[0]); + qualifier.put( + familyQualifier[1], this.getRawTypeConverter().apply(fieldConf.getType())); + } + } + for (Map.Entry> familyQualifierEntry : family.entrySet()) { + String familyName = familyQualifierEntry.getKey(); + List rowFieldList = new ArrayList<>(); + for (Map.Entry qualifierEntry : + familyQualifierEntry.getValue().entrySet()) { + String qualifierName = qualifierEntry.getKey(); + DataTypes.Field rowField = + DataTypes.FIELD(qualifierName, qualifierEntry.getValue()); + rowFieldList.add(rowField); + } + builder.add( + TableColumn.physical( + familyName, + DataTypes.ROW(rowFieldList.toArray(new DataTypes.Field[] {})))); + } + TableSchema tableSchema = builder.build(); + DataType[] dataTypes = tableSchema.toRowDataType().getChildren().toArray(new DataType[] {}); + return TableUtil.getTypeInformation(dataTypes, tableSchema.getFieldNames()); + } + + HBaseTableSchema buildHBaseTableSchema(String tableName, List fieldConfList) { + HBaseTableSchema hbaseSchema = new HBaseTableSchema(); + hbaseSchema.setTableName(tableName); + RawTypeConverter rawTypeConverter = getRawTypeConverter(); + for (FieldConf fieldConf : fieldConfList) { + String fieldName = fieldConf.getName(); + DataType dataType = rawTypeConverter.apply(fieldConf.getType()); + if ("rowkey".equalsIgnoreCase(fieldName)) { + hbaseSchema.setRowKey(fieldName, dataType); + } else if (fieldName.contains(".")) { + String[] 
fields = fieldName.split("\\.");
+                hbaseSchema.addColumn(fields[0], fields[1], dataType);
+            }
+        }
+        return hbaseSchema;
+    }
 }
diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormat.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormat.java
index 45cf28d314..638083dcef 100644
--- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormat.java
+++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormat.java
@@ -18,17 +18,16 @@
 
 package com.dtstack.chunjun.connector.hbase14.source;
 
-import com.dtstack.chunjun.connector.hbase.conf.HBaseConf;
-import com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils;
-import com.dtstack.chunjun.connector.hbase14.util.HBaseHelper;
+import com.dtstack.chunjun.connector.hbase.util.HBaseHelper;
+import com.dtstack.chunjun.connector.hbase14.util.ScanBuilder;
 import com.dtstack.chunjun.source.format.BaseRichInputFormat;
-import com.dtstack.chunjun.throwable.ReadRecordException;
 
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.table.data.RowData;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
@@ -37,10 +36,8 @@
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -53,9 +50,17 @@
  * @author huyifan.zju@163.com
  */
 public class HBaseInputFormat extends BaseRichInputFormat {
-
     protected Map<String, Object> hbaseConfig;
-    protected HBaseConf hBaseConf;
+    protected final String tableName;
+    protected String startRowkey;
+    protected String endRowkey;
+    protected List<String> columnNames;
+    protected List<String> columnValues;
+    protected List<String> columnFormats;
+    protected List<String> columnTypes;
+    protected boolean isBinaryRowkey;
+    /** Number of rows the client fetches per RPC round trip (scan caching). */
+    protected int scanCacheSize = 1000;
 
     private transient Connection connection;
     private transient Scan scan;
@@ -63,13 +68,29 @@ public class HBaseInputFormat extends BaseRichInputFormat {
     private transient ResultScanner resultScanner;
     private transient Result next;
 
+    private ScanBuilder scanBuilder;
+
+    public HBaseInputFormat(String tableName, ScanBuilder scanBuilder) {
+        this.scanBuilder = scanBuilder;
+        this.tableName = tableName;
+    }
+
     @Override
     public void openInputFormat() throws IOException {
         super.openInputFormat();
         LOG.info("HBaseInputFormat openInputFormat start");
-        connection = HBaseHelper.getHbaseConnection(hbaseConfig);
+        this.scan = scanBuilder.buildScan();
+        this.scan.setCaching(scanCacheSize);
+        this.connection = HBaseHelper.getHbaseConnection(hbaseConfig);
+        try (Admin admin = this.connection.getAdmin()) {
+            boolean exist = admin.tableExists(TableName.valueOf(tableName));
+            if (!exist) {
+                throw new IOException(
+                        "Target table does not exist, please check the table: " + tableName);
+            }
+        }
         LOG.info("HBaseInputFormat openInputFormat end");
     }
 
@@ -77,65 +98,8 @@ public
InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOException { try (Connection connection = HBaseHelper.getHbaseConnection(hbaseConfig)) { - if (HBaseConfigUtils.isEnableKerberos(hbaseConfig)) { - UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig); - return ugi.doAs( - (PrivilegedAction) - () -> - split( - connection, - hBaseConf.getTable(), - hBaseConf.getStartRowkey(), - hBaseConf.getEndRowkey(), - hBaseConf.isBinaryRowkey())); - } else { - return split( - connection, - hBaseConf.getTable(), - hBaseConf.getStartRowkey(), - hBaseConf.getEndRowkey(), - hBaseConf.isBinaryRowkey()); - } - } - } - - @Override - public void openInternal(InputSplit inputSplit) throws IOException { - HBaseInputSplit hbaseInputSplit = (HBaseInputSplit) inputSplit; - byte[] startRow = Bytes.toBytesBinary(hbaseInputSplit.getStartkey()); - byte[] stopRow = Bytes.toBytesBinary(hbaseInputSplit.getEndKey()); - - if (null == connection || connection.isClosed()) { - connection = HBaseHelper.getHbaseConnection(hbaseConfig); + return split(connection, tableName, startRowkey, endRowkey, isBinaryRowkey); } - - table = connection.getTable(TableName.valueOf(hBaseConf.getTable())); - scan = new Scan(); - scan.setStartRow(startRow); - scan.setStopRow(stopRow); - scan.setCaching(hBaseConf.getScanCacheSize()); - resultScanner = table.getScanner(scan); - } - - @Override - public boolean reachedEnd() throws IOException { - next = resultScanner.next(); - return next == null; - } - - @Override - public RowData nextRecordInternal(RowData rawRow) throws ReadRecordException { - try { - rawRow = rowConverter.toInternal(next); - return rawRow; - } catch (Exception se) { - throw new ReadRecordException("", se, 0, rawRow); - } - } - - @Override - public void closeInternal() throws IOException { - HBaseHelper.closeConnection(connection); } public HBaseInputSplit[] split( @@ -255,4 +219,42 @@ private String getStartKey(byte[] startRowkeyByte, byte[] regionStarKey) { } return Bytes.toStringBinary(tempStartRowkeyByte); } + + @Override + public void openInternal(InputSplit inputSplit) throws IOException { + HBaseInputSplit hbaseInputSplit = (HBaseInputSplit) inputSplit; + byte[] startRow = Bytes.toBytesBinary(hbaseInputSplit.getStartkey()); + byte[] stopRow = Bytes.toBytesBinary(hbaseInputSplit.getEndKey()); + + if (null == connection || connection.isClosed()) { + connection = HBaseHelper.getHbaseConnection(hbaseConfig); + } + + table = connection.getTable(TableName.valueOf(tableName)); + // scan = new Scan(); + this.scan.setStartRow(startRow); + this.scan.setStopRow(stopRow); + this.scan.setCaching(scanCacheSize); + resultScanner = table.getScanner(scan); + } + + @Override + public boolean reachedEnd() throws IOException { + next = resultScanner.next(); + return next == null; + } + + @Override + public RowData nextRecordInternal(RowData rawRow) { + try { + return rowConverter.toInternal(next); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void closeInternal() throws IOException { + HBaseHelper.closeConnection(connection); + } } diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormatBuilder.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormatBuilder.java index 5ae57377ef..978270c897 100644 --- 
a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBaseInputFormatBuilder.java @@ -17,13 +17,18 @@ */ package com.dtstack.chunjun.connector.hbase14.source; -import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; +import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.hbase.conf.HBaseConfigConstants; +import com.dtstack.chunjun.connector.hbase14.util.ScanBuilder; import com.dtstack.chunjun.source.format.BaseRichInputFormatBuilder; import org.apache.flink.util.Preconditions; +import org.apache.commons.lang.StringUtils; + +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * The builder of HbaseInputFormat @@ -32,30 +37,87 @@ * * @author huyifan.zju@163.com */ -public class HBaseInputFormatBuilder extends BaseRichInputFormatBuilder { +public class HBaseInputFormatBuilder extends BaseRichInputFormatBuilder { + + public static HBaseInputFormatBuilder newBuild(String tableName, ScanBuilder scanBuilder) { + HBaseInputFormat format = new HBaseInputFormat(tableName, scanBuilder); + return new HBaseInputFormatBuilder(format); + } - public HBaseInputFormatBuilder() { - super(new HBaseInputFormat()); + public HBaseInputFormatBuilder(HBaseInputFormat format) { + super(format); } public void setHbaseConfig(Map hbaseConfig) { - ((HBaseInputFormat) format).hbaseConfig = hbaseConfig; + format.hbaseConfig = hbaseConfig; + } + + public void setStartRowKey(String startRowKey) { + format.startRowkey = startRowKey; + } + + public void setEndRowKey(String endRowKey) { + format.endRowkey = endRowKey; + } + + public void setColumnNames(List columnNames) { + format.columnNames = columnNames; + } + + public void setColumnValues(List columnValues) { + format.columnValues = columnValues; + } + + public void setColumnTypes(List columnTypes) { + format.columnTypes = columnTypes; + } + + public void setIsBinaryRowkey(boolean isBinaryRowkey) { + format.isBinaryRowkey = isBinaryRowkey; } - public void sethHBaseConf(HBaseConf hBaseConf) { - ((HBaseInputFormat) format).hBaseConf = hBaseConf; + public void setColumnFormats(List columnFormats) { + format.columnFormats = columnFormats; + } + + public void setScanCacheSize(int scanCacheSize) { + format.scanCacheSize = scanCacheSize; } @Override protected void checkFormat() { Preconditions.checkArgument( - ((HBaseInputFormat) format).hBaseConf.getScanCacheSize() - <= HBaseConfigConstants.MAX_SCAN_CACHE_SIZE - && ((HBaseInputFormat) format).hBaseConf.getScanCacheSize() - >= HBaseConfigConstants.MIN_SCAN_CACHE_SIZE, + format.scanCacheSize <= HBaseConfigConstants.MAX_SCAN_CACHE_SIZE + && format.scanCacheSize >= HBaseConfigConstants.MIN_SCAN_CACHE_SIZE, "scanCacheSize should be between " + HBaseConfigConstants.MIN_SCAN_CACHE_SIZE + " and " + HBaseConfigConstants.MAX_SCAN_CACHE_SIZE); + + if (format.columnFormats != null) { + for (int i = 0; i < format.columnTypes.size(); ++i) { + Preconditions.checkArgument(StringUtils.isNotEmpty(format.columnTypes.get(i))); + Preconditions.checkArgument( + StringUtils.isNotEmpty(format.columnNames.get(i)) + || StringUtils.isNotEmpty(format.columnTypes.get(i))); + } + } + } + + public void setColumnMetaInfos(List columMetaInfos) { + if (columMetaInfos != null && !columMetaInfos.isEmpty()) { + List nameList = + columMetaInfos.stream().map(FieldConf::getName).collect(Collectors.toList()); + 
setColumnNames(nameList); + List typeList = + columMetaInfos.stream().map(FieldConf::getType).collect(Collectors.toList()); + setColumnTypes(typeList); + List valueList = + columMetaInfos.stream().map(FieldConf::getValue).collect(Collectors.toList()); + setColumnValues(valueList); + List formatList = + columMetaInfos.stream().map(FieldConf::getFormat).collect(Collectors.toList()); + setColumnFormats(formatList); + } } } diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSink.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSink.java index fa71d321fc..9b36168a07 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSink.java @@ -17,21 +17,18 @@ */ package com.dtstack.chunjun.connector.hbase14.table; +import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; import com.dtstack.chunjun.connector.hbase14.converter.HbaseRowConverter; import com.dtstack.chunjun.connector.hbase14.sink.HBaseOutputFormatBuilder; -import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.sink.DtOutputFormatSinkFunction; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkFunctionProvider; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.utils.TypeConversions; import java.util.ArrayList; import java.util.List; @@ -46,12 +43,17 @@ public class HBaseDynamicTableSink implements DynamicTableSink { private final HBaseConf conf; private final TableSchema tableSchema; private final HBaseTableSchema hbaseSchema; + protected final String nullStringLiteral; public HBaseDynamicTableSink( - HBaseConf conf, TableSchema tableSchema, HBaseTableSchema hbaseSchema) { + HBaseConf conf, + TableSchema tableSchema, + HBaseTableSchema hbaseSchema, + String nullStringLiteral) { this.conf = conf; this.tableSchema = tableSchema; this.hbaseSchema = hbaseSchema; + this.nullStringLiteral = nullStringLiteral; } @Override @@ -61,44 +63,31 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { @Override public SinkFunctionProvider getSinkRuntimeProvider(Context context) { - List logicalTypes = new ArrayList<>(); - - String[] familyNames = hbaseSchema.getFamilyNames(); - int rowKeyIndex = hbaseSchema.getRowKeyIndex(); - for (int i = 0; i < familyNames.length; i++) { - if (i == rowKeyIndex) { - logicalTypes.add( - TypeConversions.fromDataToLogicalType( - hbaseSchema.getRowKeyDataType().get())); - } - DataType[] qualifierDataTypes = hbaseSchema.getQualifierDataTypes(familyNames[i]); - for (DataType dataType : qualifierDataTypes) { - logicalTypes.add(TypeConversions.fromDataToLogicalType(dataType)); - } + final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); + String[] fieldNames = tableSchema.getFieldNames(); + List columnList = new ArrayList<>(fieldNames.length); + for (int i = 0; i < 
fieldNames.length; i++) { + FieldConf field = new FieldConf(); + field.setName(fieldNames[i]); + field.setType(rowType.getTypeAt(i).asSummaryString()); + field.setIndex(i); + columnList.add(field); } - // todo 测试下顺序是否是一致的 - RowType of = RowType.of(logicalTypes.toArray(new LogicalType[0])); - HBaseOutputFormatBuilder builder = new HBaseOutputFormatBuilder(); - builder.setConfig(conf); - builder.setHbaseConf(conf); builder.setHbaseConfig(conf.getHbaseConfig()); builder.setTableName(conf.getTable()); - builder.setWriteBufferSize(conf.getWriteBufferSize()); - String nullStringLiteral = conf.getNullStringLiteral(); - - AbstractRowConverter rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral); - builder.setRowConverter(rowConverter); - + HbaseRowConverter hbaseRowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral); + builder.setRowConverter(hbaseRowConverter); + builder.setConfig(conf); return SinkFunctionProvider.of( new DtOutputFormatSinkFunction(builder.finish()), conf.getParallelism()); } @Override public DynamicTableSink copy() { - return new HBaseDynamicTableSink(conf, tableSchema, hbaseSchema); + return new HBaseDynamicTableSink(conf, tableSchema, hbaseSchema, nullStringLiteral); } @Override diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSource.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSource.java index 95c44a5026..5ae76c9ec4 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseDynamicTableSource.java @@ -25,6 +25,7 @@ import com.dtstack.chunjun.connector.hbase14.source.HBaseInputFormatBuilder; import com.dtstack.chunjun.connector.hbase14.table.lookup.HBaseAllTableFunction; import com.dtstack.chunjun.connector.hbase14.table.lookup.HBaseLruTableFunction; +import com.dtstack.chunjun.connector.hbase14.util.ScanBuilder; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.lookup.AbstractLruTableFunction; import com.dtstack.chunjun.lookup.conf.LookupConf; @@ -40,44 +41,60 @@ */ public class HBaseDynamicTableSource extends BaseHBaseDynamicTableSource { + private final HBaseConf hBaseConf; + private TableSchema tableSchema; + private final LookupConf lookupConf; + private HBaseTableSchema hbaseSchema; + protected final String nullStringLiteral; + public HBaseDynamicTableSource( HBaseConf conf, TableSchema tableSchema, LookupConf lookupConf, - HBaseTableSchema hbaseSchema) { + HBaseTableSchema hbaseSchema, + String nullStringLiteral) { super(tableSchema, hbaseSchema, conf, lookupConf); + this.hBaseConf = conf; + this.tableSchema = tableSchema; + this.lookupConf = lookupConf; + this.hbaseSchema = hbaseSchema; + this.hbaseSchema.setTableName(hBaseConf.getTable()); + this.nullStringLiteral = nullStringLiteral; } @Override public DynamicTableSource copy() { - return new HBaseDynamicTableSource(this.hBaseConf, tableSchema, lookupConf, hbaseSchema); + return new HBaseDynamicTableSource( + this.hBaseConf, tableSchema, lookupConf, hbaseSchema, nullStringLiteral); + } + + @Override + public String asSummaryString() { + return "Hbase2DynamicTableSource:"; } @Override - protected BaseRichInputFormatBuilder getBaseRichInputFormatBuilder() { - HBaseInputFormatBuilder 
builder = new HBaseInputFormatBuilder();
+    protected BaseRichInputFormatBuilder getBaseRichInputFormatBuilder() {
+        ScanBuilder scanBuilder = ScanBuilder.forSql(hbaseSchema);
+        HBaseInputFormatBuilder builder =
+                HBaseInputFormatBuilder.newBuild(hBaseConf.getTable(), scanBuilder);
+        builder.setColumnMetaInfos(hBaseConf.getColumnMetaInfos());
         builder.setConfig(hBaseConf);
         builder.setHbaseConfig(hBaseConf.getHbaseConfig());
-        builder.sethHBaseConf(hBaseConf);
-
-        AbstractRowConverter rowConverter =
-                new HbaseRowConverter(hbaseSchema, hBaseConf.getNullStringLiteral());
+        // After projection pushdown, hbaseSchema is filtered down to the used fields while tableSchema stays unchanged; the HBase scan is later generated from hbaseSchema
+        AbstractRowConverter rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral);
         builder.setRowConverter(rowConverter);
         return builder;
     }
 
     @Override
     protected AbstractLruTableFunction getAbstractLruTableFunction() {
-        return new HBaseLruTableFunction(lookupConf, hbaseSchema, hBaseConf);
+        AbstractRowConverter rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral);
+        return new HBaseLruTableFunction(lookupConf, hbaseSchema, hBaseConf, rowConverter);
     }
 
     @Override
     protected AbstractHBaseAllTableFunction getAbstractAllTableFunction() {
         return new HBaseAllTableFunction(lookupConf, hbaseSchema, hBaseConf);
     }
-
-    @Override
-    public String asSummaryString() {
-        return "Hbase14DynamicTableSource:";
-    }
 }
diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/Hbase14DynamicTableFactory.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/Hbase14DynamicTableFactory.java
index 26a2d3ff52..7d38a03437 100644
--- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/Hbase14DynamicTableFactory.java
+++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/Hbase14DynamicTableFactory.java
@@ -31,18 +31,20 @@
 
 import org.apache.hadoop.hbase.HConstants;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
-import static com.dtstack.chunjun.connector.hbase14.table.HBaseOptions.NULL_STRING_LITERAL;
-import static com.dtstack.chunjun.connector.hbase14.table.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static com.dtstack.chunjun.connector.hbase14.table.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static com.dtstack.chunjun.connector.hbase14.table.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
-import static com.dtstack.chunjun.connector.hbase14.table.HBaseOptions.TABLE_NAME;
-import static com.dtstack.chunjun.connector.hbase14.table.HBaseOptions.ZOOKEEPER_QUORUM;
-import static com.dtstack.chunjun.connector.hbase14.table.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
+import java.util.stream.Collectors;
+
+import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.NULL_STRING_LITERAL;
+import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.TABLE_NAME;
+import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.ZOOKEEPER_QUORUM;
+import static com.dtstack.chunjun.connector.hbase.table.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
 import static
com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_ASYNC_TIMEOUT; import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_CACHE_MAX_ROWS; import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_CACHE_PERIOD; @@ -60,7 +62,7 @@ public class Hbase14DynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { - public static final String IDENTIFIER = "hbase14-x"; + public static final String IDENTIFIER = "hbase2-x"; public static final String PROPERTIES_PREFIX = "properties."; @Override @@ -118,7 +120,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { getLookupConf(config, context.getObjectIdentifier().getObjectName()); HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(physicalSchema); String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL); - return new HBaseDynamicTableSource(conf, physicalSchema, lookupConf, hbaseSchema); + return new HBaseDynamicTableSource( + conf, physicalSchema, lookupConf, hbaseSchema, nullStringLiteral); } private static void validatePrimaryKey(TableSchema schema) { @@ -176,6 +179,10 @@ private static Map getHBaseClientProperties(Map hbaseProperties.put(HConstants.ZOOKEEPER_QUORUM, options.getString(ZOOKEEPER_QUORUM)); hbaseProperties.put( HConstants.ZOOKEEPER_ZNODE_PARENT, options.getString(ZOOKEEPER_ZNODE_PARENT)); + // for hbase 2.x + hbaseProperties.put( + "hbase." + HConstants.ZOOKEEPER_ZNODE_PARENT, + options.getString(ZOOKEEPER_ZNODE_PARENT)); if (containsHBaseClientProperties(tableOptions)) { tableOptions.keySet().stream() @@ -213,18 +220,20 @@ public DynamicTableSink createDynamicTableSink(Context context) { long bufferFlushMaxSizeInBytes = config.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes(); conf.setWriteBufferSize(bufferFlushMaxSizeInBytes); - return new HBaseDynamicTableSink(conf, physicalSchema, hbaseSchema); + conf.setRowkeyExpress(generateRowKey(hbaseSchema)); + String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL); + return new HBaseDynamicTableSink(conf, physicalSchema, hbaseSchema, nullStringLiteral); } - // private String generateRowKey(HBaseTableSchema hbaseSchema) { - // int rowIndex = 1; - // if (hbaseSchema.getRowKeyIndex() > 1) { - // rowIndex = hbaseSchema.getRowKeyIndex(); - // } - // String familyName = hbaseSchema.getFamilyNames()[rowIndex - 1]; - // String[] qualifierNames = hbaseSchema.getQualifierNames(familyName); - // return Arrays.stream(qualifierNames) - // .map(key -> "${" + key + "}") - // .collect(Collectors.joining("_")); - // } + private String generateRowKey(HBaseTableSchema hbaseSchema) { + int rowIndex = 1; + if (hbaseSchema.getRowKeyIndex() > 1) { + rowIndex = hbaseSchema.getRowKeyIndex(); + } + String familyName = hbaseSchema.getFamilyNames()[rowIndex - 1]; + String[] qualifierNames = hbaseSchema.getQualifierNames(familyName); + return Arrays.stream(qualifierNames) + .map(key -> "${" + key + "}") + .collect(Collectors.joining("_")); + } } diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseAllTableFunction.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseAllTableFunction.java index 945271552b..209edcec43 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseAllTableFunction.java +++ 
b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseAllTableFunction.java @@ -22,9 +22,8 @@ import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; import com.dtstack.chunjun.connector.hbase.table.lookup.AbstractHBaseAllTableFunction; import com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils; +import com.dtstack.chunjun.connector.hbase.util.HBaseHelper; import com.dtstack.chunjun.connector.hbase14.converter.HBaseSerde; -import com.dtstack.chunjun.connector.hbase14.converter.HbaseRowConverter; -import com.dtstack.chunjun.connector.hbase14.util.HBaseHelper; import com.dtstack.chunjun.lookup.conf.LookupConf; import com.dtstack.chunjun.security.KerberosUtil; @@ -33,7 +32,6 @@ import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -54,24 +52,21 @@ public class HBaseAllTableFunction extends AbstractHBaseAllTableFunction { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(HBaseAllTableFunction.class); - private Connection conn; - private String tableName; private Table table; private ResultScanner resultScanner; + private final HBaseTableSchema hbaseTableSchema; private transient HBaseSerde serde; + private final String nullStringLiteral; + private final HBaseConf hBaseConf; public HBaseAllTableFunction( LookupConf lookupConf, HBaseTableSchema hbaseTableSchema, HBaseConf hBaseConf) { - super( - null, - null, - lookupConf, - new HbaseRowConverter(hbaseTableSchema, hBaseConf.getNullStringLiteral()), - hbaseTableSchema, - hBaseConf); - this.tableName = hbaseTableSchema.getTableName(); + super(null, null, lookupConf, null, hbaseTableSchema, hBaseConf); + this.hbaseTableSchema = hbaseTableSchema; + this.hBaseConf = hBaseConf; + this.nullStringLiteral = hBaseConf.getNullStringLiteral(); } @Override @@ -82,11 +77,10 @@ public void open(FunctionContext context) throws Exception { @Override protected void loadData(Object cacheRef) { - Configuration hbaseDomainConf = HBaseConfiguration.create(); + Configuration hbaseDomainConf = new Configuration(); for (Map.Entry entry : hBaseConf.getHbaseConfig().entrySet()) { hbaseDomainConf.set(entry.getKey(), entry.getValue().toString()); } - int loadDataCount = 0; try { if (HBaseConfigUtils.isEnableKerberos(hbaseDomainConf)) { @@ -117,8 +111,7 @@ protected void loadData(Object cacheRef) { } else { conn = ConnectionFactory.createConnection(hbaseDomainConf); } - - table = conn.getTable(TableName.valueOf(tableName)); + table = conn.getTable(TableName.valueOf(hbaseTableSchema.getTableName())); resultScanner = table.getScanner(new Scan()); Map tmpCache = (Map) cacheRef; for (Result r : resultScanner) { diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseLruTableFunction.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseLruTableFunction.java index b2049496cd..be18c0b74c 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseLruTableFunction.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/lookup/HBaseLruTableFunction.java @@ 
-21,38 +21,29 @@ import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; import com.dtstack.chunjun.connector.hbase.table.lookup.AbstractHBaseLruTableFunction; -import com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils; -import com.dtstack.chunjun.connector.hbase14.converter.AsyncHBaseSerde; -import com.dtstack.chunjun.connector.hbase14.converter.HbaseRowConverter; -import com.dtstack.chunjun.connector.hbase14.util.DtFileUtils; +import com.dtstack.chunjun.connector.hbase.util.HBaseHelper; +import com.dtstack.chunjun.connector.hbase14.converter.HBaseSerde; +import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.enums.ECacheContentType; import com.dtstack.chunjun.factory.ChunJunThreadFactory; import com.dtstack.chunjun.lookup.cache.CacheMissVal; import com.dtstack.chunjun.lookup.cache.CacheObj; import com.dtstack.chunjun.lookup.conf.LookupConf; -import org.apache.flink.runtime.security.DynamicConfiguration; -import org.apache.flink.runtime.security.KerberosUtils; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; -import com.google.common.collect.Maps; -import com.stumbleupon.async.Deferred; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.authentication.util.KerberosName; -import org.hbase.async.Config; -import org.hbase.async.GetRequest; -import org.hbase.async.HBaseClient; -import org.hbase.async.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.security.krb5.KrbException; - -import javax.security.auth.login.AppConfigurationEntry; +import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -62,36 +53,36 @@ public class HBaseLruTableFunction extends AbstractHBaseLruTableFunction { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(HBaseLruTableFunction.class); - private static final int DEFAULT_BOSS_THREADS = 1; private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; private static final int DEFAULT_POOL_SIZE = DEFAULT_IO_THREADS + DEFAULT_BOSS_THREADS; - private final String nullStringLiteral; + private transient Connection connection; + private transient Table table; + + private transient ExecutorService executorService; - private transient HBaseClient hBaseClient; - private String tableName; + private final HBaseTableSchema hbaseTableSchema; - private transient AsyncHBaseSerde serde; + private transient HBaseSerde serde; + + private final HBaseConf hBaseConf; public HBaseLruTableFunction( - LookupConf lookupConf, HBaseTableSchema hbaseTableSchema, HBaseConf hBaseConf) { - super( - lookupConf, - new HbaseRowConverter(hbaseTableSchema, hBaseConf.getNullStringLiteral()), - hbaseTableSchema, - hBaseConf); - this.nullStringLiteral = hBaseConf.getNullStringLiteral(); + LookupConf lookupConf, + HBaseTableSchema hbaseTableSchema, + HBaseConf hBaseConf, + AbstractRowConverter rowConverter) { + super(lookupConf, rowConverter, hbaseTableSchema, hBaseConf); + this.hBaseConf = 
hBaseConf; + this.hbaseTableSchema = hbaseTableSchema; } @Override public void open(FunctionContext context) throws Exception { super.open(context); - Configuration conf = HBaseConfigUtils.getConfig(hBaseConf.getHbaseConfig()); - - this.serde = new AsyncHBaseSerde(hbaseTableSchema, nullStringLiteral); - tableName = hbaseTableSchema.getTableName(); - ExecutorService executorService = + this.serde = new HBaseSerde(hbaseTableSchema, hBaseConf.getNullStringLiteral()); + this.executorService = new ThreadPoolExecutor( DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, @@ -100,154 +91,54 @@ public void open(FunctionContext context) throws Exception { new LinkedBlockingQueue<>(), new ChunJunThreadFactory("hbase-async")); - Config asyncClientConfig = new Config(); - for (Map.Entry entry : hBaseConf.getHbaseConfig().entrySet()) { - asyncClientConfig.overrideConfig(entry.getKey(), entry.getValue().toString()); - } - - if (HBaseConfigUtils.isEnableKerberos(conf)) { - System.setProperty( - HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, - asyncClientConfig.getString(HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF)); - String principal = - asyncClientConfig.getString( - HBaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL); - String keytab = - asyncClientConfig.getString(HBaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE); - DtFileUtils.checkExists(keytab); - LOG.info("Kerberos login with keytab: {} and principal: {}", keytab, principal); - String name = "HBaseClient"; - asyncClientConfig.overrideConfig("hbase.sasl.clientconfig", name); - appendJaasConf(name, keytab, principal); - refreshConfig(); - } - - hBaseClient = new HBaseClient(asyncClientConfig, executorService); - try { - Deferred deferred = - hBaseClient - .ensureTableExists(tableName) - .addCallbacks( - arg -> new CheckResult(true, ""), - arg -> new CheckResult(false, arg.toString())); - - CheckResult result = (CheckResult) deferred.join(); - if (!result.isConnect()) { - throw new RuntimeException(result.getExceptionMsg()); - } - - } catch (Exception e) { - throw new RuntimeException("create hbase connection fail:", e); - } + this.connection = HBaseHelper.getHbaseConnection(hBaseConf); + this.table = connection.getTable(TableName.valueOf(hbaseTableSchema.getTableName())); } @Override public void handleAsyncInvoke( CompletableFuture<Collection<RowData>> future, Object...
rowKeys) { - Object rowKey = rowKeys[0]; - byte[] key = serde.getRowKey(rowKey); - String keyStr = new String(key); - GetRequest getRequest = new GetRequest(tableName, key); - hBaseClient - .get(getRequest) - .addCallbacks( - keyValues -> { - try { - Map<String, Map<String, byte[]>> sideMap = Maps.newHashMap(); - for (KeyValue keyValue : keyValues) { - String cf = new String(keyValue.family()); - String col = new String(keyValue.qualifier()); - if (!sideMap.containsKey(cf)) { - Map<String, byte[]> cfMap = Maps.newHashMap(); - cfMap.put(col, keyValue.value()); - sideMap.put(cf, cfMap); - } else { - sideMap.get(cf).putIfAbsent(col, keyValue.value()); - } + + executorService.execute( + new Runnable() { + @Override + public void run() { + Object rowKey = rowKeys[0]; + byte[] key = serde.getRowKey(rowKey); + String keyStr = new String(key); + try { + Get get = new Get(key); + Result result = table.get(get); + if (!result.isEmpty()) { + RowData data = serde.convertToNewRow(result); + if (openCache()) { + sideCache.putCache( + keyStr, + CacheObj.buildCacheObj( + ECacheContentType.MultiLine, + Collections.singletonList(data))); } - RowData rowData = serde.convertToNewRow(sideMap, key); - if (keyValues.size() > 0) { - try { - if (openCache()) { - sideCache.putCache( - keyStr, - CacheObj.buildCacheObj( - ECacheContentType.MultiLine, - Collections.singletonList(rowData))); - } - future.complete(Collections.singletonList(rowData)); - } catch (Exception e) { - future.completeExceptionally(e); - } - } else { - dealMissKey(future); - if (openCache()) { - sideCache.putCache(keyStr, CacheMissVal.getMissKeyObj()); - } + future.complete(Collections.singletonList(data)); + } else { + dealMissKey(future); + if (openCache()) { + sideCache.putCache(keyStr, CacheMissVal.getMissKeyObj()); } - } catch (Exception e) { - future.completeExceptionally(e); - LOG.error("record:" + keyStr); - LOG.error("get side record exception:", e); } - return ""; - }, - o -> { + } catch (IOException e) { LOG.error("record:" + keyStr); - LOG.error("get side record exception:" + o); + LOG.error("get side record exception:", e); future.complete(Collections.EMPTY_LIST); - return ""; - }); - } - - private void refreshConfig() throws KrbException { - sun.security.krb5.Config.refresh(); - KerberosName.resetDefaultRealm(); - // reload java.security.auth.login.config - // javax.security.auth.login.Configuration.setConfiguration(null); - } - - private void appendJaasConf(String name, String keytab, String principal) { - javax.security.auth.login.Configuration priorConfig = - javax.security.auth.login.Configuration.getConfiguration(); - // construct a dynamic JAAS configuration - DynamicConfiguration currentConfig = new DynamicConfiguration(priorConfig); - // wire up the configured JAAS login contexts to use the krb5 entries - AppConfigurationEntry krb5Entry = KerberosUtils.keytabEntry(keytab, principal); - currentConfig.addAppConfigurationEntry(name, krb5Entry); - javax.security.auth.login.Configuration.setConfiguration(currentConfig); + } + } + }); } @Override public void close() throws Exception { + table.close(); + connection.close(); + executorService.shutdown(); super.close(); - hBaseClient.shutdown(); - } - - class CheckResult { - private boolean connect; - - private String exceptionMsg; - - CheckResult(boolean connect, String msg) { - this.connect = connect; - this.exceptionMsg = msg; - } - - public boolean isConnect() { - return connect; - } - - public void setConnect(boolean connect) { - this.connect = connect; - } - - public String getExceptionMsg() { - return exceptionMsg; -
} - - public void setExceptionMsg(String exceptionMsg) { - this.exceptionMsg = exceptionMsg; - } } } diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/DtFileUtils.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/DtFileUtils.java deleted file mode 100644 index 3cc08bfdd0..0000000000 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/DtFileUtils.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.chunjun.connector.hbase14.util; - -import org.apache.flink.util.Preconditions; - -import java.io.File; - -/** - * @program: flinkStreamSQL - * @author: wuren - * @create: 2020/09/21 - */ -public class DtFileUtils { - public static void checkExists(String path) { - File file = new File(path); - String errorMsg = "%s file is not exist!"; - Preconditions.checkState(file.exists(), errorMsg, path); - } -} diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/ScanBuilder.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/ScanBuilder.java new file mode 100644 index 0000000000..63f41d1ff4 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/ScanBuilder.java @@ -0,0 +1,68 @@ +package com.dtstack.chunjun.connector.hbase14.util; + +import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hbase.HBaseTableSchema; + +import org.apache.flink.table.types.DataType; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public class ScanBuilder implements Serializable { + + private static final long serialVersionUID = 12L; + + private final boolean isSync; + private final HBaseTableSchema hBaseTableSchema; + private final List<FieldConf> fieldConfList; + + private ScanBuilder(HBaseTableSchema hBaseTableSchema) { + this.isSync = false; + this.fieldConfList = null; + this.hBaseTableSchema = hBaseTableSchema; + } + + private ScanBuilder(List<FieldConf> fieldConfList) { + this.isSync = true; + this.fieldConfList = fieldConfList; + this.hBaseTableSchema = null; + } + + public static ScanBuilder forSql(HBaseTableSchema hBaseTableSchema) { + return new ScanBuilder(hBaseTableSchema); + } + + public static ScanBuilder forSync(List<FieldConf> fieldConfList) { + return new ScanBuilder(fieldConfList); + } + + public Scan buildScan() { + Scan scan = new Scan(); + if (isSync) { + for (FieldConf fieldConf : fieldConfList) { + String fieldName
= fieldConf.getName(); + if (!"rowkey".equalsIgnoreCase(fieldName)) { + if (fieldName.contains(".")) { + String[] fields = fieldName.split("\\."); + scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1])); + } + } + } + return scan; + } else { + String[] familyNames = this.hBaseTableSchema.getFamilyNames(); + for (String familyName : familyNames) { + Map<String, DataType> familyInfo = hBaseTableSchema.getFamilyInfo(familyName); + for (Map.Entry<String, DataType> columnInfoEntry : familyInfo.entrySet()) { + scan.addColumn( + Bytes.toBytes(familyName), Bytes.toBytes(columnInfoEntry.getKey())); + } + } + } + return scan; + } +} diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseMutationConverter.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseMutationConverter.java deleted file mode 100644 index 48340daf06..0000000000 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseMutationConverter.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.chunjun.connector.hbase; - -/** - * @program: chunjun - * @author: wuren - * @create: 2021/10/15 - */ -import org.apache.flink.annotation.Internal; - -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; - -import java.io.Serializable; - -/** - * A converter used to converts the input record into HBase {@link Mutation}. - * - * @param <T> type of input record. - */ -@Internal -public interface HBaseMutationConverter<T> extends Serializable { - - /** Initialization method for the function. It is called once before conversion method. */ - void open(); - - /** - * Converts the input record into HBase {@link Mutation}. A mutation can be a {@link Put} or - * {@link Delete}. - */ - Mutation convertToMutation(T record); -} diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java index 52c2887a49..da8d5960cd 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/HBaseTableSchema.java @@ -251,7 +251,7 @@ public DataType[] getQualifierDataTypes(String family) { * are returned. * @return The names and types of all registered column qualifiers of a specific column family.
*/ - private Map<String, DataType> getFamilyInfo(String family) { + public Map<String, DataType> getFamilyInfo(String family) { return familyMap.get(family); } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConf.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConf.java index 5bc6ccb0e8..5fd06772e8 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConf.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConf.java @@ -16,10 +16,14 @@ package com.dtstack.chunjun.connector.hbase.conf; import com.dtstack.chunjun.conf.ChunJunCommonConf; +import com.dtstack.chunjun.conf.FieldConf; +import java.util.List; import java.util.Map; public class HBaseConf extends ChunJunCommonConf { + // Unlike com.dtstack.chunjun.conf.FlinkxCommonConf.column, this field stores the field names with ":" already converted to "." + private List<FieldConf> columnMetaInfos; private String encoding = "UTF-8"; private Map<String, Object> hbaseConfig; @@ -150,4 +154,12 @@ public String getNullStringLiteral() { public void setNullStringLiteral(String nullStringLiteral) { this.nullStringLiteral = nullStringLiteral; } + + public List<FieldConf> getColumnMetaInfos() { + return columnMetaInfos; + } + + public void setColumnMetaInfos(List<FieldConf> columnMetaInfos) { + this.columnMetaInfos = columnMetaInfos; + } } diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConfigKeys.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConfigKeys.java deleted file mode 100644 index 9e65c1edb8..0000000000 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/conf/HBaseConfigKeys.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dtstack.chunjun.connector.hbase.conf; - -/** - * This class defines configuration keys for HbaseReader and HbaseWriter - * - *

Company: www.dtstack.com - * - * @author huyifan.zju@163.com - */ -public class HBaseConfigKeys { - - public static final String KEY_SCAN_CACHE_SIZE = "scanCacheSize"; - - public static final String KEY_SCAN_BATCH_SIZE = "scanBatchSize"; - - public static final String KEY_TABLE = "table"; - - public static final String KEY_HBASE_CONFIG = "hbaseConfig"; - - public static final String KEY_START_ROW_KEY = "startRowkey"; - - public static final String KEY_END_ROW_KEY = "endRowkey"; - - public static final String KEY_IS_BINARY_ROW_KEY = "isBinaryRowkey"; - - public static final String KEY_ENCODING = "encoding"; - - public static final String KEY_RANGE = "range"; - - public static final String KEY_COLUMN_NAME = "name"; - - public static final String KEY_COLUMN_TYPE = "type"; - - public static final String KEY_ROW_KEY_COLUMN = "rowkeyColumn"; - - public static final String KEY_ROW_KEY_COLUMN_INDEX = "index"; - - public static final String KEY_ROW_KEY_COLUMN_TYPE = "type"; - - public static final String KEY_ROW_KEY_COLUMN_VALUE = "value"; - - public static final String KEY_NULL_MODE = "nullMode"; - - public static final String KEY_WAL_FLAG = "walFlag"; - - public static final String KEY_VERSION_COLUMN = "versionColumn"; - - public static final String KEY_WRITE_BUFFER_SIZE = "writeBufferSize"; - - public static final String KEY_VERSION_COLUMN_INDEX = "index"; - - public static final String KEY_VERSION_COLUMN_VALUE = "value"; -} diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/type/BINARYSTRING.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/type/BINARYSTRING.java deleted file mode 100644 index 296bed7455..0000000000 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/type/BINARYSTRING.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.dtstack.chunjun.connector.hbase.converter.type; - -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; -import org.apache.flink.table.types.logical.LogicalTypeVisitor; - -import java.io.Reader; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -public class BINARYSTRING extends LogicalType { - - private static final Set INPUT_CONVERSION = - conversionSet(String.class.getName(), StringData.class.getName()); - - private static final Class DEFAULT_CONVERSION = String.class; - - private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); - - public BINARYSTRING(boolean isNullable, LogicalTypeRoot typeRoot) { - super(isNullable, typeRoot); - } - - @Override - public String asSerializableString() { - return "HBASE-Biary-String"; - } - - @Override - public boolean supportsInputConversion(Class clazz) { - return INPUT_CONVERSION.contains(clazz.getName()); - } - - @Override - public boolean supportsOutputConversion(Class clazz) { - return OUTPUT_CONVERSION.contains(clazz.getName()); - } - - @Override - public Class getDefaultConversion() { - return DEFAULT_CONVERSION; - } - - @Override - public List getChildren() { - return Collections.emptyList(); - } - - @Override - public R accept(LogicalTypeVisitor visitor) { - return visitor.visit(this); - } - - @Override - public LogicalType copy(boolean isNullable) { - return new BINARYSTRING(isNullable, getTypeRoot()); - } -} diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseOptions.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseOptions.java similarity index 98% rename from chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseOptions.java rename to chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseOptions.java index 35af0fa948..ba0f554cf0 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/table/HBaseOptions.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/table/HBaseOptions.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.dtstack.chunjun.connector.hbase14.table; +package com.dtstack.chunjun.connector.hbase.table; import com.dtstack.chunjun.table.options.BaseFileOptions; diff --git a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseConfigUtils.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseConfigUtils.java index 438a3b2c7e..148132987a 100644 --- a/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseConfigUtils.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseConfigUtils.java @@ -45,8 +45,9 @@ public class HBaseConfigUtils { private static final Logger LOG = LoggerFactory.getLogger(HBaseConfigUtils.class); - private static final String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; - private static final String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; + protected static final String KEY_HBASE_SECURITY_AUTHENTICATION = + "hbase.security.authentication"; + protected static final String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; private static final String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal"; public static final String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = @@ -56,7 +57,7 @@ public class HBaseConfigUtils { public static final String KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL = "hbase.client.kerberos.principal"; - private static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; + protected static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; public static final String KEY_KEY_TAB = "hbase.keytab"; diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/HBaseHelper.java b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseHelper.java similarity index 92% rename from chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/HBaseHelper.java rename to chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseHelper.java index eb099c6dbc..22b4a37253 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/util/HBaseHelper.java +++ b/chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/util/HBaseHelper.java @@ -16,14 +16,15 @@ * limitations under the License. 
*/ -package com.dtstack.chunjun.connector.hbase14.util; +package com.dtstack.chunjun.connector.hbase.util; -import com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils; +import com.dtstack.chunjun.connector.hbase.conf.HBaseConf; import com.dtstack.chunjun.security.KerberosUtil; import com.dtstack.chunjun.util.FileSystemUtil; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import com.google.common.collect.ImmutableMap; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; @@ -48,6 +49,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION; +import static com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION; +import static com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils.KEY_HBASE_SECURITY_AUTH_ENABLE; import static com.dtstack.chunjun.connector.hbase.util.HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF; import static com.dtstack.chunjun.security.KerberosUtil.KRB_STR; @@ -61,9 +65,10 @@ public class HBaseHelper { private static final Logger LOG = LoggerFactory.getLogger(HBaseHelper.class); - private static final String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; - private static final String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; - private static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; + public static Connection getHbaseConnection(HBaseConf hBaseConf) { + Map<String, Object> hbaseConfig = ImmutableMap.copyOf(hBaseConf.getHbaseConfig()); + return getHbaseConnection(hbaseConfig); + } public static Connection getHbaseConnection(Map<String, Object> hbaseConfigMap) { Validate.isTrue(MapUtils.isNotEmpty(hbaseConfigMap), "hbaseConfig must not be an empty map!"); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java b/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java index e0bc305909..32e6bc1178 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java @@ -223,7 +223,7 @@ private static void exeSyncJob( if (speed.getWriterChannel() > 0) { dataStreamSink.setParallelism(speed.getWriterChannel()); } - env.disableOperatorChaining(); + // env.disableOperatorChaining(); JobExecutionResult result = env.execute(options.getJobName()); if (env instanceof MyLocalStreamEnvironment) { PrintUtil.printResult(result.getAllAccumulatorResults()); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/conf/OperatorConf.java b/chunjun-core/src/main/java/com/dtstack/chunjun/conf/OperatorConf.java index 98a91dd152..6760b3e213 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/conf/OperatorConf.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/conf/OperatorConf.java @@ -266,4 +266,8 @@ public String toString() { + table + '}'; } + + public void setFieldNameList(List<String> fieldNameList) { + this.fieldNameList = fieldNameList; + } } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/TableUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/TableUtil.java index b30e5dd9db..2c856be13b 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/util/TableUtil.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/TableUtil.java @@ -127,10 +127,17 @@ public static boolean useGenericTypeInfo(RowType rowType) { public static RowType getRowType( DataType[]
dataTypes, String[] fieldNames, String[] formatField) { List rowFieldList = new ArrayList<>(dataTypes.length); - for (int i = 0; i < dataTypes.length; i++) { - rowFieldList.add( - new RowType.RowField( - fieldNames[i], dataTypes[i].getLogicalType(), formatField[i])); + if (formatField == null || formatField.length == 0) { + for (int i = 0; i < dataTypes.length; i++) { + rowFieldList.add( + new RowType.RowField(fieldNames[i], dataTypes[i].getLogicalType())); + } + } else { + for (int i = 0; i < dataTypes.length; i++) { + rowFieldList.add( + new RowType.RowField( + fieldNames[i], dataTypes[i].getLogicalType(), formatField[i])); + } } return new RowType(rowFieldList); diff --git a/chunjun-examples/json/hbase/hbase_transformer_hbase.json b/chunjun-examples/json/hbase/hbase_transformer_hbase.json new file mode 100644 index 0000000000..30c6ca1f3f --- /dev/null +++ b/chunjun-examples/json/hbase/hbase_transformer_hbase.json @@ -0,0 +1,146 @@ +{ + "job": { + "content": [ + { + "reader": { + "table" : { + "tableName": "test_source" + }, + "name": "hbasereader", + "parameter": { + "hbaseConfig": { + "hbase.zookeeper.property.clientPort": "2181", + "hbase.rootdir": "hdfs://ns1/user/hbase/", + "hbase.cluster.distributed": "true", + "hbase.zookeeper.quorum": "172.16.85.111,172.16.85.210,172.16.85.218" + }, + "encoding": "utf-8", + "column": [ + { + "name": "info:_int_", + "type": "int" + }, + { + "name": "info:_string_", + "type": "string" + }, + { + "name": "detail:_double_", + "type": "double" + }, + { + "name": "detail:_long_", + "type": "BIGINT" + }, + { + "name": "detail:_boolean_", + "type": "boolean" + }, + { + "name": "detail:_float_", + "type": "float" + }, + { + "name": "detail:_timestamp_", + "type": "timestamp" + }, + { + "name": "detail:_bytes_", + "type": "BINARY" + }, + { + "name": "detail:_time_", + "type": "time" + } + ], + "startRowkey": "", + "endRowkey": "", + "isBinaryRowkey": true + } + }, + "transformer": { + "transformSql": "select info._int_+2 ,info._string_,detail._double_,detail._long_,detail._boolean_,detail._float_,detail._timestamp_,detail._bytes_,detail._time_ from test_source" + }, + "writer": { + "name": "hbasewriter", + "table" : { + "tableName": "test_sink" + }, + "parameter": { + "hbaseConfig": { + "hbase.zookeeper.property.clientPort": "2181", + "hbase.rootdir": "hdfs://ns1/user/hbase/", + "hbase.cluster.distributed": "true", + "hbase.zookeeper.quorum": "172.16.85.111,172.16.85.210,172.16.85.218" + }, + "nullMode" : "null", + "walFlag": false, + "writeBufferSize" : 1, + "rowkeyExpress" : "$(info:_int_)", + "versionColumnValue" : "1", + "column": [ + { + "name": "info:_int_", + "type": "int" + }, + { + "name": "info:_string_", + "type": "string" + }, + { + "name": "detail:_double_", + "type": "double" + }, + { + "name": "detail:_long_", + "type": "BIGINT" + }, + { + "name": "detail:_boolean_", + "type": "boolean" + }, + { + "name": "detail:_float_", + "type": "float" + }, + { + "name": "detail:_timestamp_", + "type": "timestamp" + }, + { + "name": "detail:_bytes_", + "type": "BINARY" + }, + { + "name": "detail:_time_", + "type": "time" + } + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 1, + "bytes": 0 + }, + "errorLimit": { + "record": 100 + }, + "restore": { + "maxRowNumForCheckpoint": 0, + "isRestore": false, + "isStream": false, + "restoreColumnName": "", + "restoreColumnIndex": 0 + }, + "log": { + "isLogger": false, + "level": "debug", + "path": "", + "pattern": "" + } + } + } +}
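
Usage note (illustrative sketch, not part of the patch): how the new ScanBuilder is driven on both paths. FieldConf#setName is an assumption here (only getName() appears in this patch), the class name is invented for the sketch, and hbaseSchema stands for an HBaseTableSchema built elsewhere, e.g. via HBaseTableSchema.fromTableSchema(physicalSchema).

    import com.dtstack.chunjun.conf.FieldConf;
    import com.dtstack.chunjun.connector.hbase.HBaseTableSchema;
    import com.dtstack.chunjun.connector.hbase14.util.ScanBuilder;

    import org.apache.hadoop.hbase.client.Scan;

    import java.util.Collections;

    public class ScanBuilderSketch {

        // Sync (JSON job) path: column names arrive as "family.qualifier" strings,
        // since HBaseConf.columnMetaInfos stores ":" already rewritten to ".".
        public static Scan syncScan() {
            FieldConf field = new FieldConf();
            field.setName("info._int_"); // assumed setter, mirroring the getName() used in buildScan()
            // buildScan() requests info:_int_ and silently skips "rowkey" entries
            // as well as any field name without a "." separator.
            return ScanBuilder.forSync(Collections.singletonList(field)).buildScan();
        }

        // SQL path: the HBaseTableSchema left after projection pushdown drives the scan,
        // so only the families/qualifiers that survived pushdown are requested.
        public static Scan sqlScan(HBaseTableSchema hbaseSchema) {
            return ScanBuilder.forSql(hbaseSchema).buildScan();
        }
    }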
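
Worked example for the re-enabled generateRowKey (qualifier names hypothetical): when getRowKeyIndex() <= 1, rowIndex stays 1 and familyName is getFamilyNames()[0]; for qualifiers ["_int_", "_string_"] the generated sink rowkeyExpress is "${_int_}_${_string_}". Note that this "${...}" form differs from the "$(info:_int_)" form in the sync example's rowkeyExpress above; the two expressions feed different writer paths, and this patch does not show them sharing a parser.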
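
Illustrative call for the TableUtil.getRowType change (assumes Flink's org.apache.flink.table.api.DataTypes factory and the patched TableUtil):

    RowType rowType =
            TableUtil.getRowType(new DataType[] {DataTypes.INT()}, new String[] {"id"}, null);
    // yields ROW<`id` INT>; before this patch a null formatField
    // triggered a NullPointerException when indexing formatField[i]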