support nebula source for flink sql connector #60
Conversation
Please see the conflicts. Sorry about them; they were introduced by another similar piece of work that has already been merged.
OK, I will handle the conflicts according to the current sink as quickly as possible.
# Conflicts:
#   connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java
#   connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java
#   connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java
#   connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
Codecov Report
@@             Coverage Diff              @@
##            master      #60      +/-   ##
============================================
+ Coverage      49.72%   59.65%   +9.93%
- Complexity       190      271      +81
============================================
  Files             50       50
  Lines           1613     1678      +65
  Branches         153      155       +2
============================================
+ Hits             802     1001     +199
+ Misses           743      598     -145
- Partials          68       79      +11
Continue to review full report at Codecov.
Related to #58. UT results are as follows:
source vertex: [screenshot]
source edges: [screenshot]
@@ -0,0 +1,239 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
As a new file, the date should be 2022.
Thanks for the reminder; indeed, I will revise this.
case TINYINT:
case SMALLINT:
case INTEGER:
    return val -> (int) val.asLong();
Suggest returning the Long type rather than converting to int.
Done.
After this modification, all integer types now return Long, and the converter works fine. However, if a column is declared as INT in the table, the RowDataSerializer (used to serialize RowData when handing it to another compute operator) throws an exception: a Long cannot be cast to Integer. The current temporary workaround is to declare all integer columns in the table as BIGINT.
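For illustration, a minimal, self-contained sketch (not the connector's actual code) of the cast failure described above:

public class IntCastDemo {
    public static void main(String[] args) {
        // The converter now produces Long for every integer type.
        Object fieldValue = 42L;

        // Works when the table column is declared BIGINT (maps to Long):
        Long asBigint = (Long) fieldValue;
        System.out.println("BIGINT column: " + asBigint);

        // Fails when the column is declared INT (maps to Integer):
        try {
            Integer asInt = (Integer) fieldValue;
            System.out.println("INT column: " + asInt);
        } catch (ClassCastException e) {
            // java.lang.Long cannot be cast to java.lang.Integer
            System.out.println("INT column fails: " + e.getMessage());
        }
    }
}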
public static boolean checkValidWriteMode(String modeName) {
    return chooseWriteMode(modeName) != INSERT
            || INSERT.name().equalsIgnoreCase(modeName);
What's the purpose of this function? Doesn't it always return true?
This function is used to validate the write-mode parameter passed in through the WITH clause in Flink SQL. It returns true if the mode parameter is one of INSERT, UPDATE, or DELETE (case-insensitive), and false for any other string, such as 'write-mode' = 'add':
public static boolean checkValidWriteMode(String modeName) {
    return chooseWriteMode(modeName) != INSERT
            || INSERT.name().equalsIgnoreCase(modeName);
}

public static WriteModeEnum chooseWriteMode(String modeName) {
    if (UPDATE.name().equalsIgnoreCase(modeName)) {
        return UPDATE;
    }
    if (DELETE.name().equalsIgnoreCase(modeName)) {
        return DELETE;
    }
    return INSERT;
}
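Given that logic, the checks would behave like this (illustrative calls, assuming the two methods above are in scope):

checkValidWriteMode("insert");  // true:  chooseWriteMode -> INSERT, but the name matches INSERT
checkValidWriteMode("Update");  // true:  chooseWriteMode -> UPDATE, which is != INSERT
checkValidWriteMode("DELETE");  // true:  chooseWriteMode -> DELETE, which is != INSERT
checkValidWriteMode("add");     // false: falls through to INSERT and "add" does not match "INSERT"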
But now the write-mode option has been removed from the WITH clause, and it seems better to validate it with the enum-type support that comes with Flink, so this part of the logic can be removed. A sketch of that approach follows.
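For reference, validation via Flink's built-in enum-typed ConfigOption might look something like this (a sketch only; the option name and the enclosing class are assumptions based on this thread, not the final code):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class NebulaConnectorOptions {
    // Mirrors the connector's WriteModeEnum from this discussion.
    public enum WriteModeEnum { INSERT, UPDATE, DELETE }

    // Flink parses the option string against the enum constants and
    // rejects unknown values such as 'add' during factory validation,
    // so no hand-written checkValidWriteMode is needed.
    public static final ConfigOption<WriteModeEnum> WRITE_MODE =
            ConfigOptions.key("write-mode")
                    .enumType(WriteModeEnum.class)
                    .defaultValue(WriteModeEnum.INSERT)
                    .withDescription("Write mode: INSERT, UPDATE or DELETE.");
}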
Trying to add Flink SQL support; it is still under development.