Skip to content

Commit

Permalink
[Fix] Fix build failed cause by JdbcHiveIT and SparkSinkTest (#5798)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Nov 7, 2023
1 parent a64e177 commit 05c6015
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ protected void insertTestData() {
}

@Override
void compareResult() {}
void compareResult(String executeKey) {}

@Override
String driverUrl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,7 @@

import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import java.io.IOException;

Expand All @@ -37,105 +30,6 @@ public String getPluginName() {
return "SeaTunnelSinkWithBuffer";
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return new SeaTunnelRowType(
new String[] {
"int",
"string",
"boolean",
"float",
"double",
"byte",
"short",
"long",
"decimal",
"date",
"timestamp",
"null",
"array_string",
"array_boolean",
"array_byte",
"array_short",
"array_int",
"array_long",
"array_float",
"array_double",
"map",
"row"
},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.LONG_TYPE,
new DecimalType(10, 2),
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
BasicType.VOID_TYPE,
ArrayType.STRING_ARRAY_TYPE,
ArrayType.BOOLEAN_ARRAY_TYPE,
ArrayType.BYTE_ARRAY_TYPE,
ArrayType.SHORT_ARRAY_TYPE,
ArrayType.INT_ARRAY_TYPE,
ArrayType.LONG_ARRAY_TYPE,
ArrayType.FLOAT_ARRAY_TYPE,
ArrayType.DOUBLE_ARRAY_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
new SeaTunnelRowType(
new String[] {
"int",
"string",
"boolean",
"float",
"double",
"byte",
"short",
"long",
"decimal",
"date",
"timestamp",
"null",
"array_string",
"array_boolean",
"array_byte",
"array_short",
"array_int",
"array_long",
"array_float",
"array_double",
"map"
},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.LONG_TYPE,
new DecimalType(10, 2),
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
BasicType.VOID_TYPE,
ArrayType.STRING_ARRAY_TYPE,
ArrayType.BOOLEAN_ARRAY_TYPE,
ArrayType.BYTE_ARRAY_TYPE,
ArrayType.SHORT_ARRAY_TYPE,
ArrayType.INT_ARRAY_TYPE,
ArrayType.LONG_ARRAY_TYPE,
ArrayType.FLOAT_ARRAY_TYPE,
ArrayType.DOUBLE_ARRAY_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
})
});
}

@Override
public SinkWriter<SeaTunnelRow, Void, Void> createWriter(SinkWriter.Context context)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.seatunnel.translation.spark.sink;

import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
Expand Down Expand Up @@ -285,11 +291,121 @@ public void testSparkSinkWriteDataWithCopy() {
row3
});

SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {
"int",
"string",
"boolean",
"float",
"double",
"byte",
"short",
"long",
"decimal",
"date",
"timestamp",
"null",
"array_string",
"array_boolean",
"array_byte",
"array_short",
"array_int",
"array_long",
"array_float",
"array_double",
"map",
"row"
},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.LONG_TYPE,
new org.apache.seatunnel.api.table.type.DecimalType(10, 2),
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
BasicType.VOID_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.STRING_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.BOOLEAN_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.BYTE_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.SHORT_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.INT_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.LONG_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.FLOAT_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType.DOUBLE_ARRAY_TYPE,
new org.apache.seatunnel.api.table.type.MapType<>(
BasicType.STRING_TYPE, BasicType.STRING_TYPE),
new SeaTunnelRowType(
new String[] {
"int",
"string",
"boolean",
"float",
"double",
"byte",
"short",
"long",
"decimal",
"date",
"timestamp",
"null",
"array_string",
"array_boolean",
"array_byte",
"array_short",
"array_int",
"array_long",
"array_float",
"array_double",
"map"
},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.LONG_TYPE,
new org.apache.seatunnel.api.table.type.DecimalType(10, 2),
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
BasicType.VOID_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.STRING_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.BOOLEAN_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.BYTE_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.SHORT_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.INT_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.LONG_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.FLOAT_ARRAY_TYPE,
org.apache.seatunnel.api.table.type.ArrayType
.DOUBLE_ARRAY_TYPE,
new org.apache.seatunnel.api.table.type.MapType<>(
BasicType.STRING_TYPE, BasicType.STRING_TYPE)
})
});

Dataset<Row> dataset =
spark.createDataFrame(
Arrays.asList(row1WithRow, row2WithRow, row3WithRow),
structType.add("row", structType));
SparkSinkInjector.inject(dataset.write(), new SeaTunnelSinkWithBuffer())
SparkSinkInjector.inject(
dataset.write(),
new SeaTunnelSinkWithBuffer(),
CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType))
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();
Expand Down

0 comments on commit 05c6015

Please sign in to comment.