Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(planner): add more execution information in system.query_profile #11876

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/query/pipeline/transforms/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod profile_wrapper;
pub mod profile_wrapper;
pub mod transforms;
pub use profile_wrapper::ProfileWrapper;
pub use transforms::Aborting;
101 changes: 89 additions & 12 deletions src/query/pipeline/transforms/src/processors/profile_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,52 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use common_exception::Result;
use common_expression::DataBlock;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;
use common_profile::ProcessorProfile;
use common_profile::SharedProcessorProfiles;

pub struct ProfileWrapper<T> {
use crate::processors::transforms::Transform;
use crate::processors::transforms::Transformer;

/// A profile wrapper for `Processor` trait.
/// This wrapper will record the time cost of each processor.
/// But because of the limitation of `Processor` trait,
/// we can't get the number of rows processed by the processor.
pub struct ProcessorProfileWrapper<T> {
inner: T,
prof_span_id: u32,
prof_span_set: SharedProcessorProfiles,
prof_id: u32,
proc_profs: SharedProcessorProfiles,

prof: ProcessorProfile,
}

impl<T> ProfileWrapper<T>
impl<T> ProcessorProfileWrapper<T>
where T: Processor + 'static
{
pub fn create(
inner: T,
prof_span_id: u32,
prof_span_set: SharedProcessorProfiles,
prof_id: u32,
proc_profs: SharedProcessorProfiles,
) -> Box<dyn Processor> {
Box::new(Self {
inner,
prof_span_id,
prof_span_set,
prof_id,
proc_profs,
prof: ProcessorProfile::default(),
})
}
}

#[async_trait::async_trait]
impl<T> Processor for ProfileWrapper<T>
impl<T> Processor for ProcessorProfileWrapper<T>
where T: Processor + 'static
{
fn name(&self) -> String {
Expand All @@ -60,10 +71,10 @@ where T: Processor + 'static
fn event(&mut self) -> Result<Event> {
match self.inner.event()? {
Event::Finished => {
self.prof_span_set
self.proc_profs
.lock()
.unwrap()
.update(self.prof_span_id, self.prof);
.update(self.prof_id, self.prof);
Ok(Event::Finished)
}
v => Ok(v),
Expand All @@ -74,7 +85,11 @@ where T: Processor + 'static
let instant = Instant::now();
self.inner.process()?;
let elapsed = instant.elapsed();
self.prof = self.prof + ProcessorProfile { cpu_time: elapsed };
self.prof = self.prof
+ ProcessorProfile {
cpu_time: elapsed,
..Default::default()
};
Ok(())
}

Expand All @@ -84,3 +99,65 @@ where T: Processor + 'static
self.inner.async_process().await
}
}

/// A profile wrapper for `Transform` trait.
/// This wrapper will record the time cost and the information
/// about the number of rows processed by the processor.
pub struct TransformProfileWrapper<T> {
inner: T,
prof_id: u32,
proc_profs: SharedProcessorProfiles,

prof: ProcessorProfile,
}

impl<T> TransformProfileWrapper<T>
where T: Transform + 'static
{
pub fn create(
inner: T,
input_port: Arc<InputPort>,
output_port: Arc<OutputPort>,
prof_id: u32,
proc_profs: SharedProcessorProfiles,
) -> Box<dyn Processor> {
Box::new(Transformer::create(input_port, output_port, Self {
inner,
prof_id,
proc_profs,
prof: ProcessorProfile::default(),
}))
}
}

impl<T> Transform for TransformProfileWrapper<T>
where T: Transform + 'static
{
const NAME: &'static str = "TransformProfileWrapper";

fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
let input_rows = data.num_rows();
let input_bytes = data.memory_size();

let instant = Instant::now();
let res = self.inner.transform(data)?;
let elapsed = instant.elapsed();
self.prof = self.prof
+ ProcessorProfile {
cpu_time: elapsed,
input_rows: self.prof.input_rows + input_rows,
input_bytes: self.prof.input_bytes + input_bytes,
output_rows: self.prof.output_rows + res.num_rows(),
output_bytes: self.prof.output_bytes + res.memory_size(),
};
Ok(res)
}

fn on_finish(&mut self) -> Result<()> {
self.proc_profs
.lock()
.unwrap()
.update(self.prof_id, self.prof);
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::transform_multi_sort_merge::try_add_multi_sort_merge;
use super::transform_sort_merge::try_create_transform_sort_merge;
use super::transform_sort_merge_limit::try_create_transform_sort_merge_limit;
use super::TransformSortPartial;
use crate::processors::ProfileWrapper;
use crate::processors::profile_wrapper::ProcessorProfileWrapper;

#[allow(clippy::too_many_arguments)]
pub fn build_full_sort_pipeline(
Expand All @@ -43,7 +43,7 @@ pub fn build_full_sort_pipeline(
let transform =
TransformSortPartial::try_create(input, output, limit, sort_desc.clone())?;
if let Some((plan_id, prof)) = &prof_info {
Ok(ProcessorPtr::create(ProfileWrapper::create(
Ok(ProcessorPtr::create(ProcessorProfileWrapper::create(
transform,
*plan_id,
prof.clone(),
Expand Down Expand Up @@ -78,7 +78,7 @@ pub fn build_full_sort_pipeline(
};

if let Some((plan_id, prof)) = &prof_info {
Ok(ProcessorPtr::create(ProfileWrapper::create(
Ok(ProcessorPtr::create(ProcessorProfileWrapper::create(
transform,
*plan_id,
prof.clone(),
Expand Down
12 changes: 12 additions & 0 deletions src/query/profile/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ pub type SharedProcessorProfiles = Arc<Mutex<ProcessorProfiles<u32>>>;
pub struct ProcessorProfile {
/// The time spent to process in nanoseconds
pub cpu_time: Duration,
/// Row count of the input data
pub input_rows: usize,
/// Byte size of the input data
pub input_bytes: usize,
/// Row count of the output data
pub output_rows: usize,
/// Byte size of the output data
pub output_bytes: usize,
}

impl std::ops::Add for ProcessorProfile {
Expand All @@ -42,6 +50,10 @@ impl std::ops::Add for ProcessorProfile {
fn add(self, rhs: Self) -> Self::Output {
Self {
cpu_time: self.cpu_time + rhs.cpu_time,
input_rows: self.input_rows + rhs.input_rows,
input_bytes: self.input_bytes + rhs.input_bytes,
output_rows: self.output_rows + rhs.output_rows,
output_bytes: self.output_bytes + rhs.output_bytes,
}
}
}
Expand Down
33 changes: 31 additions & 2 deletions src/query/profile/src/prof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::time::Duration;

use crate::ProcessorProfile;

#[derive(Debug, Clone)]
pub struct QueryProfile {
/// Query ID of the query profile
Expand Down Expand Up @@ -45,8 +47,8 @@ pub struct OperatorProfile {
/// IDs of the children plan nodes
pub children: Vec<u32>,

/// The time spent to process data
pub cpu_time: Duration,
/// The execution information of the plan operator
pub execution_info: OperatorExecutionInfo,

/// Attribute of the plan operator
pub attribute: OperatorAttribute,
Expand Down Expand Up @@ -95,6 +97,33 @@ impl Display for OperatorType {
}
}

#[derive(Debug, Clone, Default)]
pub struct OperatorExecutionInfo {
pub process_time: Duration,
pub input_rows: usize,
pub input_bytes: usize,
pub output_rows: usize,
pub output_bytes: usize,
}

impl From<ProcessorProfile> for OperatorExecutionInfo {
fn from(value: ProcessorProfile) -> Self {
(&value).into()
}
}

impl From<&ProcessorProfile> for OperatorExecutionInfo {
fn from(value: &ProcessorProfile) -> Self {
OperatorExecutionInfo {
process_time: value.cpu_time,
input_rows: value.input_rows,
input_bytes: value.input_bytes,
output_rows: value.output_rows,
output_bytes: value.output_bytes,
}
}
}

#[derive(Debug, Clone)]
pub enum OperatorAttribute {
Join(JoinAttribute),
Expand Down
Loading