Skip to content

Commit a4ec757

Browse files
author
Rafał Hibner
committed
Merge branch 'dataset_max_rows_queued' into combined
2 parents d8ab0e3 + 12f97f7 commit a4ec757

File tree

3 files changed

+31
-21
lines changed

3 files changed

+31
-21
lines changed

cpp/src/arrow/dataset/dataset_writer.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ namespace arrow {
2929
namespace dataset {
3030
namespace internal {
3131

32-
// This lines up with our other defaults in the scanner and execution plan
33-
constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 8 * 1024 * 1024;
34-
3532
/// \brief Utility class that manages a set of writers to different paths
3633
///
3734
/// Writers may be closed and reopened (and a new file created) based on the dataset

cpp/src/arrow/dataset/file_base.cc

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -451,9 +451,11 @@ Status ValidateAndPrepareSchema(const WriteNodeOptions& write_node_options,
451451
class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
452452
public:
453453
DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
454-
FileSystemDatasetWriteOptions write_options)
454+
FileSystemDatasetWriteOptions write_options,
455+
uint64_t max_rows_queued)
455456
: custom_schema_(std::move(custom_schema)),
456-
write_options_(std::move(write_options)) {}
457+
write_options_(std::move(write_options)),
458+
max_rows_queued_(max_rows_queued) {}
457459

458460
Status Init(const std::shared_ptr<Schema>& schema,
459461
acero::BackpressureControl* backpressure_control,
@@ -463,12 +465,12 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
463465
} else {
464466
schema_ = schema;
465467
}
466-
ARROW_ASSIGN_OR_RAISE(
467-
dataset_writer_,
468-
internal::DatasetWriter::Make(
469-
write_options_, plan->query_context()->async_scheduler(),
470-
[backpressure_control] { backpressure_control->Pause(); },
471-
[backpressure_control] { backpressure_control->Resume(); }, [] {}));
468+
ARROW_ASSIGN_OR_RAISE(dataset_writer_,
469+
internal::DatasetWriter::Make(
470+
write_options_, plan->query_context()->async_scheduler(),
471+
[backpressure_control] { backpressure_control->Pause(); },
472+
[backpressure_control] { backpressure_control->Resume(); },
473+
[] {}, max_rows_queued_));
472474
return Status::OK();
473475
}
474476

@@ -500,6 +502,7 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
500502
std::shared_ptr<Schema> custom_schema_;
501503
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
502504
FileSystemDatasetWriteOptions write_options_;
505+
uint64_t max_rows_queued_;
503506
Future<> finished_ = Future<>::Make();
504507
std::shared_ptr<Schema> schema_ = nullptr;
505508
};
@@ -551,8 +554,9 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
551554
ValidateAndPrepareSchema(write_node_options, input_schema, custom_schema));
552555

553556
std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
554-
std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema,
555-
write_node_options.write_options);
557+
std::make_shared<DatasetWritingSinkNodeConsumer>(
558+
custom_schema, write_node_options.write_options,
559+
write_node_options.max_rows_queued);
556560
ARROW_ASSIGN_OR_RAISE(
557561
auto node,
558562
// to preserve order explicitly sequence the exec batches
@@ -574,20 +578,21 @@ class TeeNode : public acero::MapNode,
574578
public:
575579
TeeNode(acero::ExecPlan* plan, std::vector<acero::ExecNode*> inputs,
576580
std::shared_ptr<Schema> output_schema,
577-
FileSystemDatasetWriteOptions write_options)
581+
FileSystemDatasetWriteOptions write_options, uint64_t max_rows_queued)
578582
: MapNode(plan, std::move(inputs), std::move(output_schema)),
579-
write_options_(std::move(write_options)) {
583+
write_options_(std::move(write_options)),
584+
max_rows_queued_(max_rows_queued) {
580585
if (write_options.preserve_order) {
581586
sequencer_ = acero::util::SerialSequencingQueue::Make(this);
582587
}
583588
}
584589

585590
Status StartProducing() override {
586-
ARROW_ASSIGN_OR_RAISE(
587-
dataset_writer_,
588-
internal::DatasetWriter::Make(
589-
write_options_, plan_->query_context()->async_scheduler(),
590-
[this] { Pause(); }, [this] { Resume(); }, [this] { MapNode::Finish(); }));
591+
ARROW_ASSIGN_OR_RAISE(dataset_writer_,
592+
internal::DatasetWriter::Make(
593+
write_options_, plan_->query_context()->async_scheduler(),
594+
[this] { Pause(); }, [this] { Resume(); },
595+
[this] { MapNode::Finish(); }, max_rows_queued_));
591596
return MapNode::StartProducing();
592597
}
593598

@@ -605,7 +610,8 @@ class TeeNode : public acero::MapNode,
605610
ValidateAndPrepareSchema(write_node_options, input_schema, custom_schema));
606611

607612
return plan->EmplaceNode<TeeNode>(plan, std::move(inputs), std::move(custom_schema),
608-
std::move(write_node_options.write_options));
613+
std::move(write_node_options.write_options),
614+
write_node_options.max_rows_queued);
609615
}
610616

611617
const char* kind_name() const override { return "TeeNode"; }
@@ -654,6 +660,7 @@ class TeeNode : public acero::MapNode,
654660
private:
655661
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
656662
FileSystemDatasetWriteOptions write_options_;
663+
uint64_t max_rows_queued_;
657664
std::atomic<int32_t> backpressure_counter_ = 0;
658665
std::unique_ptr<acero::util::SerialSequencingQueue> sequencer_;
659666
};

cpp/src/arrow/dataset/file_base.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,9 @@ class ARROW_DS_EXPORT FileWriter {
385385
std::optional<int64_t> bytes_written_;
386386
};
387387

388+
// This lines up with our other defaults in the scanner and execution plan
389+
constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 8 * 1024 * 1024;
390+
388391
/// \brief Options for writing a dataset.
389392
struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
390393
/// Options for individual fragment writing.
@@ -487,6 +490,9 @@ class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {
487490
std::shared_ptr<Schema> custom_schema;
488491
/// \brief Optional metadata to attach to written batches
489492
std::shared_ptr<const KeyValueMetadata> custom_metadata;
493+
494+
/// Maximum rows queued before issuing pasue to upstream node
495+
uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued;
490496
};
491497

492498
/// @}

0 commit comments

Comments
 (0)