Skip to content

Commit 83e1529

Browse files
authored
SAMZA-2321: Samza-sql - Introduce REAL Samza sql schema type (apache#1154)
* Fix discrepancy between sql REAL and FLOAT
1 parent 5af4b3b commit 83e1529

File tree

7 files changed

+36
-17
lines changed

7 files changed

+36
-17
lines changed

samza-api/src/main/java/org/apache/samza/sql/schema/SamzaSqlFieldType.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ public enum SamzaSqlFieldType {
2828
INT32, // four-byte signed integer.
2929
INT64, // eight-byte signed integer.
3030
DECIMAL, // Decimal integer
31-
FLOAT,
32-
DOUBLE,
31+
REAL, // 4 bytes
32+
FLOAT, // 8 bytes
33+
DOUBLE, // 8 bytes
3334
STRING, // String.
3435
DATETIME, // Date and time.
3536
BOOLEAN, // Boolean.

samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,6 @@ public static Object convertToJavaObject(Object avroObj, Schema schema) {
270270
return new ByteString(fixed.bytes());
271271
case BYTES:
272272
return new ByteString(((ByteBuffer) avroObj).array());
273-
case FLOAT:
274-
// Convert Float to Double similar to how JavaTypeFactoryImpl represents Float type
275-
return Double.parseDouble(Float.toString((Float) avroObj));
276273

277274
default:
278275
return avroObj;
@@ -296,8 +293,6 @@ private static boolean isSchemaCompatibleWithRelObj(Object relObj, Schema unionS
296293
return relObj instanceof ByteString;
297294
case BYTES:
298295
return relObj instanceof ByteString;
299-
case FLOAT:
300-
return relObj instanceof Float || relObj instanceof Double;
301296
default:
302297
return true;
303298
}
@@ -319,8 +314,6 @@ private static boolean isSchemaCompatibleWithAvroObj(Object avroObj, Schema unio
319314
return avroObj instanceof GenericData.Fixed;
320315
case BYTES:
321316
return avroObj instanceof ByteBuffer;
322-
case FLOAT:
323-
return avroObj instanceof Float;
324317
default:
325318
return true;
326319
}

samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ private SqlFieldSchema convertField(Schema fieldSchema, boolean isNullable, bool
8181
case DOUBLE:
8282
return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.DOUBLE, isNullable, isOptional);
8383
case FLOAT:
84-
return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.FLOAT, isNullable, isOptional);
84+
// Avro FLOAT is 4 bytes which maps to Sql REAL. Sql FLOAT is 8-bytes
85+
return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.REAL, isNullable, isOptional);
8586
case ENUM:
8687
return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, isNullable, isOptional);
8788
case UNION:

samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ private RelDataType getRelDataType(SqlFieldSchema fieldSchema) {
7979
return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN), fieldSchema.isNullable());
8080
case DOUBLE:
8181
return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE), fieldSchema.isNullable());
82+
case REAL:
83+
return createTypeWithNullability(createSqlType(SqlTypeName.REAL), fieldSchema.isNullable());
8284
case FLOAT:
8385
return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT), fieldSchema.isNullable());
8486
case STRING:

samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.samza.sql.avro.schemas.Profile;
5757
import org.apache.samza.sql.avro.schemas.SimpleRecord;
5858
import org.apache.samza.sql.avro.schemas.StreetNumRecord;
59-
import org.apache.samza.sql.avro.schemas.SubRecord;
6059
import org.apache.samza.sql.data.SamzaSqlRelMessage;
6160
import org.apache.samza.sql.planner.RelSchemaConverter;
6261
import org.apache.samza.sql.schema.SqlSchema;
@@ -353,7 +352,7 @@ private void validateAvroSerializedData(byte[] serializedData, Object unionValue
353352
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("bool_value").get(), boolValue);
354353
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("double_value").get(), doubleValue);
355354
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("string_value").get(), new Utf8(testStrValue));
356-
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(), doubleValue);
355+
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(), floatValue);
357356
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("long_value").get(), longValue);
358357
if (unionValue instanceof String) {
359358
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("union_value").get(), new Utf8((String) unionValue));
@@ -386,10 +385,6 @@ private void validateAvroSerializedData(byte[] serializedData, Object unionValue
386385
Assert.assertTrue(record.get(field.name()).equals(complexRecordValue.get(field.name())));
387386
} else {
388387
Object expected = complexRecordValue.get(field.name());
389-
if (expected instanceof Float) {
390-
// AvroRelConverter converts float to double to be in sync with what Calcite does in JavaTypeFactoryImpl
391-
expected = Double.parseDouble(Float.toString((Float) expected));
392-
}
393388
Assert.assertEquals(expected, record.get(field.name()));
394389
}
395390
}

samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ private Object createComplexRecord(int index) {
325325
record.put("id", index);
326326
record.put("string_value", "Name" + index);
327327
record.put("bytes_value", ByteBuffer.wrap(("sample bytes").getBytes()));
328+
record.put("float_value", index + 0.123456f);
329+
record.put("double_value", index + 0.0123456789);
328330
MyFixed myFixedVar = new MyFixed();
329331
myFixedVar.bytes(DEFAULT_TRACKING_ID_BYTES);
330332
record.put("fixed_value", myFixedVar);

samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public void testEndToEndWithProjection() throws Exception {
304304
.sorted()
305305
.collect(Collectors.toList());
306306
Assert.assertEquals(numMessages, outMessages.size());
307-
Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
307+
Assert.assertEquals(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()), outMessages);
308308
}
309309

310310
@Test
@@ -472,6 +472,31 @@ public void testEndToEndComplexRecord() throws SamzaSqlValidatorException {
472472
Assert.assertEquals(numMessages, outMessages.size());
473473
}
474474

475+
@Test
476+
public void testEndToEndWithFloatToStringConversion() throws Exception {
477+
int numMessages = 20;
478+
479+
TestAvroSystemFactory.messages.clear();
480+
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
481+
String sql1 = "Insert into testavro.outputTopic"
482+
+ " select 'urn:li:member:' || cast(cast(float_value as int) as varchar) as string_value, id, float_value, "
483+
+ " double_value, true as bool_value from testavro.COMPLEX1";
484+
List<String> sqlStmts = Arrays.asList(sql1);
485+
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
486+
487+
Config config = new MapConfig(staticConfigs);
488+
new SamzaSqlValidator(config).validate(sqlStmts);
489+
490+
runApplication(config);
491+
492+
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
493+
.map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("string_value").toString().split(":")[3]))
494+
.sorted()
495+
.collect(Collectors.toList());
496+
Assert.assertEquals(numMessages, outMessages.size());
497+
Assert.assertEquals(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()), outMessages);
498+
}
499+
475500
@Ignore
476501
@Test
477502
public void testEndToEndNestedRecord() throws SamzaSqlValidatorException {

0 commit comments

Comments
 (0)