
feat: Added support to write iceberg tables #5989

Open · wants to merge 32 commits into main

Conversation

@malhotrashivam (Contributor) commented on Aug 26, 2024

Closes: #6125
Should be merged after #5707, #6156

Also moves the existing Iceberg tests from JUnit 4 to JUnit 5.

@malhotrashivam added the parquet (Related to the Parquet integration), DocumentationNeeded, ReleaseNotesNeeded (Release notes are needed), s3, and iceberg labels on Aug 26, 2024
@malhotrashivam added this to the 0.37.0 milestone on Aug 26, 2024
@malhotrashivam self-assigned this on Aug 26, 2024
@malhotrashivam marked this pull request as draft on September 6, 2024 at 18:27
@malhotrashivam changed the title from "feat: [DO NOT MERGE] Added support to write iceberg tables" to "feat: Added support to write iceberg tables" on Sep 6, 2024
@malhotrashivam marked this pull request as ready for review on October 1, 2024 at 17:25
properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

// Following is needed to write new manifest files when writing new data.
// Not setting this will result in using ResolvingFileIO.
properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
Contributor:
Why is it a problem to use ResolvingFileIO? You will need to provide HadoopConf info.

Contributor Author (@malhotrashivam), Oct 2, 2024:
From what I understood, ResolvingFileIO would add an additional step of resolving which file IO to use.
And based on the file name, I thought we are sure here that it's in S3. That's why I thought of using S3FileIO. Does that sound reasonable?

Contributor:
I understand your point; we clearly know that this should resolve to S3FileIO. In all other scenarios, though, we've trusted the Iceberg API to resolve correctly, and I'd be happier to stick with that.

I don't feel strongly about this, however.

Member:
In this context, I'm happy to have S3FileIO specified. I think in general, though, we are leaning away from providing these "pre-configured" entrypoints for the user, and prefer that they go through the generic catalog creation. In which case, I would argue that we might want to deprecate IcebergToolsS3.
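
(For context, a minimal sketch of the two FileIO configurations being discussed, using Iceberg's CatalogProperties constants and the S3FileIO/ResolvingFileIO implementations; the class, method, and variable names below are illustrative, not part of this PR.)

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.ResolvingFileIO;

public class FileIoConfigSketch {
    public static Map<String, String> fileIoProperties(final boolean pinToS3) {
        final Map<String, String> properties = new HashMap<>();
        if (pinToS3) {
            // Pin the FileIO implementation to S3, as this PR does.
            properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
        } else {
            // Let Iceberg pick a FileIO per path scheme; omitting FILE_IO_IMPL has the same effect.
            properties.put(CatalogProperties.FILE_IO_IMPL, ResolvingFileIO.class.getName());
        }
        return properties;
    }
}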

* @param dhTable The deephaven table to append
* @param instructions The instructions for customizations while writing, or null to use default instructions
*/
public void append(
Contributor:
We aren't currently providing a way to add partitioned data to an Iceberg table, but we should create a ticket for this functionality.

Contributor Author (@malhotrashivam), Oct 3, 2024:
I will check with Ryan/Devin whether I should add this as part of this PR itself or start a separate ticket/PR.
If we decide to do it here, I would need a bit more clarity on the API.

The main difference from non-partitioned writing is providing the set of partition values to which a particular data file will belong. Iceberg provides a few ways to specify this information for a new data file (reference):

  • withPartition: Accept a org.apache.iceberg.StructLike instance on which it can call get to access different partition values.
  • withPartitionPath: Provide String newPartitionPath which it splits based on the partition spec and = and / characters.
  • withPartitionValues: Provide a List<String> partitionValues.

We would need to decide what to accept from the user and finalize the API.
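
(For illustration, a rough sketch of the withPartitionPath option against Iceberg's DataFiles.Builder; the schema, spec, path, and sizes below are hypothetical, not taken from this PR.)

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionedDataFileSketch {
    public static DataFile example() {
        // Hypothetical schema and spec: a single identity partition on "region".
        final Schema schema = new Schema(
                Types.NestedField.required(1, "region", Types.StringType.get()),
                Types.NestedField.required(2, "value", Types.LongType.get()));
        final PartitionSpec spec = PartitionSpec.builderFor(schema).identity("region").build();
        return DataFiles.builder(spec)
                .withPath("s3://bucket/warehouse/tbl/data/region=us/00000.parquet") // illustrative
                .withFormat(FileFormat.PARQUET)
                .withFileSizeInBytes(1024L) // illustrative
                .withRecordCount(10L) // illustrative
                .withPartitionPath("region=us") // parsed against the spec on '=' and '/'
                .build();
    }
}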

Member:
Do we support reading partitioned Iceberg?

Contributor Author:
Yes, we do. cc: @lbooker42

private static void verifyAppendCompatibility(
final Schema icebergSchema,
final TableDefinition tableDefinition) {
// Check that all columns in the table definition are part of the Iceberg schema
Contributor:
Is it an error to write a table with extra columns not in the Iceberg schema? Or should we only write the matching columns?

Contributor Author (@malhotrashivam), Oct 2, 2024:
I have added an Iceberg instruction for verifying compatibility, in case the user wants to verify that the data being appended/overwritten is compatible with the original table.

  • For appending, we check that all required columns are present in the data with compatible types, and no extra columns outside of the schema are present.
  • For overwriting, we check if the schema is identical.

If the user wants to override these checks, they can disable them through Iceberg instructions.
I can add more details in the comments for the new Iceberg instruction.
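
(Not the PR's actual implementation, but a sketch of the append check described above, done purely name-wise against Iceberg's Schema; type-compatibility checking is elided, and the list-of-column-names parameter stands in for whatever Deephaven table definition is in hand.)

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class AppendCompatibilitySketch {
    /**
     * Every required Iceberg column must be present, and no column outside the Iceberg schema may
     * be present. Type-compatibility checks are elided.
     */
    public static void verifyAppendCompatibility(final Schema icebergSchema,
            final List<String> dhColumnNames) {
        final Set<String> dhColumns = Set.copyOf(dhColumnNames);
        final Set<String> icebergColumns = icebergSchema.columns().stream()
                .map(Types.NestedField::name)
                .collect(Collectors.toSet());
        for (final Types.NestedField field : icebergSchema.columns()) {
            if (field.isRequired() && !dhColumns.contains(field.name())) {
                throw new IllegalArgumentException("Missing required column: " + field.name());
            }
        }
        for (final String column : dhColumns) {
            if (!icebergColumns.contains(column)) {
                throw new IllegalArgumentException("Column not in Iceberg schema: " + column);
            }
        }
    }
}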

Contributor:
It's the "no extra columns outside of the schema are present" test that concerns me. We will be requiring a user to call dropColumns() to meet this compatibility metric, when I'm not sure it's important at all.

"Compatible" is pretty broad in definition (IMO); it should not mean identical.

Contributor Author:
I agree that this can be restrictive. That is why I added an optional Iceberg instruction so that users can disable validation if they are sure about what they are adding.
Let me keep this thread open to see what everyone thinks.

Member:
From the perspective of DH controlling the writing, I think we can be opinionated, and prefer to give the user less control than we might otherwise want / need to. I don't think it makes sense to allow the user to specify they want to write out Deephaven columns to the parquet file that aren't mapped to the Iceberg table. By default, it may be appropriate to always use the latest Schema at the time it is being written to, but I think we need to allow the user to pick the Schema they want to use for writing. IMO, the physical parquet columns we write should be a (non-strict) subset of that Schema's columns. If there is a map between a DH column and a Schema column, we write it to parquet; otherwise, we exclude it. This also means that every column we write out in this way has an Iceberg field_id we can map into parquet's field_id.
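
(A sketch of the subset-and-map approach suggested here, assuming name-matched columns for brevity; the class and parameter names are illustrative, not this PR's code.)

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class SchemaSubsetSketch {
    /**
     * Keep only the Deephaven columns that map to a field in the chosen Iceberg Schema, and carry
     * each matched field's field_id through so the parquet writer can record it.
     */
    public static Map<String, Integer> columnsToWrite(final Schema schema,
            final List<String> dhColumnNames) {
        final Map<String, Integer> columnToFieldId = new LinkedHashMap<>();
        for (final String column : dhColumnNames) {
            final Types.NestedField field = schema.findField(column);
            if (field != null) {
                columnToFieldId.put(column, field.fieldId());
            }
        }
        return columnToFieldId;
    }
}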

Contributor Author (@malhotrashivam), Oct 8, 2024:
Re: "IMO, the physical parquet columns we write should be a (non-strict) subset of that Schema's columns."
I have something similar right now, along with an extra check that all the required columns from the schema are present in the tables being appended.

@@ -33,95 +36,18 @@
*/
public abstract class ParquetInstructions implements ColumnToCodecMappings {

private static volatile String defaultCompressionCodecName = CompressionCodecName.SNAPPY.toString();
Contributor Author:
Removing unnecessary configuration parameters.

@@ -433,6 +382,14 @@ public boolean useDictionary() {
public void useDictionary(final boolean useDictionary) {
this.useDictionary = useDictionary;
}

public OptionalInt getFieldId() {
Contributor Author:
The field-ID-related logic may change when #6156 gets merged.


Comment on lines +28 to +32
/**
* A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg
* data files.
*/
public abstract Map<String, String> columnRenames();
Member:
I know this was just pulled up from the Base instructions, but whenever we have an Iceberg column name, it should ideally be tied to a Schema; I mention as much in #6124. I don't know if this is actionable here; it may be more of a discussion for #5707.

Comment on lines 48 to 52
/**
* A one-to-one {@link Map map} from Deephaven to Iceberg column names to use when writing deephaven tables to
* Iceberg tables.
*/
public abstract Map<String, String> dhToIcebergColumnRenames();
Member:
Ditto, this is also tied w/ a specific Schema. From a configuration point, I see the ease-of-use for using strings, but I wonder if we should have Map<NestedField, String>, or Map<Integer, String> + Schema (user can still use strings, but we materialize into this). I also suggest we model it with the iceberg data as the key, since it's the target system that is dictating uniqueness in this case (otherwise, we need to add a check that there are no duplicate values). Technically, we could support writing a single DH column to multiple iceberg columns, although I don't see a reason to offer that out of the gate without a clear use case.
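
(As a sketch of the materialization being suggested: strings accepted from the user, with Iceberg field IDs as the internal key. Names are illustrative, not this PR's code.)

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class ColumnRenameSketch {
    /**
     * Materialize a user-supplied {Iceberg column name -> Deephaven column name} map into one
     * keyed by Iceberg field ID, validating every Iceberg name against the given Schema.
     */
    public static Map<Integer, String> materializeRenames(final Schema schema,
            final Map<String, String> icebergToDh) {
        final Map<Integer, String> byFieldId = new HashMap<>();
        icebergToDh.forEach((icebergName, dhName) -> {
            final Types.NestedField field = schema.findField(icebergName);
            if (field == null) {
                throw new IllegalArgumentException("Unknown Iceberg column: " + icebergName);
            }
            byFieldId.put(field.fieldId(), dhName);
        });
        return byFieldId;
    }
}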

Contributor Author:
I like this idea, although I would prefer to do this change as part of a separate PR covering both the reading and writing sides together. For now, I have added a uniqueness check so that there are no duplicate values.
I can also link these comments in issue #6124.

* The inverse map of {@link #dhToIcebergColumnRenames()}.
*/
@Value.Lazy
public Map<String, String> icebergToDhColumnRenames() {
Member:
I see you do have the inverse, but I think this should be the source of the data, and not a view of it. I think we should also consider whether we even want to provide this as a public helper, or if it should be only for internal use.
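
(For reference, a minimal sketch of deriving the inverse eagerly, with the one-to-one check mentioned elsewhere in this thread; names are illustrative.)

import java.util.LinkedHashMap;
import java.util.Map;

public class InverseRenameSketch {
    /** Invert a one-to-one rename map eagerly, failing if two keys map to the same value. */
    public static Map<String, String> invert(final Map<String, String> dhToIceberg) {
        final Map<String, String> icebergToDh = new LinkedHashMap<>();
        dhToIceberg.forEach((dhName, icebergName) -> {
            if (icebergToDh.put(icebergName, dhName) != null) {
                throw new IllegalArgumentException("Duplicate Iceberg column name: " + icebergName);
            }
        });
        return icebergToDh;
    }
}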

Contributor Author:
Same response as #5989 (comment)

* A one-to-one {@link Map map} from Deephaven to Iceberg column names to use when writing deephaven tables to
* Iceberg tables.
*/
// TODO Please suggest better name for this method, on the read side its just called columnRenames
Contributor Author:
Pending TODO

Member (@rcaudy) left a comment:
Partial review of IcebergCatalogAdapter. Looking pretty good so far!

Comment on lines 922 to 927
public void overwrite(
@NotNull final TableIdentifier tableIdentifier,
@NotNull final Table[] dhTables,
@Nullable final IcebergWriteInstructions instructions) {
writeImpl(tableIdentifier, dhTables, instructions, true, true);
}
Member:
So, basically, you're suggesting that the API revolve around immutable parameter structs with builders? It's probably marginally more annoying to users in the console, but it lets us reduce overload spam.

We could also just get out of the business of taking POJO table identifiers (or Strings; standardize on one or the other), cutting overloads by half. (Shivam points out that this will be automatically addressed by interposing the TableAdapter layer.)

Table args can just be a varargs list at the end of the method, cutting overloads again by half.
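
(A rough sketch of what the parameter-struct direction might look like; the class and attribute names below are hypothetical, not the PR's actual API, and plain @Value.Immutable is used rather than the project's @BuildableStyle.)

import java.util.List;

import org.immutables.value.Value;

import io.deephaven.engine.table.Table;

/** Hypothetical shape of an append parameter struct, in the spirit of the discussion above. */
@Value.Immutable
public abstract class IcebergAppendSketch {
    /** One identifier flavor (e.g. "ns.table"), avoiding TableIdentifier/String overload pairs. */
    public abstract String tableIdentifier();

    /** The Deephaven tables to append; a list plays the role of the varargs tail. */
    public abstract List<Table> dhTables();

    public static ImmutableIcebergAppendSketch.Builder builder() {
        return ImmutableIcebergAppendSketch.builder();
    }
}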

Comment on lines +1104 to +1106
newNamespaceCreated = createNamespaceIfNotExists(tableIdentifier.namespace());
newSpecAndSchema = createSpecAndSchema(useDefinition, writeInstructions);
icebergTable = createNewIcebergTable(tableIdentifier, newSpecAndSchema, writeInstructions);
Member:
Can this stuff be done as a transaction? Possibly with removing/adding the data files, as well?
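
(A sketch of what a transactional variant could look like with Iceberg's Transaction API; namespace creation would remain a separate catalog call. Names are illustrative, not this PR's code.)

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class CreateAndAppendTransactionSketch {
    /**
     * Create the table and stage its first append in a single Iceberg transaction; nothing becomes
     * visible in the catalog until commitTransaction().
     */
    public static void createAndAppend(final Catalog catalog, final TableIdentifier id,
            final Schema schema, final PartitionSpec spec, final Iterable<DataFile> dataFiles) {
        final Transaction txn = catalog.newCreateTableTransaction(id, schema, spec);
        final AppendFiles append = txn.newAppend();
        dataFiles.forEach(append::appendFile);
        append.commit(); // stages the file additions within the transaction
        txn.commitTransaction(); // atomically creates the table together with its first snapshot
    }
}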

// Write the data to parquet files
int count = 0;
for (final Table dhTable : dhTables) {
final String filename = String.format(
Member:
I thought names came from the catalog. Table locations or whatever.
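
(A sketch of asking the table for the data-file location instead of formatting a path by hand, via Iceberg's LocationProvider; the naming scheme shown is illustrative.)

import java.util.UUID;

import org.apache.iceberg.Table;

public class DataFileLocationSketch {
    /**
     * Ask the Iceberg table for a data-file location rather than formatting a path by hand; the
     * LocationProvider applies the table's location and layout properties.
     */
    public static String newDataLocation(final Table icebergTable) {
        final String fileName = UUID.randomUUID() + "-0.parquet"; // illustrative naming scheme
        return icebergTable.locationProvider().newDataLocation(fileName);
    }
}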

/**
* Commit the changes to the Iceberg table by creating snapshots.
*/
private static void commit(
Member:
Placeholder for @rcaudy
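
(Not the PR's commit() implementation, just a sketch of the underlying Iceberg snapshot operations it presumably wraps: append adds files in a new snapshot, while overwrite drops existing data and adds the replacements in one atomic commit. Names are illustrative.)

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

public class CommitSketch {
    public static void appendFiles(final Table table, final Iterable<DataFile> files) {
        final AppendFiles append = table.newAppend();
        files.forEach(append::appendFile);
        append.commit(); // commits a new "append" snapshot
    }

    public static void overwriteAll(final Table table, final Iterable<DataFile> replacements) {
        // Drop all rows matched by the filter (here: everything) and add the replacement files.
        final OverwriteFiles overwrite = table.newOverwrite()
                .overwriteByRowFilter(Expressions.alwaysTrue());
        replacements.forEach(overwrite::addFile);
        overwrite.commit(); // commits a new "overwrite" snapshot
    }
}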


@Value.Immutable
@BuildableStyle
public abstract class IcebergAppend {
Contributor Author (@malhotrashivam), Oct 18, 2024:
I would need to make similar API changes for read operations too, like getTableDefinition, getTableDefinitionTable, and readTable, each of which has 4 overloads.
Is that okay?

Also, are there any other APIs you would want me to change, like listTablesAsTable, listSnapshotsAsTable, listNamespacesAsTable, and listNamespaces, which have 2 or 3 overloads but are much simpler?

cc: @devinrsmith

Comment on lines +779 to -807
final Schema schema = tableSnapshot == null ? table.schema() : table.schemas().get(tableSnapshot.schemaId());

// Do we want the latest or a specific snapshot?
final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot();
final Schema schema = snapshot == null ? table.schema() : table.schemas().get(snapshot.schemaId());
Contributor Author:
@lbooker42 Can you please look at this change?

Labels: DocumentationNeeded, iceberg, parquet (Related to the Parquet integration), ReleaseNotesNeeded (Release notes are needed), s3
Projects: None yet
Development: Successfully merging this pull request may close these issues: Add support to write deephaven tables to iceberg
4 participants