GH-35176: [C++] Add support for disabling threading for emscripten (#35672)

As previously discussed in #35176, this patch adds an option, `ARROW_ENABLE_THREADING`. When it is turned off, Arrow's thread pool and serial executors don't spawn threads; instead, they run tasks in the main thread when futures are waited for.
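
To make that behaviour concrete, here is a minimal sketch of an executor that never spawns a thread: submitted tasks are only queued, and they run on the calling thread when a result is waited for. `DeferredExecutor`, `DeferredFuture`, `Submit` and `RunDemo` are hypothetical names invented for illustration; this is not Arrow's actual `ThreadPool` or `SerialExecutor` code.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <stdexcept>
#include <utility>

// Hypothetical executor that never spawns a thread: Spawn() only queues work.
class DeferredExecutor {
 public:
  void Spawn(std::function<void()> task) { queue_.push_back(std::move(task)); }

  // Runs the oldest queued task on the calling thread; returns false if idle.
  bool RunOne() {
    if (queue_.empty()) return false;
    std::function<void()> task = std::move(queue_.front());
    queue_.pop_front();
    task();
    return true;
  }

  std::size_t pending() const { return queue_.size(); }

 private:
  std::deque<std::function<void()>> queue_;
};

// Hypothetical future whose Wait() drives the executor instead of blocking.
template <typename T>
class DeferredFuture {
 public:
  DeferredFuture(DeferredExecutor* executor, std::shared_ptr<std::optional<T>> slot)
      : executor_(executor), slot_(std::move(slot)) {}

  const T& Wait() {
    while (!slot_->has_value()) {
      // With no other thread to make progress, an empty queue means the
      // value can never arrive.
      if (!executor_->RunOne()) throw std::runtime_error("future can never complete");
    }
    return **slot_;
  }

 private:
  DeferredExecutor* executor_;
  std::shared_ptr<std::optional<T>> slot_;
};

// Queues `fn` and returns a future for its result; nothing runs here.
template <typename F>
auto Submit(DeferredExecutor* executor, F fn) -> DeferredFuture<decltype(fn())> {
  using T = decltype(fn());
  auto slot = std::make_shared<std::optional<T>>();
  executor->Spawn([slot, fn = std::move(fn)]() mutable { *slot = fn(); });
  return DeferredFuture<T>(executor, slot);
}

// Usage: the task body only executes inside Wait().
int RunDemo() {
  DeferredExecutor executor;
  int calls = 0;
  auto future = Submit(&executor, [&calls] { ++calls; return 6 * 7; });
  assert(calls == 0);          // task has only been queued
  int result = future.Wait();  // runs the task on this thread
  assert(calls == 1);
  return result;
}
```

The trade-off in a design like this is that nothing overlaps: work submitted earlier simply runs, in order, at the point where some result is first needed.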

It doesn't touch threading in projects included as dependencies (e.g. multithreaded malloc implementations), because if you're building for a non-threaded environment you can't use those anyway.

Where this stands: it runs the test suite okay, and I think it should work well enough to be a backend for pandas on emscripten/pyodide.

What this means is:
1) It is possible to use Arrow in non-threaded emscripten/WebAssembly environments (with some emscripten-specific build patches, which I'll submit once this is merged)
2) Most of arrow just works, albeit slower in parts.

Things that don't work and probably won't:
1) Server stuff that relies on threads. I don't think this is a massive problem, because environments with threading restrictions are currently typically also restricted from running servers anyway (i.e. they are web browsers)
2) Anything that relies on actually doing two things at once (for obvious reasons)

Things that don't work yet and could be fixed in future:
1) Use of asynchronous file/network APIs in emscripten, which would mean I/O could work efficiently in one thread.
2) asofjoin: right now the implementation relies on `std::thread`. It needs refactoring to work with the thread pool like everything else in Arrow, but I'm not sure I'm expert enough in the codebase to do it well.
* Closes: #35176

Lead-authored-by: Joe Marshall <joe.marshall@nottingham.ac.uk>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
4 people authored Aug 9, 2023
1 parent f2b1c14 commit b668595
Showing 25 changed files with 629 additions and 20 deletions.
1 change: 1 addition & 0 deletions ci/scripts/cpp_build.sh
@@ -106,6 +106,7 @@ cmake \
-DARROW_C_FLAGS_RELWITHDEBINFO="${ARROW_C_FLAGS_RELWITHDEBINFO:-}" \
-DARROW_DATASET=${ARROW_DATASET:-ON} \
-DARROW_DEPENDENCY_SOURCE=${ARROW_DEPENDENCY_SOURCE:-AUTO} \
-DARROW_ENABLE_THREADING=${ARROW_ENABLE_THREADING:-ON} \
-DARROW_ENABLE_TIMING_TESTS=${ARROW_ENABLE_TIMING_TESTS:-ON} \
-DARROW_EXTRA_ERROR_CONTEXT=${ARROW_EXTRA_ERROR_CONTEXT:-OFF} \
-DARROW_FILESYSTEM=${ARROW_FILESYSTEM:-ON} \
2 changes: 2 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
@@ -196,6 +196,8 @@ takes precedence over ccache if a storage backend is configured" ON)

define_option(ARROW_WITH_MUSL "Whether the system libc is musl or not" OFF)

define_option(ARROW_ENABLE_THREADING "Enable threading in Arrow core" ON)

#----------------------------------------------------------------------
set_option_category("Test and benchmark")

21 changes: 17 additions & 4 deletions cpp/src/arrow/acero/CMakeLists.txt
@@ -173,7 +173,14 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
bloom_filter_test.cc)
add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
test_nodes.cc)
-add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
+
+# asof_join_node uses std::thread internally
+# and doesn't use ThreadPool so it will
+# be broken if threading is turned off
+if(ARROW_ENABLE_THREADING)
+  add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
+endif()
+
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc)
@@ -221,7 +228,9 @@ if(ARROW_BUILD_BENCHMARKS)
add_arrow_acero_benchmark(project_benchmark SOURCES benchmark_util.cc
project_benchmark.cc)

-add_arrow_acero_benchmark(asof_join_benchmark SOURCES asof_join_benchmark.cc)
+if(ARROW_ENABLE_THREADING)
+  add_arrow_acero_benchmark(asof_join_benchmark SOURCES asof_join_benchmark.cc)
+endif()

add_arrow_acero_benchmark(tpch_benchmark SOURCES tpch_benchmark.cc)

@@ -244,7 +253,9 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_static)
-target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
+if(ARROW_ENABLE_THREADING)
+  target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
+endif()
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static)
@@ -253,7 +264,9 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_shared)
-target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
+if(ARROW_ENABLE_THREADING)
+  target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
+endif()
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared)
5 changes: 5 additions & 0 deletions cpp/src/arrow/acero/asof_join_node.cc
@@ -49,6 +49,7 @@
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h"
#include "arrow/util/future.h"
#include "arrow/util/string.h"

@@ -1707,6 +1708,10 @@ class AsofJoinNode : public ExecNode {
}

Status StartProducing() override {
#ifndef ARROW_ENABLE_THREADING
return Status::NotImplemented("ASOF join requires threading enabled");
#endif

ARROW_ASSIGN_OR_RAISE(process_task_, plan_->query_context()->BeginExternalTask(
"AsofJoinNode::ProcessThread"));
if (!process_task_.is_valid()) {
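
The guard added to `StartProducing()` follows a common pattern: when a feature cannot work without threads, fail early with a clear error instead of trying to start one. Below is a standalone sketch of that pattern. `DEMO_ENABLE_THREADING`, `DemoStatus`, `StartWorker` and `WorkerAvailable` are hypothetical stand-ins (not Arrow's `ARROW_ENABLE_THREADING`, `Status` or `AsofJoinNode`), chosen so the snippet compiles without Arrow.

```cpp
#include <cassert>
#include <string>
#include <utility>
#ifdef DEMO_ENABLE_THREADING
#include <thread>
#endif

// Hypothetical stand-in for a status type.
struct DemoStatus {
  bool ok;
  std::string message;
  static DemoStatus OK() { return {true, ""}; }
  static DemoStatus NotImplemented(std::string msg) { return {false, std::move(msg)}; }
};

// Starts background work if threads are available; otherwise reports why not.
DemoStatus StartWorker(int* result) {
#ifndef DEMO_ENABLE_THREADING
  (void)result;  // untouched in the non-threaded build
  return DemoStatus::NotImplemented("worker requires threading enabled");
#else
  std::thread worker([result] { *result = 42; });
  worker.join();
  return DemoStatus::OK();
#endif
}

// Usage: callers check the status instead of assuming the worker ran.
bool WorkerAvailable() {
  int result = 0;
  DemoStatus st = StartWorker(&result);
  assert(st.ok == (result == 42));
  return st.ok;
}
```

This sketch uses `#else` so that the threaded branch is not compiled at all when the macro is undefined; that is a choice made for the example, not a description of the patch.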
4 changes: 4 additions & 0 deletions cpp/src/arrow/acero/bloom_filter.cc
@@ -20,6 +20,7 @@
#include "arrow/acero/util.h" // PREFETCH
#include "arrow/util/bit_util.h" // Log2
#include "arrow/util/bitmap_ops.h" // CountSetBits
#include "arrow/util/config.h"

namespace arrow {
namespace acero {
@@ -426,6 +427,9 @@ void BloomFilterBuilder_Parallel::CleanUp() {

std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
BloomFilterBuildStrategy strategy) {
#ifndef ARROW_ENABLE_THREADING
strategy = BloomFilterBuildStrategy::SINGLE_THREADED;
#endif
switch (strategy) {
case BloomFilterBuildStrategy::SINGLE_THREADED: {
std::unique_ptr<BloomFilterBuilder> impl{new BloomFilterBuilder_SingleThreaded()};
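
The override in `BloomFilterBuilder::Make` means a caller can still request the parallel strategy in a non-threaded build and will get the single-threaded one instead. A minimal sketch of that fallback, using hypothetical names (`BuildStrategy`, `EffectiveStrategy`, `DEMO_ENABLE_THREADING`) rather than Arrow's real types:

```cpp
#include <cassert>

// Hypothetical stand-in for a build-strategy enum.
enum class BuildStrategy { kSingleThreaded, kParallel };

// Returns the strategy that is actually used in this build configuration.
BuildStrategy EffectiveStrategy(BuildStrategy requested) {
#ifndef DEMO_ENABLE_THREADING
  // No threads available: coerce every request to the single-threaded path.
  requested = BuildStrategy::kSingleThreaded;
#endif
  return requested;
}

// Usage: a single-threaded request is never changed by the fallback.
void CheckSingleThreadedIsStable() {
  assert(EffectiveStrategy(BuildStrategy::kSingleThreaded) ==
         BuildStrategy::kSingleThreaded);
}
```

Coercing the request in one place keeps call sites free of preprocessor checks, at the cost of the downgrade being silent to the caller.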
7 changes: 6 additions & 1 deletion cpp/src/arrow/acero/bloom_filter_test.cc
@@ -29,6 +29,8 @@
#include "arrow/acero/util.h"
#include "arrow/compute/key_hash.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/config.h"
#include "arrow/util/cpu_info.h"

namespace arrow {

@@ -468,7 +470,7 @@ TEST(BloomFilter, Basic) {

std::vector<BloomFilterBuildStrategy> strategies;
strategies.push_back(BloomFilterBuildStrategy::SINGLE_THREADED);
-#ifndef ARROW_VALGRIND
+#if defined(ARROW_ENABLE_THREADING) && !defined(ARROW_VALGRIND)
strategies.push_back(BloomFilterBuildStrategy::PARALLEL);
#endif

@@ -501,7 +503,10 @@ TEST(BloomFilter, Scaling) {
num_build.push_back(4000000);

std::vector<BloomFilterBuildStrategy> strategies;
#ifdef ARROW_ENABLE_THREADING
strategies.push_back(BloomFilterBuildStrategy::PARALLEL);
#endif
strategies.push_back(BloomFilterBuildStrategy::SINGLE_THREADED);

for (const auto hardware_flags : HardwareFlagsForTesting()) {
for (const auto& strategy : strategies) {
4 changes: 4 additions & 0 deletions cpp/src/arrow/acero/plan_test.cc
@@ -36,6 +36,7 @@
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/thread_pool.h"
@@ -1619,6 +1620,9 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) {
}

TEST(ExecPlanExecution, SegmentedAggregationWithMultiThreading) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading enabled";
#endif
BatchesWithSchema data;
data.batches = {ExecBatchFromJSON({int32()}, "[[1]]")};
data.schema = schema({field("i32", int32())});
5 changes: 5 additions & 0 deletions cpp/src/arrow/acero/task_util.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <mutex>

#include "arrow/util/config.h"
#include "arrow/util/logging.h"

namespace arrow {
@@ -316,7 +317,11 @@ Status TaskSchedulerImpl::StartScheduling(size_t thread_id, ScheduleImpl schedul
int num_concurrent_tasks,
bool use_sync_execution) {
schedule_impl_ = std::move(schedule_impl);
#ifdef ARROW_ENABLE_THREADING
use_sync_execution_ = use_sync_execution;
#else
use_sync_execution_ = true;
#endif
num_concurrent_tasks_ = num_concurrent_tasks;
num_tasks_to_schedule_.value += num_concurrent_tasks;
return ScheduleMore(thread_id);
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/task_util.h
@@ -24,6 +24,7 @@

#include "arrow/acero/visibility.h"
#include "arrow/status.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"

namespace arrow {
7 changes: 7 additions & 0 deletions cpp/src/arrow/acero/task_util_test.cc
@@ -27,6 +27,7 @@

#include "arrow/acero/util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/config.h"
#include "arrow/util/thread_pool.h"

namespace arrow {
@@ -101,6 +102,9 @@ TaskScheduler::TaskGroupContinuationImpl MakeFinalContinuation(
// concurrently. When all groups in that stage finish the next
// stage is started.
TEST(TaskScheduler, Stress) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading support";
#endif
constexpr int kNumThreads = 8;
constexpr int kNumGroups = 8;
constexpr int kGroupsPerStage = 3;
@@ -176,6 +180,9 @@ TEST(TaskScheduler, Stress) {
// thread starts a task group while another thread is finishing
// the last of its tasks.
TEST(TaskScheduler, StressTwo) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading support";
#endif
constexpr int kNumThreads = 16;
constexpr int kNumGroups = 8;
constexpr int kTasksPerGroup = 1;
10 changes: 10 additions & 0 deletions cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -31,6 +31,7 @@
#include "arrow/table.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/config.h"
#include "gtest/gtest.h"

using namespace std::string_view_literals; // NOLINT
@@ -380,6 +381,9 @@ TEST_F(DatasetWriterTestFixture, MinRowGroupBackpressure) {
}

TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Concurrent writes tests need threads";
#endif
// Use a gated filesystem to queue up many writes behind a file open to make sure the
// file isn't opened multiple times.
auto gated_fs = UseGatedFs();
@@ -394,6 +398,9 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
}

TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Concurrent writes tests need threads";
#endif
// NBATCHES must be less than I/O executor concurrency to avoid deadlock / test failure
constexpr int NBATCHES = 6;
auto gated_fs = UseGatedFs();
@@ -412,6 +419,9 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
}

TEST_F(DatasetWriterTestFixture, MaxOpenFiles) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Concurrent writes tests need threads";
#endif
auto gated_fs = UseGatedFs();
std::atomic<bool> paused = false;
write_options_.max_open_files = 2;
11 changes: 11 additions & 0 deletions cpp/src/arrow/engine/substrait/serde_test.cc
@@ -71,6 +71,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h"
#include "arrow/util/decimal.h"
#include "arrow/util/future.h"
#include "arrow/util/hash_util.h"
@@ -4458,6 +4459,9 @@ TEST(Substrait, SetRelationBasic) {
}

TEST(Substrait, PlanWithAsOfJoinExtension) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "ASOF join requires threading";
#endif
// This demos an extension relation
std::string substrait_json = R"({
"extensionUris": [],
@@ -5477,6 +5481,10 @@ TEST(Substrait, MixedSort) {
}

TEST(Substrait, PlanWithExtension) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "ASOF join requires threading";
#endif

// This demos an extension relation
std::string substrait_json = R"({
"extensionUris": [],
@@ -5665,6 +5673,9 @@ TEST(Substrait, PlanWithExtension) {
}

TEST(Substrait, AsOfJoinDefaultEmit) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "ASOF join requires threading";
#endif
std::string substrait_json = R"({
"extensionUris": [],
"extensions": [],
4 changes: 4 additions & 0 deletions cpp/src/arrow/io/memory_test.cc
@@ -42,6 +42,7 @@
#include "arrow/testing/util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
@@ -918,6 +919,9 @@ TEST(CacheOptions, Basics) {
}

TEST(IOThreadPool, Capacity) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading enabled";
#endif
// Simple sanity check
auto pool = internal::GetIOThreadPool();
int capacity = pool->GetCapacity();