Skip to content

Commit

Permalink
wip: refactoring pruner
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Jul 25, 2022
1 parent 3257e7b commit 72a43d7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
18 changes: 8 additions & 10 deletions query/src/storages/fuse/pruning/block_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ impl BlockPruner {
push_down: &Option<Extras>,
) -> Result<Vec<(usize, BlockMeta)>> {
let segment_locs = self.table_snapshot.segments.clone();
let segment_num = segment_locs.len();

if segment_locs.is_empty() {
return Ok(vec![]);
Expand All @@ -75,10 +74,7 @@ impl BlockPruner {
.filter(|p| p.order_by.is_empty())
.and_then(|p| p.limit);

let filter_expr = push_down
.as_ref()
.map(|extra| extra.filters.get(0))
.flatten();
let filter_expr = push_down.as_ref().and_then(|extra| extra.filters.get(0));

let segment_num = segment_locs.len();

Expand All @@ -92,29 +88,31 @@ impl BlockPruner {
.blocks
.clone()
.into_iter()
.map(move |item| (idx, item)),
.map(move |item| (idx, item))
.collect::<Vec<_>>(),
)
})
.buffered(std::cmp::min(10, segment_num))
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten();
return Ok(a.collect::<Vec<_>>());
.flatten()
.collect::<Vec<_>>();
return Ok(a);
}

// Segments and blocks are accumulated concurrently, thus an atomic counter is used
// to **try** collecting as less blocks as possible. But concurrency is preferred to
// "accuracy". In [FuseTable::do_read_partitions], the "limit" will be treated precisely.

//
let accumulated_rows = AtomicUsize::new(0);

// A !Copy Wrapper of u64
struct NonCopy<T>(T);

// convert u64 (which is Copy) into NonCopy( struct which is !Copy)
// so that "async move" can be avoided in the latter async block
// See https://github.com/rust-lang/rust/issues/81653

let segment_locs = segment_locs
.into_iter()
.enumerate()
Expand Down
6 changes: 3 additions & 3 deletions tests/logictest/http_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ def format_result(results):
if buf == "":
buf = str(item)
else:
buf = buf + " " + str(
item) # every item seperate by space
buf = buf + " " + str(item) # every item seperate by space
if len(buf) == 0:
# empty line in results will replace with tab
buf = "\t"
Expand Down Expand Up @@ -189,7 +188,7 @@ def query_with_session(self, statement):
try:
resp = requests.get(url="http://{}:{}{}".format(
self._host, self._port, response['next_uri']),
headers=self.make_headers())
headers=self.make_headers())
response = json.loads(resp.content)
log.debug(
f"Sql in progress, fetch next_uri content: {response}")
Expand Down Expand Up @@ -225,6 +224,7 @@ def fetch_all(self, statement):
def get_query_option(self):
return self._query_option


# if __name__ == '__main__':
# from config import http_config
# connector = HttpConnector()
Expand Down

0 comments on commit 72a43d7

Please sign in to comment.