Skip to content

Commit

Permalink
chore(cluster): disable parallel commit of cluster tasks (#16851) (#1โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆ6852)

* chore(cluster): disable parallel commit of cluster tasks

* chore(cluster): disable parallel commit of cluster tasks

Co-authored-by: Winter Zhang <coswde@gmail.com>
  • Loading branch information
dantengsky and zhang2014 committed Nov 21, 2024
1 parent b24b5b5 commit d04b4bb
Showing 1 changed file with 12 additions and 17 deletions.
29 changes: 12 additions & 17 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,28 +135,23 @@ impl ClusterHelper for Cluster {
)))
}

let mut futures = Vec::with_capacity(message.len());
let mut response = HashMap::with_capacity(message.len());
for (id, message) in message {
let node = get_node(&self.nodes, &id)?;

futures.push({
let config = GlobalConfig::instance();
let flight_address = node.flight_address.clone();
let node_secret = node.secret.clone();

async move {
let mut conn = create_client(&config, &flight_address).await?;
Ok::<_, ErrorCode>((
id,
conn.do_action::<_, Res>(path, node_secret, message, timeout)
.await?,
))
}
});
let config = GlobalConfig::instance();
let flight_address = node.flight_address.clone();
let node_secret = node.secret.clone();

let mut conn = create_client(&config, &flight_address).await?;
response.insert(
id,
conn.do_action::<_, Res>(path, node_secret, message, timeout)
.await?,
);
}

let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?;
Ok(responses.into_iter().collect::<HashMap<String, Res>>())
Ok(response)
}
}

Expand Down

0 comments on commit d04b4bb

Please sign in to comment.