Flink: use schema visitor for parquet writer #1272
Conversation
```diff
 import org.apache.parquet.schema.Type;

-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
```
This class doesn't seem to need to be public; only the FlinkParquetReader will access those readers. It also doesn't need to be accessed by other classes, I think.

Makes sense to me.
```java
@Override
public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
                                    List<ParquetValueReader<?>> fieldReaders) {
  // the expected struct is ignored because nested fields are never found when the
```
Nit: the comment looks incomplete?
```java
    return new ParquetValueReaders.UnboxedReader<>(desc);
  }
case TIME_MICROS:
  return new TimeMillisReader(desc);
```
Q: Is there any problem here? The original type is TIME_MICROS, while the reader is named TimeMillisReader?

This is because Flink only supports milliseconds while Parquet stores microseconds, so the name expresses that it reads out milliseconds.

I agree this is confusing. There are other places where we use a unit in the class name to indicate the unit being read. Instead, let's be more specific and use something like LossyMicrosToMillisTimeReader.
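For illustration, a minimal sketch of what such a reader could look like; the base class and the readLong helper follow Iceberg's ParquetValueReaders, but the exact shape here is an assumption, not the merged code:

```java
// Hypothetical sketch: Parquet stores TIME_MICROS as a long count of
// microseconds, while Flink's TIME type only keeps millisecond precision,
// so the truncating division below is lossy; the name makes that explicit.
private static class LossyMicrosToMillisTimeReader
    extends ParquetValueReaders.UnboxedReader<Integer> {
  LossyMicrosToMillisTimeReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public Integer read(Integer reuse) {
    // drop the sub-millisecond part of the stored microseconds
    return (int) (readLong() / 1000);
  }
}
```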
```java
  }
}

protected MessageType type() {
```
Will any subclass of ReadBuilder access the message type?

Previously, the FallbackReader used it. Now I think this could be removed, since the fallback reader defines its own type; we can't get the type from the passed builder.
```java
  };
}

private static Iterable<Record> generateIcebergGenerics(Schema schema, int numRecords,
```
Seems it could share common code with RandomGenericData#generate? Maybe make RandomGenericData#generate return an Iterable?

You are right, let me refactor this.

This method accepts a Record supplier and then generates records. We should keep it for generating fallback records and dictionary-encoded records, but we can update the generateRecords method to call RandomGenericData#generate directly.
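A rough sketch of that delegation (the generate(schema, numRecords, seed) signature is an assumption based on this discussion, not a confirmed API):

```java
// Sketch: plain random records come straight from the shared generator, while
// the supplier-based generateIcebergGenerics is kept only for fallback and
// dictionary-encoded records.
private static Iterable<Record> generateRecords(Schema schema, int numRecords, long seed) {
  return RandomGenericData.generate(schema, numRecords, seed);
}
```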
```java
  }
}

private static class TimeMicrosWriter extends ParquetValueWriters.PrimitiveWriter<Integer> {
```
The reader is named TimeMillisReader and the writer is TimeMicrosWriter; could they be symmetrical?

The naming reflects what we actually perform: on the reader side we read out milliseconds for Flink, and on the writer side we write out microseconds for Parquet.
```java
public void write(int repetitionLevel, DecimalData decimal) {
  Preconditions.checkArgument(decimal.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
  Preconditions.checkArgument(decimal.precision() <= precision,
```
Seems the upper bound of the precision of IntegerDecimalWriter is 9? Could we add a `precision <= 9` assertion?

Will use the latest DecimalUtil.

Seems DecimalUtil doesn't handle this. I fixed it in the new commit.
```java
public void write(int repetitionLevel, DecimalData decimal) {
  Preconditions.checkArgument(decimal.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
  Preconditions.checkArgument(decimal.precision() <= precision,
```
Also, could we add a `precision <= 18` assertion?

How about adding this when allocating the writer? Seems like that would be a suitable place, since there we are checking the Flink type.

I think it would be better to do this in the constructor, like @chenjunjiedada suggests. That way we have a check that precision is not larger than the maximum allowed by the type, and that the correct writer is used for the type.
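A minimal sketch of that constructor-time check, assuming a long-backed decimal writer shaped like the snippets quoted above (class and field names are illustrative):

```java
// Sketch only: validate the type-level bound once, when the writer is
// allocated, instead of on every write. A long-backed Parquet decimal can
// hold at most 18 digits of precision (an int-backed one at most 9).
private static class LongDecimalWriter extends ParquetValueWriters.PrimitiveWriter<DecimalData> {
  private final int precision;
  private final int scale;

  private LongDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
    super(desc);
    Preconditions.checkArgument(precision <= 18,
        "Cannot write decimal value with precision %s as a long, max precision is 18", precision);
    this.precision = precision;
    this.scale = scale;
  }
}
```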
@openinx, thanks a lot for your comments. Will rebase and update the PRs.
Force-pushed from e78c2ec to f0641b4.
```java
@Override
public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) {
  MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
  LogicalType logicalType = FlinkSchemaUtil.convert(schema);
```
BTW, we may also need to add Parquet to the parameterized unit tests, such as TestIcebergStreamWriter and TestTaskWriters.

Agreed, will take a look when these PRs get in.
```java
  };
}

private static Iterable<RowData> generateRowData(Schema schema, int numRecords,
```
We could use RandomRowData#generate when rebasing the patch https://github.com/apache/iceberg/pull/1320/files#diff-4b2a9fd76495497db9212d74bf03f671R33.
```java
public void write(int repetitionLevel, DecimalData decimal) {
  Preconditions.checkArgument(decimal.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
  Preconditions.checkArgument(decimal.precision() <= 9,
```
Seems it should be `decimal.precision() <= precision`?

Seems like I misunderstood your comments, let me update this.
```java
public void write(int repetitionLevel, DecimalData decimal) {
  Preconditions.checkArgument(decimal.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
  Preconditions.checkArgument(decimal.precision() <= 18,
```
```java
private class ElementIterator<E> implements Iterator<E> {
  private final int size;
  private final ArrayData list;
  private int index;

  private ElementIterator(ArrayData list) {
    this.list = list;
    size = list.size();
    index = 0;
  }

  @Override
  public boolean hasNext() {
    return index != size;
  }

  @Override
  @SuppressWarnings("unchecked")
  public E next() {
    if (index >= size) {
      throw new NoSuchElementException();
    }

    E element;
    if (list.isNullAt(index)) {
      element = null;
    } else {
      element = (E) ArrayData.createElementGetter(elementType).getElementOrNull(list, index);
    }

    index += 1;

    return element;
  }
}
```
How about moving this ElementIterator to be a static class, so the map's EntryIterator could share it? Seems we could do it; you can decide whether it's necessary.

I'm not sure how it can be shared with EntryIterator.
```java
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class ParquetWithFlinkSchemaVisitor<T> {
```
TODO: we could share the Flink and Spark ParquetSchemaVisitor in a common class; that can be a separate issue.

Agreed, I would prefer to do the refactor in a separate PR.

Yes, a WithPartner visitor like @JingsongLi added would be great.
Force-pushed from 940e435 to 09c1b41.
From other comments, it sounds like I should review …
Force-pushed from 09c1b41 to a73a7d7.
@chenjunjiedada, looks like this is conflicting again. Must have been one of the patches I merged this morning. Sorry about that! I'll take a look at this one next, thanks for your patience with reviews. I've been running behind on reviews lately.
@rdblue, never mind, it is just a small conflict that is already fixed. Take your time.
```java
}

@Override
public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitive) {
```
Nit: the s in sType indicates Spark. The equivalent here would be fType, or a better name.
```java
@Override
public void write(int repetitionLevel, Integer value) {
  long micros = Long.valueOf(value) * 1000;
```
This conversion from Integer doesn't make much sense. Java exposes two valueOf overloads that take String arguments and one that takes a primitive long; the last one is what is called here. That means this code implicitly unboxes the Integer, widens it to long, boxes the result into a Long, and then unboxes it again for the multiplication.

It would be better to use `value.longValue() * 1000` instead.
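A sketch of the corrected method; the column.writeLong call is an assumption about the surrounding writer, based on the snippets quoted in this thread:

```java
@Override
public void write(int repetitionLevel, Integer value) {
  // one explicit unbox plus a widening primitive conversion, no intermediate Long
  long micros = value.longValue() * 1000;
  column.writeLong(repetitionLevel, micros);
}
```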
```java
if (list.isNullAt(index)) {
  element = null;
} else {
  element = (E) ArrayData.createElementGetter(elementType).getElementOrNull(list, index);
```
This method is called in a tight loop, so for performance any preparation that can be done in advance should be.

That means this getter should be created in the constructor and stored as an instance field. Then it can be called here.

Also, if there is already a null check above, does this need to call getElementOrNull, or should it just call a get variant that assumes the value is non-null?

Alternatively, you could replace the if here:

```java
E element = (E) getter.getElementOrNull(list, index);
```
> That means this getter should be created in the constructor and stored as an instance field. Then it can be called here.

Yeah, that sounds good to me, great point.

> does this need to call getElementOrNull or should it just call a get variant that assumes the value is non-null?

The getter in ArrayData doesn't have a `get` interface; it only has this one:

```java
/**
 * Accessor for getting the elements of an array during runtime.
 *
 * @see #createElementGetter(LogicalType)
 */
interface ElementGetter extends Serializable {
  @Nullable Object getElementOrNull(ArrayData array, int pos);
}
```

Replacing the if-else with `E element = (E) getter.getElementOrNull(list, index);` sounds reasonable to me.
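Putting both points together, a sketch of the iterator with the getter created once in the constructor (based on the ElementIterator quoted above; an illustration, not the merged code):

```java
private class ElementIterator<E> implements Iterator<E> {
  private final ArrayData list;
  private final int size;
  // created once here instead of on every next() call
  private final ArrayData.ElementGetter getter;
  private int index;

  private ElementIterator(ArrayData list) {
    this.list = list;
    this.size = list.size();
    this.getter = ArrayData.createElementGetter(elementType);
    this.index = 0;
  }

  @Override
  public boolean hasNext() {
    return index != size;
  }

  @Override
  @SuppressWarnings("unchecked")
  public E next() {
    if (index >= size) {
      throw new NoSuchElementException();
    }
    // getElementOrNull already handles nulls, so the if-else is gone
    E element = (E) getter.getElementOrNull(list, index);
    index += 1;
    return element;
  }
}
```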
```java
}

if (values.isNullAt(index)) {
  entry.set((K) ArrayData.createElementGetter(keyType).getElementOrNull(keys, index), null);
```
Same here. The getters for keys and values should be instance fields.
Keys are not allowed to be null, so there should be no need to call getElementOrNull for the key.
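Applying the same pattern to the map side, as a fragment rather than a complete class (keyType, valueType, entry, keys, and values are taken from the quoted snippet; everything else is an assumption). Since ElementGetter only exposes the null-safe accessor, it is used for the key as well, even though map keys are never null:

```java
// created once in the iterator's constructor instead of per entry
private final ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
private final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);

// inside next(): one call per side, no per-entry getter allocation
entry.set((K) keyGetter.getElementOrNull(keys, index),
    (V) valueGetter.getElementOrNull(values, index));
```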
```diff
-protected Object get(Row row, int index) {
-  return row.getField(index);
+protected Object get(RowData struct, int index) {
+  return RowData.createFieldGetter(types.get(index), index).getFieldOrNull(struct);
```
Each getter should be stored as a field in an array.
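A sketch of that fix: the getters are built once, up front, and get() just indexes into the array (the surrounding class and constructor are assumed for illustration; types is the List<LogicalType> from the quoted snippet):

```java
private final RowData.FieldGetter[] getters;

private FlinkRowDataStructReader(List<LogicalType> types) {  // hypothetical class name
  this.getters = new RowData.FieldGetter[types.size()];
  for (int i = 0; i < types.size(); i += 1) {
    getters[i] = RowData.createFieldGetter(types.get(i), i);
  }
}

@Override
protected Object get(RowData struct, int index) {
  return getters[index].getFieldOrNull(struct);
}
```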
```diff
-final int fieldPos = i;
 assertEquals(types.get(i), logicalType, expected,
-    () -> RowData.createFieldGetter(logicalType, fieldPos).getFieldOrNull(actualRowData));
+    RowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData));
```
@chenjunjiedada, I'm going to merge this. The remaining issues are minor or are not correctness problems. Just be sure to follow up and fix the getter problems, or else this will be slower than it should be.
Thanks, @chenjunjiedada for building this, and @openinx for reviewing!
This is a sub-PR of #1237. I will rebase this once #1266 gets merged.