Skip to content

Commit

Permalink
[FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timest…
Browse files Browse the repository at this point in the history
…amp type

Close apache#21613
  • Loading branch information
laughingman7743 authored and baugarten-stripe committed Jun 9, 2023
1 parent 378746c commit 9e30572
Show file tree
Hide file tree
Showing 46 changed files with 946 additions and 303 deletions.
5 changes: 5 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/protobuf.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type to Protobuf type.
<td><code>enum</code></td>
<td>The enum value of protobuf can be mapped to string or number of flink row accordingly.</td>
</tr>
<tr>
<td><code>ROW&lt;seconds BIGINT, nanos INT&gt;</code></td>
<td><code>google.protobuf.timestamp</code></td>
<td>The google.protobuf.timestamp type can be mapped to seconds and fractions of seconds at nanosecond resolution in UTC epoch time using the row type as well as the protobuf definition.</td>
</tr>
</tbody>
</table>

Expand Down
1 change: 1 addition & 0 deletions flink-formats/flink-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ under the License.
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}</protocArtifact>
<includeMavenTypes>direct</includeMavenTypes>
<inputDirectories>
<include>src/test/proto</include>
</inputDirectories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ public class PbConstant {
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
public static final String PB_MAP_VALUE_NAME = "value";
public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,12 @@

/** store config and common information. */
public class PbFormatContext {
private final String outerPrefix;
private final PbFormatConfig pbFormatConfig;

public PbFormatContext(String outerPrefix, PbFormatConfig pbFormatConfig) {
this.outerPrefix = outerPrefix;
public PbFormatContext(PbFormatConfig pbFormatConfig) {
this.pbFormatConfig = pbFormatConfig;
}

public String getOuterPrefix() {
return outerPrefix;
}

public PbFormatConfig getPbFormatConfig() {
return pbFormatConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public String codegen(String resultVar, String pbObjectCode, int indent)
PbCodegenAppender appender = new PbCodegenAppender(indent);
PbCodegenVarId varUid = PbCodegenVarId.getInstance();
int uid = varUid.getAndIncrement();
String protoTypeStr =
PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix());
String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false);
String listPbVar = "list" + uid;
String flinkArrVar = "newArr" + uid;
String flinkArrEleVar = "subReturnVar" + uid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ public String codegen(String resultVar, String pbObjectCode, int indent)
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);

PbCodegenAppender appender = new PbCodegenAppender(indent);
String pbKeyTypeStr =
PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix());
String pbValueTypeStr =
PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix());
String pbKeyTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false);
String pbValueTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false);
String pbMapVar = "pbMap" + uid;
String pbMapEntryVar = "pbEntry" + uid;
String resultDataMapVar = "resultDataMap" + uid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public String codegen(String resultVar, String pbObjectCode, int indent)
String flinkRowDataVar = "rowData" + uid;

int fieldSize = rowType.getFieldNames().size();
String pbMessageTypeStr =
PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
appender.appendLine(pbMessageTypeStr + " " + pbMessageVar + " = " + pbObjectCode);
appender.appendLine(
"GenericRowData " + flinkRowDataVar + " = new GenericRowData(" + fieldSize + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,14 @@ public class ProtoToRowConverter {
public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
String outerPrefix =
PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
Class<?> messageClass =
Class.forName(
formatConfig.getMessageClassName(),
true,
Thread.currentThread().getContextClassLoader());
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor, outerPrefix);
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
// pb3 always read default values
formatConfig =
Expand All @@ -78,7 +76,7 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
formatConfig.getWriteNullStringLiterals());
}
PbCodegenAppender codegenAppender = new PbCodegenAppender();
PbFormatContext pbFormatContext = new PbFormatContext(outerPrefix, formatConfig);
PbFormatContext pbFormatContext = new PbFormatContext(formatConfig);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
String generatedClassName = "GeneratedProtoToRow_" + uuid;
String generatedPackageName = ProtoToRowConverter.class.getPackage().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
PbCodegenVarId varUid = PbCodegenVarId.getInstance();
int uid = varUid.getAndIncrement();
PbCodegenAppender appender = new PbCodegenAppender(indent);
String protoTypeStr =
PbCodegenUtils.getTypeStrFromProto(fd, false, formatContext.getOuterPrefix());
String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false);
String pbListVar = "pbList" + uid;
String flinkArrayDataVar = "arrData" + uid;
String pbElementVar = "elementPbVar" + uid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);

PbCodegenAppender appender = new PbCodegenAppender(indent);
String keyProtoTypeStr =
PbCodegenUtils.getTypeStrFromProto(keyFd, false, formatContext.getOuterPrefix());
String valueProtoTypeStr =
PbCodegenUtils.getTypeStrFromProto(valueFd, false, formatContext.getOuterPrefix());
String keyProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(keyFd, false);
String valueProtoTypeStr = PbCodegenUtils.getTypeStrFromProto(valueFd, false);

String flinkKeyArrDataVar = "keyArrData" + uid;
String flinkValueArrDataVar = "valueArrData" + uid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
int uid = varUid.getAndIncrement();
PbCodegenAppender appender = new PbCodegenAppender(indent);
String flinkRowDataVar = "rowData" + uid;
String pbMessageTypeStr =
PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
String messageBuilderVar = "messageBuilder" + uid;
appender.appendLine("RowData " + flinkRowDataVar + " = " + flinkObjectCode);
appender.appendLine(
Expand All @@ -71,15 +70,11 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
String elementPbVar = "elementPbVar" + subUid;
String elementPbTypeStr;
if (elementFd.isMapField()) {
elementPbTypeStr =
PbCodegenUtils.getTypeStrFromProto(
elementFd, false, formatContext.getOuterPrefix());
elementPbTypeStr = PbCodegenUtils.getTypeStrFromProto(elementFd, false);
} else {
elementPbTypeStr =
PbCodegenUtils.getTypeStrFromProto(
elementFd,
PbFormatUtils.isArrayType(subType),
formatContext.getOuterPrefix());
elementFd, PbFormatUtils.isArrayType(subType));
}
String strongCamelFieldName = PbFormatUtils.getStrongCamelCaseJsonName(fieldName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
case SMALLINT:
case TINYINT:
if (fd.getJavaType() == JavaType.ENUM) {
String enumTypeStr =
PbFormatUtils.getFullJavaName(
fd.getEnumType(), formatContext.getOuterPrefix());
String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
resultVar
+ " = "
Expand All @@ -86,9 +84,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
appender.appendLine(fromVar + " = " + flinkObjectCode + ".toString()");
if (fd.getJavaType() == JavaType.ENUM) {
String enumValueDescVar = "enumValueDesc" + uid;
String enumTypeStr =
PbFormatUtils.getFullJavaName(
fd.getEnumType(), formatContext.getOuterPrefix());
String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
"Descriptors.EnumValueDescriptor "
+ enumValueDescVar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ public class RowToProtoConverter {
public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
String outerPrefix =
PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig);
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
PbFormatContext formatContext = new PbFormatContext(formatConfig);

PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static String flinkContainerElementCode(
* @return The returned code phrase will be used as java type str in codegen sections.
* @throws PbCodegenException
*/
public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, String outerPrefix)
public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList)
throws PbCodegenException {
String typeStr;
switch (fd.getJavaType()) {
Expand All @@ -90,12 +90,12 @@ public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, Str
FieldDescriptor valueFd =
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
// key and value cannot be repeated
String keyTypeStr = getTypeStrFromProto(keyFd, false, outerPrefix);
String valueTypeStr = getTypeStrFromProto(valueFd, false, outerPrefix);
String keyTypeStr = getTypeStrFromProto(keyFd, false);
String valueTypeStr = getTypeStrFromProto(valueFd, false);
typeStr = "Map<" + keyTypeStr + "," + valueTypeStr + ">";
} else {
// simple message
typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType(), outerPrefix);
typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType());
}
break;
case INT:
Expand All @@ -108,7 +108,7 @@ public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, Str
typeStr = "String";
break;
case ENUM:
typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType(), outerPrefix);
typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
break;
case FLOAT:
typeStr = "Float";
Expand Down Expand Up @@ -174,11 +174,10 @@ public static String getTypeStrFromLogicType(LogicalType type) {
public static String pbDefaultValueCode(
FieldDescriptor fieldDescriptor, PbFormatContext pbFormatContext)
throws PbCodegenException {
String outerPrefix = pbFormatContext.getOuterPrefix();
String nullLiteral = pbFormatContext.getPbFormatConfig().getWriteNullStringLiterals();
switch (fieldDescriptor.getJavaType()) {
case MESSAGE:
return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType(), outerPrefix)
return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType())
+ ".getDefaultInstance()";
case INT:
return "0";
Expand All @@ -187,7 +186,7 @@ public static String pbDefaultValueCode(
case STRING:
return "\"" + nullLiteral + "\"";
case ENUM:
return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType(), outerPrefix)
return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType())
+ ".values()[0]";
case FLOAT:
return "0.0f";
Expand Down Expand Up @@ -229,9 +228,7 @@ public static String convertFlinkArrayElementToPbWithDefaultValueCode(
int uid = varUid.getAndIncrement();
String flinkElementVar = "elementVar" + uid;
PbCodegenAppender appender = new PbCodegenAppender(indent);
String protoTypeStr =
PbCodegenUtils.getTypeStrFromProto(
elementPbFd, false, pbFormatContext.getOuterPrefix());
String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(elementPbFd, false);
String dataTypeStr = PbCodegenUtils.getTypeStrFromLogicType(elementDataType);
appender.appendLine(protoTypeStr + " " + resultPbVar);
appender.begin("if(" + flinkArrDataVar + ".isNullAt(" + iVar + ")){");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@

/** Protobuf function util. */
public class PbFormatUtils {
public static String getFullJavaName(Descriptors.Descriptor descriptor, String outerProtoName) {
public static String getFullJavaName(Descriptors.Descriptor descriptor) {
if (null != descriptor.getContainingType()) {
// nested type
String parentJavaFullName =
getFullJavaName(descriptor.getContainingType(), outerProtoName);
String parentJavaFullName = getFullJavaName(descriptor.getContainingType());
return parentJavaFullName + "." + descriptor.getName();
} else {
// top level message
String outerProtoName = getOuterProtoPrefix(descriptor.getFile());
return outerProtoName + descriptor.getName();
}
}

public static String getFullJavaName(
Descriptors.EnumDescriptor enumDescriptor, String outerProtoName) {
public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) {
if (null != enumDescriptor.getContainingType()) {
return getFullJavaName(enumDescriptor.getContainingType(), outerProtoName)
return getFullJavaName(enumDescriptor.getContainingType())
+ "."
+ enumDescriptor.getName();
} else {
String outerProtoName = getOuterProtoPrefix(enumDescriptor.getFile());
return outerProtoName + enumDescriptor.getName();
}
}
Expand All @@ -72,14 +72,46 @@ public static String getStrongCamelCaseJsonName(String name) {
return ProtobufInternalUtils.underScoreToCamelCase(name, true);
}

public static String getOuterProtoPrefix(String name) {
name = name.replace('$', '.');
int index = name.lastIndexOf('.');
if (index != -1) {
// include dot
return name.substring(0, index + 1);
public static String getOuterClassName(Descriptors.FileDescriptor fileDescriptor) {
if (fileDescriptor.getOptions().hasJavaOuterClassname()) {
return fileDescriptor.getOptions().getJavaOuterClassname();
} else {
return "";
String[] fileNames = fileDescriptor.getName().split("/");
String fileName = fileNames[fileNames.length - 1];
String outerName = getStrongCamelCaseJsonName(fileName.split("\\.")[0]);
// https://developers.google.com/protocol-buffers/docs/reference/java-generated#invocation
// The name of the wrapper class is determined by converting the base name of the .proto
// file to camel case if the java_outer_classname option is not specified.
// For example, foo_bar.proto produces the class name FooBar. If there is a service,
// enum, or message (including nested types) in the file with the same name,
// "OuterClass" will be appended to the wrapper class's name.
boolean hasSameNameMessage =
fileDescriptor.getMessageTypes().stream()
.anyMatch(f -> f.getName().equals(outerName));
boolean hasSameNameEnum =
fileDescriptor.getEnumTypes().stream()
.anyMatch(f -> f.getName().equals(outerName));
boolean hasSameNameService =
fileDescriptor.getServices().stream()
.anyMatch(f -> f.getName().equals(outerName));
if (hasSameNameMessage || hasSameNameEnum || hasSameNameService) {
return outerName + PbConstant.PB_OUTER_CLASS_SUFFIX;
} else {
return outerName;
}
}
}

public static String getOuterProtoPrefix(Descriptors.FileDescriptor fileDescriptor) {
String javaPackageName =
fileDescriptor.getOptions().hasJavaPackage()
? fileDescriptor.getOptions().getJavaPackage()
: fileDescriptor.getPackage();
if (fileDescriptor.getOptions().getJavaMultipleFiles()) {
return javaPackageName + ".";
} else {
String outerClassName = getOuterClassName(fileDescriptor);
return javaPackageName + "." + outerClassName + ".";
}
}

Expand Down
Loading

0 comments on commit 9e30572

Please sign in to comment.