Skip to content

Commit

Permalink
SAMZA-1968: Samza-sql - Change Calcite sql type for samza sql rel mes…
Browse files Browse the repository at this point in the history
…sage __key__ to accept any format

Author: Aditya Toomula <atoomula@linkedin.com>

Reviewers: Srinivasulu Punuru <spunuru@linkedin.com>

Closes apache#774 from atoomula/keyformat
  • Loading branch information
atoomula authored and srinipunuru committed Oct 30, 2018
1 parent 64c8263 commit dcd4b55
Show file tree
Hide file tree
Showing 21 changed files with 570 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.samza.sql;

import com.google.common.base.Joiner;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -40,6 +41,7 @@ public class SamzaSqlRelRecord implements Serializable {
private final ArrayList<String> fieldNames;
@JsonProperty("fieldValues")
private final ArrayList<Object> fieldValues;
private final int hashCode;

/**
* Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values.
Expand All @@ -59,6 +61,8 @@ public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> fieldNames,

this.fieldNames.addAll(fieldNames);
this.fieldValues.addAll(fieldValues);

hashCode = Objects.hash(fieldNames, fieldValues);
}

/**
Expand Down Expand Up @@ -96,7 +100,7 @@ public Optional<Object> getField(String name) {

@Override
public int hashCode() {
return Objects.hash(fieldNames, fieldValues);
return hashCode;
}

@Override
Expand All @@ -110,4 +114,11 @@ public boolean equals(Object obj) {
SamzaSqlRelRecord other = (SamzaSqlRelRecord) obj;
return Objects.equals(fieldNames, other.fieldNames) && Objects.equals(fieldValues, other.fieldValues);
}

@Override
public String toString() {
String nameStr = Joiner.on(",").join(fieldNames);
String valueStr = Joiner.on(",").useForNull("null").join(fieldValues);
return "[Names:{" + nameStr + "} Values:{" + valueStr + "}]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testGenerateResultSchema() {
Assert.assertEquals("NewCompany", ts.getFieldName(2));
Assert.assertEquals("OldCompany", ts.getFieldName(3));
Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4));
Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0));
Assert.assertEquals("ANY", ts.getFieldTypeName(0));
Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1));
Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2));
Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@
public class AvroRelConverter implements SamzaRelConverter {

protected final Config config;
private final Schema avroSchema;
private final Schema payloadSchema;

private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class);

public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
this.config = config;
this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
this.payloadSchema = Schema.parse(schemaProvider.getSchema(systemStream));
}

/**
Expand All @@ -76,45 +76,50 @@ public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaP
*/
@Override
public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
List<Object> fieldValues = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
List<String> payloadFieldNames = new ArrayList<>();
List<Object> payloadFieldValues = new ArrayList<>();
Object value = samzaMessage.getValue();
if (value instanceof IndexedRecord) {
IndexedRecord record = (IndexedRecord) value;
// Please note that record schema and cached schema could be different due to schema evolution.
// Always represent record schema in the form of cached schema. This approach has the side-effect
// of dropping the newly added fields in the scenarios where the record schema has newer version
// than the cached schema. [TODO: SAMZA-1679]
Schema recordSchema = record.getSchema();
fieldNames.addAll(avroSchema.getFields().stream()
.map(Schema.Field::name)
.collect(Collectors.toList()));
fieldValues.addAll(fieldNames.stream()
.map(f -> convertToJavaObject(
recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null,
getNonNullUnionSchema(avroSchema.getField(f).schema())))
.collect(Collectors.toList()));
fetchFieldNamesAndValuesFromIndexedRecord((IndexedRecord) value, payloadFieldNames, payloadFieldValues,
payloadSchema);
} else if (value == null) {
fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
IntStream.range(0, fieldNames.size()).forEach(x -> fieldValues.add(null));
payloadFieldNames.addAll(payloadSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
IntStream.range(0, payloadFieldNames.size()).forEach(x -> payloadFieldValues.add(null));
} else {
String msg = "Avro message converter doesn't support messages of type " + value.getClass();
LOG.error(msg);
throw new SamzaException(msg);
}

return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, fieldValues);
return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues);
}

public void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames,
List<Object> fieldValues, Schema cachedSchema) {
// Please note that record schema and cached schema could be different due to schema evolution.
// Always represent record schema in the form of cached schema. This approach has the side-effect
// of dropping the newly added fields in the scenarios where the record schema has newer version
// than the cached schema. [TODO: SAMZA-1679]
Schema recordSchema = record.getSchema();
fieldNames.addAll(cachedSchema.getFields().stream()
.map(Schema.Field::name)
.collect(Collectors.toList()));
fieldValues.addAll(fieldNames.stream()
.map(f -> convertToJavaObject(
recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null,
getNonNullUnionSchema(payloadSchema.getField(f).schema())))
.collect(Collectors.toList()));
}

private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
List<Object> values = new ArrayList<>();
List<Object> fieldValues = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
if (avroRecord != null) {
fieldNames.addAll(avroRecord.getSchema().getFields()
.stream()
.map(Schema.Field::name)
.collect(Collectors.toList()));
values.addAll(avroRecord.getSchema().getFields()
fieldValues.addAll(avroRecord.getSchema().getFields()
.stream()
.map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
Expand All @@ -125,19 +130,19 @@ private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
throw new SamzaException(msg);
}

return new SamzaSqlRelRecord(fieldNames, values);
return new SamzaSqlRelRecord(fieldNames, fieldValues);
}

/**
* Convert the nested relational message to the output samza message.
*/
@Override
public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
return convertToSamzaMessage(relMessage, this.avroSchema);
return convertToSamzaMessage(relMessage, this.payloadSchema);
}

protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema avroSchema) {
return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), avroSchema));
protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema payloadSchema) {
return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), payloadSchema));
}

private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@


public interface AvroRelSchemaProvider extends RelSchemaProvider {
/**
* Get payload schema corresponding to the system stream.
* @param systemStream system stream for which payload schema needs to be obtained.
* @return schema in the form of string
*/
String getSchema(SystemStream systemStream);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
/**
* Samza sql relational message. Each Samza sql relational message represents a relational row in a table.
* Each row of the relational table consists of a primary key and {@link SamzaSqlRelRecord}, which consists of a list
* of column values and the associated column names.
* of column values and the associated column names. Please note that the primary key itself could be a
* {@link SamzaSqlRelRecord}.
*/
public class SamzaSqlRelMessage implements Serializable {

public static final String KEY_NAME = "__key__";

// key could be a record in itself.
private final Object key;

@JsonProperty("samzaSqlRelRecord")
Expand Down Expand Up @@ -122,4 +124,52 @@ public boolean equals(Object obj) {
SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj;
return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, other.samzaSqlRelRecord);
}

@Override
public String toString() {
return "RelMessage: {" + samzaSqlRelRecord + "}";
}

/**
* Create composite key from the rel message.
* @param message Represents the samza sql rel message to extract the key values from.
* @param keyValueIdx list of key values in the form of field indices within the rel message.
* @param keyPartNames Represents the key field names.
* @return the composite key of the rel message
*/
public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> keyValueIdx,
List<String> keyPartNames) {
Validate.isTrue(keyValueIdx.size() == keyPartNames.size(), "Key part name and value list sizes are different");
ArrayList<Object> keyPartValues = new ArrayList<>();
for (int idx : keyValueIdx) {
keyPartValues.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx));
}
return new SamzaSqlRelRecord(keyPartNames, keyPartValues);
}

/**
* Create composite key from the rel message.
* @param message Represents the samza sql rel message to extract the key values and names from.
* @param relIdx list of keys in the form of field indices within the rel message.
* @return the composite key of the rel message
*/
public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
return createSamzaSqlCompositeKey(message, relIdx,
getSamzaSqlCompositeKeyFieldNames(message.getSamzaSqlRelRecord().getFieldNames(), relIdx));
}

/**
* Get composite key field names.
* @param fieldNames list of field names to extract the key names from.
* @param nameIds indices within the field names.
* @return list of composite key field names
*/
public static List<String> getSamzaSqlCompositeKeyFieldNames(List<String> fieldNames,
List<Integer> nameIds) {
List<String> keyPartNames = new ArrayList<>();
for (int idx : nameIds) {
keyPartNames.add(fieldNames.get(idx));
}
return keyPartNames;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.sql.fn;

import org.apache.samza.config.Config;
import org.apache.samza.sql.udfs.ScalarUdf;


/**
* UDF that converts an object to it's string representation.
*/
public class ConvertToStringUdf implements ScalarUdf<String> {
@Override
public void init(Config udfConfig) {
}

@Override
public String execute(Object... args) {
return args[0].toString();
}
}

Loading

0 comments on commit dcd4b55

Please sign in to comment.