
Flink: write the CDC records into apache iceberg tables. #1639


Description

@openinx

Currently, we in Apache Iceberg have implemented equality deletes and position deletes internally, but we still lack the ability to use them from the different engines. PR #1444 gave Spark the ability to read records by merging delete files and data files, and PR #1517 gave Flink a similar ability to read delete records. So the core thing we still need is to make the engine writers available, which means:

  1. Spark, Flink, Hive, etc. could write delete/CDC events into Apache Iceberg tables;
  2. we could execute SQL statements to delete the records matching the query conditions in a batch job (see the sketch after this list).
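To make the second item concrete, here is a minimal sketch of the kind of SQL statement meant there, expressed through Spark's Java API; the table name and predicate are made up, and it relies on the Spark-side delete support discussed below.

import org.apache.spark.sql.SparkSession;

public class BatchDeleteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("iceberg-batch-delete").getOrCreate();

    // Intended effect: produce delete records for the matching rows instead of
    // rewriting whole data files. Table name and predicate are hypothetical.
    spark.sql("DELETE FROM db.cdc_table WHERE account_id = 42");
  }
}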

For the Spark engine, I think @rdblue and @chenjunjiedada already have a good plan to get those finished. In this issue I will try to open a PR with a PoC solution for Flink, mainly focused on the first case; the batch delete should be similar to the Spark implementation.

Flink has provided a library (flink-cdc-connectors) to consume change log events from various databases and turn them into a DataStream of RowData. Each RowData carries a RowKind that indicates whether the event is an INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER, which makes it a good fit for implementing the CDC writers for Flink:

/**
 * Lists all kinds of changes that a row can describe in a changelog.
 */
@PublicEvolving
public enum RowKind {

	// Note: Enums have no stable hash code across different JVMs, use toByteValue() for
	// this purpose.

	/**
	 * Insertion operation.
	 */
	INSERT("+I", (byte) 0),

	/**
	 * Update operation with the previous content of the updated row.
	 *
	 * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that needs
	 * to retract the previous row first. It is useful in cases of a non-idempotent update, i.e., an
	 * update of a row that is not uniquely identifiable by a key.
	 */
	UPDATE_BEFORE("-U", (byte) 1),

	/**
	 * Update operation with new content of the updated row.
	 *
	 * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
	 * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
	 * of a row that is uniquely identifiable by a key.
	 */
	UPDATE_AFTER("+U", (byte) 2),

	/**
	 * Deletion operation.
	 */
	DELETE("-D", (byte) 3);
	// ...
}
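As a rough illustration of how these RowKind values could map onto Iceberg's delete machinery, here is a minimal sketch; the DeltaWriter interface is hypothetical and only stands in for Iceberg's data and equality-delete writers (the equality deletes would be keyed on the table's identifier fields).

import org.apache.flink.table.data.RowData;

public class CdcRowDispatcher {

  // Hypothetical abstraction standing in for Iceberg's data / equality-delete writers.
  public interface DeltaWriter {
    void writeData(RowData row);            // row appended to a data file
    void writeEqualityDelete(RowData row);  // key appended to an equality-delete file
  }

  private final DeltaWriter writer;

  public CdcRowDispatcher(DeltaWriter writer) {
    this.writer = writer;
  }

  // Dispatch a changelog row based on its RowKind: inserts and post-update images
  // become data records, deletes and pre-update images become equality deletes.
  public void dispatch(RowData row) {
    switch (row.getRowKind()) {
      case INSERT:
      case UPDATE_AFTER:
        writer.writeData(row);
        break;
      case UPDATE_BEFORE:
      case DELETE:
        writer.writeEqualityDelete(row);
        break;
    }
  }
}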

It may take some work to integrate the current Flink CDC API with the Apache Iceberg implementation. Let's track those things in this issue.
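For context, the wiring on the Flink side might end up looking roughly like the sketch below; the FlinkSink builder calls mirror today's append-only path, the warehouse path is made up, and the CDC-aware handling of the delete RowKinds is exactly the missing piece this issue tracks.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class CdcToIcebergJob {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Assumed to come from flink-cdc-connectors: a changelog stream whose RowKind
    // carries the INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE semantics shown above.
    DataStream<RowData> changelog = buildCdcStream(env);

    // Today FlinkSink only appends data files; honoring the delete RowKinds is the
    // integration work described in this issue. The table location is hypothetical.
    FlinkSink.forRowData(changelog)
        .tableLoader(TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/cdc_table"))
        .build();

    env.execute("cdc-to-iceberg");
  }

  private static DataStream<RowData> buildCdcStream(StreamExecutionEnvironment env) {
    // Source wiring omitted; flink-cdc-connectors would produce this stream.
    throw new UnsupportedOperationException("source wiring omitted in this sketch");
  }
}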
