Fixed formatting
mroz45 committed Oct 16, 2024
1 parent f2fc33b commit d5972f2
Showing 7 changed files with 34 additions and 26 deletions.
16 changes: 8 additions & 8 deletions cpp/src/arrow/acero/asof_join_node.cc
@@ -16,9 +16,9 @@
 // under the License.

 #include "arrow/acero/asof_join_node.h"
+#include "arrow/acero/accumulation_queue.h"
 #include "arrow/acero/backpressure_handler.h"
 #include "arrow/acero/concurrent_queue_internal.h"
-#include "arrow/acero/accumulation_queue.h"

 #include <atomic>
 #include <condition_variable>
@@ -472,7 +472,7 @@ class BackpressureController : public BackpressureControl {
   std::atomic<int32_t>& backpressure_counter_;
 };

-class InputState: public util::SerialSequencingQueue::Processor {
+class InputState : public util::SerialSequencingQueue::Processor {
   // InputState corresponds to an input
   // Input record batches are queued up in InputState until processed and
   // turned into output record batches.
@@ -484,8 +484,8 @@
              const col_index_t time_col_index,
              const std::vector<col_index_t>& key_col_index)
       : sequencer_(util::SerialSequencingQueue::Make(this)),
-      queue_(std::move(handler)),
-      schema_(schema),
+        queue_(std::move(handler)),
+        schema_(schema),
         time_col_index_(time_col_index),
         key_col_index_(key_col_index),
         time_type_id_(schema_->fields()[time_col_index_]->type()->id()),
@@ -701,12 +701,12 @@ class InputState: public util::SerialSequencingQueue::Processor {
                DEBUG_MANIP(std::endl));
     return updated;
   }
-  Status InsertBatch(ExecBatch batch){
-    return sequencer_->InsertBatch(std::move(batch));
+  Status InsertBatch(ExecBatch batch) {
+    return sequencer_->InsertBatch(std::move(batch));
   }

   Status Process(ExecBatch batch) override {
-    auto rb = *batch.ToRecordBatch(schema_);
+    auto rb = *batch.ToRecordBatch(schema_);
     DEBUG_SYNC(node_, "received batch from input ", index_, ":", DEBUG_MANIP(std::endl),
                rb->ToString(), DEBUG_MANIP(std::endl));
     return Push(rb);
@@ -1412,7 +1412,7 @@ class AsofJoinNode : public ExecNode {
     // InputReceived may be called after execution was finished. Pushing it to the
     // InputState is unnecessary since we're done (and anyway may cause the
     // BackPressureController to pause the input, causing a deadlock), so drop it.
-    if(::arrow::compute::kUnsequencedIndex == batch.index)
+    if (::arrow::compute::kUnsequencedIndex == batch.index)
       return Status::Invalid("AsofJoin requires sequenced input");

     if (process_task_.is_finished()) {
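Note: the `if (::arrow::compute::kUnsequencedIndex == batch.index)` guard above means the as-of join refuses inputs whose batches carry no sequence index; each InputState runs incoming batches through a util::SerialSequencingQueue so they are processed in index order. Below is a minimal sketch of a plan that satisfies this requirement, assuming pyarrow.acero exposes AsofJoinNodeOptions and the "asofjoin" factory; the trades/quotes data is hypothetical.

    import pyarrow as pa
    from pyarrow.acero import AsofJoinNodeOptions, Declaration, TableSourceNodeOptions

    # Hypothetical trades/quotes tables; a table_source emits batches with
    # sequential indices, so it passes the "sequenced input" check above.
    trades = pa.table({"time": [1, 5, 10], "key": [1, 1, 1], "price": [100.0, 101.0, 102.0]})
    quotes = pa.table({"time": [2, 4, 9], "key": [1, 1, 1], "bid": [99.0, 100.0, 101.0]})

    left = Declaration("table_source", TableSourceNodeOptions(trades))
    right = Declaration("table_source", TableSourceNodeOptions(quotes))
    # tolerance=-3 matches each trade with the latest quote at most 3 time
    # units in the past (a negative tolerance looks backward only).
    joined = Declaration("asofjoin",
                         AsofJoinNodeOptions(left_on="time", left_by=["key"],
                                             right_on="time", right_by=["key"],
                                             tolerance=-3),
                         inputs=[left, right])
    print(joined.to_table())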
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/asof_join_node_test.cc
@@ -101,7 +101,7 @@ Result<BatchesWithSchema> MakeBatchesFromNumString(
   BatchesWithSchema batches;
   batches.schema = schema;
   int n_fields = schema->num_fields();
-  size_t batch_index=0;
+  size_t batch_index = 0;
   for (auto num_batch : num_batches.batches) {
     Datum two(Int32Scalar(2));
     std::vector<Datum> values;
@@ -129,7 +129,7 @@ Result<BatchesWithSchema> MakeBatchesFromNumString(
       }
     }
     ExecBatch batch(values, num_batch.length);
-    batch.index=batch_index++;
+    batch.index = batch_index++;
     batches.batches.push_back(batch);
   }
   return batches;
4 changes: 3 additions & 1 deletion cpp/src/arrow/acero/options.h
@@ -95,7 +95,9 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
   SourceNodeOptions(std::shared_ptr<Schema> output_schema,
                     std::function<Future<std::optional<ExecBatch>>()> generator,
                     Ordering ordering = Ordering::Unordered())
-      : output_schema(std::move(output_schema)), generator(std::move(generator)),ordering(std::move(ordering)) {}
+      : output_schema(std::move(output_schema)),
+        generator(std::move(generator)),
+        ordering(std::move(ordering)) {}

   /// \brief the schema for batches that will be generated by this source
   std::shared_ptr<Schema> output_schema;
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/test_util_internal.cc
@@ -411,8 +411,8 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr<Schema>& schema,
       out_batches.batches.push_back(out_batches.batches[i]);
     }
   }
-  for(size_t batch_index=0;batch_index<out_batches.batches.size();++batch_index){
-    out_batches.batches[batch_index].index=batch_index;
+  for (size_t batch_index = 0; batch_index < out_batches.batches.size(); ++batch_index) {
+    out_batches.batches[batch_index].index = batch_index;
   }

   return out_batches;
15 changes: 7 additions & 8 deletions cpp/src/arrow/dataset/scanner.cc
@@ -1032,11 +1032,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
   } else {
     batch_gen = std::move(merged_batch_gen);
   }
-  int64_t index=require_sequenced_output?0:compute::kUnsequencedIndex;
+  int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;
   auto gen = MakeMappedGenerator(
       std::move(batch_gen),
-      [scan_options, index](const EnumeratedRecordBatch& partial)mutable
-      -> Result<std::optional<compute::ExecBatch>> {
+      [scan_options, index](const EnumeratedRecordBatch& partial) mutable
+          -> Result<std::optional<compute::ExecBatch>> {
         // TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
         // than this, for example parquet's row group stats. Failing to do this leaves
         // perf on the table because row group stats could be used to skip kernel execs in
@@ -1057,20 +1057,19 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
         batch->values.emplace_back(partial.record_batch.index);
         batch->values.emplace_back(partial.record_batch.last);
         batch->values.emplace_back(partial.fragment.value->ToString());
-        if (index!=compute::kUnsequencedIndex)
-          batch->index = index++;
+        if (index != compute::kUnsequencedIndex) batch->index = index++;
         return batch;
       });

-  auto ordering = require_sequenced_output?Ordering::Implicit():Ordering::Unordered();
+  auto ordering = require_sequenced_output ? Ordering::Implicit() : Ordering::Unordered();

   auto fields = scan_options->dataset_schema->fields();
   if (scan_options->add_augmented_fields) {
     for (const auto& aug_field : kAugmentedFields) {
       fields.push_back(aug_field);
     }
   }

   return acero::MakeExecNode(
       "source", plan, {},
       acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen), ordering});
4 changes: 2 additions & 2 deletions python/pyarrow/_dataset.pyx
@@ -4018,8 +4018,8 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
         bint require_sequenced_output=False

         c_scan_options = Scanner._make_scan_options(dataset, scan_options)
-        require_sequenced_output=scan_options.get("require_sequenced_output",False)
-
+        require_sequenced_output=scan_options.get("require_sequenced_output", False)
+
         self.wrapped.reset(
             new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output)
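Taken together with the scanner.cc change above, this plumbs a require_sequenced_output scan option from Python down to CScanNodeOptions: when it is set, MakeScanNode numbers each output batch (batch->index = index++) and declares Ordering::Implicit instead of Ordering::Unordered, so order-sensitive downstream nodes can resequence the scan's output. A minimal sketch of driving it from Python, assuming pyarrow.acero re-exports ScanNodeOptions and using hypothetical data:

    import pyarrow as pa
    import pyarrow.dataset as ds
    from pyarrow.acero import Declaration, ScanNodeOptions

    # Hypothetical in-memory dataset; with require_sequenced_output=True the
    # scan node assigns sequential batch indices rather than kUnsequencedIndex.
    dataset = ds.dataset(pa.table({"time": [1, 2, 3], "value": [10, 20, 30]}))
    scan = Declaration("scan", ScanNodeOptions(dataset, require_sequenced_output=True))
    # The result also carries the augmented __fragment_index / __batch_index /
    # __last_in_fragment / __filename columns that the scan node appends.
    print(scan.to_table())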
13 changes: 10 additions & 3 deletions python/pyarrow/acero.py
@@ -57,7 +57,9 @@ class InMemoryDataset:


 def _dataset_to_decl(dataset, use_threads=True, require_sequenced_output=False):
-    decl = Declaration("scan", ScanNodeOptions(dataset, use_threads=use_threads, require_sequenced_output=require_sequenced_output))
+    decl = Declaration("scan", ScanNodeOptions(
+        dataset, use_threads=use_threads,
+        require_sequenced_output=require_sequenced_output))

     # Get rid of special dataset columns
     # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
@@ -311,13 +313,18 @@ def _perform_join_asof(left_operand, left_on, left_by,

     # Add the join node to the execplan
     if isinstance(left_operand, ds.Dataset):
-        left_source = _dataset_to_decl(left_operand, use_threads=use_threads, require_sequenced_output=True)
+        left_source = _dataset_to_decl(
+            left_operand,
+            use_threads=use_threads,
+            require_sequenced_output=True)
     else:
         left_source = Declaration(
             "table_source", TableSourceNodeOptions(left_operand),
         )
     if isinstance(right_operand, ds.Dataset):
-        right_source = _dataset_to_decl(right_operand, use_threads=use_threads, require_sequenced_output=True)
+        right_source = _dataset_to_decl(
+            right_operand, use_threads=use_threads,
+            require_sequenced_output=True)
     else:
         right_source = Declaration(
             "table_source", TableSourceNodeOptions(right_operand)
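With both sides of _perform_join_asof now scanning with require_sequenced_output=True, an as-of join whose operands are Datasets (not just in-memory tables) receives deterministically ordered batches. A sketch of the user-facing call, assuming Dataset.join_asof is available in this pyarrow build and returns an in-memory dataset; the data is hypothetical:

    import pyarrow as pa
    import pyarrow.dataset as ds

    # ds.dataset() wraps a Table as an InMemoryDataset; hypothetical data.
    trades = ds.dataset(pa.table({"time": [1, 5, 10], "key": [1, 1, 1],
                                  "price": [100.0, 101.0, 102.0]}))
    quotes = ds.dataset(pa.table({"time": [2, 4, 9], "key": [1, 1, 1],
                                  "bid": [99.0, 100.0, 101.0]}))

    # Each trade is matched with the latest quote at most 3 time units back.
    joined = trades.join_asof(quotes, on="time", by="key", tolerance=-3)
    print(joined.to_table())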
