FileStream requires fake ObjectStore when ParquetFileReaderFactory is used #4533

Closed
Cheappie opened this issue Dec 6, 2022 · 8 comments · Fixed by #4601
Labels
enhancement New feature or request

Comments

@Cheappie
Contributor

Cheappie commented Dec 6, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I am using ParquetExec in combination with ParquetFileReaderFactory to bypass ObjectStore. To create a FileScanConfig, I have to supply a fake value for object_store_url; FileStream creation then fails because it tries to fetch an ObjectStore that doesn't exist. Right now I work around the problem by creating a fake ObjectStore that is never used.

Describe the solution you'd like
One solution that comes to mind is making FileOpener self-contained by combining each opener with its ObjectStore and FileMeta. There are some challenges with that solution: e.g. ParquetFileReaderFactory wants to own FileMeta when creating a reader, which would require either cloning FileMeta or taking self in FileOpener so that FileMeta can be moved out of self. Alternatively, we could keep FileMeta behind a shared pointer, but then we could not move ObjectMeta out of it.
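
The ownership trade-off described above can be sketched roughly as follows. This is only an illustration: `ObjectMeta`, `FileMeta`, `create_reader`, `CloningOpener` and `ConsumingOpener` are simplified, hypothetical stand-ins, not DataFusion's actual types or signatures.

```rust
// Hypothetical stand-ins for DataFusion's types; only the ownership pattern matters.
#[derive(Clone, Debug, PartialEq)]
struct ObjectMeta {
    location: String,
    size: usize,
}

#[derive(Clone, Debug, PartialEq)]
struct FileMeta {
    object_meta: ObjectMeta,
}

/// Stand-in for a reader factory that wants to own the file metadata.
fn create_reader(meta: FileMeta) -> String {
    format!(
        "reader for {} ({} bytes)",
        meta.object_meta.location, meta.object_meta.size
    )
}

/// Option 1: `open` only borrows the opener, so the metadata has to be cloned.
struct CloningOpener {
    file_meta: FileMeta,
}

impl CloningOpener {
    fn open(&self) -> String {
        create_reader(self.file_meta.clone())
    }
}

/// Option 2: `open` consumes the opener, so the metadata is moved out without a clone.
struct ConsumingOpener {
    file_meta: FileMeta,
}

impl ConsumingOpener {
    fn open(self) -> String {
        create_reader(self.file_meta)
    }
}
```

The borrowing variant can be opened repeatedly at the cost of a clone per call; the consuming variant avoids the clone but can be opened only once.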

One positive outcome might be that, in the future, files with different openers could be processed within a single FileStream.

Please share your thoughts; I really wouldn't mind if there is a simpler way to improve the situation.

@Cheappie Cheappie added the enhancement New feature or request label Dec 6, 2022
@tustvold
Contributor

tustvold commented Dec 7, 2022

I need to fill a fake parameter for object_store_url,

FWIW this is to support serialization of FileScanConfig to/from protobuf, which is important to Ballista.

One solution that comes to my mind is making FileOpener
There are some challenges with that solution, e.g. ParquetFileReaderFactory wants to own FileMeta when creating a reader

Would it work to just inline the ObjectStore and not the FileMeta?

@Cheappie
Contributor Author

Cheappie commented Dec 13, 2022

Would it work to just inline the ObjectStore and not the FileMeta?

Unfortunately not, FileOpener requires ObjectStore as an argument to its open fn.

There is yet another way, a bit more intrusive, but it might be worthwhile in the long term. DataFusion could have its own trait for ObjectStore read (get) operations that exposes a low-level interface and simplifies integration. I had to rewrite a few utility fns because they work only with ObjectStore. Such an interface should still allow us to wrap the high-level ObjectStore and proxy get operations.

use std::sync::Arc;
use object_store::ObjectStore;

// There is surely a better name for this trait, e.g. `AsyncFileReaderFactory` :)
trait DFGetObjectStore {
    // ...get operations
}

// Wraps the high-level ObjectStore.
struct DFGetObjectStoreProxyImpl(Arc<dyn ObjectStore>);

impl DFGetObjectStore for DFGetObjectStoreProxyImpl {
    // ...proxy get operations
}

The reason I prefer a factory of FileReaders over an ObjectStore whose get operations produce bytes is that I can hold the file handle for as long as the file is being read. Otherwise, in my case, a new version of the file might arrive, and the parquet metadata that was read a moment ago would no longer be valid.
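
A minimal, synchronous sketch of the proxy idea, assuming a much-simplified interface. All names here (`GetStore`, `FileReader`, `FileReaderFactory`, `InMemoryStore`, `ProxyReader`, `StoreProxy`) are hypothetical; the real `ObjectStore` and parquet reader traits are async and considerably richer.

```rust
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

/// Hypothetical high-level store: every call resolves the path again.
trait GetStore {
    fn get_range(&self, path: &str, range: Range<usize>) -> Option<Vec<u8>>;
}

/// Hypothetical low-level interface: a reader bound to a single file.
trait FileReader {
    fn read_range(&self, range: Range<usize>) -> Option<Vec<u8>>;
}

/// Hypothetical factory that the engine would depend on instead of the store.
trait FileReaderFactory {
    fn create_reader(&self, path: &str) -> Box<dyn FileReader>;
}

/// Toy store backed by a map from path to file contents.
struct InMemoryStore(HashMap<String, Vec<u8>>);

impl GetStore for InMemoryStore {
    fn get_range(&self, path: &str, range: Range<usize>) -> Option<Vec<u8>> {
        // Returns None for an unknown path or an out-of-bounds range.
        self.0.get(path)?.get(range).map(|bytes| bytes.to_vec())
    }
}

/// Reader that forwards each ranged read to the wrapped store.
struct ProxyReader {
    store: Arc<dyn GetStore>,
    path: String,
}

impl FileReader for ProxyReader {
    fn read_range(&self, range: Range<usize>) -> Option<Vec<u8>> {
        self.store.get_range(&self.path, range)
    }
}

/// Proxy that lets a high-level store be used wherever a factory is expected.
struct StoreProxy(Arc<dyn GetStore>);

impl FileReaderFactory for StoreProxy {
    fn create_reader(&self, path: &str) -> Box<dyn FileReader> {
        Box::new(ProxyReader {
            store: Arc::clone(&self.0),
            path: path.to_string(),
        })
    }
}
```

Note that the proxy reader still goes back to the store on every read, so it does not pin a file version. A custom factory could instead return a reader that keeps an open file handle for its whole lifetime, which is the property asked for above.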

Could you tell me what the best practices are for interacting with SessionContext? Should it be created per query? I ask because there is no per-request (query) id, so from a TableProvider scan operation I cannot identify which request a query belongs to. There is a session_id, however, so it should be possible if I created a new session for each query, but that requires cloning SessionState.

What do you think about moving schema inference into scan and removing it from the TableProvider trait?

@tustvold
Contributor

tustvold commented Dec 13, 2022

Unfortunately not, FileOpener requires ObjectStore as an argument to open fn.

But we could change that? All the FileOpener are constructed in a context that could resolve the object store if it wanted to. All that would change is the logic would move out of FileStream::new
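
As a rough sketch of this suggestion, assuming simplified and hypothetical types (`Store`, `FileMeta`, `FileOpener`, `StoreOpener`, `CustomReaderOpener` and `open_all` are illustrative only, and the real trait is async): the opener that needs a store receives it at construction time, and the code driving the openers never touches a store at all.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an object store.
struct Store {
    root: String,
}

/// Hypothetical stand-in for the per-file metadata.
struct FileMeta {
    location: String,
}

/// Simplified opener trait: `open` no longer takes a store.
trait FileOpener {
    fn open(&self, file_meta: FileMeta) -> String;
}

/// Opener that needs a store: it is resolved once, when the opener is built.
struct StoreOpener {
    store: Arc<Store>,
}

impl FileOpener for StoreOpener {
    fn open(&self, file_meta: FileMeta) -> String {
        format!("{}/{}", self.store.root, file_meta.location)
    }
}

/// Opener backed by a custom reader factory: no store is involved at all.
struct CustomReaderOpener;

impl FileOpener for CustomReaderOpener {
    fn open(&self, file_meta: FileMeta) -> String {
        format!("custom:{}", file_meta.location)
    }
}

/// Stand-in for the stream: it drives any opener without knowing about stores.
fn open_all(opener: &dyn FileOpener, files: Vec<FileMeta>) -> Vec<String> {
    files.into_iter().map(|file| opener.open(file)).collect()
}
```

With this shape, a setup that bypasses the object store would not need to register a fake one just to construct the stream.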

DataFusion could have Its own trait for ObjectStore read(get) operations, that exposes low-level interface and simplifies integration.

No objection in principle, but I'm sceptical that introducing more indirection is either necessary or desirable. We already have far more factories, providers, etc. than I think is strictly necessary, and it makes reasoning about the code incredibly hard.

Could you tell me what are best practices for interacting with SessionContext ?

I don't honestly know, I believe @alamb is currently working on making the state/config slightly less impenetrable.

What do you think about moving schema inference into scan and removing it from TableProvider trait ?

I don't think this is possible, as planning needs to know the schema. In general though performing schema inference per query is very expensive, especially for non-parquet data. I strongly recommend investing in some sort of catalog to store this data.

@Cheappie
Contributor Author

Cheappie commented Dec 13, 2022

But we could change that? All the FileOpener are constructed in a context that could resolve the object store if it wanted to. All that would change is the logic would move out of FileStream::new.

It should be possible if we made FileOpener's open fn accept open(..., ctx: Arc<TaskContext>, object_store_url: ..., file_meta: FileMeta). Then each FileOpener could fetch the ObjectStore on its own.
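
A minimal sketch of that variant, with the lookup deferred to `open`. The types are hypothetical simplifications: this `TaskContext` is just a map from URL to store, it is passed by reference rather than as an `Arc`, and errors are plain strings.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for an object store.
struct Store {
    name: String,
}

/// Hypothetical stand-in for `TaskContext`: maps object store URLs to stores.
struct TaskContext {
    stores: HashMap<String, Arc<Store>>,
}

impl TaskContext {
    fn object_store(&self, url: &str) -> Result<Arc<Store>, String> {
        self.stores
            .get(url)
            .cloned()
            .ok_or_else(|| format!("no object store registered for {}", url))
    }
}

/// Hypothetical stand-in for the per-file metadata.
struct FileMeta {
    location: String,
}

/// Simplified opener trait: each opener decides whether to resolve a store.
trait FileOpener {
    fn open(
        &self,
        ctx: &TaskContext,
        object_store_url: &str,
        file_meta: FileMeta,
    ) -> Result<String, String>;
}

/// Opener that fetches the store on its own, and fails if none is registered.
struct StoreOpener;

impl FileOpener for StoreOpener {
    fn open(
        &self,
        ctx: &TaskContext,
        object_store_url: &str,
        file_meta: FileMeta,
    ) -> Result<String, String> {
        let store = ctx.object_store(object_store_url)?;
        Ok(format!("{}:{}", store.name, file_meta.location))
    }
}

/// Opener backed by a custom reader: it never looks the store up.
struct CustomReaderOpener;

impl FileOpener for CustomReaderOpener {
    fn open(
        &self,
        _ctx: &TaskContext,
        _object_store_url: &str,
        file_meta: FileMeta,
    ) -> Result<String, String> {
        Ok(format!("custom:{}", file_meta.location))
    }
}
```

Because only `StoreOpener` performs the lookup, a placeholder URL with no registered store is harmless for the custom opener.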

DataFusion could have Its own trait for ObjectStore read(get) operations, that exposes low-level interface and simplifies integration.

No objection on principle, but I'm sceptical that introducing more indirection is necessary nor desirable. We already have far more factories, provider, etc... than I think is strictly necessary and it makes reasoning about the code incredibly hard.

That route should hopefully unify ParquetFileReaderFactory and ObjectStore while enabling both low-level and high-level setups. I mean that all usages of both would be superseded by AsyncFileReaderFactory in DataFusion's internals. On the surface, though, it should be possible to pass either an AsyncFileReaderFactory or an ObjectStore, which would be wrapped by a proxy impl of AsyncFileReaderFactory. The only place where ObjectStore would still be necessary is for spills (write ops), am I right?

Could you tell me what are best practices for interacting with SessionContext ?

I don't honestly know, I believe @alamb is currently working on making the state/config slightly less impenetrable.

Is there an open issue/PR for that work? Maybe I could suggest adding a unique id to identify requests in TableProvider scan operations.

What do you think about moving schema inference into scan and removing it from TableProvider trait ?

I don't think this is possible, as planning needs to know the schema. In general though performing schema inference per query is very expensive, especially for non-parquet data. I strongly recommend investing in some sort of catalog to store this data.

I thought that the schema could perhaps be kept (cached) in TableProvider implementations but exposed only through the scan operation. For example, FileScanConfig holds a reference to the schema. Sure, it could be solved differently, but in my case schema inference is not as bad as it might seem.
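
The caching part of this idea can be sketched as below, assuming a toy schema type. `Schema`, `CachingProvider` and `infer_schema` are hypothetical and unrelated to the real `TableProvider` trait; the sketch shows only that inference can run once and be reused by later scans.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical toy schema: a list of (column name, type name) pairs.
type Schema = Vec<(String, String)>;

/// Hypothetical provider that infers its schema lazily and caches the result.
struct CachingProvider {
    schema: OnceLock<Schema>,
    inferences: AtomicUsize,
}

impl CachingProvider {
    fn new() -> Self {
        CachingProvider {
            schema: OnceLock::new(),
            inferences: AtomicUsize::new(0),
        }
    }

    /// Stand-in for an expensive inference step; counts how often it runs.
    fn infer_schema(&self) -> Schema {
        self.inferences.fetch_add(1, Ordering::SeqCst);
        vec![("id".to_string(), "Int64".to_string())]
    }

    /// Returns the cached schema, running inference only on first use.
    fn schema(&self) -> &Schema {
        self.schema.get_or_init(|| self.infer_schema())
    }

    fn inference_count(&self) -> usize {
        self.inferences.load(Ordering::SeqCst)
    }
}
```

Whether the cached value is exposed through a schema method or only through scan is a separate question; the cache itself works the same either way.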

tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Dec 13, 2022
@tustvold
Contributor

#4601 PTAL

@alamb
Contributor

alamb commented Dec 13, 2022

Is there an open issue/pr for that work ? Maybe I could suggest adding unique id to identify requests in TableProvider scan operations.

I am tracking some work in #4349

@Cheappie
Contributor Author

What do you think about moving schema inference into scan and removing it from TableProvider trait ?

I don't think this is possible, as planning needs to know the schema. In general though performing schema inference per query is very expensive, especially for non-parquet data. I strongly recommend investing in some sort of catalog to store this data.

I thought that maybe schema could be kept(cached) in TableProvider implementations but It would be exposed only through scan operation. For example FileScanConfig holds a reference to the schema. Sure It could be solved differently, but in my case schema inference is not as bad as It would seem to be.

Bump, what do you think? In that case, schema inference would be a detail of the scan operation; it could be inferred ahead of time or on every query.

@tustvold
Contributor

Are you suggesting not having schema information in the LogicalPlan, and only inferring schema on lowering to the physical plan? This feels like a fairly fundamental change?

tustvold added a commit that referenced this issue Dec 14, 2022
* Remove ObjectStore from FileStream (#4533)

* Fix avro