Skip to content

Commit

Permalink
fix(query): fix max_files bug when force=false (#10801)
Browse files (browse the repository at this point in the history)
* fix(query): fix max_files bug when force=false

* fix conversation

* fix conversation

* fix clippy err

* replace mut to copy
  • Loading branch information
TCeason authored Mar 28, 2023
1 parent 59a1cf0 commit 2298a02
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 35 deletions.
17 changes: 8 additions & 9 deletions src/common/storage/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ impl StageFilesInfo {
let mut res = Vec::new();
let mut limit: usize = 0;
for file in files {
if limit == max_files {
break;
} else {
limit += 1;
}
let full_path = Path::new(&self.path)
.join(file)
.to_string_lossy()
Expand All @@ -136,7 +131,11 @@ impl StageFilesInfo {
)));
}
if first_only {
break;
return Ok(res);
}
limit += 1;
if limit == max_files {
return Ok(res);
}
}
Ok(res)
Expand Down Expand Up @@ -228,12 +227,12 @@ impl StageFilesInfo {
while let Some(obj) = list.try_next().await? {
let meta = operator.metadata(&obj, StageFileInfo::meta_query()).await?;
if check_file(obj.path(), meta.mode(), &pattern) {
if limit == max_files {
files.push(StageFileInfo::new(obj.path().to_string(), &meta));
if first_only {
return Ok(files);
}
limit += 1;
files.push(StageFileInfo::new(obj.path().to_string(), &meta));
if first_only {
if limit == max_files {
return Ok(files);
}
}
Expand Down
57 changes: 32 additions & 25 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
Expand Down Expand Up @@ -502,52 +503,58 @@ impl TableContext for QueryContext {
max_files: Option<usize>,
) -> Result<Vec<StageFileInfo>> {
let tenant = self.get_tenant();
let files = files.clone();
let catalog = self.get_catalog(catalog_name)?;
let table = catalog
.get_table(&tenant, database_name, table_name)
.await?;
let table_id = table.get_id();

let mut limit: usize = 0;
let max_files = max_files.unwrap_or(MAX_QUERY_COPIED_FILES_NUM);
let max_copied_files = min(MAX_QUERY_COPIED_FILES_NUM, max_files);
let mut copied_files = BTreeMap::new();
for chunk in files.chunks(MAX_QUERY_COPIED_FILES_NUM) {

let mut results = Vec::with_capacity(files.len());

for chunk in files.chunks(max_copied_files) {
let files = chunk.iter().map(|v| v.path.clone()).collect::<Vec<_>>();
let req = GetTableCopiedFileReq { table_id, files };
let resp = catalog
.get_table_copied_file_info(&tenant, database_name, req)
.await?;
copied_files.extend(resp.file_info);
}

let max_files = max_files.unwrap_or(usize::MAX);
// Colored.
let mut results = Vec::with_capacity(files.len());
for mut file in files {
if limit == max_files {
break;
}
limit += 1;
if let Some(copied_file) = copied_files.get(&file.path) {
match &copied_file.etag {
Some(copied_etag) => {
if let Some(file_etag) = &file.etag {
// Check the 7 bytes etag prefix.
if file_etag.starts_with(copied_etag) {
// Colored
for file in chunk {
let mut file = file.clone();
if let Some(copied_file) = copied_files.get(&file.path) {
match &copied_file.etag {
Some(copied_etag) => {
if let Some(file_etag) = &file.etag {
// Check the 7 bytes etag prefix.
if file_etag.starts_with(copied_etag) {
file.status = StageFileStatus::AlreadyCopied;
}
}
}
None => {
// etag is none, compare with content_length and last_modified.
if copied_file.content_length == file.size
&& copied_file.last_modified == Some(file.last_modified)
{
file.status = StageFileStatus::AlreadyCopied;
}
}
}
None => {
// etag is none, compare with content_length and last_modified.
if copied_file.content_length == file.size
&& copied_file.last_modified == Some(file.last_modified)
{
file.status = StageFileStatus::AlreadyCopied;
}
}
if file.status == StageFileStatus::NeedCopy {
results.push(file);
limit += 1;
if limit == max_files {
return Ok(results);
}
}
}
results.push(file);
}
Ok(results)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
4
8
12
4
4
6
6
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ EOF
echo "copy into test_max_files_force_true from 'fs:///tmp/05_02_06/' FILE_FORMAT = (type = CSV) max_files=2 force=true" | $MYSQL_CLIENT_CONNECT
echo "select count(*) from test_max_files_force_true" | $MYSQL_CLIENT_CONNECT

echo "copy into test_max_files_force_true from 'fs:///tmp/05_02_06/' FILE_FORMAT = (type = CSV) max_files=2 force=true" | $MYSQL_CLIENT_CONNECT
echo "select count(*) from test_max_files_force_true" | $MYSQL_CLIENT_CONNECT

echo "copy into test_max_files_force_true from 'fs:///tmp/05_02_06/' FILE_FORMAT = (type = CSV) max_files=2 force=true" | $MYSQL_CLIENT_CONNECT
echo "select count(*) from test_max_files_force_true" | $MYSQL_CLIENT_CONNECT

echo "copy into test_max_files_force_false from 'fs:///tmp/05_02_06/' FILE_FORMAT = (type = CSV) max_files=2 force=false" | $MYSQL_CLIENT_CONNECT
echo "select count(*) from test_max_files_force_false" | $MYSQL_CLIENT_CONNECT

echo "copy into test_max_files_force_false from 'fs:///tmp/05_02_06/' FILE_FORMAT = (type = CSV) max_files=2 force=false" | $MYSQL_CLIENT_CONNECT
echo "select count(*) from test_max_files_force_false" | $MYSQL_CLIENT_CONNECT
Expand Down

1 comment on commit 2298a02

@vercel
Copy link

@vercel vercel bot commented on 2298a02 Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.vercel.app
databend.rs

Please sign in to comment.