Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BP-3.0][pipeline-connector][starrocks] Fix char/varchar length inconsistency between cdc and starrocks #2830

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@ fieldPos, getPrecision(fieldType))
public static final String DATETIME = "DATETIME";
public static final String JSON = "JSON";

/** Max size of char type of StarRocks. */
public static final int MAX_CHAR_SIZE = 255;

/** Max size of varchar type of StarRocks. */
public static final int MAX_VARCHAR_SIZE = 1048576;

/** Transforms CDC {@link DataType} to StarRocks data type. */
private static class CdcDataTypeTransformer
public static class CdcDataTypeTransformer
extends DataTypeDefaultVisitor<StarRocksColumn.Builder> {

private final StarRocksColumn.Builder builder;
Expand Down Expand Up @@ -298,17 +301,37 @@ public StarRocksColumn.Builder visit(DecimalType decimalType) {

@Override
public StarRocksColumn.Builder visit(CharType charType) {
builder.setDataType(CHAR);
builder.setNullable(charType.isNullable());
builder.setColumnSize(charType.getLength());
// CDC and StarRocks use different units for the length. It's the number
// of characters in CDC, and the number of bytes in StarRocks. One chinese
// character will use 3 bytes because it uses UTF-8, so the length of StarRocks
// char type should be three times as that of CDC char type. Specifically, if
// the length of StarRocks exceeds the MAX_CHAR_SIZE, map CDC char type to StarRocks
// varchar type
int length = charType.getLength();
long starRocksLength = length * 3L;
if (starRocksLength <= MAX_CHAR_SIZE) {
builder.setDataType(CHAR);
builder.setNullable(charType.isNullable());
builder.setColumnSize((int) starRocksLength);
} else {
builder.setDataType(VARCHAR);
builder.setNullable(charType.isNullable());
builder.setColumnSize((int) Math.min(starRocksLength, MAX_VARCHAR_SIZE));
}
return builder;
}

@Override
public StarRocksColumn.Builder visit(VarCharType varCharType) {
// CDC and StarRocks use different units for the length. It's the number
// of characters in CDC, and the number of bytes in StarRocks. One chinese
// character will use 3 bytes because it uses UTF-8, so the length of StarRocks
// varchar type should be three times as that of CDC varchar type.
int length = varCharType.getLength();
long starRocksLength = length * 3L;
builder.setDataType(VARCHAR);
builder.setNullable(varCharType.isNullable());
builder.setColumnSize(Math.min(varCharType.getLength(), MAX_VARCHAR_SIZE));
builder.setColumnSize((int) Math.min(starRocksLength, MAX_VARCHAR_SIZE));
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververical.cdc.connectors.starrocks.sink;

import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.ververica.cdc.common.types.CharType;
import com.ververica.cdc.common.types.VarCharType;
import com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** Tests for {@link StarRocksUtils.CdcDataTypeTransformer}. */
public class CdcDataTypeTransformerTest {

@Test
public void testCharType() {
// map to char of StarRocks if CDC length <= StarRocksUtils.MAX_CHAR_SIZE
StarRocksColumn.Builder smallLengthBuilder =
new StarRocksColumn.Builder().setColumnName("small_char").setOrdinalPosition(0);
new CharType(1).accept(new StarRocksUtils.CdcDataTypeTransformer(smallLengthBuilder));
StarRocksColumn smallLengthColumn = smallLengthBuilder.build();
assertEquals("small_char", smallLengthColumn.getColumnName());
assertEquals(0, smallLengthColumn.getOrdinalPosition());
assertEquals(StarRocksUtils.CHAR, smallLengthColumn.getDataType());
assertEquals(Integer.valueOf(3), smallLengthColumn.getColumnSize().orElse(null));
assertTrue(smallLengthColumn.isNullable());

// map to varchar of StarRocks if CDC length > StarRocksUtils.MAX_CHAR_SIZE
StarRocksColumn.Builder largeLengthBuilder =
new StarRocksColumn.Builder().setColumnName("large_char").setOrdinalPosition(1);
new CharType(StarRocksUtils.MAX_CHAR_SIZE)
.accept(new StarRocksUtils.CdcDataTypeTransformer(largeLengthBuilder));
StarRocksColumn largeLengthColumn = largeLengthBuilder.build();
assertEquals("large_char", largeLengthColumn.getColumnName());
assertEquals(1, largeLengthColumn.getOrdinalPosition());
assertEquals(StarRocksUtils.VARCHAR, largeLengthColumn.getDataType());
assertEquals(
Integer.valueOf(StarRocksUtils.MAX_CHAR_SIZE * 3),
largeLengthColumn.getColumnSize().orElse(null));
assertTrue(largeLengthColumn.isNullable());
}

@Test
public void testVarCharType() {
// the length fo StarRocks should be 3 times as that of CDC if CDC length * 3 <=
// StarRocksUtils.MAX_VARCHAR_SIZE
StarRocksColumn.Builder smallLengthBuilder =
new StarRocksColumn.Builder().setColumnName("small_varchar").setOrdinalPosition(0);
new VarCharType(3).accept(new StarRocksUtils.CdcDataTypeTransformer(smallLengthBuilder));
StarRocksColumn smallLengthColumn = smallLengthBuilder.build();
assertEquals("small_varchar", smallLengthColumn.getColumnName());
assertEquals(0, smallLengthColumn.getOrdinalPosition());
assertEquals(StarRocksUtils.VARCHAR, smallLengthColumn.getDataType());
assertEquals(Integer.valueOf(9), smallLengthColumn.getColumnSize().orElse(null));
assertTrue(smallLengthColumn.isNullable());

// the length fo StarRocks should be StarRocksUtils.MAX_VARCHAR_SIZE if CDC length * 3 >
// StarRocksUtils.MAX_VARCHAR_SIZE
StarRocksColumn.Builder largeLengthBuilder =
new StarRocksColumn.Builder().setColumnName("large_varchar").setOrdinalPosition(1);
new CharType(StarRocksUtils.MAX_VARCHAR_SIZE + 1)
.accept(new StarRocksUtils.CdcDataTypeTransformer(largeLengthBuilder));
StarRocksColumn largeLengthColumn = largeLengthBuilder.build();
assertEquals("large_varchar", largeLengthColumn.getColumnName());
assertEquals(1, largeLengthColumn.getOrdinalPosition());
assertEquals(StarRocksUtils.VARCHAR, largeLengthColumn.getDataType());
assertEquals(
Integer.valueOf(StarRocksUtils.MAX_VARCHAR_SIZE),
largeLengthColumn.getColumnSize().orElse(null));
assertTrue(largeLengthColumn.isNullable());
}
}