
[FLINK-35167][cdc-connector] Introduce MaxCompute pipeline DataSink #3254

Open · wants to merge 27 commits into master
Conversation

@dingxin-tech (Author)

No description provided.

@github-actions bot added the docs (Improvements or additions to documentation) label on May 6, 2024
@loserwang1024 (Contributor)

I am wondering how a commercial database sink like MaxCompute can be e2e tested?

@dingxin-tech (Author)

> I am wondering how a commercial database sink like MaxCompute can be e2e tested?

I will soon be working on a Docker image for a MaxCompute Emulator that launches a mocked version of MaxCompute. End-to-end tests can then start this image before the regression suite runs.
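
For illustration, a minimal sketch of how such an emulator could be wired into tests with Testcontainers. The image tag matches the one mentioned later in this thread; the exposed port and the endpoint wiring are assumptions, not the PR's actual setup.

```java
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

/** Sketch only: start the MaxCompute emulator once before the e2e suite. */
class MaxComputeEmulatorTestBase {

    // Image published for this PR; 8080 is an assumed emulator endpoint port.
    static final GenericContainer<?> EMULATOR =
            new GenericContainer<>(DockerImageName.parse("maxcompute/maxcompute-emulator:v0.0.3"))
                    .withExposedPorts(8080);

    @BeforeAll
    static void startEmulator() {
        EMULATOR.start();
        // Tests would point the connector's endpoint option at
        // "http://" + EMULATOR.getHost() + ":" + EMULATOR.getMappedPort(8080).
    }
}
```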

@dingxin-tech (Author)

Hi @loserwang1024, I recently completed the development and release of a Docker image for the MaxCompute Emulator, and I have also added some related regression cases. Could you help review the code in this pull request?

@lvyanquan (Contributor) left a comment

Thanks for your contribution, I've left some comments.

<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of buckets used when automatically creating a MaxCompute Transaction table. For usage, see <a href="ttps://help.aliyun.com/zh/maxcompute/user-guide/table-data-format">
Contributor

Invalid link.

Author

Fixed.

// "PostPartition",
// new EventTypeInfo(),
// new PostPartitionOperator(stream.getParallelism()))
// .name("PartitionByBucket");
Contributor

So PartitionOperator is actually unused?
I found your JIRA about this, and I think it describes a more versatile and scalable solution, so we can wait for that JIRA to be completed and build on it.

Author

Sorry, I commented out this part of the code for debugging but forgot to uncomment it.
I think this code will stay here until the runtime optimization you mentioned is merged; then I will re-implement this feature that way.

import org.junit.jupiter.api.Test;

/** e2e test of SchemaEvolutionUtils. */
public class SchemaEvolutionUtilsTest {
Contributor

This test class does not actually take effect. Can we use the maxcompute/maxcompute-emulator:v0.0.3 image to test it?

Author

Sure.

case CHAR:
case VARCHAR:
case TIME_WITHOUT_TIME_ZONE:
return TypeInfoFactory.STRING;
Contributor

DataType includes nullable/notNull information; do we lose that information here?

Author

Yes, you are correct.

Additionally, I discovered that the MaxCompute SDK has an issue when creating tables with primary keys: it ignores the user-specified primary-key order during table creation. I plan to fix this next week.
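
For illustration, a sketch of keeping nullability alongside the mapped type. The class and method names are hypothetical, and it assumes DataType#getTypeRoot and DataType#isNullable from flink-cdc-common:

```java
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import org.apache.flink.cdc.common.types.DataType;

/** Hypothetical helper: MaxCompute's TypeInfo carries no null constraint, so
 *  nullability must be read from the CDC DataType and tracked separately. */
final class TypeMappingSketch {

    static TypeInfo toTypeInfo(DataType type) {
        switch (type.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
            case TIME_WITHOUT_TIME_ZONE:
                return TypeInfoFactory.STRING; // nullability is not representable here
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    /** Carry this flag onto the generated MaxCompute column definition. */
    static boolean isNullable(DataType type) {
        return type.isNullable();
    }
}
```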

# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.cdc.connectors.maxcompute.MaxComputeDataSinkFactory
Contributor

It's better to add a log4j2-test.properties file under resources for debugging or testing purposes, like other connectors.

Author

Done.
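
For reference, a minimal log4j2-test.properties in the style other Flink CDC connectors ship (contents assumed; raise the level when debugging locally):

```properties
# Sketch of a test logging config; OFF keeps CI output quiet.
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
```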


@Override
public void write(Event element, Context context) throws IOException {
LOG.info("Sink writer {} write {}.", this.context.getSubtaskId(), element);
Contributor

It's unnecessary to create so many logs.

Author

Sure, I forgot to remove this debug log.

* completion status of an executor, allowing the system to determine whether all relevant sessions
* have been processed.
*/
public class SessionCommitCoordinator {
Contributor

Is it better for us to rename it to Manager or Helper, to distinguish it from Flink's OperatorCoordinator?

Author

Sure. I couldn't come up with a suitable name when I first named it.

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
sessionCache.clear();
@lvyanquan (Contributor), Jun 23, 2024

So, do we need to request a new session ID after each checkpoint, which may have a performance impact? It is expected that the checkpoint interval will need to be larger, right?

Author

Yes, we do.
Each session can no longer be used after being committed, so we must re-create a session and request a new session ID.
And indeed, the checkpoint interval will need to be larger.
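
For illustration, a sketch of the per-checkpoint session lifecycle described above (the names are illustrative, not the PR's actual API):

```java
import java.util.UUID;

/** Sketch: a MaxCompute upsert session is single-use, so every checkpoint
 *  cycle commits the current session and lazily opens a fresh one. */
final class SessionLifecycleSketch {

    private String sessionId; // null means no open session

    /** Called on the write path; opens a session on demand. */
    String currentSession() {
        if (sessionId == null) {
            sessionId = requestNewSessionId(); // one extra round-trip per checkpoint
        }
        return sessionId;
    }

    /** Called from snapshotState(): commit, then drop the now-unusable session. */
    void onCheckpoint() {
        if (sessionId != null) {
            commit(sessionId); // a committed session cannot be written to again
            sessionId = null;  // forces a new session (and ID) on the next write
        }
    }

    // Hypothetical stand-ins for the MaxCompute tunnel calls.
    private static String requestNewSessionId() { return UUID.randomUUID().toString(); }
    private static void commit(String id) { /* commit the upsert session */ }
}
```

A larger checkpoint interval amortizes that per-cycle session-creation cost, which is why the interval is expected to grow.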

@leonardBang (Contributor) left a comment

Thanks @dingxin-tech for the nice work. The CI wasn't triggered properly; we need to adjust the CI settings [1] when a new connector or module joins.

[1] https://github.com/apache/flink-cdc/blob/master/.github/workflows/flink_cdc.yml

@dingxin-tech (Author)

> Thanks @dingxin-tech for the nice work. The CI wasn't triggered properly; we need to adjust the CI settings [1] when a new connector or module joins.
>
> [1] https://github.com/apache/flink-cdc/blob/master/.github/workflows/flink_cdc.yml

Hi, I added tests for MaxCompute in the CI file. Additionally, I refactored the code to apply the newly released DataSink feature, which allows specifying a HashFunction, and upgraded the ODPS SDK. Could you please review this PR again?
@leonardBang @lvyanquan
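
For illustration, a sketch of the idea behind the HashFunction hook: the sink tells the framework how to hash change events so that rows belonging to the same MaxCompute bucket are routed to the same writer subtask. The interfaces below are defined locally for the sketch and are not the exact flink-cdc API.

```java
/** Locally defined stand-in for the framework's hash-function hook. */
interface EventHashFunction<T> {
    int hashcode(T event);
}

/** Sketch: route events by a key-derived bucket so that partitioning
 *  lines up with MaxCompute's transactional-table bucketing. */
final class BucketHashFunction implements EventHashFunction<String> {

    private final int numBuckets; // e.g. the bucket count from the sink options

    BucketHashFunction(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    @Override
    public int hashcode(String primaryKey) {
        // Same key -> same bucket -> same writer subtask.
        return Math.floorMod(primaryKey.hashCode(), numBuckets);
    }
}
```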
