Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): Fix compressed buf not consumed correctly #5727

Merged
merged 13 commits into from
Jun 4, 2022
Previous commit
Next commit
Fix list file
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Jun 3, 2022
commit 31c12a318742598ff3fd2fc30400b6cd12eff732
59 changes: 0 additions & 59 deletions common/io/src/files.rs

This file was deleted.

1 change: 0 additions & 1 deletion common/io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@ mod binary_write;

mod buffer;
mod configs;
mod files;
mod format_settings;
mod marshal;
mod operator;
1 change: 0 additions & 1 deletion common/io/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@ pub use crate::configs::StorageFsConfig;
pub use crate::configs::StorageHdfsConfig;
pub use crate::configs::StorageParams;
pub use crate::configs::StorageS3Config;
pub use crate::files::operator_list_files;
pub use crate::format_settings::Compression;
pub use crate::format_settings::FormatSettings;
pub use crate::marshal::Marshal;
12 changes: 10 additions & 2 deletions query/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ use std::sync::Arc;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::operator_list_files;
use common_planners::CopyMode;
use common_planners::CopyPlan;
use common_planners::PlanNode;
@@ -29,6 +28,7 @@ use common_planners::StageTableInfo;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;
use futures::StreamExt;
use futures::TryStreamExt;
use regex::Regex;

@@ -77,7 +77,15 @@ impl CopyInterpreter {
files_with_path
} else {
let op = StageSource::get_op(&self.ctx, &table_info.stage_info).await?;
operator_list_files(&op, path).await?
let mut list = vec![];

// TODO: we could rewrite into try_collect.
let mut objects = op.object(path).list().await?;
while let Some(object) = objects.next().await {
list.push(object?.path());
}

list
};

Ok(files_with_path)
21 changes: 19 additions & 2 deletions query/src/interpreters/interpreter_list.rs
Original file line number Diff line number Diff line change
@@ -12,18 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::Series;
use common_datavalues::SeriesFrom;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::operator_list_files;
use common_planners::ListPlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;
use futures::StreamExt;
use regex::Regex;

use crate::interpreters::Interpreter;
@@ -45,7 +46,23 @@ impl ListInterpreter {
let op = StageSource::get_op(&self.ctx, &self.plan.stage).await?;
let pattern = &self.plan.pattern;
let path = &self.plan.path;
let mut files = operator_list_files(&op, path).await?;

let mut files = if path.ends_with('/') {
let mut list = vec![];
let mut objects = op.object(path).list().await?;
while let Some(object) = objects.next().await {
let name = object?.name();
list.push(name);
}
list
} else {
let o = op.object(path);
match o.metadata().await {
Ok(_) => vec![o.name()],
Err(e) if e.kind() == io::ErrorKind::NotFound => vec![],
Err(e) => return Err(e.into()),
}
};

if !pattern.is_empty() {
let regex = Regex::new(pattern).map_err(|e| {
5 changes: 2 additions & 3 deletions query/src/servers/http/v1/stage.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_io::prelude::StorageParams;
use common_meta_types::StageType;
use poem::error::InternalServerError;
use poem::error::Result as PoemResult;
@@ -74,8 +73,8 @@ pub async fn upload_to_stage(
.headers()
.get("relative_path")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.trim_matches(|c| c == '/');
.unwrap_or("/")
.to_string();

match stage.stage_type {
// It's internal, so we already have an op which has the root path