Skip to content

Commit

Permalink
[Improve][Connector-v2] Check all fields in schema case-sensitive
Browse files (browse the repository at this point in the history)
  • Loading branch information
dailai committed Sep 3, 2024
1 parent 62a7cd6 commit 9b67afc
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
TABLE_PRE_COMMIT_FAILED("PAIMON-03", "Paimon pre commit failed"),
GET_TABLE_FAILED("PAIMON-04", "Get table from database failed"),
AUTHENTICATE_KERBEROS_FAILED("PAIMON-05", "Authenticate kerberos failed"),
LOAD_CATALOG("PAIMON-06", "Load catalog failed");
LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
GET_FILED_FAILED("PAIMON-07", "Get field failed");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public static InternalRow reconvert(
binaryWriter.setNullAt(i);
continue;
}
checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
Expand Down Expand Up @@ -486,7 +486,7 @@ public static InternalRow reconvert(
return binaryRow;
}

private static void checkCanWriteWithType(
private static void checkCanWriteWithSchema(
int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
String sourceFieldName = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i);
Expand All @@ -495,7 +495,8 @@ private static void checkCanWriteWithType(
RowTypeConverter.reconvert(sourceFieldName, seaTunnelRowType.getFieldType(i));
DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType);
DataType sinkDataType = sinkDataField.type();
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())
|| !StringUtils.equals(sourceFieldName, sinkDataField.name())) {
throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
Expand Down Expand Up @@ -73,12 +74,11 @@ public static Column toSeaTunnelType(BasicTypeDefine<DataType> typeDefine) {
public static DataField getDataField(List<DataField> fields, String fieldName) {
Optional<DataField> firstField =
fields.parallelStream().filter(field -> field.name().equals(fieldName)).findFirst();
if (firstField.isPresent()) {
return firstField.get();
if (!firstField.isPresent()) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.GET_FILED_FAILED,
"Can not get the filed [" + fieldName + "] from source table");
}
throw new SeaTunnelException(
String.format(
"Con not get the DataField named: [%s] in sink schema. The schema of paimon is case-sensitive in default. Please check it.",
fieldName));
return firstField.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.data.BinaryArray;
Expand All @@ -43,6 +42,7 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -71,6 +71,23 @@ public class RowConverterTest {
private SeaTunnelRowType seaTunnelRowType;

private volatile boolean isCaseSensitive = false;
private volatile int index = 0;
private static final String[] filedNames = {
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_string",
"c_bytes",
"c_boolean",
"c_date",
"c_timestamp",
"c_map",
"c_array"
};

public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint");

Expand Down Expand Up @@ -122,7 +139,7 @@ public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {

@BeforeEach
public void before() {
initSeaTunnelRowTypeCaseSensitive(isCaseSensitive);
initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index);
byte tinyint = 1;
short smallint = 2;
int intNum = 3;
Expand Down Expand Up @@ -199,32 +216,14 @@ public void before() {
internalRow = binaryRow;
}

private void initSeaTunnelRowTypeCaseSensitive(boolean isUpperCase) {
String[] filedNames = {
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_string",
"c_bytes",
"c_boolean",
"c_date",
"c_timestamp",
"c_map",
"c_array"
};
String[] upperCaseFieldNames = new String[filedNames.length];
private void initSeaTunnelRowTypeCaseSensitive(boolean isUpperCase, int index) {
String[] oneUpperCaseFiledNames = Arrays.copyOf(filedNames, filedNames.length);
if (isUpperCase) {
for (int i = 0; i < filedNames.length; i++) {
upperCaseFieldNames[i] = filedNames[i].toUpperCase();
}
oneUpperCaseFiledNames[index] = oneUpperCaseFiledNames[index].toUpperCase();
}
seaTunnelRowType =
new SeaTunnelRowType(
isUpperCase ? upperCaseFieldNames : filedNames,
oneUpperCaseFiledNames,
new SeaTunnelDataType<?>[] {
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
Expand Down Expand Up @@ -264,16 +263,32 @@ public void seaTunnelToPaimon() {
Assertions.assertEquals(reconvert, internalRow);

isCaseSensitive = true;
before();
SeaTunnelException actualException1 =
Assertions.assertThrows(
SeaTunnelException.class,
() ->
RowConverter.reconvert(
seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8)));
Assertions.assertEquals(
"Con not get the DataField named: [C_DECIMAL] in sink schema. The schema of paimon is case-sensitive in default. Please check it.",
actualException1.getMessage());

for (int i = 0; i < filedNames.length; i++) {
index = i;
before();
String sourceFiledname = seaTunnelRowType.getFieldName(i);
DataType exceptDataType =
RowTypeConverter.reconvert(sourceFiledname, seaTunnelRowType.getFieldType(i));
DataField exceptDataField = new DataField(i, sourceFiledname, exceptDataType);
TableSchema sinkTableSchema = getTableSchema(30, 8);
SeaTunnelRuntimeException actualException1 =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() ->
RowConverter.reconvert(
seaTunnelRow, seaTunnelRowType, sinkTableSchema));
Assertions.assertEquals(
CommonError.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
sourceFiledname
+ StringUtils.SPACE
+ seaTunnelRowType.getFieldType(i).getSqlType(),
exceptDataField.asSQLString(),
sinkTableSchema.fields().get(i).asSQLString())
.getMessage(),
actualException1.getMessage());
}
}

@Test
Expand Down

0 comments on commit 9b67afc

Please sign in to comment.