
DeletedRecord is not merged correctly with EventTimeAvroPayload when writing to MOR in Flink and the ordering field type is String or Decimal #17642


Description

@cbg-wx

Bug Description

What happened:

  • When the ordering field is of String or Decimal type and a record is marked as a delete record via _hoodie_is_deleted=true, the delete record is always chosen during merging, regardless of the ordering value (the sketch below this list illustrates the underlying Utf8-vs-String mismatch).
  • Canal collects data from MySQL and sends it to Kafka, so records can arrive out of order relative to the order in which they were produced, and the stream contains delete records. Even though EventTimeAvroPayload is used, the records are not merged correctly according to the preCombine field after Flink consumes them and writes them to Hudi.
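
For illustration, here is a minimal standalone sketch (not Hudi code; the class and variable names are made up) of why an Avro Utf8 ordering value and a java.lang.String ordering value are neither equal nor mutually comparable, which is presumably why the ordering value does not take effect during the merge:

import org.apache.avro.util.Utf8;

public class Utf8VsStringMismatch {
    public static void main(String[] args) {
        Comparable<?> insertTs = new Utf8("103"); // ordering value of a record read back from the log (Avro decodes strings as Utf8)
        Comparable<?> deleteTs = "102";           // ordering value of a delete record built in memory (plain java.lang.String)

        // Utf8.equals(Object) only accepts another Utf8, so the two are never equal, even for identical content.
        System.out.println(insertTs.equals(deleteTs)); // false

        // A raw Comparable comparison across the two classes fails at runtime:
        // String.compareTo expects a String argument, not a Utf8.
        try {
            @SuppressWarnings({"unchecked", "rawtypes"})
            int cmp = ((Comparable) deleteTs).compareTo(insertTs);
            System.out.println(cmp);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}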

What you expected:

  • A delete record with a smaller ordering field value should not be chosen during merging.
  • With EventTimeAvroPayload, Flink should resolve the out-of-order records according to the preCombine field when writing to Hudi (see the comparison sketch below this list).
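
As a sketch of the expected comparison semantics (illustrative only, not the actual Hudi fix; compareOrderingValues is a made-up helper), CharSequence ordering values could be compared by content so that Utf8 and String behave the same:

import org.apache.avro.util.Utf8;

public class OrderingValueCompareSketch {

    // Hypothetical helper: compare CharSequence ordering values by content
    // so that Utf8 and String are interchangeable.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compareOrderingValues(Comparable left, Comparable right) {
        if (left instanceof CharSequence && right instanceof CharSequence) {
            return left.toString().compareTo(right.toString());
        }
        return left.compareTo(right);
    }

    public static void main(String[] args) {
        Comparable<?> deleteTs = "102";           // incoming delete record, built in memory
        Comparable<?> insertTs = new Utf8("103"); // existing insert record, decoded from the log

        // The delete record has the smaller ordering value, so the existing insert record should win.
        System.out.println(compareOrderingValues((Comparable) deleteTs, (Comparable) insertTs) < 0); // true
    }
}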

Steps to reproduce:

Put the following test cases into ITTestHoodieDataSource:

  • Scene01: with compaction turned off, the insert record (first to arrive) and the delete record (second to arrive) are both in the log. In HoodieMergedLogRecordScanner#processNextDeletedRecord, the insert record's ordering value is a Utf8 while the delete record's is a String (a standalone Avro snippet after the three scenes shows where this class difference comes from).
@Test
void testHardDeleteWithStringOrderingFieldScene01() throws Exception {
    ExecMode execMode = ExecMode.BATCH;
    String hoodieTableDDL = "create table t1(\n"
            + "  uuid varchar(20),\n"
            + "  name varchar(10),\n"
            + "  age int,\n"
            + "  _hoodie_is_deleted boolean,\n"
            + "  `partition` varchar(20),\n"
            + "  ts STRING,\n"
            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
            + ")\n"
            + "PARTITIONED BY (`partition`)\n"
            + "with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'index.type' = 'BUCKET',\n"
            + "  'path' = '" + tempFile.getAbsolutePath() + "',\n"
            + "  'read.streaming.skip_compaction' = 'false'\n"
            + ")";
    batchTableEnv.executeSql(hoodieTableDDL);

    // first commit
    String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,false,'par1', '101'),\n"
            + "('id2','Stephen',33,false,'par1', '103')";
    execInsertSql(batchTableEnv, insertInto);

    final String expected = "["
            + "+I[id1, Danny, 23, false, par1, 101], "
            + "+I[id2, Stephen, 33, false, par1, 103]]";

    // second commit, hard delete record with smaller order value
    insertInto = "insert into t1 values\n"
            + "('id2','Stephen',33, true,'par1', '102')";
    execInsertSql(batchTableEnv, insertInto);
    List<Row> result2 = execSelectSql(batchTableEnv, "select * from t1", execMode);
    // no record is deleted.
    assertRowsEquals(result2, expected);
}
  • Scene02: with compaction turned off, the delete record (first to arrive) and the insert record (second to arrive) are both in the log. In EventTimeAvroPayload#preCombine, the delete record's ordering value is a String while the insert record's is a Utf8.
@Test
void testHardDeleteWithStringOrderingFieldScene02() throws Exception {
    ExecMode execMode = ExecMode.BATCH;
    String hoodieTableDDL = "create table t1(\n"
            + "  uuid varchar(20),\n"
            + "  name varchar(10),\n"
            + "  age int,\n"
            + "  _hoodie_is_deleted boolean,\n"
            + "  `partition` varchar(20),\n"
            + "  ts STRING,\n"
            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
            + ")\n"
            + "PARTITIONED BY (`partition`)\n"
            + "with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'index.type' = 'BUCKET',\n"
            + "  'compaction.async.enabled' = 'false',\n"
            + "  'compaction.schedule.enabled' = 'true',\n"
            + "  'path' = '" + tempFile.getAbsolutePath() + "',\n"
            + "  'read.streaming.skip_compaction' = 'false'\n"
            + ")";
    batchTableEnv.executeSql(hoodieTableDDL);

    // first commit
    String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,false,'par1', '101'),\n"
            + "('id2','Stephen',33,true,'par1', '103')";
    execInsertSql(batchTableEnv, insertInto);

    final String expected = "["
            + "+I[id1, Danny, 23, false, par1, 101], "
            + "+I[id2, Stephen, 33, false, par1, 103]]";

    // second commit, insert record with larger order value (the hard delete with the smaller order value arrived first)
    insertInto = "insert into t1 values\n"
            + "('id2','Stephen',33, false,'par1', '103')";
    execInsertSql(batchTableEnv, insertInto);
    List<Row> result2 = execSelectSql(batchTableEnv, "select * from t1", execMode);
    // no record is deleted.
    assertRowsEquals(result2, expected);
}
  • Scene03: with compaction turned on, the insert record (first to arrive) is in the parquet file and the delete record (second to arrive) is in the log. In EventTimeAvroPayload#combineAndGetUpdateValue, the insert record's ordering value is a Utf8 while the delete record's is a String.
@Test
void testHardDeleteWithStringOrderingFieldScene03() throws Exception {
    ExecMode execMode = ExecMode.BATCH;
    String hoodieTableDDL = "create table t1(\n"
            + "  uuid varchar(20),\n"
            + "  name varchar(10),\n"
            + "  age int,\n"
            + "  _hoodie_is_deleted boolean,\n"
            + "  `partition` varchar(20),\n"
            + "  ts STRING,\n"
            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
            + ")\n"
            + "PARTITIONED BY (`partition`)\n"
            + "with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'index.type' = 'BUCKET',\n"
            + "  'compaction.async.enabled' = 'true',\n"
            + "  'compaction.schedule.enabled' = 'true',\n"
            + "  'compaction.delta_commits' = '1',\n"
            + "  'path' = '" + tempFile.getAbsolutePath() + "',\n"
            + "  'read.streaming.skip_compaction' = 'false'\n"
            + ")";
    batchTableEnv.executeSql(hoodieTableDDL);

    // first commit
    String insertInto = "insert into t1 values\n"
            + "('id1','Danny',23,false,'par1', '101'),\n"
            + "('id2','Stephen',33,false,'par1', '103')";
    execInsertSql(batchTableEnv, insertInto);

    final String expected = "["
            + "+I[id1, Danny, 23, false, par1, 101], "
            + "+I[id2, Stephen, 33, false, par1, 103]]";

    // second commit, hard delete record with smaller order value
    insertInto = "insert into t1 values\n"
            + "('id2','Stephen',33, true,'par1', '102')";
    execInsertSql(batchTableEnv, insertInto);
    List<Row> result2 = execSelectSql(batchTableEnv, "select * from t1", execMode);
    // no record is deleted.
    assertRowsEquals(result2, expected);
}
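
For context on why the two sides carry different classes in the first place: a record built in code holds its string fields as java.lang.String, while a record decoded from Avro bytes (similar to data records read back from the log) holds them as org.apache.avro.util.Utf8 by default. A standalone Avro snippet (the schema and field name here are made up for illustration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class OrderingFieldClassCheck {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"ts\",\"type\":\"string\"}]}");

        // A record created in memory keeps the value we put in: a java.lang.String.
        GenericRecord built = new GenericData.Record(schema);
        built.put("ts", "102");
        System.out.println(built.get("ts").getClass()); // class java.lang.String

        // Round-trip the record through Avro binary encoding and decoding.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(built, encoder);
        encoder.flush();

        Decoder decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(out.toByteArray()), null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(decoded.get("ts").getClass()); // class org.apache.avro.util.Utf8
    }
}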

Environment

Hudi version: 0.14.1, 0.15.0, 1.0.2
Sink engine: flink-1.16.2
Query engine: spark-3.4.2
Relevant configs: Hive sync enabled; queries run against the RT table

Logs and Stack Trace

Scene01: [screenshot attached in the original issue]
Scene02: [screenshot attached in the original issue]
Scene03: [screenshot attached in the original issue]
