[BitSail] Adapt hudi into bitsail type system #26

Merged · 1 commit · Oct 20, 2022 · Changes from all commits

@@ -43,4 +43,8 @@ public interface FakeReaderOptions extends ReaderOptions.BaseReaderOptions {
   ConfigOption<String> UNIQUE_FIELDS =
       key(READER_PREFIX + "unique_fields")
           .noDefaultValue(String.class);
+
+  ConfigOption<Boolean> USE_BITSAIL_TYPE =
+      key(READER_PREFIX + "use_bitsail_type")
+          .defaultValue(true);
 }
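
Side note (not part of the diff): USE_BITSAIL_TYPE defaults to true, so existing fake-source jobs keep receiving BitSail Column-wrapped values; only jobs that explicitly opt out get plain Java objects. A minimal sketch of flipping the flag, assuming BitSailConfiguration exposes the newDefault() factory and the generic set/get accessors used elsewhere in BitSail:

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;

// Hedged sketch: newDefault() and set(...) are assumed here; get(...) mirrors the
// inputSliceConfig.get(FakeReaderOptions.USE_BITSAIL_TYPE) call in FakeSource#initPlugin below.
BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
jobConf.set(FakeReaderOptions.USE_BITSAIL_TYPE, false); // opt out of Column wrapping
boolean useBitSailType = jobConf.get(FakeReaderOptions.USE_BITSAIL_TYPE); // -> false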
@@ -19,7 +19,6 @@
 
 import com.bytedance.bitsail.common.BitSailException;
 import com.bytedance.bitsail.common.column.BytesColumn;
-import com.bytedance.bitsail.common.column.Column;
 import com.bytedance.bitsail.common.column.DateColumn;
 import com.bytedance.bitsail.common.column.DoubleColumn;
 import com.bytedance.bitsail.common.column.LongColumn;
@@ -73,6 +72,8 @@ public class FakeSource extends InputFormatPlugin<Row, InputSplit> implements Re
   private transient RateLimiter rateLimiter;
   private Map<String, Set<String>> uniqueFields;
 
+  private boolean useBitSailType;
+
   @Override
   public Row buildRow(Row reuse, String mandatoryEncoding) throws BitSailException {
     rateLimiter.acquire();
@@ -83,29 +84,49 @@ public Row buildRow(Row reuse, String mandatoryEncoding) throws BitSailException
   private Row createRow(Row reuse) {
     for (int index = 0; index < columnInfos.size(); index++) {
       String fieldName = columnInfos.get(index).getName();
-      reuse.setField(index, createColumn(rowTypeInfo.getTypeAt(index), uniqueFields.get(fieldName)));
+      reuse.setField(index, createColumn(rowTypeInfo.getTypeAt(index), uniqueFields.get(fieldName), useBitSailType));
     }
     return reuse;
   }
 
   @SuppressWarnings("checkstyle:MagicNumber")
-  private Column createColumn(TypeInformation<?> typeInformation, Set<String> existValues) {
+  private Object createColumn(TypeInformation<?> typeInformation, Set<String> existValues, Boolean useBitSailType) {
     boolean isNull = randomNullInt > random.nextInt(10);
     if (PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO.equals(typeInformation)) {
       long randomNumber = constructRandomValue(existValues, () -> faker.number().randomNumber());
-      return isNull ? new LongColumn() : new LongColumn(randomNumber);
+      if (useBitSailType) {
+        return isNull ? new LongColumn() : new LongColumn(randomNumber);
+      } else {
+        return isNull ? null : randomNumber;
+      }
     } else if (PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO.equals(typeInformation)) {
       String randomString = constructRandomValue(existValues, () -> faker.letterify("string_test_????"));
-      return isNull ? new StringColumn() : new StringColumn(randomString);
+      if (useBitSailType) {
+        return isNull ? new StringColumn() : new StringColumn(randomString);
+      } else {
+        return isNull ? null : randomString;
+      }
     } else if (PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO.equals(typeInformation)) {
       double randomDouble = constructRandomValue(existValues, () -> faker.number().randomDouble(5, -1_000_000_000, 1_000_000_000));
-      return isNull ? new DoubleColumn() : new DoubleColumn(randomDouble);
+      if (useBitSailType) {
+        return isNull ? new DoubleColumn() : new DoubleColumn(randomDouble);
+      } else {
+        return isNull ? null : randomDouble;
+      }
     } else if (PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO.equals(typeInformation)) {
       String randomString = constructRandomValue(existValues, () -> faker.numerify("test_#####"));
-      return isNull ? new BytesColumn() : new BytesColumn(randomString.getBytes());
+      if (useBitSailType) {
+        return isNull ? new BytesColumn() : new BytesColumn(randomString.getBytes());
+      } else {
+        return isNull ? null : randomString.getBytes();
+      }
     } else if (PrimitiveColumnTypeInfo.DATE_COLUMN_TYPE_INFO.equals(typeInformation)) {
       Date randomDate = constructRandomValue(existValues, () -> faker.date().birthday(10, 30));
-      return isNull ? new DateColumn() : new DateColumn(randomDate);
+      if (useBitSailType) {
+        return isNull ? new DateColumn() : new DateColumn(randomDate);
+      } else {
+        return isNull ? null : randomDate;
+      }
     }
     throw new RuntimeException("Unsupported type " + typeInformation);
   }
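
Reading the new branches together (not part of the diff): createColumn now has two output shapes per primitive type. With use_bitsail_type = true it keeps the legacy behavior of wrapping both values and nulls in BitSail Column objects; with false it emits plain Java values and bare null references, which lines up with the hudi-side changes below and the new bitsail-connector-fake test dependency. An illustration of the two shapes for a LONG field (values arbitrary):

// Illustration only: the two result shapes for a LONG field.
Object wrapped = new LongColumn(42L);  // use_bitsail_type = true: Column wrapper
Object raw = 42L;                      // use_bitsail_type = false: plain java.lang.Long
Object wrappedNull = new LongColumn(); // "null" as an empty Column
Object rawNull = null;                 // real null reference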
@@ -132,6 +153,7 @@ public void initPlugin() throws Exception {
     this.columnInfos = inputSliceConfig.get(ReaderOptions.BaseReaderOptions.COLUMNS);
     this.rowTypeInfo = ColumnFlinkTypeInfoUtil.getRowTypeInformation("bitsail", columnInfos);
     this.uniqueFields = initUniqueFieldsMapping(inputSliceConfig.get(FakeReaderOptions.UNIQUE_FIELDS));
+    this.useBitSailType = inputSliceConfig.get(FakeReaderOptions.USE_BITSAIL_TYPE);
 
     if (!uniqueFields.isEmpty() && totalCount > 1000) {
       LOG.warn("Unique fields is set and total count is larger than 1000, which may cause OOM problem.");
@@ -157,6 +157,56 @@
 <!-- <version>9.4.15.v20190215</version>-->
 <!-- </dependency>-->
 
+<!-- Hadoop -->
+<!-- <dependency>-->
+<!-- <groupId>org.apache.hadoop</groupId>-->
+<!-- <artifactId>hadoop-common</artifactId>-->
+<!-- <version>3.1.1</version>-->
+<!-- <exclusions>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.apache.curator</groupId>-->
+<!-- <artifactId>curator-recipes</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.eclipse.jetty</groupId>-->
+<!-- <artifactId>jetty-server</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.eclipse.jetty</groupId>-->
+<!-- <artifactId>jetty-servlet</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.eclipse.jetty</groupId>-->
+<!-- <artifactId>jetty-util</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.apache.curator</groupId>-->
+<!-- <artifactId>curator-client</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.apache.avro</groupId>-->
+<!-- <artifactId>avro</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.apache.htrace</groupId>-->
+<!-- <artifactId>htrace-core4</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.apache.curator</groupId>-->
+<!-- <artifactId>curator-framework</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>org.eclipse.jetty</groupId>-->
+<!-- <artifactId>jetty-webapp</artifactId>-->
+<!-- </exclusion>-->
+<!-- <exclusion>-->
+<!-- <groupId>log4j</groupId>-->
+<!-- <artifactId>log4j</artifactId>-->
+<!-- </exclusion>-->
+<!-- </exclusions>-->
+<!-- <scope>compile</scope>-->
+<!-- </dependency>-->
+
 <!-- Flink -->
 <dependency>
     <groupId>org.apache.flink</groupId>
@@ -627,6 +677,19 @@
         </exclusions>
     </dependency>
 
+    <dependency>
+        <groupId>com.bytedance.bitsail</groupId>
+        <artifactId>bitsail-connector-fake</artifactId>
+        <version>${revision}</version>
+        <scope>test</scope>
+        <exclusions>
+            <exclusion>
+                <groupId>junit</groupId>
+                <artifactId>junit</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+
     <dependency>
         <groupId>com.bytedance.bitsail</groupId>
         <artifactId>bitsail-connector-print</artifactId>

This file was deleted.

@@ -21,10 +21,8 @@
 
 import com.bytedance.bitsail.base.execution.ExecutionEnviron;
 import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
 import com.bytedance.bitsail.common.option.WriterOptions;
-import com.bytedance.bitsail.connector.legacy.hudi.common.SchemaOptions;
 import com.bytedance.bitsail.connector.legacy.hudi.configuration.FlinkOptions;
-import com.bytedance.bitsail.connector.legacy.hudi.format.RowDataDeserialization;
-import com.bytedance.bitsail.connector.legacy.hudi.sink.transform.RowDataToHoodieFunction;
+import com.bytedance.bitsail.connector.legacy.hudi.sink.transform.RowToHoodieFunction;
 import com.bytedance.bitsail.connector.legacy.hudi.sink.utils.Pipelines;
 import com.bytedance.bitsail.connector.legacy.hudi.util.AvroSchemaConverter;
 import com.bytedance.bitsail.connector.legacy.hudi.util.SchemaUtils;
@@ -51,12 +49,6 @@ public class HudiSinkFunctionDAGBuilder<OUT extends Row> extends FlinkDataWriter
   @Getter
   private BitSailConfiguration jobConf;
 
-  /**
-   * Flink RowType to parse source to RowData.
-   */
-  @Getter
-  private RowType sourceRowType;
-
   /**
    * Flink RowType to convert RowData to HoodieRecord.
    * Caution:
@@ -78,9 +70,7 @@ public void configure(ExecutionEnviron execution, BitSailConfiguration writerCon
     jobConf = execution.getGlobalConfiguration();
     Map<String, String> properties = extractHudiProperties(jobConf);
     conf = FlinkOptions.fromMap(properties);
-    sourceRowType = SchemaUtils.getRowTypeFromColumnInfoStr(jobConf.get(SchemaOptions.SOURCE_SCHEMA));
-    sinkRowType = SchemaUtils.getRowTypeFromColumnInfoStr(jobConf.get(SchemaOptions.SINK_SCHEMA));
-    //TODO: check namespace
+    sinkRowType = SchemaUtils.getRowTypeFromColumnInfo(jobConf.get(WriterOptions.BaseWriterOptions.COLUMNS));
     conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(sinkRowType).toString());
   }
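
Worth noting (not part of the diff): the sink schema is now derived from the writer's standard column list (WriterOptions.BaseWriterOptions.COLUMNS) instead of hudi-specific SchemaOptions, so no separate source/sink schema strings need to be configured. A hedged sketch of the derivation; the two-argument ColumnInfo constructor and the type names shown are assumptions for illustration:

// Hypothetical sketch: writer columns -> Flink RowType -> Avro schema.
List<ColumnInfo> columns = Arrays.asList(
    new ColumnInfo("id", "bigint"),    // ColumnInfo(name, type) assumed
    new ColumnInfo("name", "string"));
RowType sinkRowType = SchemaUtils.getRowTypeFromColumnInfo(columns);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
    AvroSchemaConverter.convertToSchema(sinkRowType).toString());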

@@ -90,12 +80,8 @@ public void addWriter(DataStream<OUT> dataStream, int parallelism) throws Except
         .getCheckpointConfig().getCheckpointTimeout();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
-    RowDataDeserialization rowDataDeserialization = new RowDataDeserialization<>(jobConf, sourceRowType);
-
     DataStream<HoodieRecord> hoodieDataStream = dataStream
-        .flatMap(rowDataDeserialization)
-        .setParallelism(parallelism)
-        .map(new RowDataToHoodieFunction<>(sinkRowType, conf), TypeInformation.of(HoodieRecord.class))
+        .map(new RowToHoodieFunction<>(conf, jobConf), TypeInformation.of(HoodieRecord.class))
         .setParallelism(parallelism);
 
     Pipelines.hoodieStreamWrite(conf, parallelism, hoodieDataStream);
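
Net effect on the writer DAG (summary, not part of the diff): the path collapses from two operators, a Row to RowData flatMap followed by a RowData to HoodieRecord map, into a single map, and sourceRowType disappears entirely; RowToHoodieFunction presumably derives what it needs from conf and jobConf.

// Before: dataStream.flatMap(new RowDataDeserialization<>(jobConf, sourceRowType))
//                   .map(new RowDataToHoodieFunction<>(sinkRowType, conf), ...)
// After:  dataStream.map(new RowToHoodieFunction<>(conf, jobConf), ...)
// One fewer operator and no intermediate RowData stage on the write path.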
@@ -44,16 +44,16 @@
 import java.util.Objects;
 
 @SuppressWarnings("checkstyle:MagicNumber")
-public class RowDataDeserialization<I extends Row, O extends RowData> extends RichFlatMapFunction<I, O>
+public class RowDataDeserializationFunction<I extends Row, O extends RowData> extends RichFlatMapFunction<I, O>
     implements ResultTypeQueryable {
   public static final RowData FILTERED_ROW = new GenericRowData(0);
-  private static final Logger LOG = LoggerFactory.getLogger(RowDataDeserialization.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataDeserializationFunction.class);
   protected BitSailConfiguration jobConf;
   protected List<DeserializationSchema<O>> deserializationSchema;
   private transient NoOpDirtyCollector dirtyCollector;
   private transient MetricManager metrics;
 
-  public RowDataDeserialization(BitSailConfiguration jobConf, RowType outputSchema) {
+  public RowDataDeserializationFunction(BitSailConfiguration jobConf, RowType outputSchema) {
     this.jobConf = jobConf;
     boolean multiSourceEnabled = jobConf.get(CommonOptions.MULTI_SOURCE_ENABLED);
     if (multiSourceEnabled) {
@@ -64,24 +64,6 @@ public RowDataDeserialization(BitSailConfiguration jobConf, RowType outputSchema
     }
   }
-
-  // private DeserializationSchema<O> getDeserializationSchema(RowType outputType) {
-  //   String formatType = jobConf.get(HudiWriteOptions.FORMAT_TYPE);
-  //   if (formatType == null || formatType.isEmpty()) {
-  //     throw new RuntimeException("Job writer format type is missing.");
-  //   }
-  //
-  //   DeserializationSchema<O> deserializationSchema;
-  //   switch (formatType) {
-  //     case "json":
-  //       deserializationSchema = (DeserializationSchema<O>) ParserFormatFactory.getJsonDeserializationSchema(jobConf, outputType);
-  //       break;
-  //     default:
-  //       throw new RuntimeException("Unsupported format type: " + formatType);
-  //   }
-  //   return deserializationSchema;
-  // }
-
   //trick: open() will be called twice for initializing processingTimeService
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);