Conversation

stevenzwu (Contributor) commented Mar 8, 2021

This is the first feature PR for FLIP-27 source split from the uber PR #2105

Currently, there are at least two open questions to be addressed. Since I will be out for the rest of the week, I'd like to put this out first.

  1. @openinx suggested that we break the DataIterator into two levels (combined and file tasks). I have a question in the comment thread on the uber PR that maybe @openinx can confirm.

  2. The reader is currently implemented on top of FileSourceSplit and BulkFormat. The original reason is that Jingsong mentioned we might be able to take advantage of the high-performance vectorized readers from Flink. But I am revisiting that decision: it is unlikely that Flink's vectorized readers will support deletes. Iceberg is also adding vectorized readers, and I assume the Iceberg implementations will support deletes.

@openinx @sundargates @tweise


public void seek(CheckpointedPosition checkpointedPosition) {
// skip files
Preconditions.checkArgument(checkpointedPosition.getOffset() < combinedTask.files().size(),
Member:

Nit: could simplify it as:

    Preconditions.checkArgument(checkpointedPosition.getOffset() < combinedTask.files().size(),
        "Checkpointed file offset is %s, while CombinedScanTask has %s files",
        checkpointedPosition.getOffset(), combinedTask.files().size());

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public interface IcebergSourceOptions {
Member:

We've introduced a FlinkTableOptions. I don't think it's friendly to create separate options classes for source, sink, table, etc. Maybe we could rename FlinkTableOptions to FlinkConfigOptions and put all the options into that class.


@Override
public Reader<T> createReader(Configuration config, IcebergSourceSplit split) throws IOException {
return new ReaderAdaptor<T>(bulkFormatProvider, config, split, false);
Member:

Nit: new ReaderAdaptor<>(...) ?


@Override
public Reader<T> restoreReader(Configuration config, IcebergSourceSplit split) throws IOException {
return new ReaderAdaptor<T>(bulkFormatProvider, config, split, true);
Member:

ditto

final CheckpointedPosition position = icebergSplit.checkpointedPosition();
if (position != null) {
// skip files based on offset in checkpointed position
Preconditions.checkArgument(position.getOffset() < icebergSplit.task().files().size(),
Member:

Nit: use the method

checkArgument(boolean expression, @Nullable String errorMessageTemplate, @Nullable Object... errorMessageArgs)

fileOffset++;
final FileSourceSplit fileSourceSplit = new FileSourceSplit(
"",
new Path(URI.create(fileScanTask.file().path().toString())),
Member:

So the FileSourceSplit will use Flink's filesystem interface to access the underlying files? Iceberg currently has its own FileIO interface, which object storage services implement to read/write data in the cloud. If we introduce the Flink filesystem here, I'm concerned that we will have to implement both the Flink filesystem interfaces and the Iceberg FileIO interfaces to make the experimental unified source work.

Contributor Author:

We extended from FileSourceSplit mainly for the BulkFormat batch reader interface so that we can plug in vectorized readers from Flink. I am also debating if this is the right thing to do as mentioned in the original description.

But this is not really relevant to FileIO, which deals with the underlying filesystem (like S3). Here we are mainly talking about the file format readers (like Parquet and ORC).


private final long fileOffset;
private final RecordIterator<T> iterator;
private final MutableRecordAndPosition mutableRecordAndPosition;
Member:

Nit: use MutableRecordAndPosition<T> here

import org.apache.iceberg.flink.source.split.MutableIcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class IcebergSourceReader<T> extends
Member:

This class doesn't have to be introduced in this PR, does it? I don't see any usage.

Contributor Author:

Yes, it is not used in this PR. It is added for the completeness of the split reader module

Contributor:

Let's drop it for now and do it in a follow-up PR; this one is already really, really large.

Contributor Author:

sounds good. will do

openinx (Member) commented Mar 10, 2021

Actually, I did not fully understand the whole PR (#2105) when reviewing this separate PR. I think I will need more time to understand the whole codebase first.

stevenzwu force-pushed the flip27SplitReader branch 2 times, most recently from 672e638 to cec66f9 (March 22, 2021 18:39)
stevenzwu (Contributor Author):

@openinx I updated the PR with the DataIterator refactoring: using composition instead of inheritance (like the old RowDataIterator). Please take a look when you get a chance.

stevenzwu (Contributor Author):

@openinx if we are committed to having the vectorized readers in iceberg-flink that @zhangjun0x01 is working on (PR #2566), then I will update this PR to avoid the dependency on and extension from FileSourceSplit and BulkFormat. The only reason I did it was to reuse the vectorized readers from Flink.

I am also wondering whether those vectorized readers support delete filtering. I know the Flink vectorized reader implementations certainly won't, since deletes are an Iceberg concept.

return position;
}

public static class Position {
Contributor Author:

@openinx following up on your comment from the uber PR: https://github.com/apache/iceberg/pull/2105/files#r630834205.

The reason I introduced this mutable Position class is to avoid constructing a new <fileOffset, recordOffset> object for every record. It is the current cursor for the iterator.

I didn't track the recordOffset inside the FileIteratorReader for the same reason; otherwise, the position() getter would construct a new object each time.

We can't use CheckpointedPosition from Flink for two reasons: (1) it is immutable, and (2) we want to return the current position (not necessarily a checkpointed position).
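A minimal sketch of the mutable cursor described above (illustrative only; field types follow the diff context quoted elsewhere in this PR, not the exact implementation):

    public static class Position {
      private long fileOffset;
      private long recordOffset;

      Position(long fileOffset, long recordOffset) {
        this.fileOffset = fileOffset;
        this.recordOffset = recordOffset;
      }

      // mutate in place so that reading the current position never allocates a new object per record
      void update(long newFileOffset, long newRecordOffset) {
        this.fileOffset = newFileOffset;
        this.recordOffset = newRecordOffset;
      }

      long fileOffset() {
        return fileOffset;
      }

      long recordOffset() {
        return recordOffset;
      }
    }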

holdenk (Contributor) commented Jun 25, 2021

Hey @stevenzwu, can you rebase or merge in the master branch? GitHub is showing a conflicting file.

holdenk (Contributor) left a comment:

Thanks for working on this. I don't have enough context around this; I'll try to spin up some more context for next week. In the meantime, if any of the people with more context (like @JingsongLi or @rdblue) have some cycles to review, that would be rad :)

import org.apache.flink.configuration.ConfigOptions;

public class FlinkTableOptions {
public class FlinkConfigOptions {
Contributor:

This looks like an existing public API; renaming might have some challenges, or at the very least needs a comment in the migration guide. Another option would be to have a deprecated FlinkTableOptions extend FlinkConfigOptions so as not to break compatibility.

Contributor Author:

The class is used internally. The config keys are the public contract and weren't changed. Let me move this one out to a separate small PR.


* Context object with optional arguments for a Flink Scan.
*/
class ScanContext implements Serializable {
public class ScanContext implements Serializable {
Contributor:

Why do we need to make so much more of this ScanContext public?

Contributor Author:

Because now this class is accessed by classes in a sub-package (like reader/RowDataIteratorBulkFormat.java).

stevenzwu (Contributor Author) commented Jun 25, 2021

@holdenk thx a lot for taking a look. will rebase it after the decision on the vectorized readers below.

@rdblue @JingsongLi please help take a look and move the new FLIP-27 based Iceberg source forward if you can. It is part 3 of the uber PR #2105.

Right now, there is a pending decision before this PR can be reviewed. Currently, this PR is based on the premise of reusing the BulkFormat from Flink for vectorized readers (for Parquet, ORC, etc.), originally suggested by @JingsongLi. I am rethinking that choice. It is unlikely that Flink's (vectorized) readers will support delete filters the way Iceberg readers do. Maybe the iceberg-flink module needs its own vectorized readers to support deletes; @zhangjun0x01 already submitted PR #2566 for ORC. In that case this PR needs to be adjusted to break away from the Flink file source.


public class IcebergSourceSplit extends FileSourceSplit {

public enum Status {


nit: Given that this Status is not a field in the Split itself, wondering if we should move this to a separate file?

Contributor Author:

Makes sense. will move it out

import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

public class IcebergSourceSplit extends FileSourceSplit {


What's the rationale for extending from FileSourceSplit, given that it is used to represent a single file? Based on the args passed to the constructor in line 56, it doesn't seem to map well to the Iceberg split.

Contributor Author:

I was hoping to extend from flink-connector-files to leverage the vectorized readers in Flink, but I have been revisiting that decision. After discussing with @openinx offline, we agreed that iceberg-flink should probably have its own vectorized readers (mainly for delete filters in the future). flink-connector-files' vectorized readers won't support delete filters, as deletes are a concept for Iceberg only (not the raw file formats like Parquet or ORC). Will update this PR.

return checkpointedPosition;
}

public byte[] serializedFormCache() {


nit: I'm guessing this need not be public given that it's an optimization primarily around making serialization cheap?

Contributor Author:

it is actually used by IcebergSourceSplit


You mean it is used by IcebergSourceSplitSerializer? Since that's in the same package, it doesn't need to be public.

stevenzwu force-pushed the flip27SplitReader branch 2 times, most recently from 134aac0 to 82fd27f (July 1, 2021 06:38)
stevenzwu (Contributor Author):

@openinx @sundargates @holdenk @tweise Now this sub PR is ready for review.

I have changed the code not to extend from FileSourceSplit and BulkFormat, as we are aligned that the Iceberg source reader probably can't reuse the vectorized readers from Flink. The main reason is that the future Iceberg V2 format supports deletes, which is a concept applicable to Iceberg (not to raw file formats like ORC, Parquet, etc.). Hence we can't reuse Flink's vectorized readers with delete filters.

I also moved some unrelated refactoring out of this PR.

build.gradle Outdated
void onOutput(TestDescriptor testDescriptor, TestOutputEvent testOutputEvent) {
if (lastDescriptor != testDescriptor) {
buildLog << "--------\n- Test log for: "<< testDescriptor << "\n--------\n"
buildLog << "--------\n- Test log for: " << testDescriptor << "\n--------\n"


looks like extraneous changes unrelated to the diff?

Contributor Author:

Yeah, annoying auto-formatting from IntelliJ. will fix it.

return inputFiles.get(location);
}

public void seek(CheckpointedPosition checkpointedPosition) {


is there a need to make use of this DS given that BulkFormat is not going to be supported initially?

tasks = null;
}

public Position position() {


It appears that you are using CheckpointedPosition DS to communicate the position that the iterator has to seek to from outside. However, in order to communicate the current position to the outside world, you are using the internal Position DS. Wondering if we can keep this consistent to be either CheckpointedPosition or the mutable Position?

Contributor Author:

That is a good point. It is also related to your question above. Let me see how to unify them and maybe move away from Flink's CheckpointedPosition

public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {

@Nullable
private final ReaderFactory.RecordIterator recordsForSplit;


why is the RecordIterator type here not parameterized?

Contributor Author:

they should be. will fix

*
* @param <T> output record type
*/
interface Reader<T> {


Would it make sense to have the reader extend CloseableIterable<RecordsWithSplitIds>? This way the intermediate DS called FileRecords can be completely avoided. A lot of the abstraction makes it slightly complex to read otherwise IMO.

interface Reader<T> extends CloseableIterable<RecordsWithSplitIds<T>> {
}

stevenzwu (Contributor Author) Jul 8, 2021:

I would need FileRecords, as it is an implementation of the RecordsWithSplitIds interface.

I see your point on the complexity. let me see how to simplify the abstractions.

*
* @param <T> The type of the record.
*/
interface RecordIterator<T> {


I'm not sure we need this internal interface given RecordsWithSplitIds is pretty much a superset of this DS.

import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

public class RowDataIteratorReaderFactory implements ReaderFactory<RowData> {


Can we make this generic by asking the user to provide the factory for generating T (RowData in this specific case) from a given CombinedScanTask, or make that abstract and have a default implementation for RowData that uses the DataIterator?

abstract class BatchDataIteratorFactory<T> implements Function<FileScanTaskSplit, CloseableIterable<RecordsWithSplitIds<T>>> {
  protected abstract DataIterator<T> getIteratorFor(CombinedScanTask task);
}

* @param split Iceberg source split
* @return a batch reader
*/
Reader<T> create(Configuration config, IcebergSourceSplit split);


Why is the configuration passed per split? Wouldn't it be the same for the full table? If so, should it be passed to the constructor as a property of the implementation rather than per split?

Contributor Author:

Makes sense. will change

import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

public interface ReaderFactory<T> extends Serializable {


I think it might be better to avoid this interface and replace it with an existing Java type such as java.util.function.Function, as it would lead to less need for understanding new types. If you want to have this abstraction, then you can just define the interface as

public interface ReaderFactory<T> extends Function<IcebergSourceSplit, CloseableIterable<RecordsWithSplitIds<T>>> {}

private final Configuration config;
private final DataIteratorBatcher<T> batcher;

public DataIteratorReaderFactory(Configuration config, DataIteratorBatcher<T> batcher) {
stevenzwu (Contributor Author) Jul 12, 2021:

@sundargates I introduced this Batcher interface. RowDataIterator reuses the RowData object, so we need an array pool (and record cloning) in the reader. An Avro iterator doesn't reuse objects, so it wouldn't need the array pool or the clone. A different Batcher can be plugged in for a reader with an Avro record output type.

build.gradle Outdated
compile project(':iceberg-parquet')
compile project(':iceberg-hive-metastore')

compileOnly "org.apache.flink:flink-connector-base"
Member:

Are the interfaces (such as SplitReader, RecordsWithSplitIds, SourceReaderOptions) from this jar stable enough to expose to a downstream project like Iceberg? I raise this question because I don't see them marked with a Public annotation, and they seem to have no compatibility guarantee.

Contributor Author:

SplitReader and RecordsWithSplitIds are part of the core API that a source implementation needs to extend/implement.

Contributor Author:

I did a little research and saw that Stephan recently added the @PublicEvolving annotation to all three interfaces that you pointed out: https://issues.apache.org/jira/browse/FLINK-22358


It would be too restrictive to depend only on @Public interfaces. FLIP-27 connectors are becoming the default, and these interfaces should mature soon, though. The Iceberg project will need a story for supporting multiple Flink versions at some point.

build.gradle Outdated
def buildLog = new File(logFile)
addTestOutputListener(new TestOutputListener() {
def lastDescriptor

Member:

Nit: please don't introduce any unrelated changes, to avoid conflicts when people rebase onto their own repos.

Contributor Author:

sorry. that was a mistake. will fix

build.gradle Outdated
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}

compile "org.apache.flink:flink-connector-base"
Member:

This jar is not included in the flink-dist binary package, so we have to include it in the iceberg-flink-runtime jar explicitly?

Contributor Author:

that is correct


It would be good to add a comment since this question is almost certain to come up again.

}
}

public static List<IcebergSourceSplit> planIcebergSourceSplits(
openinx (Member) Sep 9, 2021:

Should we add a javadoc (or replace it with a clearer name) to indicate why we need an extra planIcebergSourceSplits (compared to createInputSplits)? It's not easy to tell the difference between the names 'InputSplits' and 'IcebergSourceSplits'. I think it's used for implementing FLIP-27's SourceSplit, right?

Member:

It would be good to align with createInputSplits by naming this createIcebergSourceSplits.

stevenzwu (Contributor Author) Sep 10, 2021:

Actually, I think we should rename createInputSplits to planFlinkInputSplit. We are not creating splits out of nowhere; both methods just discover/plan splits from the table by calling the same planTasks. I can add some javadoc on the new planIcebergSourceSplits method.

Since createInputSplits is non-public, we should be safe to rename it.

import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.io.CloseableIterator;

@FunctionalInterface

javadoc?

Contributor Author:

will add

import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterator;

public abstract class DataIteratorReaderFunction<T> implements ReaderFunction<T> {

internal or javadoc?

Contributor Author:

Will add Javadoc. In this PR, there is only one implementation, RowDataReaderFunction, but we can extend from this class with an AvroGenericRecordReaderFunction that directly reads Parquet files into Avro GenericRecord. That is why this is left as public.
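For illustration, a rough sketch of the extension point being described, building on the types in this PR; the ReaderFunction signature and the Avro subclass are assumptions, not the exact code:

    public abstract class DataIteratorReaderFunction<T> implements ReaderFunction<T> {
      private final DataIteratorBatcher<T> batcher;

      protected DataIteratorReaderFunction(DataIteratorBatcher<T> batcher) {
        this.batcher = batcher;
      }

      // Subclasses such as RowDataReaderFunction (or a future AvroGenericRecordReaderFunction)
      // decide how a split's scan task is turned into a DataIterator of their output type.
      protected abstract DataIterator<T> createDataIterator(IcebergSourceSplit split);

      @Override
      public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(IcebergSourceSplit split) {
        return batcher.apply(split.splitId(), createDataIterator(split));
      }
    }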

* A batch of records for one split
*/
@Internal
public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {

I found the class name not intuitive. From usage, this appears to be a "fetch result"?

Contributor Author:

I am going to rename it to SplitRecords. The Javadoc explains the purpose of this class. Yes, it is the "fetch result".

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {

Did you consider the need to subclass the reader for customization? Maybe it should be protected?

Contributor Author:

Right now, I don't anticipate any need to extend from this class.

}

/**
* Each record's {@link RecordAndPosition} will have the same fileOffset (for {@link RecordAndPosition#getOffset()}).

should this move to class level doc?

Contributor Author:

will move


/**
* TODO: use Java serialization for now.
* will switch to more stable serializer from issue-1698.

Please add the full link. With a new version, will such a subsequent change be backward compatible?

Contributor Author:

will add link.

Regarding the future change to a stable serializer, it will be backward compatible: we can bump the serializer version to v2 and continue to deserialize state written by v1.

  @Override
  public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
    switch (version) {
      case 1:
        return deserializeV1(serialized);
      default:
        throw new IOException("Unknown version: " + version);
    }
  }

rdblue (Contributor) commented Oct 10, 2021

@stevenzwu, I'll try to review this in the next week. Thank you!

FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
Contributor:

Why change the name of this method?

Contributor Author:

create/generate implies creating something new, while this actually plans/discovers splits from the table; hence the method name change. I also renamed the class from FlinkSplitGenerator to FlinkSplitPlanner. This is an internal class, so it shouldn't break user code.

Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
"Checkpointed file offset is %d, while CombinedScanTask has %d files",
startingPosition.fileOffset(), combinedTask.files().size());
for (long i = 0L; i < startingPosition.fileOffset(); ++i) {
Contributor:

Is fileOffset() a long? That seems odd to me. When would you need to address more than 2 billion files in a single combined scan task?

stevenzwu (Contributor Author) Nov 1, 2021:

An integer would certainly be sufficient. I was using long to match the type in RecordAndPosition from the flink-connector-files module. Looking at it again, the long offset in Flink's RecordAndPosition actually means the byte offset within a file. I will define our own RecordAndPosition and change fileOffset to an int.
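A minimal sketch of what an Iceberg-specific RecordAndPosition with an int file offset could look like (names and the mutable style are illustrative, mirroring the discussion above):

    public class RecordAndPosition<T> {
      private T record;
      private int fileOffset;
      private long recordOffset;

      public void set(T newRecord, int newFileOffset, long newRecordOffset) {
        this.record = newRecord;
        this.fileOffset = newFileOffset;
        this.recordOffset = newRecordOffset;
      }

      public T record() {
        return record;
      }

      public int fileOffset() {
        return fileOffset;
      }

      // record position within the current file, unlike Flink's byte-based offset
      public long recordOffset() {
        return recordOffset;
      }
    }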

Contributor Author:

same as another comment. will update

this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
this.combinedTask = task;

this.tasks = task.files().iterator();
Contributor:

I'd probably rename this to fileTasks since there is now a combinedTask.

Contributor Author:

agree. will rename it to fileTasksIterator

startingPosition.recordOffset());
}
}
this.position.update(startingPosition.fileOffset(), startingPosition.recordOffset());
Contributor:

Can position be final since this is using update?

Contributor Author:

yes, will change Position to final


@Internal
public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {

Contributor:

Looks like this file doesn't need to change. Can you remove this?

Contributor Author:

will revert

ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));

private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
Contributor:

How is this addition related to FLIP-27?

Contributor Author:

This is needed for the event-time-aligned assigner / rough ordering, where we use the min-max stats from the timestamp column to order the splits and assignments. It is not directly related to the reader part; it is just part of this sub PR because I copied the classes from the uber PR #2105.

}

FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);
Contributor:

Does this keep the monitor function so that it can maintain the old API? Do we need to maintain the old API?

Contributor Author:

FlinkSplitGenerator/FlinkSplitPlanner is an internal class; it doesn't affect the public API for user code.

import org.apache.iceberg.flink.source.Position;
import org.apache.iceberg.io.CloseableIterator;

class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
Contributor:

There isn't much context for me to go on here. Should there be Javadoc to explain what's going on?

Contributor Author:

will add Javadoc to explain

*/
@FunctionalInterface
public interface DataIteratorBatcher<T> extends Serializable {
CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(String splitId, DataIterator<T> inputIterator);
Contributor:

Why name this apply? Is there a more specific verb you could use here?

int num = 0;
while (inputIterator.hasNext() && num < batchSize) {
T nextRecord = inputIterator.next();
recordFactory.clone(nextRecord, batch[num]);
rdblue (Contributor) Oct 29, 2021:

What is this doing? Is clone expensive?

It could be that the record produced by inputIterator is reused, and the clone call is making a copy because we can't call inputIterator.next() again until the copy is made, since the record is not consumed immediately. If that's the case, then I think there should be a comment to point out what's going on.

Contributor Author:

you are exactly right. will add code comment
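For illustration, the batch-filling loop from the diff above with the kind of comment being requested (a simplified sketch; the early-exit condition from the original loop is omitted):

    int num = 0;
    while (inputIterator.hasNext() && num < batchSize) {
      T nextRecord = inputIterator.next();
      // The iterator may reuse the same record instance across next() calls,
      // so copy the record into the pooled batch slot before advancing the iterator again.
      recordFactory.clone(nextRecord, batch[num]);
      num++;
    }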

break;
}
}
if (num == 0) {
Contributor:

Could you add newlines between control flow blocks?

Contributor Author:

will do

implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')

compileOnly "org.apache.flink:flink-connector-base"
Member:

It's strange that the build for Flink 1.12 & 1.13 passed, because I don't see the same dependency added to the Flink 1.12 and 1.13 build.gradle files. Maybe I need to check 1.12's build.gradle again.

stevenzwu (Contributor Author) Nov 1, 2021:

@openinx Maybe following up on the other comment discussion here.

With the SplitEnumerator API change, it looks like I need to put the FLIP-27 source in the v1.13 folder. What should we do with future versions (like 1.14)? Do we copy the FLIP-27 source code from the v1.13 folder to the v1.14 folder?

Contributor:

@openinx, the tests run against the iceberg-flink module. They aren't present in the 1.12 or 1.13 modules. If you want them to be run for those modules, you'd need to add the source folder like you do for src/main/java. If you choose to do that, let's also remove CI for the common module since we don't need to run the tests outside of 1.12 and 1.13 if they are run in those modules.

Contributor:

@stevenzwu, I think that copying the parts that change is reasonable. And once we remove support for 1.12, you can move the files back into the common module.

Contributor Author:

yeah. that is my plan too. Once 1.12 support is removed, we should be able to move files back to the common module. We just need to be diligent with these efforts.

this.combinedTask = task;
// fileOffset starts at -1 because we started
// from an empty iterator that is not from the split files.
this.position = new Position(-1, 0L);
Member:

The general DataIterator doesn't use the position or seek method to skip tasks or records. Putting all the FLIP-27 related logic into the common Flink read path does not make sense to me, because every time I read this class I need to figure out which part is related to FLIP-27 and which part is not.

I would suggest introducing a separate SeekableDataIterator to isolate the two code paths. I made a simple commit for this: https://github.com/openinx/incubator-iceberg/commit/b08dde86aae0c718d9d72acb347dffb3a836b336 — you may want to take a look.

stevenzwu (Contributor Author) Nov 1, 2021:

I wouldn't say that seek capability is FLIP-27 specific. If we think of DataIterator as reading a list of files/splits from a CombinedScanTask, it is like a file API, where seek is pretty common. It is needed to achieve exactly-once processing semantics; e.g., if we were to implement exactly-once semantics for the current streaming source, I would imagine we would need this as well.

Thanks a lot for the SeekableDataIterator commit. I feel that leaving these two empty abstract methods in the base DataIterator is a little weird:

protected void advanceRecord()
protected void advanceTask()

Overall, I still think adding seek capability to DataIterator is natural (for file-like read APIs)
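For illustration, a rough sketch of the seek behavior under discussion, pieced together from the diff context quoted earlier in this thread; helper names like fileTasksIterator, updateCurrentIterator, and currentIterator are assumptions:

    public void seek(int startingFileOffset, long startingRecordOffset) {
      // skip whole file tasks up to the checkpointed file offset
      Preconditions.checkArgument(startingFileOffset < combinedTask.files().size(),
          "Checkpointed file offset is %s, while CombinedScanTask has %s files",
          startingFileOffset, combinedTask.files().size());
      for (int i = 0; i < startingFileOffset; ++i) {
        fileTasksIterator.next();
      }

      // skip records within the current file up to the checkpointed record offset
      updateCurrentIterator();
      for (long i = 0L; i < startingRecordOffset; ++i) {
        if (currentIterator.hasNext()) {
          currentIterator.next();
        } else {
          throw new IllegalStateException("Not enough records to skip: " + startingRecordOffset);
        }
      }

      position.update(startingFileOffset, startingRecordOffset);
    }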

stevenzwu (Contributor Author):

Closing this PR for now. Will further break it down into smaller PRs as suggested by @openinx.

stevenzwu closed this Nov 8, 2021