Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds is_refreshing support for parquet read #3600

Merged
merged 14 commits into from
Mar 31, 2023

Conversation

devinrsmith
Copy link
Member

@devinrsmith devinrsmith commented Mar 24, 2023

Additionally, applies black formatting to parquet.py, and plumbs target_page_size from #2555.

Fixes #3596

Additionaly, applies black formatting to parquet.py.

Fixes deephaven#3596
@devinrsmith devinrsmith added parquet Related to the Parquet integration DocumentationNeeded ReleaseNotesNeeded Release notes are needed labels Mar 24, 2023
@devinrsmith devinrsmith added this to the Mar 2023 milestone Mar 24, 2023
@devinrsmith devinrsmith self-assigned this Mar 24, 2023
Copy link
Member

@rcaudy rcaudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need additional changes:

  • io.deephaven.parquet.table.layout.KeyValuePartitionLayout is inferring partitioning column types from the set of values encountered. This will create issues if we change our mind about types on a subsequent scan. We should consider whether we can look for this scenario and provide a better error message.
  • io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout and io.deephaven.parquet.table.layout.ParquetFlatPartitionedLayout should probably record the location keys they have created in a map by path, in order to avoid recreating them. This will reduce some of the costs associated with repeat scans.
  • We should check for trailing magic numbers and refuse to create ParquetTableLocationKey instances for incomplete files.

@@ -513,8 +526,8 @@ public static Table readPartitionedTableInferSchema(
getUnboxedTypeIfBoxed(partitionValue.getClass()), null, ColumnDefinition.ColumnType.Partitioning));
}
allColumns.addAll(schemaInfo.getFirst());
return readPartitionedTable(recordingLocationKeyFinder, schemaInfo.getSecond(),
TableDefinition.of(allColumns));
return readPartitionedTable(readInstructions.isRefreshing() ? locationKeyFinder : initialKeys,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little unfortunate if it causes us to scan for keys an extra time initially. I suppose it depends on the key finder implementation from a performance standpoint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some of these interfaces could be improved. For example, I think there is room for the user to explicitly provide the schema instead of inferring it from the first file.

This situation could also be improved by updating or changing TableLocationKeyFinder - we shouldn't need to read all of the table locations if we only want just the first one (for inferring schema).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already have an exposed public method that allows the user to specify. I also think it's not that easy for a user to do.

The key finders were built for that static use case. I agree that we could have a richer interface (findFirstKey, findNewKeys(Set)).

@devinrsmith
Copy link
Member Author

io.deephaven.parquet.table.layout.KeyValuePartitionLayout is inferring partitioning column types from the set of values encountered. This will create issues if we change our mind about types on a subsequent scan. We should consider whether we can look for this scenario and provide a better error message.

I'm assuming that all TLKs found by an instance of tableLocationKeyFinder.findKeys(...) should have consistent TableLocationKey#getPartitionKeys?

io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout and io.deephaven.parquet.table.layout.ParquetFlatPartitionedLayout should probably record the location keys they have created in a map by path, in order to avoid recreating them. This will reduce some of the costs associated with repeat scans.

I wish there was a better way to do this instead of TableLocationKeyFinder - it seems like we could have a listener based interface or some such that automatically produces / notifies when a new TableLocationKey is created / found. (In the case of file-based TableLocationKey, could be based off of java.nio.file.WatchService... of course, you could still do a polling based impl w/ a map... regardless, the caller-based TableLocationKeyFinder#findKeys seems a bit funky to me.)

@devinrsmith devinrsmith requested a review from rcaudy March 25, 2023 04:28
@devinrsmith
Copy link
Member Author

I'm very surprised that we are washing through string CSV building for io.deephaven.parquet.table.layout.KeyValuePartitionLayout; as such, I think any "costs associated with repeat scans" will be better focused on cleaning that up first?

It's going to be a much larger change if we want to cache and validate the same way for ParquetKeyValuePartitionedLayout
@devinrsmith
Copy link
Member Author

devinrsmith commented Mar 27, 2023

I'm not sure if it's "worth it" to cache ParquetFlatPartitionedLayout, seems like premature optimization, but I've gone ahead and implemented it that way.

As far as "should we check magic bytes in header/footer", that potentially adds a lot of extraneous overhead unless managed carefully. In the case of ParquetKeyValuePartitionedLayout, I think there would be a lot more groundwork we'd need to cover to get this working efficiently. I've gone ahead and added the check to ParquetFlatPartitionedLayout.

@rcaudy
Copy link
Member

rcaudy commented Mar 28, 2023

io.deephaven.parquet.table.layout.KeyValuePartitionLayout is inferring partitioning column types from the set of values encountered. This will create issues if we change our mind about types on a subsequent scan. We should consider whether we can look for this scenario and provide a better error message.

I'm assuming that all TLKs found by an instance of tableLocationKeyFinder.findKeys(...) should have consistent TableLocationKey#getPartitionKeys?

io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout and io.deephaven.parquet.table.layout.ParquetFlatPartitionedLayout should probably record the location keys they have created in a map by path, in order to avoid recreating them. This will reduce some of the costs associated with repeat scans.

I wish there was a better way to do this instead of TableLocationKeyFinder - it seems like we could have a listener based interface or some such that automatically produces / notifies when a new TableLocationKey is created / found. (In the case of file-based TableLocationKey, could be based off of java.nio.file.WatchService... of course, you could still do a polling based impl w/ a map... regardless, the caller-based TableLocationKeyFinder#findKeys seems a bit funky to me.)

Yes, we require identical partition layouts across TLKs for the same table. No way for the system to work properly without that, the partitions are columns of the data.

The deeper interfaces expect a TableLocationProvider (TLP) to support a listener model, and the typical listener then pushes into a buffer object that allows polling by the "real" engine constructs. The PollingTableLocationProvider implementation is much simpler than a lot of the other refreshing TLPs that we don't have in community right now. WatchService, in my opinion, is kind of a hot mess; it's lossy, with significant implementation differences across OSes. We can absolutely implement per-layout alternative TLPs that can discover parquet files, but it's likely a premature optimization. In one of my other replies I suggested something like findMoreKeys(Map) as way to make the key finder interface better.

@rcaudy
Copy link
Member

rcaudy commented Mar 28, 2023

I'm very surprised that we are washing through string CSV building for io.deephaven.parquet.table.layout.KeyValuePartitionLayout; as such, I think any "costs associated with repeat scans" will be better focused on cleaning that up first?

No way. I'm very, very happy with using the CSV parser here. It's basically perfect for the purpose, since it supports columnar string->type inference and conversion out of the box. I would hate to have to build that from scratch, and I like standardizing on the CSV parser's decisions. The only reasonable alternative would be to show one column at a time to some lower layer of the parser to avoid constructing the CSV text.

I admit this does present an issue for repeated evaluation. We can't have subsequent find calls pick different types. I don't see that there's a good solution, besides insisting on the types from the first evaluation continuing and complaining if we can't fit new values.

We could certainly support a version where instead of inference, the user specifies partition key names, types, and conversions-from-string.

Copy link
Member

@rcaudy rcaudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better, but some of the outstanding comments stand.

@devinrsmith devinrsmith requested a review from rcaudy March 30, 2023 05:28
@devinrsmith
Copy link
Member Author

The only new safety check manifests itself if all the partitions change values at once (otherwise, there is internal logic in io.deephaven.parquet.table.layout.KeyValuePartitionLayout that ensures no inconsistencies).

io.deephaven.engine.table.impl.locations.TableDataException: PollingTableLocationProvider[StandaloneTableKey] has produced an inconsistent TableLocationKey with unexpected partition keys. expected=[TimestampHour2] actual=[TimestampHour].                         
        at io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider.verifyPartitionKeys(AbstractTableLocationProvider.java:212)                                                                                                                    
        at io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider.handleTableLocationKey(AbstractTableLocationProvider.java:113)                                                                                                                 
        at io.deephaven.parquet.table.layout.KeyValuePartitionLayout.lambda$findKeys$0(KeyValuePartitionLayout.java:163)                                                                                                                                              
        at io.deephaven.engine.rowset.RowSet.lambda$forAllRowKeys$0(RowSet.java:306)                                               
        at io.deephaven.engine.rowset.impl.singlerange.SingleRange.ixForEachLong(SingleRange.java:105)                                                                                                                                                                
        at io.deephaven.engine.rowset.impl.WritableRowSetImpl.forEachRowKey(WritableRowSetImpl.java:339)                                                                                                                                                              
        at io.deephaven.engine.rowset.RowSet.forAllRowKeys(RowSet.java:305)                                                        
        at io.deephaven.parquet.table.layout.KeyValuePartitionLayout.findKeys(KeyValuePartitionLayout.java:159)                                                                                                                                                       
        at io.deephaven.engine.table.impl.locations.impl.PollingTableLocationProvider.refresh(PollingTableLocationProvider.java:51)                                                                                                                                   
        at io.deephaven.engine.table.impl.locations.util.ExecutorTableDataRefreshService$ScheduledTableLocationProviderRefresh.refresh(ExecutorTableDataRefreshService.java:121)                                                                                      
        at io.deephaven.engine.table.impl.locations.util.ExecutorTableDataRefreshService$ScheduledSubscriptionTask.doRefresh(ExecutorTableDataRefreshService.java:87)

private void verifyPartitionKeys(@NotNull TableLocationKey locationKey) {
if (partitionKeys == null) {
partitionKeys = new ArrayList<>(locationKey.getPartitionKeys());
} else if (!equals(partitionKeys, locationKey.getPartitionKeys())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an improvement. We should consider also verifying the types of the partition values. It's a little bit tricky, though, since we'd want to allow for assignable types based on the schema (either inferred or supplied).

Comment on lines 194 to 196
_JParquetTools.writeTable(
table.j_table, _JFile(path), write_instructions
)
Copy link
Contributor

@jmao-denver jmao-denver Mar 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was formatting like that that turned @chipkent off when I first started using Black.

Copy link
Contributor

@jmao-denver jmao-denver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally quite like Black's strong opinionated style. But if we decide to use it, we should use it across all the Python code base. It is best that we reach an agreement as a team, followed up by an initial reformatting of everything, and have the necessary enforcement in place for PRs.

rcaudy
rcaudy previously approved these changes Mar 31, 2023
@devinrsmith
Copy link
Member Author

Created #3638

Copy link
Contributor

@jmao-denver jmao-denver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some questions about parameter default values.

Copy link
Contributor

@jmao-denver jmao-denver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Python changes look good to me.

@devinrsmith devinrsmith merged commit a94110e into deephaven:main Mar 31, 2023
@devinrsmith devinrsmith deleted the refreshing-parquet branch March 31, 2023 17:21
@github-actions github-actions bot locked and limited conversation to collaborators Mar 31, 2023
@deephaven-internal
Copy link
Contributor

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
DocumentationNeeded parquet Related to the Parquet integration ReleaseNotesNeeded Release notes are needed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Parquet directory rerfesh service support
4 participants