Use Iceberg generic record to replace Avro IndexedRecord and add a translation to/from data type in Flink #870

@waterlx

Description

The current implementation uses Avro as the intermediate format: the input arrives as a Java Map, is serialized to an Avro IndexedRecord (the intermediate format), and Parquet.write (the Iceberg API) is then called to write it into a Parquet file.
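For reference, the Parquet.write call mentioned above can consume Iceberg generic records directly, which is what makes the Avro intermediate step removable. Below is a minimal sketch of that target flow, assuming Iceberg's data API and a local output file; the schema, values, and path are illustrative, and deliberately exercise the built-in types listed below (a decimal, a date, and a map with non-string keys):

```java
import java.io.File;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.Collections;

import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class GenericRecordWriteSketch {
  public static void main(String[] args) throws Exception {
    // Schema exercising built-in types the Avro path handled poorly:
    // a decimal, a date, and a map with non-string (int) keys.
    Schema schema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "amount", Types.DecimalType.of(10, 2)),
        required(3, "event_date", Types.DateType.get()),
        required(4, "counts", Types.MapType.ofRequired(
            5, 6, Types.IntegerType.get(), Types.LongType.get())));

    // Build an Iceberg generic record directly, with no Avro IndexedRecord in between.
    GenericRecord record = GenericRecord.create(schema);
    record.setField("id", 1L);
    record.setField("amount", new BigDecimal("19.99"));
    record.setField("event_date", LocalDate.of(2019, 3, 1));
    record.setField("counts", Collections.singletonMap(7, 42L));

    // Hand the record straight to Parquet.write; GenericParquetWriter supplies
    // the writer function for Iceberg generic records.
    try (FileAppender<Record> appender =
        Parquet.write(Files.localOutput(new File("/tmp/sketch.parquet")))
            .schema(schema)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build()) {
      appender.add(record);
    }
  }
}
```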

  1. We need to remove that dependency and use Iceberg generic records/built-in types (maps with non-string keys, decimals, date/time types, ...) as the input (addressed; see the sketch above).
  • Remove AvroSerializer and its implementations (renamed to RecordSerializer instead, since we still need a serializer to transform the input format into an Iceberg Record; a possible shape is sketched after this list).
  • Remove AvroUtils (addressed).
  • Remove the serializer setting in IcebergSinkAppender (kept, as we still need a serializer here).
  2. Add the translation to/from Flink native formats, like we did with InternalRow for Spark (will be addressed later; a hypothetical direction is sketched after this list).

  3. FileWriter#write() could accept other convenient input types in addition to Iceberg generic records. See this comment. (Will be addressed later; currently the code only supports Iceberg Record as input.)
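
Since the serializer is kept but renamed, a plausible shape for RecordSerializer, sketched here hypothetically since the issue doesn't show the interface, is a small serializable type that turns the sink's input into an Iceberg Record:

```java
import java.io.Serializable;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;

// Hypothetical replacement for AvroSerializer: instead of producing an
// Avro IndexedRecord, it turns the sink's input type T into an Iceberg Record.
public interface RecordSerializer<T> extends Serializable {
  Record serialize(T input, Schema schema);
}

// Example implementation for the Java Map input mentioned above.
class MapRecordSerializer implements RecordSerializer<Map<String, Object>> {
  @Override
  public Record serialize(Map<String, Object> input, Schema schema) {
    Record record = GenericRecord.create(schema);
    input.forEach(record::setField); // copy each map entry into the record by field name
    return record;
  }
}
```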
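For item 2, a hypothetical direction (not part of this change; the class and method names here are invented for illustration) would mirror the Spark InternalRow path by translating a Flink Row into an Iceberg Record positionally:

```java
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;

// Hypothetical sketch of the deferred Flink translation: copy a Flink Row
// into an Iceberg Record field by field, by position.
public class FlinkRowConverter {
  public static Record toIcebergRecord(Row row, Schema schema) {
    Record record = GenericRecord.create(schema);
    for (int i = 0; i < schema.columns().size(); i++) {
      // Assumes the Row's value types already match Iceberg's generic
      // representations (e.g. BigDecimal for decimals, LocalDate for dates).
      record.set(i, row.getField(i));
    }
    return record;
  }
}
```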
