Skip to content

Commit

Permalink
[OPPRO-135] ArrowStream::getOutput swallows errors, and destructor do…
Browse files Browse the repository at this point in the history
…esn't release ArrowArrayStream (facebookincubator#16)

* ArrowStream::getOutput swallows errors

* Also, call release hook after ArrowArrayStream is used
  • Loading branch information
zhztheplayer authored and zhejiangxiaomai committed Oct 20, 2022
1 parent 1c4c0f4 commit ebf45e1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
32 changes: 29 additions & 3 deletions velox/exec/ArrowStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,53 @@ ArrowStream::ArrowStream(
arrowStream_ = arrowStream->arrowStream();
}

ArrowStream::~ArrowStream() {
if (!isFinished0()) {
close0();
}
}

RowVectorPtr ArrowStream::getOutput() {
struct ArrowArray arrowArray;
arrowStream_->get_next(&(*arrowStream_), &arrowArray);
if (arrowStream_->get_next(&(*arrowStream_), &arrowArray)) {
VELOX_FAIL(
"Failed to call get_next on ArrowStream: " + std::string(GetError()))
}
if (arrowArray.release == NULL) {
// End of Stream.
closed_ = true;
return nullptr;
}
struct ArrowSchema arrowSchema;
arrowStream_->get_schema(&(*arrowStream_), &arrowSchema);
if (arrowStream_->get_schema(&(*arrowStream_), &arrowSchema)) {
VELOX_FAIL(
"Failed to call get_schema on ArrowStream: " + std::string(GetError()))
}
// Convert Arrow data into RowVector.
rowVector_ = std::dynamic_pointer_cast<RowVector>(
facebook::velox::importFromArrowAsViewer(arrowSchema, arrowArray));
return rowVector_;
}

const char* ArrowStream::GetError() {
return arrowStream_->get_last_error(arrowStream_.get());
}

void ArrowStream::close() {
closed_ = true;
close0();
SourceOperator::close();
}

bool ArrowStream::isFinished() {
return isFinished0();
}

void ArrowStream::close0() {
arrowStream_->release(arrowStream_.get());
closed_ = true;
}

bool ArrowStream::isFinished0() {
return closed_;
}

Expand Down
10 changes: 8 additions & 2 deletions velox/exec/ArrowStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@ class ArrowStream : public SourceOperator {
DriverCtx* driverCtx,
std::shared_ptr<const core::ArrowStreamNode> arrowStream);

virtual ~ArrowStream();

RowVectorPtr getOutput() override;

BlockingReason isBlocked(ContinueFuture* /* unused */) override {
return BlockingReason::kNotBlocked;
}

void noMoreInput() override {
Operator::noMoreInput();
close();
}

bool isFinished() override;
const char* GetError();

bool isFinished() override;
void close() override;

private:
bool closed_ = false;
RowVectorPtr rowVector_;
std::shared_ptr<ArrowArrayStream> arrowStream_;

// For calls from destructor
bool isFinished0();
void close0();
};

} // namespace facebook::velox::exec

0 comments on commit ebf45e1

Please sign in to comment.