Commit 87efa59

Fix more CI issues
1 parent: 5484dcd

8 files changed, +91 -73 lines changed


hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java

Lines changed: 2 additions & 2 deletions
@@ -72,7 +72,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
   public static final String EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA =
       "{\"name\": \"new_field_without_default\", \"type\": \"boolean\"},";
   public static final String EXTRA_FIELD_NULLABLE_SCHEMA =
-      ",{\"name\": \"new_field_without_default\", \"type\": [\"boolean\", \"null\"]}";
+      "{\"name\": \"new_field_without_default\", \"type\": [\"null\", \"boolean\"], \"default\": null},";

   // TRIP_EXAMPLE_SCHEMA with a new_field added
   public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA

@@ -152,7 +152,7 @@ public void testSchemaCompatibilityBasic() {
         "Added field without default and not nullable is not compatible (Evolved Schema)");

     assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
-        + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA, false),
+        + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_NULLABLE_SCHEMA + TRIP_SCHEMA_SUFFIX, false),
         "Added nullable field is compatible (Evolved Schema)");
   }
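A note on the fix above: Avro only accepts a null default for a union field when "null" is the first branch of the union, so the nullable field has to be declared as ["null", "boolean"] with "default": null; the constant also now ends with a comma and is concatenated before TRIP_SCHEMA_SUFFIX instead of after it. A minimal standalone sketch of a valid nullable field (record name is illustrative, not taken from the test):

    import org.apache.avro.Schema

    // "null" is listed first so the null default resolves against the first union branch
    val recordWithNullableField = new Schema.Parser().parse(
      """{"type": "record", "name": "example", "fields": [
        |  {"name": "new_field_without_default", "type": ["null", "boolean"], "default": null}
        |]}""".stripMargin)
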

hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java

Lines changed: 2 additions & 0 deletions
@@ -44,6 +44,8 @@ public class JsonUtils {
     MAPPER.setVisibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE);
     MAPPER.setVisibility(PropertyAccessor.SETTER, JsonAutoDetect.Visibility.NONE);
     MAPPER.setVisibility(PropertyAccessor.CREATOR, JsonAutoDetect.Visibility.NONE);
+    // NOTE: Registering [[JavaTimeModule]] is required for Jackson >= 2.11 (Spark >= 3.3)
+    MAPPER.registerModule(new JavaTimeModule());
   }

   public static ObjectMapper getObjectMapper() {
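For context on the change above: a plain ObjectMapper has no serializers for java.time types, so writing values such as Instant fails with an InvalidDefinitionException until the jsr310 JavaTimeModule is registered. A minimal standalone sketch (not Hudi code):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule

    val mapper = new ObjectMapper()
    mapper.registerModule(new JavaTimeModule()) // same registration JsonUtils now performs
    // With the module registered, java.time.Instant serializes instead of throwing
    val json = mapper.writeValueAsString(java.time.Instant.parse("2020-01-01T00:00:00Z"))
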

hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema

   @Override
   public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    return getIndexedRecordIteratorInternal(readerSchema, Option.of(requestedSchema));
+    return getIndexedRecordIteratorInternal(requestedSchema, Option.empty());
   }

   @Override
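The change above passes the pruned requestedSchema as the primary schema and drops the separate projection option. For readers unfamiliar with requested-schema projection in general, a rough standalone sketch using the public parquet-avro API (not Hudi's internal reader; filePath, requestedSchema and conf are assumed inputs):

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}

    def readProjected(filePath: Path, requestedSchema: Schema, conf: Configuration): Unit = {
      // Decode only the columns present in the requested (projected) schema
      AvroReadSupport.setRequestedProjection(conf, requestedSchema)
      val reader = AvroParquetReader.builder[GenericRecord](filePath).withConf(conf).build()
      try {
        var record = reader.read()
        while (record != null) {
          println(record)
          record = reader.read()
        }
      } finally {
        reader.close()
      }
    }
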

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala

Lines changed: 11 additions & 6 deletions
@@ -23,11 +23,12 @@ import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.hudi.testutils.HoodieClientTestBase
-
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.Assertions.assertDoesNotThrow
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

@@ -818,8 +819,8 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
   }

   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testNestedTypeVectorizedReadWithTypeChange(isCow: Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testNestedTypeVectorizedReadWithTypeChange(tableType: String): Unit = {
     // test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayMapData = Seq(

@@ -836,7 +837,7 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
     df1.show(false)

     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType.equals("COPY_ON_WRITE"))

     // read out the table, will not throw any exception
     readTable(tempRecordPath)

@@ -855,15 +856,19 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType.equals("COPY_ON_WRITE"))

     // after implicit type change, read the table with vectorized read enabled
     if (HoodieSparkUtils.gteqSpark3_3) {
-      assertThrows(classOf[SparkException]){
+      assertThrows(classOf[SparkException]) {
         withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
           readTable(tempRecordPath)
         }
       }
+    } else {
+      withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
+        readTable(tempRecordPath)
+      }
     }

     withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
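With the changes above, the test covers both branches: on Spark 3.3+ the nested-column vectorized Parquet reader is expected to fail on the implicit type change (hence assertThrows), while on older Spark versions the flag does not exist and the read should succeed. Toggling the reader is only a session config; a minimal sketch with an assumed table path:

    // The nested-column vectorized Parquet reader was introduced in Spark 3.3;
    // on earlier versions this flag has no effect.
    spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
    val df = spark.read.format("hudi").load("/tmp/hudi/record_tbl") // assumed path
    df.show(false)
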

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala

Lines changed: 63 additions & 60 deletions
@@ -1132,70 +1132,73 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
     assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
   }

+  @ParameterizedTest
   @CsvSource(Array("avro, 6", "parquet, 6"))
   def testLogicalTypesReadRepair(logBlockFormat: String, tableVersion: Int): Unit = {
-    val logBlockString = if (logBlockFormat == "avro") {
-      ""
-    } else {
-      "_parquet_log"
-    }
-    val prevTimezone = spark.conf.get("spark.sql.session.timeZone")
-    val propertyValue: String = System.getProperty("spark.testing")
-    try {
-      if (HoodieSparkUtils.isSpark3_3) {
-        System.setProperty("spark.testing", "true")
+    if (HoodieSparkUtils.gteqSpark3_4) {
+      val logBlockString = if (logBlockFormat == "avro") {
+        ""
+      } else {
+        "_parquet_log"
       }
-      spark.conf.set("spark.sql.session.timeZone", "UTC")
-      val tableName = "trips_logical_types_json_mor_read_v" + tableVersion + logBlockString
-      val dataPath = "file://" + basePath + "/" + tableName
-      val zipOutput = Paths.get(new URI(dataPath))
-      HoodieTestUtils.extractZipToDirectory("/" + tableName + ".zip", zipOutput, getClass)
-      val tableBasePath = zipOutput.toString
-
-      val df = spark.read.format("hudi").load(tableBasePath)
-
-      val rows = df.collect()
-      assertEquals(20, rows.length)
-      for (row <- rows) {
-        val hash = row.get(6).asInstanceOf[String].hashCode()
-        if ((hash & 1)== 0) {
-          assertEquals("2020-01-01T00:00:00.001Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
-          assertEquals("2020-06-01T12:00:00.000001Z", row.get(16).asInstanceOf[Timestamp].toInstant.toString)
-          assertEquals("2015-05-20T12:34:56.001", row.get(17).toString)
-          assertEquals("2017-07-07T07:07:07.000001", row.get(18).toString)
-        } else {
-          assertEquals("2019-12-31T23:59:59.999Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
-          assertEquals("2020-06-01T11:59:59.999999Z", row.get(16).asInstanceOf[Timestamp].toInstant.toString)
-          assertEquals("2015-05-20T12:34:55.999", row.get(17).toString)
-          assertEquals("2017-07-07T07:07:06.999999", row.get(18).toString)
+      val prevTimezone = spark.conf.get("spark.sql.session.timeZone")
+      val propertyValue: String = System.getProperty("spark.testing")
+      try {
+        if (HoodieSparkUtils.isSpark3_3) {
+          System.setProperty("spark.testing", "true")
         }
-      }
-      assertEquals(10, df.filter("ts_millis > timestamp('2020-01-01 00:00:00Z')").count())
-      assertEquals(10, df.filter("ts_millis < timestamp('2020-01-01 00:00:00Z')").count())
-      assertEquals(0, df.filter("ts_millis > timestamp('2020-01-01 00:00:00.001Z')").count())
-      assertEquals(0, df.filter("ts_millis < timestamp('2019-12-31 23:59:59.999Z')").count())
-
-      assertEquals(10, df.filter("ts_micros > timestamp('2020-06-01 12:00:00Z')").count())
-      assertEquals(10, df.filter("ts_micros < timestamp('2020-06-01 12:00:00Z')").count())
-      assertEquals(0, df.filter("ts_micros > timestamp('2020-06-01 12:00:00.000001Z')").count())
-      assertEquals(0, df.filter("ts_micros < timestamp('2020-06-01 11:59:59.999999Z')").count())
-
-      assertEquals(10, df.filter("local_ts_millis > CAST('2015-05-20 12:34:56' AS TIMESTAMP_NTZ)").count())
-      assertEquals(10, df.filter("local_ts_millis < CAST('2015-05-20 12:34:56' AS TIMESTAMP_NTZ)").count())
-      assertEquals(0, df.filter("local_ts_millis > CAST('2015-05-20 12:34:56.001' AS TIMESTAMP_NTZ)").count())
-      assertEquals(0, df.filter("local_ts_millis < CAST('2015-05-20 12:34:55.999' AS TIMESTAMP_NTZ)").count())
-
-      assertEquals(10, df.filter("local_ts_micros > CAST('2017-07-07 07:07:07' AS TIMESTAMP_NTZ)").count())
-      assertEquals(10, df.filter("local_ts_micros < CAST('2017-07-07 07:07:07' AS TIMESTAMP_NTZ)").count())
-      assertEquals(0, df.filter("local_ts_micros > CAST('2017-07-07 07:07:07.000001' AS TIMESTAMP_NTZ)").count())
-      assertEquals(0, df.filter("local_ts_micros < CAST('2017-07-07 07:07:06.999999' AS TIMESTAMP_NTZ)").count())
-    } finally {
-      spark.conf.set("spark.sql.session.timeZone", prevTimezone)
-      if (HoodieSparkUtils.isSpark3_3) {
-        if (propertyValue == null) {
-          System.clearProperty("spark.testing")
-        } else {
-          System.setProperty("spark.testing", propertyValue)
+        spark.conf.set("spark.sql.session.timeZone", "UTC")
+        val tableName = "trips_logical_types_json_mor_read_v" + tableVersion + logBlockString
+        val dataPath = "file://" + basePath + "/" + tableName
+        val zipOutput = Paths.get(new URI(dataPath))
+        HoodieTestUtils.extractZipToDirectory("/" + tableName + ".zip", zipOutput, getClass)
+        val tableBasePath = zipOutput.toString
+
+        val df = spark.read.format("hudi").load(tableBasePath)
+
+        val rows = df.collect()
+        assertEquals(20, rows.length)
+        for (row <- rows) {
+          val hash = row.get(6).asInstanceOf[String].hashCode()
+          if ((hash & 1) == 0) {
+            assertEquals("2020-01-01T00:00:00.001Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
+            assertEquals("2020-06-01T12:00:00.000001Z", row.get(16).asInstanceOf[Timestamp].toInstant.toString)
+            assertEquals("2015-05-20T12:34:56.001", row.get(17).toString)
+            assertEquals("2017-07-07T07:07:07.000001", row.get(18).toString)
+          } else {
+            assertEquals("2019-12-31T23:59:59.999Z", row.get(15).asInstanceOf[Timestamp].toInstant.toString)
+            assertEquals("2020-06-01T11:59:59.999999Z", row.get(16).asInstanceOf[Timestamp].toInstant.toString)
+            assertEquals("2015-05-20T12:34:55.999", row.get(17).toString)
+            assertEquals("2017-07-07T07:07:06.999999", row.get(18).toString)
+          }
+        }
+        assertEquals(10, df.filter("ts_millis > timestamp('2020-01-01 00:00:00Z')").count())
+        assertEquals(10, df.filter("ts_millis < timestamp('2020-01-01 00:00:00Z')").count())
+        assertEquals(0, df.filter("ts_millis > timestamp('2020-01-01 00:00:00.001Z')").count())
+        assertEquals(0, df.filter("ts_millis < timestamp('2019-12-31 23:59:59.999Z')").count())
+
+        assertEquals(10, df.filter("ts_micros > timestamp('2020-06-01 12:00:00Z')").count())
+        assertEquals(10, df.filter("ts_micros < timestamp('2020-06-01 12:00:00Z')").count())
+        assertEquals(0, df.filter("ts_micros > timestamp('2020-06-01 12:00:00.000001Z')").count())
+        assertEquals(0, df.filter("ts_micros < timestamp('2020-06-01 11:59:59.999999Z')").count())
+
+        assertEquals(10, df.filter("local_ts_millis > CAST('2015-05-20 12:34:56' AS TIMESTAMP_NTZ)").count())
+        assertEquals(10, df.filter("local_ts_millis < CAST('2015-05-20 12:34:56' AS TIMESTAMP_NTZ)").count())
+        assertEquals(0, df.filter("local_ts_millis > CAST('2015-05-20 12:34:56.001' AS TIMESTAMP_NTZ)").count())
+        assertEquals(0, df.filter("local_ts_millis < CAST('2015-05-20 12:34:55.999' AS TIMESTAMP_NTZ)").count())
+
+        assertEquals(10, df.filter("local_ts_micros > CAST('2017-07-07 07:07:07' AS TIMESTAMP_NTZ)").count())
+        assertEquals(10, df.filter("local_ts_micros < CAST('2017-07-07 07:07:07' AS TIMESTAMP_NTZ)").count())
+        assertEquals(0, df.filter("local_ts_micros > CAST('2017-07-07 07:07:07.000001' AS TIMESTAMP_NTZ)").count())
+        assertEquals(0, df.filter("local_ts_micros < CAST('2017-07-07 07:07:06.999999' AS TIMESTAMP_NTZ)").count())
+      } finally {
+        spark.conf.set("spark.sql.session.timeZone", prevTimezone)
+        if (HoodieSparkUtils.isSpark3_3) {
+          if (propertyValue == null) {
+            System.clearProperty("spark.testing")
+          } else {
+            System.setProperty("spark.testing", propertyValue)
+          }
         }
       }
     }
   }
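The body of testLogicalTypesReadRepair is unchanged apart from the new HoodieSparkUtils.gteqSpark3_4 guard; its TIMESTAMP_NTZ assertions rely on local-timestamp support that older Spark versions lack, which appears to be why the test is now skipped below Spark 3.4. For reference, the four logical types the test exercises can be declared with the standard Avro API (standalone sketch; the local-timestamp variants need Avro 1.10+):

    import org.apache.avro.{LogicalTypes, Schema}

    // timestamp-millis / timestamp-micros surface in Spark as TIMESTAMP,
    // local-timestamp-millis / local-timestamp-micros as TIMESTAMP_NTZ.
    val tsMillis      = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
    val tsMicros      = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))
    val localTsMillis = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
    val localTsMicros = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG))
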

hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala

Lines changed: 6 additions & 2 deletions
@@ -78,7 +78,11 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       )
     }
   }
-  private lazy val hasTimestampMillisFieldInTableSchema = true
+  private lazy val hasTimestampMillisFieldInTableSchema = if (avroTableSchema == null) {
+    true
+  } else {
+    AvroSchemaRepair.hasTimestampMillisField(avroTableSchema)
+  }
   private lazy val supportBatchWithTableSchema = !hasTimestampMillisFieldInTableSchema

   def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {

@@ -95,7 +99,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && supportBatchWithTableSchema
   }

   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
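These two edits make the legacy file format fall back from vectorized batch reads whenever the table's Avro schema contains a timestamp-millis field (the Spark 3.5 variant below keeps batch reads on Spark 3.5+ regardless). AvroSchemaRepair.hasTimestampMillisField itself is not part of this commit; purely to illustrate what such a check typically involves, a recursive walk over an Avro schema might look like the following (hypothetical sketch, not the actual implementation, and it does not guard against recursive record definitions):

    import org.apache.avro.{LogicalTypes, Schema}
    import scala.collection.JavaConverters._

    // Hypothetical: true if any (possibly nested) field uses the timestamp-millis logical type.
    def containsTimestampMillis(schema: Schema): Boolean = schema.getType match {
      case Schema.Type.RECORD => schema.getFields.asScala.exists(f => containsTimestampMillis(f.schema()))
      case Schema.Type.UNION  => schema.getTypes.asScala.exists(containsTimestampMillis)
      case Schema.Type.ARRAY  => containsTimestampMillis(schema.getElementType)
      case Schema.Type.MAP    => containsTimestampMillis(schema.getValueType)
      case _                  => schema.getLogicalType.isInstanceOf[LogicalTypes.TimestampMillis]
    }
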

hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala

Lines changed: 5 additions & 1 deletion
@@ -80,7 +80,11 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       )
     }
   }
-  private lazy val hasTimestampMillisFieldInTableSchema = true
+  private lazy val hasTimestampMillisFieldInTableSchema = if (avroTableSchema == null) {
+    true
+  } else {
+    AvroSchemaRepair.hasTimestampMillisField(avroTableSchema)
+  }
   private lazy val supportBatchWithTableSchema = HoodieSparkUtils.gteqSpark3_5 || !hasTimestampMillisFieldInTableSchema

   def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {

hudi-utilities/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -354,7 +354,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>${kafka.version}</version>
+      <version>${kafka.spark3.version}</version>
       <scope>test</scope>
     </dependency>
