Skip to content

Commit

Permalink
add hang tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Sep 19, 2023
1 parent 14afcdb commit 6f07b53
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
#include "arrow/dataset/file_parquet.h"

#include <memory>
#include <thread>
#include <utility>
#include <vector>

#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/test_util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/io/test_common.h"
#include "arrow/io/util_internal.h"
Expand Down Expand Up @@ -367,6 +369,68 @@ TEST_F(TestParquetFileFormat, MultithreadedScan) {
ASSERT_EQ(batches.size(), kNumRowGroups);
}

// Test helper: a BufferReader whose ReadAsync() hands the read off to the
// executor of the IOContext supplied by the caller, and whose io_context()
// reports the IOContext given at construction.
class AsyncBufferReader : public ::arrow::io::BufferReader {
 public:
  // `buffer` is forwarded to the BufferReader base; a copy of `ctx` is kept
  // and later returned by io_context().
  explicit AsyncBufferReader(std::shared_ptr<Buffer> buffer,
                             const ::arrow::io::IOContext& ctx)
      : ::arrow::io::BufferReader(std::move(buffer)), ctx_(ctx) {}

  // Submits a ReadAt(position, nbytes) call to `ctx.executor()` and returns
  // the resulting future. A failure to submit is converted into a future by
  // DeferNotOk rather than being returned as a Status.
  Future<std::shared_ptr<Buffer>> ReadAsync(const ::arrow::io::IOContext& ctx,
                                            int64_t position,
                                            int64_t nbytes) override {
    // The task owns a shared_ptr to this reader, so the reader stays alive
    // until the submitted read has run.
    auto keep_alive = checked_pointer_cast<AsyncBufferReader>(shared_from_this());
    auto read_task = [keep_alive, position, nbytes]() {
      return keep_alive->ReadAt(position, nbytes);
    };
    auto submitted = ctx.executor()->Submit(std::move(read_task));
    return DeferNotOk(std::move(submitted));
  }

  // Returns the IOContext captured at construction time.
  const ::arrow::io::IOContext& io_context() const override { return ctx_; }

 private:
  ::arrow::io::IOContext ctx_;
};

// Counts the rows of an in-memory Parquet file while the default IO thread
// pool is limited to a single thread. The file is read through
// AsyncBufferReader, whose ReadAsync submits the read to the executor of the
// IOContext it is handed (presumably this same pool -- confirm against the
// scanner). The test passes if CountRows completes with the expected count.
TEST_F(TestParquetFileFormat, SingleThreadExecutor) {
  ::arrow::io::IOContext default_io_context;
  auto pool_executor =
      dynamic_cast<::arrow::internal::ThreadPool*>(default_io_context.executor());
  if (pool_executor == nullptr) {
    // The capacity manipulation below only applies to a ThreadPool executor.
    GTEST_SKIP() << "Default IO executor is not a ThreadPool";
  }

  // Restores the pool's original capacity when the test body exits, including
  // early exits caused by a fatal assertion.
  struct PoolResetGuard {
    PoolResetGuard(::arrow::internal::ThreadPool* pool, int origin_capacity)
        : pool(pool), origin_capacity(origin_capacity) {}
    // Non-copyable: a copy would run the restore a second time.
    PoolResetGuard(const PoolResetGuard&) = delete;
    PoolResetGuard& operator=(const PoolResetGuard&) = delete;
    ~PoolResetGuard() {
      // A destructor cannot report failure through gtest, so log instead.
      Status s = pool->SetCapacity(origin_capacity);
      if (!s.ok()) {
        std::cerr << "Failed to reset pool capacity: " << s.ToString() << std::endl;
      }
    }

    ::arrow::internal::ThreadPool* pool;
    int origin_capacity;
  };
  // Install the guard *before* changing the capacity so that the original
  // value is restored even if the SetCapacity assertion below fails.
  PoolResetGuard guard(pool_executor, pool_executor->GetCapacity());
  ASSERT_OK(pool_executor->SetCapacity(/*threads=*/1));

  auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));

  ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
  auto async_buffer_reader =
      std::make_shared<AsyncBufferReader>(buffer, default_io_context);
  auto source =
      std::make_shared<FileSource>(std::move(async_buffer_reader), buffer->size());
  auto options = std::make_shared<ScanOptions>();
  options->io_context = default_io_context;

  {
    auto fragment = MakeFragment(*source);
    auto count_rows = fragment->CountRows(literal(true), options);
    ASSERT_OK_AND_ASSIGN(auto result, count_rows.MoveResult());
    ASSERT_EQ(expected_rows(), result);
  }
}

class TestParquetFileSystemDataset : public WriteFileSystemDatasetMixin,
public testing::Test {
public:
Expand Down

0 comments on commit 6f07b53

Please sign in to comment.