-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-37487: [C++][Parquet] Dataset: Implement sync ParquetFileFormat::GetReader
#37514
Conversation
|
79bb6d0
to
555eb62
Compare
555eb62
to
a23d4db
Compare
I think we already have the test for |
@pitrou @westonpace would you mind take a look? |
@westonpace By the way, call Blocking Get in same thread pool might cause the deadlock, should we try to find out which would causing this and trying to fix them? |
I'll try and look in more detail later today but here is the general advice. Async methods which return a future. User calls always start out as synchronous methods (e.g. we don't have adapters for python's asyncio) We can convert from synchronous to asynchronous using something like...
This is ok. One conversion from synchronous to asynchronous is ok and inevitable (since user calls start as sync). However, it is not ok to convert from asynchronous to synchronous. For example, if
This is also not ok:
Normally the fix is for Creating a At a glance, it looks like that is what is happening here, which is maybe ok. However, I have to look in more detail. |
Thanks for your nice advice! I think this patch would be OK, I can check more Async usage around dataset. |
@westonpace can we try to review this patch first? Since I believe at least this patch might solve the problem :-) |
@pitrou Would you mind help take a look? Since the logic is easy and simple here... |
ping @pitrou @westonpace @bkietz for help... |
This isn't tested and we don't have follow up from the original issue's poster that this patch fixes the deadlock. @mapleFU could you add (something like) the original repro as a test? It'd be best if we can confirm that # test.py
import pyarrow.dataset as pads
ds = pads.dataset("/tmp/test.parq", format="parquet") # can be any parquet
ds.count_rows(use_threads=False)
ARROW_IO_THREADS=1 python test.py @yiteng-guo could you try out this patch and confirm it fixes the deadlock for you? |
Hi @bkietz , I've write a test using a extended file-reader, it can reproduce the case.
However, default io-pool is widely used in scanner, it's a bit hard to hook all io_context( Might need to change Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR, const std::shared_ptr<ScanOptions>& scan_options=NULLPTR); So I change the global thread pool during testing, would you mind take a look? |
6f07b53
to
6fe5f7e
Compare
::arrow::io::IOContext default_io_context; | ||
auto pool_executor = | ||
dynamic_cast<::arrow::internal::ThreadPool*>(default_io_context.executor()); | ||
if (pool_executor == nullptr) { | ||
GTEST_SKIP(); | ||
} | ||
auto origin_capacity = pool_executor->GetCapacity(); | ||
ASSERT_OK(pool_executor->SetCapacity(/*threads=*/1)); | ||
|
||
// Reset capacity for pool_executor | ||
struct PoolResetGuard { | ||
PoolResetGuard(::arrow::internal::ThreadPool* pool, int origin_capacity) | ||
: pool(pool), origin_capacity(origin_capacity) {} | ||
~PoolResetGuard() { | ||
Status s = pool->SetCapacity(origin_capacity); | ||
if (!s.ok()) { | ||
std::cerr << "Failed to reset pool capacity: " << s.ToString() << std::endl; | ||
} | ||
} | ||
|
||
::arrow::internal::ThreadPool* pool; | ||
int origin_capacity; | ||
}; | ||
PoolResetGuard guard(pool_executor, origin_capacity); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can use SetIOThreadPoolCapacity
and GetIOThreadPoolCapacity
here:
::arrow::io::IOContext default_io_context; | |
auto pool_executor = | |
dynamic_cast<::arrow::internal::ThreadPool*>(default_io_context.executor()); | |
if (pool_executor == nullptr) { | |
GTEST_SKIP(); | |
} | |
auto origin_capacity = pool_executor->GetCapacity(); | |
ASSERT_OK(pool_executor->SetCapacity(/*threads=*/1)); | |
// Reset capacity for pool_executor | |
struct PoolResetGuard { | |
PoolResetGuard(::arrow::internal::ThreadPool* pool, int origin_capacity) | |
: pool(pool), origin_capacity(origin_capacity) {} | |
~PoolResetGuard() { | |
Status s = pool->SetCapacity(origin_capacity); | |
if (!s.ok()) { | |
std::cerr << "Failed to reset pool capacity: " << s.ToString() << std::endl; | |
} | |
} | |
::arrow::internal::ThreadPool* pool; | |
int origin_capacity; | |
}; | |
PoolResetGuard guard(pool_executor, origin_capacity); | |
// Reset capacity for pool_executor | |
struct PoolResetGuard { | |
int original_capacity = io::GetIOThreadPoolCapacity(); | |
~PoolResetGuard() { | |
DCHECK_OK(io::SetIOThreadPoolCapacity(original_capacity)); | |
} | |
} guard; |
Since this directly mutates the IO thread pool (pointed to by the default IO context), you shouldn't need AsyncBufferReader::ReadAsync
I agree that it's odd BufferReader::ReadAsync
and MemoryMappedFile::ReadAsync
(the only overrides I see) ignore the io context argument. @lidavidm do you think they should transfer to the other executor if it's different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, firstly I'm trying to change ParquetFileFragment
uses same io-executor, but I guess it should be put in another patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that it's odd BufferReader::ReadAsync and MemoryMappedFile::ReadAsync (the only overrides I see) ignore the io context argument
I think BufferReader::ReadAsync
is so lightweight that we can ignore ioContext?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given both access memory/"memory" I think that's quite intentional
@bkietz I've tried to resolve the comments, would you mind take a look again? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 76c4a6e. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about possible false positives for unstable benchmarks that are known to sometimes produce them. |
…rmat::GetReader` (apache#37514) ### Rationale for this change As apache#37487 says. When thread cnt == 1, the thread might blocking in `ParquetFileFormat::GetReaderAsync`, that's because: 1. `ParquetFileFormat::CountRows` would call `EnsureCompleteMetadata` in `io_executor` 2. `EnsureCompleteMetadata` call `ParquetFileFormat::GetReader`, which dispatch real request to async mode 3. `async` is executed in `io_executor`. 1/3 in same fix-sized executor, causing deadlock. ### What changes are included in this PR? Implement sync `ParquetFileFormat::GetReader`. ### Are these changes tested? Currently not ### Are there any user-facing changes? Bugfix * Closes: apache#37487 Authored-by: mwish <maplewish117@gmail.com> Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
…rmat::GetReader` (apache#37514) ### Rationale for this change As apache#37487 says. When thread cnt == 1, the thread might blocking in `ParquetFileFormat::GetReaderAsync`, that's because: 1. `ParquetFileFormat::CountRows` would call `EnsureCompleteMetadata` in `io_executor` 2. `EnsureCompleteMetadata` call `ParquetFileFormat::GetReader`, which dispatch real request to async mode 3. `async` is executed in `io_executor`. 1/3 in same fix-sized executor, causing deadlock. ### What changes are included in this PR? Implement sync `ParquetFileFormat::GetReader`. ### Are these changes tested? Currently not ### Are there any user-facing changes? Bugfix * Closes: apache#37487 Authored-by: mwish <maplewish117@gmail.com> Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
Rationale for this change
As #37487 says. When thread cnt == 1, the thread might blocking in
ParquetFileFormat::GetReaderAsync
, that's because:ParquetFileFormat::CountRows
would callEnsureCompleteMetadata
inio_executor
EnsureCompleteMetadata
callParquetFileFormat::GetReader
, which dispatch real request to async modeasync
is executed inio_executor
.1/3 in same fix-sized executor, causing deadlock.
What changes are included in this PR?
Implement sync
ParquetFileFormat::GetReader
.Are these changes tested?
Currently not
Are there any user-facing changes?
Bugfix